本篇概览
为了突出重点,这里将几个关键的知识点提前给出:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | 该项目在GitHub上的主页 | |
git仓库地址(https) | 该项目源码的仓库地址,https协议 | |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
// gRPC服务,这是个在线商城的购物车服务
service CartService {
// 客户端流式:添加多个商品到购物车
rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}
}
// 提交购物车时的产品信息
message ProductOrder {
// 商品ID
int32 productId = 1;
// 商品数量
int32 number = 2;
}
// 提交购物车返回结果的数据结构
message AddCartReply {
// 返回码
int32 code = 1;
// 描述信息
string message = 2;
}
// 使用springboot插件
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
// 作为gRPC服务提供方,需要用到此库
implementation 'net.devh:grpc-server-spring-boot-starter'
// 依赖自动生成源码的工程
implementation project(':grpc-lib')
// annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
annotationProcessor 'org.projectlombok:lombok'
}
spring:
application:
name: client-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:
server:
port: 9900
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
@Slf4j
public class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {
@Override
public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) {
// 返回匿名类,给上层框架使用
return new StreamObserver<ProductOrder>() {
// 记录处理产品的总量
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在处理商品[{}],数量为[{}]",
value.getProductId(),
value.getNumber());
// 增加总量
totalCount += value.getNumber();
}
@Override
public void onError(Throwable t) {
log.error("添加购物车异常", t);
}
@Override
public void onCompleted() {
log.info("添加购物车完成,共计[{}]件商品", totalCount);
responseObserver.onNext(AddCartReply.newBuilder()
.setCode(10000)
.setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount))
.build());
responseObserver.onCompleted();
}
};
}
}
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation project(':grpc-lib')
}
server:
port: 8082
spring:
application:
name: client-stream-client-side
grpc:
client:
# gRPC配置的名字,GrpcClient注解会用到
client-stream-server-side:
# gRPC服务端地址
address: 'static://127.0.0.1:9900'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> {
String getExtra();
}
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient("client-stream-server-side")
private CartServiceGrpc.CartServiceStub cartServiceStub;
public String addToCart(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted会在另一个线程中被执行,
// ExtendResponseObserver继承自StreamObserver
ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {
String extraStr;
@Override
public String getExtra() {
return extraStr;
}
private int code;
private String message;
@Override
public void onNext(AddCartReply value) {
log.info("on next");
code = value.getCode();
message = value.getMessage();
}
@Override
public void onError(Throwable t) {
log.error("gRPC request error", t);
extraStr = "gRPC error, " + t.getMessage();
countDownLatch.countDown();
}
@Override
public void onCompleted() {
log.info("on complete");
extraStr = String.format("返回码[%d],返回信息:%s" , code, message);
countDownLatch.countDown();
}
};
// 远程调用,此时数据还没有给到服务端
StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);
for(int i=0; i<count; i++) {
// 发送一笔数据到服务端
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客户端告诉服务端:数据已经发完了
requestObserver.onCompleted();
try {
// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
// await的超时时间设置为2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 创建ProductOrder对象
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}
package com.bolingcavalry.grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "1") int count) {
return grpcClientService.addToCart(count);
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。