PY-RASP 项目深度拆解
字数 6489
更新时间 2026-03-26 14:11:29

PY-RASP 项目深度拆解教学文档

第一章:项目概述与核心设计思想

项目地址:https://github.com/ez-lbz/PY-RASP

项目定位:PY-RASP 是一个用于学习目的的、Demo级别的开源 Python RASP (Runtime Application Self-Protection) 项目。其核心解决了一个Python生态中长期存在的现实问题:如何有效地对危险系统API(如opensubprocesssocketeval等)进行运行时保护,同时避免对业务代码和框架自身的正常逻辑产生过大影响。

核心挑战:传统的全局Hook一旦接管范围过大,会将业务代码、框架代码及正常逻辑一并影响,导致“杀敌一千,自损八百”。

核心理念:本项目最重要的价值在于其解决了一个更具体的问题:如何在Python进程中,只为“当前请求”启用防护,同时尽量不破坏请求之外的执行路径(如定时任务、后台线程、日志组件等)。其设计不是为每个Web框架单独实现一套安全能力,而是采用分层设计:

  1. 框架层:负责识别并提供“当前是否处于一次Web请求”的上下文。
  2. Hook层:负责监控和报告“当前代码触发了何种敏感操作”。
  3. 策略层:负责裁决“此操作是否应被拦截”。
  4. 日志层:负责处理“拦截后如何记录”。

第二章:项目结构与初始化入口

2.1 对外接口

项目主入口位于py_rasp/__init__.py,对外仅暴露三个核心符号:

from .core import RASP, install
from .policy import RASPPolicy
__all__ = ['RASP', 'RASPPolicy', 'install']

2.2 核心初始化类 RASP

核心逻辑位于py_rasp/core.pyRASP 类。

主要职责

  1. 构造核心组件:初始化RASPPolicy(策略)、RequestTracer(请求追踪器)和HookManager(Hook管理器)。
  2. 安装全局Hook:在install()方法中执行HookManager的安装,实现对所有目标API的全局替换。
  3. 框架适配:提供针对主流Web框架(Flask, FastAPI, Django)的接入方法(patch_flaskpatch_fastapidjango_middleware)。

代码示例

class RASP:
    def __init__(self, policy: RASPPolicy | None=None) -> None:
        self.policy = policy or RASPPolicy()
        self.tracer = RequestTracer()
        self.hooks = HookManager(policy=self.policy, tracer=self.tracer)

    def install(self) -> 'RASP':
        self.hooks.install()  # 安装全局Hook
        set_active_tracer(self.tracer)  # 激活Tracer
        return self

install()函数是快捷入口,用于无框架适配的快速集成:

def install(policy: RASPPolicy | None=None) -> RASP:
    return RASP(policy=policy).install()

第三章:策略层 (RASPPolicy) 详解

策略定义位于py_rasp/policy.pyRASPPolicy 类。它是一个数据类,定义了防护的规则、白名单和日志配置。

防护开关:控制各类操作是否默认拦截。

  • block_command:命令执行
  • block_network:网络连接
  • block_file_read:文件读取
  • block_file_write:文件写入
  • block_code_execution:代码执行(eval/exec/__import__等)
  • block_archive_extract:归档解压
  • block_native_load:本地库加载(ctypes)
  • block_sql_injection:SQL注入
  • log_trace_events:是否记录追踪事件

防护模型

  • 文件访问:采用目录边界模型。以project_root(默认当前工作目录)或显式指定的allowed_file_roots为允许访问的根目录。在此目录外的访问会被拦截。同时支持file_allow_prefixesfile_write_allow_prefixes进行前缀白名单匹配。优点是简单、可预期;缺点是如果业务本身需要跨目录访问,策略会显得粗糙。
  • SQL防护:通过protected_sql_apis列表指定受保护的数据库API(如sqlite3.Connection.executepymysql.Cursor.execute等)。仅对这些API的调用进行拦截判定。
  • 命令/网络白名单:通过command_allowlistnetwork_allowlist指定允许的命令前缀和目标地址前缀。

关键方法

  • is_file_allowedis_file_write_allowed:判断文件路径是否在允许的根目录或前缀列表中。
  • is_command_allowedis_network_allowed:判断命令或网络目标是否在白名单内。

第四章:请求上下文的建立与追踪

这是实现“请求内防护”的关键。核心在于区分请求内和请求外的执行线程。

4.1 框架适配层 (frameworks.py)

负责在请求开始时建立上下文,请求结束时清理。为Flask, FastAPI, Django分别提供了适配器。

核心流程

  1. 请求开始:调用_start_request()。此函数:
    • 收集请求元数据(框架、方法、路径等)。
    • 调用_extract_request_inputs()扁平化收集所有请求参数(包括query, form, json, path参数),形成一个字符串元组。这是后续SQL注入检测的数据源
    • 通过enter_request_scope()建立请求作用域。
    • 调用tracer.enable_for_current_thread()sys.settrace(),为当前线程开启调用追踪。
  2. 请求结束:调用_finish_request()。清理线程的追踪启用状态,并重置上下文变量,防止状态泄漏污染后续无关代码。

示例 (Flask):

@app.before_request
def _py_rasp_before_request():
    token = _start_request(tracer=tracer, framework='flask', ...)
    request.environ['py_rasp_request_token'] = token

@app.after_request
def _py_rasp_after_request(response: Any):
    token = request.environ.pop('py_rasp_request_token', None)
    if token is not None:
        _finish_request(tracer, token)
    return response

细节:Flask同时使用了after_requestteardown_request作为双保险,确保异常路径下也能清理上下文。

4.2 请求作用域 (context.py)

使用contextvars.ContextVar存储请求上下文信息,比threading.local()更适合同步/异步混合环境。

RequestScope 数据结构:

@dataclass(slots=True)
class RequestScope:
    framework: str          # 框架名称
    method: str             # 请求方法
    path: str               # 请求路径
    thread_id: int          # 线程ID
    started_at: float       # 开始时间
    metadata: dict[str, Any] # 元数据(如远端地址)
    request_inputs: tuple[str, ...] # 扁平化的请求输入参数
    trace_events: list[dict[str, Any]] # 追踪事件列表

关键函数

  • enter_request_scope:创建并设置请求作用域。
  • exit_request_scope:退出作用域。
  • get_request_scope:获取当前作用域。
  • is_request_thread:判断当前线程是否处于请求上下文中。

4.3 调用追踪器 (tracer.py)

RequestTracer 负责在启用的请求线程内记录函数调用事件。

核心方法

  • enable_for_current_thread / disable_for_current_thread:启用/禁用当前线程的追踪。
  • is_enabled:判断当前线程是否启用追踪。
  • trace:作为sys.settrace的回调函数,在函数调用时记录事件到RequestScope中。

追踪器不实现复杂的污点传播,仅记录调用栈信息,用于在拦截时生成调用者摘要。

第五章:Hook管理器与统一裁决

5.1 Hook管理器 (hook_manager.py)

HookManager 负责组织和安装所有系统级Hook。它采用Mixin模式,将不同类别的Hook(文件、进程、网络、SQL等)分到不同模块,结构清晰。

Hook覆盖范围广泛,包括:

  • 代码执行evalexec__import__importlib.import_module
  • 本地库加载ctypes.CDLLctypes.PyDLLctypes.WinDLL
  • 进程创建os.systemos.popensubprocess.Popen, 以及底层的os.spawn*os.exec*, 甚至是_winapi.CreateProcess_posixsubprocess.fork_exec
  • 网络连接socket.socket.connectsocket.create_connection
  • 文件操作openio.openos.openpathlib.Path的读写方法
  • 数据库连接sqlite3.connectpymysql.connectpsycopg2.connect

安装流程install()方法会按顺序安装上述所有Hook,并保存原始函数的引用,确保拦截逻辑可被安全移除。

5.2 统一裁决层 (hook_runtime.py)

HookRuntime 是所有Hook wrapper的“裁判”,集中了所有“是否拦截”的判断逻辑。这种设计使得拦截条件集中、日志统一、行为一致且易于测试。

裁决逻辑概览

  • 命令/网络/代码执行/本地库加载/归档解压:如果对应策略开关开启(block_*)当前处于请求线程(is_request_thread())不在对应的白名单内,则拦截。
  • 文件读写:在上述条件基础上,额外判断文件打开模式(mode)和路径是否被允许。
  • SQL注入:逻辑相对复杂,是唯一具备“检测”性质的规则:
    1. SQL注入防护开关(block_sql_injection)必须开启,且当前处于请求线程。
    2. 调用的API必须在protected_sql_apis列表中。
    3. SQL语句不能为空。
    4. 关键启发式规则:如果SQL语句中包含占位符(?, %s, :)且同时提供了参数(params),则认为使用了参数化查询,放行
    5. 获取当前请求的request_inputs,遍历每个输入值,如果其原始字符串(长度>=3)出现在SQL文本中(不区分大小写),则判定为SQL注入,拦截

record_block方法:负责记录拦截事件。它会将事件添加到当前请求的trace_events中,并调用日志器记录到文件。记录时通过tracer.caller_summary()生成简短的调用栈摘要。

第六章:Hook实现与Mock机制

6.1 拦截与“假成功”机制

与直接抛出异常不同,PY-RASP大量使用Mock对象来模拟成功执行,避免因拦截导致上层业务出现500错误。

核心Mock对象(位于mock.py):

  • MockPopen:模拟subprocess.Popen对象,提供communicatewaitpollkillterminate等方法,返回空输出和成功(或终止)状态码。
  • mock_open:根据模式返回io.BytesIO(b'')io.StringIO(''),模拟文件句柄。
  • MockLibrary:模拟动态库加载的返回。
  • MockDBCursor:模拟数据库游标。

6.2 文件系统Hook示例 (hook_mixins/filesystem.py)

def _wrap_open(self, original):
    @wraps(original)
    def wrapper(file, mode='r', *args, **kwargs):
        path = str(file)
        if self.runtime.should_block_file_read(path, mode):
            self.runtime.record_block('block.file_read', {'path': path, 'mode': mode})
            return mock_open(mode)  # 返回空文件对象
        if self.runtime.should_block_file_write(path, mode):
            self.runtime.record_block('block.file_write', {'path': path, 'mode': mode})
            return mock_open(mode)
        return original(file, mode, *args, **kwargs)
    return wrapper

对于os.open,拦截后会重定向到os.devnull进行打开,以防止副作用。

6.3 进程Hook示例 (hook_mixins/processes.py)

进程创建Hook最为复杂,因为subprocess.Popen是一个类。项目通过继承原始类并创建代理类的方式,确保接口兼容性。

def _make_popen_class(self, original: type[subprocess.Popen]):
    manager = self
    class PatchedPopen(original):
        def __init__(self, args, *pargs, **kwargs):
            command_text = stringify_command(args)
            if manager.runtime.should_block_command(command_text):
                manager.runtime.record_block('block.command', {'api': 'subprocess.Popen', 'command': command_text})
                # 创建Mock对象,而非调用父类__init__
                self._py_rasp_mock = MockPopen(args, *pargs, **kwargs)
                self.args = args
                self.returncode = 0
                self.pid = 0
                # ... 其他属性初始化
                return
            self._py_rasp_mock = None
            super().__init__(args, *pargs, **kwargs) # 放行,调用原始初始化
        # 重写关键方法,在拦截时返回Mock行为
        def communicate(self, input=None, timeout=None):
            if self._py_rasp_mock is not None:
                return self._py_rasp_mock.communicate(input=input, timeout=timeout)
            return super().communicate(input=input, timeout=timeout)
        # ... 重写 wait, poll, kill, terminate, __enter__, __exit__ 等方法
    return PatchedPopen

6.4 SQL Hook示例 (hook_mixins/sql.py)

SQL防护不是Patch execute方法,而是从connect()入口开始包装整个连接对象。

def _wrap_dbapi_connect(self, db_label, original):
    @wraps(original)
    def wrapper(*pargs, **kwargs):
        return ConnectionProxy(original(*pargs, **kwargs)) # 返回代理连接
    return wrapper

ConnectionProxyCursorProxy会包装原始的连接和游标。在代理游标的execute等方法中,调用HookRuntime.should_block_sql进行注入判断。若判定为注入,则返回MockDBCursor(),否则转发给原始游标执行。

SQL防护的局限性

  1. 仅防护通过被Patch的connect()新建的连接。
  2. 不支持ORM框架(如SQLAlchemy, Django ORM)。
  3. 基于字符串匹配,无法应对复杂拼接、编码变形等情况。

第七章:日志记录

日志记录器BlockEventLogger(位于logger.py)负责将拦截事件写入文件。

关键设计:日志写入没有使用已被Hook的open等高级文件API,而是直接使用原始的底层系统调用_ORIGINAL_OS_OPEN_ORIGINAL_OS_WRITE_ORIGINAL_OS_CLOSE。这是为了避免在记录日志时再次触发文件写拦截,造成无限递归。

日志格式:每条日志是一个JSON行,包含时间戳、动作、框架、方法、路径、线程ID、请求元数据和拦截负载(包括调用栈摘要)。

第八章:完整请求防护链路总结

  1. 启动阶段:应用初始化时,调用RASP(policy).patch_*framework*(app)install()HookManager.install()被执行,将危险API全局替换为包装函数。
  2. 请求开始:Web请求到达框架,适配中间件/钩子调用_start_request()
    • 创建RequestScope上下文,保存请求信息。
    • 扁平化收集所有请求输入参数。
    • 为当前线程启用RequestTracer
  3. 请求处理:业务代码执行。
    • 若调用被Hook的API(如open()subprocess.run()),进入对应wrapper。
    • wrapper将操作交给HookRuntime裁决。
    • HookRuntime结合RASPPolicy策略和当前RequestScope,判断是否拦截。
    • 放行:调用原始API并返回结果。
    • 拦截:调用HookRuntime.record_block()记录事件,并返回对应的Mock对象(空内容、假句柄、成功状态码等),模拟操作成功。
  4. 请求结束:框架适配层调用_finish_request()
    • 关闭当前线程的追踪。
    • 清理线程启用状态集合。
    • 重置上下文变量(contextvars.ContextVar),确保状态不泄漏。

第九章:项目特点与局限性总结

特点

  1. 请求作用域隔离:核心创新点,通过框架集成和上下文管理,实现防护仅作用于Web请求线程,不影响其他进程内任务。
  2. 分层架构清晰:入口、策略、上下文、Hook、裁决、日志、Mock各司其职,耦合度低。
  3. Mock机制:采用返回Mock对象而非抛出异常的方式,提高了与现有业务的兼容性。
  4. 防护范围全面:覆盖了文件、进程、网络、代码执行、库加载、SQL等多个维度。
  5. 实现完整:虽为Demo,但提供了框架适配、测试、示例,形成了一个可运行的原型。

局限性/注意事项

  1. 非生产级:作者明确说明这是一个用于学习和比赛(如AWD)的原型,防护规则(尤其是SQL注入检测)相对简单,容易被绕过。
  2. 性能开销:全局Hook和函数调用追踪会引入性能损耗,需评估。
  3. 覆盖漏洞
    • 仅防护显式Hook的API,通过其他间接方式(如序列化、模板注入、FFI)的攻击可能失效。
    • SQL防护无法覆盖ORM和复杂的查询构建。
    • 对异步IO(asyncio)相关API的Hook支持未提及。
  4. 兼容性风险:Monkey Patching可能破坏依赖原始API行为的第三方库或复杂继承关系(项目通过测试subprocess.Popen可继承来部分规避)。
  5. 状态管理:依赖于正确的框架集成和请求生命周期的管理,配置错误可能导致防护失效或状态泄漏。
相似文章
相似文章
 全屏