Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

kafka-node several consumers

Have a basic example that works like a charm for 1 consumer. It receives the messages. But adding one additional consumer will be ignored.

let kafka = require('kafka-node');
let client = new kafka.Client();

let producer = new kafka.Producer(client);

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]);


producer.on('ready', function () {

    producer.send([{topic:'topic1', messages: 'topic 1 msg' ], (err,data)=>{
        console.log(err,'1 sent');
    });
    producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{
        console.log(err, '2 sent');
    });

});
producer.on('error', function (err) {
    console.log('err', err);
})



consumer1.on('message',(message) =>{
    console.log(11, message);
});
consumer2.on('message',(message) =>{
    console.log(22, message);
})

The issue that event with '22' for consumer2 never fires. Data on that topic exist if I check it with command line tools

like image 310
paka Avatar asked Oct 30 '25 18:10

paka


1 Answers

You forgot to add a consumer group in consumer parameter. In your case both consumers belong to one consumer group (which is by default called kafka-node-group). Having one consumer group means that every message will be delivered once per consumer group. And since your topic has 0 partitions only the first consumer will process the message.

If you want to send the same message to both consumers you have to have two consumer groups (one consumer per group). You can do it following way:

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'});
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});

If you want your consumers process messages together (by partitions) you need to increase number of partitions in your topic.

like image 139
streetturtle Avatar answered Nov 01 '25 10:11

streetturtle