基于AI的智能目录扫描与敏感信息收集工具开发
字数 947 2025-09-01 11:25:54

MCP-Finder: 基于AI的智能目录扫描与敏感信息收集工具开发指南

1. 工具概述

MCP-Finder是一款创新的网络安全扫描工具,巧妙地将传统目录扫描技术与现代AI大模型分析能力相结合,通过整合MCP协议中的streamable-http方案,为安全研究人员提供了更智能、更高效的漏洞发现解决方案。

核心特点

  • 双引擎设计:传统扫描+AI分析
  • 流式传输:基于MCP协议的streamable-http方案
  • 敏感数据收集:特定状态码(如500)页面信息泄露收集
  • AI增强分析:理解上下文关系,发现潜在安全问题
  • 自动化报告:结构化漏洞报告,包含修复建议

2. 环境搭建

2.1 项目初始化

uv init MCP-Finder
cd MCP-Finder
uv venv

2.2 依赖安装

# 通过脚本安装最新版pip
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python get-pip.py

# 激活虚拟环境
source .venv/bin/activate

# 安装MCP和dirsearch所需依赖
uv add "mcp[cli]"
uv add requests
pip install setuptools

# 退出虚拟环境
deactivate

# 安装dirsearch依赖
cd dirsearch
python -m pip3 install -r requirements.txt

3. 项目架构

MCP-Finder/
├── main.py                # 主程序入口
├── results/               # 存放扫描结果
│   └── Finder_*.txt       # 按时间戳命名的JSON结果文件
└── dirsearch-master/      # dirsearch源码目录
    └── dirsearch.py       # dirsearch主程序

4. 核心功能实现

4.1 MCP服务初始化

from mcp.server.fastmcp import FastMCP

# 基本配置
MCP = FastMCP("MCP-Finder", port="8001")

# 高级配置
MCP = FastMCP(
    name="MCP-Finder",
    port=8001,
    host="0.0.0.0",  # 允许外部访问
    debug=True       # 开启调试模式
)

4.2 Streamable HTTP传输

if __name__ == "__main__":
    MCP.run("streamable-http")

流式传输特性

  • 需要维持持久连接进行分块数据传输
  • 服务器可能间歇性推送数据
  • 单个请求的完整响应周期被拉长
  • 短超时会中断传输

4.3 工具函数注册

同步工具示例

@mcp.tool
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate the distance between two coordinates."""
    return 42.5

异步工具示例

@mcp.tool
async def fetch_weather(city: str) -> dict:
    """Retrieve current weather conditions for a city."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/weather/{city}") as response:
            response.raise_for_status()
            return await response.json()

4.4 核心扫描功能

扫描工具函数

@MCP.tool()
async def scan(url: str) -> dict:
    """
    执行网站目录扫描,返回结构化扫描结果
    Args:
        url (str): 目标网站URL,需包含协议头(如http/https)
    Returns:
        dict: 包含以下键的字典:
            - status_200: 200状态的有效路径列表
            - status_500: 500状态的有效结果列表
            - status_other: 非200和500状态码的有效结果列表
            - stat_counts: 各类状态码统计
            - result_path: 结果文件路径
    """
    # 生成时间戳文件名
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"Finder_{timestamp}.txt"
    outpath = f"./results/{filename}"
    
    # 定义dirsearch命令
    command = [
        "python3", "./dirsearch-master/dirsearch.py",
        "-u", url,
        "-o", outpath,
        "--output-formats=json",
    ]
    
    # 异步执行扫描
    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300.0)
        
        if process.returncode != 0:
            raise RuntimeError(f"Command failed with error: {stderr.decode()}")
            
        print(f"Scan completed, results saved to {outpath}")
        return process_results(outpath)
        
    except Exception as e:
        print(f"An error occurred: {e}")
        raise

结果处理函数

def process_results(file_path: str) -> dict:
    """
    处理dirsearch扫描生成的JSON结果文件
    Args:
        file_path (str): JSON格式的扫描结果文件路径
    Returns:
        dict: 分类后的扫描结果
    """
    status_200 = []
    stats_other = []
    stats_500 = []
    stats = {}
    
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
            results = data.get('results', [])
            
            for result in results:
                url = result.get('url')
                status_code = result.get('status')
                content_length = result.get('contentLength')
                
                # 状态码统计
                stats[status_code] = stats.get(status_code, 0) + 1
                
                # 分类处理
                if status_code == 404:
                    continue
                elif status_code == 200:
                    status_200.append({
                        "url": url,
                        "status": status_code,
                        "length": content_length
                    })
                elif status_code == 500:
                    stats_500.append({
                        "url": url,
                        "status": status_code,
                        "length": content_length
                    })
                else:
                    stats_other.append({
                        "url": url,
                        "status": status_code,
                        "length": content_length
                    })
                    
    except Exception as e:
        print(f"Error processing file: {e}")
        
    return {
        "status_200": status_200,
        "status_500": stats_500,
        "status_other": stats_other,
        "stat_counts": stats,
        "result_path": file_path
    }

4.5 敏感信息收集

500状态页面信息收集

@MCP.tool()
async def get_500_message(url_path: str) -> str:
    """
    对500状态码页面提取错误信息
    Args:
        url_path (str): 目标URL
    Returns:
        str: 错误信息或错误字典
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url_path, timeout=5.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPStatusError as e:
            return {"error": f"请求失败: {e}"}
        except Exception as e:
            return {"error": f"请求失败: {str(e)}"}

敏感信息检测

# 敏感信息检测模式
SENSITIVE_PATTERNS = [
    r"(?i)password\s*[=:]\s*['\"]?\w+",
    r"(?i)api[_-]?key\s*[=:]\s*['\"]?\w+",
    r"-----BEGIN (RSA|OPENSSH) PRIVATE KEY-----"
]

@MCP.tool()
async def sensitive_seek(url_path: str) -> dict:
    """
    对URL路径进行敏感信息检测
    Args:
        url_path (str): 目标URL路径
    Returns:
        dict: 包含敏感信息检测结果
    """
    async with httpx.AsyncClient() as client:
        try:
            sensitive_msg = []
            response = await client.get(url_path, timeout=5.0)
            response.raise_for_status()
            content = response.text
            
            for pattern in SENSITIVE_PATTERNS:
                if re.search(pattern, content):
                    msg = re.findall(pattern, content)
                    sensitive_msg.append(msg)
                    
            return {
                "url": url_path,
                "sensitive_msg": sensitive_msg
            }
            
        except Exception as e:
            return {"error": f"请求失败: {str(e)}"}

5. 高级功能

5.1 深度递归扫描

command = [
    ...
    "--recursive",            # 启用递归扫描
    "--max-recursion-depth", "3"  # 控制递归深度
]

5.2 提示词优化

请使用已有的MCP工具对网站https://xxx.xxx.com/进行全面扫描,具体要求如下:

1. 目录与页面扫描自动遍历网站常见目录与页面,重点关注可能存在版本目录、备份目录、敏感文件等信息泄露风险。

2. 500响应处理
对所有返回500状态码的页面,自动调用`get_500_message`工具函数获取详细错误内容,并作出信息泄露和利用分析。

3. 输出结构化报告
以Markdown表格形式输出以下字段:
- 目录/页面路径
- HTTP状态码
- 危害描述
- 利用方法
- 修复建议

6. 实战应用

6.1 启动服务

python main.py

6.2 扫描报告示例

工具可以生成美观易读的扫描报告,包含:

  • 漏洞类型
  • 风险等级
  • 受影响URL
  • 修复建议
  • 危害描述
  • 利用方法

7. 未来计划

  1. 增加更多漏洞类型的检测能力
  2. 实现完整的漏洞扫描自动化流程
  3. 优化AI模型的分析准确性
  4. 扩展对更多Web框架的支持
  5. 增强分布式扫描能力

8. 最佳实践

  1. 异常处理:对扫描、文件读写、网络请求等环节增加细致的异常捕获与日志输出
  2. 异步优化:充分利用asyncio提升大规模扫描性能
  3. 模块化设计:将功能模块化,便于维护和扩展
  4. 数据存储:使用时间戳命名结果文件,便于查找和管理
  5. Token优化:通过文件中转减少直接输入LLM的数据量

9. 注意事项

  1. 流式传输需要维持持久连接,避免设置过短的超时时间
  2. 大规模扫描时注意资源占用和性能优化
  3. AI分析结果需要与原始扫描数据对比验证
  4. 敏感信息检测规则需要定期更新维护
  5. 递归扫描深度需要合理设置以避免过度请求
MCP-Finder: 基于AI的智能目录扫描与敏感信息收集工具开发指南 1. 工具概述 MCP-Finder是一款创新的网络安全扫描工具,巧妙地将传统目录扫描技术与现代AI大模型分析能力相结合,通过整合MCP协议中的streamable-http方案,为安全研究人员提供了更智能、更高效的漏洞发现解决方案。 核心特点 双引擎设计 :传统扫描+AI分析 流式传输 :基于MCP协议的streamable-http方案 敏感数据收集 :特定状态码(如500)页面信息泄露收集 AI增强分析 :理解上下文关系,发现潜在安全问题 自动化报告 :结构化漏洞报告,包含修复建议 2. 环境搭建 2.1 项目初始化 2.2 依赖安装 3. 项目架构 4. 核心功能实现 4.1 MCP服务初始化 4.2 Streamable HTTP传输 流式传输特性 : 需要维持持久连接进行分块数据传输 服务器可能间歇性推送数据 单个请求的完整响应周期被拉长 短超时会中断传输 4.3 工具函数注册 同步工具示例 异步工具示例 4.4 核心扫描功能 扫描工具函数 结果处理函数 4.5 敏感信息收集 500状态页面信息收集 敏感信息检测 5. 高级功能 5.1 深度递归扫描 5.2 提示词优化 6. 实战应用 6.1 启动服务 6.2 扫描报告示例 工具可以生成美观易读的扫描报告,包含: 漏洞类型 风险等级 受影响URL 修复建议 危害描述 利用方法 7. 未来计划 增加更多漏洞类型的检测能力 实现完整的漏洞扫描自动化流程 优化AI模型的分析准确性 扩展对更多Web框架的支持 增强分布式扫描能力 8. 最佳实践 异常处理 :对扫描、文件读写、网络请求等环节增加细致的异常捕获与日志输出 异步优化 :充分利用asyncio提升大规模扫描性能 模块化设计 :将功能模块化,便于维护和扩展 数据存储 :使用时间戳命名结果文件,便于查找和管理 Token优化 :通过文件中转减少直接输入LLM的数据量 9. 注意事项 流式传输需要维持持久连接,避免设置过短的超时时间 大规模扫描时注意资源占用和性能优化 AI分析结果需要与原始扫描数据对比验证 敏感信息检测规则需要定期更新维护 递归扫描深度需要合理设置以避免过度请求