While running PySpark SQL pipelines via Airflow, I am interested in getting out some business stats like:
One idea is to push them directly to the metrics, so they get consumed automatically by monitoring tools like Prometheus. Another idea is to obtain these values via some DAG result object, but I wasn't able to find anything about it in the docs.
Please post at least some pseudocode if you have a solution.
I would look to reuse Airflow's statistics and monitoring support in the airflow.stats.Stats class. Maybe something like this:
import logging

from airflow.stats import Stats

PYSPARK_LOG_PREFIX = "airflow_pyspark"

def your_python_operator(**context):
    [...]
    # src_read_count and tgt_write_count are assumed to be computed
    # by the pipeline code elided above.
    try:
        Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
        Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
        # So on and so forth
    except Exception:
        # A metrics failure should not fail the task itself
        logging.exception("Caught exception during statistics logging")
    [...]
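If you end up reporting more than a couple of values, it may be worth pulling the reporting into a small helper that you can try out without a running Airflow. The following is only a sketch of that idea: `report_counts` and `RecordingStats` are hypothetical names, and `RecordingStats` is a local stand-in that just records calls. In a real task you would presumably pass `airflow.stats.Stats` instead, and the counts would come from your own pipeline.

```python
import logging
from typing import Dict, List, Tuple

PYSPARK_LOG_PREFIX = "airflow_pyspark"


class RecordingStats:
    """Hypothetical stand-in for airflow.stats.Stats: records calls instead of emitting them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, int]] = []

    def incr(self, stat: str, count: int = 1) -> None:
        self.calls.append((stat, count))


def report_counts(stats, counts: Dict[str, int], prefix: str = PYSPARK_LOG_PREFIX) -> None:
    """Send each named count to the given stats client as a counter increment."""
    for name, value in counts.items():
        try:
            stats.incr(f"{prefix}_{name}", value)
        except Exception:
            # A metrics failure is logged but never propagated to the task
            logging.exception("Could not emit metric %s", name)


# Example values only; real counts would come from the pipeline.
recorder = RecordingStats()
report_counts(recorder, {"read_count": 120, "write_count": 118})
print(recorder.calls)
```

As far as I know, `Stats` only sends anything when metrics are enabled in the Airflow config, and it speaks StatsD, so for Prometheus you would typically need a StatsD exporter in between.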