前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一个UDP可读缓冲区不够导致丢包的现象

一个UDP可读缓冲区不够导致丢包的现象

作者头像
呱牛笔记
发布2023-05-02 15:17:56
1.6K0
发布2023-05-02 15:17:56
举报
文章被收录于专栏:呱牛笔记

今天看到一篇写UDP 丢包场景的文章,其中提到如果UDP 缓冲区填满导致丢包的问题,写了个小程序验证了下,确实之前没有细究过,描述如下:

  • 数据报分片重组丢失:UDP 协议本身规定的大小是 64kb,但是在数据链路层有 MTU 的限制,大小大概在 5kb,所以当你发送一个很大的 UDP 包的时候,这个包会在 IP 层进行分片,然后重组。这个过程就有可能导致分片的包丢失。UDP 本身有 CRC 检测机制,会抛弃掉丢失的 UDP 包;
  • UDP 缓冲区填满:当 UDP 的缓冲区已经被填满的时候,接收方还没有处理这部分的 UDP 数据报,这个时候再过来的数据报就没有地方可以存了,自然就都被丢弃了。 client发送两次UDP数据,第一次 500字节,第二次300字节,server端阻塞模式下接包,第一次recvfrom( 1000 ),收到是 1000,还是500,还是300,还是其他? 由于UDP通信的有界性,接收到只能是500或300,又由于UDP的无序性和非可靠性,接收到可能是300,也可能是500,也可能一直阻塞在recvfrom调用上,直到超时返回(也就是什么也收不到)。 第二种情况:在假定数据包是不丢失并且是按照发送顺序按序到达的情况下,server端阻塞模式下接包,先后三次调用:recvfrom( 200),recvfrom( 1000),recvfrom( 1000),接收情况如何呢? 由于UDP通信的有界性,第一次recvfrom( 200)将接收第一个500字节的数据包,但是因为用户空间buf只有200字节,于是只会返回前面200字节,剩下300字节将丢弃。第二次recvfrom( 1000)将返回300字节,第三次recvfrom( 1000)将会阻塞。

如何解决:

以libevent测试程序为例,在接收到缓冲区有数据的事件后,首先通过如下的方法,或者libevent封装的方法,获取到系统缓冲区中可读数据的大小,然后申请到对应大小的buffer去调用recvfrom方法,否则会出现如上UDP可读缓冲区小余可读数据的情况,导致出现UDP数据读不全的问题!

代码语言:javascript
复制
int ret = ioctl(fd, FIONREAD, &totallen);

或者

代码语言:javascript
复制
static int
get_n_bytes_readable_on_socket(evutil_socket_t fd)
{
#if defined(FIONREAD) && defined(_WIN32)
        unsigned long lng = EVBUFFER_MAX_READ;
        if (ioctlsocket(fd, FIONREAD, &lng) < 0)
                return -1;
        /* Can overflow, but mostly harmlessly. XXXX */
        return (int)lng;
#elif defined(FIONREAD)
        int n = EVBUFFER_MAX_READ;
        if (ioctl(fd, FIONREAD, &n) < 0)
                return -1;
        return n;
#else
        return EVBUFFER_MAX_READ;
#endif
}

服务器收到包之后全部回发给客户端:

[root@localhost sample]# ./test_udp_server

bind() success – [0.0.0.0] [8888]

Read: len [2] – content [bc]

Read: len [4] – content [abcd]

Read: len [4] – content [abcd]

Read: len [17] – content [12345678901111111]

Read: len [13] – content [aaaaaaaaab123]

客户端发送字符给服务器,并输出服务器发回来的字符串:

[root@localhost sample]# ./test_udp_client

set sz:2097152, et recv_buff:0, len:0

set sz:2097152, get snd_buff:0, len:0

bind() success – [11.12.115.239] [8888]

input 0: exit!

input other: send to other msg!

abc

input message to others : 

sendToServer: len [2] – content [bc]

input message to others : Read:count:2 ,but read len [2] – content [bc]

recvfrom() no pending data.: Success

abcd

sendToServer: len [4] – content [abcd]

input message to others : Read:count:4 ,but read len [4] – content [abcd]

recvfrom() no pending data.: Success

abcd

sendToServer: len [4] – content [abcd]

input message to others : Read:count:4 ,but read len [4] – content [abcd]

recvfrom() no pending data.: Success

12345678901111111

sendToServer: len [17] – content [12345678901111111]

input message to others : Read:count:17 ,but read len [10] – content [1234567890

                                                                                 ­]

recvfrom() no pending data.: Success

aaaaaaaaab123

sendToServer: len [13] – content [aaaaaaaaab123]

input message to others : Read:count:13 ,but read len [10] – content [aaaaaaaaab

                                                                                 ­]

recvfrom() no pending data.: Success

服务器的代码:

代码语言:javascript
复制
//event test udp socket server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <event.h>
#include <event2/listener.h>

#include <fcntl.h>

#include <pthread.h>

#include <signal.h>

#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/resource.h>

/*
[root@localhost sample]# gcc -o test_udp_server test_udp_server.c -levent -lpthread
*/
#define SVR_IP "0.0.0.0"
#define SVR_PORT 8888
#define BUF_SIZE 1024

#define UR_CLIENT_SOCK_BUF_SIZE (65536)
#define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32)

void read_cb(int fd, short event, void *arg) {
	char buf[BUF_SIZE];
	int len;
	int size = sizeof(struct sockaddr);
	struct sockaddr_in client_addr;

	memset(buf, 0, sizeof(buf));
	len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &size);

	if (len == -1) {
		perror("recvfrom()");
	} else if (len == 0) {
		printf("Connection Closed\n");
	} else {
		printf("Read: len [%d] – content [%s]\n", len, buf);

		/* Echo */
		sendto(fd, buf, len, 0, (struct sockaddr *)&client_addr, size);
	}
}

int set_sock_buf_size(int fd, int sz0)
{
	int sz;

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}

	if (sz < 1) {
		perror("Cannot set socket rcv size"); 
	}

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}

	if (sz < 1) {
		perror("Cannot set socket snd size"); 
	}

	return 0;
}

int bind_socket(struct event *ev) {
	int sock_fd;
	int flag = 1;
	struct sockaddr_in sin;

	/* Create endpoint */
	if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
		perror("socket()");
		return -1;
	}

	/* Set socket option */
	if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) < 0) {
		perror("setsockopt()");
		return 1;
	}

	/* Set IP, port */
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = inet_addr(SVR_IP);
	sin.sin_port = htons(SVR_PORT);

#ifdef IP_RECVERR
		if (sin.sin_family != AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IP_RECVERR");
		}
#endif

#ifdef IPV6_RECVERR
		if (sin.sin_family == AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IPV6_RECVERR");
		}
#endif

    if (fcntl(sock_fd, F_SETFL, O_NONBLOCK) == -1) {
        perror("O_NONBLOCK");
        return -1;
    }
	set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE);

	/* Bind */
	if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(struct sockaddr)) < 0) {
		perror("bind()");
		return -1;
	} else {
		printf("bind() success – [%s] [%u]\n", SVR_IP, SVR_PORT);
	}


	/* Init one event and add to active events */
	event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL);
	if (event_add(ev, NULL) == -1) {
		printf("event_add() failed\n");
	}

	return 0;
}

int
main(int argc, char **argv)
{
	struct event ev;

	/* Init. event */
	if (event_init() == NULL) {
		printf("event_init() failed\n");
		return -1;
	}

	/* Bind socket */
	if (bind_socket(&ev) != 0) {  
		printf("bind_socket() failed\n");
		return -1;
	}

	/* Enter event loop */
	event_dispatch();

	printf("done\n");
	return 0;
}

客户端的代码:

代码语言:javascript
复制
//event test udp socket server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <event.h>
#include <event2/listener.h>


#include <fcntl.h>

#include <pthread.h> 
#include <signal.h>

#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/resource.h>
/*

[root@localhost sample]# gcc -o test_udp_client test_udp_client.c -levent -lpthread
*/

#define LOCAL_IP "0.0.0.0"
#define LOCAL_PORT 19393

#define SVR_IP "11.12.115.239"
#define SVR_PORT  8888
#define BUF_SIZE 10 //1024

#define UR_CLIENT_SOCK_BUF_SIZE (65536)
#define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32)

void read_cb(int fd, short event, void *arg) {
	char buf[BUF_SIZE];
	int len = 0;
	int totallen = 0;
	int size = sizeof(struct sockaddr);
	struct sockaddr_in client_addr;

	while(1){
        int ret = ioctl(fd, FIONREAD, &totallen);
        if(ret < 0 || !totallen) {
			perror("recvfrom() no pending data.");
        	break;
        }
		memset(buf, 0, sizeof(buf));
		len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &size);

		if (len == -1) {
			perror("recvfrom() error");
			break;
		} else if (len == 0) {
			printf("Connection Closed\n");
		} else {
			printf("Read:count:%d ,but read len [%d] – content [%s]\n",totallen, len, buf);

			/* Echo */
			//sendto(fd, buf, len, 0, (struct sockaddr *)&client_addr, size);
		}

	}
}
 

int set_sock_buf_size(int fd, int sz0)
{
	int sz;

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}

	if (sz < 1) {
		perror("Cannot set socket rcv size"); 
	}

        int val=0, len = 0;
        int ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&val, &len);
	printf("set sz:%d, et recv_buff:%d, len:%d\r\n", sz, val, len);

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}
        
	val=0, len = 0;
	ret = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&val, &len);
	printf("set sz:%d, get snd_buff:%d, len:%d\r\n", sz, val, len);

	if (sz < 1) {
		perror("Cannot set socket snd size"); 
	}

	return 0;
}

void sendToServer(int fd, char* msg, int len, struct sockaddr_in *server){
	printf("sendToServer: len [%d] – content [%s]\n", len, msg);
	sendto(fd, msg, len, 0, (struct sockaddr *)server, sizeof(struct sockaddr_in));
}

typedef struct AAA{
    struct sockaddr_in *server;
    int fd;
}ServerInfo;

//recv user input value
static void * recv_input_thread(void *arg)  {
	if (arg == NULL){
    	return NULL;
	}
    ServerInfo *serverInfo = (ServerInfo *) arg;

	char  input;
	char msg[1024];

	printf("input 0: exit!\n"); 
	printf("input other: send to other msg!\n");

    scanf("%c", &input);
    fflush(stdin);
    if (input == '0'){
    	return NULL;
    }
	do{

	    printf("input message to others : "); 
        memset(msg, 0x00, 1024);
        //scanf("%s", &msg); 
        gets(msg);
        msg[1023] = '\0';
        printf("\n"); 


		sendToServer(serverInfo->fd, msg, strlen(msg), serverInfo->server);
 
	}while(input != '0'); 

	printf("Done!\n"); 
    free(serverInfo);
	return NULL;
}


int bind_socket(struct event *ev, int sock_fd, int local_port) { 
	int flag = 1;
	struct sockaddr_in sin;
  
	/* Set IP, port */
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = inet_addr(LOCAL_IP);
	sin.sin_port = local_port;

#ifdef IP_RECVERR
		if (sin.sin_family != AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IP_RECVERR");
		}
#endif

#ifdef IPV6_RECVERR
		if (sin.sin_family == AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IPV6_RECVERR");
		}
#endif

    if (fcntl(sock_fd, F_SETFL, O_NONBLOCK) == -1) {
        perror("O_NONBLOCK");
        return -1;
    }
	set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE);

	/* Bind */
	if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(struct sockaddr)) < 0) {
		perror("bind()");
		return -1;
	} else {
		printf("bind() success – [%s] [%u]\n", SVR_IP, SVR_PORT);
	}


	/* Init one event and add to active events */
	event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL);
	if (event_add(ev, NULL) == -1) {
		printf("event_add() failed\n");
	}

	return 0;
}

int
main(int argc, char **argv)
{
	struct event udp_event;

	/* Init. event */
	if (event_init() == NULL) {
		printf("event_init() failed\n");
		return -1;
	} 
 
	int sock_fd;
	int flag = 1;
	struct sockaddr_in server;

	/* Create endpoint */
	if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
		perror("socket()");
		return -1;
	}

	/* Set socket option */
	if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) < 0) {
		perror("setsockopt()");
		return 1;
	}

	/* Bind socket */
	if (bind_socket(&udp_event, sock_fd, LOCAL_PORT) != 0) {  
		printf("bind_socket() failed\n");
		return -1;
	}

	/* Set IP, port */
	memset(&server, 0, sizeof(server));
	server.sin_family = AF_INET;
	server.sin_addr.s_addr = inet_addr(SVR_IP);
	server.sin_port = htons(SVR_PORT);


	ServerInfo *serverInfo = (ServerInfo *)malloc(sizeof(ServerInfo));
	serverInfo->server = &server;
	serverInfo->fd = sock_fd;

    pthread_t tidp;
	if ((pthread_create(&tidp, NULL, recv_input_thread, (void*)serverInfo)) == -1){
		printf("create error!\n");
		return 1;
	}
 
	/* Enter the event loop; does not return. */
	event_dispatch();
	close(sock_fd);


	printf("done\n");
	return 0;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/06/16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档