From Airflow we can submit Dataproc jobs via DataprocSubmitJobOperator.
We can stop Dataproc jobs in the development environment via the GCP Console, but not in the Production environment.
Is there any way we can kill Dataproc jobs directly from Airflow, given the Dataproc job id as a parameter?
At the moment there is no dedicated operator for this action, but DataprocHook has a cancel_job function, so you can create a custom operator:
from typing import TYPE_CHECKING, Optional, Sequence, Tuple, Union

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry

if TYPE_CHECKING:
    from airflow.utils.context import Context


class MyDataprocCancelJobOperator(BaseOperator):
    """Starts a job cancellation request."""

    template_fields: Sequence[str] = ("region", "project_id", "impersonation_chain")

    def __init__(
        self,
        *,
        job_id: str,
        project_id: str,
        region: Optional[str] = None,
        retry: Union[Retry, _MethodDefault] = DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.job_id = job_id
        self.project_id = project_id
        self.region = region
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: "Context"):
        # Delegate the cancellation request to the hook's cancel_job call
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        job = hook.cancel_job(
            job_id=self.job_id,
            project_id=self.project_id,
            region=self.region,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        return job
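You could then use the operator in a DAG like any other task. The sketch below is a minimal example; the module name my_dataproc, the job id, project and region values are placeholders you would replace with your own:

from datetime import datetime

from airflow import DAG

# assumes the custom operator above is saved in a local module, e.g. my_dataproc.py
from my_dataproc import MyDataprocCancelJobOperator

with DAG(
    dag_id="cancel_dataproc_job",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    cancel_job = MyDataprocCancelJobOperator(
        task_id="cancel_job",
        job_id="my-dataproc-job-id",   # placeholder: id of the Dataproc job to cancel
        project_id="my-gcp-project",   # placeholder GCP project
        region="us-central1",          # placeholder region
    )

If you want to pass the job id at trigger time (for example via dag_run.conf), add "job_id" to the operator's template_fields so the value is rendered with Jinja.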