 

For Google Cloud Dataflow, is it possible to start another pipeline from a pipeline?

I am trying to set up a Google Cloud Dataflow pipeline (streaming mode) that reads messages from a Pub/Sub topic, extracts information (the name of an object in Google Cloud Storage) from each published message, and then starts another pipeline (batch mode) to process the object stored in Google Cloud Storage.

Is it possible to start another pipeline within a pipeline?

火星一號 asked Sep 11 '25 05:09

2 Answers

There is no technical reason barring this. You would need to be sure to keep your Pipeline objects separate and to have sufficient Compute Engine quota to launch all the jobs you need.

Sam McVeety answered Sep 14 '25 00:09

We got it to work by doing this:

private static class ExecuteUpdateTaskForNamespace extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        String namespace = c.element();
        LOG.info("Processing namespace: " + namespace);

        BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);

        EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
        entityOptions.setNamespace(namespace);
        entityOptions.setProject(options.getProject());
        entityOptions.setRunner(DataflowPipelineRunner.class);
        entityOptions.setStagingLocation(options.getStagingLocation());
        entityOptions.setKind("DocsAsset");
        try {
            Pipeline p = Pipeline.create(entityOptions);
            p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
                    .apply("Find Old Site Entities", ParDo.of(new FindEntities()))
                    .apply("Transform Entities", ParDo.of(new TransformEntities()))
                    .apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
            p.run();

            LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
            c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);

        } catch (Exception e) {
            LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
        }

    }
}

Issues: You can't spawn more than 25 jobs at a time without hitting quota. To bypass this you can change setRunner(DataflowPipelineRunner.class) to setRunner(BlockingDataflowPipelineRunner.class), which makes each submission block until the job finishes. BUT BlockingDataflowPipelineRunner was removed in 2.0.0.
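(Not from the original answer, but for anyone on Beam 2.x: since BlockingDataflowPipelineRunner was removed, the usual replacement is to submit with DataflowRunner and block on the returned PipelineResult. A hedged sketch of that migration, reusing the same entityOptions from the code above:)

```java
// Beam 2.x sketch (assumed migration path, not shown in the answer):
// the blocking behavior moved from the runner class to PipelineResult.
entityOptions.setRunner(DataflowRunner.class); // DataflowPipelineRunner was renamed in 2.x

Pipeline p = Pipeline.create(entityOptions);
// ... apply the same Read / Find / Transform / Save steps as above ...

// run() submits the job; waitUntilFinish() blocks until it completes,
// which throttles submissions the same way BlockingDataflowPipelineRunner did.
PipelineResult result = p.run();
result.waitUntilFinish();
```

This keeps at most one child job in flight per worker thread, which is the same quota-avoidance effect the blocking runner provided.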

EntityOptions and BasicOptions are extensions of PipelineOptions.
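(The answer doesn't show those interfaces. A minimal sketch of what they might look like; the getNamespace/getKind accessors are implied by the setters used above, and extending DataflowPipelineOptions, which supplies getProject and getStagingLocation, is an assumption on my part:)

```java
// Hypothetical definitions, inferred from how the options are used above.
// DataflowPipelineOptions already provides getProject()/getStagingLocation().
public interface BasicOptions extends DataflowPipelineOptions {
}

public interface EntityOptions extends BasicOptions {
    String getNamespace();
    void setNamespace(String namespace);

    String getKind();
    void setKind(String kind);
}
```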

Travis Wilson answered Sep 14 '25 00:09