
Akka-CQRS(5)- Deploying and Testing the CQRS Writer Actor

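In the previous post we built a WriterActor example, mainly to show how a WriterActor running under cluster sharding uses PersistentActor and the event-sourcing pattern to implement the write side of CQRS. Since it runs under cluster sharding, this post covers deploying and testing the WriterActor, because a few points there deserve attention. Below is the code that deploys the WriterActor, i.e. the cluster-sharding setup: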

    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS
    )

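Note that the overload of start that takes handOffStopMessage must also be given an allocationStrategy. That parameter supplies the stop message used for passivation, here PassivatePOS.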

The complete cluster-sharding deployment code is as follows:

object POSRouter extends LogSupport {
  def main(args: Array[String]) {
    import WriterActor._
    import Commands._

    val argsPat = "(.*):(.*)".r
    // headOption avoids an ArrayIndexOutOfBoundsException when no argument is passed
    val (host, port) = args.headOption match {
      case Some(argsPat(h, p)) => (h, p)
      case _ => ("localhost", "2551")
    }

    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
      .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
      //roles can be deployed on this node
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
      .withFallback(ConfigFactory.load())

    log.info(s"******* hostname = $host,  port = $port *******")

    val shardName = "POSShard"

    case class POSMessage(id: Long, cmd: POSCommand) {
      def shopId = id.toString.head.toString

      def posId = id.toString
    }

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    }
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId
    }


    val system = ActorSystem("cloud-pos-server", config)
    val role = "poswriter"    //role of this shard
    val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)

    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS
    )

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

  }
}
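The host:port handling at the top of `main` can be tried in isolation. The following is a minimal, Akka-free sketch; the object name `HostPortArgs` and the method `parse` are hypothetical names used only for illustration. It reads the first argument through `headOption`, so a missing argument also falls back to `localhost:2551`.

```scala
object HostPortArgs {
  // Same pattern as in the listing: everything before the last colon is the host.
  private val HostPort = "(.*):(.*)".r

  // Returns (host, port). Falls back to localhost:2551 when the first
  // argument is missing or does not look like host:port.
  def parse(args: Array[String]): (String, String) =
    args.headOption match {
      case Some(HostPort(h, p)) => (h, p)
      case _                    => ("localhost", "2551")
    }
}
```

For example, `HostPortArgs.parse(Array("192.168.11.162:2551"))` yields the host and port of the seed node used throughout this post.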

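Several parameters above need special attention. host and port are parsed from the argument to main, e.g. 192.168.11.162:2551, and are this node's host and port. akka.cluster.roles lists the roles this node supports; poswriter is one of them. ClusterShardingSettings(system).withRole("poswriter") means the shard can only be deployed on nodes that support the poswriter role. If the two do not match, you will find at runtime that Sharding fails to start. The program above declares that this node supports the poswriter role and deploys on this node (at the IP address passed in) a cluster-sharding named "POSShard" that carries the poswriter role.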
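The role-matching rule can be illustrated with a small toy model. This is only a sketch of the rule, not Akka's implementation: `RoleCheck`, `Node` and `eligibleNodes` are hypothetical names, and the two nodes simply reuse the server address (port 2551) and the client address (port 2558) that appear in this post's configuration.

```scala
object RoleCheck {
  // A cluster node reduced to its address and the roles it declares.
  final case class Node(address: String, roles: Set[String])

  // Nodes allowed to host a shard region that requires `requiredRole`.
  // With no required role, every node is eligible.
  def eligibleNodes(nodes: Seq[Node], requiredRole: Option[String]): Seq[Node] =
    requiredRole match {
      case Some(role) => nodes.filter(_.roles.contains(role))
      case None       => nodes
    }

  val cluster: Seq[Node] = Seq(
    Node("192.168.11.162:2551", Set("poswriter")), // server node, roles = [poswriter]
    Node("192.168.11.162:2558", Set.empty)         // client node, no roles
  )
}
```

With a misspelled role, `eligibleNodes` returns an empty list, which corresponds to the situation where Sharding never starts.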

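If I run this code on several machines, each one passing its own IP+PORT, then "POSShard" is deployed on all of those machines. The ClusterMonitor used above is an actor that monitors cluster state: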

package sdp.cluster.monitor

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport

object ClusterMonitor {
  def props = Props(new ClusterMonitor)
}

class ClusterMonitor extends Actor with LogSupport {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
      ,classOf[MemberEvent],classOf[UnreachableMember])  // subscribe to cluster state-change events
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    // unsubscribe
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info(s"Member is Joining: {${member.address}}")
    case MemberUp(member) =>
      log.info(s"Member is Up: {${member.address}}")
    case MemberLeft(member) =>
      log.info(s"Member is Leaving: {${member.address}}")
    case MemberExited(member) =>
      log.info(s"Member is Exiting: {${member.address}}")
    case MemberRemoved(member, previousStatus) =>
      log.info(
        s"Member is Removed: {${member.address}} after {${previousStatus}}")
    case UnreachableMember(member) =>
      log.info(s"Member detected as unreachable: {${member.address}}")
      cluster.down(member.address)      // down the node manually instead of using auto-down
    case _: MemberEvent => // ignore
  }

}

With it we can monitor the connection state of the cluster nodes.

Now, assuming "POSShard" has been deployed on every node of a cluster made up of several machines, let's design a client that sends POSMessage to this "POSShard":

    case class POSMessage(id: Long, cmd: POSCommand) {
      def shopId = id.toString.head.toString

      def posId = id.toString
    }

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    }
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId
    }

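This client has to take the following into account. It must be in the same cluster, i.e. it is itself one of the cluster's nodes, otherwise it cannot communicate with the nodes that host "POSShard". But it cannot share a node with a deployed "POSShard", because that node's remote hostname and port are already taken. So the client has to be placed on a node where "POSShard" is not deployed, and it uses ClusterSharding(system).startProxy to start a sharding proxy: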

   //no shard deployed on this node  2558, use proxy
    val posHandler = ClusterSharding(system).startProxy(
      typeName = shardName,
      role = Some("poswriter"),
      extractEntityId = getPOSId,
      extractShardId = getShopId
    )

    //val posHandler = ClusterSharding(system).shardRegion(shardName)

    system.actorOf(POSClient.props(posHandler), "pos-client")

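Note that the proxy's role must be Some("poswriter"); only then can it reach "POSShard" on the other nodes, because their role is "poswriter". Whatever talks to the WriterActor must be an actor, because the WriterActor returns its result through sender(), and sender() is an ActorRef: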

object POSClient {
  def props(pos: ActorRef) = Props(new POSClient(pos))
}
class POSClient(posHandler: ActorRef)  extends Actor with LogSupport {

  override def receive: Receive = {
    case msg @ POSMessage(_,_) => posHandler ! msg
    case resp: POSResponse  =>
      log.info(s"response from server: $resp")
  }
}

We can drive the WriterActor like this:

    val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
    
    posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
    posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
    posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
    posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
    posref ! POSMessage(4021,Subtotal)
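To see where the ids used above end up, the sketch below replays the `shopId` and `posId` rules from `POSMessage` without Akka. It is only an illustration: `ShardKeys` and `byShard` are hypothetical names, the command type is reduced to a single stand-in, and the id `4022` is an extra value added to show two entities sharing one shard.

```scala
object ShardKeys {
  sealed trait POSCommand
  case object Subtotal extends POSCommand // stand-in for the article's commands

  final case class POSMessage(id: Long, cmd: POSCommand) {
    def shopId: String = id.toString.head.toString // shard id: first digit of the id
    def posId: String  = id.toString               // entity id: the whole id
  }

  // Groups entity ids by the shard id they would be routed to.
  def byShard(ids: Seq[Long]): Map[String, Seq[String]] =
    ids
      .map(id => POSMessage(id, Subtotal))
      .groupBy(_.shopId)
      .map { case (shard, msgs) => shard -> msgs.map(_.posId) }
}
```

Under this rule, `1021`, `2021`, `3021` and `4021` land in four different shards, and both messages for `4021` reach the same entity. Because the shard id is the first digit, non-negative ids can produce at most ten distinct shards.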

Below is the source code for the server-side shard deployment:

resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://cloud-pos-server@192.168.11.162:2551"]
    log-info = off
    sharding {
      role = "poswriter"
      passivate-idle-entity-after = 5 m
    }
  }

  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }

}

cassandra-journal {
  contact-points = ["192.168.11.162"]
}

cassandra-snapshot-store {
  contact-points = ["192.168.11.162"]
}

POSRouter.scala

package cloud.pos.server

import akka.actor._
import akka.cluster.sharding._
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import sdp.cluster.monitor._
import sdp.logging._

object POSRouter extends LogSupport {
  def main(args: Array[String]) {
    import WriterActor._
    import Commands._

    val argsPat = "(.*):(.*)".r
    // headOption avoids an ArrayIndexOutOfBoundsException when no argument is passed
    val (host, port) = args.headOption match {
      case Some(argsPat(h, p)) => (h, p)
      case _ => ("localhost", "2551")
    }


    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
      .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
      //roles can be deployed on this node
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
      .withFallback(ConfigFactory.load())


    log.info(s"******* hostname = $host,  port = $port *******")

    val shardName = "POSShard"

    case class POSMessage(id: Long, cmd: POSCommand) {
      def shopId = id.toString.head.toString

      def posId = id.toString
    }

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    }
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId
    }


    val system = ActorSystem("cloud-pos-server", config)
    val role = "poswriter"    //role of this shard
    val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)

    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS
    )

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

  }
}

Below is the source code of the test project:

build.sbt

name := "cloud-pos-client"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "192.168.11.162"
      port = 2558
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://cloud-pos-server@192.168.11.162:2551"]
    log-info = off
  }

}

resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </Pattern>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

ClientDemo.scala

package cloud.pos.client
import akka.actor._
import akka.cluster.sharding.ClusterSharding
import sdp.cluster.monitor._
import sdp.logging._
import Commands._
import States._
import Items._
import akka.cluster.sharding._


object POSClientDemo extends LogSupport {
  def main(args: Array[String]) {

    val system = ActorSystem("cloud-pos-server")

    val shardName = "POSShard"

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    }
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId
    }

    //no shard deployed on this node  2558, use proxy
    val posHandler = ClusterSharding(system).startProxy(
      typeName = shardName,
      role = Some("poswriter"),
      extractEntityId = getPOSId,
      extractShardId = getShopId
    )

    //val posHandler = ClusterSharding(system).shardRegion(shardName)

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

    val posref = system.actorOf(POSClient.props(posHandler), "pos-client")

    posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
    posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
    posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
    posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
    posref ! POSMessage(4021,Subtotal)



    scala.io.StdIn.readLine()

    system.terminate()

  }
}

client/Commands.scala

package cloud.pos.client

object Commands {

  sealed trait POSCommand {}

  case class LogOn(opr: String, passwd: String) extends POSCommand
  case object LogOff extends POSCommand
  case class SuperOn(su: String, passwd: String) extends POSCommand
  case object SuperOff extends POSCommand
  case class MemberOn(cardnum: String, passwd: String) extends POSCommand
  case object MemberOff extends POSCommand   //remove member status for the voucher
  case object RefundOn extends POSCommand
  case object RefundOff extends POSCommand
  case object VoidOn extends POSCommand
  case object VoidOff extends POSCommand
  case object VoidAll extends POSCommand
  case object Suspend extends POSCommand

  case class VoucherNum(vnum: Int) extends POSCommand


  case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
  case object Subtotal extends POSCommand
  case class Discount(code: String, percent: Int) extends POSCommand

  case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand          //settlement payment
  //read only command, no event process
  case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
  case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
  case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
  case class WxPay(acct: String, num: String, amount: Int) extends POSCommand


  // read only command, no update event
  case class Plu(itemCode: String) extends POSCommand  //read only

  case class POSMessage(id: Long, cmd: POSCommand) {
    def shopId = id.toString.head.toString
    def posId = id.toString
  }


}

client/States.scala

package cloud.pos.client

object States {

  object TXNTYPE {
    val sales: Int = 0
    val refund: Int = 1
    val void: Int = 2
    val voided: Int = 3
    val voidall: Int = 4
    val subtotal: Int = 5
    val logon: Int = 6
    val supon: Int = 7       // super user on/off
    val suspend: Int = 8

  }

  object SALESTYPE {
    val plu: Int = 0
    val dpt: Int = 1
    val cat: Int = 2
    val brd: Int = 3
    val ra:  Int = 4
    val sub: Int = 5
    val ttl: Int = 6
    val dsc: Int = 7
    val crd: Int = 8
  }


  case class TxnItem(
                      txndate: String = ""
                      ,txntime: String = ""
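                      ,opr: String = "" //operator id
                      ,num: Int = 0 //sales voucher number
                      ,seq: Int = 1 //transaction sequence number
                      ,txntype: Int = TXNTYPE.sales //transaction type
                      ,salestype: Int = SALESTYPE.plu //sales type
                      ,qty: Int =  1 //quantity
                      ,price: Int = 0 //unit price (in fen, 1/100 yuan)
                      ,amount: Int = 0 //list-price amount (in fen)
                      ,dscamt: Int = 0 //discount, a negative value; net amount = amount + dscamt
                      ,member: String = "" //member card number
                      ,code: String = "" //code (item, account, ...)
                      ,desc: String = "" //item name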
                      ,dpt: String = ""
                      ,department: String = ""
                      ,cat: String = ""
                      ,category: String = ""
                      ,brd: String = ""
                      ,brand: String = ""
                    )


  case class VchStatus( //operation-state locks are left to the front end to maintain
                        qty: Int = 1,
                        refund: Boolean = false,
                        void: Boolean = false)

  case class VchStates(
                        opr: String = "",      //cashier
                        jseq: BigInt = 0,      //begin journal sequence for read-side replay
                        num: Int = 0,          //current voucher number
                        seq: Int = 1,          //current sequence number
                        void: Boolean = false, //void mode
                        refd: Boolean = false, //refund mode
                        due: Boolean = true,   //current balance
                        su: String = "",
                        mbr: String = ""
                      )


}
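The `dscamt` comment in `TxnItem` states that the discount is stored as a negative value and that net = amount + dscamt. A small worked example makes the sign convention concrete. This is a sketch under stated assumptions: `NetAmount` and its methods are hypothetical, `amount = qty * price` and the percentage-based discount are assumptions made for illustration, and all values are in fen.

```scala
object NetAmount {
  // Assumption for illustration: list-price amount is quantity times unit price.
  def amount(qty: Int, price: Int): Int = qty * price

  // Assumption for illustration: a percentage discount, stored as a negative
  // value and truncated by integer division.
  def dscamt(amount: Int, percent: Int): Int = -(amount * percent / 100)

  // From the TxnItem comment: net = amount + dscamt.
  def net(amount: Int, dscamt: Int): Int = amount + dscamt
}
```

Two green apples at 820 fen each give an amount of 1640; a 10 percent discount gives a `dscamt` of -164, so the net amount is 1476.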

client/POSClient.scala

package cloud.pos.client

import akka.actor._
import sdp.logging._
import Responses._
import Commands._

object POSClient {
  def props(pos: ActorRef) = Props(new POSClient(pos))
}
class POSClient(posHandler: ActorRef)  extends Actor with LogSupport {

  override def receive: Receive = {
    case msg @ POSMessage(_,_) => posHandler ! msg
    case resp: POSResponse  =>
      log.info(s"response from server: $resp")
  }
}

client/Responses.scala

package cloud.pos.client

import States._
object Responses {

  object STATUS {
    val OK: Int = 0
    val FAIL: Int = -1
  }

  case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
}

client/DataAccess.scala

package cloud.pos.client

import java.time.LocalDate
import java.time.format.DateTimeFormatter


case class Item(
                 brd: String
                 ,dpt: String
                 ,cat: String
                 ,code: String
                 ,name: String
                 ,price: Int

               )
object Items {
  val apple = Item("01","02","01","001", "green apple", 820)
  val grape = Item("01","02","01","002", "red grape", 1050)
  val orage = Item("01","02","01","003", "sunkist orange", 350)
  val banana = Item("01","02","01","004", "demon banana", 300)
  val pineapple = Item("01","02","01","005", "hainan pineapple", 1300)
  val peach = Item("01","02","01","006", "xinjiang peach", 2390)

  val tblItems = List(apple, grape, orage, banana, pineapple, peach)

  sealed trait QueryItemsResult {}

  case class QueryItemsOK(items: List[Item]) extends QueryItemsResult

  case class QueryItemsFail(msg: String) extends QueryItemsResult

}


object Codes {
  case class User(code: String, name: String, passwd: String)
  case class Department(code: String, name: String)
  case class Category(code: String, name: String)
  case class Brand(code: String, name: String)
  case class Ra(code: String, name: String)
  case class Account(code: String, name: String)
  case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)

  val ras = List(Ra("01","Delivery"),Ra("02","Cooking"))
  val dpts = List(Department("01","Fruit"),Department("02","Grocery"))
  val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery"))
  val brds = List(Brand("01","Sunkist"),Brand("02","Demon"))
  val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa")
    ,Account("004","Alipay"),Account("005","WXPay"))

  val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123"))

  def getDpt(code: String) = dpts.find(d => d.code == code)
  def getCat(code: String) = cats.find(d => d.code == code)
  def getBrd(code: String) = brds.find(b => b.code == code)
  def getAcct(code: String) = accts.find(a => a.code == code)
  def getRa(code: String) = ras.find(a => a.code == code)
}

object DAO {
  import Items._
  import Codes._

  def getItem(code: String): QueryItemsResult = {
    val optItem = tblItems.find(it => it.code == code)
    optItem match {
      case Some(item) => QueryItemsOK(List(item))
      case None => QueryItemsFail("Invalid item code!")
    }
  }

  def validateDpt(code: String) = dpts.find(d => d.code == code)
  def validateCat(code: String) = cats.find(d => d.code == code)
  def validateBrd(code: String) = brds.find(b => b.code == code)
  def validateRa(code: String) = ras.find(ac => ac.code == code)
  def validateAcct(code: String) = accts.find(ac => ac.code == code)

  def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd))

  def lastSecOfDateStr(ldate: LocalDate): String = {
    ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59"
  }


}

logging/Log.scala

package sdp.logging

import org.slf4j.Logger

/**
  * Logger which just wraps org.slf4j.Logger internally.
  *
  * @param logger logger
  */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    level match {
      case 'debug | 'DEBUG => debug(msg)
      case 'info | 'INFO => info(msg)
      case 'warn | 'WARN => warn(msg)
      case 'error | 'ERROR => error(msg)
      case _ => // nothing to do
    }
  }

  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}

logging/LogSupport.scala

package sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {

  /**
    * Logger
    */
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))

}

logging/ClusterMonitor.scala

package sdp.cluster.monitor

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport

object ClusterMonitor {
  def props = Props(new ClusterMonitor())
}

class ClusterMonitor extends Actor with LogSupport {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
      ,classOf[MemberEvent],classOf[UnreachableMember])  // subscribe to cluster state-change events
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    // unsubscribe
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info(s"Member is Joining: {${member.address}}")
    case MemberUp(member) =>
      log.info(s"Member is Up: {${member.address}}")
    case MemberLeft(member) =>
      log.info(s"Member is Leaving: {${member.address}}")
    case MemberExited(member) =>
      log.info(s"Member is Exiting: {${member.address}}")
    case MemberRemoved(member, previousStatus) =>
      log.info(
        s"Member is Removed: {${member.address}} after {${previousStatus}}")
    case UnreachableMember(member) =>
      log.info(s"Member detected as unreachable: {${member.address}}")
      cluster.down(member.address)      // down the node manually instead of using auto-down
    case _: MemberEvent => // ignore
  }

}
