I have a bit of confusion about the way BaseSensorOperator's parameters work: timeout  & poke_interval.
Consider this usage of the sensor :
BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout after 12 hours
)
The documentation mentions the timeout acts to set the task to 'fail' after it runs out. But I'm using a soft_fail=True, I don't think it retains the same behavior, because I've found the task failed instead of skipping after I've used both parameters soft_fail and timeout.
So what does happen here?
Here's the documentation of the BaseSensorOperator
class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.
    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.
    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each tries
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is requried.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the expected time until the criteria is met is. The poke
        inteval should be more than one minute to prevent too much load on
        the scheduler.
    :type mode: str
    """
BaseSensorOperator inherits from BaseOperator, the documentation states execution_timeout (datetime. timedelta) – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail.
poke_interval : When using poke mode, this is the time in seconds that the sensor waits before checking the condition again. The default is 30 seconds.
Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then succeed so their downstream tasks can run.
The smart sensor is a service (run by a builtin DAG) which greatly reduces airflow's infrastructure cost by consolidating some of the airflow long running light weight tasks.
Defining the terms
poke_interval: the duration b/w successive 'pokes' (evaluation the necessary condition that is being 'sensed')
timeout: Just poking indefinitely is inadmissible (if for e.g. your buggy code is poking on day to become 29 whenever month is 2, it will keep poking for upto 4 years). So we define a maximum period beyond which we stop poking and terminate (the sensor is marked either FAILED or SKIPPED)
soft_fail: Normally (when soft_fail=False), sensor is marked as FAILED after timeout. When soft_fail=True, sensor will instead be marked as SKIPPED after timeout
mode: This is a slightly complex
slot in some pool (either default pool or explicitly specified pool); essentially meaning that it takes up some resources.ExternalTaskSensors is notorious for putting entire workflows (DAGs) into deadlocks
modes in sensors
mode='poke' (default) means the existing behaviour that we discussed abovemode='reschedule' means after a poke attempt, rather than going to sleep, the sensor will behave as though it failed (in current attempt) and it's status will change from RUNNING to UP_FOR_RETRY. That ways, it will release it's slot, allowing other tasks to progress while it waits for another poke attempt
if self.reschedule: reschedule_date = timezone.utcnow() + timedelta( seconds=self._get_next_poke_interval(started_at, try_number)) raise AirflowRescheduleException(reschedule_date) else: sleep(self._get_next_poke_interval(started_at, try_number)) try_number += 1
And now answering your questions directly
Q1
- The sensor pokes every 4 hours, and at every poke, will wait for the duration of the timeout (12 hours)?
- Or does it poke every 4 hours, for a total of 3 pokes, then times out?
point 2. is correct
Q2
Also, what happens with these parameters if I use the mode="reschedule"?
As explained earlier, each one of those params are independent and setting mode='reschedule' doesn't alter their behaviour in any way
BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout of 12 hours
  mode = "reschedule"
)
Let's say the criteria is not met at the first poke. So it will run again after 4 hours of interval. But the worker slot will be freed during the wait since we're using the mode="reschedule".
That is what I understood.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With