瞎捣鼓之核心API未授权监控
字数 934 2025-08-30 06:50:27

API未授权访问监控系统设计与实现

1. 概述

本文档详细讲解如何构建一个核心API未授权访问监控系统,该系统能够定期检测关键API是否存在未授权访问漏洞,并通过钉钉机器人及时通知安全人员。

2. 系统设计原理

2.1 未授权访问漏洞特点

  • 比SQL注入、命令执行等漏洞更隐蔽
  • 通常由系统更新或配置变更意外引入
  • 难以通过常规安全测试发现
  • 危害性大,可能导致数据泄露或未授权操作

2.2 监控系统核心思路

  1. 预先定义需要监控的核心API列表
  2. 定期发送请求测试这些API
  3. 分析响应状态码判断是否存在未授权访问
  4. 通过钉钉机器人发送告警通知

3. 系统实现细节

3.1 目录结构

.
├── targets.json        # 存储需要监控的API列表
└── monitor_script.py   # 监控主程序

3.2 targets.json文件格式

[
    {
        "method": "POST",
        "url": "https://example.com/api/v1/sensitive",
        "headers": {
            "Content-Type": "application/json"
        },
        "body": {
            "key": "value"
        }
    },
    {
        "method": "GET",
        "url": "https://example.com/api/v1/private",
        "headers": {}
    }
]

3.3 监控脚本核心功能

3.3.1 主要模块

  1. API加载模块:从JSON文件读取API配置
  2. 请求测试模块:发送各种HTTP方法请求
  3. 结果分析模块:判断响应状态码
  4. 告警通知模块:通过钉钉机器人发送告警

3.3.2 关键代码解析

import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# 钉钉机器人webhook地址
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=?"
unauthorized_apis = []

def load_targets(filename="targets.json"):
    """加载API配置文件"""
    with open(filename, "r") as f:
        return json.load(f)

def send_to_dingtalk(message):
    """发送钉钉告警"""
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    try:
        resp = requests.post(DINGTALK_WEBHOOK, headers=headers, data=json.dumps(data))
        print(f"[] 已发送钉钉告警,响应: {resp.status_code}")
    except Exception as e:
        print(f"[] 钉钉推送失败: {e}")

def report_unauthorized(apis):
    """生成并发送未授权API报告"""
    print("=== 开始发送dingtalk ===")
    if not apis:
        print("无未授权接口")
        return
    
    msg = " 以下 API 存在未授权访问风险:\n"
    for item in apis:
        msg += f"- {item['method']}{item['url']},状态码: {item['status']}\n"
    send_to_dingtalk(msg)

def make_request(target):
    """发送API请求并检查响应"""
    method = target.get("method", "GET").upper()
    url = target["url"]
    headers = target.get("headers", {})
    body = target.get("body", {})
    
    try:
        if method == "GET":
            resp = requests.get(url, headers=headers, timeout=10)
        elif method == "POST":
            resp = requests.post(url, json=body, headers=headers, timeout=10)
        elif method == "PUT":
            resp = requests.put(url, json=body, headers=headers, timeout=10)
        elif method == "DELETE":
            resp = requests.delete(url, headers=headers, timeout=10)
        elif method == "PATCH":
            resp = requests.patch(url, headers=headers, timeout=10)
        else:
            print(f"[️] 不支持的方法: {method}")
            return
        
        print(f"[{resp.status_code}] {method}{url}")
        
        # 如果状态码不是401/403/405,则认为可能存在未授权访问
        if resp.status_code not in [401, 403, 405]:
            unauthorized_apis.append({
                "method": method,
                "url": url,
                "status": resp.status_code
            })
    except Exception as e:
        print(f"[] 请求失败 {method}{url}: {e}")

def main():
    """主函数"""
    print("=== 多方法 API 未授权访问检测脚本 ===")
    targets = load_targets()
    
    # 使用线程池并发测试API
    thread_count = 5
    max_threads = min(thread_count, len(targets))
    
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        future_to_target = {
            executor.submit(make_request, target): target 
            for target in targets
        }
        
        for future in as_completed(future_to_target):
            target = future_to_target[future]
            try:
                future.result()
            except Exception as e:
                print(f"{target} 执行失败: {e}")
    
    print("=== 所有未授权扫描任务完成 ===")
    report_unauthorized(unauthorized_apis)

if __name__ == "__main__":
    main()

3.4 状态码判断逻辑

  • 401 Unauthorized:需要认证
  • 403 Forbidden:禁止访问
  • 405 Method Not Allowed:方法不允许

如果响应状态码是以上三种之一,则认为可能存在未授权访问风险。

4. 自动化部署方案

4.1 使用GitHub Actions定时触发

name: API Unauthorized Access Monitor

on:
  schedule:
    - cron: '0 */6 * * *'  # 每6小时运行一次
  workflow_dispatch:

jobs:
  monitor:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v2
    
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.8'
    
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install requests
                
    - name: Run monitor
      run: python monitor_script.py

5. 扩展与优化建议

5.1 功能增强

  1. 增加响应内容分析:检查响应中是否包含敏感数据
  2. 添加重试机制:对失败的请求进行自动重试
  3. 支持更多认证方式:如Basic Auth、OAuth等
  4. 历史记录与趋势分析:保存扫描结果进行对比分析

5.2 性能优化

  1. 动态调整线程数:根据API数量自动调整并发数
  2. 请求限速:避免对生产环境造成过大压力
  3. 分布式部署:对于大规模API系统

5.3 安全考虑

  1. 敏感信息保护:加密存储钉钉webhook等敏感信息
  2. 请求频率控制:避免被误认为攻击行为
  3. 白名单机制:允许某些API的正常未授权访问

6. 总结

本系统通过定期自动化测试核心API的访问控制情况,能够及时发现未授权访问漏洞,相比传统安全测试方法更加持续和高效。系统设计简单但实用,可根据实际需求进行扩展和优化。

API未授权访问监控系统设计与实现 1. 概述 本文档详细讲解如何构建一个核心API未授权访问监控系统,该系统能够定期检测关键API是否存在未授权访问漏洞,并通过钉钉机器人及时通知安全人员。 2. 系统设计原理 2.1 未授权访问漏洞特点 比SQL注入、命令执行等漏洞更隐蔽 通常由系统更新或配置变更意外引入 难以通过常规安全测试发现 危害性大,可能导致数据泄露或未授权操作 2.2 监控系统核心思路 预先定义需要监控的核心API列表 定期发送请求测试这些API 分析响应状态码判断是否存在未授权访问 通过钉钉机器人发送告警通知 3. 系统实现细节 3.1 目录结构 3.2 targets.json文件格式 3.3 监控脚本核心功能 3.3.1 主要模块 API加载模块 :从JSON文件读取API配置 请求测试模块 :发送各种HTTP方法请求 结果分析模块 :判断响应状态码 告警通知模块 :通过钉钉机器人发送告警 3.3.2 关键代码解析 3.4 状态码判断逻辑 401 Unauthorized :需要认证 403 Forbidden :禁止访问 405 Method Not Allowed :方法不允许 如果响应状态码 不 是以上三种之一,则认为可能存在未授权访问风险。 4. 自动化部署方案 4.1 使用GitHub Actions定时触发 5. 扩展与优化建议 5.1 功能增强 增加响应内容分析 :检查响应中是否包含敏感数据 添加重试机制 :对失败的请求进行自动重试 支持更多认证方式 :如Basic Auth、OAuth等 历史记录与趋势分析 :保存扫描结果进行对比分析 5.2 性能优化 动态调整线程数 :根据API数量自动调整并发数 请求限速 :避免对生产环境造成过大压力 分布式部署 :对于大规模API系统 5.3 安全考虑 敏感信息保护 :加密存储钉钉webhook等敏感信息 请求频率控制 :避免被误认为攻击行为 白名单机制 :允许某些API的正常未授权访问 6. 总结 本系统通过定期自动化测试核心API的访问控制情况,能够及时发现未授权访问漏洞,相比传统安全测试方法更加持续和高效。系统设计简单但实用,可根据实际需求进行扩展和优化。