Consider this test, in which the test sends a message to topic 'out', and the tested code is expected to consume it and reply by producing a message to topic 'in'. For the test to pass, I want to verify that a message was indeed sent to topic 'in'.
it('...', async () => {
  /* initialize kafkaConsumer and kafkaProducer here */
  async function someCallback() {
    // ...
  }

  await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
  await kafkaConsumer.run({ eachMessage: someCallback })
  await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })
  // How do I block here until someCallback is called?
})
I read about using `done`, but I can't use it while the test itself is defined as `async`, which I need in order to use `await`. Is there a different way I'm not aware of?
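In other words, I think what I need is something like the following sketch, where the promise wiring and the names `consumed`/`resolveWhenConsumed` are my own invention:

```typescript
// Sketch of what I'm after: a promise the test can await,
// resolved from inside the message callback.
let resolveWhenConsumed!: () => void
const consumed = new Promise<void>(resolve => {
  resolveWhenConsumed = resolve
})

async function someCallback() {
  // ... assertions on the received message ...
  resolveWhenConsumed()
}

// Then, after producing to 'out', the test could simply do:
// await consumed
```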
You can take a look at how we test KafkaJS itself for some inspiration. For example, here's a basic consumer test.
We are really not doing anything fancy: we just add the messages to an array from within the eachMessage callback, and then await a promise that periodically checks whether we have reached the expected number of messages. Something like this:
it('consumes messages', async () => {
  const messages = [{ value: 'hello world' }]
  const consumedMessages = []

  consumer.run({
    eachMessage: ({ message }) => {
      consumedMessages.push(message)
    },
  })

  await producer.send({ topic, messages })
  await waitFor(() => consumedMessages.length === messages.length)
})
Where waitFor is essentially a function that returns a promise and starts a setTimeout that checks the predicate and resolves the promise when the predicate is true (or rejects if it hits a timeout).
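If you don't want to pull that helper out of the KafkaJS repo, a minimal `waitFor` along those lines could look like this (the interval and timeout defaults are my own choices):

```typescript
// Minimal waitFor sketch: polls the predicate every `interval` ms,
// resolving once it returns true and rejecting after `timeout` ms.
const waitFor = (
  predicate: () => boolean,
  { interval = 50, timeout = 5000 }: { interval?: number; timeout?: number } = {},
): Promise<void> =>
  new Promise((resolve, reject) => {
    const startedAt = Date.now()
    const check = () => {
      if (predicate()) return resolve()
      if (Date.now() - startedAt >= timeout) {
        return reject(new Error(`waitFor timed out after ${timeout}ms`))
      }
      setTimeout(check, interval)
    }
    check()
  })
```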
Some gotchas to keep in mind:

- Use a unique groupId on each run, so that multiple runs don't interfere with each other.
- Either pass fromBeginning: true or wait for your consumer to have subscribed and joined the group before producing (the instrumentation events emit an event on group join that you can wait for in the same way as we wait for the messages to be consumed).

After some time with Tommy Brunn's answer, I found a few bugs and I ended up with this:
import { Kafka, Consumer, KafkaMessage } from 'kafkajs'

export const waitForKafkaMessages = async (
  kafka: Kafka,
  messagesAmount: number,
  topic: string,
  fromBeginning: boolean,
  groupId: string,
): Promise<KafkaMessage[]> => {
  const consumer: Consumer = kafka.consumer({ groupId })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning })

  let resolveOnConsumption: (messages: KafkaMessage[]) => void
  let rejectOnError: (e: Error) => void

  const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
    resolveOnConsumption = resolve
    rejectOnError = reject
  }).finally(() => consumer.disconnect()) // disconnection is done here; the reason why is explained below

  const messages: KafkaMessage[] = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ message, partition, topic }) => {
      try {
        // eachMessage is called by eachBatch, which can consume more than messagesAmount.
        // This is why we manually commit only messagesAmount messages.
        if (messages.length < messagesAmount) {
          messages.push(message)
          // +1 because we need to commit the next assigned offset.
          await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
        }

        if (messages.length === messagesAmount) {
          // We should be able to disconnect here, but kafkajs has a bug that makes it hang
          // if consumer.disconnect is called too soon after consumer.run.
          // This is why we disconnect in the promise's finally block instead.
          resolveOnConsumption(messages)
        }
      } catch (e) {
        rejectOnError(e as Error)
      }
    },
  })

  return returnThisPromise
}
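As for the second gotcha from the answer above (waiting for the consumer to have joined the group before producing), you can listen for the consumer's GROUP_JOIN instrumentation event. Here's a sketch of a generic one-shot event helper; the `EventSource` shape is my own minimal interface mirroring KafkaJS's `consumer.on`, which returns a function that removes the listener:

```typescript
type Listener = (event: unknown) => void
// Minimal interface matching KafkaJS's consumer.on(eventName, listener),
// which returns a remove-listener function.
type EventSource = { on: (eventName: string, listener: Listener) => () => void }

// Resolve once the named event fires, then stop listening.
const onceEvent = (source: EventSource, eventName: string): Promise<unknown> =>
  new Promise(resolve => {
    const removeListener = source.on(eventName, event => {
      removeListener()
      resolve(event)
    })
  })

// Usage against a real consumer would be along these lines:
// const joined = onceEvent(consumer, consumer.events.GROUP_JOIN)
// await consumer.run({ eachMessage })
// await joined // safe to produce now
```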