I need to read table data from BigQuery in a Dataflow pipeline, but instead of storing the data in the TableRow class I want to store it in Java POJO classes. Is there a way to map the data directly into a POJO?
Way 2:

```java
GenericRecord s = schemaAndRecord.getRecord();
org.apache.avro.Schema s1 = s.getSchema();
for (Field f : s1.getFields()) {
    counter++;
    mapping.put(f.name(), null == s.get(f.name()) ? null : String.valueOf(s.get(counter)));
    if (f.name().equalsIgnoreCase("reason_code_id")) {
        BigDecimal numericValue = new Conversions.DecimalConversion()
            .fromBytes((ByteBuffer) s.get(f.name()), Schema.create(s1.getType()), s1.getLogicalType());
        System.out.println("Numeric Con" + numericValue);
    } else {
        System.out.println("Else Condition " + f.name());
    }
}
```
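For reference, the "Can't create a: RECORD" error reported below appears to match the `Schema.create(s1.getType())` call above: `s1` is the record schema, and `Schema.create` cannot build named types such as RECORD. The following is only a minimal sketch of driving the decimal conversion from the field's own schema instead, assuming `reason_code_id` is a BigQuery NUMERIC column exported as Avro bytes with a decimal logical type (possibly wrapped in a nullable union); variable names follow the snippet above:

```java
// Sketch only: look up the field's schema rather than passing the record schema.
org.apache.avro.Schema fieldSchema = s1.getField("reason_code_id").schema();
// BigQuery exports nullable columns as a union of null and the value type; unwrap it.
if (fieldSchema.getType() == org.apache.avro.Schema.Type.UNION) {
    fieldSchema = fieldSchema.getTypes().stream()
        .filter(t -> t.getType() != org.apache.avro.Schema.Type.NULL)
        .findFirst()
        .orElse(fieldSchema);
}
Object raw = s.get("reason_code_id");
if (raw != null) {
    // Assumes the column is NUMERIC, i.e. Avro bytes carrying a decimal logical type.
    BigDecimal numericValue = new Conversions.DecimalConversion()
        .fromBytes((ByteBuffer) raw, fieldSchema, fieldSchema.getLogicalType());
    System.out.println("Numeric value: " + numericValue);
}
```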
Facing issue:
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
Stack trace:
java.io.IOException: Failed to start reading from source: gs://trusted-bucket/mgp/temp/BigQueryExtractTemp/3a5365f1e53d4dd393f0eda15a2c6bd4/000000000000.avro range [0, 65461)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:596)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:306)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: Can't create a: RECORD
at org.apache.avro.Schema.create(Schema.java:120)
at com.globalpay.WelcomeEmail.mapRecordToObject(WelcomeEmail.java:118)
at com.globalpay.WelcomeEmail.access$0(WelcomeEmail.java:112)
at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:54)
at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:221)
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:214)
at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:567)
at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:593)
... 14 more
BigQueryIO#read(SerializableFunction) lets you use any of the existing Avro-to-POJO conversion libraries or functions.
For example, I am using the code from this blog post:
    private static <T> T mapRecordToObject(GenericRecord record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields did not match");
        // Copy each Avro field into the POJO field of the same name, as a string.
        record.getSchema().getFields().forEach(d ->
            PropertyAccessorFactory.forDirectFieldAccess(object).setPropertyValue(
                d.name(), record.get(d.name()) == null ? null : record.get(d.name()).toString()));
        return object;
    }
    PCollection<MyType> data = pipeline.apply(
        BigQueryIO
            .read(new SerializableFunction<SchemaAndRecord, MyType>() {
                public MyType apply(SchemaAndRecord schemaAndRecord) {
                    return mapRecordToObject(schemaAndRecord.getRecord(), new MyType());
                }
            })
            .from("mydataset:mytable"));
The code from the blog post assumes that the Avro schema was used to generate the POJO.
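For illustration, here is a hypothetical MyType that would fit this pattern; the class name comes from the snippet above, but the field names and types are assumptions, not taken from the original post. The POJO's reflected Avro schema has to produce the same fields as the record's schema for the assertion in mapRecordToObject to pass.

```java
// Hypothetical POJO for illustration only; its fields must mirror the table's columns
// (names assumed here) so that ReflectData derives a schema matching the exported record.
public class MyType {
    private String id;              // assumed column
    private String reason_code_id;  // assumed column, populated as a string by mapRecordToObject

    public MyType() {}              // no-arg constructor; fields are set via direct field access
}
```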