I am trying to find a way to pass the execution_date to the SparkKubernetesOperator. Is there any way to pass it, since I will be using the execution date for the Spark run and the S3 partitions?
submit_compaction_to_spark = SparkKubernetesOperator(
    task_id="submit_compaction_to_spark",
    application_file="/k8s/compaction_s3.yml",
    namespace=kubernetes_namespace,
    kubernetes_conn_id="kubernetes",
    params={
        "warehouse_path": s3_path,
        "snapshot_expire_time": execution_date,
        "partition_filter": execution_date,
        "k8s_namespace": kubernetes_namespace,
        "docker_image_tag": docker_image_tag,
    },
)
Unfortunately, params only exposes custom values to Jinja; it does not render Jinja templates inside those values.
For example, let's look at this PythonOperator.
op = PythonOperator(
    task_id="my_operator",
    python_callable=lambda **context: print(context['params']),
    params={
        "date": "{{ execution_date }}"
    },
    dag=dag,
)
The value for the date key is the literal string "{{ execution_date }}" and not the rendered value.
[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}
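By contrast, values assigned to an operator's template_fields do get rendered. For PythonOperator, op_kwargs is typically one of those fields, so a minimal sketch like the one below (the task id and keyword name are only illustrative) would print the rendered date instead of the raw template string.

op_rendered = PythonOperator(
    task_id="print_rendered_date",
    # op_kwargs is a templated field, so the macro below is rendered before the callable runs
    python_callable=lambda my_date, **_: print(my_date),
    op_kwargs={"my_date": "{{ execution_date }}"},
    dag=dag,
)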
The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Please take the time to understand how the parameter my_param makes it through to the template.
You can read more about Jinja templating with params in the Airflow Documentation.
It is possible to use execution_date in other ways. SparkKubernetesOperator leverages Jinja templating with these settings:
template_fields = ['application_file', 'namespace']
template_ext = ('yaml', 'yml', 'json')
SparkKubernetesOperator has two templated fields, application_file and namespace, meaning you can use Jinja templates as their values. If application_file references a file with one of those extensions, the file itself is read and the Jinja templates inside it are rendered.
Let's modify the operator you provided.
submit_compaction_to_spark = SparkKubernetesOperator(
    task_id="submit_compaction_to_spark",
    application_file="/k8s/compaction_s3.yml",
    namespace=kubernetes_namespace,
    kubernetes_conn_id="kubernetes",
    params={
        "k8s_namespace": kubernetes_namespace,
        "warehouse_path": s3_path,
    },
)
I'm going to guess what /k8s/compaction_s3.yml looks like and add some jinja templates.
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  namespace: "{{ params.k8s_namespace }}"
  labels:
    warehouse_path: "{{ params.warehouse_path }}"
    date: "{{ ds }}"
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
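If the Spark job itself needs the execution date, for example to build the S3 partition filter, one option is to pass the rendered values to the job as application arguments in the same file. The flag names below are purely illustrative; they assume your Spark main class parses arguments like these.

  # added under spec: -- the flag names are hypothetical and depend on what your job expects
  arguments:
    - "--warehouse-path={{ params.warehouse_path }}"
    - "--partition-filter={{ ds }}"
    - "--snapshot-expire-time={{ execution_date }}"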
You can check the Rendered Template view for the task instance in your DAG to confirm that the values were substituted.
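You can also render a task's templates from the command line. In Airflow 2 the command looks like this, assuming a hypothetical DAG id of compaction_dag:

airflow tasks render compaction_dag submit_compaction_to_spark 2021-03-05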
Please also reference the example DAG and sample application_file in the Airflow documentation.