
How to pass execution_date as parameter in SparkKubernetesOperator operator?

I am trying to find a way to pass the execution_date to the SparkKubernetesOperator. Is there any way to pass it, since I will be using the execution date for the Spark run and the S3 partitions?

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",
        application_file="/k8s/compaction_s3.yml",
        namespace=kubernetes_namespace,
        kubernetes_conn_id="kubernetes",
        params={
            "warehouse_path": s3_path,
            "snapshot_expire_time": execution_date,
            "partition_filter": execution_date,
            "k8s_namespace": kubernetes_namespace,
            "docker_image_tag": docker_image_tag,
        }
)
Kumar Padhy asked Dec 13 '25

1 Answer

Unfortunately, params only exposes custom values to Jinja; it does not render Jinja templates inside them.

For example, let's look at this PythonOperator.

op = PythonOperator(
    task_id="my_operator",
    python_callable=lambda **context: print(context['params']),
    params={
        "date": "{{ execution_date }}"
    },
    dag=dag
)

The value for the date key is the literal string "{{ execution_date }}" and not the rendered value.

[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}
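
By contrast, op_args and op_kwargs are templated fields of PythonOperator, so the same Jinja string does get rendered there. A minimal sketch for comparison (the task id and callable are just for illustration):

op_rendered = PythonOperator(
    task_id="my_operator_rendered",
    # op_kwargs is listed in PythonOperator.template_fields, so the jinja
    # string below is rendered before the callable runs
    python_callable=lambda date, **_: print(date),
    op_kwargs={
        "date": "{{ execution_date }}"
    },
    dag=dag
)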

As the Airflow documentation puts it, the params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates; the my_param example in the documentation shows how such a value makes it through to the template.

You can read more about Jinja templating with params in the Airflow Documentation.


It is still possible to use execution_date in other ways. SparkKubernetesOperator leverages Jinja templating with these settings:

template_fields = ['application_file', 'namespace']  
template_ext = ('yaml', 'yml', 'json')

SparkKubernetesOperator has two templated fields, application_file and namespace, meaning you can use Jinja templates as their values. If application_file references a file with one of those extensions, Airflow will read the file and render the Jinja templates inside it.

Let's modify the operator you provided.

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",
        application_file="/k8s/compaction_s3.yml",
        namespace=kubernetes_namespace,
        kubernetes_conn_id="kubernetes",
        params={
            "k8s_namespace": kubernetes_namespace,
            "warehouse_path": s3_path,
        }
)

I'm going to guess what /k8s/compaction_s3.yml looks like and add some jinja templates.

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  namespace: "{{ params.k8s_namespace }}"
  labels:
    warehouse_path: "{{ params.warehouse_path }}"
    date: "{{ ds }}"
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
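
Since the whole application_file is rendered, you can also feed the date straight into the Spark job itself, which covers the partition filter and snapshot expiry from the question. A sketch, assuming your compaction job reads these values from command-line arguments (the flag names below are hypothetical):

  # under spec: in compaction_s3.yml -- adjust the flags to whatever your job expects
  arguments:
    - "--partition-filter"
    - "{{ ds }}"
    - "--snapshot-expire-time"
    - "{{ ts }}"
    - "--warehouse-path"
    - "{{ params.warehouse_path }}"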

You can check the Rendered Template view for the task instance in your DAG to confirm the result.
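
You can also render the templates from the CLI without triggering a run (Airflow 2.x syntax; in 1.10 the equivalent command is airflow render). The dag id and date below are placeholders:

airflow tasks render <your_dag_id> submit_compaction_to_spark 2021-03-05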

Please also reference the example DAG and sample application_file in the Airflow documentation.

Alan Ma answered Dec 16 '25