首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >卡夫卡流: SessionWindowedSerde Vs TimeWindowedSerde。模糊隐值

卡夫卡流: SessionWindowedSerde Vs TimeWindowedSerde。模糊隐值
EN

Stack Overflow用户
提问于 2020-11-18 02:21:46
回答 1查看 273关注 0票数 0

我一直在下面的代码中获得‘歧义隐式值’消息。我尝试了几种方法(从我注释掉的几行代码中可以看出)。有什么办法解决这个问题吗?这是在Scala中。

代码语言:javascript
运行
复制
  def createTopology(conf: Config, properties: Properties): Topology = {
//    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
//    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
    implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]

    val builder: StreamsBuilder = new StreamsBuilder()
    builder.stream("streams-plaintext-input")
        .groupBy((_, word) => word)
        .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
        .count()
        .toStream.to("streams-pipe-output")

    builder.build()

  }

编译器错误:

代码语言:javascript
运行
复制
Error:(52, 78) ambiguous implicit values:
 both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
 and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
 match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
Unspecified value parameters keySerde, valueSerde.
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
EN

回答 1

Stack Overflow用户

发布于 2020-11-20 21:26:53

我只是通过添加导入来添加一些implicit,它编译:

代码语言:javascript
运行
复制
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}

import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes.{Long, String}

def createTopology(conf: Config, properties: Properties): Topology = {
  // here we have two implicits to choose, I pick the sessionWindowedSerde because it was in your code
  // implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
  implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
  implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
  implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]

  val builder: StreamsBuilder = new StreamsBuilder()
  builder.stream("streams-plaintext-input")
    .groupBy((_, word) => word)
    .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
    .count()
    .toStream.to("streams-pipe-output")

  builder.build()
}

如果您看到一个错误:

模糊隐式值

这意味着在您的作用域中定义了多个implicit,以满足所需的类型。例如,对象org.apache.kafka.streams.scala.Serdes有两个隐式:

代码语言:javascript
运行
复制
implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
    new WindowedSerdes.TimeWindowedSerde[T](tSerde)

implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.SessionWindowedSerde[T] =
    new WindowedSerdes.SessionWindowedSerde[T](tSerde)

其中TimeWindowedSerde扩展了Serdes.WrapperSerde<Windowed<T>>

代码语言:javascript
运行
复制
static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>

SessionWindowedSerde扩展了Serdes.WrapperSerde<Windowed<T>>

代码语言:javascript
运行
复制
static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>

它们都扩展了相同类型的Serdes.WrapperSerde<Windowed<T>>,并在行中:

代码语言:javascript
运行
复制
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

根据with函数签名:

代码语言:javascript
运行
复制
def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
    ProducedJ.`with`(keySerde, valueSerde)

我们期望一些implicit值用于Serde[Windowed[String]],编译器不能从中选择一个,因为它们都是Serde[Windowed[String]]

因此,如果您只是尝试将它们添加到相同的范围中:

代码语言:javascript
运行
复制
implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]

你会看到

代码语言:javascript
运行
复制
ambiguous implicit values

再来一次。

结论:在导入大量implicits时要小心,最佳实践是只导入所需的implicits

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64886097

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档