As mentioned in "Starts Airflow task execution directly from the triggerer", the syntax of this feature will change after #39585 is merged. This change is needed mainly because we don't want to run any user code in the scheduler, which might otherwise happen here.
This article will cover the following three PRs:
- Prevent start trigger initialization in scheduler
- add next_kwargs to StartTriggerArgs
- Add start execution from trigger support for existing core sensors
PR 2 is a minor fix for PR 1, and PR 3 adds this feature to existing core sensors.
What does it look like now?
The following code snippet shows how to define an operator that can start task execution directly from the trigger. I have kept the code simple and removed unnecessary details.
```python
from airflow.sensors.date_time import DateTimeSensorAsync
from airflow.triggers.base import StartTriggerArgs
from airflow.utils import timezone


class ExecuteFromTriggerDateTimeSensor(DateTimeSensorAsync):
    # Placeholder values; the real trigger_kwargs are filled in __init__.
    start_trigger_args: StartTriggerArgs = StartTriggerArgs(
        trigger_cls="airflow.triggers.temporal.DateTimeTrigger",
        trigger_kwargs={"moment": "", "end_from_trigger": False},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger: bool = True

    def __init__(self, *, trigger_kwargs=None, **kwargs) -> None:
        super().__init__(**kwargs)
        self.start_trigger_args.trigger_kwargs = dict(
            moment=timezone.parse(self.target_time),
            end_from_trigger=self.end_from_trigger,
        )

    def execute_complete(self, context, event=None) -> None:
        return
```
To enable this feature on an operator or a sensor, we must define the attributes `start_trigger_args` and `start_from_trigger`. We use `start_from_trigger` to determine whether we want to enable this feature, and `start_trigger_args` to decide which trigger we want to use and how to use it. `start_trigger_args` should be an instance of `airflow.triggers.base.StartTriggerArgs`, and it should contain the following arguments.
- `trigger_cls`: the full import path of the trigger to run, e.g., `airflow.triggers.temporal.DateTimeTrigger`
- `trigger_kwargs`: the keyword arguments passed into the trigger for initialization, e.g., `{"moment": "", "end_from_trigger": False}`
- `next_method`: the name of the method to run after execution finishes in the trigger; most of the time, it's `execute_complete`
- `next_kwargs`: the keyword arguments passed into the next method; most of the time, it's `None`. I never saw it used. 🤔
- `timeout`: the timeout for the trigger; most of the time, it'll be set to `self.execution_timeout`
If the attributes `start_trigger_args` and `start_from_trigger` are defined as class attributes, we can set `trigger_kwargs` to a temporary value and update it in the `__init__` method. However, implementing a DAG with dynamic task mapping on an operator supporting this feature works differently, which will be detailed in the next article.
What's changed under the hood
Let's start from `airflow/models/dagrun.py`. Airflow verifies that the `start_from_trigger` attribute is set to `True` and that the `start_trigger_args` attribute is set. If both conditions are met, the `defer_task` method will be called with `exception=None`.
```python
elif ti.task.start_from_trigger is True and ti.task.start_trigger_args is not None:
    ti.start_date = timezone.utcnow()
    if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
        ti.try_number += 1
    ti.defer_task(exception=None, session=session)
```
Then, let's go to `airflow/models/taskinstance.py`.
```python
if exception is not None:
    trigger_row = Trigger.from_object(exception.trigger)
    next_method = exception.method_name
    next_kwargs = exception.kwargs
    timeout = exception.timeout
elif ti.task is not None and ti.task.start_trigger_args is not None:
    trigger_row = Trigger(
        classpath=ti.task.start_trigger_args.trigger_cls,
        kwargs=ti.task.start_trigger_args.trigger_kwargs or {},
    )
    next_kwargs = ti.task.start_trigger_args.next_kwargs
    next_method = ti.task.start_trigger_args.next_method
    timeout = ti.task.start_trigger_args.timeout
else:
    raise AirflowException("exception and ti.task.start_trigger_args cannot both be None")
```
We handle standard task deferral in the first `if` condition. This happens when a task calls the `defer` method and raises a `TaskDeferred` exception. For a more detailed version, you can refer to this link. That's why we need to set `exception=None` in the previous code block: we are not deferring in the standard way here.
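For contrast, the standard deferral path can be sketched with a minimal stand-in for `airflow.exceptions.TaskDeferred` and a simplified `defer` function; both definitions here are reduced illustrations, not the real Airflow classes:

```python
# Minimal stand-in for TaskDeferred: it carries the trigger spec
# that the first `if` branch above unpacks.
class TaskDeferred(Exception):
    def __init__(self, *, trigger, method_name, kwargs=None, timeout=None):
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout

def defer(*, trigger, method_name, kwargs=None, timeout=None):
    # In real Airflow, BaseOperator.defer raises TaskDeferred; the runner
    # catches it, which corresponds to the `exception is not None` branch.
    raise TaskDeferred(
        trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout
    )

try:
    defer(trigger=object(), method_name="execute_complete")
    raised = None
except TaskDeferred as exc:
    raised = exc
```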
The `elif` statement checks whether the operator has a `start_trigger_args` attribute, which indicates that it supports the "starting execution from the trigger" feature. Airflow then loads the values from `start_trigger_args` and assigns them to the variables used in the rest of the deferral process, similar to standard task deferral.
The next article in this series will cover how to utilize this feature with dynamic task mapping (well.. partially). The implementation details will change somewhat, but the core idea remains unchanged.