I'm trying to use Airflow macros in my PythonOperator, but I keep getting "airflow: error: unrecognized arguments:".
I'm importing a function that takes 3 positional arguments (argv, start_date, end_date), where argv receives sys.argv, and I want both start_date and end_date to be the execution date in Airflow.
The function signature looks something like this:
def main(argv,start_date,end_date):
Here is the task I have in the DAG:
t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
    dag=dag)
Apache Airflow is an open-source workflow management platform for building data pipelines. It lets users define, schedule, and run pipelines in Python, using operators such as the PythonOperator.
airflow.macros.ds_format(ds, input_format, output_format) takes an input string and outputs another string as specified in the output format. Parameters: ds (str): input string which contains a date; input_format (str): input string format.
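A minimal sketch of what ds_format does, assuming it is a plain strptime/strftime round trip (which matches the description above). This is an illustration, not Airflow's own source.

```python
from datetime import datetime


def ds_format(ds, input_format, output_format):
    # Parse the date string with input_format, then re-emit it with output_format.
    return datetime.strptime(str(ds), input_format).strftime(output_format)


print(ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y"))  # 01-01-15
```

Inside a template the macro is reached through the macros namespace, for example '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y%m%d") }}'.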
Since you're passing in dates that need to be rendered by Airflow, you'll want to use the templates_dict parameter of the PythonOperator. In the Airflow version this answer targets, that is the only PythonOperator field Airflow renders as a template.
You can create a custom Python operator that recognizes more fields as templates by copying the existing operator and adding the relevant fields to its template_fields tuple.
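To illustrate why template_fields matters, here is a toy, stdlib-only model: only the attributes named in template_fields get their "{{ name }}" placeholders substituted, and a subclass widens that tuple. The class names ToyOperator and ToyTemplatedArgsOperator and the regex-based renderer are hypothetical stand-ins; real Airflow renders with Jinja, not this regex.

```python
import re

# Matches "{{ name }}" placeholders (a toy substitute for Jinja rendering).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def _render(value, context):
    # Recursively replace placeholders in strings, lists, and dicts.
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, list):
        return [_render(v, context) for v in value]
    if isinstance(value, dict):
        return {k: _render(v, context) for k, v in value.items()}
    return value


class ToyOperator:
    # Only attributes listed in template_fields are rendered.
    template_fields = ("templates_dict",)

    def __init__(self, op_args=None, templates_dict=None):
        self.op_args = op_args or []
        self.templates_dict = templates_dict or {}

    def render_template_fields(self, context):
        for name in self.template_fields:
            setattr(self, name, _render(getattr(self, name), context))


class ToyTemplatedArgsOperator(ToyOperator):
    # Subclass that extends template_fields so op_args is rendered as well.
    template_fields = ToyOperator.template_fields + ("op_args",)


ctx = {"ds": "2019-01-02"}
plain = ToyOperator(op_args=["{{ ds }}"], templates_dict={"end_date": "{{ ds }}"})
plain.render_template_fields(ctx)
print(plain.op_args)         # ['{{ ds }}']
print(plain.templates_dict)  # {'end_date': '2019-01-02'}

custom = ToyTemplatedArgsOperator(op_args=["{{ ds }}"])
custom.render_template_fields(ctx)
print(custom.op_args)        # ['2019-01-02']
```

In real Airflow the analogous move is a PythonOperator subclass that redefines the template_fields class attribute to include 'op_args'. Newer Airflow releases already list op_args and op_kwargs there.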
import sys

from airflow.operators.python_operator import PythonOperator

def main(**kwargs):
    # With provide_context=True, Airflow passes the rendered templates_dict as a keyword argument
    templates = kwargs.get('templates_dict')
    argv = templates.get('argv')
    start_date = templates.get('start_date')
    end_date = templates.get('end_date')

t1 = PythonOperator(task_id='Pull_DCM_Report',
                    provide_context=True,
                    python_callable=main,
                    templates_dict={'argv': sys.argv,
                                    'start_date': '{{ yesterday_ds }}',
                                    'end_date': '{{ ds }}'},
                    dag=dag)
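To see what main receives, here is a stdlib-only sketch that calls it roughly the way Airflow 1.x does with provide_context=True. The rendered values below are hypothetical stand-ins for real template output, and the return statement is added only so the result can be inspected.

```python
def main(**kwargs):
    # Airflow 1.x with provide_context=True passes the rendered templates_dict
    # (plus the template context) as keyword arguments.
    templates = kwargs.get("templates_dict") or {}
    argv = templates.get("argv")
    start_date = templates.get("start_date")
    end_date = templates.get("end_date")
    # Hypothetical: hand these values to the real report logic here.
    return argv, start_date, end_date


# Hypothetical rendered values for an execution date of 2019-01-02.
rendered = {"argv": [], "start_date": "2019-01-01", "end_date": "2019-01-02"}
print(main(templates_dict=rendered, ds="2019-01-02"))
# ([], '2019-01-01', '2019-01-02')
```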
You can "wrap" the call to the main function with the following:
t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=lambda **context: main([], context["ds"], context["ds"]),
    dag=dag)
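Note that the lambda passes an empty list rather than sys.argv. A plausible reason that matters, assuming main parses flags from argv with argparse (the question does not say): inside an Airflow worker, sys.argv holds the Airflow CLI's own arguments, and argparse's default program name is taken from sys.argv[0], which would produce exactly the "airflow: error: unrecognized arguments:" message. The sketch below reproduces this with a hypothetical main and a hypothetical worker argv.

```python
import argparse
import contextlib
import io


def main(argv, start_date, end_date):
    # Hypothetical: main reads optional flags from argv, skipping argv[0].
    # Under the Airflow CLI the default prog would typically already be "airflow".
    parser = argparse.ArgumentParser(prog="airflow")
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args(argv[1:])
    return args.verbose, start_date, end_date


# Hypothetical sys.argv of an Airflow worker process running the task.
worker_argv = ["/usr/local/bin/airflow", "run", "my_dag", "Pull_DCM_Report", "2019-01-02"]

stderr = io.StringIO()
try:
    with contextlib.redirect_stderr(stderr):
        main(worker_argv, "2019-01-02", "2019-01-02")
except SystemExit:
    # argparse writes the error to stderr and exits with status 2.
    print(stderr.getvalue().splitlines()[-1])

# With an empty list there is nothing for argparse to reject.
print(main([], "2019-01-02", "2019-01-02"))
```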
If lambdas aren't your cup of tea, you could define a named function that calls main, and pass that function as python_callable.
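A minimal sketch of that named-function variant. The name pull_dcm_report is hypothetical, and main here is a stand-in for the imported report function so the example runs on its own.

```python
def main(argv, start_date, end_date):
    # Hypothetical stand-in for the imported report function.
    return f"argv={argv} start={start_date} end={end_date}"


def pull_dcm_report(**context):
    # With provide_context=True, Airflow passes the template context as keyword
    # arguments; "ds" is the execution date formatted as YYYY-MM-DD.
    return main([], context["ds"], context["ds"])


print(pull_dcm_report(ds="2019-01-02"))  # argv=[] start=2019-01-02 end=2019-01-02
```

In the DAG you would then pass python_callable=pull_dcm_report to the PythonOperator in place of the lambda.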