I installed Kafka on DC/OS (Mesos) cluster on AWS. Enabled three brokers and created a topic called "topic1".
dcos kafka topic create topic1 --partitions 3 --replication 3
Then I wrote a Producer class to send messages and a Consumer class to receive them.
public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("setting Producerconfig.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProcuder");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }
}
public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }
    public static void main(String[] args) {
        getMessage();
    }
}
First I ran Producer on the cluster to send messages to topic1. However when I ran Consumer, it couldn't receive anything, just hang. 
Producer is working since I was able to get all the messages by running the shell script that came with Kafka install 
./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning
But why can't I receive with Consumer? This post suggests group.id with old offset might be a possible cause. I only create group.id in the consumer not the producer. How do I config the offset for this group? 
Consuming concurrency can increase performance. If you store offsets on the zookeeper, it can be bottleneck. Reduce commits of offset and use dedicated zookeeper if possible. The best solution is storing offsets on brokers.
Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.
The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space.
Kafka has a default limit of 1MB per message in the topic. This is because very large messages are considered inefficient and an anti-pattern in Apache Kafka.
As it turns out, kafkaConsumer.subscribe(Arrays.asList("topic1")); is causing poll() to hang. According to Kafka Consumer does not receive messages
, there are two ways to connect to a topic, assign and subscribe. After I replaced subscribe with the lines below, it started working.
    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
However the output shows arrays of numbers which is not expected (Producer sent Strings). But I guess this is a separate issue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With