A Look at Storm's reportError

Original
code4it
Published 2018-10-23 16:27:11
This article is included in the column: 码匠的流水账

This article takes a look at Storm's reportError.

IErrorReporter

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

public interface IErrorReporter {
    void reportError(Throwable error);
}
  • The ISpoutOutputCollector, IOutputCollector and IBasicOutputCollector interfaces all extend IErrorReporter
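Because IErrorReporter declares a single method, code that only needs to report errors can depend on this narrow interface instead of a specific collector, and tests can satisfy it with a lambda or method reference. The sketch below is self-contained: the interface is a local stand-in with the same shape as Storm's IErrorReporter (so no Storm jar is needed), and `guarded` is a hypothetical helper, not a Storm API.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as org.apache.storm.task.IErrorReporter (no Storm dependency).
interface IErrorReporter {
    void reportError(Throwable error);
}

class ErrorReporterDemo {
    // Hypothetical helper: runs `body` and routes any RuntimeException to the given reporter,
    // which could be any collector since they all extend IErrorReporter. Returns true on success.
    static boolean guarded(IErrorReporter reporter, Runnable body) {
        try {
            body.run();
            return true;
        } catch (RuntimeException e) {
            reporter.reportError(e);
            return false;
        }
    }

    public static void main(String[] args) {
        List<Throwable> reported = new ArrayList<>();
        // A method reference satisfies the single-method interface, which is handy in unit tests.
        IErrorReporter collecting = reported::add;
        guarded(collecting, () -> Integer.parseInt("not a number"));
        System.out.println(reported.size() + " " + reported.get(0).getClass().getSimpleName());
        // prints: 1 NumberFormatException
    }
}
```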

ISpoutOutputCollector

storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java

public interface ISpoutOutputCollector extends IErrorReporter{
    /**
        Returns the task ids that received the tuples.
    */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    long getPendingCount();
}
  • Implementations of ISpoutOutputCollector include SpoutOutputCollector and SpoutOutputCollectorImpl, among others

IOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java

public interface IOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void ack(Tuple input);

    void fail(Tuple input);

    void resetTimeout(Tuple input);

    void flush();
}
  • Implementations of IOutputCollector include OutputCollector and BoltOutputCollectorImpl, among others

IBasicOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java

public interface IBasicOutputCollector extends IErrorReporter {
    List<Integer> emit(String streamId, List<Object> tuple);

    void emitDirect(int taskId, String streamId, List<Object> tuple);

    void resetTimeout(Tuple tuple);
}
  • IBasicOutputCollector is implemented by BasicOutputCollector

reportError

SpoutOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

BoltOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

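Both SpoutOutputCollectorImpl.reportError and BoltOutputCollectorImpl.reportError first increment the executor's reported-error count via executor.getErrorReportingMetrics().incrReportedErrorCount() and then call executor.getReportError().report(error).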
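The two implementations share a count-then-delegate shape. A minimal sketch of that shape, assuming a one-method `IReportError` stand-in and an `AtomicLong` in place of the executor's error-reporting metrics; `CountingErrorReporter` is a hypothetical name. One consequence visible in the original code: because the counter is bumped before report() runs, the metric also counts errors that the throttling in ReportError later drops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for org.apache.storm.executor.error.IReportError (assumed shape: a single report method).
interface IReportError {
    void report(Throwable error);
}

// Hypothetical collector-side reporter mirroring the two reportError methods above.
class CountingErrorReporter {
    private final AtomicLong reportedErrorCount = new AtomicLong(); // stands in for the metrics object
    private final IReportError delegate;

    CountingErrorReporter(IReportError delegate) {
        this.delegate = delegate;
    }

    void reportError(Throwable error) {
        reportedErrorCount.incrementAndGet(); // 1. count every call
        delegate.report(error);               // 2. hand off to the shared (throttling) reporter
    }

    long reportedErrorCount() {
        return reportedErrorCount.get();
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        CountingErrorReporter reporter = new CountingErrorReporter(e -> sink.add(e.getMessage()));
        reporter.reportError(new RuntimeException("first"));
        reporter.reportError(new RuntimeException("second"));
        System.out.println(reporter.reportedErrorCount() + " " + sink); // 2 [first, second]
    }
}
```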

ReportError.report

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java

public class ReportError implements IReportError {

    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);

    private final Map<String, Object> topoConf;
    private final IStormClusterState stormClusterState;
    private final String stormId;
    private final String componentId;
    private final WorkerTopologyContext workerTopologyContext;

    private int maxPerInterval;
    private int errorIntervalSecs;
    private AtomicInteger intervalStartTime;
    private AtomicInteger intervalErrors;

    public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
                       WorkerTopologyContext workerTopologyContext) {
        this.topoConf = topoConf;
        this.stormClusterState = stormClusterState;
        this.stormId = stormId;
        this.componentId = componentId;
        this.workerTopologyContext = workerTopologyContext;
        this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
        this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
        this.intervalErrors = new AtomicInteger(0);
    }

    @Override
    public void report(Throwable error) {
        LOG.error("Error", error);
        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(Time.currentTimeSecs());
        }
        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
            try {
                stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                                              workerTopologyContext.getThisWorkerPort().longValue(), error);
            } catch (UnknownHostException e) {
                throw Utils.wrapInRuntime(e);
            }

        }
    }
}
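  • report() always logs the error first. It then checks whether the current interval has expired (more than errorIntervalSecs, read from Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS, since the interval started) and, if so, resets the counter. Finally it increments the per-interval error count, and only while that count is within maxPerInterval (Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL) does it call stormClusterState.reportError to write the error to storage, e.g. ZooKeeper. Errors beyond the limit are logged but not stored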
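The throttling logic can be exercised without a cluster by injecting the clock. A minimal single-threaded sketch, assuming a `LongSupplier` of seconds in place of Storm's Time utility, a list in place of IStormClusterState, plain fields instead of the AtomicIntegers, and string messages instead of Throwables; `ErrorThrottle` is a hypothetical name. As in the original, a new interval starts only when strictly more than errorIntervalSecs have elapsed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical re-implementation of ReportError's throttling with injected clock and storage.
class ErrorThrottle {
    private final int maxPerInterval;
    private final int errorIntervalSecs;
    private final LongSupplier nowSecs;
    private long intervalStartTime;
    private int intervalErrors;
    final List<String> stored = new ArrayList<>(); // stands in for stormClusterState.reportError

    ErrorThrottle(int maxPerInterval, int errorIntervalSecs, LongSupplier nowSecs) {
        this.maxPerInterval = maxPerInterval;
        this.errorIntervalSecs = errorIntervalSecs;
        this.nowSecs = nowSecs;
        this.intervalStartTime = nowSecs.getAsLong();
    }

    void report(String error) {
        // 1. Start a new interval once strictly more than errorIntervalSecs have passed.
        if (nowSecs.getAsLong() - intervalStartTime > errorIntervalSecs) {
            intervalErrors = 0;
            intervalStartTime = nowSecs.getAsLong();
        }
        // 2. Count every error, but store only the first maxPerInterval of each interval.
        if (++intervalErrors <= maxPerInterval) {
            stored.add(error);
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        ErrorThrottle throttle = new ErrorThrottle(2, 10, () -> now[0]);
        for (int i = 0; i < 5; i++) {
            throttle.report("burst-" + i); // only the first two are stored
        }
        now[0] = 11;                       // more than 10 seconds later: the window resets
        throttle.report("later");
        System.out.println(throttle.stored); // [burst-0, burst-1, later]
    }
}
```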

StormClusterStateImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

    @Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, defaultAcls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
        stateStorage.set_data(lastErrorPath, serData, defaultAcls);
        List<String> childrens = stateStorage.get_children(path, false);

        Collections.sort(childrens, new Comparator<String>() {
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });

        while (childrens.size() > 10) {
            String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
            try {
                stateStorage.delete_node(znodePath);
            } catch (Exception e) {
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    // if the node is already deleted, do nothing
                    LOG.warn("Could not find the znode: {}", znodePath);
                } else {
                    throw e;
                }
            }
        }
    }
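  • ClusterUtils.errorPath(stormId, componentId) gives the directory that holds the error history, and ClusterUtils.lastErrorPath(stormId, componentId) gives the path of the last-error node. The error is serialized as an ErrorInfo (stringified error, timestamp, host and port), created as a new sequential child under that directory, and also written to the last-error path
  • Because ZooKeeper is not suited to storing large amounts of data, the method then trims the history: once there are more than 10 children, it sorts them in ascending order by the number after the leading "e" (substring(1)) and deletes the oldest ones one by one until 10 remain. A node that has already been deleted (NoNodeException) only produces a warning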
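The retention step can be reproduced against an in-memory list of child names. A minimal sketch, assuming child names of the form "e" followed by a sequence number (as in the zkCli listing further down) and using a hypothetical `keep` parameter for the hard-coded 10. It returns the names that would be deleted, oldest first, instead of calling delete_node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

class ErrorNodeRetention {
    // Returns the child names to delete so that only the `keep` newest remain.
    // Like StormClusterStateImpl, it sorts by the number after the leading "e".
    static List<String> nodesToDelete(Collection<String> children, int keep) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong((String name) -> Long.parseLong(name.substring(1))));
        List<String> doomed = new ArrayList<>();
        while (sorted.size() > keep) {
            doomed.add(sorted.remove(0)); // smallest sequence number = oldest error
        }
        return doomed;
    }

    public static void main(String[] args) {
        // ZooKeeper does not return children in any particular order, hence the explicit sort.
        List<String> children = Arrays.asList("e0000000291", "e0000000290", "e0000000300",
                "e0000000295", "e0000000294", "e0000000293", "e0000000292", "e0000000299",
                "e0000000298", "e0000000297", "e0000000296");
        System.out.println(nodesToDelete(children, 10)); // [e0000000290]
    }
}
```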

ClusterUtils.errorPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

    public static final String ZK_SEPERATOR = "/";

    public static final String ERRORS_ROOT = "errors";

    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

    public static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }
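  • errorPath is /errors/{stormId}/{componentId}, with componentId URL-encoded. EPHEMERAL_SEQUENTIAL nodes whose names start with "e" are created under this directory: each error is appended there first, and the old nodes are then deleted if there are more than 10
  • lastErrorPath is /errors/{stormId}/{componentId}-last-error, a sibling of that directory that stores the most recent error of the component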
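The path helpers are plain string concatenation and can be checked in isolation. A minimal sketch that copies the constants from ClusterUtils above into a hypothetical `ErrorPaths` class. Note that the zkCli session below lists the nodes under /storm/errors rather than /errors: these paths are relative to the storm.zookeeper.root setting, whose default is /storm.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ErrorPaths {
    static final String ZK_SEPERATOR = "/"; // spelled as in Storm's ClusterUtils
    static final String ERRORS_SUBTREE = ZK_SEPERATOR + "errors";

    static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }

    // componentId is URL-encoded, so e.g. '/' in a component id becomes %2F instead of a new znode level.
    static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // A sibling of the error directory, not a child of it.
    static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static void main(String[] args) {
        String topo = "reportErrorDemo-1-1540260375";
        System.out.println(errorPath(topo, "print"));     // /errors/reportErrorDemo-1-1540260375/print
        System.out.println(lastErrorPath(topo, "print")); // /errors/reportErrorDemo-1-1540260375/print-last-error
        System.out.println(errorPath(topo, "a/b c"));     // /errors/reportErrorDemo-1-1540260375/a%2Fb+c
    }
}
```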

Inspecting with zkCli

[zk: localhost:2181(CONNECTED) 21] ls /storm/errors
[DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
[zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
[print, print-last-error]
[zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
[e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
[zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
[]
[zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
[]

storm-ui

curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false
  • The Storm UI calls the endpoint above to fetch the topology's data; each spout and bolt entry includes a lastError field holding the most recent error
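The same endpoint can be queried from code. A minimal sketch using the JDK 11+ java.net.http client; the base URL and topology id are taken from the curl example above and are assumptions about your deployment, and `TopologyPageFetcher` is a hypothetical name. Pulling individual lastError values out of the response needs a JSON library, which is left out to keep the example self-contained.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class TopologyPageFetcher {
    // Builds the same URL as the curl call: /api/v1/topology/{id}?sys={includeSys}.
    static URI topologyUri(String baseUrl, String topologyId, boolean includeSys) {
        return URI.create(baseUrl + "/api/v1/topology/" + topologyId + "?sys=" + includeSys);
    }

    static String fetchTopologyJson(HttpClient client, String baseUrl, String topologyId)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(topologyUri(baseUrl, topologyId, false)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("UI returned HTTP " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        String json = fetchTopologyJson(HttpClient.newHttpClient(), "http://192.168.99.100:8080",
                "reportErrorDemo-1-1540260375");
        // The "spouts" and "bolts" arrays in this JSON carry a (truncated) lastError per component.
        System.out.println(json);
    }
}
```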

StormApiResource

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java

    /**
     * /api/v1/topology -> topo.
     */
    @GET
    @Path("/topology/{id}")
    @AuthNimbusOp(value = "getTopology", needsTopoId = true)
    @Produces("application/json")
    public Response getTopology(@PathParam("id") String id,
                                @DefaultValue(":all-time") @QueryParam("window") String window,
                                @QueryParam("sys") boolean sys,
                                @QueryParam(callbackParameterName) String callback) throws TException {
        topologyPageRequestMeter.mark();
        try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
            return UIHelpers.makeStandardResponse(
                    UIHelpers.getTopologySummary(
                            nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                            window, config,
                            servletRequest.getRemoteUser()
                    ),
                    callback
            );
        }
    }
  • This calls nimbusClient.getClient().getTopologyPageInfo(id, window, sys) and passes the result to UIHelpers.getTopologySummary

Nimbus.getTopologyPageInfo

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    @Override
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
        throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = stormClusterState;
            int launchTimeSecs = common.launchTimeSecs;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(topoId);
            }
            Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
            List<WorkerSummary> workerSummaries = null;
            Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                    exec2NodePort.put(entry.getKey(), nodePort);
                }

                workerSummaries = StatsUtil.aggWorkerStats(topoId,
                                                           topoName,
                                                           taskToComp,
                                                           beats,
                                                           exec2NodePort,
                                                           nodeToHost,
                                                           workerToResources,
                                                           includeSys,
                                                           true); //this is the topology page, so we know the user is authorized
            }

            TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
                                                                        exec2NodePort,
                                                                        taskToComp,
                                                                        beats,
                                                                        topology,
                                                                        window,
                                                                        includeSys,
                                                                        state);

            //......
            return topoPageInfo;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }
  • This calls StatsUtil.aggTopoExecsStats to build the TopologyPageInfo

StatsUtil.aggTopoExecsStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    /**
     * aggregate topo executors stats.
     *
     * @param topologyId     topology id
     * @param exec2nodePort  executor -> host+port
     * @param task2component task -> component
     * @param beats          executor[start, end] -> executor heartbeat
     * @param topology       storm topology
     * @param window         the window to be aggregated
     * @param includeSys     whether to include system streams
     * @param clusterState   cluster state
     * @return TopologyPageInfo thrift structure
     */
    public static TopologyPageInfo aggTopoExecsStats(
        String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
        StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
        return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
    }
  • StatsUtil.aggTopoExecsStats ends by calling postAggregateTopoStats

StatsUtil.postAggregateTopoStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                          String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);

        ret.set_num_tasks(task2comp.size());
        ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Map m = (Map) e.getValue();
            long executed = getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0) {
                double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                m.put(EXEC_LATENCY, execLatencyTotal / executed);

                double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                m.put(PROC_LATENCY, procLatencyTotal / executed);
            }
            m.remove(EXEC_LAT_TOTAL);
            m.remove(PROC_LAT_TOTAL);
            String id = (String) e.getKey();
            m.put("last-error", getLastError(clusterState, topologyId, id));

            aggBolt2stats.put(id, thriftifyBoltAggStats(m));
        }

        //......

        return ret;
    }

    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        return stormClusterState.lastError(stormId, compId);
    }
  • For each bolt, a last-error entry is added via getLastError, and the map is then converted to a thrift object by thriftifyBoltAggStats
  • getLastError simply calls stormClusterState.lastError(stormId, compId) to fetch the last error
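The per-bolt step turns accumulated latency totals into averages (only when something was executed, avoiding a division by zero), drops the totals, and attaches the last error. A minimal sketch over a plain Map; the key strings, the class name `BoltPostAggregation` and the `lastErrorLookup` function (standing in for stormClusterState.lastError) are hypothetical, not StatsUtil's actual constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class BoltPostAggregation {
    // Hypothetical key names; StatsUtil uses its own constants for these.
    static final String EXECUTED = "executed";
    static final String EXEC_LAT_TOTAL = "exec-lat-total";
    static final String PROC_LAT_TOTAL = "proc-lat-total";
    static final String EXEC_LATENCY = "exec-latency";
    static final String PROC_LATENCY = "proc-latency";

    // Mirrors the loop body of postAggregateTopoStats for a single bolt.
    static Map<String, Object> finish(String boltId, Map<String, Object> acc,
                                      Function<String, String> lastErrorLookup) {
        Map<String, Object> m = new HashMap<>(acc);
        long executed = ((Number) m.getOrDefault(EXECUTED, 0L)).longValue();
        if (executed > 0) {
            // Average latency = accumulated total / number of executed tuples.
            m.put(EXEC_LATENCY, ((Number) m.getOrDefault(EXEC_LAT_TOTAL, 0.0)).doubleValue() / executed);
            m.put(PROC_LATENCY, ((Number) m.getOrDefault(PROC_LAT_TOTAL, 0.0)).doubleValue() / executed);
        }
        // The totals are intermediate values and are removed either way.
        m.remove(EXEC_LAT_TOTAL);
        m.remove(PROC_LAT_TOTAL);
        m.put("last-error", lastErrorLookup.apply(boltId)); // null when no error was recorded
        return m;
    }

    public static void main(String[] args) {
        Map<String, Object> acc = new HashMap<>();
        acc.put(EXECUTED, 4L);
        acc.put(EXEC_LAT_TOTAL, 10.0);
        acc.put(PROC_LAT_TOTAL, 6.0);
        Map<String, Object> out = finish("print", acc, id -> "ClassCastException in " + id);
        System.out.println(out.get(EXEC_LATENCY) + " " + out.get(PROC_LATENCY) + " " + out.get("last-error"));
        // prints: 2.5 1.5 ClassCastException in print
    }
}
```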

UIHelpers.getTopologySummary

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * getTopologySummary.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @param remoteUser remoteUser
     * @return getTopologySummary
     */
    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
                                                         String window, Map<String, Object> config, String remoteUser) {
        Map<String, Object> result = new HashMap();
        Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
        long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
        Map<String, Object> unpackedTopologyPageInfo =
                unpackTopologyInfo(topologyPageInfo, window, config);
        result.putAll(unpackedTopologyPageInfo);
        result.put("user", remoteUser);
        result.put("window", window);
        result.put("windowHint", getWindowHint(window));
        result.put("msgTimeout", messageTimeout);
        result.put("configuration", topologyConf);
        result.put("visualizationTable", new ArrayList());
        result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
        return result;
    }
  • Once it has the TopologyPageInfo, UIHelpers.getTopologySummary unpacks it with unpackTopologyInfo

UIHelpers.unpackTopologyInfo

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * unpackTopologyInfo.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @return unpackTopologyInfo
     */
    private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
        Map<String, Object> result = new HashMap();
        result.put("id", topologyPageInfo.get_id());
        //......

        Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
        List<Map> spoutStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
            spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
        }
        result.put("spouts", spoutStats);

        Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
        List<Map> boltStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
            boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
        }
        result.put("bolts", boltStats);

        //......
        result.put("samplingPct", samplingPct);
        result.put("replicationCount", topologyPageInfo.get_replication_count());
        result.put("topologyVersion", topologyPageInfo.get_topology_version());
        result.put("stormVersion", topologyPageInfo.get_storm_version());
        return result;
    }

    /**
     * getTopologySpoutAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param spoutId spoutId
     * @return getTopologySpoutAggStatsMap
     */
    private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                   String spoutId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("spoutId", spoutId);
        result.put("encodedSpoutId", URLEncoder.encode(spoutId));
        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
        result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTopologyBoltAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param boltId boltId
     * @return getTopologyBoltAggStatsMap
     */
    private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                  String boltId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("boltId", boltId);
        result.put("encodedBoltId", URLEncoder.encode(boltId));
        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
        result.put("executed", boltAggregateStats.get_executed());
        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTruncatedErrorString.
     * @param errorString errorString
     * @return getTruncatedErrorString
     */
    private static String getTruncatedErrorString(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), 200));
    }
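  • Note that spouts are converted with getTopologySpoutAggStatsMap and bolts with getTopologyBoltAggStatsMap
  • Both methods pass lastError through getTruncatedErrorString, which keeps at most the first 200 characters (substring(0, 200)); a missing lastError is shown as an empty string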
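The truncation is a single substring call; the Math.min guard is what keeps messages shorter than 200 characters from throwing StringIndexOutOfBoundsException. A self-contained copy for experimentation, with a hypothetical class name and constant.

```java
class ErrorTruncation {
    static final int MAX_UI_ERROR_LENGTH = 200; // the limit hard-coded in UIHelpers.getTruncatedErrorString

    // Same logic as getTruncatedErrorString: keep at most the first 200 characters.
    static String truncate(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), MAX_UI_ERROR_LENGTH));
    }

    public static void main(String[] args) {
        StringBuilder longTrace = new StringBuilder();
        for (int i = 0; i < 50; i++) {
            longTrace.append("frame").append(i).append('\n'); // 390 characters in total
        }
        System.out.println(truncate(longTrace.toString()).length()); // 200
        System.out.println(truncate("short"));                        // short
    }
}
```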

crash log

2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!

Summary

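  • If a spout or bolt method throws an exception, the whole worker dies. The exception is recorded in ZooKeeper automatically, but at the cost of the worker dying and being restarted over and over
  • Combining reportError with try/catch keeps the worker alive after an exception while still recording the error. Keep in mind that ReportError throttles the writes per interval, and that only the 10 most recent errors are kept for each component of a topology. They are stored as EPHEMERAL_SEQUENTIAL nodes, which disappear when the worker dies, while lastError is stored in a PERSISTENT node. Both are deleted when the topology is killed
  • The Storm UI shows each component's lastError, truncated to at most 200 characters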
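The try/catch approach from the second point can be sketched as follows. To keep it compilable without Storm, `SimpleTuple` and `SimpleCollector` are minimal local stand-ins for Storm's Tuple and OutputCollector (only the methods used here), and `GuardedPrintBolt.execute` plays the role of a bolt's execute method; the cast mirrors the ClassCastException in the crash log above. In real Storm code, a BaseBasicBolt can get a similar effect by throwing ReportedFailedException, which BasicBoltExecutor reports via reportError before failing the tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-ins (assumption): just enough of Storm's Tuple/OutputCollector for this sketch.
interface SimpleTuple {
    Object getValue(int i);
}

interface SimpleCollector {
    void ack(SimpleTuple input);
    void fail(SimpleTuple input);
    void reportError(Throwable error);
}

class GuardedPrintBolt {
    // Body of a hypothetical bolt's execute(): failures are reported and the tuple is failed,
    // instead of letting the exception escape and kill the worker.
    static void execute(SimpleTuple input, SimpleCollector collector) {
        try {
            int value = (Integer) input.getValue(0); // ClassCastException if the field is a String
            System.out.println("value = " + value);
            collector.ack(input);
        } catch (RuntimeException e) {
            collector.reportError(e); // goes through the throttled ReportError path in Storm
            collector.fail(input);    // marks the tuple failed so a reliable spout can replay it
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        SimpleCollector collector = new SimpleCollector() {
            public void ack(SimpleTuple t) { events.add("ack"); }
            public void fail(SimpleTuple t) { events.add("fail"); }
            public void reportError(Throwable e) { events.add("error:" + e.getClass().getSimpleName()); }
        };
        List<Object> good = Arrays.asList((Object) 42);
        List<Object> bad = Arrays.asList((Object) "42");
        execute(good::get, collector);
        execute(bad::get, collector);
        System.out.println(events); // [ack, error:ClassCastException, fail]
    }
}
```

Catching RuntimeException broadly is a deliberate choice in this sketch: any unexpected failure in one tuple is reported and failed rather than taking down the executor.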

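Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.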
