
Wrapping StreamBridge send and JPA save inside a transaction

I am using Spring Boot 2.5.2 and Spring Cloud 2020.0.3. I have a REST service call that saves a record to a database using JPA (CrudRepository.save) and then uses StreamBridge to publish a message to a Kafka topic through spring-cloud-stream (Kafka binder), and I want to wrap both operations in a single transaction. I have tried several approaches, but none works correctly. When I deliberately cause a JPA failure (inserting a row that violates a unique key constraint), the Kafka message still appears to be sent to the broker.

  1. I configured a KafkaTransactionManager (without ChainedKafkaTransactionManager, since it is now deprecated). However, it appears to be ignored: StreamBridge seems to create its own transaction manager internally when a transaction-id-prefix is present in the configuration.
  2. Without the transaction-id-prefix, the ProducerFactory is not transactional at all, which causes the KafkaTransactionManager instantiation to fail.
  3. I tried not creating my own transaction manager at all, but the Kafka message is still sent.

What is the proper way to configure this kind of flow so that the writes to the database and the broker are atomic?

HTTP -> JPA save -> Kafka send

Bert S. asked Sep 13 '25 05:09



1 Answer

You don't need a transaction manager, but you do need a transactional.id on the producer factory.

If the send is performed within the scope of the JPA transaction (e.g. a @Transactional method with the JPA TM), the Kafka template will synchronize the Kafka transaction with the existing transaction and either commit it or roll it back, depending on the outcome of the main transaction.

Are you aware that even rolled-back records are actually written to the log? To avoid receiving them, you must set the consumer property isolation.level to read_committed; it defaults to read_uncommitted.
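
As a rough illustration of the consumer setting mentioned above, here is a minimal sketch using only java.util.Properties. The class name, the broker address, and the group id are hypothetical; only the isolation.level key and its read_committed value come from the Kafka consumer configuration.

```java
import java.util.Properties;

public class ReadCommittedConsumerProps {

    // Builds plain Kafka consumer properties.
    // bootstrapServers and groupId are placeholder values supplied by the caller.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Hide records that belong to aborted (rolled-back) transactions.
        // Without this line the consumer uses the default, read_uncommitted.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id, for illustration only.
        Properties props = build("localhost:9092", "so68460690-group");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

A Properties object like this could be passed to a plain KafkaConsumer. In a Spring Cloud Stream application the same key would more likely be supplied through the binder's consumer configuration instead.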

EDIT

There is a bug in synchronizing a producer-only transaction with an existing transaction; the send is performed in a local transaction instead.

As a workaround, you can use a TransactionTemplate to start a Kafka transaction:

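import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

@SpringBootApplication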
public class So68460690Application {

    public static void main(String[] args) {
        SpringApplication.run(So68460690Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
        return args -> {
            new TransactionTemplate(ktm).executeWithoutResult(
                    status -> foo.doInTx(bridge)); // or execute() to return a result
        };
    }

    @Bean
    KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
        return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
                .getTransactionalProducerFactory());
    }

}

@Component
class Foo {

    @Transactional
    public void doInTx(StreamBridge bridge) {
        bridge.send("output", "test");
        throw new RuntimeException("testEx"); // forces a rollback, so the Kafka transaction is aborted
    }

}

spring.cloud.stream.bindings.output.destination=so68460690

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all

logging.level.org.springframework.kafka=trace

2021-07-27 17:31:37.923 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()
Gary Russell answered Sep 14 '25 23:09