前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一个C++多线程TCP服务Demo

一个C++多线程TCP服务Demo

作者头像
leoay 技术
发布2023-06-12 14:48:21
4000
发布2023-06-12 14:48:21
举报
文章被收录于专栏:leoay 技术leoay 技术

代码仓库在文末,编译通过,请放心食用!

无界AI生成

本文介绍了如何在 C++ 中为 Linux 环境实现并发 TCP/IP 服务器。 多线程在我的解决方案中提供并发性。 由于并发性,客户不必等待轮到他们,可以立即得到服务。 我创建的服务器有一个线程来处理新连接(TCPServer 类)。 接受这样的连接后,将创建一个新线程,负责与给定客户端(ConnectionHandler 类)的所有通信。 ConnectionHandler 的实现可以自由更改。 它可以允许对服务器的任何使用,例如它可以很好地用作 HTTP 服务器。

TCPServer.h

代码语言:javascript
复制
#pragma once

#include <thread>

class TCPServer {
public:
    static const int PORT = 1234;

    // starts server
    void start();

    // stops server
    void stop();

    // joining server thread - waits for server to end
    void join();

    // constructor automatically starts server thread
    TCPServer();
    ~TCPServer();

private:
    // event file descriptor - it will be used to tell server to stop
    int efd;

    // server thread
    std::thread m_thread;

    // it works in server thread
    void threadFunc();
};

服务器在构造 TCPServer 对象后启动。 创建一个运行 threadFunc() 方法的新线程。 TCPServer::start() 还创建了 eventfd,用于通知服务器停止处理。 threadFunc() 创建一个套接字并将其绑定到指定的端口(在本例中为 1234)。 poll() 用于监视是否有任何打开的文件描述符准备好执行 I/O。 在侦听套接字上接收到 I/O 事件后,将构建新的 ConnectionHandler 对象,该对象在单独的线程上运行。

TCPServer.cpp

代码语言:javascript
复制
#include "TCPServer.h"
#include <unistd.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cassert>
#include <vector>
#include <unordered_map>
#include "ConnectionHandler.h"
#include <iostream>

TCPServer::TCPServer()
        : efd(-1) { 
    this->start();
}

TCPServer::~TCPServer() {
    this->stop();
    if (this->efd != -1){
        close(this->efd);
    }
}

void TCPServer::start() {
    assert(!this->m_thread.joinable());

    if (this->efd != -1) {
        close(this->efd);
    }

    this->efd = eventfd(0, 0);
    if (this->efd == -1) {
        std::cout << "eventfd() => -1, errno=" << errno << std::endl;
        return;
    }

    // creates thread
    this->m_thread = std::thread([this]( "this") { this->threadFunc(); });

    // sets name for thread
    pthread_setname_np(this->m_thread.native_handle(), "TCPServer");
}


void TCPServer::stop() {
    // writes to efd - it will be handled as stopping server thread
    uint64_t one = 1;
    auto ret = write(this->efd, &one, 8);
    if (ret == -1) {
        std::cout << "write => -1, errno=" << errno << std::endl;
    }
}

void TCPServer::join() {
    if (this->m_thread.joinable()) {
        this->m_thread.join();
    }
}

void TCPServer::threadFunc() {
    int sockfd;

    std::cout << "Listen on: " << PORT << std::endl;
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        std::cout << "socket() => -1, errno=" << errno << std::endl;
        return;
    }

    int reuseaddr = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
        std::cout << "setsockopt() => -1, errno=" << errno << std::endl;
    }

    struct sockaddr_in servaddr = {0, 0, 0, 0};
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    // binding to socket that will listen for new connections
    if (bind(sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
        std::cout << "bind() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }

    // started listening, 50 pending connections can wait in a queue
    listen(sockfd, 50);

    // monitored file descriptors - at start there is efd and just created sockfd. POLLIN means we wait for data to read
    std::vector<struct pollfd> fds{ { this->efd, POLLIN, 0 }, { sockfd, POLLIN, 0 } };
    
    std::unordered_map<int, ConnectionHandler> handlers;

    while (true) {
        const int TIMEOUT = 1000;   // 1000 ms
        int n = poll(fds.data(), fds.size(), TIMEOUT);  // checking if there was any event on monitored file descriptors
        if (n == -1 && errno != ETIMEDOUT && errno != EINTR) {
            std::cout << "poll() => -1, errno=" << errno << std::endl;
            break;
        }
        
        // n pending events
        if (n > 0) {
            if (fds[0].revents) {   // handles server stop request (which is sent by TCPServer::stop())
                std::cout << "Received stop request" << std::endl;
                break;
            } else if (fds[1].revents) {    // new client connected
                // accepting connection
                int clientfd = accept(sockfd, NULL, NULL);
                std::cout << "New connection" << std::endl;
                if (clientfd != -1) {
                    // insert new pollfd to monitor
                    fds.push_back(pollfd{clientfd, POLLIN, 0});

                    // create ConnectionHandler object that will run in separate thread
                    handlers.emplace(clientfd, clientfd);
                } else {
                    std::cout << "accept => -1, errno=" << errno << std::endl;
                }

                // clearing revents
                fds[1].revents = 0;
            }
            
            // iterating all pollfds to check if anyone disconnected
            for (auto it = fds.begin() + 2; it != fds.end(); ) {
                char c;
                if (it->revents 
                        && recv(it->fd, &c, 1, MSG_PEEK | MSG_DONTWAIT) == 0) { // checks if disconnected or just fd readable
                    std::cout << "Client disconnected" << std::endl;
                    close(it->fd);  // closing socket
                    handlers.at(it->fd).terminate();    // terminating ConnectionHandler thread
                    handlers.erase(it->fd);
                    it = fds.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }

    // cleaning section after receiving stop request
    for (auto it = fds.begin() + 1; it != fds.end(); it++) {
        close(it->fd);
        if (handlers.find(it->fd) != handlers.end()) {
            handlers.at(it->fd).terminate();
        }
    }

    std::cout << "TCP server stopped" << std::endl;
}

ConnectionHandler.h

代码语言:javascript
复制
#pragma once

#include <thread>

class ConnectionHandler {
private:
    std::thread m_thread;
    int fd = -1;
    bool m_terminate = false;

    std::string readMessage();
    void sendMessage(const std::string& msg);

    void stop();

public:
    explicit ConnectionHandler(int fd);
    ~ConnectionHandler();

    void terminate();
    void threadFunc();
};

ConnectionHandler.cpp

代码语言:javascript
复制
#include "ConnectionHandler.h"
#include <cassert>
#include <sys/socket.h>
#include <errno.h>
#include <iostream>
#include <pthread.h>

ConnectionHandler::ConnectionHandler(int fd) : fd(fd) {
    assert(!this->m_thread.joinable());

    // creating thread that handles received messages
    this->m_thread = std::thread([this]( "this") { this->threadFunc(); });
    pthread_setname_np(this->m_thread.native_handle(), "ConnectionHandler");
}

ConnectionHandler::~ConnectionHandler() {
    this->stop();
}

void ConnectionHandler::stop() {
    if (this->m_thread.joinable()) {
        this->m_thread.join();
    }
}

void ConnectionHandler::threadFunc() {
    while (!this->m_terminate) {
        std::string msg = this->readMessage();
        std::cout << "Received message: " << msg << std::endl;
        this->sendMessage("Thank you for your message " + msg);
    }
}

std::string ConnectionHandler::readMessage() {
    std::string msg(1024, '\0');    // buffor with 1024 length which is filled with NULL character
    
    int readBytes = recv(this->fd, (void *)msg.data(), msg.size(), 0);
    if (readBytes < 1) {
        std::cout << "Error in readMessage, readBytes: " << readBytes << std::endl;
        return "";
    }

    return msg;
}

void ConnectionHandler::sendMessage(const std::string& msg) {
    int n = send(this->fd, msg.c_str(), msg.size(), 0);
    if (n != static_cast<int>(msg.size())) {
        std::cout << "Error while sending message, message size: " << msg.size() << " bytes sent: " << std::endl;
    }
}

void ConnectionHandler::terminate() {
    this->m_terminate = true;
}

main.cpp

代码语言:javascript
复制
#include "TCPServer.h"

int main() 
{
    // create server - it starts automatically in constructor
    TCPServer server;
    // server.stop(); // we could stop the server this way
    // wait for server thread to end
    server.join();
    return 0;
}

怎么可以做得更好?

  • 用一个线程池来处理客户端连接。 线程创建代价高昂,因此我们希望避免它
  • 用一个 SIGINT (ctrl-c) 处理程序调用 main.cpp 中的 TCPServer::stop()

附录:

1、代码: https://github.com/leoay/Notes/tree/master/%E4%B8%80%E4%B8%AAC%2B%2B%E5%A4%9A%E7%BA%BF%E7%A8%8BTCP%E6%9C%8D%E5%8A%A1Demo/code

2、原文:https://www.codeer.dev/blog/2020/07/21/cpp-multithreaded-tcp-server.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-04-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 leoay技术号 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档