基于另一篇文章here,我正在阅读关于执行异步插入以提高性能的AsyncCassandraOperations。但是我找不到很多关于google或spring数据文档的帮助。
以前,我使用Cassandra Repository进行所有的数据提取和插入/更新,我发现这非常慢。根据推荐,我现在只使用AsyncCassandraOperations进行插入操作,但它不允许我这样做。我遇到了'org.springframework.data.cassandra.core.AsyncCassandraOperations‘类型的required bean错误。
请问使用AsyncCassandraOperations的正确方法是什么?
@Autowired private MyRepository repository_name;
@Autowired private AsyncCassandraOperations acops;
public void persist(List<POJO> l_POJO)
{
System.out.println("Enter Persist: "+new java.util.Date());
List<l_POJO> l_POJO_stale = repository_name.findBycol1AndStale("sample",false);
l_POJO_stale.forEach(s -> s.setStale(true));
l_POJO_stale.forEach(s -> acops.update(s));
try
{
acops.insert(l_POJO);
}
catch (Exception e)
{
System.out.println("Error in persisting new data");
}
}
发布于 2018-10-09 09:15:46
不知道是否使用了spring boot,如果是,AsyncCassandraOperations(AsyncCassandraTemplate是实现类)应该会自动创建。如果错误显示您需要一个AsyncCassandraOperations bean,那么直接的方法是创建一个,如下所示。
@Bean
AsyncCassandraTemplate asyncCassandraTemplate(Session session) {
return new AsyncCassandraTemplate(session);
}
由于您使用的是Spring data Repository接口,因此您还可以使用ReactiveCrudRepository
接口来更新实体对象或将实体对象插入到Cassandra (显示在this spring data example project中),作为使用AsyncCassandraTemplate
类的替代方法。
在使用ReactiveCrudRepository
并考虑到您想要做什么的情况下,您的代码需要进行以下更改。
WRRepository.findByCol1AndCol2AndCol3(String, boolean, String)
的返回类型从List<WRpojo>
改为Flux<WRpojo>
,以便充分利用反应式functionality.persist(List<WRpojo>)
的返回类型从boolean改为Mono<Void>
,使结果也是非阻塞的。persist(List<WRpojo>)
更改为以下内容。 public Mono<Void> persist(List<WRpojo> l_wr) {
Flux<WRpojo> l_old_wr = objWRRepository.findByCol1AndCol2AndCol3("1", false, "2").doOnNext(s -> s.setStale(true));
return objWRRepository.saveAll(l_old_wr).thenMany(objWRRepository.saveAll(l_wr)).then();
}
在反应式编程中,我们基本上不阻塞任何代码,这意味着返回的Mono<Void>
应该在下游的某个地方订阅,如果你确实想阻塞并等待所有操作完成,可以在Mono<Void>
上调用block()
,不推荐这样做。
https://stackoverflow.com/questions/52711538
复制相似问题