我已经阅读了封包手册页和一些试图理解如何使用PACKET_FANOUT套接字选项来扩展接收到的数据的处理(我希望使用SOCK_RAW来捕获高速、>10 10Gbps的流量)。我已经阅读过这示例代码(下面复制),但我不确定我是否完全理解它。
让我们设想一个场景;RSS已经在NIC上设置,并且在RX队列之间均衡地分配了插入的流量,有一个8核心CPU和8个NIC RX队列,每个RX队列0-7分别向CPU 0-7发送一个中断(关于MMAP、零拷贝、轮询(轮询()等)的进一步讨论不在这里讨论)。
这是我在示例代码中看到的事件顺序:
setup_socket()
来创建一个套接字(再说一次,我们将说0-7)。read()
调用时可用的数据(所以套接字0只由线程0创建),然后将数据复制到仅用于该线程的用户接收缓冲区,因为有此标志。在我对这一过程的理解中,第4点是主要的疑问点。我是否正确地理解了如何在这个场景中使用PACKET_FANOUT进行缩放,以及如何将工作线程锁定到处理中断的同一个核心?
void start_af_packet_capture(std::string interface_name, int fanout_group_id) {
// setup_socket() calls socket() (using SOCK_RAW) to created the socketFD,
// setsockopt() to enable promisc mode on the NIC,
// bind() to bind the socketFD to NIC,
// and setsockopt() again to set PACKET_FANOUT + PACKET_FANOUT_CPU
int packet_socket = setup_socket(interface_name, fanout_group_id);
if (packet_socket == -1) {
printf("Can't create socket\n");
return;
}
unsigned int capture_length = 1500;
char buffer[capture_length];
while (true) {
received_packets++;
int readed_bytes = read(packet_socket, buffer, capture_length);
// printf("Got %d bytes from interface\n", readed_bytes);
consume_pkt((u_char*)buffer, readed_bytes);
if (readed_bytes < 0) {
break;
}
}
}
...
bool use_multiple_fanout_processes = true;
// Could get some speed up on NUMA servers
bool execute_strict_cpu_affinity = false;
int main() {
boost::thread speed_printer_thread( speed_printer );
int fanout_group_id = getpid() & 0xffff;
if (use_multiple_fanout_processes) {
boost::thread_group packet_receiver_thread_group;
unsigned int num_cpus = 8;
for (int cpu = 0; cpu < num_cpus; cpu++) {
boost::thread::attributes thread_attrs;
if (execute_strict_cpu_affinity) {
cpu_set_t current_cpu_set;
int cpu_to_bind = cpu % num_cpus;
CPU_ZERO(¤t_cpu_set);
// We count cpus from zero
CPU_SET(cpu_to_bind, ¤t_cpu_set);
int set_affinity_result = pthread_attr_setaffinity_np(thread_attrs.native_handle(), sizeof(cpu_set_t), ¤t_cpu_set);
if (set_affinity_result != 0) {
printf("Can't set CPU affinity for thread\n");
}
}
packet_receiver_thread_group.add_thread(
new boost::thread(thread_attrs, boost::bind(start_af_packet_capture, "eth6", fanout_group_id))
);
}
// Wait all processes for finish
packet_receiver_thread_group.join_all();
} else {
start_af_packet_capture("eth6", 0);
}
speed_printer_thread.join();
}
编辑:奖金问题
这可能是太不相关,在这种情况下,请告知,我将开始一个单独的SO帖子。这里的目标不仅是在多个核之间扩展数据包处理,而且还将分组处理代码放在接收该数据包的同一个核心上(稍后将探讨MMAP和RX_RING ),以便在CPU上减少上下文切换和缓存丢失。我的理解是,这个目标在这里已经实现了,有人能证实或否认吗?
发布于 2018-02-02 17:42:01
因为我有50+的声誉,所以我不能评论。我会在这里留下答复。@Jim D的回答是正确的。我要补充的是(至少在最近的Linux内核版本中),套接字按您添加的顺序添加到扇出组中的套接字数组中。要添加的第0插槽将位于位置0,第一个套接字位于位置1,依此类推。这意味着,如果您有一个来自NIC的中断被固定到CPU 0,并且您希望它被一个应用程序线程处理--您也已经将它固定在CPU 0上(对于CPU 1也是如此),那么CPU扇出算法将对您有效。如果您有一个支持DDIO的NIC,那么在读取框架时,您也更有可能得到这样的缓存。
但是,值得注意的是,如果您从扇出组中移除套接字,它将从数组中移除,并且数组中的最后一个套接字将被替换(即移除位于i
->sock_arr[i] = sock_arr[num_sockets_in_fanout_group - 1]; num_sockets_in_fanout_group--;
位置的套接字)。因此,顺序是确定性的,但请记住,如果要动态地从组中添加和删除套接字,则顺序可能发生更改。
我想补充的另一件事是,如果一个人的目标是跨多个核来增加吞吐量,那么应该评估是否应该跳过CPU 0,从CPU 1开始。CPU 0是操作系统任务和其他未与特定CPU核心“关联”的任务通常运行的地方。同样,确保没有其他中断或任务被固定在应用程序的核心上是值得的。
https://stackoverflow.com/questions/41660747
复制相似问题