Google Pub / Sub订阅者不会收到消息?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (220)

我的代码如下所示,基本上与示例中的代码完全相同:

package com.example.google.pubsub

import java.io.FileInputStream
import java.security.spec.PKCS8EncodedKeySpec
import java.security.{KeyFactory, PrivateKey}
import java.util.Base64

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.{Done, NotUsed}
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential

import scala.concurrent.duration._
import scala.concurrent.Future

object SubscriberMain extends App {
  println("#### SUBSCRIBER ####")

  val privateKey: PrivateKey = {
    import java.io.FileInputStream
    val credential = GoogleCredential.fromStream(new FileInputStream("mycredentials.json"))
    val privateKey = credential.getServiceAccountPrivateKey
    privateKey
  }
  val clientEmail = "main-19@weirdproject.iam.gserviceaccount.com"
  val projectId = "weirdproject"
  val apiKey = "xxxx"
  val subscription = "somesubscription"

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val subscriptionSource: Source[ReceivedMessage, NotUsed] =
    GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

  val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
    GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .to(ackSink)

}

我发现akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic从未被执行,这似乎是没有提取消息的原因。

提问于
用户回答回答于

你有什么是一个流的定义,但你没有运行它。调用run()

subscriptionSource
  .map { message =>
    val data = message.message.data
    println(s"received a message: $data")
    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
  .run() // <---

或使用runWith(),这是一种方便的方法,可返回以下物化值Sink

val result: Future[Done] =
  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .runWith(ackSink)

热门问答

【有奖互动】你是哪个星球的人?

薛定喵君独立博客(xuedingmiao.com)博主。GitChat 作者。mp-unpack 工具作者。公号『极客之路』号主。

主库的binlog被删掉了,从库是否可以用对应的Relay_Log_File同步?

朱明豪从事Oracle、MySQL等数据库工作10年,擅长性能诊断优化、故障处理、SQL优化、业务架构设计、技术培训等。
推荐
1.Waiting for Slave Workers to free pending events, 可能是出现大事务,可能参数slave_pending_jobs_size_max过小 2.“主库设置了expire_logs_days,所以从库的Relay_Mas...... 展开详请

CDN加速时,当带宽超出所设置阈值后关闭CDN服务,是否可以自动重启CDN服务?

开元

腾讯云 · 高级工程师 (已认证)

专注给云上客户提供优质的服务
推荐

触发封顶带宽导致域名关闭后,若您希望继续使用 CDN 服务,可以在重新启动域名加速。

详见https://cloud.tencent.com/document/product/228/7541

是否提供海外CDN加速服务(微信小程序云)?

开元

腾讯云 · 高级工程师 (已认证)

专注给云上客户提供优质的服务
推荐

目前腾讯云是支持海外加速的,CDN加速只和域名有关系,只需要把需要海外加速域名配置海外CDN就就可以。详细见:https://cloud.tencent.com/document/product/673

Dr.Elephant支持hadoop3吗?还有编译一直有包找不到怎么解决?

目前TBDS的hadoop版本是2.7.2,建议配置文件中使用该版本号进行匹配

iOS实时音视频的SDK和Demo有没有Objective-C版本?

腾讯视频云-ZacharyTXLiteAVSDK技术支持
推荐
下载专业版和企业版的压缩包里面带的官方demo是Objective-C的,下载地址:https://cloud.tencent.com/document/product/647/32689 image.png ... 展开详请

所属标签

扫码关注云+社区

领取腾讯云代金券