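Copyright notice: this is an original article by 王小雷 (Wang Xiaolei) and may not be reproduced without the author's permission. https://blog.csdn.net/dream_an/article/details/81058073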
This section covers:
Processing data in a Cassandra database with Spark, and analyzing the structured data with Spark SQL.
Gradle dependencies
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.1'
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.1'
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.3.0'
Create the SparkSession and configure the Cassandra connection
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL basic example")
    // .config("spark.some.config.option", "some-value")
    // Cassandra contact point, credentials, and native transport port
    .config("spark.cassandra.connection.host", "192.168.56.110")
    .config("spark.cassandra.auth.username", "busuanzi")
    .config("spark.cassandra.auth.password", "busuanzi.org")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate();
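The session above hard-codes the host and credentials. As a minimal sketch, the same `spark.cassandra.*` keys could instead be collected from environment variables. `CassandraConf` and the `CASSANDRA_*` variable names are hypothetical and chosen only for this illustration; they are not part of Spark or the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark or the Cassandra connector.
final class CassandraConf {
    private CassandraConf() {}

    // Builds spark.cassandra.* settings from an environment-like map.
    // The CASSANDRA_* variable names are assumptions made for this sketch.
    static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.cassandra.connection.host",
                env.getOrDefault("CASSANDRA_HOST", "127.0.0.1"));
        conf.put("spark.cassandra.connection.port",
                env.getOrDefault("CASSANDRA_PORT", "9042"));
        // Credentials are added only when both values are present.
        String user = env.get("CASSANDRA_USERNAME");
        String pass = env.get("CASSANDRA_PASSWORD");
        if (user != null && pass != null) {
            conf.put("spark.cassandra.auth.username", user);
            conf.put("spark.cassandra.auth.password", pass);
        }
        return conf;
    }
}
```

Each entry of `CassandraConf.fromEnv(System.getenv())` can then be passed to `builder.config(key, value)` before `getOrCreate()` is called.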
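Write to the Cassandra database
import java.util.HashMap;

// ds1 is assumed to be an existing Dataset<Row> whose columns match the target table;
// it is not defined in this post.
ds1.write()
    .format("org.apache.spark.sql.cassandra")
    .options(new HashMap<String, String>() {
        {
            put("keyspace", "busuanzi_org");
            put("table", "top_n_url");
        }
    }).mode("append").save();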
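The double-brace initializer in the write call creates an anonymous `HashMap` subclass at each call site. A small helper can build the same options map for both writes and reads. This is a sketch only; `CassandraOptions` is a hypothetical name, not a class from Spark or the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark or the Cassandra connector.
final class CassandraOptions {
    private CassandraOptions() {}

    // Returns the two options the Cassandra data source reads
    // to locate a table: "keyspace" and "table".
    static Map<String, String> of(String keyspace, String table) {
        Map<String, String> options = new HashMap<>();
        options.put("keyspace", keyspace);
        options.put("table", table);
        return options;
    }
}
```

With this in place, the call becomes `.options(CassandraOptions.of("busuanzi_org", "top_n_url"))`, since both `DataFrameWriter.options` and `DataFrameReader.options` accept a `java.util.Map<String, String>`.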
Query the Cassandra database
Dataset<Row> ds = spark.read()
    .format("org.apache.spark.sql.cassandra")
    .options(new HashMap<String, String>() {
        {
            put("keyspace", "busuanzi_org");
            put("table", "top_n_url");
        }
    }).load();
ds.show();
Register the ds DataFrame as a SQL temporary view
ds.createOrReplaceTempView("dsv");
Deduplicate with Spark SQL
ds.select("username", "projects", "comment").distinct().show();
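The same deduplication can also be written as SQL text against the `dsv` view registered earlier. The sketch below only assembles the query string; `DistinctQuery` is a hypothetical helper, and it backtick-quotes each identifier so that a column name such as `comment` is treated as a plain identifier.

```java
import java.util.StringJoiner;

// Hypothetical helper that assembles a SELECT DISTINCT statement.
final class DistinctQuery {
    private DistinctQuery() {}

    // Builds "SELECT DISTINCT `c1`, `c2`, ... FROM `view`".
    static String build(String view, String... columns) {
        if (columns.length == 0) {
            throw new IllegalArgumentException("at least one column is required");
        }
        StringJoiner cols = new StringJoiner(", ");
        for (String column : columns) {
            cols.add(quote(column));
        }
        return "SELECT DISTINCT " + cols + " FROM " + quote(view);
    }

    // Wraps an identifier in backticks; names containing a backtick are
    // rejected rather than escaped, to keep this sketch simple.
    private static String quote(String identifier) {
        if (identifier.contains("`")) {
            throw new IllegalArgumentException("backtick in identifier: " + identifier);
        }
        return "`" + identifier + "`";
    }
}
```

Passing the result to the session, as in `spark.sql(DistinctQuery.build("dsv", "username", "projects", "comment")).show();`, should return the same rows as the `select(...).distinct()` call, assuming the view is still registered in that session.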