AIP-72 - Task Execution Interface aka Task SDK
Airflow tasks are going to support languages other than Python!

Flyte won't be able to tease Airflow about this anymore 😆

Preface

AIP stands for Airflow Improvement Proposals.
As the name suggests, it is much like a Python Enhancement Proposal (PEP):
a proposal for making Airflow better.

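While working on an AIP-75 feature recently, I realized I had read some of the AIPs and then forgotten what they said,
so I ended up asking a silly question again.
As the current title of my blog says, "Those aren't written down are meant to be forgotten":
whatever isn't written down eventually gets forgotten.
So let's take some notes.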

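The series name, "帶我讀 AIP" (roughly "Walk Me Through the AIP"), is of course a riff on COSCUP's "帶您讀源碼" (roughly "Walk You Through the Source Code").
You might ask: shouldn't it be "帶您讀 AIP" ("Walk You Through the AIP") then?
Good question.
It's because these are notes written for myself.
But if they happen to help you too, that would be great 😇
They should read much like my book digest series.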

I'm about to start working on "Move Asset user-facing components to task_sdk #43619",
which moves the Asset-related pieces (renamed by AIP-74) into the Task SDK proposed by AIP-72.
So this series starts with AIP-72.

Note

Definitions

  • Airflow Core: the scheduler and API servers
  • User code: DAG files, but not necessarily plugins

Motivation

  • Reduces the interaction between the Task (user code) and Airflow Core → Airflow Core and Task can be upgraded separately
  • Keep workers/tasks from accessing the Airflow DB directly while still letting them get the information they need → task executions on local, trusted networks
  • Interface for remote workers to connect to Airflow Core service for information needed for task execution → better remote execution
  • Language agnostic interface → allowing Tasks in other languages
    • DAGs will still be Python-based

Changes to make

  • No extra components outside of Airflow and its metadata DB should be needed to run Airflow locally

Disable Direct DB access

  • Remove direct DB access from
    • user code
    • workers
    • triggerers
    • triggers
  • All DB traffic goes through an API server
    • reduce the number of open connections to DB

Build a new API for task communication from the ground up

  • Endpoints for all items in the "Airflow Task SDK"
  • Strongly versioned using CalVer
  • Supports WebSockets or another "instant" push mechanism for low-latency communication
    • in practice, an async web framework (e.g., FastAPI)
  • Transport encoding: TBD
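
To make "strongly versioned using CalVer" concrete for myself, here is a minimal sketch of how a server might pick which API version to serve for a given client. The `YYYY-MM-DD` version format, the `SUPPORTED_VERSIONS` list, and the `resolve_version` helper are all my own assumptions for illustration; the AIP does not spell out any of them.

```python
from datetime import date

# Hypothetical list of API versions the server can speak, expressed as CalVer dates.
SUPPORTED_VERSIONS = [date(2024, 7, 1), date(2024, 10, 15), date(2025, 1, 20)]


def resolve_version(requested: str) -> date:
    """Return the newest supported version that is not newer than the request.

    Assumes versions are ISO dates (YYYY-MM-DD), which is an illustrative choice.
    """
    wanted = date.fromisoformat(requested)
    candidates = [v for v in SUPPORTED_VERSIONS if v <= wanted]
    if not candidates:
        raise ValueError(f"client version {requested} predates every supported version")
    return max(candidates)


print(resolve_version("2024-12-01"))
```

The point of date-based versions is that "which one is newer" is just a date comparison, so an older Task SDK can keep talking to a newer server as long as its version is still in the supported list.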

API security and Strong per-task-try identity

  • Strong identity for each task try ← authorization
    • current choice: JWT token signed by the API server
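
As a reminder to myself of what "a JWT signed by the API server" buys us, here is a rough sketch of issuing and verifying an HS256-style token for one task try, using only the standard library. The claim layout (`sub` holding `dag_id/run_id/task_id/try_number`), the `SECRET` key, and the helper names are assumptions of mine, not something the AIP defines; real code would more likely rely on a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-signing-key"  # hypothetical; a real server would manage keys securely


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWT-style encoding strips.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def issue_token(dag_id, task_id, run_id, try_number, ttl=300, now=None) -> str:
    """Sign claims that identify exactly one task try."""
    now = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"sub": f"{dag_id}/{run_id}/{task_id}/{try_number}", "exp": now + ttl}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(SECRET, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, now=None) -> dict:
    """Return the claims if the signature is valid and the token has not expired."""
    now = int(time.time()) if now is None else now
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, claims_b64, sig_b64 = parts
    expected = hmac.new(
        SECRET, f"{header_b64}.{claims_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(claims_b64))
    if claims["exp"] < now:
        raise ValueError("token expired")
    return claims
```

Because the try number is part of the signed claims, a token issued for attempt 1 cannot be replayed as attempt 2 without the signature check failing.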

Define an "Airflow Task SDK"

  • Interface for
    • Version introspection
    • Airflow Connections
    • Variables
    • Read and write
      • XCom
      • Triggers
    • heartbeat
    • logs
    • metrics
    • OpenLineage events
    • data about task expansion for downstream tasks
      • values
      • lengths
      • names
      • etc.
    • etc.
  • Access to the Connections and Variables (detail TBD)
  • Read/Write task log or XCom
    • consideration point
      • avoid requiring every log line for every task execution to go over the API server due to performance
      • solution TBD
  • Out of scope
    • handling custom callbacks
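
To picture what such an SDK could look like from the task's side, here is a small sketch. The `TaskSDKClient` protocol, its method names, and the `InMemoryClient` stand-in are all hypothetical names I made up; the AIP only lists the kinds of things the interface must cover. The idea is simply that user code talks to a client object and never sees a DB session.

```python
from typing import Any, Protocol


class TaskSDKClient(Protocol):
    """Hypothetical surface a task uses instead of touching the metadata DB."""

    def get_variable(self, key: str) -> str: ...
    def get_connection(self, conn_id: str) -> dict[str, Any]: ...
    def xcom_push(self, key: str, value: Any) -> None: ...
    def xcom_pull(self, key: str) -> Any: ...
    def heartbeat(self) -> None: ...


class InMemoryClient:
    """Stand-in for an HTTP client so the sketch runs without an API server."""

    def __init__(self, variables: dict[str, str], connections: dict[str, dict[str, Any]]) -> None:
        self._variables = dict(variables)
        self._connections = dict(connections)
        self._xcom: dict[str, Any] = {}
        self.heartbeats = 0

    def get_variable(self, key: str) -> str:
        return self._variables[key]

    def get_connection(self, conn_id: str) -> dict[str, Any]:
        return self._connections[conn_id]

    def xcom_push(self, key: str, value: Any) -> None:
        self._xcom[key] = value

    def xcom_pull(self, key: str) -> Any:
        return self._xcom[key]

    def heartbeat(self) -> None:
        self.heartbeats += 1


def run_task(client: TaskSDKClient) -> None:
    """User code: everything it needs comes through the client."""
    client.heartbeat()
    conn = client.get_connection("warehouse")
    greeting = client.get_variable("greeting")
    client.xcom_push("return_value", f"{greeting} from {conn['host']}")


client = InMemoryClient({"greeting": "hello"}, {"warehouse": {"host": "db.example.com"}})
run_task(client)
print(client.xcom_pull("return_value"))
```

Swapping `InMemoryClient` for something that calls the API server would not change `run_task` at all, which is the decoupling the AIP is after.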

Extend Executor interface

  • Replace queue_task_instance(), queue_command(), send_callback() with queue_activity()
def queue_activity(
    self,
    *,
    # For now just "ExecuteTask", "RunCallback", but allows for "ParseDAGFile" etc. in future
    kind: ActivityType,
    # Contains TI for "ExecuteTask", so executor can get ti.queue out of this.
    context: dict[str, Any],
): ...
  • Key difference: what's passed between the scheduler and worker
    • from commands to run to better typed parameters
{
  "kind": "ExecuteTask",
  "token": "eyJhb....",
  # Or better, a single id, but we want these details to be passed along regardless
  "id": ["dag_id", "task_id", "run_id", "attempt", "map_index"],
  "context": {
    "connections": ["list", "of", "pre-injected", "connections"],
    "...",
  }
}
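
To convince myself how the signature and the payload fit together, here is a runnable toy version. `ActivityType` as an enum, the `Activity` dataclass, `RecordingExecutor`, and the way I pull `token` and `id` out of the `context` dict are all my own guesses for illustration; the AIP only gives the signature and the example payload above.

```python
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class ActivityType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    RUN_CALLBACK = "RunCallback"


@dataclass
class Activity:
    """Typed parameters sent to the worker instead of a command line to run."""

    kind: ActivityType
    token: str
    id: list[Any]
    context: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(
            {"kind": self.kind.value, "token": self.token, "id": self.id, "context": self.context}
        )


class RecordingExecutor:
    """Toy executor that only records the payloads it would hand to workers."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    def queue_activity(self, *, kind: ActivityType, context: dict[str, Any]) -> None:
        # Assumption: token and id travel inside the context dict passed by the scheduler.
        activity = Activity(
            kind=kind,
            token=context["token"],
            id=context["id"],
            context=context.get("context", {}),
        )
        self.sent.append(activity.to_json())


executor = RecordingExecutor()
executor.queue_activity(
    kind=ActivityType.EXECUTE_TASK,
    context={
        "token": "<per-try token>",
        "id": ["example_dag", "example_task", "manual__1", 1, -1],
        "context": {"connections": []},
    },
)
print(executor.sent[0])
```

A real executor would forward the serialized activity to Celery, Kubernetes, and so on; the worker then only needs to parse structured data rather than a shell command.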

Dag Parsing Changes

  • Strong identity for the source file being parsed
  • The same Task SDK during
    • dag parsing
    • task execution
  • Callback details TBD
    • run in the parsing process and will need to be "passed down" to it

Connection/Variable security models

  • a.k.a. push/inject secrets into tasks or allow tasks to request secrets.
  • Details TBD

Which users are affected by the change?

  • Operators that needed direct DB access need re-writing

Distro proposal based on the Task Execution Interface

This is a proposal from the Airflow dev mailing list that is closely related to AIP-72,
so I read it along with the AIP.

Task Context: Incoming to Tasks

  • Information obtained from the API server for task execution
    • Connections
    • Secrets
    • XCom
    • Variables
    • Log output location reference
    • Composite TaskRunID

Task Status and Output: Outgoing from Tasks

  • information sent to the API server
    • Task completion status
    • XCom (optional)
    • Variables (write) (optional)
    • Log information (optional)
    • Heartbeat
      • The scheduler knows that this task is still running
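
The heartbeat idea can be sketched in a few lines: the server records when it last heard from each task run, and anything silent for longer than a timeout is treated as no longer running. `HeartbeatTracker`, its method names, and the timeout handling are hypothetical; the proposal only says that a heartbeat lets the scheduler know the task is still alive.

```python
class HeartbeatTracker:
    """Remember the last heartbeat per task run and report the ones that went quiet."""

    def __init__(self, timeout_seconds: float) -> None:
        self.timeout_seconds = timeout_seconds
        self._last_seen: dict[str, float] = {}

    def record(self, task_run_id: str, now: float) -> None:
        # Called whenever a heartbeat for this task run reaches the API server.
        self._last_seen[task_run_id] = now

    def stale(self, now: float) -> list[str]:
        # Task runs whose last heartbeat is older than the timeout.
        return sorted(
            run_id
            for run_id, seen in self._last_seen.items()
            if now - seen > self.timeout_seconds
        )


tracker = HeartbeatTracker(timeout_seconds=30)
tracker.record("dag/run/task_a/1", now=100.0)
tracker.record("dag/run/task_b/1", now=125.0)
print(tracker.stale(now=140.0))
```

In this run only `task_a` has been silent for more than 30 seconds, so it is the only one reported as stale.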

User scenarios

Worth reading, but I'm going to skip it in my digest

Distro proposal

  1. Server Distribution (based on Python)
    • MetaDB handling
    • Scheduler / Executor
    • API server
    • Web server
    • DAG File processor
    • Executor providers (Celery / k8s / Edge)
    • Logging providers (maybe❓)
    • Server providers (Secrets backend, AuthMgr backend, etc.) (as needed)
  2. Python Task SDK (python distribution)
    • TEI (Task Execution Interface) serialization / deserialization
    • HTTP / REST client and token authentication capabilities
    • Default operators
    • Logging write provider (maybe❓)
    • Providers (hooks / operators / sensors) (as needed)
  3. Consolidated Distribution (1 + 2)
  4. Golang Task SDK (Go distribution)
    • similar to 2 but in Go