我的Kafka制作人在交易失败的情况下一直发送给Kafka经纪人。我有一个自定义的侦听器,即我没有使用@KafkaListener注释。这是在Spring-kafka 2.2.x上运行的
你知道为什么在KafkaTransactionManager回滚的情况下,消息还是会出现在卡夫卡中吗?下面是我的设置:
// Kafka producer sender
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void sendToKafkaWithTransaction(final String topic, final Object payload){
ProducerRecord record = new ProducerRecord(topic, key, payload);
template.executeInTransaction(kt -> kt.send(record));
}
// RabbitMQ producer sender
@Transactional(transactionManager = "rabbitTransactionManager", propagation = Propagation.REQUIRED)
public void sendToRabbitmqWithTransaction(final String topic, final String header, final Object payload){
template.convertAndSend(topic, header, payload);
}
// Chained Transaction Manager
@Bean(name = "chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(
@Qualifier(value = "transactionalKafkaProducer") ProducerFactory producerFactory,
@Qualifier(value = "transactionManager") JpaTransactionManager jpaTransactionManager,
@Qualifier(value = "rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager) {
KafkaTransactionManager producerKtm = new KafkaTransactionManager(producerFactory);
producerKtm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<>(jpaTransactionManager, producerKtm, rabbitTransactionManager);
}
// Listener config
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
// Listener
@Transactional(transactionManager = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer){
try {
RetryState retryState = new DefaultRetryState(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
retryTemplate.execute(context -> {
saveToDb() // This rolls back
sendToKafkaWithTransaction(topic, payload); // This message gets to Kafa, it should not.
sendToRabbitmqWithTransaction(topic, payload); // This rolls back
throw new Exception("Out of Anger");
return null;
}, recoveryCallBack, retryState);
acknowledgment.acknowledge();
}
catch (ListenerExecutionFailedException e) {
throw e;
}
}
// See logs
[ consumer-0-C-1] o.s.a.r.t.RabbitTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Setting JPA transaction on EntityManager [SessionImpl(104745239)] rollback-only
编辑:
添加spring boot配置:
spring.kafka:
admin:
bootstrap-servers: ${kakfa.host}
consumer:
group-id: test-consumers
client-id: test-consumers
auto-offset-reset: latest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
properties:
isolation-level: read_committed
producer:
client-id: test-producer
acks: all
retries: 3
transaction-id-prefix: test-producer-tx-
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
enable.idempotence: true
transactional.id: tran-id-1-
max.in.flight.requests.per.connection: 5
isolation-level: read_committed
编辑更多日志
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@18061927] for key [public abstract java.lang.Object org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(java.lang.Object)] from thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=topic-1, partition=null)
[-27cf188e6c23-1] org.apache.kafka.clients.Metadata : Cluster ID: r3baK471R6mIft7L_DIOIg
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=topic-1, partition=null)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@16bfeffa] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@30eed725] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Found thread-bound EntityManager [SessionImpl(23309560)] for JPA transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@cbfb10d] for key [HikariDataSource (HikariPool-1)] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Participating in existing transaction
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.arca.framework.messaging.services.impl.BoradcastMessageServiceImpl.sendTransactional]
[-27cf188e6c23-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=topic-1, partition=null), metadata: topic-1-0@185
[ consumer-0-C-1] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitTemplate$$Lambda$1237/634386320 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64338]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message (Body:'{ }')
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [messaging.services.impl.RabbitMessageServiceImpl.send]
[ consumer-0-C-1] c.a.f.m.k.r.KafkaSingleDispatchReceiver : Unable to process messages of type: [class messaging.kafka.events.acquiringtmstransaction.TmsTransactionEvent] and id: [92dccb48-2cd2-47b8-b778-8550dcd72d04]
[ consumer-0-C-1] .a.f.m.k.c.KafkaTransactionalRetryPolicy : Retry count [1] for message [{}]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [messaging.kafka.receivers.KafkaReceiver.onMessage] after exception: exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute : Applying rules to determine whether transaction should rollback on exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute : Winning rollback rule is: null
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute : No relevant rollback rule found: applying default rules
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Triggering beforeCompletion synchronization
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
[ consumer-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : abortTransaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b70782d, txId=tran-id-1-acquiring-tms-transaction-consumers.pos_txn_log.0]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Triggering afterCompletion synchronization
[ consumer-0-C-1] o.s.a.r.connection.RabbitResourceHolder : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64338]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Resuming suspended transaction after completion of inner transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Setting JPA transaction on EntityManager [SessionImpl(23309560)] rollback-only
[ consumer-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
发布于 2021-02-22 23:02:21
这就是Kafka交易的工作方式。发布的记录始终写入日志,后跟指示事务是已提交还是已回滚的标记记录。
为了避免看到回滚记录,您必须设置消费者
属性设置为
(它是
默认情况下)。
编辑
这是因为您正在启动一个新事务:
template.executeInTransaction(kt -> kt.send(record));
/**
* Execute some arbitrary operation(s) on the operations and return the result.
* The operations are invoked within a local transaction and do not participate
* in a global transaction (if present).
* @param callback the callback.
* @param the result type.
* @return the result.
* @since 1.1
*/
@Nullable
T executeInTransaction(OperationsCallback callback);
只要打电话就行了
模板将参与容器启动的事务。
您还可以删除
从那个方法。
EDIT2
这对我来说和预期的一样..。
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed
logging.level.org.springframework.transaction=trace
logging.level.org.springframework.kafka.core=trace
@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {
public static void main(String[] args) {
SpringApplication.run(So66306109Application.class, args);
}
@Autowired
Foo foo;
@Transactional
@KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
public void listen(String in) {
System.out.println(in);
this.foo.send(in.toUpperCase());
throw new RuntimeException("test");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so66306109-2", topics = "so66306109-2")
public void listen2(String in) {
System.out.println(in);
}
}
@Component
class Foo {
@Autowired
KafkaTemplate template;
@Transactional // Not needed - we're already in a transaction
void send(String in) {
this.template.send("so66306109-2", in);
}
}
EDIT3
如果您无法升级到受支持的版本,则需要在容器中禁用事务,并在重试执行范围内的代码中自行管理它。
下面是一个例子。
@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {
public static void main(String[] args) {
SpringApplication.run(So66306109Application.class, args);
}
@Autowired
Foo foo;
@Autowired
RetryTemplate template;
@KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
public void listen(ConsumerRecord in) {
this.template.execute(context -> {
System.out.println(in);
this.foo.send(in);
return null;
}, context -> {
System.out.println("RETRIES EXHAUSTED");
return null;
});
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so66306109-2", topics = "so66306109-2")
public void listen2(String in) {
System.out.println(in);
}
@Bean
ChainedKafkaTransactionManager chainedTm(KafkaTransactionManager ktm,
ConcurrentKafkaListenerContainerFactory factory) {
// transactions can't be started by the container
factory.getContainerProperties().setTransactionManager(null);
return new ChainedKafkaTransactionManager<>(ktm);
}
@Bean
public RetryTemplate template() {
return new RetryTemplate();
}
}
@Component
class Foo {
@Autowired
KafkaTemplate template;
@Autowired
ProducerFactory pf;
@Transactional("chainedTm")
public void send(ConsumerRecord in) {
// updateDB
this.template.send(new ProducerRecord("so66306109-2", null, null, in.value().toUpperCase()));
this.template.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(in.topic(), in.partition()),
new OffsetAndMetadata(in.offset() + 1)));
// simulate a DB rollback
KafkaResourceHolder resource = (KafkaResourceHolder) TransactionSynchronizationManager
.getResource(this.pf);
resource.setRollbackOnly();
}
}
注意:您不能手动确认此类记录;相反,请在提交事务之前将偏移量发送到事务。
https://stackoverflow.com/questions/66306109
复制相似问题