c++ 网络编程(九)TCP/IP LINUX/windows--使用IOCP模型 多线程超详细教程 以及 多线程实现服务端

原文作者:aircraft

原文链接:https://www.cnblogs.com/DOMLX/p/9661012.html

 先讲Linux下(windows下在后面可以直接跳到后面看):

一.线程基本概念

前面我们讲过多进程服务器,但我们知道它开销很大,因此我们才引入线程,我们可以把它看成是一种轻量级进程。它相比进程有如下几个优点:

  • 线程的创建和上下文切换开销更小且速度更快。
  • 线程间交换数据时无需特殊技术。

进程:在操作系统构成单独执行流的单位。 线程:在进程构成单独执行流的单位。 它们的包含关系是,操作系统 > 进程 > 线程。进程与线程具体差异其实是这样的,每个进程都有独立的完整内存空间,它包括全局数据区,堆区,栈区,而多进程服务器之所以开销大是因为只是为了区分栈区里的不同函数流执行而把数据区,堆区,栈区内存全部复制了一份。而多线程就高效多了,它只把栈区分离出来,进程中的数据区,堆区则共享。具体内存结构示例图如下:


二.创建线程

下面的程序,我们可以用它来创建一个线程:

#include <pthread.h> pthread_create (thread, attr, start_routine, arg)

在这里,pthread_create 创建一个新的线程,并让它可执行。下面是关于参数的说明:

参数

描述

thread

指向线程标识符指针。

attr

一个不透明的属性对象,可以被用来设置线程属性。您可以指定线程属性对象,也可以使用默认值 NULL。

start_routine

线程运行函数起始地址,一旦线程被创建就会执行。

arg

运行函数的参数。它必须通过把引用作为指针强制转换为 void 类型进行传递。如果没有传递参数,则使用 NULL。

创建线程成功时,函数返回 0,若返回值不为 0 则说明创建线程失败。

终止线程

使用下面的程序,我们可以用它来终止一个线程:

#include <pthread.h> pthread_exit (status)

在这里,pthread_exit 用于显式地退出一个线程。通常情况下,pthread_exit() 函数是在线程完成工作后无需继续存在时被调用。

如果 main() 是在它所创建的线程之前结束,并通过 pthread_exit() 退出,那么其他线程将继续执行。否则,它们将在 main() 结束时自动被终止。

实例

以下简单的实例代码使用 pthread_create() 函数创建了 5 个线程,每个线程输出"Hello Runoob!":

#include <iostream>
// 必须的头文件
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS 5
 
// 线程的运行函数
void* say_hello(void* args)
{
    cout << "Hello Runoob!" << endl;
    return 0;
}
 
int main()
{
    // 定义线程的 id 变量,多个变量使用数组
    pthread_t tids[NUM_THREADS];
    for(int i = 0; i < NUM_THREADS; ++i)
    {
        //参数依次是:创建的线程id,线程参数,调用的函数,传入的函数参数
        int ret = pthread_create(&tids[i], NULL, say_hello, NULL);
        if (ret != 0)
        {
           cout << "pthread_create error: error_code=" << ret << endl;
        }
    }
    //等各个线程退出后,进程才结束,否则进程强制结束了,线程可能还没反应过来;
    pthread_exit(NULL);
}

linux下编译运行后结果为:

Hello Runoob! Hello Runoob! Hello Runoob! Hello Runoob! Hello Runoob!

以下简单的实例代码使用 pthread_create() 函数创建了 5 个线程,并接收传入的参数。每个线程打印一个 "Hello Runoob!" 消息,并输出接收的参数,然后调用 pthread_exit() 终止线程。

//文件名:test.cpp
 
#include <iostream>
#include <cstdlib>
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
void *PrintHello(void *threadid)
{  
   // 对传入的参数进行强制类型转换,由无类型指针变为整形数指针,然后再读取
   int tid = *((int*)threadid);
   cout << "Hello Runoob! 线程 ID, " << tid << endl;
   pthread_exit(NULL);
}
 
int main ()
{
   pthread_t threads[NUM_THREADS];
   int indexes[NUM_THREADS];// 用数组来保存i的值
   int rc;
   int i;
   for( i=0; i < NUM_THREADS; i++ ){      
      cout << "main() : 创建线程, " << i << endl;
      indexes[i] = i; //先保存i的值
      // 传入的时候必须强制转换为void* 类型,即无类型指针        
      rc = pthread_create(&threads[i], NULL, 
                          PrintHello, (void *)&(indexes[i]));
      if (rc){
         cout << "Error:无法创建线程," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

linux下编译运行后结果为:

main() : 创建线程, 0 main() : 创建线程, 1 Hello Runoob! 线程 ID, 0 main() : 创建线程, Hello Runoob! 线程 ID, 21 main() : 创建线程, 3 Hello Runoob! 线程 ID, 2 main() : 创建线程, 4 Hello Runoob! 线程 ID, 3

向线程传递参数

这个实例演示了如何通过结构传递多个参数。您可以在线程回调中传递任意的数据类型,因为它指向 void,如下面的实例所示:

#include <iostream>
#include <cstdlib>
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
struct thread_data{
   int  thread_id;
   char *message;
};
 
void *PrintHello(void *threadarg)
{
   struct thread_data *my_data;
 
   my_data = (struct thread_data *) threadarg;
 
   cout << "Thread ID : " << my_data->thread_id ;
   cout << " Message : " << my_data->message << endl;
 
   pthread_exit(NULL);
}
 
int main ()
{
   pthread_t threads[NUM_THREADS];
   struct thread_data td[NUM_THREADS];
   int rc;
   int i;
 
   for( i=0; i < NUM_THREADS; i++ ){
      cout <<"main() : creating thread, " << i << endl;
      td[i].thread_id = i;
      td[i].message = (char*)"This is message";
      rc = pthread_create(&threads[i], NULL,
                          PrintHello, (void *)&td[i]);
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

linux下编译运行后结果为:

main() : creating thread, 0 main() : creating thread, 1 Thread ID : 0 Message : This is message main() : creating thread, Thread ID : 21 Message : This is message main() : creating thread, 3 Thread ID : 2 Message : This is message main() : creating thread, 4 Thread ID : 3 Message : This is message Thread ID : 4 Message : This is message

连接和分离线程

我们可以使用以下两个函数来连接或分离线程:

pthread_join (threadid, status) pthread_detach (threadid)

pthread_join() 子程序阻碍调用程序,直到指定的 threadid 线程终止为止。当创建一个线程时,它的某个属性会定义它是否是可连接的(joinable)或可分离的(detached)。只有创建时定义为可连接的线程才可以被连接。如果线程创建时被定义为可分离的,则它永远也不能被连接。

用途:有的人没有在main 函数最后调用 pthread_exit(NULL); 函数等待,而是选择sleep,这里就可以用pthread_join()代替sleep的不可控制,,而有时候线程结束的时候你想做某一些事情需要知道线程是否结束了,也可以调用这个函数。

这个实例演示了如何使用 pthread_join() 函数来等待线程的完成。

#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
void *wait(void *t)
{
   int i;
   long tid;
 
   tid = (long)t;
 
   sleep(1);
   cout << "Sleeping in thread " << endl;
   cout << "Thread with id : " << tid << "  ...exiting " << endl;
   pthread_exit(NULL);
}
 
int main ()
{
   int rc;
   int i;
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;
   void *status;
 
   // 初始化并设置线程为可连接的(joinable)
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
   for( i=0; i < NUM_THREADS; i++ ){
      cout << "main() : creating thread, " << i << endl;
      rc = pthread_create(&threads[i], NULL, wait, (void *)&i );
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
 
   // 删除属性,并等待其他线程
   pthread_attr_destroy(&attr);
   for( i=0; i < NUM_THREADS; i++ ){
      rc = pthread_join(threads[i], &status);
      if (rc){
         cout << "Error:unable to join," << rc << endl;
         exit(-1);
      }
      cout << "Main: completed thread id :" << i ;
      cout << "  exiting with status :" << status << endl;
   }
 
   cout << "Main: program exiting." << endl;
   pthread_exit(NULL);
}

linux下编译运行结果:

main() : creating thread, 0
main() : creating thread, 1
main() : creating thread, 2
main() : creating thread, 3
main() : creating thread, 4
Sleeping in thread 
Thread with id : 4  ...exiting 
Sleeping in thread 
Thread with id : 3  ...exiting 
Sleeping in thread 
Thread with id : 2  ...exiting 
Sleeping in thread 
Thread with id : 1  ...exiting 
Sleeping in thread 
Thread with id : 0  ...exiting 
Main: completed thread id :0  exiting with status :0
Main: completed thread id :1  exiting with status :0
Main: completed thread id :2  exiting with status :0
Main: completed thread id :3  exiting with status :0
Main: completed thread id :4  exiting with status :0
Main: program exiting.

二.线程运行中存在的问题

线程存在的问题和临界区

前面我们知道了怎么创建线程,下面我们再来看看这样一个实例,创建100个线程,它们都访问了同一变量,其中一半对这个变量进行加1操作,一半进行减1操作,按道理其结果会等于0.

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num = 0;   //long long类型是64位整数型,多线程共同访问

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //创建100个线程,一半执行thread_inc,一半执行thread_des
    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    //等待线程返回
    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);  //+1,-1按道理结果是0
    return 0;
}

//线程入口函数1
void * thread_inc(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num += 1;//临界区(引起问题的语句就是临界区位置)
    return NULL;
}

//线程入口函数2
void * thread_des(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num -= 1;//临界区
    return NULL;
}

从运行结果看并不是0,而且每次运行的结果都不同。那这是什么原因引起的呢? 是因为每个线程访问一个变量是这样一个过程:先从内存取出这个变量值到CPU,然后CPU计算得到改变后的值,最后再将这个改变后的值写回内存。因此,我们可以很容易看出,多个线程访问同一变量,如果某个线程还只刚从内存取出数据,还没来得及写回内存,这时其它线程又访问了这个变量,所以这个值就会不正确了。

为什么会出现这种情况呢,来举个例子:

如上图所示:两个线程都要将某一个共同访问的变量加1,

就像上面说的这个运算过程是:线程1先拿到值然后经过cpu的运算在赋值回去,然后线程2在取值运算放回,上图实现的是最理想的情况,假如这时候线程一拿到了值99,同时线程二没间隔的也拿了99,这时候就要出问题了。线程一运算后赋值100回去,然后线程二运算后又赋值100回去,,,注意了哈,这里两个线程都是为了Num++服务,他们这样搞事情不就代表一个做了无用功吗?(我胖虎要是还拿的动刀还不打死你!!!)

这些看完应该就理解了为什么需要线程同步!!!!以及线程同步的重要性了吧!!

接下来我们再来讲讲怎么解决这个问题:线程同步

线程同步

线程同步用于解决线程访问顺序引发的问题,一般是如下两种情况:

  1. 同时访问同一内存空间时发生的情况
  2. 需要指定访问同一内存空间的线程执行顺序的情况

针对这两种可能引发的情况,我们分别使用的同步技术是:互斥量和信号量。

  • 互斥量 互斥量技术从字面也可以理解,就是临界区有线程访问,其它线程就得排队等待,它们的访问是互斥的,实现方式就是给临界区加锁与释放锁。

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);  //创建互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex);//销毁互斥量

int pthread_mutex_lock(pthread_mutex_t *mutex);//加锁

int pthread_mutex_unlock(pthread_mutex_t *mutex);//释放锁

简言之,就是利用lock和unlock函数围住临界区的两端。当某个线程调用pthread_mutex_lock进入临界区后,如果没有调用pthread_mutex_unlock释放锁退出,那么其它线程就会一直阻塞在临界区之外,我们把这种情况称之为死锁。所以临界区围住一定要lock和unlock一一对应。

接下来看一下代码示例:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);

long long num = 0;
pthread_mutex_t mutex;

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //互斥量的创建
    pthread_mutex_init(&mutex, NULL);

    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);
    pthread_mutex_destroy(&mutex);   //互斥量的销毁
    return 0;
}

/*扩展临界区,减少加锁,释放锁调用次数,但这样变量必须加满到50000000次后其它线程才能访问.
 这样是延长了线程的等待时间,但缩短了加锁,释放锁函数调用的时间,这里没有定论,自己酌情考虑*/
void * thread_inc(void * arg)
{
    pthread_mutex_lock(&mutex);  //互斥量锁住
    for (int i = 0; i < 1000000; i++)
        num += 1;
    pthread_mutex_unlock(&mutex); //互斥量释放锁
    return NULL;
}

/*缩短了线程等待时间,但循环创建,释放锁函数调用时间增加*/
void * thread_des(void * arg)
{
    for (int i = 0; i < 1000000; i++)
    {
        pthread_mutex_lock(&mutex);
        num -= 1;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

编译运行可以得到结果为:0

信号量 信号量与互斥量类似,只是互斥量是用锁来控制线程访问而信号量是用二进制0,1来完成控制线程顺序。sem_post信号量加1,sem_wait信号量减1,当信号量为0时,sem_wait就会阻断,因此通过这样让信号量加1减1就能控制线程的执行顺序了。 注释:mac上测试信号量函数返回-1失败,以后还是Linux上整吧,也许这些接口已经过时了…

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);//创建信号量
int sem_destroy(sem_t *sem);//销毁信号量
int sem_post(sem_t *sem);//信号量加1
int sem_wait(sem_t *sem);//信号量减1,为0时阻塞

实例代码:线程A从用户输入得到值后存入全局变量num,此时线程B将取走该值并累加。该过程共进行5次,完成后输出总和并退出程序。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;


int main(int argc, char *argv[])
{
    pthread_t id_t1, id_t2;
    sem_init(&sem_one, 0, 0);
    sem_init(&sem_two, 0, 1);

    pthread_create(&id_t1, NULL, read, NULL);
    pthread_create(&id_t2, NULL, accu, NULL);

    pthread_join(id_t1, NULL);
    pthread_join(id_t2, NULL);

    sem_destroy(&sem_one);
    sem_destroy(&sem_two);

    return 0;
}

void * read(void * arg)
{
    int i;
    for (i = 0; i < 5; i++) {
        fputs("Input num: ", stdout);
        sem_wait(&sem_two);
        scanf("%d", &num);
        sem_post(&sem_one);
    }
    return NULL;
}

void * accu(void * arg)
{
    int sum = 0 , i;
    for (i = 0; i < 5; i++) {
        sem_wait(&sem_one);
        sum+= num;
        sem_post(&sem_two);
    }
    printf("Result: %d \n", sum);
    return NULL;
}

补充:线程的销毁,线程创建后并不是其入口函数返回后就会自动销毁,需要手动销毁,不然线程创建的内存空间将一直存在。一般手动销毁有如下两种方式:1,调用pthread_join函数,其返回后同时销毁线程 ,是一个阻断函数,服务端一般不用它销毁,因为服务端主线程不宜阻断,还要实时监听客服端连接。2,调用pthread_detach函数,不会阻塞,线程返回自动销毁线程,不过要注意调用它后不能再调用pthread_join函数,它与pthread_join主要区别就是一个是阻塞函数,一个不阻塞。

四.多线程并发服务端的实现

使用多线程实现了一个简单的聊天程序,并对临界区(clnt_cnt,clnt_socks)进行加锁访问.

  • 服务端:
//
//  main.cpp
//  hello_server
//
//  Created by app05 on 15-10-22.
//  Copyright (c) 2015年 app05. All rights reserved.
//临界区是:clnt_cnt和clnt_socks访问处

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void * handle_clnt(void * arg);
void send_msg(char *msg, int len);
void error_handling(char * msg);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[])
{
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t clnt_adr_sz;
    pthread_t t_id;
    if (argc != 2) {
        printf("Usage : %s <port> \n", argv[0]);
        exit(1);
    }

    //创建互斥量
    pthread_mutex_init(&mutx, NULL);
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_adr.sin_port = htons(atoi(argv[1]));

    if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1)
        error_handling("bind() error");
    if(listen(serv_sock, 5) == -1)
        error_handling("listen() error");

    while (1)
    {
        clnt_adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); //阻断,监听客服端连接请求

        //临界区
        pthread_mutex_lock(&mutx); //加锁
        clnt_socks[clnt_cnt++] = clnt_sock; //新连接的客服端保存到clnt_socks数组里
        pthread_mutex_unlock(&mutx); //释放锁

        //创建线程
        pthread_create(&t_id, NULL, handle_clnt, (void*) &clnt_sock);
        pthread_detach(t_id); //销毁线程,线程return后自动调用销毁,不阻断

        printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));
    }

    close(serv_sock);
    return 0;
}

//线程执行
void * handle_clnt(void * arg)
{
    int clnt_sock = *((int *)arg);
    int str_len = 0, i;
    char msg[BUF_SIZE];

    while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
        send_msg(msg, str_len);

    //从数组中移除当前客服端
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
    {
        if (clnt_sock == clnt_socks[i])
        {
            while (i++ < clnt_cnt - 1)
                clnt_socks[i] = clnt_socks[i + 1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutx);
    close(clnt_sock);
    return NULL;
}

//向所有连接的客服端发送消息
void send_msg(char * msg, int len)
{
    int i;
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
        write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutx);
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

客户端:

//
//  main.cpp
//  hello_client
//
//  Created by app05 on 15-10-22.
//  Copyright (c) 2015年 app05. All rights reserved.
//
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char *message);

char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, const char * argv[]) {
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void * thread_return;

    if(argc != 4)
    {
        printf("Usage: %s <IP> <port> \n", argv[0]);
        exit(1);
    }

    sprintf(name, "[%s]", argv[3]); //聊天人名字,配置到编译器参数里
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock == -1)
        error_handling("socket() error");

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));

    if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");

    //多线程分离输入和输出
    pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
    //阻塞,等待返回
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);

    close(sock);
    return 0;
}

//发送消息
void * send_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    while (1) {
        fgets(msg, BUF_SIZE, stdin);
        if (!strcmp(msg, "q\n") || !strcmp(msg, "Q \n")) {
            close(sock);
            exit(0);
        }
        sprintf(name_msg, "%s  %s", name, msg);
        write(sock, name_msg, strlen(name_msg));
    }
    return NULL;
}

//接收消息
void * recv_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len;
    while (1) {
        str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
        if(str_len == -1)
            return (void *)-1;
        name_msg[str_len] = 0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

windows下:

一.线程概述

理解Windows内核对象

线程是系统内核对象之一。在学习线程之前,应先了解一下内核对象。内核对象是系统内核分配的一个内存块,该内存块描述的是一个数据结构,其成员负责维护对象的各种信息。内核对象的数据只能由系统内核来访问,应用程序无法在内存中找到这些数据结构并直接改变他们的内容。

常用的系统内核对象有事件对象、文件对象、作业对象、互斥对象、管道对象、进程对象和线程对象等。不同类型的内核对象,其数据结构各有不同。

理解进程和线程

进程被认为是一个正在运行的程序的实例,它也属于系统内核对象。可以将进程简单的理解为一个容器,它只是提供空间,执行程序的代码是由线程来实现的。线程存在于进程中,它负责执行进程地址空间中的代码。当一个进程创建时,系统会自动为其创建一个线程,该线程被称为主线程。在主线程中用户可以通过代码创建其他线程,当进程中的主线程结束时,进程也就结束了。

线程的创建

Windows下,创建线程有多种方式,以下将逐一介绍。注意它们的区别。

使用CreateThread函数创建线程

Windows API函数。该函数在主线程的基础上创建一个新线程。微软在Windows API中提供了建立新的线程的函数CreateThread。

HANDLECreateThread( LPSECURITY_ATTRIBUTES lpThreadAttributes,//线程安全属性 DWORD dwStackSize,//堆栈大小 LPTHREAD_START_ROUTINE lpStartAddress,//线程函数 LPVOID lpParameter,//线程参数 DWORD dwCreationFlags,//线程创建属性 LPDWORD lpThreadId//线程ID );

示例代码:
#include "stdafx.h"
#include<iostream>
#include<Windows.h>
using namespace std;

DWORD WINAPI Fun1Proc(LPVOID lpParameter)
{
    cout << "thread function Fun1Proc!\n";

    return 0;
}

int main()
{
    HANDLE hThread1 = CreateThread(NULL, 0, Fun1Proc, NULL, 0, NULL);
    CloseHandle(hThread1);

    Sleep(1000);
    cout << "main end!\n";
    system("pause");
    return 0;
}

结果图:

使用_beginthreadex函数创建线程

除了使用CreateThread API函数创建线程外,还可以用C++语言提供的_beginthreadex函数来创建线程。

uintptr_t _beginthreadex( // NATIVE CODE void *security, //线程安全属性 unsigned stack_size, //线程的栈大小 unsigned ( *start_address )( void * ),//线程函数 void *arglist, //传递到线程函数中的参数 unsigned initflag, //线程初始化标记 unsigned *thrdaddr //线程ID );

 示例代码:
#include "stdafx.h"
#include<iostream>
#include<Windows.h>
#include<process.h>
using namespace std;

unsigned int _stdcall ThreadProc(LPVOID lpParameter)
{
    cout << "thread function ThreadProc!\n";
    return 0;
}

int main()
{
    _beginthreadex(NULL, 0, ThreadProc, 0, 0, NULL);

    Sleep(1000);
    cout << "main end!\n";
    system("pause");
    return 0;
}

二.线程同步

为什么要进行线程同步?

在程序中使用多线程时,一般很少有多个线程能在其生命期内进行完全独立的操作。更多的情况是一些线程进行某些处理操作,而其他的线程必须对其处理结果进行了解。正常情况下对这种处理结果的了解应当在其处理任务完成后进行。    如果不采取适当的措施,其他线程往往会在线程处理任务结束前就去访问处理结果,这就很有可能得到有关处理结果的错误了解。例如,多个线程同时访问同一个全局变量,如果都是读取操作,则不会出现问题。如果一个线程负责改变此变量的值,而其他线程负责同时读取变量内容,则不能保证读取到的数据是经过写线程修改后的。    为了确保读线程读取到的是经过修改的变量,就必须在向变量写入数据时禁止其他线程对其的任何访问,直至赋值过程结束后再解除对其他线程的访问限制。这种保证线程能了解其他线程任务处理结束后的处理结果而采取的保护措施即为线程同步。

代码示例:  两个线程同时对一个全局变量进行加操作,演示了多线程资源访问冲突的情况。

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1;
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
    }
 
    return 0;
}
 
int main()
{
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

可以看到有时两个线程计算的值相同,这就跟上面Linux下创建一百个线程将数字加减为0没成功一样的道理,都是访问内存的时候冲突了。

为什么会出现这种情况呢,来举个例子:

如上图所示:两个线程都要将某一个共同访问的变量加1,

就像上面说的这个运算过程是:线程1先拿到值然后经过cpu的运算在赋值回去,然后线程2在取值运算放回,上图实现的是最理想的情况,假如这时候线程一拿到了值99,同时线程二没间隔的也拿了99,这时候就要出问题了。线程一运算后赋值100回去,然后线程二运算后又赋值100回去,,,注意了哈,这里两个线程都是为了Num++服务,他们这样搞事情不就代表一个做了无用功吗?(我胖虎要是还拿的动刀还不打死你!!!)

这些看完应该就理解了为什么需要线程同步!!!!以及线程同步的重要性了吧!!

关于线程同步

线程之间通信的两个基本问题是互斥和同步。

  • 线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒。
  • 线程互斥是指对于共享的操作系统资源(指的是广义的”资源”,而不是Windows的.res文件,譬如全局变量就是一种共享资源),在各线程访问时的排它性。当有若干个线程都要使用某一共享资源时,任何时刻最多只允许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。

线程互斥是一种特殊的线程同步。实际上,互斥和同步对应着线程间通信发生的两种情况:

  • 当有多个线程访问共享资源而不使资源被破坏时;
  • 当一个线程需要将某个任务已经完成的情况通知另外一个或多个线程时。

从大的方面讲,线程的同步可分用户模式的线程同步和内核对象的线程同步两大类。

  • 用户模式中线程的同步方法主要有原子访问和临界区等方法。其特点是同步速度特别快,适合于对线程运行速度有严格要求的场合。
  • 内核对象的线程同步则主要由事件、等待定时器、信号量以及信号灯等内核对象构成。由于这种同步机制使用了内核对象,使用时必须将线程从用户模式切换到内核模式,而这种转换一般要耗费近千个CPU周期,因此同步速度较慢,但在适用性上却要远优于用户模式的线程同步方式。

在WIN32中,同步机制主要有以下几种:  (1)事件(Event);  (2)信号量(semaphore);  (3)互斥量(mutex);  (4)临界区(Critical section)。

Win32中的四种同步方式

临界区

临界区(Critical Section)是一段独占对某些共享资源访问的代码,在任意时刻只允许一个线程对共享资源进行访问。如果有多个线程试图同时访问临界区,那么在有一个线程进入后其他所有试图访问此临界区的线程将被挂起,并一直持续到进入临界区的线程离开。临界区在被释放后,其他线程可以继续抢占,并以此达到用原子方式操作共享资源的目的。

临界区在使用时以CRITICAL_SECTION结构对象保护共享资源,并分别用EnterCriticalSection()和LeaveCriticalSection()函数去标识和释放一个临界区。所用到的CRITICAL_SECTION结构对象必须经过InitializeCriticalSection()的初始化后才能使用,而且必须确保所有线程中的任何试图访问此共享资源的代码都处在此临界区的保护之下。否则临界区将不会起到应有的作用,共享资源依然有被破坏的可能。

代码示例:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定义全局变量
CRITICAL_SECTION Critical;      //定义临界区句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        EnterCriticalSection(&Critical);
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        LeaveCriticalSection(&Critical);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        EnterCriticalSection(&Critical);
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        LeaveCriticalSection(&Critical);
    }
 
    return 0;
}
 
int main()
{
    InitializeCriticalSection(&Critical);   //初始化临界区对象
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

问题解决!!!

事件

事件(Event)是WIN32提供的最灵活的线程间同步方式,事件可以处于激发状态(signaled or true)或未激发状态(unsignal or false)。根据状态变迁方式的不同,事件可分为两类:  (1)手动设置:这种对象只可能用程序手动设置,在需要该事件或者事件发生时,采用SetEvent及ResetEvent来进行设置。  (2)自动恢复:一旦事件发生并被处理后,自动恢复到没有事件状态,不需要再次设置。

使用”事件”机制应注意以下事项:  (1)如果跨进程访问事件,必须对事件命名,在对事件命名的时候,要注意不要与系统命名空间中的其它全局命名对象冲突;  (2)事件是否要自动恢复;  (3)事件的初始状态设置。

由于event对象属于内核对象,故进程B可以调用OpenEvent函数通过对象的名字获得进程A中event对象的句柄,然后将这个句柄用于ResetEvent、SetEvent和WaitForMultipleObjects等函数中。此法可以实现一个进程的线程控制另一进程中线程的运行,例如:

HANDLE hEvent=OpenEvent(EVENT_ALL_ACCESS,true,"MyEvent"); 
ResetEvent(hEvent);

示例代码:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定义全局变量
HANDLE hEvent;  //定义事件句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hEvent, INFINITE);  //等待对象为有信号状态
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        SetEvent(hEvent);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hEvent, INFINITE);  //等待对象为有信号状态
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        SetEvent(hEvent);
    }
 
    return 0;
}
 
int main()
{
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
    hEvent = CreateEvent(NULL, FALSE, TRUE, "event");
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

运行结果都一样就不来显示出来了。

信号量

信号量是维护0到指定最大值之间的同步对象。信号量状态在其计数大于0时是有信号的,而其计数是0时是无信号的。信号量对象在控制上可以支持有限数量共享资源的访问。

信号量的特点和用途可用下列几句话定义:  (1)如果当前资源的数量大于0,则信号量有效;  (2)如果当前资源数量是0,则信号量无效;  (3)系统决不允许当前资源的数量为负值;  (4)当前资源数量决不能大于最大资源数量。

创建信号量

函数原型为:

 HANDLE CreateSemaphore (
   PSECURITY_ATTRIBUTE psa, //信号量的安全属性
   LONG lInitialCount, //开始时可供使用的资源数
   LONG lMaximumCount, //最大资源数
   PCTSTR pszName);     //信号量的名称

释放信号量

通过调用ReleaseSemaphore函数,线程就能够对信标的当前资源数量进行递增,该函数原型为:

BOOL WINAPI ReleaseSemaphore(
   HANDLE hSemaphore,   //要增加的信号量句柄
   LONG lReleaseCount, //信号量的当前资源数增加lReleaseCount
   LPLONG lpPreviousCount  //增加前的数值返回
   );

打开信号量 

和其他核心对象一样,信号量也可以通过名字跨进程访问,打开信号量的API为:

 HANDLE OpenSemaphore (
   DWORD fdwAccess,      //access
   BOOL bInherithandle,  //如果允许子进程继承句柄,则设为TRUE
   PCTSTR pszName  //指定要打开的对象的名字
  );

代码示例:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定义全局变量
HANDLE hSemaphore;  //定义信号量句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    long count;
    while (number < 100)
    {
        WaitForSingleObject(hSemaphore, INFINITE);  //等待信号量为有信号状态
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseSemaphore(hSemaphore, 1, &count);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    long count;
    while (number < 100)
    {
        WaitForSingleObject(hSemaphore, INFINITE);  //等待信号量为有信号状态
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseSemaphore(hSemaphore, 1, &count);
    }
 
    return 0;
}
 
int main()
{
    hSemaphore = CreateSemaphore(NULL, 1, 100, "sema");
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

结果一样。

互斥量

采用互斥对象机制。 只有拥有互斥对象的线程才有访问公共资源的权限,因为互斥对象只有一个,所以能保证公共资源不会同时被多个线程访问。互斥不仅能实现同一应用程序的公共资源安全共享,还能实现不同应用程序的公共资源安全共享。

代码示例:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定义全局变量
HANDLE hMutex;  //定义互斥对象句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hMutex, INFINITE);
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseMutex(hMutex);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hMutex, INFINITE);
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseMutex(hMutex);
    }
 
    return 0;
}
 
int main()
{
    hMutex = CreateMutex(NULL, false, "mutex");     //创建互斥对象
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

结果一样的。

三.多线程+IOCP实现服务端

(1)为什么使用IOCP模型。

socket是内核对象句柄,每次对socket执行操作,需要用户对象到内核对象的转换,执行完成返回结果,需要内核对象到用户对象的转换。 IOCP的中文名称是完成端口,目前是Windows下最高效的网络模型。特点:半异步,非阻塞。(我理解的完全异步是回调式,不需要人工参与,但是IOCP的异步需要轮询)。

        其他模型的缺点:

1)select模型:最低效,每次检索对长度有限制(默认是64个链接),可以通过修改头文件的方式修改上限,需要手动循环查询是否有操作可执行,所以很低效;

2)WSAEvent,事件模型,缺点也是有上限,每次最多监听64个事件,在收到事件通知后,去手动recv数据,效率比select高许多,因为操作是系统消息通知的,可以实现异步; 3)完成例程模型,是对事件模型的改进,去掉了64个事件的上限

以上模型还有个缺点,就是每次有操作可执行时,需要手动去执行recv或者accept等操作,涉及到内核对象<->用户对象的两次切换(订制获取消息时一次,recv/accept操作一次),而且对于accept来说,每次手动调用,都会产生一个socket,当大量accept来到时,产生socket的过程会非常耗时。         知道其他模型的缺点,就知道了完成端口的优点:1)没有监听上限;2)对于accept来说,socket是提前建立准备好的,收到连接时直接返回之前传入的socket;3)只涉及到一次内核对象<->用户对象切换(订制消息时一次),因为在订制消息的时候,已经把数据缓存地址给了内核对象,内核对象在收到数据、写入缓存后,才切换回用户对象,让用户拿走数据。总的来说,完成端口是proactor模型,其他的是reactor模型。

(2)IOCP理解与应用。

扯远点。首先传统服务器的网络IO流程如下: 接到一个客户端连接->创建一个线程负责这个连接的IO操作->持续对新线程进行数据处理->全部数据处理完毕->终止线程。 但是这样的设计代价是:

  • 1:每个连接创建一个线程,将导致过多的线程。
    • 2:维护线程所消耗的堆栈内存过大。
      • 3:操作系统创建和销毁线程过大。
        • 4:线程之间切换的上下文代价过大。

此时我们可以考虑使用线程池解决其中3和4的问题。这种传统的服务器网络结构称之为会话模型。 后来我们为防止大量线程的维护,创建了I/O模型,它被希望要求可以: 1:允许一个线程在不同时刻给多个客户端进行服务。 2:允许一个客户端在不同时间被多个线程服务。

这样做的话,我们的线程则会大幅度减少,这就要求以下两点: 1:客户端状态的分离,之前会话模式我们可以通过线程状态得知客户端状态,但现在客户端状态要通过其他方式获取。 2:I/O请求的分离。一个线程不再服务于一个客户端会话,则要求客户端对这个线程提交I/O处理请求。

那么就产生了这样一个模式,分为三部分:

  • 1:会话状态管理模块。它负责接收到一个客户端连接,就创建一个会话状态。
    • 2:当会话状态发生改变,例如断掉连接,接收到网络消息,就发送一个I/O请求给 I/O工作模块进行处理。
      • 3:I/O工作模块接收到一个I/O请求后,从线程池里唤醒一个工作线程,让该工作线程处理这个I/O请求,处理完毕后,该工作线程继续挂起。

上面的做法,则将网络连接 和I/O工作线程分离为三个部分,相互通讯仅依靠 I/O请求。此时可知有以下一些建议:

  • 1:在进行I/O请求处理的工作线程是被唤醒的工作线程,一个CPU对应一个的话,可以最大化利用CPU。所以 活跃线程的个数 建议等于 硬件CPU个数。
    • 2:工作线程我们开始创建了线程池,免除创建和销毁线程的代价。因为线程是对I/O进行操作的,且一一对应,那么当I/O全部并行时,工作线程必须满足I/O并行操作需求,所以 线程池内最大工作线程个数 建议大于或者等于 I/O并行个数。
      • 3:但是我们可知CPU个数又限制了活跃的线程个数,那么线程池过大意义很低,所以按常规建议 线程池大小 等于 CPU个数*2 左右为佳。例如,8核服务器建议创建16个工作线程的线程池。 上面描述的依然是I/O模型并非IOCP,那么IOCP是什么呢,全称 IO完成端口。

它是一种WIN32的网络I/O模型,既包括了网络连接部分,也负责了部分的I/O操作功能,用于方便我们控制有并发性的网络I/O操作。它有如下特点:

  • 1:它是一个WIN32内核对象,所以无法运行于Linux.
    • 2:它自己负责维护了工作线程池,同时也负责了I/O通道的内存池。
      • 3:它自己实现了线程的管理以及I/O请求通知,最小化的做到了线程的上下文切换。
        • 4:它自己实现了线程的优化调度,提高了CPU和内存缓冲的使用率。

使用IOCP的基本步骤很简单:

  • 1:创建IOCP对象,由它负责管理多个Socket和I/O请求。CreateIoCompletionPort需要将IOCP对象和IOCP句柄绑定。
    • 2:创建一个工作线程池,以便Socket发送I/O请求给IOCP对象后,由这些工作线程进行I/O操作。注意,创建这些线程的时候,将这些线程绑定到IOCP上。
      • 3:创建一个监听的socket。
        • 4:轮询,当接收到了新的连接后,将socket和完成端口进行关联并且投递给IOCP一个I/O请求。注意:将Socket和IOCP进行关联的函数和创建IOCP的函数一样,都是CreateIoCompletionPort,不过注意传参必然是不同的。
        • 5:因为是异步的,我们可以去做其他,等待IOCP将I/O操作完成会回馈我们一个消息,我们再进行处理。
        • 其中需要知道的是:I/O请求被放在一个I/O请求队列里面,对,是队列,LIFO机制。当一个设备处理完I/O请求后,将会将这个完成后的I/O请求丢回IOCP的I/O完成队列。
        • 我们应用程序则需要在GetQueuedCompletionStatus去询问IOCP,该I/O请求是否完成。
        • 其中有一些特殊的事情要说明一下,我们有时有需要人工的去投递一些I/O请求,则需要使用PostQueuedCompletionStatus函数向IOCP投递一个I/O请求到它的请求队列中。

(3)IOCP----API详解

(1) 完成端口实现的API CreateIoCompletionPort

HANDLE WINAPI CreateIoCompletionPort( _In_ HANDLE FileHandle, _In_opt_ HANDLE ExistingCompletionPort, _In_ ULONG_PTR CompletionKey, _In_ DWORD NumberOfConcurrentThreads );

返回值:如果函数成功,则返回值是I / O完成端口的句柄:如果函数失败,则返回值为NULL。 功能:两个功能,创建完成端口句柄与将新的文件句柄(套接字)绑定到完成端口(我们也可以理解为完成队列,只是这个队列由操作系统自己维护)

FileHandle:文件句柄或INVALID_HANDLE_VALUE。创建完成端口的时候,该值设置为INVALID_HANDLE_VALUE,Ghost里面时候的是一个临时的socket句柄,不过我们不用一定要这样。 ExistingCompletionPort:现有I / O完成端口的句柄或NULL。如果此参数为现有I / O完成端口,则该函数将其与FileHandle参数指定的句柄相关联。如果成功则函数返回现有I / O完成端口的句柄。如果此参数为NULL,则该函数将创建一个新的I / O完成端口,如果FileHandle参数有效,则将其与新的I / O完成端口相关联。否则,不会发生文件句柄关联。如果成功,该函数将把句柄返回给新的I / O完成端口。 CompletionKey:该值就是类似线程里面传递的一个参数,我们在GetQueuedCompletionStatus中第三个参数获得的就是这个值。 NumberOfConcurrentThreads:如果此参数为NULL,则系统允许与系统中的处理器一样多的并发运行的线程。如果ExistingCompletionPort参数不是NULL,则忽略此参数。

GetQueuedCompletionStatus

BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytes, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED *lpOverlapped, _In_ DWORD dwMilliseconds );

返回值:成功返回TRUE,失败返回FALSE,如果设置了超时时间,超时返回FALSE 功能:从完成端口中获取已经完成的消息

CompletionPort:完成端口的句柄。 lpNumberOfBytes:该变量接收已完成的I / O操作期间传输的字节数。 lpCompletionKey:该变量及时我们 CreateIoCompletionPort中传递的第三个参数 lpOverlapped:接收完成的I / O操作启动时指定的OVERLAPPED结构的地址。我们可以通过CONTAINING_RECORD这个宏获取以该重叠结构为首地址的结构体信息,也就是该重叠结构为什么必须放在结构体的首地址的原因。 dwMilliseconds:超时时间(毫秒),如果为INFINITE则一直等待直到有消息到来。

备注:   CreateIoCompletionPort 提供这个功能:I/O系统可以被用来向列队的I/O完成端口发送I/O完成通知包。当 你执行一个已经关联一个完成端口的文件I/O操作,I/O系统将会在这个I/O操作完成的时候向I/O完成端口发送一个完成通知包,I/O完成端口将以先 进先出的方式放置这个I/O完成通知包,并使用GetQueuedCompletionStatus 接收I/O完成通知包。    虽然允许任何数量的 线程来调用 GetQueuedCompletionStatus 等待一个I/O完成端口,但每个线程只能同时间内关联一个I/O完成端口,且此端口是线程最后检查的那个端口。    当一个包被放入队列中,系统首先会 检查有多少个关联此端口的线程在运行,如果运行的线程的数量少于NumberOfConcurrentThreads的值,那么允许其中的一个等 待线程去处理包。当一个运行的线程完成处理,将再次调用GetQueuedCompletionStatus ,此时系统允许另一个等待线程去处理包。    系 统也允许一个等待的线程处理包如果运行的线程进入任何形式的等待状态,当这个线程从等待状态进入运行状态,可能会有一个很短的时期活动线程的数量会超过 NumberOfConcurrentThreads 的值,此时,系统会通过不允许任何新的活动线程快速的减少线程个数,直到活动线程少于NumberOfConcurrentThreads 的值。

PostQueuedCompletionStatus

BOOL WINAPI PostQueuedCompletionStatus( _In_ HANDLE CompletionPort, _In_ DWORD dwNumberOfBytesTransferred, _In_ ULONG_PTR dwCompletionKey, _In_opt_ LPOVERLAPPED lpOverlapped );

返回值:成功,返回非零,失败返回零。使用GetLasrError获取最后的错误码 功能:手动向完成端口投递一个异步消息。就类似我们Win32中的PostMessage

CompletionPort:完成端口的句柄。 dwNumberOfBytesTransferred:通过GetQueuedCompletionStatus函数的lpNumberOfBytesTransferred参数返回的值。 dwCompletionKey:通过GetQueuedCompletionStatus函数的lpCompletionKey参数返回的值。 lpOverlapped:通过GetQueuedCompletionStatus函数的lpOverlapped参数返回的值。

可以看到上面后三个参数都可以传递给GetQueuedCompletionStatus,这样—来。—个工作者线程收到传递过来的三个GetQueuedCompletionStatus函数参数后,便可根据由这三个参数的某一个设置的特殊值,决定何时应该退出。例如,可用dwCompletionPort参数传递0值,而—个工作者线程会将其解释成中止指令。一旦所有工作者线程都已关闭,便可使用CloseHandle函数,关闭完成端口。最终安全退出程序。   PostQueuedCompletionStatus函数提供了一种方式来与线程池中的所有线程进行通信。如,当用户终止服务应用程序时,我们想要所有线程都完全利索地退出。但是如果各线程还在等待完成端口而又没有已完成的I/O 请求,那么它们将无法被唤醒。   通过为线程池中的每个线程都调用一次PostQueuedCompletionStatus,我们可以将它们都唤醒。每个线程会对GetQueuedCompletionStatus的返回值进行检查,如果发现应用程序正在终止,那么它们就可以进行清理工作并正常地退出。

CONTAINING_RECORD

PCHAR CONTAINING_RECORD( [in] PCHAR Address, [in] TYPE Type, [in] PCHAR Field );

功能:返回给定结构类型的结构实例的基地址和包含结构中字段的地址。 返回值:返回包含Field的结构的基地址。 Address:我们通过GetQueuedCompletionStatus获取的重叠结构 Type:以重叠结构为首地址的结构体 Field:Type结构体的重叠结构变量

(2)相关其他函数 AcceptEx

BOOL AcceptEx(
  _In_   SOCKET sListenSocket,
  _In_   SOCKET sAcceptSocket,
  _In_   PVOID lpOutputBuffer,
  _In_   DWORD dwReceiveDataLength,
  _In_   DWORD dwLocalAddressLength,
  _In_   DWORD dwRemoteAddressLength,
  _Out_  LPDWORD lpdwBytesReceived,
  _In_   LPOVERLAPPED lpOverlapped
);

返回值:成功返回TRUE,失败返回FALSE 功能:投递异步的接收操作,类似于实现了一个网络内存池,这个池中存放的是已经创造好的套接字(由于要进行异步操作,所以该套接字也要使用WSASocket创建),当有用户连接的时候,操作系统会直接从这个网络内存池中拿出一个来给连接的客户端,这个过程我们少去了连接时才创造套接字的过程(创建一个套接字的过程内部是很复杂的),这也是这个函数优异的地方。

该函数的参数很明确,只是有些其余的话还需要提醒,AcceptEx该函数还需要通过函数指针获得,因为该函数不是windows自身的API。具体的获取过程也只是按部就班,MSDN有详细的例子,示例代码中也有详细的过程,笔者就不赘述了。

AcceptEx函数

         使用Accept(或WSAAccept)接受连接,当并发连接数超过大概30000(这取决于系统资源)的时候,容易出现WSAENOBUFS(10055)错误。这种错误主要是因为系统不能及时为新连接进来的客户端分配socket资源。因此我们应该找到一种的使用之前能够分配socket资源的方法。AcceptEx 就是我们寻找的答案,它的主要优势就是在使用socket资源之前就会分分配好资源,它的其他方面的特点就比较麻烦令人费解了。(参见MSDN库。)

(4)实现代码

服务端代码:

#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <process.h>
#include <winsock2.h>
#include <windows.h>
 
#pragma comment(lib,"ws2_32.lib");//加载ws2_32.dll
 
#define BUF_SIZE 100
#define READ    3
#define    WRITE    5
 
typedef struct    // socket info
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAdr;
} PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
 
typedef struct    // buffer info
{
    OVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buffer[BUF_SIZE];
    int rwMode;    // READ or WRITE 读写模式
} PER_IO_DATA, *LPPER_IO_DATA;
 
unsigned int  WINAPI EchoThreadMain(LPVOID CompletionPortIO);
void ErrorHandling(char *message);
SOCKET ALLCLIENT[100];
int clientcount = 0;
HANDLE hMutex;//互斥量
 
int main(int argc, char* argv[])
{
 
    hMutex = CreateMutex(NULL, FALSE, NULL);//创建互斥量
 
    WSADATA    wsaData;
    HANDLE hComPort;
    SYSTEM_INFO sysInfo;
    LPPER_IO_DATA ioInfo;
    LPPER_HANDLE_DATA handleInfo;
 
    SOCKET hServSock;
    SOCKADDR_IN servAdr;
    int  i;
    DWORD recvBytes = 0,flags = 0;
 
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() error!");
 
    hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//创建CP对象
    GetSystemInfo(&sysInfo);//获取当前系统的信息
 
    for (i = 0; i < sysInfo.dwNumberOfProcessors; i++)
        _beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);//创建=CPU个数的线程数
 
    hServSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);//不是非阻塞套接字,但是重叠IO套接字。
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAdr.sin_port = htons(1234);
 
    bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr));
    listen(hServSock, 5);
 
    while (1)
    {
        SOCKET hClntSock;
        SOCKADDR_IN clntAdr;
        int addrLen = sizeof(clntAdr);
 
        hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &addrLen);
 
        handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));//和重叠IO一样
        handleInfo->hClntSock = hClntSock;//存储客户端套接字
 
        WaitForSingleObject(hMutex, INFINITE);//线程同步
 
        ALLCLIENT[clientcount++] = hClntSock;//存入套接字队列
 
        ReleaseMutex(hMutex);
 
        memcpy(&(handleInfo->clntAdr), &clntAdr, addrLen);
 
        CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (DWORD)handleInfo, 0);//连接套接字和CP对象
                                                                                //已完成信息将写入CP对象
        ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//存储接收到的信息
        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
        ioInfo->wsaBuf.len = BUF_SIZE;
        ioInfo->wsaBuf.buf = ioInfo->buffer;//和重叠IO一样
        ioInfo->rwMode = READ;//读写模式
 
        WSARecv(handleInfo->hClntSock, &(ioInfo->wsaBuf),//非阻塞模式
            1, &recvBytes, &flags, &(ioInfo->overlapped), NULL);
    }
    CloseHandle(hMutex);//销毁互斥量
    return 0;
}
 
unsigned int WINAPI EchoThreadMain(LPVOID pComPort)//线程的执行
{
    HANDLE hComPort = (HANDLE)pComPort;
    SOCKET sock;
    DWORD bytesTrans;
    LPPER_HANDLE_DATA handleInfo;
    LPPER_IO_DATA ioInfo;
    DWORD flags = 0;
 
    while (1)//大循环
    {
        GetQueuedCompletionStatus(hComPort, &bytesTrans,//确认“已完成”的I/O!!
            (LPDWORD)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);//INFINITE使用时,程序将阻塞,直到已完成的I/O信息写入CP对象
        sock = handleInfo->hClntSock;//客户端套接字
 
        if (ioInfo->rwMode == READ)//读写模式(此时缓冲区有数据)
        {
            puts("message received!");
            if (bytesTrans == 0)    // 连接结束
            {
                WaitForSingleObject(hMutex, INFINITE);//线程同步
 
                closesocket(sock);
                int i = 0;
                while (ALLCLIENT[i] == sock){ i++; }
                ALLCLIENT[i] = 0;//断开置0
 
                ReleaseMutex(hMutex);
 
                free(handleInfo); free(ioInfo);
                continue;
            }
            int i = 0;
 
            for (; i < clientcount;i++)
            {
                if (ALLCLIENT[i] != 0)//判断是否为已连接的套接字
                {
                    if (ALLCLIENT[i] != sock)
                    {
                        LPPER_IO_DATA newioInfo;
                        newioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//动态分配内存
                        memset(&(newioInfo->overlapped), 0, sizeof(OVERLAPPED));
                        strcpy(newioInfo->buffer, ioInfo->buffer);//重新构建新的内存,防止多次释放free
                        newioInfo->wsaBuf.buf = newioInfo->buffer;
                        newioInfo->wsaBuf.len = bytesTrans;
                        newioInfo->rwMode = WRITE;
 
                        WSASend(ALLCLIENT[i], &(newioInfo->wsaBuf),//回声
                            1, NULL, 0, &(newioInfo->overlapped), NULL);
                    }
                    else
                    {
                        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
                        ioInfo->wsaBuf.len = bytesTrans;
                        ioInfo->rwMode = WRITE;
                        WSASend(ALLCLIENT[i], &(ioInfo->wsaBuf),//回声
                            1, NULL, 0, &(ioInfo->overlapped), NULL);
                    }
                }
            }
            ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//动态分配内存
            memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
            ioInfo->wsaBuf.len = BUF_SIZE;
            ioInfo->wsaBuf.buf = ioInfo->buffer;
            ioInfo->rwMode = READ;
            WSARecv(sock, &(ioInfo->wsaBuf),//再非阻塞式接收
                1, NULL, &flags, &(ioInfo->overlapped), NULL);
        }
        else
        {
            puts("message sent!");
            free(ioInfo);
        }
    }
    return 0;
}
 
void ErrorHandling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

客户端:

#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <process.h> 
#define BUF_SIZE 1000
#define NAME_SIZE 20
 
#pragma comment(lib, "ws2_32.lib")  //加载 ws2_32.dll  
 
unsigned WINAPI SendMsg(void * arg);//发送信息函数
unsigned WINAPI RecvMsg(void * arg);//接受信息函数
void ErrorHandling(char * msg);//错误返回函数
 
int haveread = 0;
char NAME[50];//[名字]
char ANAME[50];
char msg[BUF_SIZE];//信息
 
int main(int argc, char *argv[])
{
 
    printf("请输入网名:");
    scanf("%s", NAME);
    WSADATA wsaData;
    SOCKET hSock;
    SOCKADDR_IN servAdr;
    HANDLE hSndThread, hRcvThread;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() error!");
 
    hSock = socket(PF_INET, SOCK_STREAM, 0);
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = inet_addr("127.0.0.1");
    servAdr.sin_port = htons(1234);
 
    if (connect(hSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
        ErrorHandling("connect() error");
 
    int resultsend;
    puts("Welcome to joining our chatting room!\n");
    sprintf(ANAME, "[%s]", NAME);
 
    hSndThread =
        (HANDLE)_beginthreadex(NULL, 0, SendMsg, (void*)&hSock, 0, NULL);//写线程
    hRcvThread =
        (HANDLE)_beginthreadex(NULL, 0, RecvMsg, (void*)&hSock, 0, NULL);//读线程
 
    WaitForSingleObject(hSndThread, INFINITE);//等待线程结束
    WaitForSingleObject(hRcvThread, INFINITE);
    closesocket(hSock);
    WSACleanup();
    system("pause");
    return 0;
}
 
unsigned WINAPI SendMsg(void * arg)   // send thread main
{
    SOCKET sock = *((SOCKET*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    char padd[2];
    fgets(padd, 2, stdin);//多余的'\n'
    printf("\n send message:");
    while (1)
    {
        {
            fgets(msg, BUF_SIZE, stdin);
            if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
            {
                closesocket(sock);
                exit(0);
            }
            sprintf(name_msg, "[%s] %s", NAME, msg);
            char numofmsg = strlen(name_msg) + '0';
            char newmsg[100]; newmsg[0] = numofmsg; newmsg[1] = 0;//第一个字符表示消息的长度
            strcat(newmsg, name_msg);
            int result = send(sock, newmsg, strlen(newmsg), 0);
            if (result == -1)return -1;//发送错误
        }
    }
    return NULL;
}
 
unsigned WINAPI RecvMsg(void * arg)  // read thread main
{
    SOCKET sock = *((SOCKET*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len = 0;
    while (1)
    {
        {
            char lyfstr[1000] = { 0 };
            int totalnum = 0;
            str_len = recv(sock, name_msg, 1, 0);//读取第一个字符!获取消息的长度
            if (str_len == -1)//读取错误
            {
                printf("return -1\n");
                return -1;
            }
            if (str_len == 0)//读取结束
            {
                printf("return 0\n");
                return 0;//读取结束
            }
            totalnum = name_msg[0] - '0';
            int count = 0;
 
            do
            {
                str_len = recv(sock, name_msg, 1, 0);
 
                name_msg[str_len] = 0;
 
                if (str_len == -1)//读取错误
                {
                    printf("return -1\n");
                    return -1;
                }
                if (str_len == 0)
                {
                    printf("return 0\n");
                    return 0;//读取结束
                }
                strcat(lyfstr, name_msg);
                count = str_len + count;
 
            } while (count < totalnum);
 
            lyfstr[count] = '\0';
            printf("\n");
            strcat(lyfstr, "\n");
            fputs(lyfstr, stdout);
            printf(" send message:");
            fflush(stdout);
            memset(name_msg, 0, sizeof(char));
        }
    }
    return NULL;
}
 
void ErrorHandling(char * msg)
{
    fputs(msg, stderr);
    fputc('\n', stderr);
    exit(1);
}

最后说一句啦。本网络编程入门系列博客是连载学习的,有兴趣的可以看我博客其他篇。。。。

参考博客:https://blog.csdn.net/kaida1234/article/details/79465713

参考博客:http://www.runoob.com/cplusplus/cpp-multithreading.html

参考博客:https://blog.csdn.net/u010223072/article/details/49335867

参考博客:https://blog.csdn.net/wxf2012301351/article/details/73504281

参考书籍:《TCP/IP网络编程 ---尹圣雨》

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏有趣的django

Django rest framework源码分析(1)----认证

一、基础 1.1.安装 两种方式: github pip直接安装 pip install django-rest-framework 1.2.需要先了解的一...

61111
来自专栏腾讯Bugly的专栏

《Android 创建线程源码与OOM分析》

| 导语 企鹅FM近几个版本的外网Crash出现很多OutOfMemory(以下简称OOM)问题,Crash的堆栈都在Thread::start方法上。该文详细...

1K5
来自专栏DT乱“码”

一些linux命令总结。

6720
来自专栏一个会写诗的程序员的博客

CAS指令与MESI缓存一致性协议、 “轻量级锁” 与原子操作CAS指令与MESI缓存一致性协议、 “轻量级锁” 与原子操作

CAS指令,在Intel CPU上称为CMPXCHG。最常见的原子操作有Compare and Exchange,Self Increase/Decrease等...

1725
来自专栏编程思想之路

WiFiAp探究实录--功能实现与源码分析

Android虐我千百遍,我待Android如初恋。 ——————编辑于2017-08-02——————— wifi热点说的是wifiAp相...

1.6K9
来自专栏应用案例

VBA创建Access数据库的4种方法

Excel由于本身的局限性,存储数据量过大的时候,往往会导致工作簿假死无反应,电脑卡顿等情况。那么,将数据存取到Access数据库中就是一种好的解决方法。今天,...

65310
来自专栏王清培的专栏

.NET应用程序调试—原理、工具、方法

随着应用程序的复杂度不断上升,要想将好的设计思想稳定的落实到线上,我们需要具备解决问题的能力。需要具备对运行时的错误进行定位且快速的解决它的能力。本篇文章我将分...

1340
来自专栏北京马哥教育

MongoDB多纬度监控方法详解

一、mongostat工具方法 mongostat是mongdb自带的状态检测工具,在命令行下使用。它会间隔固定时间获取mongodb的当前运行状态,并输出。如...

4655
来自专栏orientlu

UNIX IPC

管道一般为有亲缘关系进程提供单路数据流, 通过pipe(int fd[2])创建, 返回两个文件描述符, fd[0] 用于读,fd[1]用于写。 通过 read...

1502
来自专栏Google Dart

Flutter 构建完整应用手册-持久化

如果我们有一小部分我们想要保存的键值,我们可以使用shared_preferences插件。

2012

扫码关注云+社区

领取腾讯云代金券