protobuf 协议恢复与利用的实战研究
字数 3325
更新时间 2026-03-18 13:08:48

Protobuf协议恢复与利用实战教学文档

1. 概述与背景

本文档基于一篇关于“protobuf协议恢复与利用的实战研究”的文章编写,该文章分析了2025年CISCN(全国大学生信息安全竞赛)半决赛中一道名为“prompt”的题目。核心内容聚焦于如何在二进制安全题目(特别是堆利用相关)中,处理和分析使用了protobuf-c库的程序。文档将详细阐述从协议恢复、交互分析到漏洞挖掘的完整流程。

2. 核心挑战:Protobuf在二进制题目中的角色

在二进制题目中首次遇到Protobuf时,新手容易将注意力集中在“字段如何定义”、“pb2文件如何生成”等技术细节上。然而,真正的难点通常在于:Protobuf往往只是程序整个输入协议的一部分。恢复出proto消息结构仅仅是第一步,后续还需要解决:

  • 消息如何被程序分发(分发逻辑)。
  • 外层如何封装(例如长度头)。
  • 序列化的正文如何被解包。
  • 各个字段在具体功能函数(如add, free, edit, show)中如何被使用。

3. 第一步:恢复原始消息结构

分析的第一步也是最关键的一步,是从目标ELF程序中提取出protobuf-c库使用的描述符(Descriptor),从而恢复出程序实际处理的消息定义。手动猜测字段既困难又容易出错。

3.1 自动化恢复工具:analy.py

文章中提供了一个名为analy.py的Python脚本,其工作原理是扫描ELF文件,定位protobuf-c描述符的魔数0x28AAEEF9,然后解析描述符结构,自动重建出.proto文件以及对应的*_pb2.py文件,供后续利用脚本使用。

关键脚本功能

  • 自动搜索描述符魔数。
  • 解析消息名、字段数量、字段表地址。
  • 提取每个字段的编号、类型和标签(如required, optional, repeated)。
  • 自动判断语法为proto2proto3
  • 生成可用的.proto文件和Python的_pb2.py模块。

使用方法

python3 analy.py -f <目标ELF文件> -o <输出proto文件名>

示例执行命令:

python3 analy.py -f pwn -o recovered.proto

3.2 恢复结果

对示例题目执行脚本后,得到以下消息定义:

syntax = "proto3";

message HeapPayload {
  int32 option = 1;
  repeated int32 chunk_sizes = 2;
  repeated int32 heap_chunks_id = 3;
  bytes heap_content = 4;
}

字段解析

  • option: 控制操作类型(对应菜单功能)。
  • chunk_sizes: 可重复的整数,表示块大小。
  • heap_chunks_id: 可重复的整数,表示堆块的索引ID。
  • heap_content: 字节串,存放堆内容。

此步骤的重要性:此结论是后续所有分析(阅读汇编、编写利用脚本)的基础。它明确了程序交互的数据格式,但并未揭示外层的封装协议、option值与具体处理函数的映射关系。

4. 第二步:分析外层输入协议(Framing)

一个常见错误是直接将SerializeToString()的结果发送给程序,忽略了外层的封装。必须分析程序读取输入的具体逻辑。

4.1 长度头分析

从程序反汇编代码可知其读取逻辑:

  1. 首先调用read(0, &len, 4),读取4个字节作为长度头。
  2. 检查读取返回值是否为4,否则退出。
  3. 将读取的4字节解释为一个uint32_t类型的小端(Little-Endian)整数,并进行上限检查(例如是否<= 0x1000)。
  4. 根据该长度值malloc分配缓冲区。
  5. 再次调用read(0, buf, len),读取完整的消息正文。
  6. 最后将正文传递给protobuf_c_message_unpack进行反序列化。

伪代码表示:

read(0, &len, 4);
buf = malloc(len);
read(0, buf, len);
msg = protobuf_c_message_unpack(..., len, buf);

4.2 利用脚本中的正确封装

因此,在利用脚本(例如使用pwntools)中,发送消息的函数必须正确实现此前置长度头:

from pwn import p32

def send_msg(io, serialized_payload):
    # 先发送4字节小端长度头
    io.send(p32(len(serialized_payload)))
    # 再发送protobuf序列化后的正文
    io.send(serialized_payload)

常见错误:错误地使用p64(8字节)作为长度头。这会导致前4字节被读作长度,多余的4个\x00字节会残留在输入流中,被下一次read当作正文开头读取,导致整个数据错位,反序列化失败。

5. 第三步:分析消息分发逻辑(Handler Mapping)

在明确数据结构和外层协议后,需要确定HeapPayload中的option字段如何映射到具体的处理函数。

通过分析程序的反汇编代码,可以找到类似如下的分发逻辑:

if (msg->option >= 1 && msg->option <= 5) {
    // 假设 handlers 是一个函数指针数组
    handlers[msg->option - 1](msg);
} else {
    printf("Unknown option: %d\n", msg->option);
}

因此,需要确定handlers数组中每个下标对应的功能。根据文章,在示例题目中的映射关系为:

  • option = 1 -> add 函数
  • option = 2 -> free 函数
  • option = 3 -> edit 函数
  • option = 4 -> show 函数
  • option = 5 -> exit 功能

注意:这与许多传统堆菜单题的常见映射(如0=add, 1=free, 2=edit, 3=show)不同,必须通过逆向确认。

至此,程序的完整输入协议已清晰:[4字节小端长度头] + [HeapPayload的protobuf序列化数据],其中HeapPayload.option决定执行路径。

6. 第四步:编写利用脚本的交互层

消息结构恢复后,应使用生成的_pb2.py模块来构造消息,避免手动拼接字节流出错。

6.1 消息构造与发送封装

一个推荐的脚本结构如下:

from pwn import *
from recovered_pb2 import HeapPayload # 导入生成的模块

def pack_msg(option, ids=None, sizes=None, content=None):
    """构造并序列化HeapPayload消息"""
    msg = HeapPayload()
    msg.option = option
    if ids is not None:
        if isinstance(ids, int):
            msg.heap_chunks_id.append(ids)
        else:
            msg.heap_chunks_id.extend(ids) # 处理repeated字段
    if sizes is not None:
        if isinstance(sizes, int):
            msg.chunk_sizes.append(sizes)
        else:
            msg.chunk_sizes.extend(sizes)
    if content is not None:
        msg.heap_content = content
    return msg.SerializeToString()

def send_msg(io, serialized_payload):
    """发送带长度头的完整消息"""
    io.send(p32(len(serialized_payload)))
    io.send(serialized_payload)

6.2 功能函数封装

基于以上基础函数,可以封装出语义更清晰的功能函数:

def add(io, size, content=None):
    """对应 option=1 """
    payload = pack_msg(option=1, sizes=size, content=content)
    send_msg(io, payload)

def free(io, idx):
    """对应 option=2 """
    payload = pack_msg(option=2, ids=idx)
    send_msg(io, payload)

def edit(io, idx, size, content=None):
    """对应 option=3 """
    payload = pack_msg(option=3, ids=idx, sizes=size, content=content)
    send_msg(io, payload)

def show(io, idx):
    """对应 option=4 """
    payload = pack_msg(option=4, ids=idx)
    send_msg(io, payload)

这种分层设计(Protobuf构造、长度头封装、业务语义)使得代码清晰,易于调试。

7. 第五步:定位与分析漏洞

在理清所有交互协议后,方可深入分析各功能函数本身的逻辑漏洞。在示例题目中,漏洞存在于edit函数。

7.1 漏洞代码逻辑

edit函数的简化伪代码如下:

void edit(HeapPayload *msg) {
    int idx = msg->heap_chunks_id[0];
    int new_size = msg->chunk_sizes[0]; // 从消息中读取新的size

    // 检查索引和new_size的范围
    if (idx < 0 || idx > 0xf) return;
    if (new_size < 0 || new_size > 0x500) return; // 只有上限检查

    // 更新记录该chunk的“尺寸表”
    chunk_size_table[idx] = new_size;

    char *ptr = chunk_table[idx]; // 获取原来chunk的指针

    if (msg->heap_content.len != 0 && msg->heap_content.data != NULL) {
        size_t n = min((size_t)new_size, msg->heap_content.len);
        memcpy(ptr, msg->heap_content.data, n); // 关键操作
    } else {
        memset(ptr, 0, new_size); // 关键操作
    }
}

7.2 漏洞原理

堆溢出(Heap Overflow)
漏洞根源在于,edit函数虽然检查了用户传入的new_size是否超过一个全局上限(0x500),但没有检查这个new_size是否小于或等于该chunk最初通过add申请时的真实大小

攻击场景

  1. 使用add(io, 0x20, b”A”*8)申请一个大小为0x20的小chunk。
  2. 使用edit(io, 0, 0x200, b”B”*0x200)尝试“编辑”这个chunk,并指定新大小为0x200
  3. 程序不会重新申请一个0x200的新chunk,而是直接向原chunk(仍为0x20大小)的起始地址写入0x200字节的数据。
  4. 这会导致写入0x200 - 0x20 = 0x1E0字节的越界数据,覆盖其后相邻chunk的内容和堆元数据,从而可能篡改tcache、fastbin等堆管理结构,实现利用。

重要细节:即使不提供heap_content(走memset分支),只要new_size大于原chunk大小,同样会发生越界清零,触发漏洞。因此,漏洞的本质是size参数未与原始chunk大小绑定,而非内容拷贝本身。

8. 总结:系统化的分析方法

面对包含Protobuf的二进制题目,推荐遵循以下分析顺序,将协议层与业务层分离:

  1. 恢复消息结构:使用工具(如analy.py)从二进制中提取protobuf-c描述符,生成.proto文件,明确程序接收的数据格式。
  2. 确认分发逻辑:逆向分析主流程,确定option等字段与具体处理函数(add/free/edit/show)的映射关系。
  3. 分析外层协议(Framing):确定消息正文之外是否还有长度头、校验和等封装,并明确其格式(如4字节小端长度)。
  4. 深入功能函数:在前三步的基础上,分析各个处理函数的具体逻辑,寻找内存损坏(如堆溢出、UAF)、逻辑错误等漏洞。
  5. 编写利用脚本:基于生成的_pb2.py模块和明确的交互协议,分层编写健壮的交互和利用代码。

Protobuf在此类题目中是一个重要的协议门槛,跨过它才能与程序正确交互。其本身通常并非漏洞所在,但若不能正确处理,则会阻碍后续的漏洞分析。正确的协议分析是高效完成此类题目的基石。

相似文章
相似文章
 全屏