基于qemu和unicorn的Fuzz技术分析
字数 913 2025-08-24 23:51:13
基于QEMU和Unicorn的Fuzz技术教学文档
1. AFL Fork Server机制
1.1 通信流程
afl-fuzz调用init_forkserver函数fork出新进程作为fork server- 等待fork server发送4字节数据确认启动正常
- fork server使用read阻塞,等待afl-fuzz发送命令启动测试进程
- 测试时,afl-fuzz调用
run_target发送4字节通知fork server fork测试进程 - fork server通过管道发送fork出的进程pid给afl-fuzz
- afl-fuzz根据pid等待测试进程结束并处理覆盖率信息
2. AFL QEMU模式
2.1 基本原理
QEMU在执行程序时:
- 从入口点开始翻译并执行基本块
- 翻译的基本块存入cache提升效率
- AFL通过在基本块执行和翻译前插入代码实现覆盖率收集
2.2 关键代码实现
#define AFL_QEMU_CPU_SNIPPET2 do { \
if(itb->pc == afl_entry_point) { \
afl_setup(); \
afl_forkserver(cpu); \
afl_maybe_log(itb->pc); \
} \
} while (0)
2.3 覆盖率统计方式
cur_loc = (cur_loc >> 4) ^ (cur_loc << 8);
cur_loc &= MAP_SIZE - 1;
afl_area_ptr[cur_loc ^ prev_loc]++; // 统计edge覆盖率
2.4 Fork Server实现
static void afl_forkserver(CPUState *cpu) {
// 通知afl-fuzz启动正常
if (write(FORKSRV_FD + 1, tmp, 4) != 4) return;
while (1) {
// 等待afl-fuzz命令
if (read(FORKSRV_FD, tmp, 4) != 4) exit(2);
child_pid = fork(); // fork新进程
if (!child_pid) { // 子进程
afl_fork_child = 1;
close(FORKSRV_FD);
close(FORKSRV_FD + 1);
close(t_fd[0]);
return;
}
// 发送pid给afl-fuzz
if (write(FORKSRV_FD + 1, &child_pid, 4) != 4) exit(5);
// 处理子进程翻译请求
afl_wait_tsl(cpu, t_fd[0]);
// 等待子进程结束
if (waitpid(child_pid, &status, 0) < 0) exit(6);
if (write(FORKSRV_FD + 1, &status, 4) != 4) exit(7);
}
}
2.5 基本块翻译请求处理
static void afl_wait_tsl(CPUState *cpu, int fd) {
while (1) {
// 接收子进程翻译请求
if (read(fd, &t, sizeof(struct afl_tsl)) != sizeof(struct afl_tsl)) break;
// 在cache中查找
tb = tb_htable_lookup(cpu, t.pc, t.cs_base, t.flags);
// 不在cache中则翻译
if(!tb) {
mmap_lock();
tb_lock();
tb_gen_code(cpu, t.pc, t.cs_base, t.flags, 0);
mmap_unlock();
tb_unlock();
}
}
close(fd);
}
2.6 改进版本
- 将统计覆盖率代码直接插入到每个翻译的基本块前面
- 同步子进程生成的chain到fork server进程
- 启用chain功能提升性能
关键改进代码:
bool was_translated = false, was_chained = false;
tb = tb_lookup__cpu_state(cpu, &pc, &cs_base, &flags, cf_mask);
if (tb == NULL) {
mmap_lock();
tb = tb_gen_code(cpu, pc, cs_base, flags, cf_mask);
was_translated = true;
mmap_unlock();
if (last_tb) {
tb_add_jump(last_tb, tb_exit, tb);
was_chained = true;
}
if (was_translated || was_chained) {
afl_request_tsl(pc, cs_base, flags, cf_mask, was_chained ? last_tb : NULL, tb_exit);
}
}
2.7 Persistent模式实现
在被测函数开始和结束插入指令:
#define AFL_QEMU_TARGET_i386_SNIPPET \
if (is_persistent) { \
if (s->pc == afl_persistent_addr) { \
I386_RESTORE_STATE_FOR_PERSISTENT; \
if (afl_persistent_ret_addr == 0) { \
TCGv_ptr paddr = tcg_const_ptr(afl_persistent_addr); \
tcg_gen_st_tl(paddr, cpu_regs[R_ESP], persisent_retaddr_offset); \
tcg_gen_afl_call0(&afl_persistent_loop); \
} \
} else if (afl_persistent_ret_addr && s->pc == afl_persistent_ret_addr) { \
gen_jmp_im(s, afl_persistent_addr); \
gen_eob(s); \
} \
}
3. AFL Unicorn模式
3.1 基本原理
与QEMU模式类似,Unicorn基于QEMU实现,通过在cpu_tb_exec执行基本块前插入代码:
#define AFL_UNICORN_CPU_SNIPPET2 do { \
if(afl_first_instr == 0) { \
afl_setup(); \
afl_forkserver(env); \
afl_first_instr = 1; \
} \
afl_maybe_log(tb->pc); \
} while (0)
4. libFuzzer Unicorn模式
4.1 基本原理
libFuzzer支持从外部获取覆盖率信息:
__attribute__((section("__libfuzzer_extra_counters")))
uint8_t Counters[PCS_N];
4.2 实现方式
- 通过Unicorn的基本块hook事件收集执行信息
- 在回调函数中更新Counters
// hook basic block to get code coverage
uc_hook hookHandle;
uc_hook_add(uc, &hookHandle, UC_HOOK_BLOCK, hookBlock, NULL, 1, 0);
// update code coverage counters
void hookBlock(uc_engine *uc, uint64_t address, uint32_t size, void *user_data) {
uint16_t pr = crc16(address);
uint16_t idx = pr ^ prevPR;
Counters[idx]++;
prevPR = (pr >> 1);
}
5. 总结
- AFL的变异策略调度模块和被测程序执行/覆盖率收集模块是独立的
- 两者通过命名管道进行通信
- 实现新的覆盖率收集方式需要:
- 模拟fork server与afl-fuzz通信
- 将覆盖率反馈给afl-fuzz
- libFuzzer通过全局变量
__libfuzzer_extra_counters获取外部覆盖率信息