
How to use BigQueryOperator with execution_date?

Tags:

airflow

This is my code:

EXEC_TIMESTAMP  = "{{  execution_date.strftime('%Y-%m-%d %H:%M')  }}"
query = """
        select ... where date_purchased between TIMESTAMP_TRUNC(cast ( {{ params.run_timestamp }} as TIMESTAMP), HOUR, 'UTC') ...
        """
generate_op = BigQueryOperator(
                    bql=query,
                    destination_dataset_table=table_name,
                    task_id='generate',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    query_params={'run_timestamp': EXEC_TIMESTAMP},
                    dag=dag)

This should work but it doesn't. The render tab shows me:

between TIMESTAMP_TRUNC(cast (  as TIMESTAMP), HOUR, 'UTC')

The date is missing. It's being rendered into nothing.

How can I fix this? There is no provide_context=True for this operator. I don't know what to do.

Luis asked Oct 26 '25 09:10



2 Answers

Luis, query_params is not the params you can refer to in the templating context; its contents are never added to it. Since params is empty, {{ params.run_timestamp }} renders as an empty string, which is exactly what the render tab shows you. Changing it to params={'run_timestamp':…} would still not work, because params values are not themselves templated. When the templated field bql includes {{ params.run_timestamp }}, you get exactly the string stored in params: {'run_timestamp': …str… }, filled in WITHOUT any recursive expansion of that value. The rendered query would then contain the literal text {{ execution_date.strftime('%Y-%m-%d %H:%M') }}.
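To make the three cases above concrete, here is a minimal sketch that uses plain Jinja2 outside of Airflow. It assumes Airflow renders templated fields with a single Jinja pass and the default undefined handling; the fixed execution_date value and the variable names are made up for illustration and are not part of the operator.

```python
from datetime import datetime

from jinja2 import Template

# Hypothetical stand-in for the execution_date Airflow puts in the context.
execution_date = datetime(2018, 12, 27, 11, 15)

# Case 1: params is empty (query_params never reaches the template context),
# so the undefined lookup renders as an empty string.
rendered_missing = Template(
    "cast ( {{ params.run_timestamp }} as TIMESTAMP)"
).render(params={}, execution_date=execution_date)

# Case 2: params holds a template string. Jinja substitutes the value once
# and does not render it again, so the inner {{ ... }} survives as text.
inner = "{{ execution_date.strftime('%Y-%m-%d %H:%M') }}"
rendered_once = Template(
    "cast('{{ params.run_timestamp }}' as TIMESTAMP)"
).render(params={"run_timestamp": inner}, execution_date=execution_date)

# Case 3: the expression sits directly in the templated string,
# so it is evaluated during the single rendering pass.
rendered_direct = Template(
    "cast('{{ execution_date.strftime('%Y-%m-%d %H:%M') }}' as TIMESTAMP)"
).render(execution_date=execution_date)

print(rendered_missing)  # cast (  as TIMESTAMP)
print(rendered_once)     # cast('{{ execution_date.strftime('%Y-%m-%d %H:%M') }}' as TIMESTAMP)
print(rendered_direct)   # cast('2018-12-27 11:15' as TIMESTAMP)
```

Only the third form puts an actual timestamp into the query, which is why the expression belongs in the templated field itself rather than in params or query_params.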

Let me try rewriting this for you, passing the query through the templated sql field and wrapping the rendered date in quotes inside cast:

generate_op = BigQueryOperator(
                    sql="""
select ...
where date_purchased between
  TIMESTAMP_TRUNC(cast('{{ execution_date.strftime('%Y-%m-%d %H:%M') }}' as TIMESTAMP), HOUR, 'UTC')
...
                    """,
                    destination_dataset_table=table_name,
                    task_id='generate',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    dag=dag,
)

Both the bql and sql fields are templated. However, bql is deprecated and was removed in later versions, so prefer sql.

dlamblin answered Oct 29 '25 07:10



The issue is that you are using query_params, which is not a templated field, as @dlamblin mentioned.

Use the following code, which puts execution_date directly inside bql:

import airflow
from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
import os


CONNECTION_ID = Variable.get("Your_Connection")

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 12, 27, 11, 15),
    'retries': 4,
    'retry_delay': timedelta(minutes=10)
}


dag = DAG(
    dag_id='My_Test_DAG',
    default_args=args,
    schedule_interval='15 * * * *',
    max_active_runs=1,
    catchup=False,
)

# Quote the rendered date so that cast receives a string literal.
query = """select customers_email_address as email,
   from mytable
   where date_purchased = TIMESTAMP_SUB(TIMESTAMP_TRUNC(cast ('{{ execution_date.strftime('%Y-%m-%d %H:%M') }}' as TIMESTAMP), HOUR, 'UTC'), INTERVAL 1 HOUR) """

create_orders_temp_table_op = BigQueryOperator(
                    bql = query,
                    destination_dataset_table='some table',
                    task_id='create_orders_temp_table',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    dag=dag)

start_task_op = DummyOperator(task_id='start_task', dag=dag)


start_task_op  >> create_orders_temp_table_op
kaxil answered Oct 29 '25 07:10



