PY-RASP 项目深度拆解教学文档
第一章:项目概述与核心设计思想
项目地址:https://github.com/ez-lbz/PY-RASP
项目定位:PY-RASP 是一个用于学习目的的、Demo级别的开源 Python RASP (Runtime Application Self-Protection) 项目。其核心解决了一个Python生态中长期存在的现实问题:如何有效地对危险系统API(如open, subprocess, socket, eval等)进行运行时保护,同时避免对业务代码和框架自身的正常逻辑产生过大影响。
核心挑战:传统的全局Hook一旦接管范围过大,会将业务代码、框架代码及正常逻辑一并影响,导致“杀敌一千,自损八百”。
核心理念:本项目最重要的价值在于其解决了一个更具体的问题:如何在Python进程中,只为“当前请求”启用防护,同时尽量不破坏请求之外的执行路径(如定时任务、后台线程、日志组件等)。其设计不是为每个Web框架单独实现一套安全能力,而是采用分层设计:
- 框架层:负责识别并提供“当前是否处于一次Web请求”的上下文。
- Hook层:负责监控和报告“当前代码触发了何种敏感操作”。
- 策略层:负责裁决“此操作是否应被拦截”。
- 日志层:负责处理“拦截后如何记录”。
第二章:项目结构与初始化入口
2.1 对外接口
项目主入口位于py_rasp/__init__.py,对外仅暴露三个核心符号:
from .core import RASP, install
from .policy import RASPPolicy
__all__ = ['RASP', 'RASPPolicy', 'install']
2.2 核心初始化类 RASP
核心逻辑位于py_rasp/core.py 的 RASP 类。
主要职责:
- 构造核心组件:初始化
RASPPolicy(策略)、RequestTracer(请求追踪器)和HookManager(Hook管理器)。 - 安装全局Hook:在
install()方法中执行HookManager的安装,实现对所有目标API的全局替换。 - 框架适配:提供针对主流Web框架(Flask, FastAPI, Django)的接入方法(
patch_flask,patch_fastapi,django_middleware)。
代码示例:
class RASP:
def __init__(self, policy: RASPPolicy | None=None) -> None:
self.policy = policy or RASPPolicy()
self.tracer = RequestTracer()
self.hooks = HookManager(policy=self.policy, tracer=self.tracer)
def install(self) -> 'RASP':
self.hooks.install() # 安装全局Hook
set_active_tracer(self.tracer) # 激活Tracer
return self
install()函数是快捷入口,用于无框架适配的快速集成:
def install(policy: RASPPolicy | None=None) -> RASP:
return RASP(policy=policy).install()
第三章:策略层 (RASPPolicy) 详解
策略定义位于py_rasp/policy.py 的 RASPPolicy 类。它是一个数据类,定义了防护的规则、白名单和日志配置。
防护开关:控制各类操作是否默认拦截。
block_command:命令执行block_network:网络连接block_file_read:文件读取block_file_write:文件写入block_code_execution:代码执行(eval/exec/__import__等)block_archive_extract:归档解压block_native_load:本地库加载(ctypes)block_sql_injection:SQL注入log_trace_events:是否记录追踪事件
防护模型:
- 文件访问:采用目录边界模型。以
project_root(默认当前工作目录)或显式指定的allowed_file_roots为允许访问的根目录。在此目录外的访问会被拦截。同时支持file_allow_prefixes和file_write_allow_prefixes进行前缀白名单匹配。优点是简单、可预期;缺点是如果业务本身需要跨目录访问,策略会显得粗糙。 - SQL防护:通过
protected_sql_apis列表指定受保护的数据库API(如sqlite3.Connection.execute,pymysql.Cursor.execute等)。仅对这些API的调用进行拦截判定。 - 命令/网络白名单:通过
command_allowlist和network_allowlist指定允许的命令前缀和目标地址前缀。
关键方法:
is_file_allowed,is_file_write_allowed:判断文件路径是否在允许的根目录或前缀列表中。is_command_allowed,is_network_allowed:判断命令或网络目标是否在白名单内。
第四章:请求上下文的建立与追踪
这是实现“请求内防护”的关键。核心在于区分请求内和请求外的执行线程。
4.1 框架适配层 (frameworks.py)
负责在请求开始时建立上下文,请求结束时清理。为Flask, FastAPI, Django分别提供了适配器。
核心流程:
- 请求开始:调用
_start_request()。此函数:- 收集请求元数据(框架、方法、路径等)。
- 调用
_extract_request_inputs(),扁平化收集所有请求参数(包括query, form, json, path参数),形成一个字符串元组。这是后续SQL注入检测的数据源。 - 通过
enter_request_scope()建立请求作用域。 - 调用
tracer.enable_for_current_thread()和sys.settrace(),为当前线程开启调用追踪。
- 请求结束:调用
_finish_request()。清理线程的追踪启用状态,并重置上下文变量,防止状态泄漏污染后续无关代码。
示例 (Flask):
@app.before_request
def _py_rasp_before_request():
token = _start_request(tracer=tracer, framework='flask', ...)
request.environ['py_rasp_request_token'] = token
@app.after_request
def _py_rasp_after_request(response: Any):
token = request.environ.pop('py_rasp_request_token', None)
if token is not None:
_finish_request(tracer, token)
return response
细节:Flask同时使用了
after_request和teardown_request作为双保险,确保异常路径下也能清理上下文。
4.2 请求作用域 (context.py)
使用contextvars.ContextVar存储请求上下文信息,比threading.local()更适合同步/异步混合环境。
RequestScope 数据结构:
@dataclass(slots=True)
class RequestScope:
framework: str # 框架名称
method: str # 请求方法
path: str # 请求路径
thread_id: int # 线程ID
started_at: float # 开始时间
metadata: dict[str, Any] # 元数据(如远端地址)
request_inputs: tuple[str, ...] # 扁平化的请求输入参数
trace_events: list[dict[str, Any]] # 追踪事件列表
关键函数:
enter_request_scope:创建并设置请求作用域。exit_request_scope:退出作用域。get_request_scope:获取当前作用域。is_request_thread:判断当前线程是否处于请求上下文中。
4.3 调用追踪器 (tracer.py)
RequestTracer 负责在启用的请求线程内记录函数调用事件。
核心方法:
enable_for_current_thread/disable_for_current_thread:启用/禁用当前线程的追踪。is_enabled:判断当前线程是否启用追踪。trace:作为sys.settrace的回调函数,在函数调用时记录事件到RequestScope中。
追踪器不实现复杂的污点传播,仅记录调用栈信息,用于在拦截时生成调用者摘要。
第五章:Hook管理器与统一裁决
5.1 Hook管理器 (hook_manager.py)
HookManager 负责组织和安装所有系统级Hook。它采用Mixin模式,将不同类别的Hook(文件、进程、网络、SQL等)分到不同模块,结构清晰。
Hook覆盖范围广泛,包括:
- 代码执行:
eval,exec,__import__,importlib.import_module - 本地库加载:
ctypes.CDLL,ctypes.PyDLL,ctypes.WinDLL等 - 进程创建:
os.system,os.popen,subprocess.Popen, 以及底层的os.spawn*,os.exec*, 甚至是_winapi.CreateProcess和_posixsubprocess.fork_exec - 网络连接:
socket.socket.connect,socket.create_connection - 文件操作:
open,io.open,os.open,pathlib.Path的读写方法 - 数据库连接:
sqlite3.connect,pymysql.connect,psycopg2.connect等
安装流程:install()方法会按顺序安装上述所有Hook,并保存原始函数的引用,确保拦截逻辑可被安全移除。
5.2 统一裁决层 (hook_runtime.py)
HookRuntime 是所有Hook wrapper的“裁判”,集中了所有“是否拦截”的判断逻辑。这种设计使得拦截条件集中、日志统一、行为一致且易于测试。
裁决逻辑概览:
- 命令/网络/代码执行/本地库加载/归档解压:如果对应策略开关开启
(block_*),且当前处于请求线程(is_request_thread()),且不在对应的白名单内,则拦截。 - 文件读写:在上述条件基础上,额外判断文件打开模式
(mode)和路径是否被允许。 - SQL注入:逻辑相对复杂,是唯一具备“检测”性质的规则:
- SQL注入防护开关
(block_sql_injection)必须开启,且当前处于请求线程。 - 调用的API必须在
protected_sql_apis列表中。 - SQL语句不能为空。
- 关键启发式规则:如果SQL语句中包含占位符
(?, %s, :)且同时提供了参数(params),则认为使用了参数化查询,放行。 - 获取当前请求的
request_inputs,遍历每个输入值,如果其原始字符串(长度>=3)出现在SQL文本中(不区分大小写),则判定为SQL注入,拦截。
- SQL注入防护开关
record_block方法:负责记录拦截事件。它会将事件添加到当前请求的trace_events中,并调用日志器记录到文件。记录时通过tracer.caller_summary()生成简短的调用栈摘要。
第六章:Hook实现与Mock机制
6.1 拦截与“假成功”机制
与直接抛出异常不同,PY-RASP大量使用Mock对象来模拟成功执行,避免因拦截导致上层业务出现500错误。
核心Mock对象(位于mock.py):
MockPopen:模拟subprocess.Popen对象,提供communicate,wait,poll,kill,terminate等方法,返回空输出和成功(或终止)状态码。mock_open:根据模式返回io.BytesIO(b'')或io.StringIO(''),模拟文件句柄。MockLibrary:模拟动态库加载的返回。MockDBCursor:模拟数据库游标。
6.2 文件系统Hook示例 (hook_mixins/filesystem.py)
def _wrap_open(self, original):
@wraps(original)
def wrapper(file, mode='r', *args, **kwargs):
path = str(file)
if self.runtime.should_block_file_read(path, mode):
self.runtime.record_block('block.file_read', {'path': path, 'mode': mode})
return mock_open(mode) # 返回空文件对象
if self.runtime.should_block_file_write(path, mode):
self.runtime.record_block('block.file_write', {'path': path, 'mode': mode})
return mock_open(mode)
return original(file, mode, *args, **kwargs)
return wrapper
对于os.open,拦截后会重定向到os.devnull进行打开,以防止副作用。
6.3 进程Hook示例 (hook_mixins/processes.py)
进程创建Hook最为复杂,因为subprocess.Popen是一个类。项目通过继承原始类并创建代理类的方式,确保接口兼容性。
def _make_popen_class(self, original: type[subprocess.Popen]):
manager = self
class PatchedPopen(original):
def __init__(self, args, *pargs, **kwargs):
command_text = stringify_command(args)
if manager.runtime.should_block_command(command_text):
manager.runtime.record_block('block.command', {'api': 'subprocess.Popen', 'command': command_text})
# 创建Mock对象,而非调用父类__init__
self._py_rasp_mock = MockPopen(args, *pargs, **kwargs)
self.args = args
self.returncode = 0
self.pid = 0
# ... 其他属性初始化
return
self._py_rasp_mock = None
super().__init__(args, *pargs, **kwargs) # 放行,调用原始初始化
# 重写关键方法,在拦截时返回Mock行为
def communicate(self, input=None, timeout=None):
if self._py_rasp_mock is not None:
return self._py_rasp_mock.communicate(input=input, timeout=timeout)
return super().communicate(input=input, timeout=timeout)
# ... 重写 wait, poll, kill, terminate, __enter__, __exit__ 等方法
return PatchedPopen
6.4 SQL Hook示例 (hook_mixins/sql.py)
SQL防护不是Patch execute方法,而是从connect()入口开始包装整个连接对象。
def _wrap_dbapi_connect(self, db_label, original):
@wraps(original)
def wrapper(*pargs, **kwargs):
return ConnectionProxy(original(*pargs, **kwargs)) # 返回代理连接
return wrapper
ConnectionProxy和CursorProxy会包装原始的连接和游标。在代理游标的execute等方法中,调用HookRuntime.should_block_sql进行注入判断。若判定为注入,则返回MockDBCursor(),否则转发给原始游标执行。
SQL防护的局限性:
- 仅防护通过被Patch的
connect()新建的连接。 - 不支持ORM框架(如SQLAlchemy, Django ORM)。
- 基于字符串匹配,无法应对复杂拼接、编码变形等情况。
第七章:日志记录
日志记录器BlockEventLogger(位于logger.py)负责将拦截事件写入文件。
关键设计:日志写入没有使用已被Hook的open等高级文件API,而是直接使用原始的底层系统调用_ORIGINAL_OS_OPEN, _ORIGINAL_OS_WRITE, _ORIGINAL_OS_CLOSE。这是为了避免在记录日志时再次触发文件写拦截,造成无限递归。
日志格式:每条日志是一个JSON行,包含时间戳、动作、框架、方法、路径、线程ID、请求元数据和拦截负载(包括调用栈摘要)。
第八章:完整请求防护链路总结
- 启动阶段:应用初始化时,调用
RASP(policy).patch_*framework*(app)或install()。HookManager.install()被执行,将危险API全局替换为包装函数。 - 请求开始:Web请求到达框架,适配中间件/钩子调用
_start_request():- 创建
RequestScope上下文,保存请求信息。 - 扁平化收集所有请求输入参数。
- 为当前线程启用
RequestTracer。
- 创建
- 请求处理:业务代码执行。
- 若调用被Hook的API(如
open(),subprocess.run()),进入对应wrapper。 - wrapper将操作交给
HookRuntime裁决。 HookRuntime结合RASPPolicy策略和当前RequestScope,判断是否拦截。- 放行:调用原始API并返回结果。
- 拦截:调用
HookRuntime.record_block()记录事件,并返回对应的Mock对象(空内容、假句柄、成功状态码等),模拟操作成功。
- 若调用被Hook的API(如
- 请求结束:框架适配层调用
_finish_request():- 关闭当前线程的追踪。
- 清理线程启用状态集合。
- 重置上下文变量
(contextvars.ContextVar),确保状态不泄漏。
第九章:项目特点与局限性总结
特点:
- 请求作用域隔离:核心创新点,通过框架集成和上下文管理,实现防护仅作用于Web请求线程,不影响其他进程内任务。
- 分层架构清晰:入口、策略、上下文、Hook、裁决、日志、Mock各司其职,耦合度低。
- Mock机制:采用返回Mock对象而非抛出异常的方式,提高了与现有业务的兼容性。
- 防护范围全面:覆盖了文件、进程、网络、代码执行、库加载、SQL等多个维度。
- 实现完整:虽为Demo,但提供了框架适配、测试、示例,形成了一个可运行的原型。
局限性/注意事项:
- 非生产级:作者明确说明这是一个用于学习和比赛(如AWD)的原型,防护规则(尤其是SQL注入检测)相对简单,容易被绕过。
- 性能开销:全局Hook和函数调用追踪会引入性能损耗,需评估。
- 覆盖漏洞:
- 仅防护显式Hook的API,通过其他间接方式(如序列化、模板注入、FFI)的攻击可能失效。
- SQL防护无法覆盖ORM和复杂的查询构建。
- 对异步IO(
asyncio)相关API的Hook支持未提及。
- 兼容性风险:Monkey Patching可能破坏依赖原始API行为的第三方库或复杂继承关系(项目通过测试
subprocess.Popen可继承来部分规避)。 - 状态管理:依赖于正确的框架集成和请求生命周期的管理,配置错误可能导致防护失效或状态泄漏。