SnappyData v.0-5
我的目标是运行一个snappydata驱动程序来连接到远程服务器中的SnappyData。为此,我编写了一个Junit。然而,当我运行它时,我得到了一个错误,SparkContext是实例化的:
**java.lang.NoClassDefFoundError: org/eclipse/jetty/server/handler/GzipHandler**
at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:235)
at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:234)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:499)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:499)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:499)
我的pom.xml依赖项是:
<dependency>
<groupId>io.snappydata</groupId>
<artifactId>snappy-core_2.10</artifactId>
<version>0.5</version>
</dependency>
<dependency>
<groupId>io.snappydata</groupId>
<artifactId>snappy-cluster_2.10</artifactId>
<version>0.5</version>
<dependency>
@Test
public void testInsertDataFromCsv() throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("spark://snappy-lead-host:8090");
conf.setAppName("MySparkApp");
SparkContext sc = new SparkContext(conf);
SnappyContext snappyContext = new SnappyContext(sc);
String fileResource = "data.csv";
DataFrame dataFrame = snappyContext.read()
.format("com.databricks.spark.csv").option("header", "true")
.option("inferSchema", "true").load(fileResource);
JavaRDD<Row> row = dataFrame.javaRDD();
System.out.println(row.toDebugString());
dataFrame.write().insertInto("example_table_col");
}
发布于 2016-08-06 15:51:29
SnappyData集群的一个主要特性是长期运行的Spark executors (与数据存储JVM本身相同)。该程序的目的似乎是连接到现有集群,但它将尝试启动一组新的executor JVM进行处理,这是Spark的正常工作方式。SnappyData lead不支持该模式,因为它被设计为重用现有数据节点来执行。
Spark中的这一限制是由于集群中只能有一个已经在SnappyData lead节点上运行的驱动程序,因此无法创建新的驱动程序(我们确实打算在未来的版本中消除此限制)。因此,像"spark://..."
这样指向引导节点URL将不起作用。运行Spark作业需要这些可能的部署策略之一(除了使用JDBC/ODBC客户端直接提交SQL之外)。
注意:对于嵌入式模式,snappy-cluster
和snappy-core
依赖项都是必需的,而对于其他两种模式,只需要添加snappy-core
作为依赖项。
嵌入式模式执行:类似于JDBC/ODBC客户端,这里的执行发生在数据节点本身。这需要通过运行在活动引导节点上的作业服务器提交作业。程序必须实现SnappySQLJob/JavaSnappySQLJob
并使用REST API提交它(要么是提供的snappy-job.sh
脚本,要么是用于独立测试的this之类的东西)。详情请访问:http://snappydatainc.github.io/snappydata/jobs/
public Object runJavaJob(SnappyContext snappyContext, Config config) {
String fileResource = "data.csv";
DataFrame dataFrame = snappyContext.read()
.format("com.databricks.spark.csv").option("header", "true")
.option("inferSchema", "true").load(fileResource);
dataFrame.write().insertInto("example_table_col");
// for debugging
JavaRDD<Row> row = dataFrame.javaRDD();
return row.toDebugString();
// return Boolean.TRUE;
}
public JSparkJobValidation isValidJob(SnappyContext snappyContext,
Config config) {
return new JSparkJobValid();
}
本地拆分模式:在此模式下,执行集群是spark local
主机,因此与snappydata集群分离。这不会带来良好的性能,因为对于大多数查询,它必须从数据节点获取大量数据,但对于少量数据的功能测试,它应该是最容易使用的。使用master作为local
并将snappydata.store.locators
属性设置为指向定位器(请参阅前面的http://snappydatainc.github.io/snappydata/connectingToCluster/和链接)
@Test
public void testInsertDataFromCsv() throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.setAppName("MySparkApp");
// below property can also be fetched with
// io.snappydata.Property.Locators().apply()
conf.set("snappydata.store.locators", "snappy-locator-host:10334");
SparkContext sc = new SparkContext(conf);
SnappyContext snappyContext = new SnappyContext(sc);
String fileResource = "data.csv";
DataFrame dataFrame = snappyContext.read()
.format("com.databricks.spark.csv").option("header", "true")
.option("inferSchema", "true").load(fileResource);
JavaRDD<Row> row = dataFrame.javaRDD();
System.out.println(row.toDebugString());
dataFrame.write().insertInto("example_table_col");
}
拆分模式执行:最后,执行集群可以是一个普通的Spark/Yarn/Mesos集群,它将像普通数据存储一样与snappydata集群对话。这就是其他产品的spark连接器的工作方式(比如Cassandra连接器,其中Cassandra与Spark集群是分开的)。它可以在与snappydata集群相同的节点上运行,以获得最佳性能,并且snappydata将努力确保执行被路由,以便只从本地表数据获取或插入数据。使用snappydata发行版本身或Apache Spark 1.6.x中的start-all.sh
启动一个单独的Spark集群(或Apache Spark docs中的Yarn/Mesos集群)。代码将与上面的本地拆分模式相同,其中master指向Spark/Yarn/Mesos master,而不是snappydata lead。有关更多详细信息,请参阅本地拆分模式中的链接。
https://stackoverflow.com/questions/38797985
复制