I am following the Unified Log Processing book from Manning, and its first exercise is a simple Kafka consumer in Java. When run, the program just stalls at the point where consumer.poll() is called.
I am running this from inside the Vagrant environment provided by the author of the book, which is available via git clone https://github.com/alexanderdean/Unified-Log-Processing.git
It has zookeeper-3.4.6 and kafka_2.10-0.8.2.1
I have created a topic using the following command line:
./kafka-topics.sh --create --topic raw --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "raw".
kafka-console-producer and kafka-console-consumer are working as expected.
./kafka-console-producer.sh --topic raw --broker-list localhost:9092
[2016-10-17 14:09:05,899] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
one
two
three
four
five
./kafka-console-consumer.sh --topic raw --from-beginning --zookeeper localhost:2181
one
two
three
four
five
^CConsumed 5 messages
The Java code I am testing is pretty basic and just creates a consumer.
StreamApp.java
package nile;

public class StreamApp {

  public static void main(String[] args) {
    String servers   = args[0];
    String groupId   = args[1];
    String inTopic   = args[2];
    String goodTopic = args[3];

    Consumer consumer = new Consumer(servers, groupId, inTopic);
    consumer.run();
  }
}
Consumer.java
package nile;

import java.util.*;

import org.apache.kafka.clients.consumer.*;

public class Consumer {

  private final KafkaConsumer<String, String> consumer; // a
  private final String topic;

  public Consumer(String servers, String groupId, String topic) {
    this.consumer = new KafkaConsumer<String, String>(
      createConfig(servers, groupId));
    this.topic = topic;
    System.out.println("Topic to listen for:" + this.topic + ":");
  }

  public void run() {
    System.out.println("Starting to listen for items ");
    this.consumer.subscribe(Arrays.asList(this.topic)); // b
    try {
      while (true) {
        System.out.println("Subscribed to: " + consumer.subscription());
        System.out.println("Inside the loop");
        ConsumerRecords<String, String> records = consumer.poll(100); // c
        System.out.println("After consuming");
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Got an item from kafka: " + record.value());
        }
      }
    } finally {
      consumer.close();
    }
  }

  private static Properties createConfig(String servers, String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("group.id", groupId); // e
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer"); // a
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer"); // a
    return props;
  }
}
The libraries I am using are (from my build.gradle):
dependencies { // b
  compile 'org.apache.kafka:kafka-clients:0.9.0.0'
  compile 'com.maxmind.geoip:geoip-api:1.2.14'
  compile 'com.fasterxml.jackson.core:jackson-databind:2.6.3'
  compile 'org.slf4j:slf4j-api:1.7.5'
}
I am running the code as:
java -jar ./build/libs/nile-0.1.0.jar localhost:9092 ulp-ch03-3.3 raw enriched
Output is:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Topic to listen for:raw:
Starting to listen for items
Subscribed to: [raw]
Inside the loop
And that is where it stops. consumer.poll() is not returning anything and not timing out either. Not sure what is wrong here. I have been pulling my hair out for two days now; any help to get this working would be much appreciated. :)
It seems you are using the 0.9.x consumer API to consume messages from a 0.8.x broker, which does not work because 0.9.0.0 introduced protocol changes that older brokers do not understand; the new consumer depends on broker-side group-coordination APIs that 0.8.x lacks, which is why poll() blocks instead of failing fast. Either use the old consumer (namely the Scala consumer) or upgrade the Kafka broker to 0.9.x.
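If you want to stay on the book's kafka_2.10-0.8.2.1 broker, one option is to swap the kafka-clients:0.9.0.0 dependency for the matching 0.8.x artifact (e.g. compile 'org.apache.kafka:kafka_2.10:0.8.2.1') and use the old high-level consumer. Below is a minimal sketch of driving that consumer from Java; it is not the book's code: the class name OldConsumerSketch is made up for illustration, while the topic, group id and ZooKeeper address are taken from your question.

package nile;

import java.util.*;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class OldConsumerSketch {

  public static void main(String[] args) {
    // Old-consumer config: it connects through ZooKeeper, not the brokers.
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "ulp-ch03-3.3");
    props.put("auto.offset.reset", "smallest"); // 0.8.x spelling of "earliest"
    props.put("auto.commit.interval.ms", "1000");

    // Fully qualified to avoid clashing with nile.Consumer from the question.
    ConsumerConnector connector =
      kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask for a single stream on the "raw" topic and iterate over it forever.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("raw", 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
      connector.createMessageStreams(topicCountMap);
    ConsumerIterator<byte[], byte[]> it = streams.get("raw").get(0).iterator();

    while (it.hasNext()) {
      System.out.println("Got an item from kafka: " + new String(it.next().message()));
    }
  }
}

The other route is to leave your new-consumer code exactly as it is and upgrade the Kafka distribution inside the Vagrant environment to a 0.9.x release so the broker matches kafka-clients:0.9.0.0; with matching versions the poll() loop you posted should start returning records.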