I have a Spring Boot application which has problems retrieving JMS messages of type TextMessage from an ActiveMQ broker.
If the consumer tries to retrieve messages from the broker it cannot automatically convert a message to TextMessage but treats it as ByteMessage. There is a JmsListener which should read the messages from the queue as TextMessage:
...
@JmsListener(destination = "foo")
public void jmsConsumer(TextMessage message) {
...
The JmsListener produces warnings like the following, and drops the messages:
org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method could not be invoked with incoming message
Endpoint handler details:
Method [public void net.aschemann.demo.springboot.jmsconsumer.JmsConsumer.jmsConsumer(javax.jms.TextMessage)]
Bean [net.aschemann.demo.springboot.jmsconsumer.JmsConsumer@4715f07]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [javax.jms.TextMessage] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298, failedMessage=org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:118) ~[spring-jms-5.1.4.RELEASE.jar:5.1.4.RELEASE]
I have extracted a small sample application to debug the problem: https://github.com/ascheman/springboot-camel-jms
The producer in real life is a commercial application which makes use of Apache Camel. Hence, I can hardly change/customize the producer. I have tried to build a sample producer which shows the same behavior.
Could I somehow tweak the consumer to treat the message as TextMessage?
Besides: Is there any way to retrieve the additional AMQP properties from the message programmatically directly in Spring? Of course, I could still read the message as ByteMessage and try to parse properties away. But I am looking for a cleaner way which is backed by any Spring API. The Spring @Headers annotation didn't help so far.
I had the same error, and it was caused because LazyResolutionMessage is called from MessagingMessageConverter that is the default implementation to MessageConverter, which converts your message (actually it doesn't, since it's the default):
return ((org.springframework.messaging.Message) payload).getPayload();
I have accomplished what you want, at the end my consumer was working like:
@JmsListener(destination = "${someName}")
public void consumeSomeMessages(MyCustomEvent e) {
....
}
What I had to do was:
@Bean(name = "jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory whateverNameYouWant(final ConnectionFactory genericCF) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setErrorHandler(t -> log.error("bad consumer, bad", t));
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setConnectionFactory(genericCF);
factory.setMessageConverter(
new MessageConverter() {
@Override
public Message toMessage(Object object, Session session) {
throw new UnsupportedOperationException("since it's only for consuming!");
}
@Override
public MyCustomEvent fromMessage(Message m) {
try {
// whatever transformation you want here...
// here you could print the message, try casting,
// building new objects with message's attributes, so on...
// example:
return (new ObjectMapper()).readValue(((TextMessage) m).getText(), MyCustomEvent.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
);
return factory;
}
A few keypoints:
If your DefaultJmsListenerContainerFactory method is also called jmsListenerContainerFactory you don't need name attribute at Bean annotation
Notice you can also implement an ErrorHandler to deal with exceptions when trying to convert/cast your message's type!
ConnectionFactory was a Spring managed bean with Amazon's SQSConnectionFactory since I wanted to consume from a SQS queue. Please provide your equivalent correctly. Mine was:
@Bean("connectionFactory")
public SQSConnectionFactory someOtherNome() {
return new SQSConnectionFactory(
new ProviderConfiguration(),
AmazonSQSClientBuilder.standard()
.withRegion(Regions.US_EAST_1)
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
"keyAccess",
"keySecret"
)
)
)
.build()
);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With