flaskpython代码审计思路及实战记录
字数 946 2025-08-22 12:23:41
Flask Python代码审计全面指南
1. SQL注入审计与防御
1.1 危险模式识别
- 使用字符串拼接方式构造SQL语句:
"SELECT * FROM table WHERE id=" + value "SELECT * FROM table WHERE id=%s" % value "SELECT * FROM table WHERE id={0}".format(value) - SQLAlchemy的
text()函数未参数化:from sqlalchemy import text stmt = text(f"SELECT * FROM users WHERE name = '{username}'") # 危险
1.2 安全实践
- 使用参数化查询:
stmt = "SELECT * FROM table WHERE id=?" connection.execute(stmt, (value,)) - SQLAlchemy安全用法:
stmt = text("SELECT * FROM users WHERE name = :username").bindparams(username=username) - 优先使用ORM方法:
User.query.filter_by(username=username).first()
2. 命令/代码执行漏洞审计
2.1 危险函数检查
popen,system,commands,subprocess,exec,eval- 示例漏洞:
import subprocess @app.route('/ping') def ping(): ip = request.args.get('ip') result = subprocess.run(["ping", "-c", "1", ip], capture_output=True, text=True) return f"<pre>{result.stdout}</pre>"
2.2 第三方库风险
- PyYAML不安全用法:
import yaml data = yaml.load(user_input, Loader=yaml.Loader) # 可触发任意代码执行
2.3 防御措施
- 使用安全的YAML加载方式:
data = yaml.load(user_input, Loader=yaml.SafeLoader) - 限制子进程参数:
import shlex safe_ip = shlex.quote(ip)
3. 反序列化漏洞审计
3.1 危险模块
pickle,marshal,PyYAML等
3.2 漏洞示例
import pickle
data = request.get_data()
obj = pickle.loads(data) # 攻击者可构造恶意序列化对象
3.3 安全实践
- 优先使用JSON代替pickle
- pickle安全使用:
import hmac, pickle key = b'secret_key' data = request.get_data() if not hmac.compare_digest(hmac.new(key, data).digest(), request.headers.get('Signature')): abort(403) obj = pickle.loads(data)
4. 文件操作安全
4.1 危险函数
file(),file.save(),open(),codecs.open()
4.2 安全文件上传
- 白名单限制文件类型:
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'} def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS - 文件内容验证:
import magic mime = magic.from_buffer(file.read(1024), mime=True) if mime not in ['image/png', 'image/jpeg', 'image/gif']: abort(400, "Invalid file type")
4.3 防止路径遍历
from werkzeug.utils import secure_filename
import os
filename = secure_filename(request.form['filename'])
base_dir = os.path.abspath("/var/data")
target_path = os.path.join(base_dir, filename)
if not os.path.commonprefix([base_dir, target_path]) == base_dir:
abort(403, "Invalid path")
5. XXE漏洞审计
5.1 危险用法
from lxml import etree
xml_data='''<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///c:/cc.txt">]><data>&xxe;</data>'''
root = etree.fromstring(xml_data) # 允许解析外部实体
5.2 防御措施
- 使用defusedxml:
from defusedxml.ElementTree import parse tree = parse(xml_file) # 默认禁用外部实体 - 禁用外部实体:
parser = etree.XMLParser(resolve_entities=False) root = etree.fromstring(xml_data, parser=parser)
6. SSRF漏洞审计
6.1 危险模式
url = request.form['url']
response = requests.get(url) # 可能访问内部服务
6.2 防御措施
- 域名白名单:
ALLOWED_DOMAINS = {'example.com', 'cdn.example.net'} from urllib.parse import urlparse def is_allowed_url(url): parsed = urlparse(url) return parsed.hostname in ALLOWED_DOMAINS - 防止DNS重绑定:
import socket from urllib.parse import urlparse parsed = urlparse(url) resolved_ip = socket.gethostbyname(parsed.hostname) if resolved_ip in ['127.0.0.1', '169.254.169.254']: abort(403, "Forbidden IP")
7. XSS漏洞审计
7.1 危险模式
- 使用
|safe过滤器或Markup类 - 直接渲染未转义的用户输入
- 示例:
from flask import Markup user_input = request.args.get('q') return render_template('search.html', result=Markup(user_input))
7.2 常见payload
<svg><script>alert(1)</script></svg>
7.3 防御措施
- 使用
escape()过滤:from markupsafe import escape username = escape(request.args.get('username')) - 设置CSP策略
- 安全处理文件上传:
return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)
8. 其他安全考虑
8.1 CSRF防护
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
8.2 权限校验
from flask_login import current_user
user = User.query.get(user_id)
if user.id != current_user.id:
abort(403)
8.3 并发控制
try:
db.session.add(new_user)
db.session.commit()
except Exception as e:
db.session.rollback()
print(f"Error: {e}")
8.4 最小权限原则
CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'strongpassword';
GRANT SELECT, INSERT, UPDATE ON mydb.* TO 'appuser'@'localhost';
8.5 缓存安全
app.config['CACHE_TYPE'] = 'RedisCache'
app.config['CACHE_REDIS_HOST'] = '10.0.0.5'
app.config['CACHE_REDIS_PASSWORD'] = 'strong_redis_password'
app.config['CACHE_KEY_PREFIX'] = 'myapp:'
9. 实战审计案例
9.1 存储型XSS漏洞
- 查找模板中的
|safe关键词 - 发现
render_recommendations上下文函数:def render_recommendations(): template_path = os.path.join(os.path.dirname(__file__), 'templates', 'recommendations.html') if os.path.exists(template_path): with open(template_path, 'r', encoding='utf-8') as f: template = f.read() return Markup(render_template_string(template)) return '' - 利用点:
title、category、tags三处未过滤用户输入 - 提交payload:``