如何开发用于漏洞研究的Ghidra插件,Part 1
字数 1314 2025-08-05 11:39:26
Ghidra插件开发教学:漏洞研究中的Sink点检测
概述
本教学文档基于Somerset Recon团队关于Ghidra插件开发的系列文章,重点介绍如何开发用于漏洞研究的Ghidra插件,特别是如何识别可能导致安全漏洞的sink点(如strcpy、memcpy等危险函数)。
Ghidra插件基础
核心概念
Ghidra插件运行时可以访问五个关键状态对象:
currentProgram: 当前活动程序currentAddress: 当前光标位置地址currentLocation: 当前程序位置currentSelection: 当前选择区域currentHighlight: 当前高亮显示
开发环境
Ghidra插件可以使用Java或Jython编写,本教学使用Jython。有三种方式使用Ghidra的Jython API:
- 使用Python IDE(类似IDA Python控制台)
- 通过脚本管理器加载脚本(右键脚本 → Edit with Basic Editor)
- Headless模式(无GUI运行)
示例脚本位于Ghidra安装目录的ghidra/features/python/ghidra_scripts下。
Sink点检测技术
Sink点列表
首先需要定义可能导致缓冲区溢出的危险函数列表:
sinks = [
"strcpy", "memcpy", "gets", "memmove", "scanf",
"strcpyA", "strcpyW", "wcscpy", "_tcscpy", "_mbscpy",
"StrCpy", "StrCpyA", "lstrcpyA", "lstrcpy",
# 可根据需要扩展更多危险函数
]
检测方法一:线性搜索
遍历二进制文件的text段,检查每条指令:
listing = currentProgram.getListing()
ins_list = listing.getInstructions(1) # 获取指令迭代器
while ins_list.hasNext():
ins = ins_list.next()
mnemonic = ins.getMnemonicString()
# 检查是否为CALL指令
if mnemonic == "CALL":
ops = ins.getOpObjects(0)
try:
target_addr = ops[0]
sink_func = listing.getFunctionAt(target_addr)
sink_func_name = sink_func.getName()
if sink_func_name in sinks:
print(f"发现sink点: {sink_func_name} at {target_addr}")
except:
pass
注意:此方法可能无法检测外部DLL函数调用,需要额外处理。
检测方法二:交叉引用(Xrefs)
更高效的方法是使用Ghidra的交叉引用功能:
fm = currentProgram.getFunctionManager()
ext_fm = fm.getExternalFunctions() # 获取外部函数
while ext_fm.hasNext():
ext_func = ext_fm.next()
target_func = ext_func.getName()
if target_func in sinks:
loc = ext_func.getExternalLocation()
sink_func_addr = loc.getAddress() or ext_func.getEntryPoint()
if sink_func_addr:
references = getReferencesTo(sink_func_addr)
for ref in references:
call_addr = ref.getFromAddress()
ins = listing.getInstructionAt(call_addr)
if ins.getMnemonicString() == "CALL":
print(f"{target_func} called from {call_addr}")
高级功能:可视化调用关系
构建调用图
使用Graphviz可视化调用关系:
from graphviz import Digraph
sink_dic = {} # 存储调用关系的字典
# 在检测过程中填充字典
if func_name in sinks:
references = getReferencesTo(target_addr)
for ref in references:
call_addr = ref.getFromAddress()
parent_func_name = getFunctionBefore(call_addr).getName()
if parent_func_name not in sink_dic:
sink_dic[parent_func_name] = {}
if func_name not in sink_dic[parent_func_name]:
sink_dic[parent_func_name][func_name] = {
"address": target_addr,
"call_address": []
}
sink_dic[parent_func_name][func_name]["call_address"].append(call_addr)
# 创建图形
graph = Digraph("ReferenceTree")
graph.graph_attr['rankdir'] = 'LR'
for parent_func_name, sink_func_list in sink_dic.items():
graph.node(parent_func_name, style="filled", color="blue", fontcolor="white")
for sink_name, sink_data in sink_func_list.items():
graph.node(sink_name, style="filled", color="red", fontcolor="white")
for call_addr in sink_data["call_address"]:
graph.edge(parent_func_name, sink_name, label=call_addr.toString())
graph.render("sink_and_caller.gv", view=True)
多格式输出
提供多种输出格式选择:
from beautifultable import BeautifulTable
# 文本表格输出
table = BeautifulTable()
table.columns.header = ["调用函数", "Sink点", "调用地址"]
for parent_func, sinks in sink_dic.items():
for sink_name, sink_data in sinks.items():
for addr in sink_data["call_address"]:
table.rows.append([parent_func, sink_name, str(addr)])
print(table)
# JSON输出
import json
with open("sinks.json", "w") as f:
json.dump(sink_dic, f, indent=2)
实际案例分析:CoreFTPServer漏洞
在CoreFTPServer v1.2 build 505中,插件成功识别出lstrcpyA等危险函数调用。通过进一步分析,确认这些调用确实导致了缓冲区溢出漏洞(EIP被0x42424242覆盖)。
已知限制与解决方案
-
标准库函数识别问题:
- Ghidra的FLIRT签名支持不如IDA Pro完善
- 解决方案:手动添加外部函数定义或等待Ghidra改进
-
外部函数调用检测:
- 需要特别处理外部DLL函数
- 使用
getExternalFunctions()和getExternalLocation()
-
性能考虑:
- 线性搜索可能较慢,优先使用交叉引用方法
- 对大二进制文件考虑分块处理
完整脚本示例
from ghidra.program.model.address import Address
from ghidra.program.model.listing import CodeUnit, Listing
import sys
import os
from beautifultable import BeautifulTable
from graphviz import Digraph
# 配置Python路径
ghidra_default_dir = os.getcwd()
jython_dir = os.path.join(ghidra_default_dir, "Ghidra", "Features", "Python",
"lib", "Lib", "site-packages")
sys.path.insert(0, jython_dir)
# Sink点定义
sinks = [
"strcpy", "memcpy", "gets", "memmove", "scanf",
"strcpyA", "strcpyW", "wcscpy", "_tcscpy", "_mbscpy",
"StrCpy", "StrCpyA", "StrCpyW", "lstrcpy", "lstrcpyA", "lstrcpyW"
]
def find_sinks():
sink_dic = {}
listing = currentProgram.getListing()
fm = currentProgram.getFunctionManager()
ext_fm = fm.getExternalFunctions()
# 处理外部函数
ext_sinks = {}
while ext_fm.hasNext():
ext_func = ext_fm.next()
func_name = ext_func.getName()
if func_name in sinks:
loc = ext_func.getExternalLocation()
addr = loc.getAddress() or ext_func.getEntryPoint()
if addr:
ext_sinks[addr] = func_name
# 遍历指令
ins_list = listing.getInstructions(1)
while ins_list.hasNext():
ins = ins_list.next()
if ins.getMnemonicString() == "CALL":
try:
target_addr = ins.getOpObjects(0)[0]
func_name = None
if isinstance(target_addr, Address):
# 检查外部函数
if target_addr in ext_sinks:
func_name = ext_sinks[target_addr]
else:
# 检查内部函数
func = listing.getFunctionAt(target_addr)
if func:
func_name = func.getName()
if func_name in sinks:
refs = getReferencesTo(target_addr)
for ref in refs:
call_addr = ref.getFromAddress()
parent_func = getFunctionBefore(call_addr)
if parent_func:
parent_name = parent_func.getName()
if parent_name not in sink_dic:
sink_dic[parent_name] = {}
if func_name not in sink_dic[parent_name]:
sink_dic[parent_name][func_name] = {
"address": target_addr,
"call_address": []
}
if call_addr not in sink_dic[parent_name][func_name]["call_address"]:
sink_dic[parent_name][func_name]["call_address"].append(call_addr)
except:
continue
return sink_dic
def display_results(sink_dic, format="text"):
if format == "text":
table = BeautifulTable()
table.columns.header = ["调用函数", "Sink点", "调用地址"]
for parent, sinks in sink_dic.items():
for sink, data in sinks.items():
for addr in data["call_address"]:
table.rows.append([parent, sink, str(addr)])
print(table)
elif format == "graph":
graph = Digraph("SinkCallGraph")
graph.graph_attr['rankdir'] = 'LR'
for parent, sinks in sink_dic.items():
graph.node(parent, style="filled", color="blue", fontcolor="white")
for sink, data in sinks.items():
graph.node(sink, style="filled", color="red", fontcolor="white")
for addr in data["call_address"]:
graph.edge(parent, sink, label=str(addr))
output_file = os.path.join(os.getcwd(), "sink_graph")
graph.render(output_file, view=True)
elif format == "json":
import json
output_file = os.path.join(os.getcwd(), "sinks.json")
with open(output_file, "w") as f:
json.dump(sink_dic, f, indent=2)
print(f"结果已保存到 {output_file}")
# 主执行逻辑
if __name__ == "__main__":
sink_data = find_sinks()
# 这里可以添加GUI让用户选择输出格式
display_results(sink_data, format="graph")
display_results(sink_data, format="text")
总结与进阶方向
-
增强检测能力:
- 添加更多类型的sink点(如格式化字符串漏洞相关函数)
- 实现参数分析,检测可能受控的参数
-
提高准确性:
- 结合数据流分析,追踪危险数据到sink点的路径
- 实现污点分析,识别用户可控输入
-
性能优化:
- 对大型二进制文件实现并行处理
- 缓存常用函数和引用信息
-
集成更多功能:
- 添加漏洞模式匹配
- 实现自动化漏洞验证功能
通过本教学,您已经掌握了Ghidra插件开发的基础知识,特别是针对漏洞研究的sink点检测技术。这些技术可以扩展到其他类型的漏洞分析,为您的安全研究工作提供强大支持。