1.什么是RPC? RPC 的全称是 Remote Procedure Call,即远程过程调用。简单来说,它是一种技术,允许你像调用本地函数一样,去调用一个位于另一台机器上的服务(函数或方法),而无需关心底层的网络细节。
一个生动的比喻: 想象一下,你想知道某个遥远城市的天气。
在技术层面,RPC框架帮你隐藏了以下复杂性:
也就是有了rpc后我们不用自己去一点点的编写这些可以模板化的操作,提高了开发效率。
2.为什么要有RPC? RPC的出现是为了解决分布式系统中的通信问题。
总结:RPC的核心目标是让分布式系统下的服务间调用变得更简单、更直观,就像在本地一样。本质是一个第三方开发工具,我们可以利用它来开发一个rpc服务器或rpc客户端。
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev克隆仓库:
git clone https://github.com/apache/brpc.gitcd brpc/mkdir build && cd buildcmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6make && sudo make install客户端类
brpc::Controller:这是一个 “每次RPC调用”的上下文对象。它用于在客户端和服务器端传递额外的控制和信息。每个 RPC 调用都应该使用一个独立的 Controller 对象,它不能在多次调用间复用。
常用接口:
Failed() const: 判断RPC是否失败。ErrorCode() const: 获取错误码。ErrorText() const: 获取错误文本。brpc::Channel:这是 客户端用于与服务器通信的通道。它代表了一个到服务端集群的连接池。创建 Channel 开销较大,应该被长期复用。
常用接口:
Init(const char* server_addr_and_port, ...): 初始化连接到单个服务器。Init(const char* naming_service_url, ...): 初始化通过命名服务(如Nacos, Consul, DNS)发现的服务集群。YourService_Stub 类:由 protoc 编译器根据 .proto 文件自动生成的客户端存根类。它继承自 YourService,内部封装了一个 brpc::Channel,并提供了与服务器端方法一一对应的、更易用的同步/异步调用接口。
服务端类
brpc::Server:RPC 服务器的主体,负责注册服务、启动端口监听、管理内部线程等。
关键接口: AddService(google::protobuf::Service* service, brpc::ServiceOwnership ownership): 将你的服务实现注册到服务器。ownership 参数指定服务器是否接管服务的生命周期。Start(int port, ...): 在指定端口启动服务。RunUntilAskedToQuit(): 阻塞当前线程,直到收到退出信号(如Ctrl+C)。这是最常用的运行方式。Stop(...) 和 Join(): 优雅地停止服务器。配置类
brpc::ServerOptions:在启动 brpc::Server 时进行各种配置。
常用配置项:
idle_timeout_sec: 连接空闲超时时间。max_concurrency: 服务器最大并发度,用于限制同时处理的请求数。internal_port: 内部端口,用于获取服务器内置状态、监控。brpc::ChannelOptions:初始化 brpc::Channel 时的配置选项。
常用配置项:
timeout_ms: 全局超时时间。connection_type: 连接类型(单连接、连接池、短连接)。protocol: 使用的协议(默认为 baidu_std,还有 h2:grpc, http 等)。load_balancing_name: 负载均衡算法名。brpc::ClosureGuard:是一个 RAII(资源获取即初始化)包装器,它确保在作用域结束时,Closure 的 Run() 方法会被自动调用一次。这是防止内存泄漏和忘记回调的最佳实践。
创建流程
.proto 文件,定义服务和消息。.proto生成的 Service 类。main 函数中,创建 brpc::Server。server.AddService 注册你的服务实现。server.Start 和 server.RunUntilAskedToQuit。Init)一个长期复用的 brpc::Channel。Stub 类,传入 Channel 来创建存根。Controller, Request, Response。Stub 的同步或异步方法进行调用。使用protobuf工具解决数据序列化与反序列化问题,和rpc接口。
编写proto文件
syntax="proto3";//声明版本
package example;//声明命名空间
option cc_generic_services = true;
message EchoRequest//定义请求对象
{
string message = 1;//变量编号
}
message EchoResponse
{
string message = 1;
}
//定义rpc接口
service EchoService
{
rpc Echo(EchoRequest) returns (EchoResponse);//远程接口调用
}编译文件:protoc --cpp_out=./ main.proto
服务器:
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"
class EchoServiceImpl : public example::EchoService
{
public:
void Echo(google::protobuf::RpcController* controller,
const ::example::EchoRequest* request,
::example::EchoResponse* response,
::google::protobuf::Closure* done) override final
{
brpc::ClosureGuard rpc_guard(done); //智能指针管理消息发送
std::cout<<"收到消息"<<request->message()<<std::endl;
std::string str = request->message() + "---响应";
response->set_message(str);
}
private:
};
int main(int argc, char *argv[])
{
//1.构建brpc服务
brpc::Server server;
//2.向brpc服务对象中新增EchoService服务
EchoServiceImpl echo_service;
int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
if(ret == -1)
{
std::cout<<"服务添加失败"<<std::endl;
return -1;
}
//3.启动服务
brpc::ServerOptions options;//服务器配置类
options.idle_timeout_sec=-1;//连接空闲超时时间,超时后关闭
options.num_threads = 1;//io线程数
ret = server.Start(7070,&options);
if(ret == -1)
{
std::cout<<"服务启动失败"<<std::endl;
return -2;
}
server.RunUntilAskedToQuit();//等待运行结束
return 0;
}客户端:
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"
void callback(brpc::Controller* cntl,example::EchoResponse* rsp)
{
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<example::EchoResponse> rsp_guard(rsp);
if(cntl->Failed()==true)
{
std::cout<<"调用失败"<<std::endl;
return;
}
std::cout<<"收到响应:"<<rsp_guard->message()<<std::endl;
return;
}
int main(int argc, char* argv[])
{
//1.构建信道连接服务器
brpc::ChannelOptions options;//客户端配置类
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
brpc::Channel channel;
int ret = channel.Init("127.0.0.1:7070",&options);
if(ret == -1)
{
std::cout<<"初始化信道失败"<<std::endl;
return -1;
}
example::EchoService_Stub stub(&channel);
example::EchoRequest req;
req.set_message("hello brpc");
brpc::Controller* cntl = new brpc::Controller();
example::EchoResponse* rsp = new example::EchoResponse();
auto clusure = google::protobuf::NewCallback(callback,cntl,rsp);
stub.Echo(cntl,&req,rsp,clusure);
std::cout<<"异步调用结束"<<std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}在做项目时通常都会用到很多工具,如果这些工具都用各自的日志输出,各式各样就很乱,所以这里关闭brpc的日志系统,统一用一个日志输出。
同一个服务的节点可能有多个,服务的种类同样会有多个,为了方便客户端的高效访问,我们把这样服务节点都管理起来,需要利用etcd服务,让所有提供服务的节点都注册到etcd数据库中,key=服务名称,val=服务的节点。
封装:
#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include "logger.hpp"
// 封装单个服务的信道类管理
class ServiceChannel
{
public:
using ChannelPtr = std::shared_ptr<brpc::Channel>;
using ptr = std::shared_ptr<ServiceChannel>;
ServiceChannel(const std::string &name)
: _server_name(name), _index(0)
{
}
void append(const std::string &host) // 服务上线
{
std::unique_lock lock(_mutex);
ChannelPtr clptr = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
int ret = clptr->Init(host.c_str(), &options);
if (ret == -1)
{
LOG_ERR("初始化{}-{}信道失败", host, _server_name);
return;
}
_channels.push_back(clptr);
_hosts.insert(make_pair(host, clptr));
}
void remove(const std::string &host) // 服务下线
{
std::unique_lock lock(_mutex);
auto ret = _hosts.find(host);
if (ret == _hosts.end())
{
LOG_ERR("{}-{}服务不存在", host, _server_name);
return;
}
for (auto it = _channels.begin(); it != _channels.end(); it++)
{
if (*it == _hosts[host])
{
_channels.erase(it);
break;
}
}
_hosts.erase(host);
}
ChannelPtr choose() // 服务获取
{
std::unique_lock lock(_mutex);
if (_channels.size() == 0)
{
LOG_INFO("当前没有提供 {} 服务的节点", _server_name);
return ChannelPtr();
}
int32_t index = _index;
_index = _index++ % _channels.size();
return _channels[index];
}
private:
std::mutex _mutex;
int32_t _index; // 轮询计数
std::string _server_name; // 服务名称
std::vector<ChannelPtr> _channels;
std::unordered_map<std::string, ChannelPtr> _hosts; // 主机地址与信道关系
};
class ServiceManage
{
public:
using ptr = std::shared_ptr<ServiceManage>;
ServiceChannel::ChannelPtr choose(const std::string &service_name)
{
std::unique_lock lock(_mutex);
if (_services.find(service_name) == _services.end())
{
LOG_INFO("当前没有{}服务的节点", service_name);
return ServiceChannel::ChannelPtr();
}
return _services[service_name]->choose();
}
void declared(const std::string &service_name)
{
std::unique_lock lock(_mutex);
_follow_services.insert(service_name);
}
// 服务上线处理
void onServiceOnline(const std::string &service_instance, const std::string &host)
{
std::string service_name = getServiceName(service_instance);
std::unique_lock lock(_mutex);
if (_follow_services.find(service_name) != _follow_services.end())
{
LOG_INFO("{} 此服务不关心", service_name);
return;
}
if (_services.find(service_name) == _services.end())
{
ServiceChannel::ptr sc(std::make_shared<ServiceChannel>(service_name));
sc->append(host);
_services.insert(make_pair(service_name, sc));
}
else
{
ServiceChannel::ptr sc = _services[service_name];
sc->append(host);
}
LOG_INFO("{}-{} 服务节点已上线", service_name, host);
}
// 服务下线处理
void onServiceOffline(const std::string &service_instance, const std::string &host)
{
std::string service_name = getServiceName(service_instance);
std::unique_lock lock(_mutex);
if (_services.find(service_name) == _services.end())
{
LOG_WARN("{} 该服务不存在",host);
return;
}
ServiceChannel::ptr sc = _services[service_name];
sc->remove(host);
LOG_INFO("{}-{} 服务节点已删除",service_name,host);
}
std::string getServiceName(const std::string &service_instance)
{
auto pos = service_instance.find_last_of('/');
if (pos == std::string::npos)
return service_instance;
return service_instance.substr(0, pos);
}
private:
std::mutex _mutex;
std::unordered_set<std::string> _follow_services;
std::unordered_map<std::string, ServiceChannel::ptr> _services;
};关于etcd的使用:Linux系统C++开发环境搭建工具(二)—— etcd 使用指南 spdlog的使用:Linux系统C++开发环境搭建工具(一)—— gflags/gtest/spdlog 使用指南
etcd.hpp:
#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <etcd/SyncClient.hpp>
#include <memory>
#include <functional>
class Registry
{
public:
using ptr=std::shared_ptr<Registry>;
Registry(const std::string& host)
:_client(std::make_shared<etcd::Client>(host))
,_keep_alive(_client->leasekeepalive(3).get())
,_lease_id(_keep_alive->Lease())
{}
bool registry(const std::string& key,const std::string& val)
{
auto rsp = _client->put(key,val,_lease_id).get();
if(rsp.is_ok()==false)
{
//替换为日志输出更为适宜
std::cout<<key<<": "<<val<<"注册失败"<<std::endl;
return false;
}
else return true;
}
~Registry()
{
_keep_alive->Cancel();
}
private:
std::shared_ptr<etcd::Client> _client;
std::shared_ptr<etcd::KeepAlive> _keep_alive;
uint64_t _lease_id;
};
class Discovery
{
public:
using NotifyCallback = std::function<void(std::string, std::string)>;
using ptr = std::shared_ptr<Discovery>;
Discovery(const std::string& host
,const std::string& basedir
,const NotifyCallback& put_cb
,const NotifyCallback& del_cb)
:
_client(std::make_shared<etcd::Client>(host))
,_put_cb(put_cb)
,_del_cb(del_cb)
{
auto rsp = _client->ls(basedir).get();
if(rsp.is_ok()==false)
{
std::cout<<"获取数据失败 "<<rsp.error_message()<<std::endl;
}
int sz = rsp.keys().size();
for(int i=0;i<sz;i++)
{
if(_put_cb)
_put_cb(rsp.key(i),rsp.value(i).as_string());
}
_watcher = std::make_shared<etcd::Watcher>(*_client,basedir
,std::bind(&Discovery::callback,this, std::placeholders::_1),true);
}
void callback(const etcd::Response& rsp)
{
if(rsp.is_ok()==false)
{
std::cout<<"错误事件通知"<<rsp.error_message()<<std::endl;
return;
}
for(auto const& ev: rsp.events())
{
if (ev.event_type() == etcd::Event::EventType::PUT) {
if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());
std::cout << "上线服务:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;
}else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
std::cout << "下线服务:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;
}
}
}
~Discovery()
{
_watcher->Cancel();
}
private:
NotifyCallback _put_cb;
NotifyCallback _del_cb;
std::shared_ptr<etcd::Client> _client;
std::shared_ptr<etcd::Watcher> _watcher;
};测试: registry.cc(服务器)
#include "etcd.hpp"
#include <gflags/gflags.h>
#include <thread>
#include <logger.hpp>
#include "main.pb.h"
#include <brpc/server.h>
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试;true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(instance_name, "/echo/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:7070", "当前实例的外部访问地址");
DEFINE_int32(listen_port,7070,"Rpc服务监听端口");
class EchoServiceImpl : public example::EchoService
{
public:
void Echo(google::protobuf::RpcController* controller,
const ::example::EchoRequest* request,
::example::EchoResponse* response,
::google::protobuf::Closure* done) override final
{
brpc::ClosureGuard rpc_guard(done); //智能指针管理消息发送
std::cout<<"收到消息"<<request->message()<<std::endl;
std::string str = request->message() + "---响应";
response->set_message(str);
}
private:
};
int main(int argc, char *argv[])
{
google::ParseCommandLineFlags(&argc, &argv, true);
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
//关闭brpc的默认日志输出
logging::LoggingSettings settings;
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(settings);
//1.构建服务器
brpc::Server server;
//2.向服务器对象中新增EchoService服务
EchoServiceImpl echo_service;
int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
if(ret == -1)
{
std::cout<<"服务添加失败"<<std::endl;
return -1;
}
//3.启动服务
brpc::ServerOptions options;//服务器配置类
options.idle_timeout_sec=-1;//连接空闲超时时间,超时后关闭
options.num_threads = 1;//io线程数
ret = server.Start(7070,&options);
if(ret == -1)
{
std::cout<<"服务启动失败"<<std::endl;
return -2;
}
Registry::ptr rclient = std::make_shared<Registry>(FLAGS_etcd_host);
rclient->registry(FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);
LOG_DEBUG("服务启动: {}", FLAGS_base_service + FLAGS_instance_name);
server.RunUntilAskedToQuit();//等待运行结束
return 0;
}discovery.cc(客户端)
#include <gflags/gflags.h>
#include <thread>
#include <logger.hpp>
#include <brpc/server.h>
#include "main.pb.h"
#include "etcd.hpp"
#include "channel.hpp"
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试;true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(call_service, "/service/echo", "当前实例名称");
int main(int argc, char *argv[])
{
google::ParseCommandLineFlags(&argc, &argv, true);
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
// 构造rpc管理对象
ServiceManage::ptr sm(std::make_shared<ServiceManage>());
//sm->declared(FLAGS_call_service);
auto put_cb = std::bind(&ServiceManage::onServiceOnline, sm.get(), std::placeholders::_1, std::placeholders::_2);
auto del_cb = std::bind(&ServiceManage::onServiceOffline, sm.get(), std::placeholders::_1, std::placeholders::_2);
// 构建服务发现对象
Discovery::ptr dclient = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb);
while (1)
{
//Discovery::ptr dclient = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb);
// 通过rpc管理对象获取信道
std::shared_ptr<brpc::Channel> channel = sm->choose(FLAGS_call_service);
if (!channel)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
example::EchoService_Stub stub(channel.get());
example::EchoRequest req;
req.set_message("hello brpc");
// 发起调用
brpc::Controller *cntl = new brpc::Controller();
example::EchoResponse *rsp = new example::EchoResponse();
stub.Echo(cntl, &req, rsp, nullptr);
if (cntl->Failed() == true)
{
delete cntl;
delete rsp;
LOG_ERR("调用失败");
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
std::cout << "收到响应 " << rsp->message() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}非常感谢您能耐心读完这篇文章。倘若您从中有所收获,还望多多支持呀!