I just need to run a dataflow pipeline on a daily basis, but it seems to me that suggested solutions like App Engine Cron Service, which requires building a whole web app, seems a bit too much. I was thinking about just running the pipeline from a cron job in a Compute Engine Linux VM, but maybe that's far too simple :). What's the problem with doing it that way, why isn't anybody (besides me I guess) suggesting it?
Go to the Dataflow Pipelines page in the Google Cloud console, then select +Create data pipeline. On the Create pipeline from template page, provide a pipeline name, and fill in the other template selection and parameter fields. For a batch job, in the Schedule your pipeline section, provide a recurrence schedule.
This is how I did it using Cloud Functions, PubSub, and Cloud Scheduler (this assumes you've already created a Dataflow template and it exists in your GCS bucket somewhere)
Create a new topic in PubSub. this will be used to trigger the Cloud Function
Create a Cloud Function that launches a Dataflow job from a template. I find it easiest to just create this from the CF Console. Make sure the service account you choose has permission to create a dataflow job. the function's index.js looks something like:
const google = require('googleapis');
exports.triggerTemplate = (event, context) => {
  // in this case the PubSub message payload and attributes are not used
  // but can be used to pass parameters needed by the Dataflow template
  const pubsubMessage = event.data;
  console.log(Buffer.from(pubsubMessage, 'base64').toString());
  console.log(event.attributes);
  google.google.auth.getApplicationDefault(function (err, authClient, projectId) {
  if (err) {
    console.error('Error occurred: ' + err.toString());
    throw new Error(err);
  }
  const dataflow = google.google.dataflow({ version: 'v1b3', auth: authClient });
  dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {},
          jobName: 'SOME-DATAFLOW-JOB-NAME',
          gcsPath: 'gs://PATH-TO-YOUR-TEMPLATE'
        }
      }, function(err, response) {
        if (err) {
          console.error("Problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
      });
  });
};
The package.json looks like
{
  "name": "pubsub-trigger-template",
  "version": "0.0.1",
  "dependencies": {
    "googleapis": "37.1.0",
    "@google-cloud/pubsub": "^0.18.0"
  }
}
Go to PubSub and the topic you created, manually publish a message. this should trigger the Cloud Function and start a Dataflow job
Use Cloud Scheduler to publish a PubSub message on schedule https://cloud.google.com/scheduler/docs/tut-pub-sub
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With