本文主要研究一下storm的tickTuple
public class TickWordCountBolt extends BaseBasicBolt {
private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return conf;
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
if(TupleUtils.isTick(input)){
//execute tick logic
LOGGER.info("execute tick tuple, emit and clear counts");
counts.entrySet().stream()
.forEach(entry -> collector.emit(new Values(entry.getKey(), entry.getValue())));
counts.clear();
}else{
String word = input.getString(0);
Integer count = counts.get(word);
if (count == null) count = 0;
count++;
counts.put(word, count);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
@Test
public void testTickTuple() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TopologyBuilder builder = new TopologyBuilder();
//并发度10
builder.setSpout("spout", new TestWordSpout(), 10);
builder.setBolt("count", new TickWordCountBolt(), 5)
// .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
.fieldsGrouping("spout", new Fields("word"));
builder.setBolt("print", new PrintBolt(), 1)
.shuffleGrouping("count");
SubmitHelper.submitRemote("tickDemo",builder);
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}
private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if (parallelism != null) {
int dop = parallelism.intValue();
if (dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop);
}
Map<String, Object> conf = component.getComponentConfiguration();
if (conf != null) {
common.set_json_conf(JSONValue.toJSONString(conf));
}
commons.put(id, common);
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
//......
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer> implements ComponentConfigurationDeclarer<T> {
@Override
public T addConfiguration(String config, Object value) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(config, value);
return addConfigurations(configMap);
}
//......
}
protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
String id;
public ConfigGetter(String id) {
this.id = id;
}
@SuppressWarnings("unchecked")
@Override
public T addConfigurations(Map<String, Object> conf) {
if (conf != null) {
if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
}
if (!conf.isEmpty()) {
String currConf = commons.get(id).get_json_conf();
commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
}
}
return (T) this;
}
//......
}
private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
Map<String, Object> res = new HashMap<>(into);
res.putAll(newMap);
return JSONValue.toJSONString(res);
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
private Map<String, Object> normalizedComponentConf(
Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
List<String> keysToRemove = retrieveAllConfigKeys();
keysToRemove.remove(Config.TOPOLOGY_DEBUG);
keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
Map<String, Object> componentConf;
String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
if (specJsonConf != null) {
try {
componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
} catch (ParseException e) {
throw new RuntimeException(e);
}
for (Object p : keysToRemove) {
componentConf.remove(p);
}
} else {
componentConf = new HashMap<>();
}
Map<String, Object> ret = new HashMap<>();
ret.putAll(topoConf);
ret.putAll(componentConf);
return ret;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
protected void setupTicks(boolean isSpout) {
final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
if (tickTimeSecs != null) {
boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
|| (!enableMessageTimeout && isSpout)) {
LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
} else {
StormTimer timerTask = workerData.getUserTimer();
timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
() -> {
TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
Constants.SYSTEM_COMPONENT_ID,
(int) Constants.SYSTEM_TASK_ID,
Constants.SYSTEM_TICK_STREAM_ID);
AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
try {
receiveQueue.publish(tickTuple);
receiveQueue.flush(); // avoid buffering
} catch (InterruptedException e) {
LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
Thread.currentThread().interrupt();
return;
}
}
);
}
}
}
__system
),taskId设置为Constants.SYSTEM_TASK_ID(-1
),streamId设置为Constants.SYSTEM_TICK_STREAM_ID(__tick
)receiveQueue
).publish(tickTuple) private final DirectInserter directInserter = new DirectInserter(this);
/**
* Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
*/
public void publish(Object obj) throws InterruptedException {
Inserter inserter = getInserter();
inserter.publish(obj);
}
private Inserter getInserter() {
Inserter inserter;
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);
inserter = b;
thdLocalBatcher.set(b);
}
} else {
inserter = directInserter;
}
return inserter;
}
private static class DirectInserter implements Inserter {
private JCQueue q;
public DirectInserter(JCQueue q) {
this.q = q;
}
/**
* Blocking call, that can be interrupted via Thread.interrupt
*/
@Override
public void publish(Object obj) throws InterruptedException {
boolean inserted = q.tryPublishInternal(obj);
int idleCount = 0;
while (!inserted) {
q.metrics.notifyInsertFailure();
if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
}
idleCount = q.backPressureWaitStrategy.idle(idleCount);
if (Thread.interrupted()) {
throw new InterruptedException();
}
inserted = q.tryPublishInternal(obj);
}
}
//......
}
// Non Blocking. returns true/false indicating success/failure. Fails if full.
private boolean tryPublishInternal(Object obj) {
if (recvQueue.offer(obj)) {
metrics.notifyArrivals(1);
return true;
}
return false;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
/**
* Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
* elements consumed from Q
*/
public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
try {
return consumeImpl(consumer, exitCond);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
*
* @param consumer
* @param exitCond
*/
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
int drainCount = 0;
while (exitCond.keepRunning()) {
Object tuple = recvQueue.poll();
if (tuple == null) {
break;
}
consumer.accept(tuple);
++drainCount;
}
int overflowDrainCount = 0;
int limit = overflowQ.size();
while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
Object tuple = overflowQ.poll();
++overflowDrainCount;
consumer.accept(tuple);
}
int total = drainCount + overflowDrainCount;
if (total > 0) {
consumer.flush();
}
return total;
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。