本文主要研究一下artemis的ExpiryScanner
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {
//......
private ExpiryReaper expiryReaperRunnable;
//......
/**
 * Starts, or restarts, the periodic expiry reaper.
 * <p>
 * Nothing happens unless {@code expiryReaperPeriod} is positive. Otherwise any reaper that
 * already exists is stopped, and a fresh one is created and started with
 * {@code expiryReaperPeriod} as its check period in milliseconds.
 */
public synchronized void startExpiryScanner() {
   if (expiryReaperPeriod <= 0) {
      // A non-positive period leaves the current reaper (if any) untouched.
      return;
   }

   if (expiryReaperRunnable != null) {
      expiryReaperRunnable.stop();
   }

   expiryReaperRunnable = new ExpiryReaper(server.getScheduledPool(),
                                           server.getExecutorFactory().getExecutor(),
                                           expiryReaperPeriod,
                                           TimeUnit.MILLISECONDS,
                                           false);
   expiryReaperRunnable.start();
}
/**
 * Stops this post office.
 * <p>
 * Marks it as not started, unregisters it as a notification listener, stops the expiry reaper
 * and the address/queue reaper when they exist, and finally clears the address manager and
 * the queue infos.
 *
 * @throws Exception declared by the signature; propagated from the calls made here
 */
public synchronized void stop() throws Exception {
   started = false;

   managementService.removeNotificationListener(this);

   // Either reaper may never have been created, hence the null checks.
   if (expiryReaperRunnable != null) {
      expiryReaperRunnable.stop();
   }
   if (addressQueueReaperRunnable != null) {
      addressQueueReaperRunnable.stop();
   }

   addressManager.clear();
   queueInfos.clear();
}
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
/**
 * Scheduled component that, on every run, asks each local queue to expire its references.
 */
private final class ExpiryReaper extends ActiveMQScheduledComponent {

   /**
    * Forwards all arguments unchanged to {@link ActiveMQScheduledComponent}.
    */
   ExpiryReaper(ScheduledExecutorService scheduledExecutorService,
                Executor executor,
                long checkPeriod,
                TimeUnit timeUnit,
                boolean onDemand) {
      super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
   }

   /**
    * Walks every queue returned by {@code getLocalQueues()} and triggers its expiry pass.
    * <p>
    * NOTE(review): the upstream comment here says the reaper should finish once the PostOffice
    * is gone, to avoid leaks between stops and starts. This method performs no such check
    * itself.
    */
   @Override
   public void run() {
      for (Queue localQueue : getLocalQueues()) {
         expireQuietly(localQueue);
      }
   }

   /**
    * Triggers expiry on one queue, logging (not propagating) any exception so that the
    * remaining queues are still processed.
    */
   private void expireQuietly(Queue localQueue) {
      try {
         localQueue.expireReferences();
      } catch (Exception failure) {
         ActiveMQServerLogger.LOGGER.errorExpiringMessages(failure);
      }
   }
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
public class QueueImpl extends CriticalComponentImpl implements Queue {
//......
private final ExpiryScanner expiryScanner = new ExpiryScanner();
//......
/**
 * Schedules an asynchronous expiry scan of this queue on the queue's executor.
 * <p>
 * No scan is scheduled when {@code isExpirationRedundant()} returns true, when the queue has
 * been destroyed, or when a scan is already scheduled or running.
 */
public void expireReferences() {
   if (isExpirationRedundant()) {
      return;
   }

   // Claim the scanner slot atomically. A separate get() == 0 check followed by
   // incrementAndGet() would let two concurrent callers both see 0 and both schedule a scan.
   if (!queueDestroyed && expiryScanner.scannerRunning.compareAndSet(0, 1)) {
      getExecutor().execute(expiryScanner);
   }
}
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
/**
 * Scans the enclosing queue for expired message references and expires them.
 * <p>
 * Each run has two phases. First, while holding the queue lock, expired references are removed
 * from the queue and collected; once {@code MAX_DELIVERIES_IN_LOOP} references have been
 * collected the scanner re-submits itself to the executor and stops iterating. Second, after
 * the lock is released, the collected references are expired within a single transaction.
 */
class ExpiryScanner implements Runnable {

   /**
    * Number of scans currently scheduled or running. {@code expireReferences()} only schedules
    * a new scan when this is 0.
    */
   public AtomicInteger scannerRunning = new AtomicInteger(0);

   @Override
   public void run() {
      boolean expired = false;
      boolean hasElements = false;
      int elementsExpired = 0;

      LinkedList<MessageReference> expiredMessages = new LinkedList<>();

      synchronized (QueueImpl.this) {
         if (queueDestroyed) {
            // NOTE(review): this exit does not decrement scannerRunning. expireReferences()
            // also refuses to schedule on a destroyed queue, so it looks harmless - confirm.
            return;
         }

         if (logger.isDebugEnabled()) {
            logger.debug("Scanning for expires on " + QueueImpl.this.getName());
         }

         LinkedListIterator<MessageReference> iter = iterator();

         try {
            while (postOffice.isStarted() && iter.hasNext()) {
               hasElements = true;
               MessageReference ref = iter.next();
               if (ref.getMessage().isExpired()) {
                  incDelivering(ref);
                  expired = true;
                  expiredMessages.add(ref);
                  iter.remove();

                  if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
                     // Cap the work done under the lock: account for the follow-up run
                     // before this run's own decrement in the finally block, then hand over.
                     logger.debug("Breaking loop of expiring");
                     scannerRunning.incrementAndGet();
                     getExecutor().execute(this);
                     break;
                  }
               }
            }
         } finally {
            try {
               iter.close();
            } catch (Throwable ignored) {
               // Best effort: a failure to close the iterator must not prevent the
               // scannerRunning bookkeeping below.
            }
            scannerRunning.decrementAndGet();
            // Guarded like the opening message so the string is only built when needed.
            if (logger.isDebugEnabled()) {
               logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
            }
         }
      }

      if (!expiredMessages.isEmpty()) {
         // One transaction covers every reference collected by this run.
         final Transaction tx = new TransactionImpl(storageManager);

         for (MessageReference ref : expiredMessages) {
            try {
               expire(tx, ref);
               refRemoved(ref);
            } catch (Exception e) {
               // A failure on one reference is logged; the remaining ones are still processed.
               ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
            }
         }

         try {
            tx.commit();
         } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
         }

         if (logger.isDebugEnabled()) {
            logger.debug("Expired " + elementsExpired + " references");
         }
      }

      // If empty we need to schedule depaging to make sure we would depage expired messages as well
      if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
         scheduleDepage(true);
      }
   }
}
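ExpiryScanner实现了Runnable接口,其run方法在持有QueueImpl锁的情况下遍历队列中的MessageReference:对于ref.getMessage().isExpired()为true的引用,执行incDelivering(ref),将其加入expiredMessages,并通过iter.remove()将其移出队列;随后判断++elementsExpired是否大于等于MAX_DELIVERIES_IN_LOOP(1000),若为true则提交到executor执行并跳出循环;之后遍历expiredMessages,挨个执行expire(tx, ref)以及refRemoved(ref),最后执行tx.commit()。
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java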
public interface Message {
//......
/**
 * Tells whether this message has expired.
 *
 * @return {@code false} when {@code getExpiration()} is 0; otherwise {@code true} once
 *         {@code System.currentTimeMillis()} has reached or passed the expiration value
 */
default boolean isExpired() {
   // An expiration of 0 is treated as "never expires"; otherwise compare against the clock.
   return getExpiration() != 0 && System.currentTimeMillis() - getExpiration() >= 0;
}
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
public class QueueImpl extends CriticalComponentImpl implements Queue {
//......
/**
 * Expires a single reference as part of the given transaction.
 * <p>
 * The expiry address is looked up from the address settings matching this queue's address:
 * <ul>
 * <li>no expiry address configured: the reference is acknowledged with
 * {@code AckReason.EXPIRED}, and the condition is logged the first time only;</li>
 * <li>expiry address configured but without bindings: the condition is logged and the
 * reference is acknowledged with {@code AckReason.EXPIRED};</li>
 * <li>otherwise the reference is moved to the expiry address.</li>
 * </ul>
 * When the server has broker message plugins, the expiry is also recorded in an
 * {@code ExpiryLogger} attached to the transaction.
 *
 * @param tx  transaction the acknowledge/move is performed in
 * @param ref reference being expired
 * @throws Exception propagated from the lookup, move or acknowledge calls
 */
private void expire(final Transaction tx, final MessageReference ref) throws Exception {
   final SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();

   if (expiryAddress == null) {
      if (!printErrorExpiring) {
         printErrorExpiring = true;
         // print this only once
         ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
      }
      acknowledge(tx, ref, AckReason.EXPIRED, null);
   } else {
      final Bindings expiryBindings = postOffice.lookupBindingsForAddress(expiryAddress);
      final boolean hasTarget = expiryBindings != null && !expiryBindings.getBindings().isEmpty();

      if (hasTarget) {
         move(expiryAddress, tx, ref, true, true);
      } else {
         ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
         acknowledge(tx, ref, AckReason.EXPIRED, null);
      }
   }

   if (server == null || !server.hasBrokerMessagePlugins()) {
      return;
   }

   // Create the per-transaction ExpiryLogger on first use and register it as a tx operation.
   ExpiryLogger txExpiryLogger = (ExpiryLogger) tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
   if (txExpiryLogger == null) {
      txExpiryLogger = new ExpiryLogger();
      tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, txExpiryLogger);
      tx.addOperation(txExpiryLogger);
   }
   txExpiryLogger.addExpiry(address, ref);
}
//......
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
public class QueueImpl extends CriticalComponentImpl implements Queue {
//......
/**
 * Routes a copy of the referenced message to another address and acknowledges the original,
 * all within the given transaction.
 *
 * @param toAddress       address the copy is routed to
 * @param tx              transaction used for routing and acknowledging
 * @param ref             reference whose message is copied and which is then acknowledged
 * @param expiry          passed to {@code makeCopy}; when true the original is acknowledged
 *                        with {@code AckReason.EXPIRED}
 * @param rejectDuplicate forwarded to {@code postOffice.route}
 * @param queueIDs        optional queue ids; when present they are written as consecutive
 *                        longs into the {@code Message.HDR_ROUTE_TO_IDS} bytes property
 * @throws Exception propagated from copying, routing or acknowledging
 */
private void move(final SimpleString toAddress,
                  final Transaction tx,
                  final MessageReference ref,
                  final boolean expiry,
                  final boolean rejectDuplicate,
                  final long... queueIDs) throws Exception {
   final Message routedCopy = makeCopy(ref, expiry);
   routedCopy.setAddress(toAddress);

   if (queueIDs != null && queueIDs.length > 0) {
      // Long.BYTES (8) bytes per id.
      final ByteBuffer encodedIds = ByteBuffer.allocate(Long.BYTES * queueIDs.length);
      for (int i = 0; i < queueIDs.length; i++) {
         encodedIds.putLong(queueIDs[i]);
      }
      routedCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), encodedIds.array());
   }

   postOffice.route(routedCopy, tx, false, rejectDuplicate);

   if (!expiry) {
      acknowledge(tx, ref);
      return;
   }
   acknowledge(tx, ref, AckReason.EXPIRED, null);
}
//......
}