我使用dataSet API,我有两种案例类。
case class Geo(country:Int, province:Int, city:Int, county:Int)
case class AntiFraudLog(
eventType: Int,
valid: Boolean
)
case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])
然后我生成了一个键/值对,它的值是一个case类。
val dataKeyValue: DataSet[(Long, AntiFraudLog)]
并尝试使
我在Scala (2.12.8) Apache (1.9.1)应用程序中使用case类。当我在Caused by: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V.下面运行代码时,我会得到以下异常
注意:我按照建议( )使用了默认构造函数,但在我的示例中不起作用。
这是完整的代码
package com.zignallabs
import org.apache.flink.api.scala._
/**
// Implements the program that reads from a Element
我已经从postgresql DB.then加载了一个规则表作为Flink表,读取kafka msg,并根据这些规则对msg进行分类。代码如下所示 val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.enableCheckpointing(5000)
val stenv=StreamTableEnvironment.create(senv)
val streamsource=senv.createInput(inputFormat)
stenv.registerDataS
我正在尝试制作一些scala函数,这些函数可以帮助flink、map和filter操作将错误重定向到死信队列。
然而,我正在与scala的类型擦除做斗争,这使我无法使它们成为通用的。下面mapWithDeadLetterQueue的实现不编译。
sealed trait ProcessingResult[T]
case class ProcessingSuccess[T,U](result: U) extends ProcessingResult[T]
case class ProcessingError[T: TypeInformation](errorMessage: String, ex
Flink SQL> INSERT INTO es_sink SELECT 'hello';
[INFO] Submitting SQL update statement to the cluster...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.tabl
我正在尝试用case类阅读avro kafka的Scala主题:
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaSourceTopic = "device_logs"
val kafkaBrokers = "localhost:9092"
val properties = new Properties()
properties.setProperty("bootstrap.servers", kafkaBrokers)
properties.set
有时会引发此错误,并向下查看任务管理器。我使用了org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetExc
UDF的整个scala项目都在这里: Flink_SQL_Client_UDF/Scala_fixed/ 我注册udf的操作如下所示: ①mvn scala:compile package
②cp table_api-1.0-SNAPSHOT.jar $FLINK_HOME/lib
③add the following sentence into $FLINK_HOME/conf/flink-conf.yaml
flink.execution.jars: $FLINK_HOME/lib/table_api-1.0-SNAPSHOT.jar
④create temporary function
我才刚开始呢。我编写了以下代码,得到了“数据源的输出导致错误:无法读取用户代码包装器”错误
我做错什么了吗?
版本: Flink v 0.9.1 (hadoop 1)不使用hadoop:本地执行shell: scala shell
代码:
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv"
val data_split = text.flatMap{_.split(
我正在使用flink和kafka,我得到了这个错误。
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:32)
at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequest
当我运行flink工作时,我面临着这个问题。
Registration name clash. KvState with name 'XXXX' has already been registered by another operator (fab4c54085fa3ee85a6e1bb1062c20af).
异常:
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the
我尝试在Google上Shell中的ssh控制台中导入这个库:
import org.apache.flink.connector.kafka.source.KafkaSource
我以前安装过flink:
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
tar -xzf flink-1.14.4-bin-scala_2.11.tgz
但是,当我运行导入行时,我会得到以下错误:
<console>:81: error: object kafka is not a mem
我使用双倍创建了这个示例程序
在IDE中运行时出现以下错误
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main"
我正在尝试写一些输出到S3使用电子病历与Flink。我使用的是Scala 2.11.7、Flink 1.3.2和EMR 5.11。但是,我得到了以下错误:
java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
at org.apache.fl
我开始在Flink中训练一个多元线性回归算法。我正在跟踪可怕的和。我正在使用齐柏林飞艇来开发这个代码。
如果我从CSV文件加载数据:
//Read the file:
val data = benv.readCsvFile[(Int, Double, Double, Double)]("/.../quake.csv")
val mapped = data.map {x => new org.apache.flink.ml.common.LabeledVector (x._4, org.apache.flink.ml.math.DenseVector(x._1,x._2,x
我正在使用Linux中的Docker运行一个带有单个节点的Flink独立集群。我已经在Flink 1.10.0和JDK8的生产环境中运行了一段时间的前一个版本,我能够让S3在那里正常运行。现在,我正在尝试更新到一个较新的版本,使用本地S3实现在我的开发机器上运行Docker。不管我怎么尝试,这个错误总是弹出: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. 看起来S3方案没有被映
我最近开始“玩”Apache Flink。我已经组装了一个小应用程序来开始测试框架等等。当我尝试序列化一个普通的POJO类时,我现在遇到了一个问题: @Getter
@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public final class Species {
private String name;
private List<String> abilities;
} 不知何故,我可以从堆栈跟踪看出List类型不能序列化,但根据Flink的文档,情况不应该是这样的。这是堆栈
我要使用apache运行简单的worcount示例。使用$SPARK_HOME/jars中的本地jar文件,它正确运行,但使用maven依赖于它的错误:
java.lang.NoSuchMethodError: org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics()Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData;
at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$1$$anonfun$apply$m
所以我用它安装了一个样例Flink项目。
我正在尝试使用这个模板,它让我可以开始编写一个Flink摄取应用程序,而不必担心依赖关系,但它适得其反。当我尝试与sbt同步时,我发现找不到Flink (不是要说谎,但这部分对我来说甚至是模糊的)。我想知道是否有人知道如何让我的项目找到Flink。使用这项技术非常令人兴奋。
Error while importing sbt project:
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; support was removed in 8.0
[