Source: www.study-java.cn/#/homework (related articles available there).

First, add the Spark SQL dependency to the project's pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0-preview</version>
</dependency>
While running this I hit repeated jar conflicts; in my case hadoop-common and hadoop-hdfs clashed with Spark's dependencies. Exclude the offending artifacts according to your own situation:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
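To find out which of your own dependencies drag in the conflicting jars (here, the javax.servlet classes), Maven's built-in dependency:tree goal helps; filtering by group id keeps the output short:

mvn dependency:tree -Dincludes=javax.servlet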
One error required moving the Spark dependency to the very top of the pom file; I'm not sure of the exact cause. The message was:

class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

Moving spark to the front fixed it. A plausible explanation: two jars on the classpath both provide javax.servlet classes (one of them signed), and Maven builds the classpath in declaration order, so listing Spark first lets its servlet classes load together.
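Concretely, "moving spark to the front" just means placing the spark-sql dependency before the hadoop ones inside <dependencies>; a sketch of the ordering (details elided):

<dependencies>
    <!-- spark-sql first, so its servlet classes win -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        ...
    </dependency>
    <!-- hadoop-common / hadoop-hdfs after it -->
    ...
</dependencies>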
import java.util.Properties;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

public class SparkMySQL {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .master("local[4]")
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);
        // read data from MySQL
        readMySQL(sqlContext);
        // stop the SparkSession
        sparkSession.stop();
    }

    public static void readMySQL(SQLContext sqlContext) {
        String url = "jdbc:mysql://IP/DB?useSSL=false";
        String table = "qc_work_order";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "pwd");
        // for MySQL Connector/J 8.x the class is com.mysql.cj.jdbc.Driver
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        // note: limit(100) runs before orderBy, so Spark sorts an arbitrary 100 rows;
        // swap the two calls if you want the newest 100 rows instead
        Dataset<Row> jdbcDF = sqlContext.read().jdbc(url, table, connectionProperties)
                .select("*")
                .where("qc_user_id = '12'")
                .limit(100)
                .orderBy(new Column("id").desc());
        jdbcDF.show(50);
    }
}
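As a side note, since Spark 2.x the SQLContext wrapper is no longer required for this; the same read can go through SparkSession's own DataFrameReader. A minimal sketch, reusing the url/table/credentials from the code above:

// equivalent read using SparkSession directly, without SQLContext
Dataset<Row> jdbcDF = sparkSession.read()
        .format("jdbc")
        .option("url", url)                        // e.g. jdbc:mysql://IP/DB?useSSL=false
        .option("dbtable", table)                  // e.g. qc_work_order
        .option("user", "root")
        .option("password", "pwd")
        .option("driver", "com.mysql.jdbc.Driver")
        .load();
jdbcDF.show(50);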
You can run this directly, and the console will print the rows that were queried.

In IDEA you can inspect the results right in the console; alternatively, package the code into a jar and run it with spark-submit. Pass the MySQL driver jar along, otherwise the job fails because the MySQL driver cannot be found:
./spark-submit --class 'package.SparkMySQL' --jars /mysql-connection.jar /SparkMySQL.jar 2>&1
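Depending on the deploy mode, --jars alone may not put the JDBC driver on the Spark driver's own classpath; if the driver-not-found error persists, adding --driver-class-path usually helps (same paths as above, adjust to your setup):

./spark-submit --class 'package.SparkMySQL' --jars /mysql-connection.jar --driver-class-path /mysql-connection.jar /SparkMySQL.jar 2>&1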
Writing is quite different from reading. You need to build a DataFrame via the createDataFrame method, which is overloaded for several inputs (JavaRDD<Row>, List<Row> rows, RDD<?> rdd, and so on); I use List<Row> here. The list holds the values of each row, while the structFields list describes the columns those values map to. The mode method sets the save behavior: append adds rows on top of the existing data, while overwrite replaces the data and also rewrites the table's structure.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkMySQL {
    static String url = "jdbc:mysql://IP/DB?useSSL=false";
    static Properties connectionProperties = new Properties();
    static {
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "pwd");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
    }

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .master("local[4]")
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);
        // read data from MySQL
        // readMySQL(sqlContext);
        writeMySQL(sqlContext);
        // stop the SparkSession
        sparkSession.stop();
    }

    public static void writeMySQL(SQLContext sqlContext) {
        String table = "qc_user";
        // one Row per record to insert; values must line up with the schema below
        List<Row> list = new ArrayList<>();
        Row row = RowFactory.create(20, "c3p0", "张琰菲", 2);
        list.add(row);
        // schema: field name, type, nullable
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("user_id", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("user_name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("rule", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);
        // mode("overwrite") would drop and recreate the table; append keeps existing rows
        sqlContext.createDataFrame(list, structType).write().mode("append").jdbc(url, table, connectionProperties);
    }
}
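Since the paragraph above mentions the JavaRDD<Row> overload of createDataFrame, here is a minimal sketch of the same write going through an RDD instead of a List, reusing the url/table/structType from the code above:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// ...inside writeMySQL, after building structType:
JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkSession().sparkContext());
JavaRDD<Row> rowRDD = jsc.parallelize(list);   // distribute the rows as an RDD
sqlContext.createDataFrame(rowRDD, structType)
        .write()
        .mode("append")
        .jdbc(url, table, connectionProperties);

For a one-row insert this buys nothing, but the RDD form is the natural fit when the rows are produced by an upstream Spark transformation rather than collected into a local List.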