I am using Spring Cloud Stream with spring-cloud-starter-stream-kafka. I have bound my channels to Kafka topics as follows in application.properties:
spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12
I am unable to get my program to produce an exception message to the error channel. Surprisingly, it doesn't even seem to try to produce one, even though the failure happens on a different thread (I have a @MessagingGateway that puts a message into gatewayOutput, and the rest of the flow then happens asynchronously). Here is the definition of my ServiceActivator:
@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {
        logger.error("FULFILLED!!!!!!");
        throw new Exception("test exception");
    }
}
Here is the log produced (I have truncated the full exception). There is no...
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}
...
...
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
EDIT: Here is the content of my channels class:
public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";
    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";
    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";
    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();
}
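You don't show your Channels class, but the binder doesn't know that your "error" channels are "special"; it treats them as ordinary bindings to the errors12 topic, so exceptions are not routed there automatically.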
The binder can be configured to retry and to route exceptions to a dead-letter topic; see this PR, which is in 1.0.0.RELEASE.
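To make the retry and dead-letter idea concrete, here is a plain-Java model of the behaviour. It is only a conceptual sketch, not the binder's code or configuration: the class RetryThenDeadLetter, the deliver method, and the in-memory deadLetters list are all hypothetical names standing in for what the binder would do against Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenDeadLetter {

    // Hypothetical stand-in for the dead-letter topic; a real binder would publish to Kafka.
    public static final List<String> deadLetters = new ArrayList<>();

    // Invoke the handler up to maxAttempts times. If the last attempt also fails,
    // park the payload in the dead-letter list instead of losing it.
    // Returns the number of attempts that were made.
    public static int deliver(String payload, int maxAttempts, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt; // succeeded on this attempt
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    deadLetters.add(payload);
                }
            }
        }
        return maxAttempts;
    }

    public static void main(String[] args) {
        // A handler that always fails, like the fulfill(...) method in the question.
        int attempts = deliver("order-1", 3, p -> {
            throw new IllegalStateException("test exception");
        });
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
    }
}
```

With a handler that always throws, the payload ends up in the dead-letter list after the configured number of attempts, which is the outcome you would expect to observe on the dead-letter topic.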
Alternatively, you can add a "mid-flow" gateway before the service activator - think of it like a "try/catch" block in Java:
@MessageEndpoint
public static class GatewayInvoker {

    @Autowired
    private ErrorHandlingGateway gw;

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
    public void send(Message<?> message) {
        this.gw.send(message);
    }
}

@Bean
public GatewayInvoker gate() {
    return new GatewayInvoker();
}

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
public interface ErrorHandlingGateway {

    void send(Message<?> message);
}
Change your service activator's input channel to toService.
You have to add @IntegrationComponentScan to your configuration class so the framework can detect the @MessagingGateway interface and build a proxy for it.
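The "try/catch" analogy for the mid-flow gateway can be illustrated without Spring. The sketch below is a plain-Java model only: MidFlowGatewayModel and its nested Gateway class are hypothetical names, and the two Consumer fields merely play the roles of the toService request channel and the error channel. In the real framework the error channel receives an ErrorMessage rather than the raw exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MidFlowGatewayModel {

    // Hypothetical stand-in for the gateway proxy: it sends downstream and
    // diverts any failure to the error channel instead of rethrowing it.
    public static final class Gateway<T> {
        private final Consumer<T> requestChannel;       // plays the role of "toService"
        private final Consumer<Exception> errorChannel; // plays the role of the error channel

        public Gateway(Consumer<T> requestChannel, Consumer<Exception> errorChannel) {
            this.requestChannel = requestChannel;
            this.errorChannel = errorChannel;
        }

        public void send(T message) {
            try {
                requestChannel.accept(message);
            } catch (RuntimeException e) {
                errorChannel.accept(e); // the caller never sees the exception
            }
        }
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        Gateway<String> gw = new Gateway<>(
                msg -> { throw new IllegalStateException("test exception for " + msg); },
                e -> errors.add(e.getMessage()));
        gw.send("redemption-1");
        System.out.println(errors); // prints [test exception for redemption-1]
    }
}
```

The point of the model is that the component calling send(...) completes normally, while the failure is handed to whatever is subscribed to the error channel.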
EDIT
Another alternative, just suggested to me, would be to add an ExpressionEvaluatingRequestHandlerAdvice to your service activator's advice chain.