前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用OpenSocket开发一个简单的高性能高并发HttpServer

用OpenSocket开发一个简单的高性能高并发HttpServer

原创
作者头像
linyouhappy
修改2023-07-20 17:37:51
3000
修改2023-07-20 17:37:51
举报
文章被收录于专栏:OpenLinyouOpenLinyou

OpenSocket是一个跨全平台的高性能网络并发库。

它使用了高性能IO,Linux和安卓用epoll,Win32用IOCP,iOS和Mac用kqueue,其他系统使用select。

本文用这种高性能socket库,设计开发一个简单的HttpServer。

为了开发方便,我们使用OpenThread作为线程库。使用OpenThread的Actor模式设计高并发HttpServer。

设计思路如下:

创建5条线程,1条线程封装成监听者Listener,另外4条线程封装成接收者Accepter。

监听者Listener负责监听socket连接事件,监听到socket新连接事件后,就把fd发给其中一个接收者Accepter;

接收者Accepter接收到socket的fd后,打开该socket连接,与客户端进行网络通信。

此简单的HttpServer接收到Http报文后,进行response一份Http报文,然后关闭socket完成Http短连接操作。

具体源码如下:

代码语言:C++
复制
#include <assert.h>
#include <map>
#include <set>
#include <memory>
#include <string.h>
#include "opensocket.h"
#include "open/openthread.h"
using namespace open;

const std::string TestServerIp_ = "0.0.0.0";
const int TestServerPort_ = 8888;

//msgType == 1
struct SocketProto : public OpenThreadProto
{
    std::shared_ptr<OpenSocketMsg> data_;
    static inline int ProtoType() { return 1; }
    virtual inline int protoType() const { return SocketProto::ProtoType(); }
};

//msgType == 2
struct RegisterProto : public OpenThreadProto
{
    int srcPid_;
    static inline int ProtoType() { return 2; }
    virtual inline int protoType() const { return RegisterProto::ProtoType(); }
    RegisterProto() :srcPid_(-1) {}
};

//msgType == 3
struct NewClientProto : public OpenThreadProto
{
    int accept_fd_;
    std::string addr_;
    static inline int ProtoType() { return 3; }
    virtual inline int protoType() const { return NewClientProto::ProtoType(); }
    NewClientProto() : accept_fd_(-1) {}
};

////////////App//////////////////////
class App
{
    static void SocketFunc(const OpenSocketMsg* msg)
    {
        if (!msg) return;
        if (msg->uid_ >= 0)
        {
            auto proto = std::shared_ptr<SocketProto>(new SocketProto);
            proto->srcPid_ = -1;
            proto->srcName_ = "OpenSocket";
            proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
            if (!OpenThread::Send((int)msg->uid_, proto))
                printf("SocketFunc dispatch faild pid = %lld\n", msg->uid_);
        }
        else
        {
            delete msg;
        }
    }
public:
    static App Instance_;
    OpenSocket openSocket_;
    App() { openSocket_.run(App::SocketFunc); }
};
App App::Instance_;

////////////Listener//////////////////////
class Listener : public OpenThreadWorker
{
    int listen_fd_;
    unsigned int balance_;
    std::set<int> setSlaveId_;
    std::vector<int> vectSlaveId_;
public:
    Listener(const std::string& name)
        :OpenThreadWorker(name),
        listen_fd_(-1)
    {
        balance_ = 0;
        registers(SocketProto::ProtoType(), (OpenThreadHandle)&Listener::onSocketProto);
        registers(RegisterProto::ProtoType(), (OpenThreadHandle)&Listener::onRegisterProto);
    }
    virtual ~Listener() {}
    virtual void onStart()
    {
        listen_fd_ = App::Instance_.openSocket_.listen((uintptr_t)pid(), TestServerIp_, TestServerPort_, 64);
        if (listen_fd_ < 0)
        {
            printf("Listener::onStart faild listen_fd_ = %d\n", listen_fd_);
            assert(false);
        }
        App::Instance_.openSocket_.start((uintptr_t)pid(), listen_fd_);
        printf("HTTP: %s:%d\n", TestServerIp_.c_str(), TestServerPort_);
    }
    void onRegisterProto(const RegisterProto& proto)
    {
        if (proto.srcPid_ >= 0)
        {
            if (setSlaveId_.find(proto.srcPid_) == setSlaveId_.end())
            {
                setSlaveId_.insert(proto.srcPid_);
                vectSlaveId_.push_back(proto.srcPid_);
                printf("Hello OpenSocket HttpServer, srcPid = %d\n", proto.srcPid_);
            }
        }
    }
    // new client socket dispatch to Accept
    void notifyToSlave(int accept_fd, const std::string& addr)
    {
        if (!vectSlaveId_.empty())
        {
            auto proto = std::shared_ptr<NewClientProto>(new NewClientProto);
            proto->accept_fd_ = accept_fd;
            proto->addr_ = addr;
            if (balance_ >= vectSlaveId_.size())
            {
                balance_ = 0;
            }
            int slaveId = vectSlaveId_[balance_++];
            if (OpenThread::Send(slaveId, proto))
            {
                return;
            }
            printf("Listener::notifyToSlave send faild pid = %d\n", slaveId);
        }
        App::Instance_.openSocket_.close(pid_, accept_fd);
    }
    void onSocketProto(const SocketProto& proto)
    {
        const auto& msg = proto.data_;
        switch (msg->type_)
        {
        case OpenSocket::ESocketAccept:
            // linsten new client socket
            notifyToSlave(msg->ud_, msg->data());
            printf("Listener::onSocket [%s]ESocketAccept:acceptFd = %d\n", ThreadName((int)msg->uid_).c_str(), msg->ud_);
            break;
        case OpenSocket::ESocketClose:
            break;
        case OpenSocket::ESocketError:
            printf("Listener::onSocket [%s]ESocketError:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketWarning:
            printf("Listener::onSocket [%s]ESocketWarning:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketOpen:
            break;
        case OpenSocket::ESocketUdp:
        case OpenSocket::ESocketData:
            assert(false);
            break;
        default:
            break;
        }
    }
};

////////////HttpRequest//////////////////////
struct HttpRequest
{
    int fd_;
    std::string addr_;

    std::string method_;
    std::string url_;

    int code_;
    int clen_;
    std::string head_;
    std::string body_;
    std::map<std::string, std::string> headers_;
    HttpRequest() :fd_(-1), code_(-1), clen_(-1) {}

    //GET /xx/xx HTTP/x.x
    bool parseHeader();
    bool pushData(const char* data, size_t size);
};

////////////Accepter//////////////////////
class Accepter : public OpenThreadWorker
{
    int listenId_;
    std::map<int, HttpRequest> mapClient_;
public:
    Accepter(const std::string& name)
        :OpenThreadWorker(name),
        listenId_(-1)
    {
        registers(SocketProto::ProtoType(), (OpenThreadHandle)&Accepter::onSocketProto);
        registers(NewClientProto::ProtoType(), (OpenThreadHandle)&Accepter::onNewClientProto);
    }
    virtual ~Accepter() {}
    virtual void onStart() 
    { 
        while (listenId_ < 0)
        {
            listenId_ = ThreadId("listener");
            OpenThread::Sleep(1000);
        }
        auto proto = std::shared_ptr<RegisterProto>(new RegisterProto);
        proto->srcPid_ = pid();
        if (OpenThread::Send(listenId_, proto))
            return;
        printf("Accepter::onStart send faild pid = %d\n", listenId_);
    }
    void onNewClientProto(const NewClientProto& proto)
    {
        int accept_fd = proto.accept_fd_;
        if (accept_fd >= 0)
        {
            auto iter = mapClient_.find(accept_fd);
            if (iter != mapClient_.end())
            {
                assert(false);
                mapClient_.erase(iter);
                App::Instance_.openSocket_.close(pid(), accept_fd);
                return;
            }
            auto& client = mapClient_[accept_fd];
            client.fd_ = accept_fd;
            client.addr_ = proto.addr_;
            App::Instance_.openSocket_.start(pid_, accept_fd);
        }
    }
    //GET /xx/xx HTTP/x.x
    void onReadHttp(const std::shared_ptr<OpenSocketMsg> msg)
    {
        auto iter = mapClient_.find(msg->fd_);
        if (iter == mapClient_.end())
        {
            App::Instance_.openSocket_.close(pid_, msg->fd_);
            return;
        }
        auto& request = iter->second;
        if (!request.pushData(msg->data(), msg->size()))
        {
            //Header too large.close connet.
            if (request.head_.size() > 1024)
                App::Instance_.openSocket_.close(pid_, msg->fd_);
            return;
        }
        printf("new client:url = %s\n", request.url_.c_str());
        std::string content;
        content.append("<div>It's work!</div><br/>" + request.addr_ + "request:" + request.url_);
        std::string buffer = "HTTP/1.1 200 OK\r\ncontent-length:" + std::to_string(content.size()) + "\r\n\r\n" + content;
        App::Instance_.openSocket_.send(msg->fd_, buffer.data(), (int)buffer.size());
    }
    virtual void onSocketProto(const SocketProto& proto)
    {
        const auto& msg = proto.data_;
        switch (msg->type_)
        {
        case OpenSocket::ESocketData:
            onReadHttp(msg);
            break;
        case OpenSocket::ESocketClose:
            mapClient_.erase(msg->fd_);
            break;
        case OpenSocket::ESocketError:
            mapClient_.erase(msg->fd_);
            printf("Accepter::onStart [%s]ESocketError:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketWarning:
            printf("Accepter::onStart [%s]ESocketWarning:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketOpen:
        {
            auto iter = mapClient_.find(msg->fd_);
            if (iter == mapClient_.end())
            {
                App::Instance_.openSocket_.close(pid_, msg->fd_);
                return;
            }
        }
            break;
        case OpenSocket::ESocketAccept:
        case OpenSocket::ESocketUdp:
            assert(false);
            break;
        default:
            break;
        }
    }
};
int main()
{
    printf("start server==>>\n");
    std::vector<OpenThreader*> vectServer = {
        new Listener("listener"),
        new Accepter("accepter1"),
        new Accepter("accepter2"),
        new Accepter("accepter3"),
        new Accepter("accepter4")
    };
    for (size_t i = 0; i < vectServer.size(); ++i)
        vectServer[i]->start();

    printf("wait close==>>\n");
    OpenThread::ThreadJoinAll();
    for (size_t i = 0; i < vectServer.size(); ++i)
        delete vectServer[i];
    vectServer.clear();
    
    printf("Pause\n");
    return getchar();
}


bool HttpRequest::parseHeader()
{
    if (!headers_.empty() || head_.size() < 12) return true;
    std::string line;
    const char* ptr = strstr(head_.c_str(), "\r\n");
    if (!ptr) return false;
    clen_ = -1;
    line.append(head_.c_str(), ptr - head_.c_str());

    int state = 0;
    method_.clear();
    url_.clear();
    for (size_t k = 0; k < line.size(); ++k)
    {
        if (state == 0)
        {
            if (line[k] != ' ')
            {
                method_.push_back(line[k]);
                continue;
            }
            state = 1;
            while (k < line.size() && line[k] == ' ') ++k;
            if (line[k] != ' ') --k;
        }
        else
        {
            if (line[k] != ' ')
            {
                url_.push_back(line[k]);
                continue;
            }
            break;
        }
    }

    line.clear();
    int k = -1;
    int j = -1;
    std::string key;
    std::string value;
    for (size_t i = ptr - head_.c_str() + 2; i < head_.size() - 1; i++)
    {
        if (head_[i] == '\r' && head_[i + 1] == '\n')
        {
            if (j > 0)
            {
                k = 0;
                while (k < line.size() && line[k] == ' ') ++k;
                while (k >= 0 && line.back() == ' ') line.pop_back();
                value = line.data() + j + 1;
                while (j >= 0 && line[j] == ' ') j--;
                key.clear();
                key.append(line.data(), j);
                for (size_t x = 0; x < key.size(); x++)
                    key[x] = std::tolower(key[x]);
                headers_[key] = value;
            }
            ++i;
            j = -1;
            line.clear();
            continue;
        }
        line.push_back(head_[i]);
        if (j < 0 && line.back() == ':')
        {
            j = (int)line.size() - 1;
        }
    }
    clen_ = std::atoi(headers_["content-length"].c_str());
    return true;
}

bool HttpRequest::pushData(const char* data, size_t size)
{
    if (code_ == -1)
    {
        head_.append(data, size);
        const char* ptr = strstr(head_.data(), "\r\n\r\n");
        if (!ptr) return false;
        code_ = 0;
        body_.append(ptr + 4);
        head_.resize(ptr - head_.data() + 2);
        if (!parseHeader()) return false;
    }
    else
    {
        body_.append(data, size);
    }
    if (clen_ >= 0)
    {
        if (clen_ == 0 && clen_ == body_.size())
        {
            return true;
        }
        if (clen_ >= body_.size())
        {
            body_.resize(clen_);
            return true;
        }
    }
    else if (body_.size() > 2)
    {
        if (body_[body_.size() - 2] == '\r' && body_.back() == '\n')
        {
            body_.pop_back();
            body_.pop_back();
            return true;
        }
    }
    return false;
}

编译和执行

请安装cmake工具,用cmake可以构建出VS或者XCode工程,就可以在vs或者xcode上编译运行。

源代码:https://github.com/OpenMiniServer/opensockethttps://github.com/OpenMiniServer/opensocket)

代码语言:txt
复制
#克隆项目
git clone https://github.com/OpenMiniServer/opensocket
cd ./opensocket
#创建build工程目录
mkdir build
cd build
cmake ..
#如果是win32,在该目录出现opensocket.sln,点击它就可以启动vs写代码调试
make
./httpserver

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 编译和执行
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档