Spring Kafka: EmbeddedKafkaBroker is not started

I am writing an embedded Kafka broker and a consumer to capture messages from the application. When I try to get messages from the consumer, the following error occurs:

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)

On the application side (the producer), there is also a connection error:

2020-03-25 12:29:33.689  WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.

My project has the following dependencies:

compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"

Code of my Kafka broker:

public class KafkaServer {

    private static final String BROKERPORT = "9092";
    private static final String BROKERHOST = "localhost";
    public static final String TOPIC1 = "fss-fsstransdata";
    public static final String TOPIC2 = "fss-fsstransscores";
    public static final String TOPIC3 = "fss-fsstranstimings";
    public static final String TOPIC4 = "fss-fssdevicedata";
    @Getter
    private Consumer<String, String> consumer;

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    public void run() {

        String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};

        this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
                1,
                false,
                1,
                topics
        ).kafkaPorts(BROKERPORT);

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
        this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();

        this.consumer.subscribe(Arrays.asList(topics));
    } 
}

Please help me resolve this. I am not very familiar with Kafka's architecture or with how to use it from Spring.

mr_Stat1c asked Jan 27 '26 12:01


2 Answers

The EmbeddedKafkaBroker is designed to be used from a Spring application context, by a JUnit 4 @Rule or @ClassRule, or by a JUnit 5 Condition.

To use it outside those environments, you must call afterPropertiesSet() to initialize it and destroy() to shut it down.
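A "Connection refused" error usually means that nothing is listening on the target host and port. As a rough diagnostic, the sketch below uses only the JDK to check whether a TCP listener is present on the broker address. The class name BrokerPortCheck and its methods are hypothetical helpers, not part of Spring Kafka, and localhost:9092 is taken from the question. A successful connection only shows that something accepts TCP connections there, not that it is a working Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

// Hypothetical diagnostic helper (not part of Spring Kafka).
public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    public static boolean isListening(String host, int port, int connectTimeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused" as well as connect timeouts.
            return false;
        }
    }

    /** Retries isListening until it succeeds or the overall timeout has elapsed. */
    public static boolean waitForPort(String host, int port, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            if (isListening(host, port, 500)) {
                return true;
            }
            Thread.sleep(200); // short pause between attempts
        } while (System.nanoTime() - deadline < 0);
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Host and port are the values used in the question.
        boolean up = waitForPort("localhost", 9092, Duration.ofSeconds(10));
        System.out.println(up
                ? "Something is listening on localhost:9092"
                : "Nothing is listening on localhost:9092");
    }
}
```

In the question's run() method, a check like this could be placed right after the call to afterPropertiesSet() and before the consumer is created, with destroy() called once the test has finished.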

Gary Russell answered Jan 29 '26 01:01


If you are using Spring, annotate your test class with @EmbeddedKafka and then inject the broker with @Autowired on an EmbeddedKafkaBroker field.

Example @EmbeddedKafka annotation configuration:

@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = {
        // place your broker properties here, e.g. "listeners=PLAINTEXT://localhost:9092", "port=9092"
    })

I would create a Spring configuration class, for example KafkaServerConfig, and place all the configuration and bean-construction logic in it.

PS: Note that EmbeddedKafkaBroker is intended for use in tests.

Alexander Petrov answered Jan 29 '26 02:01