今天看到一篇介绍UDP丢包场景的文章,其中提到UDP缓冲区被填满会导致丢包。这个问题之前确实没有细究过,于是写了个小程序验证了一下,描述如下:
如何解决:
以libevent测试程序为例,在收到缓冲区有数据可读的事件后,首先通过如下的方法(或者libevent封装的方法)获取系统缓冲区中可读数据的大小,然后申请对应大小的buffer再去调用recvfrom方法;否则会出现应用层接收buffer小于可读数据长度的情况,导致UDP数据读不全(超出buffer的部分会被直接丢弃)!
int ret = ioctl(fd, FIONREAD, &totallen);
或者
/*
 * Report how many bytes the FIONREAD query says can be read from fd.
 *
 * Returns the queried count, -1 when the query itself fails, or
 * EVBUFFER_MAX_READ on platforms where FIONREAD is not available.
 */
static int
get_n_bytes_readable_on_socket(evutil_socket_t fd)
{
#if !defined(FIONREAD)
	/* No way to ask the kernel: fall back to the configured maximum. */
	return EVBUFFER_MAX_READ;
#elif defined(_WIN32)
	unsigned long pending = EVBUFFER_MAX_READ;

	if (ioctlsocket(fd, FIONREAD, &pending) < 0)
		return -1;
	/* The narrowing conversion can overflow, but mostly harmlessly. */
	return (int)pending;
#else
	int pending = EVBUFFER_MAX_READ;

	return (ioctl(fd, FIONREAD, &pending) < 0) ? -1 : pending;
#endif
}
服务器收到包之后全部回发给客户端:
[root@localhost sample]# ./test_udp_server
bind() success – [0.0.0.0] [8888]
Read: len [2] – content [bc]
Read: len [4] – content [abcd]
Read: len [4] – content [abcd]
Read: len [17] – content [12345678901111111]
Read: len [13] – content [aaaaaaaaab123]
客户端发送字符给服务器,并输出服务器发回来的字符串:
[root@localhost sample]# ./test_udp_client
set sz:2097152, et recv_buff:0, len:0
set sz:2097152, get snd_buff:0, len:0
bind() success – [11.12.115.239] [8888]
input 0: exit!
input other: send to other msg!
abc
input message to others :
sendToServer: len [2] – content [bc]
input message to others : Read:count:2 ,but read len [2] – content [bc]
recvfrom() no pending data.: Success
abcd
sendToServer: len [4] – content [abcd]
input message to others : Read:count:4 ,but read len [4] – content [abcd]
recvfrom() no pending data.: Success
abcd
sendToServer: len [4] – content [abcd]
input message to others : Read:count:4 ,but read len [4] – content [abcd]
recvfrom() no pending data.: Success
12345678901111111
sendToServer: len [17] – content [12345678901111111]
input message to others : Read:count:17 ,but read len [10] – content [1234567890
]
recvfrom() no pending data.: Success
aaaaaaaaab123
sendToServer: len [13] – content [aaaaaaaaab123]
input message to others : Read:count:13 ,but read len [10] – content [aaaaaaaaab
]
recvfrom() no pending data.: Success
服务器的代码:
//event test udp socket server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <event.h>
#include <event2/listener.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/resource.h>
/*
[root@localhost sample]# gcc -o test_udp_server test_udp_server.c -levent -lpthread
*/
/* Address string handed to inet_addr() when the server socket is bound. */
#define SVR_IP "0.0.0.0"
/* UDP port the server socket is bound to (converted with htons()). */
#define SVR_PORT 8888
/* Size in bytes of the stack buffer read_cb() receives datagrams into. */
#define BUF_SIZE 1024
/* Base socket buffer size in bytes; only used to derive the value below. */
#define UR_CLIENT_SOCK_BUF_SIZE (65536)
/* Buffer size requested via set_sock_buf_size(): 32 x 65536 = 2097152 bytes. */
#define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32)
/*
 * libevent read callback for the server socket: receive one datagram,
 * log it, and echo it back to the sender.
 *
 * fd    - the UDP socket that became readable.
 * event - event flags supplied by libevent (unused).
 * arg   - user argument supplied at event_set() time (unused).
 *
 * The payload is printed with an explicit length because a datagram that
 * fills the whole buffer leaves no room for a terminating NUL.
 */
void read_cb(int fd, short event, void *arg) {
    char buf[BUF_SIZE];
    struct sockaddr_in client_addr;
    socklen_t addr_len = sizeof(client_addr);
    ssize_t len;

    (void)event;
    (void)arg;

    len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &addr_len);
    if (len < 0) {
        perror("recvfrom()");
        return;
    }
    if (len == 0) {
        /* UDP is connectionless: a zero return is an empty datagram. */
        printf("Read: empty datagram\n");
        return;
    }

    printf("Read: len [%d] – content [%.*s]\n", (int)len, (int)len, buf);

    /* Echo */
    if (sendto(fd, buf, (size_t)len, 0, (struct sockaddr *)&client_addr, addr_len) < 0) {
        perror("sendto()");
    }
}
/*
 * Request receive and send socket buffers of sz0 bytes on fd.
 *
 * For each direction the requested size is halved after every failed
 * setsockopt() call until a call succeeds or the size reaches zero; in
 * the latter case a diagnostic is written with perror().
 *
 * Always returns 0.
 */
int set_sock_buf_size(int fd, int sz0)
{
    int want;

    /* Receive buffer. */
    for (want = sz0; want > 0; want /= 2) {
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void *)&want, (socklen_t)sizeof(want)) >= 0) {
            break;
        }
    }
    if (want <= 0) {
        perror("Cannot set socket rcv size");
    }

    /* Send buffer. */
    for (want = sz0; want > 0; want /= 2) {
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void *)&want, (socklen_t)sizeof(want)) >= 0) {
            break;
        }
    }
    if (want <= 0) {
        perror("Cannot set socket snd size");
    }

    return 0;
}
/*
 * Create the server's UDP socket, configure it, bind it to
 * SVR_IP:SVR_PORT and register a persistent read event for it.
 *
 * ev - caller-owned event structure that is initialised with
 *      read_cb() as its handler and added to the event loop.
 *
 * Returns 0 on success and -1 on any failure (every error path now uses
 * the same value, and a failed event_add() is reported to the caller
 * instead of being treated as success).
 *
 * NOTE(review): the descriptor is not closed on the error paths; doing so
 * needs close() from <unistd.h>, which this program does not include.
 */
int bind_socket(struct event *ev) {
    int sock_fd;
    int flag = 1;
    int fl;
    struct sockaddr_in sin;

    /* Create endpoint */
    if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
        perror("socket()");
        return -1;
    }

    /* Set socket option */
    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) < 0) {
        perror("setsockopt()");
        return -1;
    }

    /* Set IP, port */
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(SVR_IP);
    sin.sin_port = htons(SVR_PORT);

#ifdef IP_RECVERR
    if (sin.sin_family != AF_INET6) {
        int on = 0;
#ifdef TURN_IP_RECVERR
        on = 1;
#endif
        if (setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on)) < 0)
            perror("IP_RECVERR");
    }
#endif
#ifdef IPV6_RECVERR
    if (sin.sin_family == AF_INET6) {
        int on = 0;
#ifdef TURN_IP_RECVERR
        on = 1;
#endif
        if (setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on)) < 0)
            perror("IPV6_RECVERR");
    }
#endif

    /* Add O_NONBLOCK to the existing status flags instead of replacing them. */
    fl = fcntl(sock_fd, F_GETFL, 0);
    if (fl == -1 || fcntl(sock_fd, F_SETFL, fl | O_NONBLOCK) == -1) {
        perror("O_NONBLOCK");
        return -1;
    }

    set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE);

    /* Bind */
    if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind()");
        return -1;
    }
    printf("bind() success – [%s] [%u]\n", SVR_IP, (unsigned)SVR_PORT);

    /* Init one event and add to active events */
    event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL);
    if (event_add(ev, NULL) == -1) {
        printf("event_add() failed\n");
        return -1;
    }
    return 0;
}
int
main(int argc, char **argv)
{
struct event ev;
/* Init. event */
if (event_init() == NULL) {
printf("event_init() failed\n");
return -1;
}
/* Bind socket */
if (bind_socket(&ev) != 0) {
printf("bind_socket() failed\n");
return -1;
}
/* Enter event loop */
event_dispatch();
printf("done\n");
return 0;
}
客户端的代码:
//event test udp socket client
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <event.h>
#include <event2/listener.h>
/*
[root@localhost sample]# gcc -o test_udp_client test_udp_client.c -levent -lpthread
*/
/* Address string handed to inet_addr() when the client socket is bound. */
#define LOCAL_IP "0.0.0.0"
/* Local UDP port that main() passes to bind_socket(). */
#define LOCAL_PORT 19393
/* Address of the echo server that main() stores in the destination sockaddr_in. */
#define SVR_IP "11.12.115.239"
/* UDP port of the echo server. */
#define SVR_PORT 8888
/*
 * Size in bytes of the stack buffer read_cb() receives datagrams into.
 * NOTE(review): presumably reduced from 1024 to 10 on purpose so that
 * longer datagrams get truncated for the demonstration - confirm.
 */
#define BUF_SIZE 10 //1024
/* Base socket buffer size in bytes; only used to derive the value below. */
#define UR_CLIENT_SOCK_BUF_SIZE (65536)
/* Buffer size requested via set_sock_buf_size(): 32 x 65536 = 2097152 bytes. */
#define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32)
/*
 * libevent read callback for the client socket: drain every datagram that
 * is currently queued and print, for each one, the size FIONREAD reported
 * next to the number of bytes actually received.
 *
 * fd    - the non-blocking UDP socket that became readable.
 * event - event flags supplied by libevent (unused).
 * arg   - user argument supplied at event_set() time (unused).
 *
 * The receive buffer is BUF_SIZE bytes; recvfrom() discards whatever part
 * of a datagram does not fit, so "count" larger than "read len" in the
 * output means the datagram was truncated.
 *
 * The loop ends when recvfrom() reports that nothing is pending rather
 * than when FIONREAD returns 0: a zero-length datagram also yields 0 and
 * would otherwise never be consumed, leaving the socket readable forever.
 */
void read_cb(int fd, short event, void *arg) {
    char buf[BUF_SIZE];
    struct sockaddr_in client_addr;

    (void)event;
    (void)arg;

    for (;;) {
        int totallen = 0;
        /* Value-result argument: must be reset before every call. */
        socklen_t addr_len = sizeof(client_addr);
        ssize_t len;

        if (ioctl(fd, FIONREAD, &totallen) < 0) {
            perror("ioctl(FIONREAD)");
            totallen = -1; /* size unknown; still try to read */
        }

        len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &addr_len);
        if (len < 0) {
            /* EAGAIN/EWOULDBLOCK just means the queue is empty. */
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                perror("recvfrom() error");
            }
            break;
        }
        if (len == 0) {
            /* UDP is connectionless: a zero return is an empty datagram. */
            printf("Read: empty datagram\n");
            continue;
        }

        /* Explicit length: buf holds no NUL when the datagram fills it. */
        printf("Read:count:%d ,but read len [%d] – content [%.*s]\n", totallen, (int)len, (int)len, buf);
    }
}
/*
 * Request receive and send socket buffers of sz0 bytes on fd and print
 * what the kernel reports back.
 *
 * For each direction the requested size is halved after every failed
 * setsockopt() call until a call succeeds or the size reaches zero (a
 * diagnostic is then written with perror()). Afterwards the effective
 * value is read back with getsockopt() and printed together with the
 * option length that call returned.
 *
 * The length passed to getsockopt() is a value-result argument and must
 * start out as the size of the output variable; starting at 0 makes the
 * call return no data.
 *
 * Always returns 0.
 */
int set_sock_buf_size(int fd, int sz0)
{
    int sz;
    int val;
    socklen_t len;

    /* Receive buffer. */
    for (sz = sz0; sz > 0; sz /= 2) {
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void *)&sz, (socklen_t)sizeof(sz)) >= 0) {
            break;
        }
    }
    if (sz < 1) {
        perror("Cannot set socket rcv size");
    }
    val = 0;
    len = (socklen_t)sizeof(val);
    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&val, &len) < 0) {
        perror("getsockopt(SO_RCVBUF)");
    }
    printf("set sz:%d, et recv_buff:%d, len:%d\r\n", sz, val, (int)len);

    /* Send buffer. */
    for (sz = sz0; sz > 0; sz /= 2) {
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void *)&sz, (socklen_t)sizeof(sz)) >= 0) {
            break;
        }
    }
    /* Report before any other call can overwrite errno. */
    if (sz < 1) {
        perror("Cannot set socket snd size");
    }
    val = 0;
    len = (socklen_t)sizeof(val);
    if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&val, &len) < 0) {
        perror("getsockopt(SO_SNDBUF)");
    }
    printf("set sz:%d, get snd_buff:%d, len:%d\r\n", sz, val, (int)len);

    return 0;
}
/*
 * Log a message and send it as one UDP datagram.
 *
 * fd     - socket to send on.
 * msg    - payload; the first len bytes are sent.
 * len    - payload length in bytes.
 * server - destination address.
 *
 * Nothing is sent when msg or server is NULL or len is negative. A failed
 * sendto() is reported with perror() instead of being silently ignored.
 */
void sendToServer(int fd, char* msg, int len, struct sockaddr_in *server){
    if (msg == NULL || server == NULL || len < 0) {
        return;
    }
    /* Print exactly the bytes that are about to be sent. */
    printf("sendToServer: len [%d] – content [%.*s]\n", len, len, msg);
    if (sendto(fd, msg, (size_t)len, 0, (struct sockaddr *)server, sizeof(*server)) < 0) {
        perror("sendto()");
    }
}
/*
 * Argument bundle handed from main() to recv_input_thread(): where to
 * send the user's messages and which socket to send them on.
 */
typedef struct AAA{
struct sockaddr_in *server; /* destination address; points at storage owned by the creator */
int fd;                     /* UDP socket used for sending */
}ServerInfo;
/*
 * Thread body that reads user input from stdin and sends every line to
 * the server.
 *
 * arg - heap-allocated ServerInfo; this function takes ownership and
 *       frees it on every exit path. A NULL argument returns at once.
 *
 * The first line is the menu choice: a line starting with '0' (or end of
 * input) ends the thread without sending anything. After that each line,
 * with its trailing newline removed, is sent as one datagram until end of
 * input or a read error.
 *
 * Input is read with fgets(), which is bounded by the buffer size, and
 * the menu choice is read as a whole line so that no part of it is left
 * over to be sent as the first message.
 *
 * Always returns NULL.
 */
static void * recv_input_thread(void *arg) {
    ServerInfo *serverInfo = (ServerInfo *) arg;
    char msg[1024];

    if (serverInfo == NULL){
        return NULL;
    }

    printf("input 0: exit!\n");
    printf("input other: send to other msg!\n");
    if (fgets(msg, sizeof(msg), stdin) == NULL || msg[0] == '0'){
        free(serverInfo);
        return NULL;
    }

    for (;;) {
        printf("input message to others : ");
        /* The prompt has no newline, so push it out explicitly. */
        fflush(stdout);
        if (fgets(msg, sizeof(msg), stdin) == NULL) {
            break; /* end of input or read error */
        }
        msg[strcspn(msg, "\n")] = '\0';
        printf("\n");
        sendToServer(serverInfo->fd, msg, (int)strlen(msg), serverInfo->server);
    }

    printf("Done!\n");
    free(serverInfo);
    return NULL;
}
/*
 * Configure an already-created UDP socket, bind it to
 * LOCAL_IP:local_port and register a persistent read event for it.
 *
 * ev         - caller-owned event structure that is initialised with
 *              read_cb() as its handler and added to the event loop.
 * sock_fd    - UDP socket to configure; it stays owned by the caller and
 *              is not closed here, not even on failure.
 * local_port - local port number in host byte order.
 *
 * Returns 0 on success and -1 on failure (including a failed
 * event_add()).
 */
int bind_socket(struct event *ev, int sock_fd, int local_port) {
    int fl;
    struct sockaddr_in sin;

    /* Set IP, port */
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(LOCAL_IP);
    /* sin_port is in network byte order, so the value must be converted. */
    sin.sin_port = htons((unsigned short)local_port);

#ifdef IP_RECVERR
    if (sin.sin_family != AF_INET6) {
        int on = 0;
#ifdef TURN_IP_RECVERR
        on = 1;
#endif
        if (setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on)) < 0)
            perror("IP_RECVERR");
    }
#endif
#ifdef IPV6_RECVERR
    if (sin.sin_family == AF_INET6) {
        int on = 0;
#ifdef TURN_IP_RECVERR
        on = 1;
#endif
        if (setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on)) < 0)
            perror("IPV6_RECVERR");
    }
#endif

    /* Add O_NONBLOCK to the existing status flags instead of replacing them. */
    fl = fcntl(sock_fd, F_GETFL, 0);
    if (fl == -1 || fcntl(sock_fd, F_SETFL, fl | O_NONBLOCK) == -1) {
        perror("O_NONBLOCK");
        return -1;
    }

    set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE);

    /* Bind */
    if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("bind()");
        return -1;
    }
    /* Report the local endpoint that was bound, not the server's. */
    printf("bind() success – [%s] [%u]\n", LOCAL_IP, (unsigned)local_port);

    /* Init one event and add to active events */
    event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL);
    if (event_add(ev, NULL) == -1) {
        printf("event_add() failed\n");
        return -1;
    }
    return 0;
}
/*
 * Client entry point: create the UDP socket, bind it via bind_socket(),
 * start the thread that reads user input and sends it to
 * SVR_IP:SVR_PORT, then run the libevent loop that prints the replies.
 *
 * Returns 0 once the event loop has finished and a non-zero value when
 * setup fails. The socket is closed on every path after it was created.
 */
int
main(int argc, char **argv)
{
    struct event udp_event;
    struct sockaddr_in server;
    ServerInfo *serverInfo;
    pthread_t tidp;
    int sock_fd;
    int flag = 1;

    (void)argc;
    (void)argv;

    /* Init. event */
    if (event_init() == NULL) {
        printf("event_init() failed\n");
        return -1;
    }

    /* Create endpoint */
    if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
        perror("socket()");
        return -1;
    }

    /* Set socket option */
    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) < 0) {
        perror("setsockopt()");
        close(sock_fd);
        return 1;
    }

    /* Bind socket */
    if (bind_socket(&udp_event, sock_fd, LOCAL_PORT) != 0) {
        printf("bind_socket() failed\n");
        close(sock_fd);
        return -1;
    }

    /* Set IP, port */
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = inet_addr(SVR_IP);
    server.sin_port = htons(SVR_PORT);

    serverInfo = malloc(sizeof(*serverInfo));
    if (serverInfo == NULL) {
        perror("malloc()");
        close(sock_fd);
        return 1;
    }
    /* 'server' lives in this frame, which outlives the event loop below. */
    serverInfo->server = &server;
    serverInfo->fd = sock_fd;

    /* pthread_create() returns an error number on failure, never -1. */
    if (pthread_create(&tidp, NULL, recv_input_thread, (void*)serverInfo) != 0){
        printf("create error!\n");
        free(serverInfo);
        close(sock_fd);
        return 1;
    }
    /* On success the input thread owns and frees serverInfo. */

    /* Enter the event loop */
    event_dispatch();

    close(sock_fd);
    printf("done\n");
    return 0;
}