When I go to use operators/hooks like the BigQueryHook I see a message that these operators are deprecated and to use the airflow.gcp... operator version. However when i try and use it in my dag it fails and says no module named airflow.gcp. I have the most up to date airflow composer version w/ beta features, python3. Is it possible to install these operators somehow?
I am trying to run a Dataflow Job in python 3 using beam 2.15. I have tried virtualenv operator, but that doesn't work because it only allows python2.7. How can I do this?
The newest Airflow version available in Composer is either 1.10.2 or 1.10.3 (depending on the region). By then, those operators were in the contrib section.
Focusing on how to run Python 3 Dataflow jobs with Composer you'd need for a new version to be released. However, if you need an immediate solution you can try to back-port the fix.
In this case I defined a DataFlow3Hook which extends the normal DataFlowHook but that it does not hard-code python2 in the start_python_dataflow method:
class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        ...
        py_interpreter: str = "python3"
    ):
        ...
        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)
Then we'll have our custom DataFlowPython3Operator calling the new hook:
class DataFlowPython3Operator(DataFlowPythonOperator):
    def execute(self, context):
        ...
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
        ...
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")
Finally, in our DAG we just use the new operator:
task = DataFlowPython3Operator(
    py_file='/home/airflow/gcs/data/main.py',
    task_id=JOB_NAME,
    dag=dag)
See full code here. Job runs with Python 3.6:

Environment details and dependencies used (Beam job was a minimal example):
softwareConfig:
  imageVersion: composer-1.8.0-airflow-1.10.3
  pypiPackages:
    apache-beam: ==2.15.0
    google-api-core: ==1.14.3
    google-apitools: ==0.5.28
    google-cloud-core: ==1.0.3
  pythonVersion: '3'
Let me know if that works for you. If so, I'd recommend moving the code to a plugin for code readability and to reuse it across DAGs.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With