本文主要研究一下scalecube-cluster的MembershipProtocol
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java
/**
* Cluster Membership Protocol component responsible for managing information about existing members
* of the cluster.
*/
public interface MembershipProtocol {
/**
* Starts running cluster membership protocol. After started it begins to receive and send cluster
* membership messages.
*
* @return a {@code Mono} signalling that the start-up sequence has finished
*/
Mono<Void> start();
/** Stops running cluster membership protocol and releases occupied resources. */
void stop();
/**
* Listens to changes in cluster membership.
*
* @return stream of membership events
*/
Flux<MembershipEvent> listen();
/**
* Returns list of all members of the joined cluster. This will include all cluster members
* including local member.
*
* @return all members in the cluster (including local one)
*/
Collection<Member> members();
/**
* Returns list of all cluster members of the joined cluster excluding local member.
*
* @return all members in the cluster (excluding local one)
*/
Collection<Member> otherMembers();
/**
* Returns local cluster member which corresponds to this cluster instance.
*
* @return local member
*/
Member member();
/**
* Returns cluster member with given id, or an empty {@code Optional} if no member with such id
* exists at joined cluster.
*
* @param id member id to look up
* @return member by id, if present
*/
Optional<Member> member(String id);
/**
* Returns cluster member by given address, or an empty {@code Optional} if no member with such
* address exists at joined cluster.
*
* @param address member address to look up
* @return member by address, if present
*/
Optional<Member> member(Address address);
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
public final class MembershipProtocolImpl implements MembershipProtocol {
private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);
/** Origin of a membership record that is handed to {@code updateMembership}. */
private enum MembershipUpdateReason {
// Status change reported by the failure detector (see onFailureDetectorEvent)
FAILURE_DETECTOR_EVENT,
// Record received through membership gossip (see onMembershipGossip)
MEMBERSHIP_GOSSIP,
// Record merged from a SYNC / SYNC_ACK message that is not the initial one (see syncMembership)
SYNC,
// Record merged from the SyncAck answering the initial sync issued by start()
INITIAL_SYNC,
// Member declared DEAD because its suspicion timeout fired (see onSuspicionTimeout)
SUSPICION_TIMEOUT
}
// Message qualifiers used to recognise membership traffic
public static final String SYNC = "sc/membership/sync";
public static final String SYNC_ACK = "sc/membership/syncAck";
public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";
// Member representing this cluster instance; returned as-is by member()
private final Member localMember;
// Injected collaborators (all null-checked in the constructor)
private final Transport transport;
private final MembershipConfig config;
// Seed addresses taken from the config after cleanUpSeedMembers(...)
private final List<Address> seedMembers;
private final FailureDetector failureDetector;
private final GossipProtocol gossipProtocol;
private final MetadataStore metadataStore;
private final CorrelationIdGenerator cidGenerator;
// State
// Latest membership record per member id; records of DEAD members are removed from this table
private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
// Members currently exposed through members()/otherMembers(), keyed by member id
private final Map<String, Member> members = new HashMap<>();
// Subject: serialized processor backing listen(); events are pushed through its sink
private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
DirectProcessor.<MembershipEvent>create().serialize();
private final FluxSink<MembershipEvent> sink = subject.sink();
// Disposables: subscriptions and the periodic sync task, all disposed together in stop()
private final Disposable.Composite actionsDisposables = Disposables.composite();
// Scheduled
private final Scheduler scheduler;
// Pending suspicion-timeout tasks keyed by member id
private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<>();
/**
* Creates new instantiates of cluster membership protocol with given transport and config.
*
* @param localMember local cluster member
* @param transport cluster transport
* @param failureDetector failure detector
* @param gossipProtocol gossip protocol
* @param metadataStore metadata store
* @param config membership config parameters
* @param scheduler scheduler
* @param cidGenerator correlation id generator
*/
public MembershipProtocolImpl(
Member localMember,
Transport transport,
FailureDetector failureDetector,
GossipProtocol gossipProtocol,
MetadataStore metadataStore,
MembershipConfig config,
Scheduler scheduler,
CorrelationIdGenerator cidGenerator) {
this.transport = Objects.requireNonNull(transport);
this.config = Objects.requireNonNull(config);
this.failureDetector = Objects.requireNonNull(failureDetector);
this.gossipProtocol = Objects.requireNonNull(gossipProtocol);
this.metadataStore = Objects.requireNonNull(metadataStore);
this.localMember = Objects.requireNonNull(localMember);
this.scheduler = Objects.requireNonNull(scheduler);
this.cidGenerator = Objects.requireNonNull(cidGenerator);
// Prepare seeds
seedMembers = cleanUpSeedMembers(config.getSeedMembers());
// Init membership table with local member record
membershipTable.put(localMember.id(), new MembershipRecord(localMember, ALIVE, 0));
// fill in the table of members with local member
members.put(localMember.id(), localMember);
actionsDisposables.addAll(
Arrays.asList(
// Listen to incoming SYNC and SYNC ACK requests from other members
transport
.listen() //
.publishOn(scheduler)
.subscribe(this::onMessage, this::onError),
// Listen to events from failure detector
failureDetector
.listen()
.publishOn(scheduler)
.subscribe(this::onFailureDetectorEvent, this::onError),
// Listen to membership gossips
gossipProtocol
.listen()
.publishOn(scheduler)
.subscribe(this::onMembershipGossip, this::onError)));
}
/** Exposes the membership event stream, buffering events when the subscriber is slower. */
@Override
public Flux<MembershipEvent> listen() {
  final Flux<MembershipEvent> bufferedEvents = subject.onBackpressureBuffer();
  return bufferedEvents;
}
/**
 * Performs the initial sync against the configured seed members and then enables periodic sync.
 *
 * <p>A SYNC request is sent to every seed; only the first answer that passes the sync-group check
 * is merged, and waiting is bounded by the configured sync timeout. Whatever the outcome
 * (answer, timeout or error), periodic sync is scheduled and the returned {@code Mono} completes
 * successfully; a failure is only logged.
 */
@Override
public Mono<Void> start() {
  return Mono.create(
      startSink -> {
        // Common epilogue: switch on periodic sync and report start-up as finished
        final Runnable finishStart =
            () -> {
              schedulePeriodicSync();
              startSink.success();
            };

        // Without seeds there is nobody to ask, so only periodic sync is scheduled
        if (seedMembers.isEmpty()) {
          finishStart.run();
          return;
        }

        LOGGER.debug("Making initial Sync to all seed members: {}", seedMembers);

        // One request/response SYNC per seed address
        @SuppressWarnings("unchecked")
        final Mono<Message>[] seedRequests = new Mono[seedMembers.size()];
        int slot = 0;
        for (Address seed : seedMembers) {
          final String cid = cidGenerator.nextCid();
          seedRequests[slot++] =
              transport
                  .requestResponse(prepareSyncDataMsg(SYNC, cid), seed)
                  .filter(this::checkSyncGroup);
        }

        // Merge the first SyncAck that arrives within the timeout
        Flux.mergeDelayError(seedRequests.length, seedRequests)
            .take(1)
            .timeout(Duration.ofMillis(config.getSyncTimeout()), scheduler)
            .publishOn(scheduler)
            .flatMap(syncAck -> onSyncAck(syncAck, true))
            .doFinally(signal -> finishStart.run())
            .subscribe(
                null,
                ex -> LOGGER.info("Exception on initial SyncAck, cause: {}", ex.toString()));
      });
}
/**
 * Shuts the protocol down: disposes all subscriptions and the periodic sync, cancels pending
 * suspicion-timeout tasks and completes the membership event stream.
 */
@Override
public void stop() {
  // No more incoming requests/events and no more outgoing sync
  actionsDisposables.dispose();

  // Pending "declare dead" tasks must not fire after stop
  for (Disposable pendingTask : suspicionTimeoutTasks.values()) {
    if (pendingTask == null || pendingTask.isDisposed()) {
      continue;
    }
    pendingTask.dispose();
  }
  suspicionTimeoutTasks.clear();

  // Terminate the event stream
  sink.complete();
}
/** Returns a copy of the currently known members, the local member included. */
@Override
public Collection<Member> members() {
  final List<Member> snapshot = new ArrayList<>(members.values());
  return snapshot;
}
/** Returns a copy of the currently known members with the local member left out. */
@Override
public Collection<Member> otherMembers() {
  // Work on a copy so the internal map is never handed out or modified
  final List<Member> others = new ArrayList<>(members.values());
  others.removeIf(candidate -> candidate.equals(localMember));
  return others;
}
/** Returns the local member this protocol instance was constructed with. */
@Override
public Member member() {
return localMember;
}
/** Looks a member up by id; the result is empty when no such member is known. */
@Override
public Optional<Member> member(String id) {
  final Member known = members.get(id);
  return Optional.ofNullable(known);
}
/**
 * Looks a member up by address; the result is empty when no known member has that address. When
 * several members match, the first one encountered is returned.
 */
@Override
public Optional<Member> member(Address address) {
  for (Member candidate : new ArrayList<>(members.values())) {
    if (candidate.address().equals(address)) {
      return Optional.of(candidate);
    }
  }
  return Optional.empty();
}
//......
}
MembershipProtocolImpl实现了MembershipProtocol接口；其内部枚举MembershipUpdateReason定义了五种更新原因（FAILURE_DETECTOR_EVENT、MEMBERSHIP_GOSSIP、SYNC、INITIAL_SYNC、SUSPICION_TIMEOUT）
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
public final class MembershipProtocolImpl implements MembershipProtocol {
//......
/**
 * Routes an incoming transport message to the SYNC or SYNC_ACK handler.
 *
 * <p>Messages failing the sync-group check are ignored. A SYNC_ACK carrying a correlation id is
 * ignored here as well, which filters out the answer to the initial sync.
 */
private void onMessage(Message message) {
  if (!checkSyncGroup(message)) {
    return;
  }
  final String qualifier = message.qualifier();
  if (SYNC.equals(qualifier)) {
    onSync(message).subscribe(null, this::onError);
  } else if (SYNC_ACK.equals(qualifier) && message.correlationId() == null) {
    onSyncAck(message, false).subscribe(null, this::onError);
  }
}
/**
 * Tells whether the message carries {@code SyncData} belonging to the locally configured sync
 * group. Messages with any other payload are rejected.
 */
private boolean checkSyncGroup(Message message) {
  final Object payload = message.data();
  if (!(payload instanceof SyncData)) {
    return false;
  }
  final SyncData syncData = (SyncData) payload;
  return config.getSyncGroup().equals(syncData.getSyncGroup());
}
/**
 * Merges the membership carried by an incoming SYNC and, once merging succeeded, answers the
 * sender with a SYNC_ACK holding the local membership table. A failure to send the answer is only
 * logged.
 */
private Mono<Void> onSync(Message syncMsg) {
  return Mono.defer(
      () -> {
        LOGGER.debug("Received Sync: {}", syncMsg);
        final Mono<Void> merged = syncMembership(syncMsg.data(), false);
        return merged.doOnSuccess(
            ignored -> {
              // Reply with the same correlation id so the requester can match the answer
              final Message syncAck = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
              final Address replyTo = syncMsg.sender();
              transport
                  .send(replyTo, syncAck)
                  .subscribe(
                      null,
                      ex ->
                          LOGGER.debug(
                              "Failed to send SyncAck: {} to {}, cause: {}",
                              syncAck,
                              replyTo,
                              ex.toString()));
            });
      });
}
/**
 * Merges the membership carried by a SYNC_ACK.
 *
 * @param syncAckMsg received SYNC_ACK message
 * @param onStart {@code true} when the message answers the initial sync made on start
 */
private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
  return Mono.defer(
      () -> {
        LOGGER.debug("Received SyncAck: {}", syncAckMsg);
        final SyncData payload = syncAckMsg.data();
        return syncMembership(payload, onStart);
      });
}
/**
 * Applies every record of the given sync data that differs from the local table.
 *
 * @param syncData membership received from a remote member
 * @param onStart {@code true} for the initial sync, which makes the update reason
 *     {@code INITIAL_SYNC} instead of {@code SYNC}
 * @return mono completing once all individual updates have terminated (errors are delayed)
 */
private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
  return Mono.defer(
      () -> {
        final MembershipUpdateReason reason;
        if (onStart) {
          reason = MembershipUpdateReason.INITIAL_SYNC;
        } else {
          reason = MembershipUpdateReason.SYNC;
        }

        final List<Mono<Void>> updates = new ArrayList<>();
        for (MembershipRecord incoming : syncData.getMembership()) {
          // Records identical to what is already stored need no processing
          if (incoming.equals(membershipTable.get(incoming.id()))) {
            continue;
          }
          updates.add(updateMembership(incoming, reason));
        }
        return Mono.whenDelayError(updates.toArray(new Mono[0]));
      });
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
public final class MembershipProtocolImpl implements MembershipProtocol {
//......
/**
 * Handles a status change reported by the failure detector.
 *
 * <p>Events for unknown members or with an unchanged status are ignored. A non-ALIVE status is
 * applied to the table keeping the current incarnation. An ALIVE status is not applied directly;
 * instead a SYNC is sent to that member.
 */
private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
  final MembershipRecord current = membershipTable.get(fdEvent.member().id());
  // Member already removed, or nothing changed
  if (current == null || current.status() == fdEvent.status()) {
    return;
  }

  LOGGER.debug("Received status change on failure detector event: {}", fdEvent);

  if (fdEvent.status() != ALIVE) {
    final MembershipRecord changed =
        new MembershipRecord(current.member(), fdEvent.status(), current.incarnation());
    updateMembership(changed, MembershipUpdateReason.FAILURE_DETECTOR_EVENT)
        .subscribe(null, this::onError);
    return;
  }

  // TODO: Consider to make more elegant solution
  // Alive won't override SUSPECT so issue instead extra sync with member to force it spread
  // alive with inc + 1
  final Message syncMsg = prepareSyncDataMsg(SYNC, null);
  final Address target = fdEvent.member().address();
  transport
      .send(target, syncMsg)
      .subscribe(
          null,
          ex ->
              LOGGER.debug("Failed to send {} to {}, cause: {}", syncMsg, target, ex.toString()));
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
public final class MembershipProtocolImpl implements MembershipProtocol {
//......
/**
 * Applies a membership record received via gossip. Messages with any other qualifier are ignored.
 * The record is not gossiped further from here.
 */
private void onMembershipGossip(Message message) {
  if (!MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
    return;
  }
  final MembershipRecord gossiped = message.data();
  LOGGER.debug("Received membership gossip: {}", gossiped);
  updateMembership(gossiped, MembershipUpdateReason.MEMBERSHIP_GOSSIP)
      .subscribe(null, this::onError);
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
public final class MembershipProtocolImpl implements MembershipProtocol {
//......
/**
* Try to update membership table with the given record.
*
* <p>A record that does not override the stored one is ignored. A record about the local member
* is never applied as-is: the local incarnation is raised above both records and the local
* record is gossiped again, without emitting an event. Any other record is stored (or removed
* when DEAD), the suspicion timer is adjusted, a membership event is emitted and the record is
* gossiped unless it was itself learned from gossip or from the initial sync.
*
* @param r1 new membership record which compares with existing r0 record
* @param reason indicating the reason for updating membership table
* @return mono completing when the membership event (if any) has been emitted
*/
private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
return Mono.defer(
() -> {
Objects.requireNonNull(r1, "Membership record can't be null");
// Get current record
MembershipRecord r0 = membershipTable.get(r1.id());
// Check if new record r1 overrides existing membership record r0
if (!r1.isOverrides(r0)) {
return Mono.empty();
}
// If received update for local member then increase incarnation and spread own record again
if (r1.member().id().equals(localMember.id())) {
// New incarnation must exceed both the stored and the received one
int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
MembershipRecord r2 =
new MembershipRecord(localMember, r0.status(), currentIncarnation + 1);
membershipTable.put(localMember.id(), r2);
LOGGER.debug(
"Local membership record r0: {}, but received r1: {}, "
+ "spread with increased incarnation r2: {}",
r0,
r1,
r2);
spreadMembershipGossip(r2)
.subscribe(
null,
ex -> {
// no-op: the failure is already logged inside spreadMembershipGossip
});
return Mono.empty();
}
// Update membership: DEAD members are dropped from the table, everything else is stored
if (r1.isDead()) {
membershipTable.remove(r1.id());
} else {
membershipTable.put(r1.id(), r1);
}
// Schedule/cancel suspicion timeout task
if (r1.isSuspect()) {
scheduleSuspicionTimeoutTask(r1);
} else {
cancelSuspicionTimeoutTask(r1.id());
}
// Emit membership and regardless of result spread gossip
return emitMembershipEvent(r0, r1)
.doFinally(
s -> {
// Spread gossip (unless already gossiped)
if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP
&& reason != MembershipUpdateReason.INITIAL_SYNC) {
spreadMembershipGossip(r1)
.subscribe(
null,
ex -> {
// no-op: the failure is already logged inside spreadMembershipGossip
});
}
});
});
}
/**
 * Wraps the record into a membership gossip message and hands it to the gossip protocol.
 *
 * @param record membership record to disseminate
 * @return mono terminating with the outcome of the spread; an error is logged at debug level and
 *     still propagated to the subscriber
 */
private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
  return Mono.defer(
      () -> {
        final Message gossip = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
        LOGGER.debug("Spead membreship: {} with gossip", gossip);
        return gossipProtocol
            .spread(gossip)
            .doOnError(
                failure ->
                    LOGGER.debug(
                        "Failed to spread membership: {} with gossip, cause: {}",
                        gossip,
                        failure.toString()))
            .then();
      });
}
/**
 * Schedules the suspicion-timeout task for the given record's member unless one is already
 * registered. The delay is computed by {@code ClusterMath.suspicionTimeout} from the configured
 * suspicion multiplier, the current membership table size and the configured ping interval.
 */
private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
  final long timeoutMillis =
      ClusterMath.suspicionTimeout(
          config.getSuspicionMult(), membershipTable.size(), config.getPingInterval());
  final String suspectId = record.id();
  suspicionTimeoutTasks.computeIfAbsent(
      suspectId,
      key ->
          scheduler.schedule(() -> onSuspicionTimeout(key), timeoutMillis, TimeUnit.MILLISECONDS));
}
/**
 * Fired when a member's suspicion timeout elapsed: forgets the task and, if the member is still
 * in the membership table, declares it DEAD with its current incarnation.
 */
private void onSuspicionTimeout(String memberId) {
  suspicionTimeoutTasks.remove(memberId);
  final MembershipRecord suspected = membershipTable.get(memberId);
  if (suspected == null) {
    return;
  }
  LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", suspected);
  final MembershipRecord deadRecord =
      new MembershipRecord(suspected.member(), DEAD, suspected.incarnation());
  updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT)
      .subscribe(null, this::onError);
}
/** Unregisters the suspicion-timeout task of the given member and disposes it if still active. */
private void cancelSuspicionTimeoutTask(String memberId) {
  final Disposable pendingTask = suspicionTimeoutTasks.remove(memberId);
  if (pendingTask == null || pendingTask.isDisposed()) {
    return;
  }
  pendingTask.dispose();
}
/**
 * Translates a table transition into a membership event and keeps the member view in step.
 *
 * <ul>
 *   <li>new record is DEAD: member is dropped, its metadata removed, REMOVED event emitted;
 *   <li>no previous record and new record is ALIVE: member is added, its metadata fetched and
 *       stored, ADDED event emitted;
 *   <li>previous record exists with a lower incarnation: metadata is fetched again and an UPDATED
 *       event carrying old and new metadata is emitted;
 *   <li>anything else: nothing is emitted.
 * </ul>
 *
 * <p>A {@code TimeoutException} while fetching metadata is swallowed, in which case no event is
 * emitted.
 *
 * @param r0 previous record, may be {@code null}
 * @param r1 record that has just been applied
 */
private Mono<Void> emitMembershipEvent(MembershipRecord r0, MembershipRecord r1) {
  return Mono.defer(
      () -> {
        final Member target = r1.member();

        // removed
        if (r1.isDead()) {
          members.remove(target.id());
          return Mono.fromRunnable(
              () -> {
                Map<String, String> lastMetadata = metadataStore.removeMetadata(target);
                sink.next(MembershipEvent.createRemoved(target, lastMetadata));
              });
        }

        if (r0 == null) {
          if (!r1.isAlive()) {
            return Mono.empty();
          }
          // added
          members.put(target.id(), target);
          return metadataStore
              .fetchMetadata(target)
              .doOnSuccess(
                  fetched -> {
                    metadataStore.updateMetadata(target, fetched);
                    sink.next(MembershipEvent.createAdded(target, fetched));
                  })
              .onErrorResume(TimeoutException.class, e -> Mono.empty())
              .then();
        }

        // Same or older incarnation: nothing to report
        if (r0.incarnation() >= r1.incarnation()) {
          return Mono.empty();
        }

        // updated
        return metadataStore
            .fetchMetadata(target)
            .doOnSuccess(
                newMetadata -> {
                  Map<String, String> oldMetadata =
                      metadataStore.updateMetadata(target, newMetadata);
                  sink.next(MembershipEvent.createUpdated(target, oldMetadata, newMetadata));
                })
            .onErrorResume(TimeoutException.class, e -> Mono.empty())
            .then();
      });
}
//......
}
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
public final class MembershipProtocolImpl implements MembershipProtocol {
//......
/**
 * Schedules {@code doSync} to run repeatedly, using the configured sync interval both as initial
 * delay and as period. The task is registered for disposal on stop.
 */
private void schedulePeriodicSync() {
  final int intervalMillis = config.getSyncInterval();
  final Disposable periodicSync =
      scheduler.schedulePeriodically(
          this::doSync, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  actionsDisposables.add(periodicSync);
}
/**
 * Sends one SYNC message with the local membership table to a randomly selected address. Does
 * nothing when there is no address to sync with; a send failure is only logged.
 */
private void doSync() {
  final Address target = selectSyncAddress().orElse(null);
  if (target == null) {
    return;
  }
  final Message syncMsg = prepareSyncDataMsg(SYNC, null);
  LOGGER.debug("Send Sync: {} to {}", syncMsg, target);
  transport
      .send(target, syncMsg)
      .subscribe(
          null,
          ex ->
              LOGGER.debug(
                  "Failed to send Sync: {} to {}, cause: {}", syncMsg, target, ex.toString()));
}
/**
 * Picks the address to sync with, chosen at random among the seed addresses and the addresses of
 * all other known members.
 *
 * <p>Candidates are de-duplicated through a set, so an address that is both a seed and a known
 * member is not favoured. Drawing a single uniformly distributed index is sufficient for a random
 * choice, so the candidate list is not shuffled beforehand.
 *
 * @return randomly chosen address, or an empty {@code Optional} when there are no candidates
 */
private Optional<Address> selectSyncAddress() {
  List<Address> candidates =
      Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address))
          .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
  if (candidates.isEmpty()) {
    return Optional.empty();
  }
  int pick = ThreadLocalRandom.current().nextInt(candidates.size());
  return Optional.of(candidates.get(pick));
}
/**
 * Builds a sync message carrying a copy of the whole local membership table together with the
 * configured sync group.
 *
 * @param qualifier message qualifier to set
 * @param cid correlation id to set, may be {@code null}
 * @return message with the local member's address as sender
 */
private Message prepareSyncDataMsg(String qualifier, String cid) {
  final Collection<MembershipRecord> tableRecords = membershipTable.values();
  final List<MembershipRecord> recordsCopy = new ArrayList<>(tableRecords);
  final SyncData payload = new SyncData(recordsCopy, config.getSyncGroup());
  return Message.withData(payload)
      .qualifier(qualifier)
      .correlationId(cid)
      .sender(localMember.address())
      .build();
}
//......
}
MembershipUpdateReason枚举定义了五种更新原因（FAILURE_DETECTOR_EVENT、MEMBERSHIP_GOSSIP、SYNC、INITIAL_SYNC、SUSPICION_TIMEOUT）；MembershipProtocolImpl的start方法会注册doSync任务（每隔syncInterval执行），该任务会发送SYNC消息给随机选择出来的member，来sync全量的membershipRecords；onMessage方法接收到SYNC消息时执行syncMembership并在成功时返回SYNC_ACK，接收到SYNC_ACK时也是执行syncMembership；onFailureDetectorEvent及onMembershipGossip方法都会触发updateMembership方法来更新membershipTable，必要时进行spreadMembershipGossip
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。