
How to stop a Dataproc job from Airflow

From Airflow we can submit Dataproc jobs via DataprocSubmitJobOperator.
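For context, a submission typically looks like the sketch below; the project, cluster, bucket, and script names are placeholders, not values from this question:

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PYSPARK_JOB = {
    "reference": {"project_id": "my-gcp-project"},      # placeholder project
    "placement": {"cluster_name": "my-cluster"},        # placeholder cluster
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/job.py"},  # placeholder script
}

submit_job = DataprocSubmitJobOperator(
    task_id="submit_pyspark_job",
    job=PYSPARK_JOB,
    region="us-central1",
    project_id="my-gcp-project",
)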

We can stop Dataproc jobs in the development environment via the GCP Console, but not in the production environment.

Is there any way we can kill Dataproc jobs directly from Airflow, given the Dataproc job ID as a parameter?

asked Oct 26 '25 by Khilesh Chauhan


1 Answer

At the moment there is no built-in operator for this action, but DataprocHook has a cancel_job function, so you can create a custom operator:

from typing import TYPE_CHECKING, Optional, Sequence, Tuple, Union

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry

if TYPE_CHECKING:
    from airflow.utils.context import Context


class MyDataprocCancelJobOperator(BaseOperator):
    """Starts a cancellation request for a running Dataproc job."""

    # job_id is templated too, so it can be supplied via params or XCom
    template_fields: Sequence[str] = ("job_id", "region", "project_id", "impersonation_chain")

    def __init__(
        self,
        *,
        job_id: str,
        project_id: str,
        region: Optional[str] = None,
        retry: Union[Retry, _MethodDefault] = DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.job_id = job_id
        self.project_id = project_id
        self.region = region
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: "Context"):
        # The hook wraps the Dataproc JobController client; cancel_job
        # issues the cancellation request for the given job id.
        hook = DataprocHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        job = hook.cancel_job(
            job_id=self.job_id,
            project_id=self.project_id,
            region=self.region,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        return job
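
A minimal sketch of wiring the operator into a DAG, assuming the class above is importable; the dag_id, param name, project, and region below are illustrative placeholders, not part of the answer:

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="cancel_dataproc_job",      # hypothetical DAG name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,            # trigger manually when a job must be killed
    params={"job_id": ""},             # pass the Dataproc job id at trigger time
) as dag:
    cancel_job = MyDataprocCancelJobOperator(
        task_id="cancel_dataproc_job",
        job_id="{{ params.job_id }}",  # rendered because job_id is a template field
        project_id="my-gcp-project",   # placeholder project
        region="us-central1",          # placeholder region
    )

Because job_id is a template field, the job to cancel can be supplied per run (for example via dag_run params), which matches the "job id provided as parameter" requirement in the question.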
answered Oct 29 '25 by Elad Kalif