This is my code:
EXEC_TIMESTAMP = "{{ execution_date.strftime('%Y-%m-%d %H:%M') }}"
query = """
select ... where date_purchased between TIMESTAMP_TRUNC(cast ( {{ params.run_timestamp }} as TIMESTAMP), HOUR, 'UTC') ...
"""
generate_op = BigQueryOperator(
    bql=query,
    destination_dataset_table=table_name,
    task_id='generate',
    bigquery_conn_id=CONNECTION_ID,
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    query_params={'run_timestamp': EXEC_TIMESTAMP},
    dag=dag)
This should work but it doesn't. The render tab shows me:
between TIMESTAMP_TRUNC(cast ( as TIMESTAMP), HOUR, 'UTC')
The date is missing. It's being rendered into nothing.
How can I fix this? There is no provide_context=True for this operator. I don't know what to do.
Luis, query_params are not the params you can refer to in the templating context; they are never added to it. Since params is empty, your {{ params.run_timestamp }} renders as either "" or None. Even if you changed it to params={'run_timestamp': …}, there would still be a problem, because params values are not themselves templated: when the templated bql field expands {{ params.run_timestamp }}, you get exactly the string stored in params, with no recursive expansion of that value. You would literally get {{ execution_date.strftime('%Y-%m-%d %H:%M') }} in your SQL.
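You can see this single-pass behavior outside Airflow with a minimal plain-Jinja2 sketch (assuming the jinja2 package is available; the field names mirror your DAG but the template here is simplified):

```python
from jinja2 import Template

# The value you tried to smuggle in via params:
EXEC_TIMESTAMP = "{{ execution_date.strftime('%Y-%m-%d %H:%M') }}"

# Airflow renders the templated bql/sql field roughly like this -- ONCE:
rendered = Template(
    "where date_purchased >= '{{ params.run_timestamp }}'"
).render(params={'run_timestamp': EXEC_TIMESTAMP})

# The substituted value is NOT expanded a second time; the inner
# template survives as a literal string in the output.
print(rendered)
# where date_purchased >= '{{ execution_date.strftime('%Y-%m-%d %H:%M') }}'
```

That is why the strftime call has to live directly inside the templated SQL string, not inside a value passed through params or query_params.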
Let me try rewriting this for you (though I may have the parentheses around the cast slightly wrong, not sure):
generate_op = BigQueryOperator(
    sql="""
        select ...
        where date_purchased between
            TIMESTAMP_TRUNC(CAST('{{ execution_date.strftime('%Y-%m-%d %H:%M') }}' AS TIMESTAMP), HOUR, 'UTC')
        ...
    """,
    destination_dataset_table=table_name,
    task_id='generate',
    bigquery_conn_id=CONNECTION_ID,
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    dag=dag,
)
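For reference, the inline strftime call renders to a plain string before BigQuery ever sees the query. A quick sketch of the rendered fragment, using a hypothetical sample execution_date:

```python
from datetime import datetime

# Hypothetical sample; in Airflow the scheduler supplies execution_date.
execution_date = datetime(2018, 12, 27, 11, 15)

# What Jinja substitutes into the SQL:
literal = execution_date.strftime('%Y-%m-%d %H:%M')
fragment = "TIMESTAMP_TRUNC(CAST('%s' AS TIMESTAMP), HOUR, 'UTC')" % literal
print(fragment)
# TIMESTAMP_TRUNC(CAST('2018-12-27 11:15' AS TIMESTAMP), HOUR, 'UTC')
```

Note the single quotes around the substituted value: without them, BigQuery would see a bare 2018-12-27 11:15 and reject the query.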
You can see that the bql and sql fields are templated. However, the bql field is deprecated and has been removed in later versions.
The issue is that you are using query_params, which is not a templated field, as @dlamblin mentioned.
Use the following code, which references execution_date directly inside bql:
from datetime import datetime, timedelta

from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
CONNECTION_ID = Variable.get("Your_Connection")
args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 12, 27, 11, 15),
    'retries': 4,
    'retry_delay': timedelta(minutes=10)
}
dag = DAG(
    dag_id='My_Test_DAG',
    default_args=args,
    schedule_interval='15 * * * *',
    max_active_runs=1,
    catchup=False,
)
query = """select customers_email_address as email
from mytable
where date_purchased = TIMESTAMP_SUB(TIMESTAMP_TRUNC(CAST('{{ execution_date.strftime('%Y-%m-%d %H:%M') }}' AS TIMESTAMP), HOUR, 'UTC'), INTERVAL 1 HOUR)"""
create_orders_temp_table_op = BigQueryOperator(
    bql=query,
    destination_dataset_table='some table',
    task_id='create_orders_temp_table',
    bigquery_conn_id=CONNECTION_ID,
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    dag=dag)
start_task_op = DummyOperator(task_id='start_task', dag=dag)
start_task_op >> create_orders_temp_table_op
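As a sanity check on the WHERE clause above, the truncate-then-subtract logic can be mirrored in plain Python (hypothetical sample execution_date):

```python
from datetime import datetime, timedelta

# Hypothetical sample; Airflow supplies the real execution_date per run.
execution_date = datetime(2018, 12, 27, 11, 15)

# TIMESTAMP_TRUNC(..., HOUR, 'UTC') keeps the hour, drops minutes/seconds;
# TIMESTAMP_SUB(..., INTERVAL 1 HOUR) then steps back one full hour.
truncated = execution_date.replace(minute=0, second=0, microsecond=0)
window = truncated - timedelta(hours=1)
print(window)
# 2018-12-27 10:00:00
```

So a run whose execution_date is 11:15 selects rows stamped exactly at the 10:00 hour boundary, i.e. the previous full hour.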