I am creating a DataFrame from a Kafka topic using Spark Streaming, and I want to write that DataFrame to a Kinesis producer. I understand there is no official API for this as of now, and there are several APIs available on the internet, but sadly none of them worked for me. Spark version: 2.2, Scala: 2.11.
I tried using https://github.com/awslabs/kinesis-kafka-connector and built the jar, but I am getting errors due to conflicting package names between this jar and the Spark API. Please help.
Here is the code, for anyone else who needs it:

spark-shell --jars spark-sql-kinesis_2.11-2.2.0.jar,spark-sql-kafka-0-10_2.11-2.1.0.jar,spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar --files kafka_client_jaas_spark.conf --properties-file gobblin_migration.conf --conf spark.port.maxRetries=100 --driver-java-options "-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf"
import java.io.File
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.sys.process._
import org.apache.log4j.{ Logger, Level, LogManager, PropertyConfigurator }
import org.apache.spark.sql.streaming.Trigger
val streamingInputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "bootstrap server")
  .option("subscribe", "<kafkatopic>")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .load()

val xdf = streamingInputDF.select(
  col("partition").cast("String").alias("partitionKey"),
  col("value").alias("data"))

xdf.writeStream
  .format("kinesis")
  .option("checkpointLocation", "<hdfspath>")
  .outputMode("Append")
  .option("streamName", "kinesisstreamname")
  .option("endpointUrl", "kinesisendpoint")
  .option("awsAccessKeyId", "accesskey")
  .option("awsSecretKey", "secretkey")
  .start()
  .awaitTermination()
For the jar spark-sql-kinesis_2.11-2.2.0.jar, go to Qubole, download the package for your Spark version, and build the jar.
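For reference, building that connector typically looks something like the following (a sketch assuming the repository is https://github.com/qubole/kinesis-sql and a standard Maven build; check the project README for the exact branch names and build command):

git clone https://github.com/qubole/kinesis-sql.git
cd kinesis-sql
# assumption: the project keeps one branch per Spark version
git checkout <branch-for-your-spark-version>
mvn install -DskipTests

The built jar should then be under the target/ directory.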
If you are behind a corporate network, set the proxy before launching Spark:

export http_proxy=http://server-ip:port/
export https_proxy=https://server-ip:port/
Kafka Connect is a service to which you can POST your connector specification (Kinesis, in this case), and it then takes care of running the connector. It also supports quite a few transformations while processing the records. Kafka Connect plugins are not intended to be used within Spark applications.
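For illustration, a sink specification for the awslabs kinesis-kafka-connector might look roughly like this when POSTed to the Kafka Connect REST API (the connector class and property names are assumptions based on that project's documentation; verify them against the README before use):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "kinesis-sink",
  "config": {
    "connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
    "tasks.max": "1",
    "topics": "<kafkatopic>",
    "region": "<aws-region>",
    "streamName": "kinesisstreamname"
  }
}'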
If your use case requires you to apply business logic while processing the records, then you could go with either the Spark Streaming or the Structured Streaming approach.
If you want to take a Spark-based approach, below are the two options I can think of.
Use Structured Streaming. You could use a Structured Streaming connector for Kinesis; the Qubole connector mentioned above is one such option. There may be others too, but that is the only stable, open-source connector I am aware of. You can find an example of using Kinesis as a sink here.
Use the Kinesis Producer Library (KPL) or the aws-java-sdk-kinesis library to publish records from your Spark Streaming application. Using KPL is the preferred approach here. You could do mapPartitions, create one Kinesis client per partition, and publish the records through it, as sketched below. There are plenty of examples in the AWS docs for these two libraries.
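As a rough illustration, here is a minimal sketch of the per-partition pattern using the plain aws-java-sdk-kinesis client (the stream name, region, and DStream shape are placeholders; a KPL version would swap the client for a KinesisProducer but follow the same structure):

import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.streaming.dstream.DStream

// Hypothetical DStream of (partitionKey, payload) pairs read from Kafka.
def publishToKinesis(records: DStream[(String, Array[Byte])]): Unit = {
  records.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // One client per partition: the client is not serializable,
      // so it must be created on the executor, not the driver.
      val client = AmazonKinesisClientBuilder.standard()
        .withRegion("us-east-1") // assumption: set to your region
        .build()
      partition.foreach { case (key, data) =>
        val request = new PutRecordRequest()
          .withStreamName("kinesisstreamname")
          .withPartitionKey(key)
          .withData(ByteBuffer.wrap(data))
        client.putRecord(request)
      }
      client.shutdown()
    }
  }
}

Creating the client inside foreachPartition (rather than once on the driver) avoids serialization errors and amortizes connection setup over all records in the partition.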