I have created a Beam pipeline that reads data from Kafka and pushes it to BigQuery using Apache Beam. For now I am using the Java direct runner, and I just need to push data to my BigQuery table.
This is my code:
package com.knoldus.section8;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.Lists;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;

public class KafkaStreaming {

    static GoogleCredentials authExplicit(String jsonPath) throws IOException {
        // You can specify a credential file by providing a path to GoogleCredentials.
        // Otherwise credentials are read from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
        return GoogleCredentials.fromStream(new FileInputStream(jsonPath))
                .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
    }

    public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.create();
        GcpOptions gcpOptions = options.as(GcpOptions.class);
        gcpOptions.setProject("excellent-guard-314111");
        gcpOptions.setGcpTempLocation("./");
        System.out.println(gcpOptions.getGcpCredential());
        gcpOptions.setGcpCredential(authExplicit("excellent-guard-314111-01f257a67f01.json"));

        Pipeline p = Pipeline.create(options);

        // BigQuery has no LONG or DOUBLE types: 64-bit integers are INTEGER (INT64)
        // and floating-point values are FLOAT (FLOAT64).
        ArrayList<TableFieldSchema> columns = new ArrayList<>();
        columns.add(new TableFieldSchema().setName("deviceID").setType("STRING"));
        columns.add(new TableFieldSchema().setName("name").setType("STRING"));
        columns.add(new TableFieldSchema().setName("description").setType("STRING"));
        columns.add(new TableFieldSchema().setName("eventtime").setType("INTEGER"));
        columns.add(new TableFieldSchema().setName("temperature").setType("FLOAT"));
        columns.add(new TableFieldSchema().setName("unit").setType("STRING"));
        TableSchema tblSchema = new TableSchema().setFields(columns);

        // Read IoT events from Kafka, drop the keys, and log each device ID as it arrives.
        PCollection<IotEvent> iotEventPCollection = p
                .apply(KafkaIO.<Long, IotEvent>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("test-new")
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(IotDes.class)
                        .withoutMetadata())
                .apply(Values.<IotEvent>create())
                .apply(ParDo.of(new DoFn<IotEvent, IotEvent>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        System.out.println(c.element().getDeviceID());
                        c.output(c.element());
                    }
                }));

        // Convert each IotEvent into a BigQuery TableRow matching the schema above.
        PCollection<TableRow> rowData = iotEventPCollection.apply(
                ParDo.of(new DoFn<IotEvent, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                        IotEvent event = c.element();
                        assert event != null;
                        TableRow row = new TableRow();
                        row.set("deviceID", event.getDeviceID());
                        row.set("name", event.getName());
                        row.set("description", event.getDescription());
                        row.set("eventtime", event.getEventtime());
                        row.set("temperature", event.getTemperature());
                        row.set("unit", event.getUnit());
                        System.out.println(row.toPrettyString());
                        c.output(row);
                    }
                }));

        WriteResult writeResult = rowData.apply(BigQueryIO.writeTableRows()
                .to("beam_poc.table_poc")
                .withSchema(tblSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        p.run().waitUntilFinish();
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Beam</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.29.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.29.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-google-cloud-dataflow-java -->
<!-- <dependency>-->
<!-- <groupId>org.apache.beam</groupId>-->
<!-- <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>-->
<!-- <version>2.29.0</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.29.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.29.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.1</version>
</dependency>
</dependencies>
</project>
The error that I am getting:
May 19, 2021 9:21:45 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: ~~~ Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~~~
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
A fix was released (https://issues.apache.org/jira/browse/BEAM-12356), and this error should no longer occur as long as Apache Beam version 2.31.0 or above is used.
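In practice that means bumping every Beam artifact in the pom.xml above from 2.29.0 to 2.31.0 or newer. A minimal sketch of the change, assuming a shared beam.version property (the property name is just a convention introduced here, not anything Maven or Beam requires):

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <!-- Single property so all Beam modules stay on the same release -->
    <beam.version>2.31.0</beam.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-kafka</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <!-- ... other dependencies unchanged ... -->
</dependencies>

Beam's SDK, runner, and IO artifacts are released together, so they should all be kept on the same version; the kafka-clients dependency is unrelated to this fix and can be left as-is.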