如何实现一个 fuzzer
字数 698 2025-08-05 11:39:48

如何实现一个 Fuzzer - 详细教学文档

前言

本文基于对 go fuzz 源码的研究和 "Build simple fuzzer" 系列文章的分析,详细讲解如何实现一个能够 fuzz C/C++ 可执行文件的 fuzzer。我们将从整体架构到具体实现,逐步解析 fuzzer 的各个组成部分。

Fuzzer 整体流程

一个基本的 fuzzer 工作流程如下:

  1. 声明要 fuzz 的对象、语料库、断点等
  2. 语料读入,初始化变异器
  3. 变异器进行迭代
  4. 执行 fuzz 逻辑
  5. 记录根据当前变异器得到的文件所能观察到的块
  6. 更新 corpus
  7. 检查用户是否发出终止命令,发出则保存 crash 文件,否则返回步骤3继续执行

核心组件详解

变异器类 (Mutator)

变异器是 fuzzer 的核心组件,负责生成和选择测试用例:

class Mutator:
    def __init__(self, core):
        self.core = core  # 核心样本集
        self.trace = set()  # 当前观察到的块集合
        self.corpus = []  # 执行过的样本集
        self.pool = []  # 变异池
        self.samples = []  # 变异后的样本集

    def __iter__(self):
        # 初始化变异轮次
        self._fit_pool()
        self._mutate_pool()
        return self

    def __next__(self):
        if not self.samples:
            self._fit_pool()
            self._mutate_pool()

        global stop_flag
        if stop_flag:
            raise StopIteration
        else:
            return self.samples.pop()

变异池初始化 (_fit_pool)

def _fit_pool(self):
    # 1. 总是复制初始语料库
    for sample in self.core:
        self.pool.append((sample, []))
    
    # 2. 选择发现新块的样本
    for sample, trace in self.corpus:
        if trace - self.trace:  # 有新块被发现
            self.pool.append((sample, trace))
    
    # 3. 补充到100个样本
    if self.corpus and len(self.pool) < 100:
        self.corpus.sort(reverse=True, key=lambda x: len(x[1]))
        for _ in range(min(100-len(self.pool), len(self.corpus))):
            v = random.random() * random.random() * len(self.corpus)
            self.pool.append(self.corpus[int(v)])
            self.corpus.pop(int(v))
    
    # 更新trace信息并清空corpus
    for _, t in self.corpus:
        self.trace |= t
    self.corpus = []

变异操作 (_mutate_pool)

def _mutate_pool(self):
    while self.pool:
        sample,_ = self.pool.pop()
        for _ in range(10):  # 每个样本生成10个变异
            self.samples.append(Mutator.mutate_sample(sample))

变异策略

变异器提供了多种变异方法:

@staticmethod
def mutate_sample(sample):
    _sample = sample[:]  # 复制样本
    
    methods = [
        Mutator.bit_flip,
        Mutator.byte_flip,
        Mutator.magic_number,
        Mutator.add_block,
        Mutator.remove_block,
    ]
    
    f = random.choice(methods)  # 随机选择变异方法
    idx = random.choice(range(0, len(_sample)))  # 随机选择位置
    f(idx, _sample)
    
    return _sample

具体变异方法实现:

SIZE = [4, 8, 16, 32, 64]
FLIP_ARRAY = [1, 2, 4, 8, 16, 32, 64, 128]
MAGIC_VALS = [
    [0xFF],
    [0x7F],
    [0x00],
    [0xFF, 0xFF],  # 0xFFFF
    [0x00, 0x00],  # 0x0000
    [0xFF, 0xFF, 0xFF, 0xFF],  # 0xFFFFFFFF
    [0x00, 0x00, 0x00, 0x00],  # 0x00000000
    [0x00, 0x00, 0x00, 0x80],  # 0x80000000
    [0x00, 0x00, 0x00, 0x40],  # 0x40000000
    [0xFF, 0xFF, 0xFF, 0x7F],  # 0x7FFFFFFF
]

@staticmethod
def bit_flip(index, _sample):
    num = random.choice(SIZE)
    for idx in random.choices(range(len(_sample)), k=num):
        _sample[idx] = _sample[idx] ^ random.choice(FLIP_ARRAY)

@staticmethod
def byte_flip(index, _sample):
    num = random.choice(SIZE)
    for idx in random.choices(range(len(_sample)), k=num):
        _sample[idx] = _sample[idx] ^ random.getrandbits(8)

@staticmethod
def magic_number(index, _sample):
    selected_magic = random.choice(MAGIC_VALS)
    if index > (len(_sample) - len(selected_magic)):
        index = len(_sample) - len(selected_magic)
    for c, v in enumerate(selected_magic):
        _sample[index + c] = v

@staticmethod
def add_block(index, _sample):
    size = random.choice(SIZE)
    _sample[index:index] = bytearray((random.getrandbits(8) for i in range(size)))

@staticmethod
def remove_block(index, _sample):
    size = random.choice(SIZE)
    _sample = _sample[:index] + _sample[index+size:]

调试与执行

处理 ASLR 随机化

def get_base(vmmap):
    for m in vmmap:
        if 'x' in m.permissions and m.pathname.endswith(os.path.basename(config['target'])):
            return m.start

执行 Fuzz

def execute_fuzz(dbg, data, bpmap):
    trace = set()
    cmd = [config['target'], config['file']]
    pid = debugger.child.createChild(cmd, no_stdout=True, env=None)
    proc = dbg.addProcess(pid, True)
    base = get_base(proc.readMappings())
    
    # 设置断点
    if bpmap:
        for offset in bpmap:
            proc.createBreakpoint(base + offset)
    
    while True:
        proc.cont()
        event = dbg.waitProcessEvent()
        
        if event.signum == signal.SIGSEGV:
            crash_ip = proc.getInstrPointer() - base - 1
            if crash_ip not in crashes:
                crashes[crash_ip] = data
            proc.detach()
            break
        elif event.signum == signal.SIGTRAP:
            ip = proc.getInstrPointer()
            br = proc.findBreakpoint(ip-1).desinstall()
            proc.setInstrPointer(ip-1)
            trace.add(ip - base - 1)
        elif event.signum == signal.SIGINT:
            print('Stoping execution')
            proc.detach()
            break
        elif isinstance(event, debugger.ProcessExit):
            proc.detach()
            break
        else:
            print('Something went wrong -> {}'.format(event))
    
    return trace

IDA 脚本插件

import idaapi
import idautils

skip_func = ['__libc_csu_init', '__libc_csu_fini', '_fini', 
             '__do_global_dtors_aux', '_start', '_init', 'sub_1034']

min_ea = idaapi.cvar.inf.min_ea
max_ea = idaapi.cvar.inf.max_ea
bv = idaapi.get_bytes(idaapi.get_fileregion_offset(min_ea), max_ea - min_ea)
base = idaapi.get_segm_by_name(".text").start_ea

for func_ea in idautils.Functions(base, idaapi.get_segm_by_name(".text").end_ea):
    func_name = idaapi.get_func_name(func_ea)
    if func_name in skip_func:
        continue
    func = idaapi.get_func(func_ea)
    if func_name is None or func.flags & idaapi.FUNC_LIB:
        continue
    output = ""
    for bb in idaapi.FlowChart(idaapi.get_func(func_ea)):
        output += "0x{:x} ".format(bb.start_ea - base)
    print(output)

变异算法优化

原始算法使用 random.random() * random.random() 进行选择,但存在选择概率不合理的问题。建议改用轮盘赌算法:

def roulette_wheel_selection(population, fitness):
    total_fitness = sum(fitness)
    probabilities = [fit / total_fitness for fit in fitness]
    cumulative_probabilities = [sum(probabilities[:i+1]) for i in range(len(probabilities))]
    
    random_num = random.random()
    for i, prob in enumerate(cumulative_probabilities):
        if random_num <= prob:
            return population[i]
    return population[-1]

Go 语言实现示例

func main() {
    if _, err := os.Stat("crashes"); os.IsNotExist(err) {
        if err := os.Mkdir("crashes", 0755); err != nil {
            fmt.Printf("创建文件夹失败:%v\n", err)
            os.Exit(1)
        }
    }
    
    if len(os.Args) < 2 {
        usage()
    } else {
        filename := os.Args[2]
        origData := getBytes(filename)
        counter := 0
        
        for counter < 100 {
            data := make([]byte, len(origData))
            copy(data, origData)
            mutatedData := mutate(data)
            createNew(mutatedData)
            executeFuzz(mutatedData, counter)
            
            if counter%10 == 0 {
                fmt.Printf("Counter: %d\n", counter)
            }
            counter++
        }
    }
}

const FLIP_RATIO = 0.01

var FLIP_ARRAY = []byte{1, 2, 4, 8, 16, 32, 64, 128}
var MAGIC_VALS = [][]byte{
    {0xFF}, {0x7F}, {0x00}, {0xFF, 0xFF}, {0x00, 0x00},
    {0xFF, 0xFF, 0xFF, 0xFF}, {0x00, 0x00, 0x00, 0x00},
    {0x00, 0x00, 0x00, 0x80}, {0x00, 0x00, 0x00, 0x40},
    {0xFF, 0xFF, 0xFF, 0x7F},
}

func mutate(data []byte) []byte {
    flips := int(float64(len(data)-4) * FLIP_RATIO)
    flipIndexes := rand.Perm(len(data) - 8)[:flips]
    
    for _, idx := range flipIndexes {
        method := rand.Intn(2)
        if method == 0 {
            data[idx+2] = bitFlip(data[idx+2])
        } else {
            magic(data, idx+2)
        }
    }
    return data
}

func bitFlip(byteVal byte) byte {
    flipVal := FLIP_ARRAY[rand.Intn(len(FLIP_ARRAY))]
    return byteVal ^ flipVal
}

func magic(data []byte, idx int) {
    pickedMagic := MAGIC_VALS[rand.Intn(len(MAGIC_VALS))]
    for i, m := range pickedMagic {
        data[idx+i] = m
    }
}

总结

本文详细介绍了如何实现一个基本的 fuzzer,包括:

  1. 变异器的设计与实现
  2. 多种变异策略的实现
  3. 调试与执行流程
  4. IDA 脚本插件的开发
  5. 变异算法的优化
  6. Go 语言实现示例

通过理解这些核心概念和技术,读者可以构建自己的 fuzzer 或对现有 fuzzer 进行定制化改进。

如何实现一个 Fuzzer - 详细教学文档 前言 本文基于对 go fuzz 源码的研究和 "Build simple fuzzer" 系列文章的分析,详细讲解如何实现一个能够 fuzz C/C++ 可执行文件的 fuzzer。我们将从整体架构到具体实现,逐步解析 fuzzer 的各个组成部分。 Fuzzer 整体流程 一个基本的 fuzzer 工作流程如下: 声明要 fuzz 的对象、语料库、断点等 语料读入,初始化变异器 变异器进行迭代 执行 fuzz 逻辑 记录根据当前变异器得到的文件所能观察到的块 更新 corpus 检查用户是否发出终止命令,发出则保存 crash 文件,否则返回步骤3继续执行 核心组件详解 变异器类 (Mutator) 变异器是 fuzzer 的核心组件,负责生成和选择测试用例: 变异池初始化 (_ fit_ pool) 变异操作 (_ mutate_ pool) 变异策略 变异器提供了多种变异方法: 具体变异方法实现: 调试与执行 处理 ASLR 随机化 执行 Fuzz IDA 脚本插件 变异算法优化 原始算法使用 random.random() * random.random() 进行选择,但存在选择概率不合理的问题。建议改用轮盘赌算法: Go 语言实现示例 总结 本文详细介绍了如何实现一个基本的 fuzzer,包括: 变异器的设计与实现 多种变异策略的实现 调试与执行流程 IDA 脚本插件的开发 变异算法的优化 Go 语言实现示例 通过理解这些核心概念和技术,读者可以构建自己的 fuzzer 或对现有 fuzzer 进行定制化改进。