Is it possible to create a pipeline that reads data from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used with options.setStreaming(true), and streaming mode is required in order to use PubsubIO as input. Is there a way around this, or is it simply not possible to read from Pub/Sub and write to Datastore?
Here's my code:
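import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.Duration;

// Inside main(); projectName, datasetid and CreateEntityFn are defined elsewhere.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true); // required for the unbounded PubsubIO source
Pipeline p = Pipeline.create(options);
PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));
PCollection<String> inputWindow = input.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(AfterPane.elementCountAtLeast(1))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardHours(1)));
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) {
        // Decode the Base64 payload with an explicit charset rather than the platform default
        byte[] decoded = Base64.decodeBase64(c.element().getBytes(StandardCharsets.UTF_8));
        c.output(new String(decoded, StandardCharsets.UTF_8));
    }
}));
PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));
inputEntity.apply(DatastoreIO.writeTo(datasetid)); // throws UnsupportedOperationException in streaming mode
p.run();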
And this is the exception I get:
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
The DatastoreIO sink is not currently supported in the streaming runner. To write to Datastore from a streaming pipeline, you can make direct calls to the Datastore API from a DoFn.
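One way to structure those direct Datastore calls is to buffer entities inside the DoFn and write them in batches. The sketch below is a minimal illustration under assumptions, not a drop-in implementation. In the Dataflow SDK 1.x a DoFn has startBundle, processElement and finishBundle hooks, and the class below simply mirrors them so the batching logic can run on its own. DatastoreClient is a hypothetical interface standing in for the real Datastore API client; a real client would typically send a non-transactional commit containing upsert mutations. The maxBatchSize parameter is also an assumption: you would set it to stay within Datastore's per-commit limits.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: buffers entities per bundle and upserts them in batches.
// DatastoreClient is a HYPOTHETICAL stand-in for the real Datastore API client.
class BatchingDatastoreWriter<E> {

    /** Hypothetical client; a real one would send a non-transactional commit of upserts. */
    interface DatastoreClient<T> {
        void upsert(List<T> entities);
    }

    private final DatastoreClient<E> client;
    private final int maxBatchSize;
    private final List<E> buffer = new ArrayList<>();

    BatchingDatastoreWriter(DatastoreClient<E> client, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.client = client;
        this.maxBatchSize = maxBatchSize;
    }

    // Counterpart of DoFn.startBundle: each bundle starts with an empty buffer.
    void startBundle() {
        buffer.clear();
    }

    // Counterpart of DoFn.processElement: buffer, and flush once a batch is full.
    void processElement(E entity) {
        buffer.add(entity);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Counterpart of DoFn.finishBundle: write the remainder before the bundle completes.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Pass a copy so the client never sees the buffer being cleared afterwards.
        client.upsert(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Inside a real DoFn<DatastoreV1.Entity, Void>, you would create the client once per bundle and forward the three DoFn hooks to these methods. Upserts are used because a streaming runner may retry a bundle and write the same entities more than once. That stays idempotent only if CreateEntityFn produces complete, deterministic keys rather than relying on auto-allocated IDs.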