基于AI的智能目录扫描与敏感信息收集工具开发
字数 947 2025-09-01 11:25:54
MCP-Finder: 基于AI的智能目录扫描与敏感信息收集工具开发指南
1. 工具概述
MCP-Finder是一款创新的网络安全扫描工具,巧妙地将传统目录扫描技术与现代AI大模型分析能力相结合,通过整合MCP协议中的streamable-http方案,为安全研究人员提供了更智能、更高效的漏洞发现解决方案。
核心特点
- 双引擎设计:传统扫描+AI分析
- 流式传输:基于MCP协议的streamable-http方案
- 敏感数据收集:特定状态码(如500)页面信息泄露收集
- AI增强分析:理解上下文关系,发现潜在安全问题
- 自动化报告:结构化漏洞报告,包含修复建议
2. 环境搭建
2.1 项目初始化
uv init MCP-Finder
cd MCP-Finder
uv venv
2.2 依赖安装
# 通过脚本安装最新版pip
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python get-pip.py
# 激活虚拟环境
source .venv/bin/activate
# 安装MCP和dirsearch所需依赖
uv add "mcp[cli]"
uv add requests
pip install setuptools
# 退出虚拟环境
deactivate
# 安装dirsearch依赖
cd dirsearch
python -m pip3 install -r requirements.txt
3. 项目架构
MCP-Finder/
├── main.py # 主程序入口
├── results/ # 存放扫描结果
│ └── Finder_*.txt # 按时间戳命名的JSON结果文件
└── dirsearch-master/ # dirsearch源码目录
└── dirsearch.py # dirsearch主程序
4. 核心功能实现
4.1 MCP服务初始化
from mcp.server.fastmcp import FastMCP
# 基本配置
MCP = FastMCP("MCP-Finder", port="8001")
# 高级配置
MCP = FastMCP(
name="MCP-Finder",
port=8001,
host="0.0.0.0", # 允许外部访问
debug=True # 开启调试模式
)
4.2 Streamable HTTP传输
if __name__ == "__main__":
MCP.run("streamable-http")
流式传输特性:
- 需要维持持久连接进行分块数据传输
- 服务器可能间歇性推送数据
- 单个请求的完整响应周期被拉长
- 短超时会中断传输
4.3 工具函数注册
同步工具示例
@mcp.tool
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate the distance between two coordinates."""
return 42.5
异步工具示例
@mcp.tool
async def fetch_weather(city: str) -> dict:
"""Retrieve current weather conditions for a city."""
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/weather/{city}") as response:
response.raise_for_status()
return await response.json()
4.4 核心扫描功能
扫描工具函数
@MCP.tool()
async def scan(url: str) -> dict:
"""
执行网站目录扫描,返回结构化扫描结果
Args:
url (str): 目标网站URL,需包含协议头(如http/https)
Returns:
dict: 包含以下键的字典:
- status_200: 200状态的有效路径列表
- status_500: 500状态的有效结果列表
- status_other: 非200和500状态码的有效结果列表
- stat_counts: 各类状态码统计
- result_path: 结果文件路径
"""
# 生成时间戳文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"Finder_{timestamp}.txt"
outpath = f"./results/{filename}"
# 定义dirsearch命令
command = [
"python3", "./dirsearch-master/dirsearch.py",
"-u", url,
"-o", outpath,
"--output-formats=json",
]
# 异步执行扫描
try:
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300.0)
if process.returncode != 0:
raise RuntimeError(f"Command failed with error: {stderr.decode()}")
print(f"Scan completed, results saved to {outpath}")
return process_results(outpath)
except Exception as e:
print(f"An error occurred: {e}")
raise
结果处理函数
def process_results(file_path: str) -> dict:
"""
处理dirsearch扫描生成的JSON结果文件
Args:
file_path (str): JSON格式的扫描结果文件路径
Returns:
dict: 分类后的扫描结果
"""
status_200 = []
stats_other = []
stats_500 = []
stats = {}
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
results = data.get('results', [])
for result in results:
url = result.get('url')
status_code = result.get('status')
content_length = result.get('contentLength')
# 状态码统计
stats[status_code] = stats.get(status_code, 0) + 1
# 分类处理
if status_code == 404:
continue
elif status_code == 200:
status_200.append({
"url": url,
"status": status_code,
"length": content_length
})
elif status_code == 500:
stats_500.append({
"url": url,
"status": status_code,
"length": content_length
})
else:
stats_other.append({
"url": url,
"status": status_code,
"length": content_length
})
except Exception as e:
print(f"Error processing file: {e}")
return {
"status_200": status_200,
"status_500": stats_500,
"status_other": stats_other,
"stat_counts": stats,
"result_path": file_path
}
4.5 敏感信息收集
500状态页面信息收集
@MCP.tool()
async def get_500_message(url_path: str) -> str:
"""
对500状态码页面提取错误信息
Args:
url_path (str): 目标URL
Returns:
str: 错误信息或错误字典
"""
async with httpx.AsyncClient() as client:
try:
response = await client.get(url_path, timeout=5.0)
response.raise_for_status()
return response.text
except httpx.HTTPStatusError as e:
return {"error": f"请求失败: {e}"}
except Exception as e:
return {"error": f"请求失败: {str(e)}"}
敏感信息检测
# 敏感信息检测模式
SENSITIVE_PATTERNS = [
r"(?i)password\s*[=:]\s*['\"]?\w+",
r"(?i)api[_-]?key\s*[=:]\s*['\"]?\w+",
r"-----BEGIN (RSA|OPENSSH) PRIVATE KEY-----"
]
@MCP.tool()
async def sensitive_seek(url_path: str) -> dict:
"""
对URL路径进行敏感信息检测
Args:
url_path (str): 目标URL路径
Returns:
dict: 包含敏感信息检测结果
"""
async with httpx.AsyncClient() as client:
try:
sensitive_msg = []
response = await client.get(url_path, timeout=5.0)
response.raise_for_status()
content = response.text
for pattern in SENSITIVE_PATTERNS:
if re.search(pattern, content):
msg = re.findall(pattern, content)
sensitive_msg.append(msg)
return {
"url": url_path,
"sensitive_msg": sensitive_msg
}
except Exception as e:
return {"error": f"请求失败: {str(e)}"}
5. 高级功能
5.1 深度递归扫描
command = [
...
"--recursive", # 启用递归扫描
"--max-recursion-depth", "3" # 控制递归深度
]
5.2 提示词优化
请使用已有的MCP工具对网站https://xxx.xxx.com/进行全面扫描,具体要求如下:
1. 目录与页面扫描自动遍历网站常见目录与页面,重点关注可能存在版本目录、备份目录、敏感文件等信息泄露风险。
2. 500响应处理
对所有返回500状态码的页面,自动调用`get_500_message`工具函数获取详细错误内容,并作出信息泄露和利用分析。
3. 输出结构化报告
以Markdown表格形式输出以下字段:
- 目录/页面路径
- HTTP状态码
- 危害描述
- 利用方法
- 修复建议
6. 实战应用
6.1 启动服务
python main.py
6.2 扫描报告示例
工具可以生成美观易读的扫描报告,包含:
- 漏洞类型
- 风险等级
- 受影响URL
- 修复建议
- 危害描述
- 利用方法
7. 未来计划
- 增加更多漏洞类型的检测能力
- 实现完整的漏洞扫描自动化流程
- 优化AI模型的分析准确性
- 扩展对更多Web框架的支持
- 增强分布式扫描能力
8. 最佳实践
- 异常处理:对扫描、文件读写、网络请求等环节增加细致的异常捕获与日志输出
- 异步优化:充分利用asyncio提升大规模扫描性能
- 模块化设计:将功能模块化,便于维护和扩展
- 数据存储:使用时间戳命名结果文件,便于查找和管理
- Token优化:通过文件中转减少直接输入LLM的数据量
9. 注意事项
- 流式传输需要维持持久连接,避免设置过短的超时时间
- 大规模扫描时注意资源占用和性能优化
- AI分析结果需要与原始扫描数据对比验证
- 敏感信息检测规则需要定期更新维护
- 递归扫描深度需要合理设置以避免过度请求