前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >GraphQL实践10——Netflix Dgs Graphql异步订阅

GraphQL实践10——Netflix Dgs Graphql异步订阅

原创
作者头像
F嘉阳
发布2022-11-02 08:54:59
5310
发布2022-11-02 08:54:59
举报
文章被收录于专栏:graphqlgraphql

介绍

GraphQL订阅模式支持服务端主动向客户端推送数据通知,避免客户端轮训

DGS Subscriptions

新增依赖

此处使用的是SpringBoot Web组件,故引入websockets依赖

如果使用的是WebFlux则要引入对应的dgs-webflux

代码语言:html
复制
		  <dependency>
            <groupId>com.netflix.graphql.dgs</groupId>
            <artifactId>graphql-dgs-subscriptions-websockets-autoconfigure</artifactId>
        </dependency>

异步通知解析

此处定义为当新增一个Actor就给所有订阅者发异步通知

代码语言:java
复制
@Slf4j
@DgsComponent
@RequiredArgsConstructor
public class ActorDataFetcher {

	private final ActorAssembler actorAssembler;

	private FluxSink<Actor> actorStream;

	private ConnectableFlux<Actor> actorPublisher;

	@PostConstruct
	private void createActorPublisher() {
		Flux<Actor> publisher = Flux.create(emitter -> {
			actorStream = emitter;
		});

		actorPublisher = publisher.publish();
		actorPublisher.connect();
	}

	@DgsMutation
	public Actor addActor(@InputArgument SubmitActor actor) {
		Actor actorEntity = actorAssembler.convert(actor);
		actorEntity.setActorId(10);
		actorEntity.setLastUpdate(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_PATTERN));
		actorStream.next(actorEntity);
		log.info("服务端创建演员:{}", actor);
		return actorEntity;
	}

	/**
	 * 异步通知
	 *
	 * @return
	 */
	@DgsSubscription
	public Publisher<Actor> actorAdded() {
		return actorPublisher;
	}
}

单元测试

由于异步通知无法用浏览器自带的控制台测试,只能通过GraphQL客户端订阅后接收异步通知,此处参考官方最佳实践,使用单元测试模拟

代码语言:java
复制
@Slf4j
@SpringBootTest
class ActorDataFetcherTest {

	@Autowired
	private DgsQueryExecutor dgsQueryExecutor;

	@Test
	void addActor() {

		GraphQLQueryRequest graphQLQueryRequest = new GraphQLQueryRequest(
				AddActorGraphQLQuery.newRequest()
						.actor(
								SubmitActor.newBuilder()
										.firstName("fu")
										.lastName("jiayang")
										.build())
						.build(),
				new AddActorProjectionRoot()
						.actorId()
						.firstName()
						.lastUpdate());
		dgsQueryExecutor.execute(graphQLQueryRequest.serialize());
	}

	@Test
	void actorAdded() {
		ExecutionResult executionResult = dgsQueryExecutor.execute("""
				subscription {
				  actorAdded {
				    actorId
				    firstName
				    lastName
				    lastUpdate
				  }
				}
				""");
		Publisher<ExecutionResult> reviewPublisher = executionResult.getData();
		List<Actor> actors = new CopyOnWriteArrayList<>();

		reviewPublisher.subscribe(new Subscriber<>() {
			@Override
			public void onSubscribe(Subscription s) {
				s.request(2);
			}

			@Override
			public void onNext(ExecutionResult executionResult) {
				if (executionResult.getErrors().size() > 0) {
					executionResult.getErrors().forEach(error -> log.error(error.toString()));
				}
				Map<String, Object> actorResult = executionResult.getData();
				Actor actor = new ObjectMapper().convertValue(actorResult.get("actorAdded"), Actor.class);
             log.info("客户端监听到演员新增:{}",actor);
				actors.add(actor);
			}

			@Override
			public void onError(Throwable t) {
				t.printStackTrace();
			}

			@Override
			public void onComplete() {
			}
		});

		addActor();
		addActor();

		assertThat(actors.size()).isEqualTo(2);
	}
}

从日志输出可以看到客户端成功接收异步通知

image-20221028214837298
image-20221028214837298

总结

订阅模式属于GraphQL特色功能,类似消息队列实现方式,减少客户端对普通查询接口轮训性能开销

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • DGS Subscriptions
    • 新增依赖
      • 异步通知解析
      • 单元测试
      • 总结
      相关产品与服务
      消息队列 CMQ
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档