
How to run tasks in parallel in Apache Airflow

I ran the following DAG in Airflow: [image of the DAG graph]

When I execute this DAG, the tasks run serially in one of the following orders:

A -> B -> C1 -> C2 -> D1 -> D2

A -> B -> C2 -> C1 -> D2 -> D1

However, I need tasks C1 and C2 to run in parallel. Here is part of my airflow.cfg:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
#executor = LocalExecutor

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
workers = 4

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2
Hasitha asked Sep 03 '25 02:09



2 Answers

Add concurrency=x (where x is an integer greater than 1) to your DAG arguments.

max_active_runs limits how many runs of the DAG can be active at once. concurrency limits how many task instances of the DAG can run at the same time.

Example:

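    from airflow import DAG

    # dag_id and default_args are assumed to be defined earlier in the DAG file.
    dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval='00 03 * * *',
        max_active_runs=2,  # at most 2 runs of this DAG active at once
        concurrency=2,      # at most 2 task instances of this DAG at once
    )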
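To see why a task-level cap of 1 gives a serial order while a cap of 2 lets C1 and C2 run together, here is a rough plain-Python model. It is not Airflow's scheduler, only a sketch of the idea. The function name schedule_waves and the dependency map are hypothetical; the DAG shape is assumed from the task orders listed in the question.

```python
def schedule_waves(deps, concurrency):
    """Group tasks into waves of at most `concurrency` tasks.

    A task is eligible for a wave only when all of its upstream tasks
    finished in an earlier wave. This is a simplified illustration,
    not how Airflow actually schedules task instances.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    done, waves = set(), []
    pending = sorted(deps)  # sorted only to keep the illustration deterministic
    while pending:
        ready = [t for t in pending if set(deps[t]) <= done]
        if not ready:
            raise ValueError("dependency cycle detected")
        wave = ready[:concurrency]  # the cap limits how many start together
        waves.append(wave)
        done.update(wave)
        pending = [t for t in pending if t not in done]
    return waves


# Assumed DAG shape: A -> B -> (C1, C2), then C1 -> D1 and C2 -> D2.
deps = {
    "A": [], "B": ["A"],
    "C1": ["B"], "C2": ["B"],
    "D1": ["C1"], "D2": ["C2"],
}

serial = schedule_waves(deps, concurrency=1)
parallel = schedule_waves(deps, concurrency=2)
```

With a cap of 1 every wave holds a single task, which matches the serial orders in the question. With a cap of 2 the model puts C1 and C2 in the same wave, provided the executor can actually run two tasks at once.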
Alistair McIntyre answered Sep 05 '25 00:09



If you are just testing on a single machine, I suggest using LocalExecutor. SequentialExecutor runs tasks serially, and CeleryExecutor needs a message broker and is typically run across a cluster of worker machines.

Also, when you use LocalExecutor, use a metadata database other than SQLite, because SQLite does not handle concurrent writes and Airflow only supports it with SequentialExecutor. Use Postgres or MySQL instead and change sql_alchemy_conn in airflow.cfg accordingly.

Read this: https://airflow.apache.org/howto/initialize-database.html

“LocalExecutor”, an executor that can parallelize task instances locally.
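As a quick sanity check of the executor and metadata database combination, something like the following sketch could be run against the [core] section of airflow.cfg. It uses only Python's standard configparser. The function name parallelism_problems, the sample file contents, and the connection strings are placeholders for illustration, not values from the question.

```python
import configparser


def parallelism_problems(cfg_text):
    """Return a list of reasons this config cannot run tasks in parallel."""
    # interpolation=None so '%' characters in passwords are left alone
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    core = parser["core"]
    # Assumption: fall back to SequentialExecutor when no executor is set
    executor = core.get("executor", fallback="SequentialExecutor")
    conn = core.get("sql_alchemy_conn", fallback="")

    problems = []
    if executor == "SequentialExecutor":
        problems.append("SequentialExecutor runs one task at a time")
    elif conn.startswith("sqlite"):
        problems.append(f"{executor} cannot use a SQLite metadata database")
    return problems


# Hypothetical config fragments; paths and credentials are placeholders.
sqlite_cfg = """
[core]
executor = LocalExecutor
sql_alchemy_conn = sqlite:////tmp/airflow.db
"""

postgres_cfg = """
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost:5432/airflow
"""

sqlite_result = parallelism_problems(sqlite_cfg)
postgres_result = parallelism_problems(postgres_cfg)
```

Here sqlite_result reports the SQLite problem and postgres_result is empty. After changing sql_alchemy_conn, the metadata database still has to be initialized as described in the linked page.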

kaxil answered Sep 05 '25 00:09
