首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >如何使用Apache删除Cassandra中的一行?

如何使用Apache删除Cassandra中的一行?
EN

Stack Overflow用户
提问于 2019-11-09 04:08:12
回答 2查看 325关注 0票数 0

在Apache中,很容易通过CassandraSink向Cassandra插入一行。但我找不到办法删除一排。

我也试图写自定义接收器,但我得到了NotSerializableException。如何构造删除操作的代码?

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MyCassandraSink implements SinkFunction<String> {

    private Cluster cluster = Cluster.builder()
            .addContactPoint("127.0.0.1")
            .build();

    private Session cassandra = cluster.connect("mykeyspace");

    @Override
    public void invoke(String value, Context context) throws Exception {
        cassandra.execute("SOME DELETE QUERY");
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [com.datastax.driver.core.SessionManager@3b0fe47a] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
    at com.meshkan.streaming.entry.EventListener.main(EventListener.java:42)
Caused by: java.io.NotSerializableException: com.datastax.driver.core.SessionManager
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.concurrent.CopyOnWriteArrayList.writeObject(CopyOnWriteArrayList.java:973)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 9 more
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-11-10 07:34:36

要实现自己的insert-vs-delete逻辑,请创建扩展CassandraSinkBase的接收器,并实现send()方法。请参阅AbstractCassandraTupleSink作为这样做的一个示例。请注意,CassandraSinkBase是如何通过使其短暂,并在open()调用中创建它来避免卡桑德拉客户端的序列化问题的。

票数 0
EN

Stack Overflow用户

发布于 2020-03-22 01:35:41

我找到了解决办法,但我不喜欢。CassandraPojoInputFormat既可用于删除行,也可用于更新行。(我还将它用于SELECT,这个名称意味着它似乎是用来做什么的。)

它起作用的事实是,IMHO,它是唯一的救赎美德。在我找到一个优雅的解决方案之前,我一直在使用它。我还在找..。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CassandraPojoInputFormat<MyThingyConnector> myThingyCassandraPojoInputFormat =
new CassandraPojoInputFormat<MyThingyConnector>(
"DELETE FROM " + dbKeyspace + ".<table_name> <where clause>",
clusterBuilder,
MyThingyConnector.class);

myThingyCassandraPojoInputFormat.configure(null);
myThingyCassandraPojoInputFormat.open(cassandraInputSplit);
myThingyCassandraPojoInputFormat.close();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58779124

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文