
A Look at Apache Gossip's ActiveGossiper

code4it
Published 2019-05-14 12:30:29

This article takes a look at the ActiveGossiper in Apache Gossip.

AbstractActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java

/**
 * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
 */
public abstract class AbstractActiveGossiper {

  protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);

  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  private final Histogram sharedDataHistogram;
  private final Histogram sendPerNodeDataHistogram;
  private final Histogram sendMembershipHistogram;
  private final Random random;
  private final GossipSettings gossipSettings;

  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
    sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
    sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
    random = new Random();
    gossipSettings = gossipManager.getSettings();
  }

  public void init() {

  }

  public void shutdown() {

  }

  public final void sendShutdownMessage(LocalMember me, LocalMember target){
    if (target == null){
      return;
    }
    ShutdownMessage m = new ShutdownMessage();
    m.setNodeId(me.getId());
    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
    gossipCore.sendOneWay(m, target.getUri());
  }

  //......

  /**
   * Performs the sending of the membership list, after we have incremented our own heartbeat.
   */
  protected void sendMembershipList(LocalMember me, LocalMember member) {
    if (member == null){
      return;
    }
    long startTime = System.currentTimeMillis();
    me.setHeartbeat(System.nanoTime());
    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
    message.setUuid(UUID.randomUUID().toString());
    message.getMembers().add(convert(me));
    for (LocalMember other : gossipManager.getMembers().keySet()) {
      message.getMembers().add(convert(other));
    }
    Response r = gossipCore.send(message, member.getUri());
    if (r instanceof ActiveGossipOk){
      //maybe count metrics here
    } else {
      LOGGER.debug("Message " + message + " generated response " + r);
    }
    sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
  }

  protected final Member convert(LocalMember member){
    Member gm = new Member();
    gm.setCluster(member.getClusterName());
    gm.setHeartbeat(member.getHeartbeat());
    gm.setUri(member.getUri().toASCIIString());
    gm.setId(member.getId());
    gm.setProperties(member.getProperties());
    return gm;
  }

  /**
   *
   * @param memberList
   *          An immutable list
   * @return The chosen LocalGossipMember to gossip with.
   */
  protected LocalMember selectPartner(List<LocalMember> memberList) {
    LocalMember member = null;
    if (memberList.size() > 0) {
      int randomNeighborIndex = random.nextInt(memberList.size());
      member = memberList.get(randomNeighborIndex);
    }
    return member;
  }
}
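  • The AbstractActiveGossiper constructor takes a GossipManager, a GossipCore and a MetricRegistry. The class defines methods such as sendShutdownMessage, sendMembershipList and selectPartner.
  • When memberList is not empty, selectPartner generates a random randomNeighborIndex and returns the LocalMember at that index. For an empty list it returns null.
  • sendMembershipList first sets me's heartbeat. It then creates a UdpActiveGossipMessage whose members list starts with the local member (me), followed by every member from gossipManager.getMembers(). Finally it sends the message to the chosen member via gossipCore.send and records the elapsed time in sendMembershipHistogram.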
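The random partner choice and the payload ordering described above can be illustrated with a minimal, self-contained Java sketch. It is not Apache Gossip code. Members are plain strings instead of LocalMember, and the Random is passed in so the choice is reproducible. The class and method names (PartnerSelectionSketch, buildMembershipPayload) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of selectPartner and the payload ordering used by sendMembershipList. */
class PartnerSelectionSketch {

    /** Picks a uniformly random element, or returns null for an empty list (as selectPartner does). */
    static <T> T selectPartner(List<T> memberList, Random random) {
        if (memberList.isEmpty()) {
            return null;
        }
        return memberList.get(random.nextInt(memberList.size()));
    }

    /** Builds the member list the way sendMembershipList does: the sender first, then every known member. */
    static List<String> buildMembershipPayload(String me, List<String> knownMembers) {
        List<String> payload = new ArrayList<>();
        payload.add(me); // the receiving handler treats index 0 as the sender
        payload.addAll(knownMembers);
        return payload;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        List<String> live = List.of("node-b", "node-c", "node-d");
        System.out.println("partner = " + selectPartner(live, random));
        System.out.println("payload = " + buildMembershipPayload("node-a", live));
        System.out.println("empty   -> " + selectPartner(List.of(), random));
    }
}
```

The local member goes at index 0 because ActiveGossipMessageHandler treats the first entry as the sender.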

ActiveGossipMessageHandler

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java

public class ActiveGossipMessageHandler implements MessageHandler {
  
  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    List<Member> remoteGossipMembers = new ArrayList<>();
    RemoteMember senderMember = null;
    UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
    for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
      URI u;
      try {
        u = new URI(activeGossipMessage.getMembers().get(i).getUri());
      } catch (URISyntaxException e) {
        GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
        continue;
      }
      RemoteMember member = new RemoteMember(
              activeGossipMessage.getMembers().get(i).getCluster(),
              u,
              activeGossipMessage.getMembers().get(i).getId(),
              activeGossipMessage.getMembers().get(i).getHeartbeat(),
              activeGossipMessage.getMembers().get(i).getProperties());
      if (i == 0) {
        senderMember = member;
      }
      if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {
        UdpNotAMemberFault f = new UdpNotAMemberFault();
        f.setException("Not a member of this cluster " + i);
        f.setUriFrom(activeGossipMessage.getUriFrom());
        f.setUuid(activeGossipMessage.getUuid());
        GossipCore.LOGGER.warn(f);
        gossipCore.sendOneWay(f, member.getUri());
        continue;
      }
      remoteGossipMembers.add(member);
    }
    UdpActiveGossipOk o = new UdpActiveGossipOk();
    o.setUriFrom(activeGossipMessage.getUriFrom());
    o.setUuid(activeGossipMessage.getUuid());
    gossipCore.sendOneWay(o, senderMember.getUri());
    gossipCore.mergeLists(senderMember, remoteGossipMembers);
    return true;
  }
}
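  • When the target member receives a UdpActiveGossipMessage, ActiveGossipMessageHandler handles it. It converts each entry of activeGossipMessage.getMembers() into a RemoteMember and treats the first entry as the sender. Entries whose URI cannot be parsed are skipped. Entries from a different cluster are sent a one-way UdpNotAMemberFault and skipped, and the rest are added to remoteGossipMembers. The handler then replies to the sender with a UdpActiveGossipOk via gossipCore.sendOneWay. Finally it calls gossipCore.mergeLists(senderMember, remoteGossipMembers).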
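The handler's filtering rules can be modeled without any networking. In this hypothetical sketch, WireMember stands in for Member, and Result simply collects what the real handler would merge or reject. URI parsing uses java.net.URI, as in the quoted code.

In the quoted code, if the first entry's URI fails to parse, senderMember stays null and the later senderMember.getUri() call would throw a NullPointerException. The sketch just leaves sender as null in that case.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of ActiveGossipMessageHandler.invoke (no networking). */
class ActiveGossipHandlingSketch {

    /** Stand-in for the wire-level Member: cluster name, URI string and id. */
    static final class WireMember {
        final String cluster;
        final String uri;
        final String id;

        WireMember(String cluster, String uri, String id) {
            this.cluster = cluster;
            this.uri = uri;
            this.id = id;
        }
    }

    /** Outcome of handling one message. */
    static final class Result {
        WireMember sender;                                   // first entry, if its URI parsed
        final List<WireMember> accepted = new ArrayList<>(); // would be passed to mergeLists
        final List<WireMember> rejected = new ArrayList<>(); // would receive a UdpNotAMemberFault
    }

    static Result handle(List<WireMember> members, String myCluster) {
        Result result = new Result();
        for (int i = 0; i < members.size(); i++) {
            WireMember m = members.get(i);
            try {
                new URI(m.uri); // entries with an unparsable URI are skipped
            } catch (URISyntaxException e) {
                continue;
            }
            if (i == 0) {
                result.sender = m; // the first entry is treated as the sender
            }
            if (!m.cluster.equals(myCluster)) {
                result.rejected.add(m); // foreign cluster: fault reply, not merged
                continue;
            }
            result.accepted.add(m);
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = handle(List.of(
                new WireMember("c1", "udp://10.0.0.1:2000", "a"),
                new WireMember("c2", "udp://10.0.0.2:2000", "b"),
                new WireMember("c1", "not a uri", "c")), "c1");
        System.out.println("sender=" + r.sender.id + " accepted=" + r.accepted.size()
                + " rejected=" + r.rejected.size());
    }
}
```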

GossipCore

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java

public class GossipCore implements GossipCoreConstants {

  class LatchAndBase {
    private final CountDownLatch latch;
    private volatile Base base;
    
    LatchAndBase(){
      latch = new CountDownLatch(1);
    }
    
  }
  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
  private final GossipManager gossipManager;
  private ConcurrentHashMap<String, LatchAndBase> requests;
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
  private final Meter messageSerdeException;
  private final Meter transmissionException;
  private final Meter transmissionSuccess;
  private final DataEventManager eventManager;
  
  public GossipCore(GossipManager manager, MetricRegistry metrics){
    this.gossipManager = manager;
    requests = new ConcurrentHashMap<>();
    perNodeData = new ConcurrentHashMap<>();
    sharedData = new ConcurrentHashMap<>();
    eventManager = new DataEventManager(metrics);
    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
    transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
    transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
  }

  public void receive(Base base) {
    if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
      LOGGER.warn("received message can not be handled");
    }
  }

  /**
   * Sends a blocking message.
   * todo: move functionality to TransportManager layer.
   * @param message
   * @param uri
   * @throws RuntimeException if data can not be serialized or in transmission error
   */
  private void sendInternal(Base message, URI uri) {
    byte[] json_bytes;
    try {
      json_bytes = gossipManager.getProtocolManager().write(message);
    } catch (IOException e) {
      messageSerdeException.mark();
      throw new RuntimeException(e);
    }
    try {
      gossipManager.getTransportManager().send(uri, json_bytes);
      transmissionSuccess.mark();
    } catch (IOException e) {
      transmissionException.mark();
      throw new RuntimeException(e);
    }
  }

  public Response send(Base message, URI uri){
    if (LOGGER.isDebugEnabled()){
      LOGGER.debug("Sending " + message);
      LOGGER.debug("Current request queue " + requests);
    }

    final Trackable t;
    LatchAndBase latchAndBase = null;
    if (message instanceof Trackable){
      t = (Trackable) message;
      latchAndBase = new LatchAndBase();
      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
    } else {
      t = null;
    }
    sendInternal(message, uri);
    if (latchAndBase == null){
      return null;
    }
    
    try {
      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
      if (complete){
        return (Response) latchAndBase.base;
      } else{
        return null;
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      if (latchAndBase != null){
        requests.remove(t.getUuid() + "/" + t.getUriFrom());
      }
    }
  }

  /**
   * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
   * when the protocol for the message is not to wait for a response
   * @param message the message to send
   * @param u the uri to send it to
   */
  public void sendOneWay(Base message, URI u) {
    try {
      sendInternal(message, u);
    } catch (RuntimeException ex) {
      LOGGER.debug("Send one way failed", ex);
    }
  }

  public void handleResponse(String k, Base v) {
    LatchAndBase latch = requests.get(k);
    latch.base = v;
    latch.latch.countDown();
  }

  /**
   * Merge lists from remote members and update heartbeats
   *
   * @param senderMember
   * @param remoteList
   *
   */
  public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
    if (LOGGER.isDebugEnabled()){
      debugState(senderMember, remoteList);
    }
    for (LocalMember i : gossipManager.getDeadMembers()) {
      if (i.getId().equals(senderMember.getId())) {
        LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
        i.recordHeartbeat(senderMember.getHeartbeat());
        i.setHeartbeat(senderMember.getHeartbeat());
        //TODO consider forcing an UP here
      }
    }
    for (Member remoteMember : remoteList) {
      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
        continue;
      }
      LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
      remoteMember.getUri(),
      remoteMember.getId(),
      remoteMember.getHeartbeat(),
      remoteMember.getProperties(),
      gossipManager.getSettings().getWindowSize(),
      gossipManager.getSettings().getMinimumSamples(),
      gossipManager.getSettings().getDistribution());
      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
      if (result != null){
        for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
          if (localMember.getKey().getId().equals(remoteMember.getId())){
            localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setProperties(remoteMember.getProperties());
          }
        }
      }
    }
    if (LOGGER.isDebugEnabled()){
      debugState(senderMember, remoteList);
    }
  }

  //......

}
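  • The GossipCore constructor takes a GossipManager and a MetricRegistry. The class defines methods such as receive, send, sendOneWay, handleResponse and mergeLists.
  • mergeLists first records the sender's heartbeat if the sender is currently in the dead-member list. It then converts each entry of the received remoteList (except the local member itself) into a LocalMember and merges it into gossipManager.getMembers() via putIfAbsent. New members are added in the UP state.
  • For members that already exist, the merge updates the heartbeat and properties. The recordHeartbeat method ignores values less than or equal to latestHeartbeatMs.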
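A simplified model of the merge step follows. It is a sketch under several assumptions:

- Members are keyed by id in a ConcurrentSkipListMap<String, Member> rather than by LocalMember.
- The state lives on the hypothetical Member class.
- The dead-sender handling at the start of mergeLists is omitted.
- recordHeartbeat models only the "ignore values less than or equal to the latest one" behavior described above, not any failure-detector bookkeeping the real LocalMember may do.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical, simplified model of GossipCore.mergeLists (dead-sender handling omitted). */
class MergeListsSketch {

    enum GossipState { UP, DOWN }

    /** Stand-in for LocalMember: id, current heartbeat, latest recorded heartbeat and state. */
    static final class Member {
        final String id;
        long heartbeat;
        long latestRecorded = Long.MIN_VALUE; // plays the role of latestHeartbeatMs
        GossipState state;

        Member(String id, long heartbeat, GossipState state) {
            this.id = id;
            this.heartbeat = heartbeat;
            this.state = state;
        }

        /** Assumed behavior: samples <= the latest recorded one are ignored. */
        void recordHeartbeat(long hb) {
            if (hb > latestRecorded) {
                latestRecorded = hb;
            }
        }
    }

    static void mergeLists(String myId, Map<String, Member> members, List<Member> remoteList) {
        for (Member remote : remoteList) {
            if (remote.id.equals(myId)) {
                continue; // never merge an entry describing ourselves
            }
            Member candidate = new Member(remote.id, remote.heartbeat, GossipState.UP);
            candidate.recordHeartbeat(remote.heartbeat);
            Member existing = members.putIfAbsent(remote.id, candidate);
            if (existing != null) {
                // Known member: record the sample, then overwrite heartbeat unconditionally, like the quoted code.
                existing.recordHeartbeat(remote.heartbeat);
                existing.heartbeat = remote.heartbeat;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Member> members = new ConcurrentSkipListMap<>();
        mergeLists("a", members, List.of(new Member("a", 5, GossipState.UP), new Member("b", 100, GossipState.UP)));
        System.out.println(members.keySet());
    }
}
```

As in the quoted code, an existing member's heartbeat field is overwritten even when recordHeartbeat ignores a stale value. An existing member also keeps its current state; state changes are left to GossipMemberStateRefresher.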

GossipManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java

public abstract class GossipManager {

  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
  
  // this mapper is used for ring and user-data persistence only. NOT messages.
  public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
    private static final long serialVersionUID = 1L;
  {
    enableDefaultTyping();
    configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
  }};

  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
  private final LocalMember me;
  private final GossipSettings settings;
  private final AtomicBoolean gossipServiceRunning;
  
  private TransportManager transportManager;
  private ProtocolManager protocolManager;
  
  private final GossipCore gossipCore;
  private final DataReaper dataReaper;
  private final Clock clock;
  private final ScheduledExecutorService scheduledServiced;
  private final MetricRegistry registry;
  private final RingStatePersister ringState;
  private final UserDataPersister userDataState;
  private final GossipMemberStateRefresher memberStateRefresher;
  
  private final MessageHandler messageHandler;
  private final LockManager lockManager;

  public GossipManager(String cluster,
                       URI uri, String id, Map<String, String> properties, GossipSettings settings,
                       List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
                       MessageHandler messageHandler) {
    this.settings = settings;
    this.messageHandler = messageHandler;

    clock = new SystemClock();
    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
            settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
    gossipCore = new GossipCore(this, registry);
    this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
    dataReaper = new DataReaper(gossipCore, clock);
    members = new ConcurrentSkipListMap<>();
    for (Member startupMember : gossipMembers) {
      if (!startupMember.equals(me)) {
        LocalMember member = new LocalMember(startupMember.getClusterName(),
                startupMember.getUri(), startupMember.getId(),
                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
                settings.getMinimumSamples(), settings.getDistribution());
        //TODO should members start in down state?
        members.put(member, GossipState.DOWN);
      }
    }
    gossipServiceRunning = new AtomicBoolean(true);
    this.scheduledServiced = Executors.newScheduledThreadPool(1);
    this.registry = registry;
    this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
    this.userDataState = new UserDataPersister(
        gossipCore,
        GossipManager.buildPerNodeDataPath(this),
        GossipManager.buildSharedDataPath(this));
    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
    readSavedRingState();
    readSavedDataState();
  }

  /**
   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
   * thread and start the receiver thread.
   */
  public void init() {
    
    // protocol manager and transport managers are specified in settings.
    // construct them here via reflection.
    
    protocolManager = ReflectionUtils.constructWithReflection(
        settings.getProtocolManagerClass(),
        new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class },
        new Object[] { settings, me.getId(), this.getRegistry() }
    );
    
    transportManager = ReflectionUtils.constructWithReflection(
        settings.getTransportManagerClass(),
        new Class<?>[] { GossipManager.class, GossipCore.class},
        new Object[] { this, gossipCore }
    );
    
    // start processing gossip messages.
    transportManager.startEndpoint();
    transportManager.startActiveGossiper();
    
    dataReaper.init();
    if (settings.isPersistRingState()) {
      scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
    }
    if (settings.isPersistDataState()) {
      scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
    }
    memberStateRefresher.init();
    LOGGER.debug("The GossipManager is started.");
  }

  /**
   * Shutdown the gossip service.
   */
  public void shutdown() {
    gossipServiceRunning.set(false);
    lockManager.shutdown();
    gossipCore.shutdown();
    transportManager.shutdown();
    dataReaper.close();
    memberStateRefresher.shutdown();
    scheduledServiced.shutdown();
    try {
      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.error(e);
    }
    scheduledServiced.shutdownNow();
  }

  //......

}
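  • GossipManager keeps members, a ConcurrentSkipListMap from LocalMember to GossipState. Startup members other than the node itself begin in the DOWN state. The constructor also creates a RingStatePersister, a UserDataPersister and a GossipMemberStateRefresher.
  • init first constructs the ProtocolManager and TransportManager via reflection from the classes configured in settings. It then calls transportManager.startEndpoint() and transportManager.startActiveGossiper(). It registers RingStatePersister and UserDataPersister as periodic tasks via scheduleAtFixedRate (every 60 seconds, each only when the corresponding persistence setting is enabled). Finally it calls memberStateRefresher.init().
  • shutdown calls gossipCore.shutdown(), transportManager.shutdown(), memberStateRefresher.shutdown() and so on.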
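GossipManager schedules its persisters and later shuts them down. That pattern can be sketched with plain java.util.concurrent. This hypothetical PeriodicPersistenceSketch uses a short demo period instead of the 60-second initial delay and period in the quoted init. The counter stands in for a persister such as RingStatePersister.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of GossipManager's periodic scheduling and shutdown sequence. */
class PeriodicPersistenceSketch {

    /** Runs task at a fixed rate for roughly runForMillis, then shuts down like GossipManager.shutdown(). */
    static boolean runAndShutdown(Runnable task, long periodMillis, long runForMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // The quoted init uses 60 seconds for both values; the demo values are much shorter.
        scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(runForMillis);
            scheduler.shutdown();                            // periodic runs are cancelled by default
            scheduler.awaitTermination(1, TimeUnit.SECONDS); // wait briefly for an in-flight run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();              // preserve the interrupt status
        } finally {
            scheduler.shutdownNow();                         // last resort, as in GossipManager.shutdown()
        }
        return scheduler.isTerminated();
    }

    public static void main(String[] args) {
        AtomicInteger saves = new AtomicInteger();
        boolean terminated = runAndShutdown(saves::incrementAndGet, 20, 100);
        System.out.println("saves=" + saves.get() + ", terminated=" + terminated);
    }
}
```

Under the default ScheduledThreadPoolExecutor policy, shutdown() cancels further periodic runs. awaitTermination therefore only waits for a run that is already in progress.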

GossipMemberStateRefresher

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java

public class GossipMemberStateRefresher {
  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);

  private final Map<LocalMember, GossipState> members;
  private final GossipSettings settings;
  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
  private final Clock clock;
  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
  private final ExecutorService listenerExecutor;
  private final ScheduledExecutorService scheduledExecutor;
  private final BlockingQueue<Runnable> workQueue;

  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
                                    GossipListener listener,
                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
    this.members = members;
    this.settings = settings;
    listeners.add(listener);
    this.findPerNodeGossipData = findPerNodeGossipData;
    clock = new SystemClock();
    workQueue = new ArrayBlockingQueue<>(1024);
    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
    scheduledExecutor = Executors.newScheduledThreadPool(1);
  }

  public void init() {
    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
  }

  public void run() {
    try {
      runOnce();
    } catch (RuntimeException ex) {
      LOGGER.warn("scheduled state had exception", ex);
    }
  }

  public void runOnce() {
    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
      boolean userDown = processOptimisticShutdown(entry);
      if (userDown)
        continue;

      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
      GossipState requiredState;

      if (phiMeasure != null) {
        requiredState = calcRequiredState(phiMeasure);
      } else {
        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
      }

      if (entry.getValue() != requiredState) {
        members.put(entry.getKey(), requiredState);
        /* Call listeners asynchronously */
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
      }
    }
  }

  public GossipState calcRequiredState(Double phiMeasure) {
    if (phiMeasure > settings.getConvictThreshold())
      return GossipState.DOWN;
    else
      return GossipState.UP;
  }

  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
    long now = clock.nanoTime();
    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
      return GossipState.DOWN;
    } else {
      return state;
    }
  }

  /**
   * If we have a special key the per-node data that means that the node has sent us
   * a pre-emptive shutdown message. We process this so node is seen down sooner
   *
   * @param l member to consider
   * @return true if node forced down
   */
  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
    if (m == null) {
      return false;
    }
    ShutdownMessage s = (ShutdownMessage) m.getPayload();
    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
      members.put(l.getKey(), GossipState.DOWN);
      if (l.getValue() == GossipState.UP) {
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
      }
      return true;
    }
    return false;
  }

  public void register(GossipListener listener) {
    listeners.add(listener);
  }

  public void shutdown() {
    scheduledExecutor.shutdown();
    try {
      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdown();
    try {
      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdownNow();
  }
}
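  • GossipMemberStateRefresher.init registers its run method as a periodic task via scheduledExecutor.scheduleAtFixedRate, running every 100 ms.
  • runOnce iterates over the members map passed in by GossipManager. Members with a newer shutdown message in their per-node data are forced DOWN first (processOptimisticShutdown). For the rest, runOnce calls LocalMember.detect to compute phiMeasure. If the value is not null, requiredState comes from calcRequiredState; otherwise it comes from calcRequiredStateCleanupInterval. If the state has changed, runOnce updates the map and calls each GossipListener's gossipEvent method asynchronously.
  • calcRequiredState returns GossipState.DOWN if phiMeasure is greater than convictThreshold (default 10), and GossipState.UP otherwise. calcRequiredStateCleanupInterval returns GossipState.DOWN if the current time in milliseconds minus cleanupInterval is greater than member.getHeartbeat(), that is, if the last heartbeat is older than cleanupInterval. Otherwise it keeps the existing state.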
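The two state rules reduce to small pure functions. This hypothetical sketch makes two assumptions that are not taken from the quoted source:

- All times are plain millisecond values supplied by the caller.
- A null phi means the failure detector has no estimate yet (for example, too few heartbeat samples).

The comparisons themselves match the quoted code. phi must be strictly greater than the threshold, and the heartbeat must be strictly older than now minus cleanupInterval.

```java
/** Hypothetical sketch of the two state rules in GossipMemberStateRefresher. */
class StateRulesSketch {

    enum GossipState { UP, DOWN }

    /** phi strictly above the convict threshold means DOWN, otherwise UP. */
    static GossipState calcRequiredState(double phiMeasure, double convictThreshold) {
        return phiMeasure > convictThreshold ? GossipState.DOWN : GossipState.UP;
    }

    /** Fallback rule: DOWN once the heartbeat is older than cleanupInterval, otherwise unchanged. */
    static GossipState calcRequiredStateCleanupInterval(long nowMillis, long cleanupIntervalMillis,
                                                        long heartbeatMillis, GossipState current) {
        return nowMillis - cleanupIntervalMillis > heartbeatMillis ? GossipState.DOWN : current;
    }

    /** Combines both rules the way runOnce does; a null phi is assumed to mean "no estimate yet". */
    static GossipState requiredState(Double phi, double convictThreshold, long nowMillis,
                                     long cleanupIntervalMillis, long heartbeatMillis, GossipState current) {
        if (phi != null) {
            return calcRequiredState(phi, convictThreshold);
        }
        return calcRequiredStateCleanupInterval(nowMillis, cleanupIntervalMillis, heartbeatMillis, current);
    }

    public static void main(String[] args) {
        System.out.println(calcRequiredState(12.0, 10.0));
        System.out.println(requiredState(null, 10.0, 10_000, 5_000, 6_000, GossipState.UP));
    }
}
```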

AbstractTransportManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java

/**
 * Manage the protcol threads (active and passive gossipers).
 */
public abstract class AbstractTransportManager implements TransportManager {
  
  public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
  
  private final ExecutorService gossipThreadExecutor;
  private final AbstractActiveGossiper activeGossipThread;
  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  
  public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    gossipThreadExecutor = Executors.newCachedThreadPool();
    activeGossipThread = ReflectionUtils.constructWithReflection(
      gossipManager.getSettings().getActiveGossipClass(),
        new Class<?>[]{
            GossipManager.class, GossipCore.class, MetricRegistry.class
        },
        new Object[]{
            gossipManager, gossipCore, gossipManager.getRegistry()
        });
  }

  // shut down threads etc.
  @Override
  public void shutdown() {
    gossipThreadExecutor.shutdown();
    if (activeGossipThread != null) {
      activeGossipThread.shutdown();
    }
    try {
      boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
      if (!result) {
        // common when blocking patterns are used to read data from a socket.
        LOGGER.warn("executor shutdown timed out");
      }
    } catch (InterruptedException e) {
      LOGGER.error(e);
    }
    gossipThreadExecutor.shutdownNow();
  }

  @Override
  public void startActiveGossiper() {
    activeGossipThread.init();
  }

  @Override
  public abstract void startEndpoint();
}
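  • The AbstractTransportManager constructor instantiates activeGossipThread via reflection from the class configured in settings (getActiveGossipClass), and startActiveGossiper calls activeGossipThread.init(). activeGossipThread is a subclass of AbstractActiveGossiper; here we look at SimpleActiveGossiper.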
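The reflective construction used here (and in GossipManager.init) can be approximated with the JDK alone. The article does not show the body of ReflectionUtils.constructWithReflection. The method below is therefore a hypothetical stand-in with the same argument shape (class, parameter types, arguments), assuming the configured class is given by name. DemoGossiper is an invented class used only for the demo.

```java
/** Hypothetical, JDK-only stand-in for constructing a configured class by reflection. */
class ReflectiveConstructionSketch {

    /** Loads className, finds the public constructor matching paramTypes and invokes it with args. */
    static <T> T constructWithReflection(String className, Class<?>[] paramTypes, Object[] args) {
        try {
            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) Class.forName(className);
            return clazz.getConstructor(paramTypes).newInstance(args);
        } catch (ReflectiveOperationException e) {
            // covers ClassNotFound, NoSuchMethod, Instantiation, IllegalAccess and InvocationTarget failures
            throw new IllegalStateException("Cannot construct " + className, e);
        }
    }

    /** Hypothetical class standing in for an AbstractActiveGossiper subclass. */
    public static class DemoGossiper {
        final String name;

        public DemoGossiper(String name) {
            this.name = name;
        }

        public void init() {
            System.out.println(name + " started");
        }
    }

    public static void main(String[] args) {
        DemoGossiper g = constructWithReflection(DemoGossiper.class.getName(),
                new Class<?>[] { String.class }, new Object[] { "demo" });
        g.init();
    }
}
```

Configuring the class rather than hard-coding it means you can swap in a different gossiper by changing only the settings.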

SimpleActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java

/**
 * Base implementation gossips randomly to live nodes periodically gossips to dead ones
 *
 */
public class SimpleActiveGossiper extends AbstractActiveGossiper {

  private ScheduledExecutorService scheduledExecutorService;
  private final BlockingQueue<Runnable> workQueue;
  private ThreadPoolExecutor threadService;
  
  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
                              MetricRegistry registry) {
    super(gossipManager, gossipCore, registry);
    scheduledExecutorService = Executors.newScheduledThreadPool(2);
    workQueue = new ArrayBlockingQueue<Runnable>(1024);
    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  @Override
  public void init() {
    super.init();
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      threadService.execute(() -> {
        sendToALiveMember();
      });
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      sendToDeadMember();
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            () -> sendPerNodeData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            () -> sendSharedData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
  }
  
  @Override
  public void shutdown() {
    super.shutdown();
    scheduledExecutorService.shutdown();
    try {
      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    sendShutdownMessage();
    threadService.shutdown();
    try {
      threadService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
  }

  protected void sendToALiveMember(){
    LocalMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  protected void sendToDeadMember(){
    LocalMember member = selectPartner(gossipManager.getDeadMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  /**
   * sends an optimistic shutdown message to several clusters nodes
   */
  protected void sendShutdownMessage(){
    List<LocalMember> l = gossipManager.getLiveMembers();
    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
    for (int i = 0; i < sendTo; i++) {
      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
    }
  }
}
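  • SimpleActiveGossiper extends AbstractActiveGossiper and overrides init. init registers four periodic tasks via scheduledExecutorService.scheduleAtFixedRate: sendToALiveMember, sendToDeadMember, sendPerNodeData and sendSharedData. Each runs every gossipInterval milliseconds, and sendToALiveMember is handed off to the threadService pool.
  • shutdown mainly calls scheduledExecutorService.shutdown(), sendShutdownMessage() and threadService.shutdown(). sendShutdownMessage sends an optimistic shutdown message to one live member if there are fewer than 3, and otherwise to half of them.
  • sendToALiveMember uses the inherited selectPartner method to pick a live member from gossipManager.getLiveMembers(), then sends it the membership list via sendMembershipList. sendToDeadMember likewise picks a dead member from gossipManager.getDeadMembers() and sends it the membership list.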
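The target count in sendShutdownMessage() is simple integer arithmetic. Fewer than 3 live members means one message; otherwise it is size / 2. So 3 live members also get just one message, and 9 get four. The hypothetical sketch below reproduces that formula and the independent random draws. Because each draw calls selectPartner separately, the same member can be picked more than once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of how SimpleActiveGossiper.sendShutdownMessage() picks its targets. */
class ShutdownFanOutSketch {

    /** Same formula as the quoted code: 1 below 3 live members, otherwise half (integer division). */
    static int shutdownFanOut(int liveMembers) {
        return liveMembers < 3 ? 1 : liveMembers / 2;
    }

    /** Draws each target independently at random, so duplicates are possible. */
    static List<String> pickTargets(List<String> live, Random random) {
        List<String> targets = new ArrayList<>();
        int sendTo = shutdownFanOut(live.size());
        for (int i = 0; i < sendTo; i++) {
            if (live.isEmpty()) {
                continue; // selectPartner returns null and sendShutdownMessage(me, null) returns early
            }
            targets.add(live.get(random.nextInt(live.size())));
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> live = List.of("b", "c", "d", "e");
        System.out.println(shutdownFanOut(live.size()) + " -> " + pickTargets(live, new Random()));
    }
}
```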

Summary

  • AbstractTransportManager's startActiveGossiper calls activeGossipThread.init(). activeGossipThread is a subclass of AbstractActiveGossiper, assumed here to be SimpleActiveGossiper.
  • SimpleActiveGossiper's init registers four periodic tasks via scheduledExecutorService.scheduleAtFixedRate: sendToALiveMember, sendToDeadMember, sendPerNodeData and sendSharedData. Each runs every gossipInterval.
  • sendToALiveMember uses the inherited selectPartner method to pick a live member from gossipManager.getLiveMembers(), then sends it the membership list via sendMembershipList. sendToDeadMember likewise picks a dead member from gossipManager.getDeadMembers() and sends it the membership list.
  • AbstractActiveGossiper provides the selectPartner and sendMembershipList methods. When memberList is not empty, selectPartner returns the LocalMember at a random randomNeighborIndex, and otherwise returns null. sendMembershipList first sets me's heartbeat. It then creates a UdpActiveGossipMessage whose members list starts with the local member, followed by all members from gossipManager.getMembers(), and finally sends it to the chosen member via gossipCore.send.
  • ActiveGossipMessageHandler handles UdpActiveGossipMessage. It converts the entries of activeGossipMessage.getMembers() into RemoteMember objects and adds those from the same cluster to remoteGossipMembers. It then replies to the sender with a UdpActiveGossipOk via gossipCore.sendOneWay, and finally calls gossipCore.mergeLists(senderMember, remoteGossipMembers).
  • GossipCore's mergeLists converts the received remoteList into LocalMember objects and merges them into gossipManager.getMembers() via putIfAbsent. For members that already exist, it updates the heartbeat, and recordHeartbeat ignores values less than or equal to latestHeartbeatMs.
  • GossipManager uses a ConcurrentSkipListMap to hold members, a mapping from LocalMember to GossipState. Its constructor creates a RingStatePersister, a UserDataPersister and a GossipMemberStateRefresher. init calls transportManager.startEndpoint() and startActiveGossiper(), registers RingStatePersister and UserDataPersister as periodic tasks via scheduleAtFixedRate, and calls memberStateRefresher.init().
  • GossipMemberStateRefresher's init registers a periodic task (every 100 ms) via scheduledExecutor.scheduleAtFixedRate. runOnce iterates over the members passed in by GossipManager and calls LocalMember.detect on each to compute phiMeasure. If the value is not null, requiredState comes from calcRequiredState; otherwise it comes from calcRequiredStateCleanupInterval. When the state changes, runOnce updates the map and asynchronously calls each GossipListener's gossipEvent method. calcRequiredState returns GossipState.DOWN if phiMeasure is greater than convictThreshold (default 10), and GossipState.UP otherwise. calcRequiredStateCleanupInterval returns GossipState.DOWN if the last heartbeat is older than cleanupInterval, and otherwise keeps the existing state.

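Because sendMembershipList sends the full membership list on every gossip round, this approach may become inefficient when the member list is very large.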

doc

  • GossipCore
  • GossipManager
  • AbstractActiveGossiper
  • SimpleActiveGossiper
  • GossipMemberStateRefresher
This article was shared from the 码匠的流水账 WeChat official account as part of the Tencent Cloud self-media sharing program.
Originally published: 2019-05-04. In case of infringement, please contact cloudcommunity@tencent.com to have it removed.
