
随着网络技术的飞速发展,网络安全威胁日益复杂和多样化。网络流量分析作为网络安全的重要组成部分,不仅能够帮助我们理解网络行为模式,还能够及时发现和响应异常活动。在当今数字化时代,组织面临着从简单的网络扫描到复杂的APT攻击等各种安全挑战,有效的网络流量分析和异常检测能力已成为保障网络安全的关键。
本指南将深入探讨网络流量分析的核心概念、关键技术和最佳实践,从基本的协议解析到高级的异常检测算法,为网络安全专业人员提供全面的技术参考。通过理论讲解和实战案例相结合的方式,帮助读者构建完整的网络流量分析和异常检测体系。
网络流量分析(Network Traffic Analysis, NTA)是指对网络中传输的数据流量进行收集、记录、分析和解释的过程。通过网络流量分析,我们可以:
网络流量分析在以下场景中尤为重要:
网络流量由以下几个关键要素组成:
数据包是网络通信的基本单位,包含以下部分:
网络流量的主要特征包括:
根据OSI七层模型,网络流量可以分为以下层次:
网络流量分析主要有以下几种方法:
直接捕获和分析网络数据包,检查数据包的内容、头部信息等。这种方法提供了最详细的信息,但数据量较大,处理成本高。
将网络流量聚合为流(Flow)进行分析。流通常定义为具有相同源IP、目标IP、源端口、目标端口和协议类型的数据包集合。这种方法减少了数据量,提高了分析效率。
分析网络流量的统计特征,如流量速率、数据包大小分布、连接数等。这种方法适用于检测异常流量模式。
使用机器学习算法分析网络流量,识别复杂的异常模式。这种方法能够检测未知的攻击类型,但需要大量的训练数据和计算资源。
网络流量分析面临以下主要挑战:
数据包捕获是网络流量分析的第一步,其基本原理是:
tcpdump是一个强大的命令行数据包捕获工具,适用于各种操作系统。
基本用法:
# 在指定接口上捕获数据包
tcpdump -i eth0
# 捕获指定数量的数据包
tcpdump -i eth0 -c 100
# 保存捕获的数据包
tcpdump -i eth0 -w capture.pcap
# 读取保存的数据包
tcpdump -r capture.pcap
# 使用过滤表达式
tcpdump -i eth0 host 192.168.1.1 and port 80高级过滤示例:
# 捕获TCP SYN数据包
tcpdump -i eth0 'tcp[tcpflags] & (tcp-syn) != 0'
# 捕获HTTP GET请求
tcpdump -i eth0 -A 'tcp port 80 and (((ip[2:2] - ((ip[0]&0xf)<<2)) - ((tcp[12]&0xf0)>>2)) != 0)' | grep -i 'GET'
# 捕获DNS查询
tcpdump -i eth0 -n port 53Wireshark是一个功能强大的图形化网络协议分析工具,提供了丰富的数据包捕获和分析功能。
主要特点:
常用过滤表达式:
# IP地址过滤
ip.addr == 192.168.1.1
# 端口过滤
tcp.port == 80 or udp.port == 53
# 协议过滤
http or dns or ssh
# TCP标志过滤
tcp.flags.syn == 1 and tcp.flags.ack == 0
# 内容过滤
http.request.uri contains "login"
# 时间过滤
frame.time > "2023-01-01 00:00:00" and frame.time < "2023-01-01 23:59:59"tshark是Wireshark的命令行版本,适用于自动化脚本和远程服务器。
基本用法:
# 捕获数据包
tshark -i eth0
# 保存捕获的数据包
tshark -i eth0 -w capture.pcap
# 读取保存的数据包
tshark -r capture.pcap
# 使用过滤表达式
tshark -i eth0 -f "host 192.168.1.1 and port 80"
# 提取特定字段
tshark -r capture.pcap -T fields -e ip.src -e ip.dst -e tcp.srcport -e tcp.dstportNetworkMiner是一个网络取证分析工具,可以从网络流量中提取文件、图片、凭据等信息。
主要功能:
以太网是最常用的局域网技术,其帧格式如下:
+------+------+------+-------------------------------+------+
| 目的MAC | 源MAC | 类型/Length | 数据 | FCS |
+------+------+------+-------------------------------+------+
| 6字节 | 6字节 | 2字节 | 46-1500字节 | 4字节|
+------+------+------+-------------------------------+------+主要字段说明:
以太网协议解析示例(Python代码):
def parse_ethernet_frame(frame):
"""解析以太网帧"""
# 提取MAC地址
dst_mac = ':'.join(f'{b:02x}' for b in frame[0:6])
src_mac = ':'.join(f'{b:02x}' for b in frame[6:12])
# 提取协议类型
eth_type = int.from_bytes(frame[12:14], byteorder='big')
# 提取数据部分
payload = frame[14:]
# 协议类型映射
protocol_map = {
0x0800: "IPv4",
0x0806: "ARP",
0x86DD: "IPv6"
}
protocol = protocol_map.get(eth_type, f"Unknown(0x{eth_type:04x})")
return {
"dst_mac": dst_mac,
"src_mac": src_mac,
"protocol": protocol,
"payload": payload
}IP协议是网络层的核心协议,分为IPv4和IPv6两个版本。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+---------------+---------------+-------------------------------+
| 版本 | IHL | 服务类型 | 总长度 |
+---------------+---------------+-------------------------------+
| 标识 | 标志 | 片偏移 |
+---------------+---------------+-------------------------------+
| 生存时间 | 协议 | 头部校验和 |
+---------------+---------------+-------------------------------+
| 源IP地址 |
+-----------------------------------------------+
| 目标IP地址 |
+-----------------------------------------------+
| 选项(可选) |
+-----------------------------------------------+主要字段说明:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+---------------+---------------+-------------------------------+
| 版本 | 流量类别 | 流标签 |
+---------------+---------------+-------------------------------+
| 有效载荷长度 | 下一头部 | 跳限制 |
+---------------+---------------+-------------------------------+
| |
| 源IPv6地址 |
| |
+---------------------------------------------------------------+
| |
| 目标IPv6地址 |
| |
+---------------------------------------------------------------+主要字段说明:
def parse_ipv4_packet(packet):
"""解析IPv4数据包"""
# 提取基本头部信息
version = (packet[0] >> 4) & 0x0F
ihl = packet[0] & 0x0F
ttl = packet[8]
protocol = packet[9]
src_ip = '.'.join(map(str, packet[12:16]))
dst_ip = '.'.join(map(str, packet[16:20]))
# 计算头部长度(字节)
header_length = ihl * 4
# 提取数据部分
payload = packet[header_length:]
# 协议类型映射
protocol_map = {
1: "ICMP",
6: "TCP",
17: "UDP"
}
protocol_name = protocol_map.get(protocol, f"Unknown({protocol})")
return {
"version": version,
"header_length": header_length,
"ttl": ttl,
"protocol": protocol_name,
"src_ip": src_ip,
"dst_ip": dst_ip,
"payload": payload
}
# 简化的IPv6解析函数
def parse_ipv6_packet(packet):
"""解析IPv6数据包"""
# 提取基本信息
version = (packet[0] >> 4) & 0x0F
traffic_class = ((packet[0] & 0x0F) << 4) | ((packet[1] >> 4) & 0x0F)
flow_label = ((packet[1] & 0x0F) << 16) | (packet[2] << 8) | packet[3]
payload_length = int.from_bytes(packet[4:6], byteorder='big')
next_header = packet[6]
hop_limit = packet[7]
# 提取源IP和目标IP
src_ip_parts = []
for i in range(0, 16, 2):
src_ip_parts.append(f"{int.from_bytes(packet[8+i:8+i+2], byteorder='big'):04x}")
src_ip = ':'.join(src_ip_parts)
dst_ip_parts = []
for i in range(0, 16, 2):
dst_ip_parts.append(f"{int.from_bytes(packet[24+i:24+i+2], byteorder='big'):04x}")
dst_ip = ':'.join(dst_ip_parts)
# 提取数据部分
payload = packet[40:]
return {
"version": version,
"traffic_class": traffic_class,
"flow_label": flow_label,
"payload_length": payload_length,
"next_header": next_header,
"hop_limit": hop_limit,
"src_ip": src_ip,
"dst_ip": dst_ip,
"payload": payload
}TCP(传输控制协议)是一种面向连接的、可靠的传输层协议。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+---------------+---------------+-------------------------------+
| 源端口 | 目标端口 |
+---------------+---------------+-------------------------------+
| 序列号 |
+---------------------------------------------------------------+
| 确认序列号 |
+---------------+---------------+-------------------------------+
| 数据偏移 | 保留 | 标志 | 窗口大小 |
+---------------+---------------+-------------------------------+
| 校验和 | 紧急指针 |
+---------------+---------------+-------------------------------+
| 选项(可选) |
+---------------------------------------------------------------+主要字段说明:
TCP连接建立(三次握手):
TCP连接断开(四次挥手):
def parse_tcp_segment(segment):
"""解析TCP段"""
# 提取基本信息
src_port = int.from_bytes(segment[0:2], byteorder='big')
dst_port = int.from_bytes(segment[2:4], byteorder='big')
seq_num = int.from_bytes(segment[4:8], byteorder='big')
ack_num = int.from_bytes(segment[8:12], byteorder='big')
data_offset = (segment[12] >> 4) & 0x0F
flags = segment[13]
window_size = int.from_bytes(segment[14:16], byteorder='big')
checksum = int.from_bytes(segment[16:18], byteorder='big')
# 解析标志位
urg = (flags & 0x20) != 0
ack = (flags & 0x10) != 0
psh = (flags & 0x08) != 0
rst = (flags & 0x04) != 0
syn = (flags & 0x02) != 0
fin = (flags & 0x01) != 0
# 计算头部长度(字节)
header_length = data_offset * 4
# 提取数据部分
payload = segment[header_length:]
# 构建标志字符串
flag_str = ""
if urg: flag_str += "URG "
if ack: flag_str += "ACK "
if psh: flag_str += "PSH "
if rst: flag_str += "RST "
if syn: flag_str += "SYN "
if fin: flag_str += "FIN "
return {
"src_port": src_port,
"dst_port": dst_port,
"seq_num": seq_num,
"ack_num": ack_num,
"header_length": header_length,
"flags": flag_str.strip(),
"window_size": window_size,
"checksum": checksum,
"payload_length": len(payload),
"payload": payload
}UDP(用户数据报协议)是一种无连接的传输层协议,提供不可靠的服务。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+---------------+---------------+-------------------------------+
| 源端口 | 目标端口 |
+---------------+---------------+-------------------------------+
| 长度 | 校验和 |
+---------------+---------------+-------------------------------+主要字段说明:
def parse_udp_datagram(datagram):
"""解析UDP数据报"""
# 提取基本信息
src_port = int.from_bytes(datagram[0:2], byteorder='big')
dst_port = int.from_bytes(datagram[2:4], byteorder='big')
length = int.from_bytes(datagram[4:6], byteorder='big')
checksum = int.from_bytes(datagram[6:8], byteorder='big')
# 提取数据部分
payload = datagram[8:]
return {
"src_port": src_port,
"dst_port": dst_port,
"length": length,
"checksum": checksum,
"payload_length": len(payload),
"payload": payload
}HTTP(超文本传输协议)是最常用的Web应用协议,基于TCP传输。
HTTP请求格式:
GET /index.html HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0
Accept: text/htmlHTTP响应格式:
HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: 1234
Server: Apache
<html><body>...</body></html>HTTP协议解析示例(Python代码):
def parse_http_request(data):
"""解析HTTP请求"""
try:
# 解码数据
data_str = data.decode('utf-8', errors='ignore')
# 分割请求行和头部
lines = data_str.split('\r\n')
if not lines:
return None
# 解析请求行
request_line = lines[0].split()
if len(request_line) < 3:
return None
method = request_line[0]
path = request_line[1]
version = request_line[2]
# 解析头部
headers = {}
body_start = data_str.find('\r\n\r\n') + 4
for i in range(1, len(lines)):
line = lines[i]
if not line: # 空行表示头部结束
break
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
# 提取请求体
body = data[body_start:]
return {
"method": method,
"path": path,
"version": version,
"headers": headers,
"body": body,
"body_length": len(body)
}
except Exception:
return None
def parse_http_response(data):
"""解析HTTP响应"""
try:
# 解码数据
data_str = data.decode('utf-8', errors='ignore')
# 分割状态行和头部
lines = data_str.split('\r\n')
if not lines:
return None
# 解析状态行
status_line = lines[0].split()
if len(status_line) < 3:
return None
version = status_line[0]
status_code = int(status_line[1])
status_message = ' '.join(status_line[2:])
# 解析头部
headers = {}
body_start = data_str.find('\r\n\r\n') + 4
for i in range(1, len(lines)):
line = lines[i]
if not line: # 空行表示头部结束
break
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
# 提取响应体
body = data[body_start:]
return {
"version": version,
"status_code": status_code,
"status_message": status_message,
"headers": headers,
"body": body,
"body_length": len(body)
}
except Exception:
return NoneDNS(域名系统)是一种将域名解析为IP地址的服务,基于UDP协议。
DNS消息格式:
+---------------------+
| 头部 |
+---------------------+
| 问题 |
+---------------------+
| 回答 |
+---------------------+
| 授权记录 |
+---------------------+
| 额外记录 |
+---------------------+DNS协议解析示例(Python代码):
def parse_dns_message(data):
"""简化的DNS消息解析"""
try:
# 解析头部
id = int.from_bytes(data[0:2], byteorder='big')
flags = int.from_bytes(data[2:4], byteorder='big')
qdcount = int.from_bytes(data[4:6], byteorder='big')
ancount = int.from_bytes(data[6:8], byteorder='big')
nscount = int.from_bytes(data[8:10], byteorder='big')
arcount = int.from_bytes(data[10:12], byteorder='big')
# 解析标志
qr = (flags >> 15) & 0x01
opcode = (flags >> 11) & 0x0F
aa = (flags >> 10) & 0x01
tc = (flags >> 9) & 0x01
rd = (flags >> 8) & 0x01
ra = (flags >> 7) & 0x01
z = (flags >> 4) & 0x07
rcode = flags & 0x0F
# 解析问题部分
position = 12
questions = []
for _ in range(qdcount):
# 解析域名
domain = []
while True:
length = data[position]
if length == 0:
position += 1
break
domain_part = data[position+1:position+1+length].decode('utf-8', errors='ignore')
domain.append(domain_part)
position += length + 1
domain_str = '.'.join(domain)
# 解析查询类型和类
qtype = int.from_bytes(data[position:position+2], byteorder='big')
qclass = int.from_bytes(data[position+2:position+4], byteorder='big')
position += 4
questions.append({
"domain": domain_str,
"qtype": qtype,
"qclass": qclass
})
# 简化处理,不解析回答、授权和额外记录
return {
"id": id,
"is_response": bool(qr),
"opcode": opcode,
"authoritative_answer": bool(aa),
"truncated": bool(tc),
"recursion_desired": bool(rd),
"recursion_available": bool(ra),
"response_code": rcode,
"question_count": qdcount,
"answer_count": ancount,
"authority_count": nscount,
"additional_count": arcount,
"questions": questions
}
except Exception:
return None网络流量统计是分析网络行为的重要方法,主要统计指标包括:
网络流量可视化有助于直观理解网络行为和发现异常模式,常用的可视化方法包括:
显示流量随时间的变化趋势,包括:
显示各种属性的分布情况,包括:
显示网络实体之间的关系,包括:
集成多种指标的综合视图,包括:
下面是一个基于Python的简单网络流量分析工具示例,使用Scapy库进行数据包捕获和分析:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import time
import datetime
import json
from collections import defaultdict, Counter
# 尝试导入Scapy库
try:
from scapy.all import sniff, IP, TCP, UDP, ICMP, ARP, Ether, conf
# 禁用Scapy的详细输出
conf.verb = 0
except ImportError:
print("需要安装Scapy库: pip install scapy")
exit(1)
# 网络流量分析器类
class NetworkTrafficAnalyzer:
def __init__(self):
# 初始化统计数据
self.stats = {
"total_packets": 0,
"total_bytes": 0,
"start_time": datetime.datetime.now(),
"end_time": None,
"protocols": defaultdict(int),
"src_ips": defaultdict(int),
"dst_ips": defaultdict(int),
"src_ports": defaultdict(int),
"dst_ports": defaultdict(int),
"conversations": defaultdict(int),
"packet_sizes": [],
"tcp_flags": defaultdict(int)
}
# 初始化TCP状态跟踪
self.tcp_connections = {}
# 流量速率计算
self.last_packet_count = 0
self.last_byte_count = 0
self.last_time = time.time()
# 捕获控制
self.running = False
self.packet_callback = None
def packet_handler(self, packet):
"""
数据包处理回调函数
"""
with self.lock:
# 更新基本统计信息
self.stats["total_packets"] += 1
packet_size = len(packet)
self.stats["total_bytes"] += packet_size
self.stats["end_time"] = datetime.datetime.now()
self.stats["packet_sizes"].append(packet_size)
# 提取以太网信息
if Ether in packet:
self.stats["protocols"]["Ethernet"] += 1
# 提取IP信息
if IP in packet:
ip_layer = packet[IP]
self.stats["protocols"]["IPv4"] += 1
self.stats["src_ips"][ip_layer.src] += 1
self.stats["dst_ips"][ip_layer.dst] += 1
# 提取传输层协议信息
if TCP in packet:
tcp_layer = packet[TCP]
self.stats["protocols"]["TCP"] += 1
self.stats["src_ports"][tcp_layer.sport] += 1
self.stats["dst_ports"][tcp_layer.dport] += 1
# 记录TCP标志
flags = tcp_layer.flags
flag_str = ""
if flags & 0x01: flag_str += "F"
if flags & 0x02: flag_str += "S"
if flags & 0x04: flag_str += "R"
if flags & 0x08: flag_str += "P"
if flags & 0x10: flag_str += "A"
if flags & 0x20: flag_str += "U"
if flags & 0x40: flag_str += "E"
if flags & 0x80: flag_str += "C"
if flag_str:
self.stats["tcp_flags"][flag_str] += 1
# 记录TCP连接状态
self._update_tcp_connection(ip_layer.src, tcp_layer.sport, ip_layer.dst, tcp_layer.dport, flags)
# 记录会话
conversation_key = f"{ip_layer.src}:{tcp_layer.sport} -> {ip_layer.dst}:{tcp_layer.dport}"
self.stats["conversations"][conversation_key] += 1
elif UDP in packet:
udp_layer = packet[UDP]
self.stats["protocols"]["UDP"] += 1
self.stats["src_ports"][udp_layer.sport] += 1
self.stats["dst_ports"][udp_layer.dport] += 1
# 记录会话
conversation_key = f"{ip_layer.src}:{udp_layer.sport} -> {ip_layer.dst}:{udp_layer.dport}"
self.stats["conversations"][conversation_key] += 1
elif ICMP in packet:
self.stats["protocols"]["ICMP"] += 1
# 记录会话
conversation_key = f"{ip_layer.src} -> {ip_layer.dst} (ICMP)"
self.stats["conversations"][conversation_key] += 1
elif ARP in packet:
self.stats["protocols"]["ARP"] += 1
# 调用外部回调函数
if self.packet_callback:
try:
self.packet_callback(packet)
except Exception:
pass
def _update_tcp_connection(self, src_ip, src_port, dst_ip, dst_port, flags):
"""
更新TCP连接状态
"""
# 创建连接标识
conn_key = f"{src_ip}:{src_port}-{dst_ip}:{dst_port}"
reverse_key = f"{dst_ip}:{dst_port}-{src_ip}:{src_port}"
# 检查是否是新连接(SYN标志)
if flags & 0x02 and not (flags & 0x10): # SYN但没有ACK
self.tcp_connections[conn_key] = {
"state": "SYN_SENT",
"src_ip": src_ip,
"src_port": src_port,
"dst_ip": dst_ip,
"dst_port": dst_port,
"start_time": datetime.datetime.now(),
"last_seen": datetime.datetime.now()
}
# 检查是否是SYN-ACK响应
elif flags & 0x12: # SYN+ACK
# 查找对应的连接
if reverse_key in self.tcp_connections:
conn = self.tcp_connections[reverse_key]
conn["state"] = "SYN_RECEIVED"
conn["last_seen"] = datetime.datetime.now()
# 检查是否是ACK确认
elif flags & 0x10: # ACK
# 查找连接并更新状态
if conn_key in self.tcp_connections:
conn = self.tcp_connections[conn_key]
if conn["state"] == "SYN_RECEIVED":
conn["state"] = "ESTABLISHED"
conn["last_seen"] = datetime.datetime.now()
# 检查是否是FIN挥手
elif flags & 0x01: # FIN
# 查找连接并更新状态
if conn_key in self.tcp_connections:
conn = self.tcp_connections[conn_key]
conn["state"] = "FIN_WAIT"
conn["last_seen"] = datetime.datetime.now()
# 清理过期连接
self._cleanup_old_connections()
def _cleanup_old_connections(self):
"""
清理过期的TCP连接记录
"""
current_time = datetime.datetime.now()
timeout = datetime.timedelta(minutes=5)
# 查找过期连接
expired_keys = []
for key, conn in self.tcp_connections.items():
if current_time - conn["last_seen"] > timeout:
expired_keys.append(key)
# 删除过期连接
for key in expired_keys:
del self.tcp_connections[key]
def start_capture(self, interface=None, filter=None, count=None, timeout=None, callback=None):
"""
开始捕获数据包
参数:
interface: 网络接口
filter: BPF过滤表达式
count: 捕获的数据包数量
timeout: 捕获超时时间(秒)
callback: 数据包回调函数
"""
self.lock = threading.Lock()
self.running = True
self.packet_callback = callback
print(f"开始捕获网络流量...")
if interface:
print(f"接口: {interface}")
if filter:
print(f"过滤器: {filter}")
if count:
print(f"数据包数量: {count}")
if timeout:
print(f"超时时间: {timeout}秒")
try:
# 开始捕获
sniff(
iface=interface,
filter=filter,
prn=self.packet_handler,
count=count,
timeout=timeout,
store=0 # 不保存数据包到内存
)
# 捕获完成
self.running = False
self.print_summary()
return True
except Exception as e:
print(f"捕获过程中发生错误: {e}")
self.running = False
return False
def stop_capture(self):
"""
停止捕获
"""
self.running = False
def calculate_rates(self):
"""
计算流量速率
"""
current_time = time.time()
time_diff = current_time - self.last_time
if time_diff > 0:
packet_rate = (self.stats["total_packets"] - self.last_packet_count) / time_diff
byte_rate = (self.stats["total_bytes"] - self.last_byte_count) / time_diff
# 更新上次统计值
self.last_packet_count = self.stats["total_packets"]
self.last_byte_count = self.stats["total_bytes"]
self.last_time = current_time
return {
"packet_rate": packet_rate,
"byte_rate": byte_rate
}
return {"packet_rate": 0, "byte_rate": 0}
def get_top_talkers(self, count=10):
"""
获取流量最多的IP地址
"""
# 合并源IP和目标IP的统计
ip_total = defaultdict(int)
for ip, count_ip in self.stats["src_ips"].items():
ip_total[ip] += count_ip
for ip, count_ip in self.stats["dst_ips"].items():
ip_total[ip] += count_ip
# 排序并返回前N个
sorted_ips = sorted(ip_total.items(), key=lambda x: x[1], reverse=True)[:count]
return sorted_ips
def get_top_ports(self, count=10):
"""
获取使用最多的端口
"""
# 合并源端口和目标端口的统计
port_total = defaultdict(int)
for port, count_port in self.stats["src_ports"].items():
port_total[port] += count_port
for port, count_port in self.stats["dst_ports"].items():
port_total[port] += count_port
# 排序并返回前N个
sorted_ports = sorted(port_total.items(), key=lambda x: x[1], reverse=True)[:count]
return sorted_ports
def get_top_conversations(self, count=10):
"""
获取流量最多的会话
"""
# 排序并返回前N个
sorted_conversations = sorted(
self.stats["conversations"].items(),
key=lambda x: x[1],
reverse=True
)[:count]
return sorted_conversations
def get_packet_size_stats(self):
"""
获取数据包大小统计信息
"""
if not self.stats["packet_sizes"]:
return {
"min": 0,
"max": 0,
"avg": 0
}
sizes = self.stats["packet_sizes"]
return {
"min": min(sizes),
"max": max(sizes),
"avg": sum(sizes) / len(sizes)
}
def get_tcp_connection_stats(self):
"""
获取TCP连接统计信息
"""
# 计算各状态的连接数
state_counts = Counter(conn["state"] for conn in self.tcp_connections.values())
return {
"total_connections": len(self.tcp_connections),
"state_distribution": dict(state_counts)
}
def print_summary(self):
"""
打印流量分析摘要
"""
print("\n" + "=" * 80)
print("网络流量分析摘要")
print("=" * 80)
# 基本信息
duration = self.stats["end_time"] - self.stats["start_time"] if self.stats["end_time"] else datetime.timedelta(0)
print(f"总数据包数: {self.stats['total_packets']}")
print(f"总流量: {self.stats['total_bytes'] / (1024 * 1024):.2f} MB")
print(f"捕获时间: {self.stats['start_time'].isoformat()} - {self.stats['end_time'].isoformat() if self.stats['end_time'] else '进行中'}")
print(f"持续时间: {duration}")
# 速率信息
rates = self.calculate_rates()
print(f"平均数据包速率: {rates['packet_rate']:.2f} 包/秒")
print(f"平均流量速率: {rates['byte_rate'] / 1024:.2f} KB/秒")
# 数据包大小统计
size_stats = self.get_packet_size_stats()
print(f"数据包大小: 最小={size_stats['min']} 字节, 最大={size_stats['max']} 字节, 平均={size_stats['avg']:.2f} 字节")
# 协议分布
print("\n协议分布:")
total_protocol_packets = sum(self.stats["protocols"].values())
if total_protocol_packets > 0:
sorted_protocols = sorted(self.stats["protocols"].items(), key=lambda x: x[1], reverse=True)
for protocol, count in sorted_protocols:
percentage = count / total_protocol_packets * 100 if total_protocol_packets > 0 else 0
print(f" {protocol}: {count} ({percentage:.2f}%)")
# Top Talkers
print("\n流量最多的IP地址:")
top_talkers = self.get_top_talkers(5)
for ip, count in top_talkers:
print(f" {ip}: {count} 个数据包")
# Top Ports
print("\n使用最多的端口:")
top_ports = self.get_top_ports(5)
for port, count in top_ports:
print(f" {port}: {count} 次")
# Top Conversations
print("\n流量最多的会话:")
top_conversations = self.get_top_conversations(5)
for conversation, count in top_conversations:
print(f" {conversation}: {count} 个数据包")
# TCP连接状态
tcp_stats = self.get_tcp_connection_stats()
print(f"\nTCP连接数: {tcp_stats['total_connections']}")
if tcp_stats["state_distribution"]:
print("TCP连接状态分布:")
for state, count in tcp_stats["state_distribution"].items():
print(f" {state}: {count}")
# TCP标志分布
if self.stats["tcp_flags"]:
print("\nTCP标志分布:")
sorted_flags = sorted(self.stats["tcp_flags"].items(), key=lambda x: x[1], reverse=True)
for flag, count in sorted_flags[:10]: # 只显示前10个
print(f" {flag}: {count}")
print("=" * 80)
def export_stats(self, filename):
"""
导出统计信息到JSON文件
"""
# 准备导出数据
export_data = {
"summary": {
"total_packets": self.stats["total_packets"],
"total_bytes": self.stats["total_bytes"],
"start_time": self.stats["start_time"].isoformat() if self.stats["start_time"] else "N/A",
"end_time": self.stats["end_time"].isoformat() if self.stats["end_time"] else "N/A",
"duration": str(self.stats["end_time"] - self.stats["start_time"]) if self.stats["end_time"] and self.stats["start_time"] else "N/A"
},
"rates": self.calculate_rates(),
"packet_size_stats": self.get_packet_size_stats(),
"protocols": dict(self.stats["protocols"]),
"top_talkers": self.get_top_talkers(20),
"top_ports": self.get_top_ports(20),
"top_conversations": self.get_top_conversations(20),
"tcp_connections": self.get_tcp_connection_stats(),
"tcp_flags": dict(self.stats["tcp_flags"])
}
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
print(f"统计信息已导出到: {filename}")
return True
except Exception as e:
print(f"导出统计信息时发生错误: {e}")
return False
# 主函数
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description="网络流量分析工具")
parser.add_argument("--interface", "-i", help="网络接口")
parser.add_argument("--filter", "-f", help="BPF过滤表达式")
parser.add_argument("--count", "-c", type=int, help="捕获的数据包数量")
parser.add_argument("--timeout", "-t", type=int, help="捕获超时时间(秒)")
parser.add_argument("--export", "-e", help="导出统计信息到JSON文件")
args = parser.parse_args()
# 创建分析器
analyzer = NetworkTrafficAnalyzer()
# 检查是否需要导入threading
import threading
try:
# 开始捕获
success = analyzer.start_capture(
interface=args.interface,
filter=args.filter,
count=args.count,
timeout=args.timeout
)
# 导出统计信息
if success and args.export:
analyzer.export_stats(args.export)
except KeyboardInterrupt:
print("\n捕获已中断")
analyzer.stop_capture()
# 导出统计信息
if args.export:
analyzer.export_stats(args.export)
except Exception as e:
print(f"发生错误: {e}")
if __name__ == "__main__":
main()使用方法:
# 捕获所有流量
sudo python traffic_analyzer.py --interface eth0
# 使用过滤器捕获特定流量
sudo python traffic_analyzer.py --interface eth0 --filter "tcp port 80"
# 限制捕获的数据包数量
sudo python traffic_analyzer.py --interface eth0 --count 1000
# 设置捕获超时时间
sudo python traffic_analyzer.py --interface eth0 --timeout 60
# 导出统计信息
sudo python traffic_analyzer.py --interface eth0 --timeout 30 --export stats.json