巡风源码浅析
字数 713 2025-08-26 22:11:15
巡风扫描器源码深度解析与教学指南
一、巡风扫描器概述
巡风是一款优秀的网络资产探测与漏洞扫描系统,由YSRC团队开发。其主要功能包括:
- 网络资产信息抓取(存活主机、开放端口、服务识别)
- 漏洞检测引擎(支持插件式漏洞检测)
- Web界面展示与管理
源码地址:https://github.com/ysrc/xunfeng
二、系统架构分析
1. 文件结构
Config.py # 配置文件
README.md # 说明文档
Run.bat # Windows启动服务
Run.py # webserver
Run.sh # Linux启动服务
aider/
Aider.py # 辅助验证脚本
db/ # 初始数据库结构
masscan/ # 内置Masscan程序
nascan/
NAScan.py # 网络资产信息抓取引擎
lib/
common.py # 通用方法
icmp.py # ICMP发送类
log.py # 日志输出
mongo.py # 数据库连接
scan.py # 扫描与识别
start.py # 线程控制
plugin/
masscan.py # 调用Masscan脚本
views/ # web请求处理
View.py
lib/
Conn.py # 数据库公共类
CreateExcel.py # 表格处理
Login.py # 权限验证
QueryLogic.py # 查询语句解析
static/ # 静态资源目录
templates/ # 模板文件目录
vulscan/
VulScan.py # 漏洞检测引擎
vuldb/ # 漏洞库目录
2. 服务启动流程
Run.sh启动脚本启动了四个核心服务:
#!/bin/bash
CURRENT_PATH=`dirname $0`
cd $CURRENT_PATH
XUNFENG_LOG=/var/log/xunfeng
XUNFENG_DB=/var/lib/mongodb
[ ! -d $XUNFENG_LOG ] && mkdir -p ${XUNFENG_LOG}
[ ! -d $XUNFENG_DB ] && mkdir -p ${XUNFENG_DB}
nohup mongod --port 65521 --dbpath=${XUNFENG_DB} --auth > ${XUNFENG_LOG}/db.log &
nohup python ./Run.py > ${XUNFENG_LOG}/web.log &
nohup python ./aider/Aider.py > ${XUNFENG_LOG}/aider.log &
nohup python ./nascan/NAScan.py > ${XUNFENG_LOG}/scan.log &
nohup python ./vulscan/VulScan.py > ${XUNFENG_LOG}/vul.log &
- MongoDB服务:端口65521,启用认证
- Web服务(Run.py):基于Flask框架
- 辅助验证服务(Aider.py):简单的DNS log平台
- 资产扫描服务(NAScan.py):网络资产信息抓取引擎
- 漏洞扫描服务(VulScan.py):漏洞检测引擎
三、网络资产扫描模块(NAScan)深度解析
1. 主流程分析
NAScan.py主流程:
# 读取配置
CONFIG_INI = get_config()
log.write('info', None, 0, u'获取配置成功')
STATISTICS = get_statistics()
# 状态标识
MASSCAN_AC = [0] # Masscan使用标识
NACHANGE = [0] # 扫描列表变更标识
# 启动监控线程
thread.start_new_thread(monitor, (CONFIG_INI, STATISTICS, NACHANGE))
thread.start_new_thread(cruise, (STATISTICS, MASSCAN_AC))
# 设置超时
socket.setdefaulttimeout(int(CONFIG_INI['Timeout'])/2)
ac_data = []
while True:
# 检查扫描周期条件
if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:
ac_data.append(now_date)
NACHANGE[0] = 0
log.write('info', None, 0, u'开始扫描')
# 启动扫描
s = start(CONFIG_INI)
s.masscan_ac = MASSCAN_AC
s.statistics = STATISTICS
s.run()
time.sleep(60)
2. 关键组件详解
2.1 监控线程(monitor)
def monitor(CONFIG_INI, STATISTICS, NACHANGE):
while True:
try:
# 更新心跳
mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})
# 更新统计信息
mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)
# 检查配置变更
new_config = get_config()
if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):
NACHANGE[0] = 1 # 触发重新扫描
CONFIG_INI.update(new_config)
except Exception, e:
print e
time.sleep(30) # 30秒检测一次
2.2 失效记录清理线程(cruise)
def cruise(STATISTICS, MASSCAN_AC):
while True:
# 只在工作日9-18点执行清理
if week >= 1 and week <= 5 and hour >= 9 and hour <= 18:
try:
data = mongo.NA_INFO.find().sort("time", 1)
for history_info in data:
# 等待Masscan完成
while MASSCAN_AC[0]:
time.sleep(10)
# 检查端口存活
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, int(port)))
sock.close()
except:
# 不存活则删除记录
mongo.NA_INFO.remove({"ip": ip, "port": port})
STATISTICS[date_]['delete'] += 1
# 记录删除历史
mongo.NA_HISTORY.insert(history_info)
except:
pass
time.sleep(3600) # 每小时检测一次
2.3 资产扫描核心类(start)
class start:
def __init__(self, config):
self.config_ini = config
self.queue = Queue.Queue()
self.thread = int(self.config_ini['Thread'])
self.scan_list = self.config_ini['Scan_list'].split('\n')
self.mode = int(self.config_ini['Masscan'].split('|')[0])
self.icmp = int(self.config_ini['Port_list'].split('|')[0])
self.white_list = self.config_ini.get('White_list', '').split('\n')
2.4 扫描执行流程(run方法)
def run(self):
global AC_PORT_LIST
all_ip_list = []
# 处理扫描IP列表
for ip in self.scan_list:
if "/" in ip: # CIDR格式处理
ip = cidr.CIDR(ip)
ip_list = self.get_ip_list(ip)
# 过滤白名单
for white_ip in self.white_list:
if white_ip in ip_list:
ip_list.remove(white_ip)
# Masscan模式
if self.mode == 1:
self.masscan_path = self.config_ini['Masscan'].split('|')[2]
self.masscan_rate = self.config_ini['Masscan'].split('|')[1]
# 获取存活IP
ip_list = self.get_ac_ip(ip_list)
self.masscan_ac[0] = 1
# Masscan全端口扫描
AC_PORT_LIST = self.masscan(ip_list)
self.masscan_ac[0] = 0
# 加入扫描队列
for ip_str in AC_PORT_LIST.keys():
self.queue.put(ip_str)
self.scan_start()
# 非Masscan模式
else:
all_ip_list.extend(ip_list)
if self.mode == 0:
if self.icmp:
all_ip_list = self.get_ac_ip(all_ip_list)
for ip_str in all_ip_list:
self.queue.put(ip_str)
self.scan_start()
2.5 存活主机探测(get_ac_ip)
def get_ac_ip(self, ip_list):
try:
s = icmp.Nscan()
ipPool = set(ip_list)
return s.mPing(ipPool)
except Exception, e:
print 'The current user permissions unable to send icmp packets'
return ip_list
ICMP探测核心类:
class Nscan:
def mPing(self, ipPool):
Sock = self.__icmpSocket
Sock.settimeout(self.timeout)
packet = self.__icmpPacket
recvFroms = set()
# 启动发送线程
sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
sendThr.start()
while True:
try:
# 接收响应
ac_ip = Sock.recvfrom(1024)[1][0]
if ac_ip not in recvFroms:
log.write("active", ac_ip, 0, None)
recvFroms.add(ac_ip)
except Exception:
pass
finally:
if not sendThr.isAlive():
break
return recvFroms & ipPool
2.6 Masscan集成
def masscan(self, ip):
try:
if len(ip) == 0:
return
sys.path.append(sys.path[0] + "/plugin")
m_scan = __import__("masscan")
result = m_scan.run(ip, self.masscan_path, self.masscan_rate)
return result
except Exception, e:
print e
print 'No masscan plugin detected'
Masscan插件实现:
def run(ip_list, path, rate):
try:
# 写入目标文件
ip_file = open('target.log', 'w')
ip_file.write("\n".join(ip_list))
ip_file.close()
# 过滤危险字符
path = str(path).translate(None, ';|&`\n')
rate = str(rate).translate(None, ';|&`\n')
# 执行Masscan扫描
os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s" % (path, rate))
# 解析结果
result_file = open('tmp.log', 'r')
result_json = result_file.readlines()
result_file.close()
del result_json[0]
del result_json[-1]
open_list = {}
for res in result_json:
try:
ip = res.split()[3]
port = res.split()[2]
if ip in open_list:
open_list[ip].append(port)
else:
open_list[ip] = [port]
except:
pass
# 清理临时文件
os.remove('target.log')
os.remove('tmp.log')
return open_list
except:
pass
2.7 端口扫描与服务识别(scan.py)
class scan:
def run(self):
self.timeout = int(self.config_ini['Timeout'])
for _port in self.port_list:
self.server = ''
self.banner = ''
self.port = int(_port)
# 端口扫描
self.scan_port()
if not self.banner:
continue
# 服务识别
self.server_discern()
if self.server == '':
# 尝试Web访问
web_info = self.try_web()
if web_info:
log.write('web', self.ip, self.port, web_info)
time_ = datetime.datetime.now()
mongo.NA_INFO.update(
{'ip': self.ip, 'port': self.port},
{"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}}
)
端口扫描实现:
def scan_port(self):
try:
# TCP连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect((self.ip, self.port))
time.sleep(0.2)
except Exception, e:
return
try:
# 获取Banner
self.banner = sock.recv(1024)
sock.close()
if len(self.banner) <= 2:
self.banner = 'NULL'
except Exception, e:
self.banner = 'NULL'
log.write('portscan', self.ip, self.port, None)
# 存储结果
banner = ''
hostname = self.ip2hostname(self.ip)
time_ = datetime.datetime.now()
date_ = time_.strftime('%Y-%m-%d')
try:
banner = unicode(self.banner, errors='replace')
if self.banner == 'NULL':
banner = ''
mongo.NA_INFO.insert({
"ip": self.ip, "port": self.port,
"hostname": hostname, "banner": banner,
"time": time_
})
self.statistics[date_]['add'] += 1
except:
if banner:
# 原子操作更新
history_info = mongo.NA_INFO.find_and_modify(
query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}},
remove=True
)
if history_info:
mongo.NA_INFO.insert({
"ip": self.ip, "port": self.port,
"hostname": hostname, "banner": banner,
"time": time_
})
self.statistics[date_]['update'] += 1
# 记录历史
del history_info["_id"]
history_info['del_time'] = time_
history_info['type'] = 'update'
mongo.NA_HISTORY.insert(history_info)
服务识别实现:
def server_discern(self):
# 快速匹配识别
for mark_info in self.config_ini['Discern_server']:
try:
name, default_port, mode, reg = mark_info
if mode == 'default':
if int(default_port) == self.port:
self.server = name
elif mode == 'banner':
matchObj = re.search(reg, self.banner, re.I | re.M)
if matchObj:
self.server = name
if self.server:
break
except:
continue
# 未识别且非Web端口的服务
if not self.server and self.port not in [80, 443, 8080]:
for mark_info in self.config_ini['Discern_server']:
try:
name, default_port, mode, reg = mark_info
if mode not in ['default', 'banner']:
# 发送特定探测包
dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dis_sock.connect((self.ip, self.port))
mode = mode.decode('string_escape')
reg = reg.decode('string_escape')
dis_sock.send(mode)
time.sleep(0.3)
dis_recv = dis_sock.recv(1024)
dis_sock.close()
matchObj = re.search(reg, dis_recv, re.I | re.M)
if matchObj:
self.server = name
break
except:
pass
if self.server:
log.write("server", self.ip, self.port, self.server)
mongo.NA_INFO.update(
{"ip": self.ip, "port": self.port},
{"$set": {"server": self.server}}
)
Web服务检测:
def try_web(self):
title_str, html = '', ''
try:
# HTTP/HTTPS请求
if self.port == 443:
info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)
else:
info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)
html = info.read()
header = info.headers
except urllib2.HTTPError, e:
html = e.read()
header = e.headers
except:
return
if not header:
return
# 处理gzip压缩
if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']:
html_data = StringIO.StringIO(html)
gz = gzip.GzipFile(fileobj=html_data)
html = gz.read()
try:
# 编码转换
html_code = self.get_code(header, html).strip()
if html_code and len(html_code) < 12:
html = html.decode(html_code).encode('utf-8')
except:
pass
try:
# 提取Title
title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M)
if title:
title_str = title.group(1)
except:
pass
try:
# 设置Banner信息
web_banner = str(header) + "\r\n\r\n" + html
self.banner = web_banner
# 更新数据库
history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})
if 'server' not in history_info:
tag = self.get_tag()
web_info = {'title': title_str, 'tag': tag}
return web_info
else:
if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner)/60:
del history_info['_id']
history_info['del_time'] = datetime.datetime.now()
mongo.NA_HISTORY.insert(history_info)
tag = self.get_tag()
web_info = {'title': title_str, 'tag': tag}
date_ = datetime.datetime.now().strftime('%Y-%m-%d')
self.statistics[date_]['update'] += 1
log.write('info', None, 0, '%s:%s update web info' % (self.ip, self.port))
return web_info
except:
return
Web特征识别:
def get_tag(self):
try:
url = self.ip + ':' + str(self.port)
# CMS、容器、语言识别
tag = map(self.discern, ['Discern_cms', 'Discern_con', 'Discern_lang'], [url, url, url])
return filter(None, tag)
except Exception, e:
return
def discern(self, dis_type, domain):
file_tmp = {}
if int(domain.split(":")[1]) == 443:
protocol = "https://"
else:
protocol = "http://"
try:
req = urllib2.urlopen(protocol + domain, timeout=self.timeout)
header = req.headers
html = req.read()
except urllib2.HTTPError, e:
html = e.read()
header = e.headers
except Exception, e:
return
# 根据不同类型进行识别
for mark_info in self.config_ini[dis_type]:
if mark_info[1] == 'header':
try:
if not header:
return
if re.search(mark_info[3], header[mark_info[2]], re.I):
return mark_info[0]
except Exception, e:
continue
elif mark_info[1] == 'file':
if mark_info[2] == 'index':
try:
if not html:
return
if re.search(mark_info[3], html, re.I):
return mark_info[0]
except Exception, e:
continue
else:
if mark_info[2] in file_tmp:
re_html = file_tmp[mark_info[2]]
else:
try:
re_html = urllib2.urlopen(protocol + domain + "/" + mark_info[2], timeout=self.timeout).read()
except urllib2.HTTPError, e:
re_html = e.read()
except Exception, e:
return
file_tmp[mark_info[2]] = re_html
try:
if re.search(mark_info[3], re_html, re.I):
return mark_info[0]
except Exception, e:
print mark_info[3]
四、漏洞扫描模块(VulScan)深度解析
1. 主流程分析
if __name__ == '__main__':
init() # 插件初始化
PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
# 启动监控线程
thread.start_new_thread(monitor, ())
while True:
# 获取任务
task_id, task_plan, task_target, task_plugin = queue_get()
if task_id == '':
time.sleep(10)
continue
# 清理插件缓存
if PLUGIN_DB:
del sys.modules[PLUGIN_DB.keys()[0]]
PLUGIN_DB.clear()
# 执行扫描
for task_netloc in task_target:
while True:
if int(thread._count()) < THREAD_COUNT:
if task_netloc[0] in WHITE_LIST:
break
thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
break
else:
time.sleep(2)
# 更新任务状态
if task_plan == 0:
na_task.update({"_id": task_id}, {"$set": {"status": 2}})
2. 关键组件详解
2.1 插件初始化(init)
def init():
if na_plugin.find().count() >= 1:
return
script_plugin = []
json_plugin = []
# 获取vuldb目录下所有插件
file_list = os.listdir(sys.path[0] + '/vuldb')
time_ = datetime.datetime.now()
# 分类插件
for filename in file_list:
try:
if filename.split('.')[1] == 'py':
script_plugin.append(filename.split('.')[0])
if filename.split('.')[1] == 'json':
json_plugin.append(filename)
except:
pass
# 处理Python插件
for plugin_name in script_plugin:
try:
res_tmp = __import__(plugin_name)
plugin_info = res_tmp.get_plugin_info()
plugin_info['add_time'] = time_
plugin_info['filename'] = plugin_name
plugin_info['count'] = 0
na_plugin.insert(plugin_info)
except:
pass
# 处理JSON插件
for plugin_name in json_plugin:
try:
json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()
plugin_info = json.loads(json_text)
plugin_info['add_time'] = time_
plugin_info['filename'] = plugin_name
plugin_info['count'] = 0
del plugin_info['plugin']
na_plugin.insert(plugin_info)
except:
pass
2.2 配置获取(get_config)
def get_config():
try:
config_info = na_config.find_one({"type": "vulscan"})
pass_row = config_info['config']['Password_dic']
thread_row = config_info['config']['Thread']
timeout_row = config_info['config']['Timeout']
white_row = config_info['config']['White_list']
password_dic = pass_row['value'].split('\n')
thread_count = int(thread_row['value'])
timeout = int(timeout_row['value'])
white_list = white_row['value'].split('\n')
return password_dic, thread_count, timeout, white_list
except Exception, e:
print e
2.3 监控线程(monitor)
def monitor():
global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST
while True:
# 计算负载
queue_count = na_task.find({"status": 0, "plan": 0}).count()
if queue_count:
load = 1
else:
ac_count = thread._count()
load = float(ac_count - 4) / THREAD_COUNT
if load > 1:
load = 1
if load < 0:
load = 0
# 更新心跳
na_heart.update(
{"name": "load"},
{"$set": {"value": load, "up_time": datetime.datetime.now()}}
)
# 更新配置
PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
# 调整检测间隔
if load > 0:
time.sleep(8)
else:
time.sleep(60)
2.4 任务获取(queue_get)
def queue_get():
global TASK_DATE_DIC
# 获取未启动的任务
task_req = na_task.find_and_modify(
query={"status": 0, "plan": 0},
update={"$set": {"status": 1}},
sort={'time': 1}
)
if task_req:
TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
else:
# 检查计划任务
task_req_row = na_task.find({"plan": {"$ne": 0}})
if task_req_row:
for task_req in task_req_row:
# 检查是否到达执行时间
if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']):
if task_req['isupdate'] == 1:
# 更新目标
task_req['target'] = update_target(json.loads(task_req['query']))
na_task.update(