Transformer and CNN in Practice for Malicious URL Path Detection
1. Introduction and Background
Malicious URL path detection is a key task in network security defense. Traditional approaches that rely on manual analysis are slow and costly. This article explores how one-dimensional convolutional neural networks (1D CNNs) and the Transformer architecture can automatically identify malicious URL paths, providing an intelligent building block for security defenses.
2. URL Data Processing
2.1 Character-level Embedding
Each character in the URL path is treated as the basic processing unit and converted into a sequence of numeric vectors the model can consume.
2.2 Processing Pipeline
- Tokenization: split the URL into individual characters and map each to an integer
- Example: /shop?item=book → [3, 12, 5, 9, 17, ...]
- Padding: make every input the same length; short sequences are zero-padded and long ones truncated
- Embedding: use pretrained character embeddings (e.g., fastText) or one-hot encoding to obtain the final vector sequence, e.g., [[0.2, 0.5], [0.1, 0.3], ...]
2.3 Preprocessing Functions
def build_vocab(urls):
    # Build a character-to-index map; index 0 is reserved for padding/unknown
    chars = set("".join(urls))
    char2idx = {c: i + 1 for i, c in enumerate(chars)}
    return char2idx

def encode_url(url, char2idx, max_len=100):
    # Map each character to its index; unknown characters fall back to 0
    seq = [char2idx.get(c, 0) for c in url[:max_len]]
    # Zero-pad short sequences up to max_len
    if len(seq) < max_len:
        seq += [0] * (max_len - len(seq))
    return seq
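A quick usage sketch (the tiny URL list below is hypothetical, so the exact indices will differ on real data):

urls = ["/shop?item=book", "/admin' OR 1=1 --"]
char2idx = build_vocab(urls)
encoded = encode_url("/shop?item=book", char2idx)
print(len(encoded))  # 100: zero-padded to max_len
print(encoded[:5])   # indices of '/', 's', 'h', 'o', 'p'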
3. 1D CNN Model Architecture and Principles
3.1 Core Mechanisms
- Sliding convolution kernels: slide over the character sequence to capture local patterns
- Feature learning: recognizes malicious patterns such as SQL injection, path traversal, and command injection signatures
3.2 Network Structure
- Embedding layer: maps the sequence from [B, L] to [B, L, E]
- Convolution layer: 1D convolutions capture local attack features
- Pooling layer: max pooling compresses the features, focusing on the most salient signals
- Fully connected layer: produces the binary classification
3.3 Implementation
import torch
import torch.nn as nn

class CNNModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=128, num_filters=100, kernel_size=3):
        super(CNNModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.conv = nn.Conv1d(embed_dim, num_filters, kernel_size)
        self.pool = nn.MaxPool1d(2)
        # Assumes sequence length 100: conv (kernel 3, no padding) yields 98, pooling halves it to 49
        self.fc = nn.Linear(num_filters * 49, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)      # [B, L, E]
        x = x.permute(0, 2, 1)     # [B, E, L] (Conv1d expects channels first)
        x = self.conv(x)           # [B, C, L']
        x = torch.relu(x)
        x = self.pool(x)           # [B, C, L'//2]
        x = x.view(x.size(0), -1)  # flatten
        x = self.fc(x)             # [B, 1]
        return self.sigmoid(x)
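A minimal smoke test with a random batch, reusing the char2idx built in section 2.3 (note the +1 on vocab_size: build_vocab assigns indices starting at 1 and reserves 0 for padding):

model = CNNModel(vocab_size=len(char2idx) + 1)
dummy = torch.randint(0, len(char2idx) + 1, (32, 100))  # [B, L]
print(model(dummy).shape)  # torch.Size([32, 1]), probabilities in (0, 1)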
4. Transformer Model Architecture and Principles
4.1 Core Components
- Embedding layer: maps tokens to vectors
- Positional encoding: injects sequence-position information
- Multi-head self-attention: captures dependencies between sequence elements
- Feed-forward network: applies non-linear transformation and abstraction
4.2 Positional Encoding Implementation
import math
import torch

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.max_len = max_len
        # Precompute sinusoidal encodings for positions up to max_len
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: [L, B, E], sequence first
        seq_len = x.size(0)
        if seq_len <= self.pe.size(0):
            pe = self.pe[:seq_len, :].to(x.device)
        else:
            # Fall back to computing the encoding on the fly for longer sequences
            position = torch.arange(0, seq_len, dtype=torch.float, device=x.device).unsqueeze(1)
            div_term = torch.exp(torch.arange(0, self.d_model, 2, device=x.device).float() *
                                 -(math.log(10000.0) / self.d_model))
            pe = torch.zeros(seq_len, self.d_model, device=x.device)
            pe[:, 0::2] = torch.sin(position * div_term)
            pe[:, 1::2] = torch.cos(position * div_term)
        return x + pe.unsqueeze(1)  # broadcast over the batch dimension
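A quick shape check in the sequence-first layout that the Transformer model below expects:

pe = PositionalEncoding(d_model=128)
x = torch.zeros(100, 32, 128)  # [L, B, E]
print(pe(x).shape)  # torch.Size([100, 32, 128])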
4.3 Full Transformer Implementation
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=128, nhead=8, num_layers=3, max_len=100):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward=512)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        # Flattening below assumes every input is padded/truncated to exactly max_len
        self.fc = nn.Linear(d_model * max_len, 1)
        self.sigmoid = nn.Sigmoid()
        self.d_model = d_model

    def forward(self, x):
        x = self.embedding(x) * math.sqrt(self.d_model)  # [B, L] -> [B, L, E]
        x = x.permute(1, 0, 2)  # [L, B, E] (Transformer expects seq_len first)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)  # [L, B, E]
        x = x.permute(1, 0, 2).contiguous()  # [B, L, E]
        x = x.view(x.size(0), -1)  # flatten
        x = self.fc(x)
        return self.sigmoid(x)
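The same smoke test as for the CNN applies here (again with +1 for the padding index):

model = TransformerModel(vocab_size=len(char2idx) + 1)
dummy = torch.randint(0, len(char2idx) + 1, (32, 100))  # [B, L]
print(model(dummy).shape)  # torch.Size([32, 1])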
5. Dataset Preparation and Processing
5.1 Data Sources
- Attack traffic: collected from a WAF platform, saved as black_log.txt
- Normal traffic: collected from business URLs, saved as white_log.txt
- Validation data: a random mix of black and white traffic, saved as b_w_f.txt
5.2 Dataset Class
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class URLDataset(Dataset):
    def __init__(self, urls, labels, char2idx, max_len=100):
        self.urls = urls
        self.labels = labels
        self.char2idx = char2idx
        self.max_len = max_len

    def __len__(self):
        return len(self.urls)

    def __getitem__(self, idx):
        x = torch.tensor(
            encode_url(self.urls[idx], self.char2idx, self.max_len),
            dtype=torch.long
        )
        y = torch.tensor(self.labels[idx], dtype=torch.float)
        return x, y
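A minimal usage sketch with two hypothetical samples (label 1 = malicious, 0 = normal):

urls = ["/shop?item=book", "/admin' OR 1=1 --"]
ds = URLDataset(urls, [0, 1], build_vocab(urls))
x, y = ds[1]
print(x.shape, y)  # torch.Size([100]) tensor(1.)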
6. Model Training and Validation
6.1 Training Pipeline
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score

def main():
    # Load and label the data (1 = malicious, 0 = normal)
    with open('black_log.txt', 'r', encoding='utf-8') as f:
        black = [line.strip() for line in f if line.strip()]
    with open('white_log.txt', 'r', encoding='utf-8') as f:
        white = [line.strip() for line in f if line.strip()]
    urls = black + white
    labels = [1] * len(black) + [0] * len(white)
    char2idx = build_vocab(urls)
    X_train, X_val, y_train, y_val = train_test_split(urls, labels, test_size=0.5, random_state=42)
    train_dataset = URLDataset(X_train, y_train, char2idx)
    val_dataset = URLDataset(X_val, y_val, char2idx)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)
    # Initialize the model (+1 because index 0 is reserved for padding)
    model = TransformerModel(vocab_size=len(char2idx) + 1)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Training loop
    best_f1 = 0.0
    for epoch in range(10):
        model.train()
        total_loss = 0
        for x, y in train_loader:
            optimizer.zero_grad()
            y_pred = model(x)
            loss = criterion(y_pred.view(-1), y.view(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        # Validation
        model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for x_val, y_val_batch in val_loader:
                y_pred_prob = model(x_val)
                y_pred = (y_pred_prob > 0.5).int()
                all_preds.extend(y_pred.view(-1).cpu().numpy())
                all_labels.extend(y_val_batch.view(-1).cpu().numpy())
        acc = accuracy_score(all_labels, all_preds)
        f1 = f1_score(all_labels, all_preds)
        print(f"epoch {epoch + 1}: loss={total_loss / len(train_loader):.4f} acc={acc:.4f} f1={f1:.4f}")
        # Keep the best checkpoint by F1
        if f1 > best_f1:
            best_f1 = f1
            torch.save(model.state_dict(), "best_model.pth")
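The loop above runs on the CPU; moving it to a GPU is the usual PyTorch pattern. A sketch, assuming the objects from main() are in scope:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
for x, y in train_loader:
    x, y = x.to(device), y.to(device)
    loss = criterion(model(x).view(-1), y.view(-1))
    # ...backward/step as above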
7. Model Prediction and Deployment
7.1 Prediction Code
def predict_urls(model_path, test_file, char2idx):
    # Load the trained model (+1 for the padding index 0)
    model = TransformerModel(vocab_size=len(char2idx) + 1)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    # Read the test data
    with open(test_file, 'r', encoding='utf-8') as f:
        test_urls = [line.strip() for line in f if line.strip()]
    # Encode and predict
    test_encoded = torch.tensor([encode_url(u, char2idx) for u in test_urls], dtype=torch.long)
    with torch.no_grad():
        preds = model(test_encoded)
        preds_label = (preds > 0.5).int().view(-1).tolist()
    # Print the results
    for i, (u, p) in enumerate(zip(test_urls, preds_label), 1):
        result = "malicious" if p == 1 else "normal"
        print(f"{i:<5} | {u:<60} | {result:<6}")
8. Performance Optimization and Improvements
8.1 Enhanced Data Preprocessing
import urllib.parse
import html
import unicodedata
import re

def preprocess_url(u, lower=True):
    """Normalize a URL before encoding."""
    s = u
    # Percent-decode twice to unwrap double encoding
    for _ in range(2):
        s = urllib.parse.unquote(s)
    s = html.unescape(s)
    s = unicodedata.normalize("NFKC", s)
    s = re.sub(r"\s+", " ", s)
    if lower:
        s = s.lower()
    return s.strip()
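For example, a double-encoded injection payload collapses back to its plain form:

print(preprocess_url("/search?q=%2527%20OR%25201%3D1"))
# /search?q=' or 1=1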
8.2 Enhanced Rule-based Detection
import re

SQLI_PATTERNS = [
    r"(?i)\bor\b\s+1\s*=\s*1",
    r"(?i)union\s+select",
    r"(?i)select\s+.*\s+from",
    r"(?i)drop\s+table",
    r"(--|#\s|/\*)",
    r"(?i)or\s+'.+'\s*=\s*'.+'",
    r"%27|%22|%3D",
]
COMPILED_SQLI = [re.compile(p) for p in SQLI_PATTERNS]

def rule_based_sqli(url):
    """Rule-based SQL injection detection."""
    for p in COMPILED_SQLI:
        if p.search(url):
            return True
    return False
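A quick check against the kind of URLs used in the earlier examples:

print(rule_based_sqli("/login?user=admin' OR 1=1 --"))  # True
print(rule_based_sqli("/shop?item=book"))               # False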
8.3 Data Augmentation Techniques
import urllib.parse

def synth_sql_variants(url):
    """Generate encoded/cased variants of a SQL injection URL for augmentation."""
    variants = set()
    variants.add(url)
    variants.add(urllib.parse.quote(url, safe="/=&?"))
    variants.add(urllib.parse.quote(urllib.parse.quote(url, safe="/=&?"), safe="/=&?"))
    variants.add(url.replace(" ", "+"))
    variants.add(url.upper())
    variants.add(url.lower())
    variants.add(url + " --")
    variants.add(url.replace("'", "%27"))
    variants.add(url.replace("%27", "'"))
    return list(variants)
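Each malicious sample can then be expanded into several surface forms before training:

for v in synth_sql_variants("/login?user=admin' OR 1=1"):
    print(v)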
8.4 LLM-assisted Verification
import requests
import json

def llm_check(url, predicted_label, api_key):
    """Use an LLM API as a second opinion on the model's prediction."""
    result_str = "malicious" if predicted_label == 1 else "normal"
    prompt = ("You are a network security expert. Given a URL and a model's prediction, "
              "judge whether the prediction is reasonable. Reply with only 'correct' or 'incorrect'.\n\n"
              f"URL: {url}\nModel prediction: {result_str}")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    data = {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0
    }
    response = requests.post("https://api.openai.com/v1/chat/completions",
                             headers=headers, data=json.dumps(data))
    if response.status_code == 200:
        reply = response.json()["choices"][0]["message"]["content"].strip()
        return reply == "correct"
    return False
9. Practical Application Advice
- Hybrid detection strategy: a three-tier pipeline of rule-based detection + model prediction + LLM verification
- Continuous learning: build a feedback loop that folds misclassified samples back into the training set
- Performance monitoring: track detection quality per attack type and optimize where it lags
- Deployment considerations:
- Use ONNX or TensorRT to accelerate inference in production (see the export sketch after this list)
- Consider model distillation to shrink the deployed model
- Build a real-time traffic-processing pipeline
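As a starting point for the ONNX route, a minimal export sketch (the file name and opset version are arbitrary choices; dynamic batch size is declared via dynamic_axes):

dummy = torch.randint(0, len(char2idx) + 1, (1, 100))  # one padded sequence
torch.onnx.export(
    model, dummy, "url_model.onnx",
    input_names=["tokens"], output_names=["prob"],
    dynamic_axes={"tokens": {0: "batch"}, "prob": {0: "batch"}},
    opset_version=13,
)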
10. Summary
This article has walked through the full workflow of malicious URL path detection with 1D CNNs and Transformers, from data preprocessing and model construction to training, validation, and deployment optimization. Combining traditional rule-based detection with modern deep learning makes it possible to build an efficient and accurate malicious URL detection system and significantly strengthen network security defenses.
The key success factors are thorough data preprocessing, a suitable model architecture, effective data augmentation, and a multi-tier verification pipeline. In practice, model parameters and detection strategy should be tuned to the specific scenario to reach the best detection results.