如何实现一个 fuzzer
字数 698 2025-08-05 11:39:48
如何实现一个 Fuzzer - 详细教学文档
前言
本文基于对 go fuzz 源码的研究和 "Build simple fuzzer" 系列文章的分析,详细讲解如何实现一个能够 fuzz C/C++ 可执行文件的 fuzzer。我们将从整体架构到具体实现,逐步解析 fuzzer 的各个组成部分。
Fuzzer 整体流程
一个基本的 fuzzer 工作流程如下:
- 声明要 fuzz 的对象、语料库、断点等
- 语料读入,初始化变异器
- 变异器进行迭代
- 执行 fuzz 逻辑
- 记录根据当前变异器得到的文件所能观察到的块
- 更新 corpus
- 检查用户是否发出终止命令,发出则保存 crash 文件,否则返回步骤3继续执行
核心组件详解
变异器类 (Mutator)
变异器是 fuzzer 的核心组件,负责生成和选择测试用例:
class Mutator:
def __init__(self, core):
self.core = core # 核心样本集
self.trace = set() # 当前观察到的块集合
self.corpus = [] # 执行过的样本集
self.pool = [] # 变异池
self.samples = [] # 变异后的样本集
def __iter__(self):
# 初始化变异轮次
self._fit_pool()
self._mutate_pool()
return self
def __next__(self):
if not self.samples:
self._fit_pool()
self._mutate_pool()
global stop_flag
if stop_flag:
raise StopIteration
else:
return self.samples.pop()
变异池初始化 (_fit_pool)
def _fit_pool(self):
# 1. 总是复制初始语料库
for sample in self.core:
self.pool.append((sample, []))
# 2. 选择发现新块的样本
for sample, trace in self.corpus:
if trace - self.trace: # 有新块被发现
self.pool.append((sample, trace))
# 3. 补充到100个样本
if self.corpus and len(self.pool) < 100:
self.corpus.sort(reverse=True, key=lambda x: len(x[1]))
for _ in range(min(100-len(self.pool), len(self.corpus))):
v = random.random() * random.random() * len(self.corpus)
self.pool.append(self.corpus[int(v)])
self.corpus.pop(int(v))
# 更新trace信息并清空corpus
for _, t in self.corpus:
self.trace |= t
self.corpus = []
变异操作 (_mutate_pool)
def _mutate_pool(self):
while self.pool:
sample,_ = self.pool.pop()
for _ in range(10): # 每个样本生成10个变异
self.samples.append(Mutator.mutate_sample(sample))
变异策略
变异器提供了多种变异方法:
@staticmethod
def mutate_sample(sample):
_sample = sample[:] # 复制样本
methods = [
Mutator.bit_flip,
Mutator.byte_flip,
Mutator.magic_number,
Mutator.add_block,
Mutator.remove_block,
]
f = random.choice(methods) # 随机选择变异方法
idx = random.choice(range(0, len(_sample))) # 随机选择位置
f(idx, _sample)
return _sample
具体变异方法实现:
SIZE = [4, 8, 16, 32, 64]
FLIP_ARRAY = [1, 2, 4, 8, 16, 32, 64, 128]
MAGIC_VALS = [
[0xFF],
[0x7F],
[0x00],
[0xFF, 0xFF], # 0xFFFF
[0x00, 0x00], # 0x0000
[0xFF, 0xFF, 0xFF, 0xFF], # 0xFFFFFFFF
[0x00, 0x00, 0x00, 0x00], # 0x00000000
[0x00, 0x00, 0x00, 0x80], # 0x80000000
[0x00, 0x00, 0x00, 0x40], # 0x40000000
[0xFF, 0xFF, 0xFF, 0x7F], # 0x7FFFFFFF
]
@staticmethod
def bit_flip(index, _sample):
num = random.choice(SIZE)
for idx in random.choices(range(len(_sample)), k=num):
_sample[idx] = _sample[idx] ^ random.choice(FLIP_ARRAY)
@staticmethod
def byte_flip(index, _sample):
num = random.choice(SIZE)
for idx in random.choices(range(len(_sample)), k=num):
_sample[idx] = _sample[idx] ^ random.getrandbits(8)
@staticmethod
def magic_number(index, _sample):
selected_magic = random.choice(MAGIC_VALS)
if index > (len(_sample) - len(selected_magic)):
index = len(_sample) - len(selected_magic)
for c, v in enumerate(selected_magic):
_sample[index + c] = v
@staticmethod
def add_block(index, _sample):
size = random.choice(SIZE)
_sample[index:index] = bytearray((random.getrandbits(8) for i in range(size)))
@staticmethod
def remove_block(index, _sample):
size = random.choice(SIZE)
_sample = _sample[:index] + _sample[index+size:]
调试与执行
处理 ASLR 随机化
def get_base(vmmap):
for m in vmmap:
if 'x' in m.permissions and m.pathname.endswith(os.path.basename(config['target'])):
return m.start
执行 Fuzz
def execute_fuzz(dbg, data, bpmap):
trace = set()
cmd = [config['target'], config['file']]
pid = debugger.child.createChild(cmd, no_stdout=True, env=None)
proc = dbg.addProcess(pid, True)
base = get_base(proc.readMappings())
# 设置断点
if bpmap:
for offset in bpmap:
proc.createBreakpoint(base + offset)
while True:
proc.cont()
event = dbg.waitProcessEvent()
if event.signum == signal.SIGSEGV:
crash_ip = proc.getInstrPointer() - base - 1
if crash_ip not in crashes:
crashes[crash_ip] = data
proc.detach()
break
elif event.signum == signal.SIGTRAP:
ip = proc.getInstrPointer()
br = proc.findBreakpoint(ip-1).desinstall()
proc.setInstrPointer(ip-1)
trace.add(ip - base - 1)
elif event.signum == signal.SIGINT:
print('Stoping execution')
proc.detach()
break
elif isinstance(event, debugger.ProcessExit):
proc.detach()
break
else:
print('Something went wrong -> {}'.format(event))
return trace
IDA 脚本插件
import idaapi
import idautils
skip_func = ['__libc_csu_init', '__libc_csu_fini', '_fini',
'__do_global_dtors_aux', '_start', '_init', 'sub_1034']
min_ea = idaapi.cvar.inf.min_ea
max_ea = idaapi.cvar.inf.max_ea
bv = idaapi.get_bytes(idaapi.get_fileregion_offset(min_ea), max_ea - min_ea)
base = idaapi.get_segm_by_name(".text").start_ea
for func_ea in idautils.Functions(base, idaapi.get_segm_by_name(".text").end_ea):
func_name = idaapi.get_func_name(func_ea)
if func_name in skip_func:
continue
func = idaapi.get_func(func_ea)
if func_name is None or func.flags & idaapi.FUNC_LIB:
continue
output = ""
for bb in idaapi.FlowChart(idaapi.get_func(func_ea)):
output += "0x{:x} ".format(bb.start_ea - base)
print(output)
变异算法优化
原始算法使用 random.random() * random.random() 进行选择,但存在选择概率不合理的问题。建议改用轮盘赌算法:
def roulette_wheel_selection(population, fitness):
total_fitness = sum(fitness)
probabilities = [fit / total_fitness for fit in fitness]
cumulative_probabilities = [sum(probabilities[:i+1]) for i in range(len(probabilities))]
random_num = random.random()
for i, prob in enumerate(cumulative_probabilities):
if random_num <= prob:
return population[i]
return population[-1]
Go 语言实现示例
func main() {
if _, err := os.Stat("crashes"); os.IsNotExist(err) {
if err := os.Mkdir("crashes", 0755); err != nil {
fmt.Printf("创建文件夹失败:%v\n", err)
os.Exit(1)
}
}
if len(os.Args) < 2 {
usage()
} else {
filename := os.Args[2]
origData := getBytes(filename)
counter := 0
for counter < 100 {
data := make([]byte, len(origData))
copy(data, origData)
mutatedData := mutate(data)
createNew(mutatedData)
executeFuzz(mutatedData, counter)
if counter%10 == 0 {
fmt.Printf("Counter: %d\n", counter)
}
counter++
}
}
}
const FLIP_RATIO = 0.01
var FLIP_ARRAY = []byte{1, 2, 4, 8, 16, 32, 64, 128}
var MAGIC_VALS = [][]byte{
{0xFF}, {0x7F}, {0x00}, {0xFF, 0xFF}, {0x00, 0x00},
{0xFF, 0xFF, 0xFF, 0xFF}, {0x00, 0x00, 0x00, 0x00},
{0x00, 0x00, 0x00, 0x80}, {0x00, 0x00, 0x00, 0x40},
{0xFF, 0xFF, 0xFF, 0x7F},
}
func mutate(data []byte) []byte {
flips := int(float64(len(data)-4) * FLIP_RATIO)
flipIndexes := rand.Perm(len(data) - 8)[:flips]
for _, idx := range flipIndexes {
method := rand.Intn(2)
if method == 0 {
data[idx+2] = bitFlip(data[idx+2])
} else {
magic(data, idx+2)
}
}
return data
}
func bitFlip(byteVal byte) byte {
flipVal := FLIP_ARRAY[rand.Intn(len(FLIP_ARRAY))]
return byteVal ^ flipVal
}
func magic(data []byte, idx int) {
pickedMagic := MAGIC_VALS[rand.Intn(len(MAGIC_VALS))]
for i, m := range pickedMagic {
data[idx+i] = m
}
}
总结
本文详细介绍了如何实现一个基本的 fuzzer,包括:
- 变异器的设计与实现
- 多种变异策略的实现
- 调试与执行流程
- IDA 脚本插件的开发
- 变异算法的优化
- Go 语言实现示例
通过理解这些核心概念和技术,读者可以构建自己的 fuzzer 或对现有 fuzzer 进行定制化改进。