我使用Spark structured streaming处理从Kafka读取的记录。以下是我想要达到的目标:
(a)每条记录都是Tuple2类型的(Timestamp, DeviceId)。
(b)我已经创建了一个静态的Dataset[DeviceId],它包含了期望在Kafka流中看到的所有有效设备in (类型为DeviceId)的集合。
(c)我需要编写一个Spark structured streaming查询
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get
所以我有一个KStream,它被反序列化为POJO,如下所示 public class FinancialMessage {
public String user_id;
public String stock_symbol;
public String exchange_id;
} 下面是Global Ktable记录的样子 public class CompanySectors {
public String company_id;
public String company_name;
public String tckr;
public String sector_cd;
}
我使用的是Alpakka,下面是我的玩具示例: val system = ActorSystem("system")
implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)
implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
implicit val ec: ExecutionContextExecutor = system.dispatcher
val log =