首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >NoHostAvailableException -火花-卡桑德拉-连接器

NoHostAvailableException -火花-卡桑德拉-连接器
EN

Stack Overflow用户
提问于 2018-06-07 22:05:31
回答 2查看 86关注 0票数 0

我在2.3.0版本中使用spark-cassandra-connector_2.11。运行最新的Spark 2.3.0,尝试从Cassandra (3.0.11.1485) DSE (5.0.5)读取数据。

没有问题的示例阅读:

代码语言:javascript
运行
复制
 JavaRDD<Customer> result = javaFunctions(sc).cassandraTable(MyKeyspaceName, "customers", mapRowTo(Customer.class));

另一个正常工作的读取:如果我从单元测试执行-单线程-单读取,如下所示。

代码语言:javascript
运行
复制
cassandraConnector.withSessionDo(new AbstractFunction1<Session, Void>() {
                @Override
                public Void apply(Session session) {
                   //Read something from Cassandra via Session - Works Fine Here as well.
                }
            });

示例读取(mapPartitions+withSessionDo)有问题的代码:

代码语言:javascript
运行
复制
CassandraConnector cassandraConnector = CassandraConnector.apply(sc.getConf());

SomeSparkRDD.mapPartitions((FlatMapFunction<Iterator<Customer>, CustomerEx>) customerIterator ->
            cassandraConnector.withSessionDo(new AbstractFunction1<Session, Iterator<CustomerEx>>() {
                @Override
                public Iterator<CustomerEx> apply(Session session) {
                    return asStream(customerIterator, false)
                            .map(customer -> fetchDataViaSession(customer, session))
                            .filter(x -> x != null)
                            .iterator();
                }
            }));


public static <T> Stream<T> asStream(Iterator<T> sourceIterator, boolean parallel) {
    Iterable<T> iterable = () -> sourceIterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}

map(customer -> fetchDataViaSession(customer,session))的一些迭代可以工作,但NoHostAvailableException的大多数迭代都失败了。

尝试了各种设置,但没有成功:

代码语言:javascript
运行
复制
spark.cassandra.connection.connections_per_executor_max
spark.cassandra.connection.keep_alive_ms
spark.cassandra.input.fetch.size_in_rows
spark.cassandra.input.split.size_in_mb

Also Tried to reduce the number of Partitions of the RDD which I do mapPartitions+withSessionDo on.
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50743308

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档