
How to read binary data on Kafka topics in Spark

I need to read an encrypted message from a Kafka topic. My current code, which reads strings from the topic, looks like this:

JavaPairReceiverInputDStream<String, String> pairrdd = 
            KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

How should I change this code so that the encrypted data is read from the Kafka topic as an array of bytes and is not corrupted?

kevin asked Dec 08 '25 17:12


1 Answer

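To read data from Kafka as <byte[], byte[]> records, you can use KafkaUtils.createDirectStream from the spark-streaming-kafka-0-10 integration, with ByteArrayDeserializer for both the key and the value, like this: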

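import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
// ByteArrayDeserializer passes keys and values through as raw byte[], with no String decoding
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

// jssc is the existing JavaStreamingContext; record.value() on each element is the raw byte[] payload
JavaInputDStream<ConsumerRecord<byte[], byte[]>> pairrdd =
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<byte[], byte[]>Subscribe(topics, kafkaParams)
  );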
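
As a side note on why the byte[] deserializers matter for encrypted data: the sketch below uses only the JDK (no Spark or Kafka) and assumes the String-based stream decodes message bytes as UTF-8. The class name BinaryRoundTrip and the sample bytes are made up for illustration; they stand in for real ciphertext.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BinaryRoundTrip {

    // Decodes bytes as UTF-8 text and encodes them again, which is roughly
    // what happens when a binary payload is passed through a String.
    public static byte[] viaString(byte[] payload) {
        String asText = new String(payload, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    // Returns true only if the payload is unchanged after the String round trip.
    public static boolean survivesStringRoundTrip(byte[] payload) {
        return Arrays.equals(payload, viaString(payload));
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for ciphertext: contains bytes that are not valid UTF-8.
        byte[] ciphertext = {(byte) 0xC3, (byte) 0x28, (byte) 0xFF, 0x00, 0x41};
        byte[] plainAscii = "hello".getBytes(StandardCharsets.UTF_8);

        System.out.println("ASCII survives: " + survivesStringRoundTrip(plainAscii));      // true
        System.out.println("Ciphertext survives: " + survivesStringRoundTrip(ciphertext)); // false
    }
}
```

Invalid byte sequences are replaced with the Unicode replacement character during decoding, so the original bytes cannot be recovered afterwards. Keeping the payload as byte[] from the consumer through to decryption avoids that step entirely.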

I hope this helps!

himanshuIIITian answered Dec 12 '25 03:12