I have a DAG that fetches data from Elasticsearch and ingests it into the data lake. The first task, BeginIngestion, fans out into several tasks (one per resource), and each of those fans out into more tasks (one per shard). After the shards are fetched, the data is uploaded to S3 and the branches converge into a task EndIngestion, followed by a task AuditIngestion.
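To make the shape concrete, here is a minimal sketch of that topology (Airflow 1.x-era API; the resource and shard names are hypothetical placeholders, and DummyOperator stands in for my real fetch and upload operators):

    # Minimal sketch of the fan-out/fan-in topology described above.
    # Resource/shard names are hypothetical placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG(
        dag_id='es_ingestion',
        start_date=datetime(2017, 8, 1),
        schedule_interval='@hourly',
    )

    begin = DummyOperator(task_id='begin_ingestion', dag=dag)
    # trigger_rule defaults to 'all_success'
    end = DummyOperator(task_id='end_ingestion', dag=dag)
    audit = DummyOperator(task_id='audit_ingestion', dag=dag)

    for resource in ['complain', 'interaction', 'company']:
        upload = DummyOperator(
            task_id='s3_finish_upload_ingestion_raichucrud_%s' % resource,
            dag=dag,
        )
        for shard in range(3):  # one fetch task per shard
            fetch = DummyOperator(
                task_id='fetch_%s_shard_%d' % (resource, shard),
                dag=dag,
            )
            begin >> fetch >> upload
        upload >> end

    end >> audit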
It had been executing correctly, but now all tasks finish successfully while the "closing task" EndIngestion remains with no status, and when I refresh the webserver's page the DAG is marked as Failed.
The screenshot shows all upstream tasks successful, the task end_ingestion with no status, and the DAG marked as Failed.
I also dug into the task instance details and found the following:
- Dagrun Running: Task instance's dagrun was not in the 'running' state but in the state 'failed'.
- Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'failed': 0, 'upstream_failed': 0, 'skipped': 0, 'done': 49, 'successes': 49}, upstream_task_ids=['s3_finish_upload_ingestion_raichucrud_complain', 's3_finish_upload_ingestion_raichucrud_interaction', 's3_finish_upload_ingestion_raichucrud_company', 's3_finish_upload_ingestion_raichucrud_user', 's3_finish_upload_ingestion_raichucrud_privatecontactinteraction', 's3_finish_upload_ingestion_raichucrud_location', 's3_finish_upload_ingestion_raichucrud_companytoken', 's3_finish_upload_ingestion_raichucrud_indexevolution', 's3_finish_upload_ingestion_raichucrud_companyindex', 's3_finish_upload_ingestion_raichucrud_producttype', 's3_finish_upload_ingestion_raichucrud_categorycomplainsto', 's3_finish_upload_ingestion_raichucrud_companyresponsible', 's3_finish_upload_ingestion_raichucrud_category', 's3_finish_upload_ingestion_raichucrud_additionalfieldoption', 's3_finish_upload_ingestion_raichucrud_privatecontactconfiguration', 's3_finish_upload_ingestion_raichucrud_phone', 's3_finish_upload_ingestion_raichucrud_presence', 's3_finish_upload_ingestion_raichucrud_responsible', 's3_finish_upload_ingestion_raichucrud_store', 's3_finish_upload_ingestion_raichucrud_socialprofile', 's3_finish_upload_ingestion_raichucrud_product', 's3_finish_upload_ingestion_raichucrud_macrorankingpresenceto', 's3_finish_upload_ingestion_raichucrud_macroinfoto', 's3_finish_upload_ingestion_raichucrud_raphoneproblem', 's3_finish_upload_ingestion_raichucrud_macrocomplainsto', 's3_finish_upload_ingestion_raichucrud_testimony', 's3_finish_upload_ingestion_raichucrud_additionalfield', 's3_finish_upload_ingestion_raichucrud_companypageblockitem', 's3_finish_upload_ingestion_raichucrud_rachatconfiguration', 's3_finish_upload_ingestion_raichucrud_macrorankingitemto', 's3_finish_upload_ingestion_raichucrud_purchaseproduct', 's3_finish_upload_ingestion_raichucrud_rachatproblem', 's3_finish_upload_ingestion_raichucrud_role', 's3_finish_upload_ingestion_raichucrud_requestmoderation', 's3_finish_upload_ingestion_raichucrud_categoryproblemto', 's3_finish_upload_ingestion_raichucrud_companypageblock', 's3_finish_upload_ingestion_raichucrud_problemtype', 's3_finish_upload_ingestion_raichucrud_key', 's3_finish_upload_ingestion_raichucrud_macro', 's3_finish_upload_ingestion_raichucrud_url', 's3_finish_upload_ingestion_raichucrud_document', 's3_finish_upload_ingestion_raichucrud_transactionkey', 's3_finish_upload_ingestion_raichucrud_catprobitemcompany', 's3_finish_upload_ingestion_raichucrud_privatecontactinteraction', 's3_finish_upload_ingestion_raichucrud_categoryinfoto', 's3_finish_upload_ingestion_raichucrud_marketplace', 's3_finish_upload_ingestion_raichucrud_macroproblemto', 's3_finish_upload_ingestion_raichucrud_categoryrankingto', 's3_finish_upload_ingestion_raichucrud_macrorankingto', 's3_finish_upload_ingestion_raichucrud_categorypageto']
As you can see, the "Trigger Rule" field says that one of the upstream tasks is in a non-successful state, yet at the same time the stats show that all upstreams succeeded (done=49, successes=49).
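Counting the entries in that message already exposes the mismatch; the trigger rule effectively compares the number of upstream task IDs against the number of successes. A quick sanity check (paste the full upstream_task_ids list from the message above in place of the ellipsis):

    # Paste the full upstream_task_ids list from the trigger-rule
    # message above in place of the ellipsis.
    upstream_task_ids = [
        's3_finish_upload_ingestion_raichucrud_complain',
        # ...
        's3_finish_upload_ingestion_raichucrud_categorypageto',
    ]

    # The full list has 50 entries, while successes is 49,
    # hence the reported "1 non-success(es)".
    print(len(upstream_task_ids))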
If I reset the database, the problem goes away, but I can't reset it before every execution (the DAG runs hourly), and I don't want to anyway.
Can anyone shed some light on this?
PS: I am running on an EC2 instance (c4.xlarge) with the LocalExecutor.
[EDIT]
I found in the scheduler log that the DAG run hit a deadlock:
[2017-08-25 19:25:25,821] {models.py:4076} DagFileProcessor157 INFO - Deadlock; marking run failed
I suspect this may be caused by some exception handling in the code.
I have had this exact issue before; in my case, my code was generating duplicate task IDs. And it looks like there is a duplicate ID in your case as well:
s3_finish_upload_ingestion_raichucrud_privatecontactinteraction
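Duplicates like this typically slip in when the tasks are generated in a loop over a list that itself contains a repeated name. The sketch below (hypothetical names) shows the failure mode and a cheap guard you can add at DAG-definition time:

    # Hypothetical sketch: if the resource list driving the task loop
    # contains a repeated name, two tasks get the same task_id.
    resources = [
        'complain',
        'privatecontactinteraction',
        'company',
        'privatecontactinteraction',  # repeated entry -> duplicate task_id
    ]

    # Cheap guard: fail fast when the DAG file is parsed instead of
    # deadlocking at run time.
    duplicates = sorted(r for r in set(resources) if resources.count(r) > 1)
    assert not duplicates, 'duplicate resource names: %s' % duplicates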
This is probably a year late for you, but hopefully it will save others a lot of debugging time :)