我一直在下面的代码中获得‘歧义隐式值’消息。我尝试了几种方法(从我注释掉的几行代码中可以看出)。有什么办法解决这个问题吗?这是在Scala中。
def createTopology(conf: Config, properties: Properties): Topology = {
// implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
// implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]
val builder: StreamsBuilder = new StreamsBuilder()
builder.stream("streams-plaintext-input")
.groupBy((_, word) => word)
.windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
.count()
.toStream.to("streams-pipe-output")
builder.build()
}
编译器错误:
Error:(52, 78) ambiguous implicit values:
both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
Unspecified value parameters keySerde, valueSerde.
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
发布于 2020-11-20 21:26:53
我只是通过添加导入来添加一些implicit
,它编译:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}
import java.time.Duration
import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes.{Long, String}
def createTopology(conf: Config, properties: Properties): Topology = {
// here we have two implicits to choose, I pick the sessionWindowedSerde because it was in your code
// implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]
val builder: StreamsBuilder = new StreamsBuilder()
builder.stream("streams-plaintext-input")
.groupBy((_, word) => word)
.windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
.count()
.toStream.to("streams-pipe-output")
builder.build()
}
如果您看到一个错误:
模糊隐式值
这意味着在您的作用域中定义了多个implicit
,以满足所需的类型。例如,对象org.apache.kafka.streams.scala.Serdes
有两个隐式:
implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
new WindowedSerdes.TimeWindowedSerde[T](tSerde)
implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.SessionWindowedSerde[T] =
new WindowedSerdes.SessionWindowedSerde[T](tSerde)
其中TimeWindowedSerde
扩展了Serdes.WrapperSerde<Windowed<T>>
static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
SessionWindowedSerde
扩展了Serdes.WrapperSerde<Windowed<T>>
static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
它们都扩展了相同类型的Serdes.WrapperSerde<Windowed<T>>
,并在行中:
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
根据with
函数签名:
def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
ProducedJ.`with`(keySerde, valueSerde)
我们期望一些implicit
值用于Serde[Windowed[String]]
,编译器不能从中选择一个,因为它们都是Serde[Windowed[String]]
。
因此,如果您只是尝试将它们添加到相同的范围中:
implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
你会看到
ambiguous implicit values
再来一次。
结论:在导入大量implicits
时要小心,最佳实践是只导入所需的implicits
。
https://stackoverflow.com/questions/64886097
复制相似问题