kafkaserver1.png
/**
 * Initialize this scheduler so that it is ready to accept tasks for scheduling.
 */
def startup(): Unit
/**
 * Shut down this scheduler. Once this method has completed, no further executions of
 * background tasks will occur; this includes tasks that were scheduled with a delayed execution.
 */
def shutdown(): Unit
/**
 * Check whether the scheduler has been started.
 * @return true if the scheduler has been started, false otherwise
 */
def isStarted: Boolean
/**
 * Schedule a task.
 * @param name The name of this task
 * @param fun The zero-argument function that makes up the task
 * @param delay The amount of time to wait before the first execution (defaults to 0)
 * @param period The period with which to execute the task. If < 0 the task will execute only once
 *               (defaults to -1, i.e. a single execution).
 * @param unit The unit for the preceding times (defaults to milliseconds).
 */
def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): Unit
// Path constants, grouped by their leading path segment.
// NOTE(review): these look like ZooKeeper znode paths — confirm against the enclosing object.

// Consumer-related root path
val ConsumersPath: String = "/consumers"

// Paths under /brokers
val BrokerIdsPath: String = "/brokers/ids"
val BrokerTopicsPath: String = "/brokers/topics"
val BrokerSequenceIdPath: String = "/brokers/seqid"

// Controller-related paths
val ControllerPath: String = "/controller"
val ControllerEpochPath: String = "/controller_epoch"

// Paths under /admin
val ReassignPartitionsPath: String = "/admin/reassign_partitions"
val DeleteTopicsPath: String = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath: String = "/admin/preferred_replica_election"

// ISR change notification path
val IsrChangeNotificationPath: String = "/isr_change_notification"

// Paths under /config
val EntityConfigPath: String = "/config"
val EntityConfigChangesPath: String = "/config/changes"
/**
 * Return the value stored in the pool for `key`, creating it with the value factory
 * when no entry exists yet.
 *
 * @param key the key to look up
 * @return the value held by the pool for `key` after the call
 * @throws KafkaException if the value factory is empty
 */
def getAndMaybePut(key: K) = {
  if (valueFactory.isEmpty)
    throw new KafkaException("Empty value factory in pool.")
  val existing = pool.get(key)
  if (existing != null)
    existing
  else
    createLock synchronized {
      // Look the key up again while holding createLock: another thread may have
      // inserted a value between the unlocked lookup above and acquiring the lock.
      if (pool.get(key) == null)
        pool.put(key, valueFactory.get(key))
      pool.get(key)
    }
}
trait Logging
供其他类继承,方便写日志;是对 org.apache.log4j.Logger 的封装。
/usr/bin/lockf;由 java.nio.channels.FileLock 实现。
由 java.util.concurrent.LinkedBlockingQueue 实现,并加上了对队列内已有元素 size 大小的 check。
def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean
def offer(e: E): Boolean
def put(e: E): Boolean
def poll(timeout: Long, unit: TimeUnit)
def poll()
def take(): E
...