在ForeachBatch Function Structured Straming中,我想创建在微型批次中接收的数据帧的临时视图 func(tabdf, epoch_id):
tabaDf.createOrReplaceView("taba") 但是我得到了下面的错误: org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableExc
我正在尝试更改数据帧的模式。每次我有一个字符串类型的列时,我想把它的类型改为VarcharType( max ),其中max是该列中字符串的最大长度。我写了下面的代码。(我想稍后将数据帧导出到sql server,并且我不想让nvarchar出现在sql server中,所以我尝试将其限制在spark端) val df = spark.sql(s"SELECT * FROM $tableName")
var l : List [StructField] = List()
val schema = df.schema
schema.fields.foreach(x =>
在我们的项目中,我使用结构化流+ Kafka进行实时数据分析。我用的是火花2.2,卡夫卡0.10.2。
在应用程序启动时,在从检查点恢复流查询时,我面临一个问题。由于有来自单个kafka流点的多个流查询,而且每个流查询都有不同的校验点目录。因此,在作业失败的情况下,当我们重新启动作业时,会有一些流查询无法从检查点位置恢复,从而抛出读取增量文件的错误异常。以下是日志:
Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most recent failure: Lost task 2.3 in stage
我正在创建一个使用火花SQL (数据帧)和火花流的演示。我不是火花专家,所以我需要一些帮助!
我从一个数据库加载了大约100万个对象到spark Dataframe,我执行SQL查询来匹配一些字段和来自spark streaming的实时数据。
例如,
SELECT *
FROM Person
WHERE Person.name='stream.name' AND Person.age='stream.age' AND ... etc
stream.xxx是我从spark streaming RDD中提取出来的一个java字符串。
现在,问题是,对于具有100万
我试图用火花-卡桑德拉连接器连接卡桑德拉,但出现以下消息:
spark.version: 3.1.2
cassandra.connector.version: 3.1.0
Caused by: java.io.IOException: Failed to open native connection to Cassandra at {10.99.249.84:9042} :: org/apache/tinkerpop/gremlin/structure/io/BufferFactory
at com.datastax.spark.connector.cql.CassandraConnector
我使用以下代码将Spark-Streaming的输出存储到ElasticSearch。我想把火花流的输出映射到正确的名称i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)。但是,正如您目前看到的,它是映射在ES中,如_1或_2等。此外,我还想在ES中索引数据之前添加一些过滤器,即(if PlatFormName = "ubuntu" then index the data)。那我该怎么做呢?
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platf