巡风源码浅析
字数 713 2025-08-26 22:11:15

巡风扫描器源码深度解析与教学指南

一、巡风扫描器概述

巡风是一款优秀的网络资产探测与漏洞扫描系统,由YSRC团队开发。其主要功能包括:

  • 网络资产信息抓取(存活主机、开放端口、服务识别)
  • 漏洞检测引擎(支持插件式漏洞检测)
  • Web界面展示与管理

源码地址:https://github.com/ysrc/xunfeng

二、系统架构分析

1. 文件结构

Config.py          # 配置文件
README.md         # 说明文档
Run.bat           # Windows启动服务
Run.py            # webserver
Run.sh            # Linux启动服务

aider/
  Aider.py        # 辅助验证脚本

db/               # 初始数据库结构
masscan/          # 内置Masscan程序

nascan/
  NAScan.py       # 网络资产信息抓取引擎
  lib/
    common.py     # 通用方法
    icmp.py       # ICMP发送类
    log.py        # 日志输出
    mongo.py      # 数据库连接
    scan.py       # 扫描与识别
    start.py      # 线程控制
  plugin/
    masscan.py    # 调用Masscan脚本

views/            # web请求处理
  View.py
  lib/
    Conn.py       # 数据库公共类
    CreateExcel.py # 表格处理
    Login.py      # 权限验证
    QueryLogic.py # 查询语句解析

static/           # 静态资源目录
templates/        # 模板文件目录

vulscan/
  VulScan.py      # 漏洞检测引擎
  vuldb/          # 漏洞库目录

2. 服务启动流程

Run.sh启动脚本启动了四个核心服务:

#!/bin/bash
CURRENT_PATH=`dirname $0`
cd $CURRENT_PATH

XUNFENG_LOG=/var/log/xunfeng
XUNFENG_DB=/var/lib/mongodb

[ ! -d $XUNFENG_LOG ] && mkdir -p ${XUNFENG_LOG}
[ ! -d $XUNFENG_DB ] && mkdir -p ${XUNFENG_DB}

nohup mongod --port 65521 --dbpath=${XUNFENG_DB} --auth > ${XUNFENG_LOG}/db.log &
nohup python ./Run.py > ${XUNFENG_LOG}/web.log &
nohup python ./aider/Aider.py > ${XUNFENG_LOG}/aider.log &
nohup python ./nascan/NAScan.py > ${XUNFENG_LOG}/scan.log &
nohup python ./vulscan/VulScan.py > ${XUNFENG_LOG}/vul.log &
  1. MongoDB服务:端口65521,启用认证
  2. Web服务(Run.py):基于Flask框架
  3. 辅助验证服务(Aider.py):简单的DNS log平台
  4. 资产扫描服务(NAScan.py):网络资产信息抓取引擎
  5. 漏洞扫描服务(VulScan.py):漏洞检测引擎

三、网络资产扫描模块(NAScan)深度解析

1. 主流程分析

NAScan.py主流程:

# 读取配置
CONFIG_INI = get_config()  
log.write('info', None, 0, u'获取配置成功')
STATISTICS = get_statistics()

# 状态标识
MASSCAN_AC = [0]  # Masscan使用标识
NACHANGE = [0]    # 扫描列表变更标识

# 启动监控线程
thread.start_new_thread(monitor, (CONFIG_INI, STATISTICS, NACHANGE))
thread.start_new_thread(cruise, (STATISTICS, MASSCAN_AC))

# 设置超时
socket.setdefaulttimeout(int(CONFIG_INI['Timeout'])/2)

ac_data = []
while True:
    # 检查扫描周期条件
    if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:
        ac_data.append(now_date)
        NACHANGE[0] = 0
        log.write('info', None, 0, u'开始扫描')
        
        # 启动扫描
        s = start(CONFIG_INI)
        s.masscan_ac = MASSCAN_AC
        s.statistics = STATISTICS
        s.run()
        
        time.sleep(60)

2. 关键组件详解

2.1 监控线程(monitor)

def monitor(CONFIG_INI, STATISTICS, NACHANGE):
    while True:
        try:
            # 更新心跳
            mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})
            
            # 更新统计信息
            mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)
            
            # 检查配置变更
            new_config = get_config()
            if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):
                NACHANGE[0] = 1  # 触发重新扫描
                CONFIG_INI.update(new_config)
                
        except Exception, e:
            print e
        time.sleep(30)  # 30秒检测一次

2.2 失效记录清理线程(cruise)

def cruise(STATISTICS, MASSCAN_AC):
    while True:
        # 只在工作日9-18点执行清理
        if week >= 1 and week <= 5 and hour >= 9 and hour <= 18:
            try:
                data = mongo.NA_INFO.find().sort("time", 1)
                for history_info in data:
                    # 等待Masscan完成
                    while MASSCAN_AC[0]:
                        time.sleep(10)
                        
                    # 检查端口存活
                    try:
                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        sock.connect((ip, int(port)))
                        sock.close()
                    except:
                        # 不存活则删除记录
                        mongo.NA_INFO.remove({"ip": ip, "port": port})
                        STATISTICS[date_]['delete'] += 1
                        # 记录删除历史
                        mongo.NA_HISTORY.insert(history_info)
            except:
                pass
        time.sleep(3600)  # 每小时检测一次

2.3 资产扫描核心类(start)

class start:
    def __init__(self, config):
        self.config_ini = config
        self.queue = Queue.Queue()
        self.thread = int(self.config_ini['Thread'])
        self.scan_list = self.config_ini['Scan_list'].split('\n')
        self.mode = int(self.config_ini['Masscan'].split('|')[0])
        self.icmp = int(self.config_ini['Port_list'].split('|')[0])
        self.white_list = self.config_ini.get('White_list', '').split('\n')

2.4 扫描执行流程(run方法)

def run(self):
    global AC_PORT_LIST
    all_ip_list = []
    
    # 处理扫描IP列表
    for ip in self.scan_list:
        if "/" in ip:  # CIDR格式处理
            ip = cidr.CIDR(ip)
        ip_list = self.get_ip_list(ip)
        
        # 过滤白名单
        for white_ip in self.white_list:
            if white_ip in ip_list:
                ip_list.remove(white_ip)
                
        # Masscan模式
        if self.mode == 1:
            self.masscan_path = self.config_ini['Masscan'].split('|')[2]
            self.masscan_rate = self.config_ini['Masscan'].split('|')[1]
            
            # 获取存活IP
            ip_list = self.get_ac_ip(ip_list)
            self.masscan_ac[0] = 1
            
            # Masscan全端口扫描
            AC_PORT_LIST = self.masscan(ip_list)
            self.masscan_ac[0] = 0
            
            # 加入扫描队列
            for ip_str in AC_PORT_LIST.keys():
                self.queue.put(ip_str)
            self.scan_start()
            
        # 非Masscan模式
        else:
            all_ip_list.extend(ip_list)
            
    if self.mode == 0:
        if self.icmp:
            all_ip_list = self.get_ac_ip(all_ip_list)
        for ip_str in all_ip_list:
            self.queue.put(ip_str)
        self.scan_start()

2.5 存活主机探测(get_ac_ip)

def get_ac_ip(self, ip_list):
    try:
        s = icmp.Nscan()
        ipPool = set(ip_list)
        return s.mPing(ipPool)
    except Exception, e:
        print 'The current user permissions unable to send icmp packets'
        return ip_list

ICMP探测核心类:

class Nscan:
    def mPing(self, ipPool):
        Sock = self.__icmpSocket
        Sock.settimeout(self.timeout)
        packet = self.__icmpPacket
        recvFroms = set()
        
        # 启动发送线程
        sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
        sendThr.start()
        
        while True:
            try:
                # 接收响应
                ac_ip = Sock.recvfrom(1024)[1][0]
                if ac_ip not in recvFroms:
                    log.write("active", ac_ip, 0, None)
                    recvFroms.add(ac_ip)
            except Exception:
                pass
            finally:
                if not sendThr.isAlive():
                    break
                    
        return recvFroms & ipPool

2.6 Masscan集成

def masscan(self, ip):
    try:
        if len(ip) == 0:
            return
            
        sys.path.append(sys.path[0] + "/plugin")
        m_scan = __import__("masscan")
        result = m_scan.run(ip, self.masscan_path, self.masscan_rate)
        return result
    except Exception, e:
        print e
        print 'No masscan plugin detected'

Masscan插件实现:

def run(ip_list, path, rate):
    try:
        # 写入目标文件
        ip_file = open('target.log', 'w')
        ip_file.write("\n".join(ip_list))
        ip_file.close()
        
        # 过滤危险字符
        path = str(path).translate(None, ';|&`\n')
        rate = str(rate).translate(None, ';|&`\n')
        
        # 执行Masscan扫描
        os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s" % (path, rate))
        
        # 解析结果
        result_file = open('tmp.log', 'r')
        result_json = result_file.readlines()
        result_file.close()
        
        del result_json[0]
        del result_json[-1]
        
        open_list = {}
        for res in result_json:
            try:
                ip = res.split()[3]
                port = res.split()[2]
                if ip in open_list:
                    open_list[ip].append(port)
                else:
                    open_list[ip] = [port]
            except:
                pass
                
        # 清理临时文件
        os.remove('target.log')
        os.remove('tmp.log')
        
        return open_list
    except:
        pass

2.7 端口扫描与服务识别(scan.py)

class scan:
    def run(self):
        self.timeout = int(self.config_ini['Timeout'])
        
        for _port in self.port_list:
            self.server = ''
            self.banner = ''
            self.port = int(_port)
            
            # 端口扫描
            self.scan_port()
            if not self.banner:
                continue
                
            # 服务识别
            self.server_discern()
            
            if self.server == '':
                # 尝试Web访问
                web_info = self.try_web()
                if web_info:
                    log.write('web', self.ip, self.port, web_info)
                    time_ = datetime.datetime.now()
                    mongo.NA_INFO.update(
                        {'ip': self.ip, 'port': self.port},
                        {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}}
                    )

端口扫描实现:

def scan_port(self):
    try:
        # TCP连接
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.connect((self.ip, self.port))
        time.sleep(0.2)
    except Exception, e:
        return
        
    try:
        # 获取Banner
        self.banner = sock.recv(1024)
        sock.close()
        if len(self.banner) <= 2:
            self.banner = 'NULL'
    except Exception, e:
        self.banner = 'NULL'
        
    log.write('portscan', self.ip, self.port, None)
    
    # 存储结果
    banner = ''
    hostname = self.ip2hostname(self.ip)
    time_ = datetime.datetime.now()
    date_ = time_.strftime('%Y-%m-%d')
    
    try:
        banner = unicode(self.banner, errors='replace')
        if self.banner == 'NULL':
            banner = ''
            
        mongo.NA_INFO.insert({
            "ip": self.ip, "port": self.port, 
            "hostname": hostname, "banner": banner, 
            "time": time_
        })
        self.statistics[date_]['add'] += 1
    except:
        if banner:
            # 原子操作更新
            history_info = mongo.NA_INFO.find_and_modify(
                query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}},
                remove=True
            )
            if history_info:
                mongo.NA_INFO.insert({
                    "ip": self.ip, "port": self.port,
                    "hostname": hostname, "banner": banner,
                    "time": time_
                })
                self.statistics[date_]['update'] += 1
                
                # 记录历史
                del history_info["_id"]
                history_info['del_time'] = time_
                history_info['type'] = 'update'
                mongo.NA_HISTORY.insert(history_info)

服务识别实现:

def server_discern(self):
    # 快速匹配识别
    for mark_info in self.config_ini['Discern_server']:
        try:
            name, default_port, mode, reg = mark_info
            if mode == 'default':
                if int(default_port) == self.port:
                    self.server = name
            elif mode == 'banner':
                matchObj = re.search(reg, self.banner, re.I | re.M)
                if matchObj:
                    self.server = name
            if self.server:
                break
        except:
            continue
            
    # 未识别且非Web端口的服务
    if not self.server and self.port not in [80, 443, 8080]:
        for mark_info in self.config_ini['Discern_server']:
            try:
                name, default_port, mode, reg = mark_info
                if mode not in ['default', 'banner']:
                    # 发送特定探测包
                    dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    dis_sock.connect((self.ip, self.port))
                    mode = mode.decode('string_escape')
                    reg = reg.decode('string_escape')
                    dis_sock.send(mode)
                    time.sleep(0.3)
                    dis_recv = dis_sock.recv(1024)
                    dis_sock.close()
                    
                    matchObj = re.search(reg, dis_recv, re.I | re.M)
                    if matchObj:
                        self.server = name
                        break
            except:
                pass
                
    if self.server:
        log.write("server", self.ip, self.port, self.server)
        mongo.NA_INFO.update(
            {"ip": self.ip, "port": self.port},
            {"$set": {"server": self.server}}
        )

Web服务检测:

def try_web(self):
    title_str, html = '', ''
    try:
        # HTTP/HTTPS请求
        if self.port == 443:
            info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)
        else:
            info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)
        html = info.read()
        header = info.headers
    except urllib2.HTTPError, e:
        html = e.read()
        header = e.headers
    except:
        return
        
    if not header:
        return
        
    # 处理gzip压缩
    if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']:
        html_data = StringIO.StringIO(html)
        gz = gzip.GzipFile(fileobj=html_data)
        html = gz.read()
        
    try:
        # 编码转换
        html_code = self.get_code(header, html).strip()
        if html_code and len(html_code) < 12:
            html = html.decode(html_code).encode('utf-8')
    except:
        pass
        
    try:
        # 提取Title
        title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M)
        if title:
            title_str = title.group(1)
    except:
        pass
        
    try:
        # 设置Banner信息
        web_banner = str(header) + "\r\n\r\n" + html
        self.banner = web_banner
        
        # 更新数据库
        history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})
        if 'server' not in history_info:
            tag = self.get_tag()
            web_info = {'title': title_str, 'tag': tag}
            return web_info
        else:
            if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner)/60:
                del history_info['_id']
                history_info['del_time'] = datetime.datetime.now()
                mongo.NA_HISTORY.insert(history_info)
                tag = self.get_tag()
                web_info = {'title': title_str, 'tag': tag}
                date_ = datetime.datetime.now().strftime('%Y-%m-%d')
                self.statistics[date_]['update'] += 1
                log.write('info', None, 0, '%s:%s update web info' % (self.ip, self.port))
                return web_info
    except:
        return

Web特征识别:

def get_tag(self):
    try:
        url = self.ip + ':' + str(self.port)
        # CMS、容器、语言识别
        tag = map(self.discern, ['Discern_cms', 'Discern_con', 'Discern_lang'], [url, url, url])
        return filter(None, tag)
    except Exception, e:
        return

def discern(self, dis_type, domain):
    file_tmp = {}
    if int(domain.split(":")[1]) == 443:
        protocol = "https://"
    else:
        protocol = "http://"
        
    try:
        req = urllib2.urlopen(protocol + domain, timeout=self.timeout)
        header = req.headers
        html = req.read()
    except urllib2.HTTPError, e:
        html = e.read()
        header = e.headers
    except Exception, e:
        return
        
    # 根据不同类型进行识别
    for mark_info in self.config_ini[dis_type]:
        if mark_info[1] == 'header':
            try:
                if not header:
                    return
                if re.search(mark_info[3], header[mark_info[2]], re.I):
                    return mark_info[0]
            except Exception, e:
                continue
        elif mark_info[1] == 'file':
            if mark_info[2] == 'index':
                try:
                    if not html:
                        return
                    if re.search(mark_info[3], html, re.I):
                        return mark_info[0]
                except Exception, e:
                    continue
            else:
                if mark_info[2] in file_tmp:
                    re_html = file_tmp[mark_info[2]]
                else:
                    try:
                        re_html = urllib2.urlopen(protocol + domain + "/" + mark_info[2], timeout=self.timeout).read()
                    except urllib2.HTTPError, e:
                        re_html = e.read()
                    except Exception, e:
                        return
                    file_tmp[mark_info[2]] = re_html
                    
                try:
                    if re.search(mark_info[3], re_html, re.I):
                        return mark_info[0]
                except Exception, e:
                    print mark_info[3]

四、漏洞扫描模块(VulScan)深度解析

1. 主流程分析

if __name__ == '__main__':
    init()  # 插件初始化
    PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
    
    # 启动监控线程
    thread.start_new_thread(monitor, ())
    
    while True:
        # 获取任务
        task_id, task_plan, task_target, task_plugin = queue_get()
        if task_id == '':
            time.sleep(10)
            continue
            
        # 清理插件缓存
        if PLUGIN_DB:
            del sys.modules[PLUGIN_DB.keys()[0]]
            PLUGIN_DB.clear()
            
        # 执行扫描
        for task_netloc in task_target:
            while True:
                if int(thread._count()) < THREAD_COUNT:
                    if task_netloc[0] in WHITE_LIST:
                        break
                    thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
                    break
                else:
                    time.sleep(2)
                    
        # 更新任务状态
        if task_plan == 0:
            na_task.update({"_id": task_id}, {"$set": {"status": 2}})

2. 关键组件详解

2.1 插件初始化(init)

def init():
    if na_plugin.find().count() >= 1:
        return
        
    script_plugin = []
    json_plugin = []
    
    # 获取vuldb目录下所有插件
    file_list = os.listdir(sys.path[0] + '/vuldb')
    time_ = datetime.datetime.now()
    
    # 分类插件
    for filename in file_list:
        try:
            if filename.split('.')[1] == 'py':
                script_plugin.append(filename.split('.')[0])
            if filename.split('.')[1] == 'json':
                json_plugin.append(filename)
        except:
            pass
            
    # 处理Python插件
    for plugin_name in script_plugin:
        try:
            res_tmp = __import__(plugin_name)
            plugin_info = res_tmp.get_plugin_info()
            plugin_info['add_time'] = time_
            plugin_info['filename'] = plugin_name
            plugin_info['count'] = 0
            na_plugin.insert(plugin_info)
        except:
            pass
            
    # 处理JSON插件
    for plugin_name in json_plugin:
        try:
            json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()
            plugin_info = json.loads(json_text)
            plugin_info['add_time'] = time_
            plugin_info['filename'] = plugin_name
            plugin_info['count'] = 0
            del plugin_info['plugin']
            na_plugin.insert(plugin_info)
        except:
            pass

2.2 配置获取(get_config)

def get_config():
    try:
        config_info = na_config.find_one({"type": "vulscan"})
        pass_row = config_info['config']['Password_dic']
        thread_row = config_info['config']['Thread']
        timeout_row = config_info['config']['Timeout']
        white_row = config_info['config']['White_list']
        
        password_dic = pass_row['value'].split('\n')
        thread_count = int(thread_row['value'])
        timeout = int(timeout_row['value'])
        white_list = white_row['value'].split('\n')
        
        return password_dic, thread_count, timeout, white_list
    except Exception, e:
        print e

2.3 监控线程(monitor)

def monitor():
    global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST
    while True:
        # 计算负载
        queue_count = na_task.find({"status": 0, "plan": 0}).count()
        if queue_count:
            load = 1
        else:
            ac_count = thread._count()
            load = float(ac_count - 4) / THREAD_COUNT
            if load > 1:
                load = 1
            if load < 0:
                load = 0
                
        # 更新心跳
        na_heart.update(
            {"name": "load"},
            {"$set": {"value": load, "up_time": datetime.datetime.now()}}
        )
        
        # 更新配置
        PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
        
        # 调整检测间隔
        if load > 0:
            time.sleep(8)
        else:
            time.sleep(60)

2.4 任务获取(queue_get)

def queue_get():
    global TASK_DATE_DIC
    
    # 获取未启动的任务
    task_req = na_task.find_and_modify(
        query={"status": 0, "plan": 0},
        update={"$set": {"status": 1}},
        sort={'time': 1}
    )
    
    if task_req:
        TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
        return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
    else:
        # 检查计划任务
        task_req_row = na_task.find({"plan": {"$ne": 0}})
        if task_req_row:
            for task_req in task_req_row:
                # 检查是否到达执行时间
                if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']):
                    if task_req['isupdate'] == 1:
                        # 更新目标
                        task_req['target'] = update_target(json.loads(task_req['query']))
                        na_task.update(
                           
巡风扫描器源码深度解析与教学指南 一、巡风扫描器概述 巡风是一款优秀的网络资产探测与漏洞扫描系统,由YSRC团队开发。其主要功能包括: 网络资产信息抓取(存活主机、开放端口、服务识别) 漏洞检测引擎(支持插件式漏洞检测) Web界面展示与管理 源码地址:https://github.com/ysrc/xunfeng 二、系统架构分析 1. 文件结构 2. 服务启动流程 Run.sh启动脚本启动了四个核心服务: MongoDB服务 :端口65521,启用认证 Web服务(Run.py) :基于Flask框架 辅助验证服务(Aider.py) :简单的DNS log平台 资产扫描服务(NAScan.py) :网络资产信息抓取引擎 漏洞扫描服务(VulScan.py) :漏洞检测引擎 三、网络资产扫描模块(NAScan)深度解析 1. 主流程分析 NAScan.py主流程: 2. 关键组件详解 2.1 监控线程(monitor) 2.2 失效记录清理线程(cruise) 2.3 资产扫描核心类(start) 2.4 扫描执行流程(run方法) 2.5 存活主机探测(get_ ac_ ip) ICMP探测核心类: 2.6 Masscan集成 Masscan插件实现: 2.7 端口扫描与服务识别(scan.py) 端口扫描实现: 服务识别实现: Web服务检测: Web特征识别: 四、漏洞扫描模块(VulScan)深度解析 1. 主流程分析 2. 关键组件详解 2.1 插件初始化(init) 2.2 配置获取(get_ config) 2.3 监控线程(monitor) 2.4 任务获取(queue_ get)