我使用星星之火-SQL2.4.1版本和Kafka 0.10 v。
当我试图通过消费者来消费数据的时候。即使在将"auto.offset.reset“设置为”最新“之后,也会出现以下错误
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {COMPANY_INBOUND-16=168}
at org.apache.kafka.clients.consumer.interna
我收到了错误信息
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' a
我使用Spark structured streaming处理从Kafka读取的记录。以下是我想要达到的目标:
(a)每条记录都是Tuple2类型的(Timestamp, DeviceId)。
(b)我已经创建了一个静态的Dataset[DeviceId],它包含了期望在Kafka流中看到的所有有效设备in (类型为DeviceId)的集合。
(c)我需要编写一个Spark structured streaming查询
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get
对于一个项目需求,我试图用火花示例中的火花构建FlumUtils示例。我能够创建jar文件。但是,在尝试执行时,我得到了以下错误。有人能帮我解决这个问题吗?
Error: application failed with exception
java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/FlumeUtils
at SimpleApp.main(SimpleApp.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Nat
在Spark2.2中,我使用了使用Kafka的星火流,如下所示:
val conf = new SparkConf()
.setAppName("Test")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, String](
"metadata.broker.list&
在本地文件系统中,我必须使用spark将数据从SQL服务器表加载到csv。下面是我使用的代码。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcSqlConnStr = "jdbc:sqlserver://HostIP:1433;databaseName=DBName;user=UserName;password=PWD;"
val jdbcDbTable = "dbo.Table"
val jdbcDF = sqlContext.read.format("jdbc"
我正在浏览下面的博客中的spark structured。
他首先使用下面的代码创建模式变量。
val cloudTrailSchema = new StructType()
.add("Records", ArrayType(new StructType()
.add("additionalEventData", StringType)
.add("apiVersion", StringType)
.add("awsRegion", StringType)
下面是实际的spark代码
val raw
我正在遵循“火花最终指南”()一书,下面的代码是在本地使用火花壳执行的
过程:在没有任何其他选项的情况下启动火花壳。
val static = spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema) .option("maxFilesPerTrigger
我在运行无法连接到Kinesis数据源的胶水流式作业时遇到以下错误:
错误:
WARNING:root:StreamingQueryException caught. Retry number 10 ERROR:root:Exceeded maximuim number of retries in streaming interval,
exception thrown Parse yarn logs get error message:
StreamingQueryException: 'Error while Describe Streams\n=== Streaming Q