瞎捣鼓之核心API未授权监控
字数 934 2025-08-30 06:50:27
API未授权访问监控系统设计与实现
1. 概述
本文档详细讲解如何构建一个核心API未授权访问监控系统,该系统能够定期检测关键API是否存在未授权访问漏洞,并通过钉钉机器人及时通知安全人员。
2. 系统设计原理
2.1 未授权访问漏洞特点
- 比SQL注入、命令执行等漏洞更隐蔽
- 通常由系统更新或配置变更意外引入
- 难以通过常规安全测试发现
- 危害性大,可能导致数据泄露或未授权操作
2.2 监控系统核心思路
- 预先定义需要监控的核心API列表
- 定期发送请求测试这些API
- 分析响应状态码判断是否存在未授权访问
- 通过钉钉机器人发送告警通知
3. 系统实现细节
3.1 目录结构
.
├── targets.json # 存储需要监控的API列表
└── monitor_script.py # 监控主程序
3.2 targets.json文件格式
[
{
"method": "POST",
"url": "https://example.com/api/v1/sensitive",
"headers": {
"Content-Type": "application/json"
},
"body": {
"key": "value"
}
},
{
"method": "GET",
"url": "https://example.com/api/v1/private",
"headers": {}
}
]
3.3 监控脚本核心功能
3.3.1 主要模块
- API加载模块:从JSON文件读取API配置
- 请求测试模块:发送各种HTTP方法请求
- 结果分析模块:判断响应状态码
- 告警通知模块:通过钉钉机器人发送告警
3.3.2 关键代码解析
import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# 钉钉机器人webhook地址
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=?"
unauthorized_apis = []
def load_targets(filename="targets.json"):
"""加载API配置文件"""
with open(filename, "r") as f:
return json.load(f)
def send_to_dingtalk(message):
"""发送钉钉告警"""
headers = {"Content-Type": "application/json"}
data = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
resp = requests.post(DINGTALK_WEBHOOK, headers=headers, data=json.dumps(data))
print(f"[] 已发送钉钉告警,响应: {resp.status_code}")
except Exception as e:
print(f"[] 钉钉推送失败: {e}")
def report_unauthorized(apis):
"""生成并发送未授权API报告"""
print("=== 开始发送dingtalk ===")
if not apis:
print("无未授权接口")
return
msg = " 以下 API 存在未授权访问风险:\n"
for item in apis:
msg += f"- {item['method']}{item['url']},状态码: {item['status']}\n"
send_to_dingtalk(msg)
def make_request(target):
"""发送API请求并检查响应"""
method = target.get("method", "GET").upper()
url = target["url"]
headers = target.get("headers", {})
body = target.get("body", {})
try:
if method == "GET":
resp = requests.get(url, headers=headers, timeout=10)
elif method == "POST":
resp = requests.post(url, json=body, headers=headers, timeout=10)
elif method == "PUT":
resp = requests.put(url, json=body, headers=headers, timeout=10)
elif method == "DELETE":
resp = requests.delete(url, headers=headers, timeout=10)
elif method == "PATCH":
resp = requests.patch(url, headers=headers, timeout=10)
else:
print(f"[️] 不支持的方法: {method}")
return
print(f"[{resp.status_code}] {method}{url}")
# 如果状态码不是401/403/405,则认为可能存在未授权访问
if resp.status_code not in [401, 403, 405]:
unauthorized_apis.append({
"method": method,
"url": url,
"status": resp.status_code
})
except Exception as e:
print(f"[] 请求失败 {method}{url}: {e}")
def main():
"""主函数"""
print("=== 多方法 API 未授权访问检测脚本 ===")
targets = load_targets()
# 使用线程池并发测试API
thread_count = 5
max_threads = min(thread_count, len(targets))
with ThreadPoolExecutor(max_workers=max_threads) as executor:
future_to_target = {
executor.submit(make_request, target): target
for target in targets
}
for future in as_completed(future_to_target):
target = future_to_target[future]
try:
future.result()
except Exception as e:
print(f"{target} 执行失败: {e}")
print("=== 所有未授权扫描任务完成 ===")
report_unauthorized(unauthorized_apis)
if __name__ == "__main__":
main()
3.4 状态码判断逻辑
- 401 Unauthorized:需要认证
- 403 Forbidden:禁止访问
- 405 Method Not Allowed:方法不允许
如果响应状态码不是以上三种之一,则认为可能存在未授权访问风险。
4. 自动化部署方案
4.1 使用GitHub Actions定时触发
name: API Unauthorized Access Monitor
on:
schedule:
- cron: '0 */6 * * *' # 每6小时运行一次
workflow_dispatch:
jobs:
monitor:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.8'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install requests
- name: Run monitor
run: python monitor_script.py
5. 扩展与优化建议
5.1 功能增强
- 增加响应内容分析:检查响应中是否包含敏感数据
- 添加重试机制:对失败的请求进行自动重试
- 支持更多认证方式:如Basic Auth、OAuth等
- 历史记录与趋势分析:保存扫描结果进行对比分析
5.2 性能优化
- 动态调整线程数:根据API数量自动调整并发数
- 请求限速:避免对生产环境造成过大压力
- 分布式部署:对于大规模API系统
5.3 安全考虑
- 敏感信息保护:加密存储钉钉webhook等敏感信息
- 请求频率控制:避免被误认为攻击行为
- 白名单机制:允许某些API的正常未授权访问
6. 总结
本系统通过定期自动化测试核心API的访问控制情况,能够及时发现未授权访问漏洞,相比传统安全测试方法更加持续和高效。系统设计简单但实用,可根据实际需求进行扩展和优化。