
Axon: How to configure amqp publishing for single events?

I have a simple Spring-based service that publishes events via AMQP. The configuration is based on bootiful-axon.

Now I want the service to maintain some private state. It is a simple use case that can be realized with 3 extra events. Those events have no meaning outside the service's scope, so I don't want them to "leave" the service.

How can I specify which events should be published via AMQP and which should not?

Asked by Doe Johnson, Nov 30 '25 16:11


1 Answer

This is how I solved it:

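A custom SpringAMQPPublisher that overrides the send method and forwards only events whose payload type is assignable to PublicEvent (a marker type defined in the service, not part of Axon):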

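import java.util.List;
import java.util.stream.Collectors;

// Axon 3.x package names
import org.axonframework.amqp.eventhandling.spring.SpringAMQPPublisher;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.SubscribableMessageSource;

public class SelectiveAmqpPublisher extends SpringAMQPPublisher {

    // An event may leave the service only if its payload type is a PublicEvent.
    static boolean shouldSend(Class<?> pt) {
        return PublicEvent.class.isAssignableFrom(pt);
    }

    public SelectiveAmqpPublisher(
            SubscribableMessageSource<EventMessage<?>> messageSource) {
        super(messageSource);
    }

    // Drop non-public events, then let the stock publisher send the rest.
    @Override
    protected void send(List<? extends EventMessage<?>> events) {
        super.send(events.stream()
                .filter(e -> shouldSend(e.getPayloadType()))
                .collect(Collectors.toList()));
    }
}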
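
To show the filter on its own, here is a minimal sketch in plain Java with no Axon dependency. It assumes PublicEvent is an empty marker interface; the OrderShipped and InternalCounterBumped payload classes and the publicOnly helper are hypothetical names made up for this illustration, not part of the original service.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MarkerFilterSketch {

    // Assumed marker interface: only payloads implementing it may leave the service.
    interface PublicEvent {}

    // Hypothetical event payloads, invented for this illustration.
    static class OrderShipped implements PublicEvent {}
    static class InternalCounterBumped {}

    // Same check as shouldSend in the publisher above.
    static boolean shouldSend(Class<?> payloadType) {
        return PublicEvent.class.isAssignableFrom(payloadType);
    }

    // Hypothetical helper: keeps only the payloads whose type passes shouldSend.
    static List<Object> publicOnly(List<Object> payloads) {
        return payloads.stream()
                .filter(p -> shouldSend(p.getClass()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> payloads =
                Arrays.asList(new OrderShipped(), new InternalCounterBumped());
        System.out.println(publicOnly(payloads).size()); // prints 1
    }
}
```

With this approach, making an event public is just a matter of letting its class implement the marker interface; everything else stays inside the service by default.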

Configuration:

@Autowired
private AMQPProperties amqpProperties;

@Autowired
private RoutingKeyResolver routingKeyResolver;

@Autowired
private AMQPMessageConverter aMQPMessageConverter;

@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
        EventBus eventBus,
        ConnectionFactory connectionFactory,
        AMQPMessageConverter amqpMessageConverter) {

    // Use the filtering publisher instead of the default SpringAMQPPublisher.
    SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);

    // The rest is from axon-spring-autoconfigure...
    publisher.setExchangeName(amqpProperties.getExchange());
    publisher.setConnectionFactory(connectionFactory);
    publisher.setMessageConverter(amqpMessageConverter);
    switch (amqpProperties.getTransactionMode()) {
        case TRANSACTIONAL:
            publisher.setTransactional(true);
            break;
        case PUBLISHER_ACK:
            publisher.setWaitForPublisherAck(true);
            break;
        case NONE:
            break;
        default:
            throw new IllegalStateException("....");
    }

    return publisher;
}
Answered by Doe Johnson, Dec 02 '25 06:12


