Production and Consumption

Last updated: 2024-10-14 16:30:01

This document describes the best practices of message production and consumption in CKafka to help you reduce errors in message consumption.

Producing a message

Recommendations for topic use

Configuration requirements: It is recommended to use 3 replicas with in-sync replication. The minimum number of in-sync replicas should be 2, and the number of in-sync replicas must be smaller than the number of topic replicas; otherwise, the failure of a single replica will make the topic unable to accept produced messages.
Creation method: You have the option to enable or disable the CKafka auto-create topic feature. If enabled, it signifies that when producing or consuming an uncreated topic, a topic with 3 partitions and 2 replicas will be automatically created.

Failure retries

In a distributed environment, messages may occasionally fail to be sent due to network or other issues. This may happen either because the message was actually sent successfully but the acknowledgment (ACK) was lost, or because the send genuinely failed.
You can set the following retry parameters based on your business needs:
retries: The default retry count is 3. For applications with zero tolerance for data loss, consider setting it to Integer.MAX_VALUE (the maximum valid value).
retry.backoff.ms: The recommended retry interval is 1000 ms.
These settings help cope with cases where the broker's leader partition cannot respond to produce requests immediately.
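As a sketch, the retry settings above can be applied through the Java producer's properties. The parameter names are those of the Kafka Java client; the bootstrap server address is a placeholder for your CKafka endpoint.

```java
import java.util.Properties;

public class RetryConfig {
    // Build producer properties with the retry settings recommended above.
    public static Properties buildProps() {
        Properties props = new Properties();
        // Placeholder endpoint; replace with your CKafka instance address.
        props.put("bootstrap.servers", "your-ckafka-endpoint:9092");
        // Retry failed sends; use Integer.MAX_VALUE for zero tolerance of loss.
        props.put("retries", "3");
        // Wait 1000 ms between retries so a slow leader partition can recover.
        props.put("retry.backoff.ms", "1000");
        return props;
    }
}
```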

Asynchronous sending

The sending API is asynchronous. If you want to receive the result of a send, call get on the returned future, for example metadataFuture.get(timeout, TimeUnit.MILLISECONDS).
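The pattern of sending asynchronously and later blocking for the result can be sketched with a plain CompletableFuture standing in for the producer's Future<RecordMetadata>, so the sketch runs without a broker; the send method and its return value here are stand-ins, not the Kafka API itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncSendSketch {
    // Stand-in for producer.send(record): completes asynchronously with a result.
    public static CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> "metadata-for-" + message);
    }

    public static String sendAndWait(String message) {
        CompletableFuture<String> metadataFuture = send(message);
        // ... do other work while the send is in flight ...
        try {
            // Block with a timeout, like metadataFuture.get(timeout, TimeUnit.MILLISECONDS).
            return metadataFuture.get(5000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```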

One producer corresponding to one app

The Producer is thread-safe and can send messages to any topic. Generally, it is recommended to have one Producer per application.

Acks

Kafka's ACK mechanism is the producer's acknowledgment mechanism for message sending. The parameter is named acks in Kafka 0.10.x and request.required.acks in version 0.8.x. The acks setting directly affects the throughput of the Kafka cluster and the reliability of messages.
The Acks parameter is described as follows:
acks=0: No server response is required. This method offers higher performance but carries a greater risk of data loss.
acks=1: The server's primary node returns a response upon successful write. This approach has moderate performance and a moderate risk of data loss; data can be lost if the primary node fails.
acks=all: The primary node returns a response only after the write succeeds and the replicas in the ISR have synchronized it. This approach provides relatively lower performance but higher data security. Data loss occurs only if both the primary and standby nodes fail.
Generally, it is recommended to select acks=1. For critical services, you can set acks=all.
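As a sketch, the acks choice can be expressed in the producer properties; the endpoint is a placeholder and the boolean switch is just an illustration of picking acks=all for critical services.

```java
import java.util.Properties;

public class AcksConfig {
    public static Properties buildProps(boolean critical) {
        Properties props = new Properties();
        // Placeholder endpoint; replace with your CKafka instance address.
        props.put("bootstrap.servers", "your-ckafka-endpoint:9092");
        // acks=1 balances throughput and reliability; acks=all for critical services.
        props.put("acks", critical ? "all" : "1");
        return props;
    }
}
```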

Batch

In general, CKafka's topics have multiple partitions. When the Producer client sends messages to the server, it needs to confirm which partition of which topic to send to. When sending multiple messages to the same partition, the Producer client will package the related messages into a batch and send them to the server in bulk. There is an additional overhead when the Producer client processes batches. Typically, small batches lead to a large number of requests from the Producer client, causing request queues to form on both the client and server sides, increasing the CPU usage of related machines, and ultimately increasing the overall message sending and consumption latency. An appropriate batch size can reduce the number of requests from the client to the server when sending messages, thereby improving the overall throughput and latency of message sending.
The batch parameters are described as follows:
batch.size: The message buffer size for each partition (the total byte size of the message content, not the number of messages). When the buffer reaches this value, a network request is triggered and the Producer client sends the messages to the server in batches.
linger.ms: The maximum time each message can remain in the cache. Once exceeded, the Producer client disregards the batch.size limit and immediately sends the batch to the server.
buffer.memory: When the total size of all cached messages exceeds this value, messages are sent to the server, ignoring the restrictions of batch.size and linger.ms. The default value of buffer.memory is 32 MB, which ensures sufficient performance for a single Producer.
Note
If you launch multiple Producers in the same JVM, each Producer could potentially occupy 32MB of cache space, which may trigger an Out of Memory (OOM) situation. In this case, you need to consider the size of buffer.memory to avoid triggering OOM.
You can adjust the values of the parameters based on your actual business needs.
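A minimal sketch of setting the three batch parameters together; the concrete values here (16 KB, 10 ms, 32 MB) are illustrative assumptions to be tuned to your message size and latency targets.

```java
import java.util.Properties;

public class BatchConfig {
    // Example values only; tune them to your actual workload.
    public static Properties buildProps() {
        Properties props = new Properties();
        // Send a batch once 16 KB of messages accumulate for a partition.
        props.put("batch.size", String.valueOf(16 * 1024));
        // Or once a message has waited 10 ms, whichever comes first.
        props.put("linger.ms", "10");
        // Total buffer per Producer; each Producer in a JVM gets its own 32 MB.
        props.put("buffer.memory", String.valueOf(32 * 1024 * 1024));
        return props;
    }
}
```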

Key and value

Each message in CKafka has two fields: key (message identifier) and value (message content).
For easier tracking, please set a unique key for each message. You can track a message by its key, print send logs and consumption logs, and understand the production and consumption status of the message.
If you want to send a large number of messages, we recommend you use the sticky partitioning strategy instead of setting keys.

Sticky partitioning

Only messages sent to the same partition will be placed in the same batch. Therefore, one factor determining how a batch is formed is the partitioning strategy set by the Kafka Producer in the message queue. Kafka Producer allows you to choose a partition suitable for your business by setting the implementation class of the Partitioner. When a key is specified for a message, the default strategy of Kafka Producer is to hash the message's key and select a partition based on the hash result, ensuring that messages with the same key are sent to the same partition.
When a message does not have a specified key, the default strategy for Kafka versions prior to 2.4 is to cycle through all partitions of the topic and send messages to each partition in a round-robin manner. However, this default strategy results in poor batching effects, potentially generating a large number of small batches and increasing the actual latency. In light of the low partitioning efficiency of this default strategy for messages without keys, Kafka introduced the Sticky Partitioning Strategy in version 2.4.
The sticky partition strategy primarily addresses the issue of keyless messages being dispersed across different partitions, resulting in small batches. The main approach is to randomly select another partition once a batch in a partition is completed, and then subsequent messages will use that partition as much as possible. In the short term, this strategy sends messages to the same partition; however, over an extended period, messages will still be evenly distributed across all partitions. This helps avoid partition skew while reducing latency and improving overall service performance.
If you are using a Kafka Producer client version 2.4 or above, the default partitioning strategy is the sticky partitioning strategy. If your Producer client version is below 2.4, you can implement your own partitioning strategy based on the principles of the sticky partitioning strategy and then set the specified partitioning strategy using the partitioner.class parameter.
For the implementation of the sticky partition strategy, you can refer to the following Java code implementation. The main logic of this code is to switch partitions at a certain time interval.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyStickyPartitioner implements Partitioner {

    // Record the last partition switch time.
    private long lastPartitionChangeTimeMillis = 0L;
    // Record the current partition.
    private int currentPartition = -1;
    // Partition switch interval: choose a value based on your actual business needs.
    private long partitionChangeTimeGap = 100L;

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on (or null)
     * @param valueBytes The serialized value to partition on (or null)
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // Retrieve all partition information.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // Prefer currently available partitions if there are any.
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // For messages with a key, select the partition based on the hash of the key.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // If the switch interval has elapsed (or the current partition is invalid),
        // switch to a randomly chosen partition; otherwise, stick with the current one.
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
                || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}
}

Partition order

Within a single partition, messages are stored in the order they are sent, maintaining basic sequentiality. Each topic has several partitions, and if messages are allocated to different partitions, the order cannot be guaranteed between different partitions.
If you want messages to be consumed in the sending order, you can specify keys for such messages on the producer. If such messages are sent with the same key, CKafka will select a partition for their storage based on the hash of the key. As a partition is consumed by only one consumer within a consumer group, these messages will be consumed in the order they were sent.

Message consumption

Basic message consumption process

1. Poll the data.
2. Execute the consumption logic.
3. Poll the data again.
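The three steps above can be sketched as a loop, with a stubbed poll standing in for consumer.poll so the sketch runs without a broker; the batch contents here are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ConsumeLoopSketch {
    // Stand-in for the broker: the batches the stubbed poll() will return.
    private static final Deque<List<String>> batches = new ArrayDeque<>(
            Arrays.asList(Arrays.asList("m1", "m2"), Arrays.asList("m3")));

    // Stand-in for consumer.poll(timeout): returns the next batch, or an empty list.
    static List<String> poll() {
        List<String> next = batches.poll();
        return next == null ? new ArrayList<>() : next;
    }

    // 1. Poll the data. 2. Execute the consumption logic. 3. Poll again.
    public static List<String> run() {
        List<String> consumed = new ArrayList<>();
        List<String> records = poll();
        while (!records.isEmpty()) {
            for (String record : records) {
                consumed.add(record); // consumption logic goes here
            }
            records = poll();
        }
        return consumed;
    }
}
```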

Load balancing

Each Consumer Group can contain multiple Consumers, and the parameter group.id should be set to the same value. Consumers belonging to the same Consumer Group are responsible for consuming the subscribed Topic.
For example, if Consumer Group A subscribes to Topic A and starts three consumption instances C1, C2, and C3, each message sent to Topic A will ultimately be delivered to only one of C1, C2, or C3. By default, CKafka evenly distributes messages to each consumption instance to achieve consumption load balancing.
The internal principle of CKafka load balancing is to distribute the subscribed topic's partitions evenly among all consumers. Therefore, the number of consumers should not exceed the number of partitions; otherwise, some consumer instances will not be allocated any partitions and will sit idle. Besides the first startup, any subsequent change in consumer instances, such as a restart, addition, or removal, will also trigger a rebalance.

Subscription

We recommend that all consumer instances in the same consumer group subscribe to the same topic so as to facilitate troubleshooting.
Consumer Group subscribing to multiple topics.
A Consumer Group can subscribe to multiple topics, and the messages from these topics are evenly consumed by the consumers within the Consumer Group. For example, if Consumer Group A subscribes to Topic A, Topic B, and Topic C, the messages from these three topics are evenly consumed by the consumers within the Consumer Group.
Below is the sample code to make a consumer group subscribe to multiple topics:
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
List<String> subscribedTopics = new ArrayList<>();
for (String topic : topics) {
    subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);
Topic subscribed by multiple Consumer Groups.
A topic can be subscribed to by multiple consumer groups, and each consumer group independently consumes all messages under the topic. For example, if Consumer Group A subscribes to Topic A and Consumer Group B also subscribes to Topic A, each message sent to Topic A will be delivered not only to the consumption instances of Consumer Group A but also to those of Consumer Group B. These two processes are independent and do not affect each other.

One consumer group corresponding to one application

It is recommended that one Consumer Group corresponds to one application, i.e., different applications use different consumption code. If you need different consumption code within the same application, prepare multiple distinct kafka.properties files, such as kafka1.properties and kafka2.properties.

Consumer offset

Each topic has multiple partitions, and each partition records the total number of messages it currently holds, known as the MaxOffset.
CKafka consumers consume messages in partitions in sequence and record the number of consumed messages, which is called ConsumerOffset.
Number of remaining messages (aka the "number of retained messages") = MaxOffset - ConsumerOffset.
Offset submission
There are two parameters related to CKafka consumers:
enable.auto.commit: The default value is true.
auto.commit.interval.ms: The default value is 1000, i.e., 1s.
Combined, these two parameters mean that before each poll, the client checks the time of the last offset commit; if the interval defined by auto.commit.interval.ms has elapsed, the client commits the offsets.
Therefore, if enable.auto.commit is set to true, it is always necessary to ensure that the data polled last time has been consumed before data polling; otherwise, the offset may be skipped.
If you want to control offset commits by yourself, set enable.auto.commit to false and call the commit(offsets) function.
Reset offset
The ConsumerOffset will be reset under the following two circumstances:
The server has no committed offsets (for example, when the client is started for the first time).
A message is pulled from an invalid offset (for example, the MaxOffset in a partition is 10, but the client pulls a message from offset 11).
For a Java client, you can configure a resetting policy by using auto.offset.reset. There are three policies:
latest: Consumption will start from the maximum offset.
earliest: Consumption will start from the minimum offset.
none: Resetting will not be performed.
Note
We recommend you set the reset policy to latest rather than earliest, so as to avoid starting consumption from the beginning when the offset is invalid, which may cause a large amount of repeated consumption.
If you manage the offset by yourself, you can set the policy to none.

Pulling messages

In the consumption process, the client pulls messages from the server. When the client pulls large messages, you should control the pulling speed and pay attention to the following parameters:
max.poll.records: Set it to 1 if a single message exceeds 1 MB in size.
max.partition.fetch.bytes: Set it to a value slightly greater than the size of a single message.
fetch.max.bytes: Set it to a value slightly greater than the size of a single message.
When messages are consumed over the public network, a disconnection may often occur due to the bandwidth limit of the public network. In this case, you should control the pulling speed and pay attention to the following parameters:
fetch.max.bytes: We recommend you set it to half of the public network bandwidth (note that the unit of this parameter is bytes, while the unit of the public network bandwidth is bits).
max.partition.fetch.bytes: We recommend you set it to one third or one fourth of fetch.max.bytes.
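The bits-versus-bytes conversion above can be sketched numerically; the 10 Mbps bandwidth figure below is only an example.

```java
public class FetchSizing {
    // Convert a public-network bandwidth in megabits per second into the
    // recommended fetch.max.bytes: half the bandwidth, expressed in bytes.
    public static long fetchMaxBytes(long bandwidthMbps) {
        long bandwidthBytesPerSec = bandwidthMbps * 1_000_000L / 8;
        return bandwidthBytesPerSec / 2;
    }

    // max.partition.fetch.bytes: one third of fetch.max.bytes (one fourth also works).
    public static long maxPartitionFetchBytes(long bandwidthMbps) {
        return fetchMaxBytes(bandwidthMbps) / 3;
    }
}
```

For a 10 Mbps link this yields fetch.max.bytes = 625000 bytes.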

Duplicate messages and consumption idempotency

TDMQ for CKafka's consumption semantics are "at least once," which ensures that messages are not lost but cannot guarantee that messages are not duplicated. Network issues or client restarts may cause a small number of duplicate messages. If the application consumer is sensitive to message duplication (e.g., order transactions), it should implement message idempotency.
Taking a database application as an example, the common practice is as follows:
When sending a message, pass in the key as the unique ID.
When consuming a message, determine whether the key has already been consumed; if so, ignore it; otherwise, consume it once.
Of course, if the application itself is not sensitive to a small number of duplicate messages, idempotency is not necessary.
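A minimal sketch of key-based idempotent consumption as described above, using an in-memory set in place of the database-side check; in production the lookup would hit a database or cache keyed by the message's unique ID.

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentConsumer {
    // Stand-in for a persistent store of already-consumed message keys.
    private final Set<String> consumedKeys = new HashSet<>();

    // Returns true if the message was processed, false if it was a duplicate.
    public boolean consume(String key, String value) {
        if (!consumedKeys.add(key)) {
            return false; // key already consumed: ignore the duplicate
        }
        // ... actual consumption logic on value goes here ...
        return true;
    }
}
```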

Consumption failure

In CKafka, messages are consumed from partitions one by one in sequence. If the consumer fails to execute the consumption logic after getting a message, for example, when dirty data is stored on the application server, the message will fail to be processed, and human intervention will be required. There are two methods for dealing with this situation:
Continuously retrying the consumption logic after a failure may cause the consumption thread to be blocked at the current message, preventing it from moving forward and resulting in message accumulation.
As CKafka is not designed to process failed messages, in practice, it will typically print failed messages or store them in a service (such as a dedicated topic created for storing failed messages), so that you can regularly check failed messages, analyze the causes of failures, and process accordingly.

Consumption latency

The consumption process involves the client actively pulling messages from the server. Generally, if the client can consume messages promptly, there will be no significant delay. If a considerable delay occurs, please pay attention to whether there is a backlog and consider increasing the consumption speed.

Consumption backlog

The common causes of a message backlog include the following:
Consumption is slower than production. In this case, you should speed up consumption.
Consumption is blocked.
After getting a message, a consumer executes the consumption logic and usually makes some remote calls. If it waits synchronously for the results, the consumption process may block.
Ensure as much as possible that the consumer does not block its consumption threads. If it must wait for call results, we recommend setting a wait timeout, so that the processing is treated as failed once the timeout elapses.

Speeding up consumption

Increase the number of consumer instances. You can either increase the number of consumer instances directly in the process (you need to ensure that each instance corresponds to one thread) or deploy multiple consumer instance processes.
Note
Once the number of instances reaches the number of partitions, adding more brings no benefit; the extra consumer instances will be idle.
Increase the number of consumer threads.
1.1 Define a thread pool.
1.2 Poll the data.
1.3 Commit the data into the thread pool for concurrent processing.
1.4 Poll the data again after a successful concurrent processing result is returned.
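The four steps above can be sketched with a JDK ExecutorService; the records list stands in for the data polled in step 1.2, so the sketch runs without a broker.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ThreadedConsumerSketch {
    public static int processBatch(List<String> records) {
        // 1.1 Define a thread pool sized to your processing workload.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger processed = new AtomicInteger();
        try {
            // 1.3 Commit the polled records into the pool for concurrent processing.
            List<Future<Integer>> futures = records.stream()
                    .map(r -> pool.submit(() -> processed.incrementAndGet()))
                    .collect(Collectors.toList());
            // 1.4 Wait for every result before polling (and committing offsets) again.
            for (Future<Integer> f : futures) {
                f.get();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }
}
```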

Socket buffers

In Kafka version 0.10.x, the default value for the parameter receive.buffer.bytes is 64KB. In Kafka version 0.8.x, the default value for the parameter socket.receive.buffer.bytes is 100KB.
Both the default values are too small for high-throughput environments, especially when the bandwidth-delay product of the network between the broker and the consumer is greater than that of the local area network (LAN).
For networks with a delay of 1 ms or more and a high bandwidth (such as 10 Gbps or higher), we recommend you set the socket buffer size to 8 or 16 MB.
If your memory is insufficient, consider setting it to at least 1 MB. You can also set it to -1, which allows the underlying operating system to adjust the buffer size based on the actual network conditions.
However, for consumers that need to catch up quickly on busy ("hot") partitions, the automatic adjustment may not ramp up fast enough.
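As a sketch, the consumer-side buffer setting (receive.buffer.bytes on 0.10.x clients) with the 8 MB example from the text, and -1 to delegate sizing to the operating system:

```java
import java.util.Properties;

public class SocketBufferConfig {
    public static Properties buildProps(boolean letOsDecide) {
        Properties props = new Properties();
        // 8 MB suits high-bandwidth links with >= 1 ms latency; -1 lets the OS autotune.
        props.put("receive.buffer.bytes",
                letOsDecide ? "-1" : String.valueOf(8 * 1024 * 1024));
        return props;
    }
}
```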