Flyte can't poke fun at Airflow over this point anymore 😆
Preface
AIP stands for Airflow Improvement Proposals.
As the name suggests, they're much like Python Enhancement Proposals (PEPs):
proposals to make Airflow better.
While developing the AIP-75 feature recently, I noticed there are AIPs I've read and then forgotten,
and I accidentally asked dumb questions again.
As the current title of my blog puts it, "Those aren't written down are meant to be forgotten":
things that aren't written down will eventually be forgotten.
So let's take some notes.
The series name「帶我讀 AIP」("Read AIPs with Me") is, of course, a riff on COSCUP's「帶您讀源碼」("Reading the Source Code with You").
You might ask: shouldn't it then be「帶您讀 AIP」("Read AIPs with You")?
Good question.
It's because these are notes written for myself.
But if they happen to help you too, that's great 😇
Reading them should feel much like my book digest series.
I'm about to start working on Move Asset user-facing components to task_sdk #43619,
which moves the Asset-related pieces (renamed by AIP-74) into the Task SDK proposed by AIP-72,
so this series starts with AIP-72.
Note
Definitions
- Airflow Core: the scheduler and API servers
- User code: DAG files, but not necessarily plugins
Motivation
- Reduces the interaction between the Task (user code) and Airflow Core → Airflow Core and Task can be upgraded separately
- Avoid workers/tasks accessing the Airflow DB directly while still getting the information required → task execution is no longer tied to local, trusted networks
- Interface for remote workers to connect to Airflow Core service for information needed for task execution → better remote execution
- Language agnostic interface → allowing Tasks in other languages
- DAGs will still be Python-based
Changes to make
- No extra components outside of Airflow and its metadata DB should be needed to run Airflow locally
Disable Direct DB access
- Remove direct DB access from
- user code
- workers
- triggerers
- triggers
- All DB traffic goes through an API server
- reduces the number of open connections to the DB
Build a new API for task communication from the ground up
- Endpoints for all items in the "Airflow Task SDK"
- Strongly versioned using CalVer
- Supports WebSockets or other "instant" push mechanisms for low latency
- in practice, an async web framework (e.g., FastAPI); a minimal endpoint sketch follows this list
- Transport encoding: TBD
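To make that concrete, here is a minimal sketch of what one CalVer-versioned async endpoint might look like. The path, the date-based version segment, and the payload model are my own assumptions for illustration, not the actual AIP-72 API.
from fastapi import FastAPI
from pydantic import BaseModel

# Minimal sketch, NOT the real AIP-72 API: the path, the "2024-07-01"
# CalVer segment, and the payload model are assumptions for illustration.
app = FastAPI()

class TaskState(BaseModel):
    state: str  # e.g. "running", "success", "failed"

@app.patch("/execution/2024-07-01/task-instances/{ti_id}/state")
async def set_task_state(ti_id: str, body: TaskState):
    # A real server would update the metadata DB on the worker's behalf,
    # so the worker never opens a DB connection itself.
    return {"id": ti_id, "state": body.state}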
API security and Strong per-task-try identity
- Strong identity for each task try ← authorization
- current choice: a JWT signed by the API server (sketched below)
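As a sketch of what such a token might carry, here's a PyJWT example; the claim names and signing setup are my guesses, since AIP-72 only fixes the general choice of a JWT signed by the API server.
import time
import jwt  # PyJWT

# Sketch only: claim names and key handling are my assumptions, not the spec.
API_SERVER_SIGNING_KEY = "not-a-real-key"

def mint_task_try_token(dag_id: str, task_id: str, run_id: str, try_number: int) -> str:
    claims = {
        # Identifies exactly one task try; a retry would get a fresh token
        "dag_id": dag_id,
        "task_id": task_id,
        "run_id": run_id,
        "try_number": try_number,
        "exp": int(time.time()) + 3600,  # short-lived
    }
    return jwt.encode(claims, API_SERVER_SIGNING_KEY, algorithm="HS256")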
Define an "Airflow Task SDK"
- Interface for the following (a hypothetical client sketch follows this list):
- Version introspection
- Airflow Connections
- Variables
- Read and write
- XCom
- Triggers
- heartbeat
- logs
- metrics
- OpenLineage events
- data about task expansion for downstream tasks
- values
- lengths
- names
- etc.
- etc.
- Access to the Connections and Variables (detail TBD)
- Read/Write task log or XCom
- consideration point
- avoid requiring every log line for every task execution to go over the API server due to performance
- solution TBD
- Out of scope
- handling custom callbacks
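To picture the surface area, here's one hypothetical shape of that interface as a Python Protocol; the method names and signatures are inferred by me from the list above and are NOT the real Task SDK API.
from typing import Any, Protocol

# Hypothetical shape inferred from the list above; names and signatures
# are my assumptions, not the actual Task SDK API.
class TaskSDKClient(Protocol):
    def get_version(self) -> str: ...                          # version introspection
    def get_connection(self, conn_id: str) -> Any: ...         # Airflow Connections
    def get_variable(self, key: str) -> str: ...               # Variables (read)
    def set_variable(self, key: str, value: str) -> None: ...  # Variables (write)
    def xcom_push(self, key: str, value: Any) -> None: ...     # XCom write
    def xcom_pull(self, task_id: str, key: str) -> Any: ...    # XCom read
    def defer(self, trigger: Any) -> None: ...                 # Triggers
    def heartbeat(self) -> None: ...
    def emit_log(self, line: str) -> None: ...
    def emit_metric(self, name: str, value: float) -> None: ...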
Extend Executor interface
- Replace queue_task_instance(), queue_command(), and send_callback() with queue_activity()
from typing import Any

# ActivityType: enum of activity kinds (defined elsewhere in the proposal)
def queue_activity(
    self,
    *,
    # For now just "ExecuteTask", "RunCallback", but allows for "ParseDAGFile" etc. in future
    kind: ActivityType,
    # Contains TI for "ExecuteTask", so executor can get ti.queue out of this.
    context: dict[str, Any],
): ...
- Key difference: what's passed between the scheduler and the worker
- from commands to run to better typed parameters, e.g. (a hypothetical call using the new signature follows the JSON):
{
"kind": "ExecuteTask",
"token": "eyJhb....",
"id": [ "dag_id", "task_id", "run_id", "attempt", "map_index"], # Or better, a single id, but we want these details to be passed along regardless
"context": {
"connections": ["list", "of", "pre-injected", "connections"],
"...",
}
}
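Put together, the scheduler side might end up making a call like this; how the JSON fields map onto kind/context is my assumption:
# Hypothetical invocation, given an executor implementing the new interface;
# the mapping of the JSON above onto kind/context is my guess.
executor.queue_activity(
    kind="ExecuteTask",
    context={
        "token": "eyJhb....",
        "id": ["dag_id", "task_id", "run_id", "attempt", "map_index"],
        "connections": ["list", "of", "pre-injected", "connections"],
    },
)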
Dag Parsing Changes
- Strong identity for the source file being parsed
- The same Task SDK during
- dag parsing
- task execution
- Callback details TBD
- run in the parsing process and will need to be "passed down" to it
Connection/Variable security models
- a.k.a. push/inject secrets into tasks or allow tasks to request secrets.
- Details TBD
Which users are affected by the change?
- Operators that rely on direct DB access will need rewriting
Distro proposal based on the Task Execution Interface
This is a proposal on the Airflow dev mailing list that's closely related to AIP-72,
so I'm reading it together with AIP-72.
Task Context: Incoming to Tasks
- Information obtained from the API server for task execution (a hypothetical bundle is sketched after this list)
- Connections
- Secrets
- XCom
- Variables
- Log output location reference
- Composite TaskRunID
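As a mental model (mine, not the proposal's), that incoming bundle could look roughly like this:
from dataclasses import dataclass, field
from typing import Any

# Hypothetical bundle of the incoming context; field names are my
# assumptions based on the list above, not the proposal's definitions.
@dataclass
class TaskContext:
    task_run_id: str  # composite TaskRunID (dag / task / run / try / map index)
    connections: dict[str, Any] = field(default_factory=dict)
    secrets: dict[str, str] = field(default_factory=dict)
    xcoms: dict[str, Any] = field(default_factory=dict)
    variables: dict[str, str] = field(default_factory=dict)
    log_output_ref: str | None = None  # where the task should write logs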
Task Status and Output: Outgoing from Tasks
- Information sent to the API server
- Task completion status
- XCom (optional)
- Variables (write) (optional)
- Log information (optional)
- Heartbeat
- lets the scheduler know that this task is still running (a minimal loop is sketched after this list)
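A heartbeat could be as simple as a periodic authenticated POST; the endpoint path and interval here are assumptions of mine:
import time
import requests

# Hypothetical heartbeat loop; the endpoint path and interval are assumptions.
def heartbeat_loop(api_base: str, token: str, task_run_id: str, interval: float = 10.0) -> None:
    headers = {"Authorization": f"Bearer {token}"}  # the per-task-try JWT
    while True:  # runs until the task process exits
        # Tells the scheduler (via the API server) that this task is still alive
        requests.post(f"{api_base}/task-runs/{task_run_id}/heartbeat", headers=headers)
        time.sleep(interval)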
User scenarios
Worth reading, but I'm going to skip it in my digest
Distro proposal
1. Server Distribution (based on Python)
- MetaDB handling
- Scheduler / Executor
- API server
- Web server
- DAG File processor
- Executor providers (Celery / k8s / Edge)
- Logging providers (maybe❓)
- Server providers (Secrets backend, AuthMgr backend, etc.) (as needed)
2. Python Task SDK (Python distribution)
- TEI serialization / deserialization
- HTTP / REST client and token authentication capabilities
- Default operators
- Logging write provider (maybe❓)
- Providers (hooks / operators / sensors) (as needed)
3. Consolidated Distribution (1 + 2)
4. Golang Task SDK (Go distribution)
- similar to 2, but in Go