我有3列的cassandra表。
id text,
value text,
mappings map<text,text>假设样本数据如下:
id | value | mappings
-----------------------------------------------
1ABC | xyz | {"a":"abc","b":"bcd"}在spark作业中,我为id 1ABC的b映射计算了一个新值作为HashMap Ex: "b":"xyz"(可以将map转换为JavaRDD)
如何使用cassandra java spark连接器将该值附加(覆盖)到表中?我看了一个关于如何处理CQL集合追加的this示例,但我似乎不知道如何在Java语言中做到这一点。
发布于 2020-02-06 02:08:33
如下所示解决了这个问题。
通过传递新参数或使用Spark会话中的参数来创建cassandra连接器。
import com.datastax.spark.connector.cql.CassandraConnector;
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf()); // or pass different values for spark.cassandra.connection.host, username and password
rdd.foreach(new VoidFunction<TestBean>() {
@Override
public void call(TestBean t) throws Exception {
final String id = t.getId();
final Map<String, String> mappings = t.getMappings();
boolean isUpdated = connector.withSessionDo(new AbstractFunction1<Session, Boolean>() {
@Override
public Boolean apply(Session v1) {
ResultSet updateResultSet = v1.execute(v1.prepare("update test set mappings = mappings + ? where id = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.bind(mappings, id));
return updateResultSet.wasApplied();
}
});
}
});https://stackoverflow.com/questions/59976735
复制相似问题