Kafka 0.8 producer exception handling

This article briefly walks through exception handling in the Java producer of Kafka 0.8.2.2.

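Overview

The Kafka Java producer sends asynchronously. The main steps are:

  • The record is appended to the RecordAccumulator
  • The sender takes RecordBatch objects out of the RecordAccumulator and hands them to the client to send
  • NetworkClient talks to the broker and sends the RecordBatch out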

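Each of these steps has its own exception handling. The append step can throw: an ApiException is passed to the callback, and any other exception is thrown directly to the caller (inside send(), the callback only covers this layer, the one that talks to the RecordAccumulator).

The run method in the sender simply catches exceptions and logs them.

At the layer that actually talks to the network, when a request fails (the network connection fails or the broker returns an error), the batch is re-enqueued according to the retry count.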

append

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/KafkaProducer.java

public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
        try {
            // first make sure the metadata for the topic is available
            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
            int partition = partitioner.partition(serializedRecord, metadata.fetch());
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // Handling exceptions and record the errors;
            // For API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            throw new KafkaException(e);
        } catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
    }

As the comment in the code says: handle exceptions and record the errors. API exceptions are returned in the future (and passed to the callback, if one was supplied), while other exceptions are thrown directly.
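The routing done by the catch blocks in send() can be modelled with the JDK alone. The sketch below is an illustration, not Kafka code: `SendErrorRouting`, `ApiError` and the `appendStep` parameter are hypothetical names, and the success path is simplified (in the real producer the future is completed later by the sender thread).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// JDK-only model of the catch blocks in send(); none of these types are Kafka classes.
class SendErrorRouting {
    /** Hypothetical stand-in for Kafka's ApiException. */
    static class ApiError extends RuntimeException {
        ApiError(String message) { super(message); }
    }

    /**
     * Runs the append step. An ApiError is reported through the callback and a
     * failed future; any other runtime exception propagates to the caller.
     */
    static CompletableFuture<String> send(Runnable appendStep,
                                          BiConsumer<String, Exception> callback) {
        try {
            appendStep.run();
            // Simplification: the real future completes only after the broker responds.
            return CompletableFuture.completedFuture("appended");
        } catch (ApiError e) {
            if (callback != null)
                callback.accept(null, e);
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = send(
                () -> { throw new ApiError("record too large"); },
                (metadata, error) -> System.out.println("callback got: " + error.getMessage()));
        System.out.println("future failed: " + f.isCompletedExceptionally());
        try {
            send(() -> { throw new IllegalStateException("not an API error"); }, null);
        } catch (IllegalStateException e) {
            System.out.println("thrown to caller: " + e.getMessage());
        }
    }
}
```

The practical consequence for callers is that a call to send() needs both a callback (or a check on the returned future) and a try/catch around the call itself, since the two kinds of failure arrive through different channels.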

sender

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/internals/Sender.java

  • The thread's run method

    /**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        this.client.close();

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

  • The run(long) method

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        sensors.updateProduceRequestMetrics(requests);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
        for (ClientResponse response : responses) {
            if (response.wasDisconnected())
                handleDisconnect(response, now);
            else
                handleResponse(response, now);
        }
    }
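The key point of the run loop above is that each iteration is wrapped in its own try/catch, so one failing iteration is logged and the I/O thread keeps going. A minimal JDK-only sketch of that pattern follows; `ResilientLoop` and `runLoop` are illustrative names, not part of Kafka, and the "log" is just a list of messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// JDK-only sketch of a catch-and-log loop; the names are illustrative, not Kafka's.
class ResilientLoop {
    /**
     * Runs `iterations` passes of `step`, recording failures instead of letting
     * them end the loop. Returns the messages of the exceptions that were caught.
     */
    static List<String> runLoop(int iterations, IntConsumer step) {
        List<String> caught = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            try {
                step.accept(i);
            } catch (Exception e) {
                // Plays the role of log.error("Uncaught error ...") in Sender.run()
                caught.add(e.getMessage());
            }
        }
        return caught;
    }

    public static void main(String[] args) {
        List<Integer> completed = new ArrayList<>();
        List<String> caught = runLoop(5, i -> {
            if (i == 2) throw new IllegalStateException("boom at " + i);
            completed.add(i);
        });
        System.out.println("completed=" + completed + " caught=" + caught);
    }
}
```

The trade-off is that the failure is visible only in the log. Nothing in this loop reports the exception back to the code that submitted the work, which is why per-batch errors have to be delivered separately through the future and the callback.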

retries

/**
     * Complete or retry the given batch of records.
     * @param batch The record batch
     * @param error The error (or null if none)
     * @param baseOffset The base offset assigned to the records if successful
     * @param correlationId The correlation id for the request
     * @param now The current POSIX time stamp in milliseconds
     */
    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
        if (error != Errors.NONE && canRetry(batch, error)) {
            // retry
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                     correlationId,
                     batch.topicPartition,
                     this.retries - batch.attempts - 1,
                     error);
            this.accumulator.reenqueue(batch, now);
            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
        } else {
            // tell the user the result of their request
            batch.done(baseOffset, error.exception());
            this.accumulator.deallocate(batch);
            if (error != Errors.NONE)
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        }
        if (error.exception() instanceof InvalidMetadataException)
            metadata.requestUpdate();
    }

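When there is an error and canRetry allows it, the whole batch is re-enqueued; otherwise the batch is completed and the error (if any) is passed back to the user. Both handleDisconnect and handleResponse call this method.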

  • handleDisconnect

    private void handleDisconnect(ClientResponse response, long now) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination());
        int correlation = response.request().request().header().correlationId();
        @SuppressWarnings("unchecked")
        Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
        for (RecordBatch batch : responseBatches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
    }

  • handleResponse

    /**
     * Handle a produce response
     */
    private void handleResponse(ClientResponse response, long now) {
        int correlationId = response.request().request().header().correlationId();
        log.trace("Received produce response from node {} with correlation id {}",
                  response.request().request().destination(),
                  correlationId);
        @SuppressWarnings("unchecked")
        Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                Errors error = Errors.forCode(partResp.errorCode);
                RecordBatch batch = batches.get(tp);
                completeBatch(batch, error, partResp.baseOffset, correlationId, now);
            }
            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, correlationId, now);
        }
    }
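The retry-or-complete decision in completeBatch can be sketched with the JDK alone. The listing does not show canRetry or reenqueue, so their behaviour here is an assumption: the sketch assumes that a batch is retried only while its attempt count is below the configured retries and the error is retriable, and that a re-enqueued batch goes back to the head of the queue. `RetrySketch` and `Batch` are hypothetical types.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// JDK-only model of the retry-or-complete decision; all types here are hypothetical.
class RetrySketch {
    static class Batch {
        int attempts = 0;       // how many times this batch has been re-enqueued
        String result = null;   // set once the batch is completed
    }

    final int retries;                              // plays the role of the retries setting
    final Deque<Batch> queue = new ArrayDeque<>();  // stands in for the accumulator

    RetrySketch(int retries) { this.retries = retries; }

    /** Assumed rule: the error is retriable and the attempts are not exhausted. */
    boolean canRetry(Batch batch, boolean retriable) {
        return retriable && batch.attempts < retries;
    }

    /** error == null means success; otherwise retry if allowed, else fail the batch. */
    void completeBatch(Batch batch, String error, boolean retriable) {
        if (error != null && canRetry(batch, retriable)) {
            batch.attempts++;
            queue.addFirst(batch);   // assumed: retried batches go back to the head
        } else {
            batch.result = (error == null) ? "ok" : "failed: " + error;
        }
    }

    public static void main(String[] args) {
        RetrySketch sender = new RetrySketch(2);
        Batch batch = new Batch();
        // Every send hits a network error: two retries, then the batch fails.
        sender.completeBatch(batch, "NETWORK_EXCEPTION", true);
        while (!sender.queue.isEmpty())
            sender.completeBatch(sender.queue.pollFirst(), "NETWORK_EXCEPTION", true);
        System.out.println(batch.attempts + " retries, result = " + batch.result);
    }
}
```

Under these assumptions the user only sees a failure after the retries are used up or when the error is not retriable, so with retries left at 0 a single disconnect is enough to fail the batch.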

NetworkClient.poll

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/NetworkClient.java

public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
        List<NetworkSend> sends = new ArrayList<NetworkSend>();

        for (int i = 0; i < requests.size(); i++) {
            ClientRequest request = requests.get(i);
            int nodeId = request.request().destination();
            if (!isSendable(nodeId))
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");

            this.inFlightRequests.add(request);
            sends.add(request.request());
        }

        // should we update our metadata?
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
        long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
        // if there is no node available to connect, back off refreshing metadata
        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
        if (!this.metadataFetchInProgress && metadataTimeout == 0)
            maybeUpdateMetadata(sends, now);

        // do the I/O
        try {
            this.selector.poll(Math.min(timeout, metadataTimeout), sends);
        } catch (IOException e) {
            log.error("Unexpected error during I/O in producer network thread", e);
        }

        List<ClientResponse> responses = new ArrayList<ClientResponse>();
        handleCompletedSends(responses, now);
        handleCompletedReceives(responses, now);
        handleDisconnections(responses, now);
        handleConnections();

        return responses;
    }


Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2017-10-01
