像Databricks、Snowflake和BigQuery這樣的云數據分析平臺讓構建數據平臺變得更加容易。它們為中小型團隊提供了出色的擴展方案。
但這種便利并非僅僅體現在租用外部基礎設施上,還包括對特定技術框架的依賴性,以及基于供應商提供的功能所構建的操作和安全體系。
在本文中,你將學習如何在開源的數據湖架構中搭建批量數據導入層,并且能夠完全掌控所有組件。
我們的重點非常明確:首先會確保數據導入層能夠端到端地正常運行,然后在此基礎上進一步開發分析功能、管理機制和流處理能力,而不會讓你陷入對任何單一工具的依賴。同時,我們還會分析一些常見的集成問題,例如配置錯誤、分區值被設置為NULL,或者Python版本不兼容等問題。
最終,你將掌握以下內容:
-
一個基于Docker(使用Compose進行部署)、RustFS(對象存儲系統)、Apache Iceberg(表格格式)和Project Nessie(目錄管理系統)構建的單節點數據湖環境。
-
一個利用Apache Airflow協調運行的批量處理流程,該流程會執行PySpark作業,從而生成帶有版本標識且經過分區的Iceberg表格。
-
一種實際應用中的數據導入方案:通過Redis將外部Web爬蟲與Airflow解耦,使用輕量級的信號表將原始數據寫入對象存儲系統。
-
對這套技術架構的全面了解,包括它的優勢與局限性,以及如何進一步優化它以使其適合生產環境。
關于本文的范圍:本文主要討論ELT流程中的“數據導入”這一環節。數據轉換(使用dbt或Spark SQL)和分析處理(使用Trino或Superset)雖然是很自然的發展方向,但超出了本文的范圍。你在這里構建的基礎架構,正是后續這些功能的基石。
我們將涵蓋的內容:
數據導入問題
通過具體的使用場景,人們可以更容易地理解這種技術架構或解決方案的結構。其高層次的目標是從外部市場API中獲取金融數據,以便進行趨勢分析。你需要重點關注的是將這類數據導入數據倉庫,以便后續進行進一步分析。
這些數據是通過網絡爬蟲來采集的,而且每個接口都有一定的速率限制。在批量處理過程中,基于時間的分區機制對于下游處理流程來說非常有效,同時也有助于保持數據的整潔性。
該爬蟲作為一個獨立的外部進程運行,并通過Redis作業隊列與Airflow系統分離。這種設計使得速率限制機制以及爬取任務的生命周期管理都脫離了Airflow的編排層,從而確保各個組件能夠在出現故障時獨立地進行恢復。
在數據采集過程中,由于爬取任務不具備冪等性,因此確保數據的高可靠性是至關重要的。
技術架構組件
-
RustFS: 一種用Rust語言編寫的、兼容S3的對象存儲系統
-
Project Nessie: 用于Apache Iceberg表格的事務性目錄管理系統
-
Apache Spark: 分布式計算引擎
-
Apache Airflow: 作業調度與編排工具
-
Jupyter Notebook (可選): 可用于針對Iceberg表格執行臨時性的Spark查詢,但本文未對此進行詳細介紹
-
Scrapredis: 用于網絡爬蟲的作業隊列系統
-
Scrapworker: 負責執行網絡爬取任務的數據采集工作進程
這種架構已經在一臺配置為4核x86/AMD處理器、16GB內存以及60GB硬盤的GCP虛擬機上進行了測試,該虛擬機運行的是Debian GNU/Linux 11操作系統。同時還需要使用Docker及Compose v2工具。只要具備類似或更強的配置,這種架構在任何類似的Linux環境中都應該能夠正常運行。
系統概述

該爬蟲作為一個獨立的外部進程運行,并通過Redis作業隊列與Airflow系統分離。Airflow會將包含接口地址、查詢參數以及目標存儲路徑的作業信息推送到隊列中,爬蟲會從隊列中獲取這些信息并執行爬取任務,然后將原始數據直接寫入對象存儲系統中。
這種分離機制使得速率限制策略以及爬取任務的生命周期管理不會影響到Airflow的編排流程,同時也能有效隔離各種故障情況。
由于爬取任務不具備冪等性,因此一旦爬取任務失敗,恢復起來會相對比較困難。但在爬取階段之后的其他環節出現的故障,都可以獨立地進行重試,而無需重新觸發整個爬取過程。
快速入門
首先,需要初始化項目配置:
# 克隆倉庫
git clone https://github.com/ps-mir/data-platform
# 創建共享的Docker網絡
docker network create data-platform
# 創建主機目錄,設置權限并下載Spark JAR文件
chmod +x init.sh && ./init.sh
按照以下順序啟動服務(關閉服務時則按相反順序進行):
- RustFS
cd rustfs && docker compose up -d
- Nessie
cd nessie && docker compose up -d
- Spark — 首次運行時需要先進行構建
cd spark && docker compose build && docker compose up -d
- Scrapredis
cd scrapredis && docker compose up -d
- Airflow — 首次運行時需要先進行構建
cd airflow-docker && docker compose build && docker compose up -d
在Nessie啟動后,創建相應的命名空間:
curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
-H "Content-Type: application/json" \
-d '{"namespace": ["default"]}'
curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
-H "Content-Type: application/json" \
-d '{"namespace": ["scraper"]}'
Scrapworker直接在主機上運行(沒有使用Docker容器化)。它要求Python版本大于或等于3.14:
cd scrapworker
pip install -e .
CONFIG_PATH=./config/config.local.yaml RUSTFS_ACCESS_KEY=rustfsadmin RUSTFS_SECRET_KEY=rustfsadmin python -m scrapworker
在激活Airflow中的scraper_pipeline_v1之前,必須確保Scrapworker已經在運行中。如果沒有Scrapworker,該管道會將任務推送到隊列中,但由于沒有 worker 來處理這些任務,它們會無限期地停留在wait_for_completion狀態中。
Trino也在配置中包含,但目前尚未測試其與Nessie的集成情況。
運行管道
當所有服務都運行起來后,下一步就是在Airflow中激活這些管道。默認情況下,所有的DAG在創建時都會被暫停狀態。這四個管道在復雜程度上是相互關聯的,按照順序依次執行它們,是確保整個系統各部分能夠正確連接的最佳方法。
這四個管道都已經加載到系統中,但默認都是處于暫停狀態。在觸發執行之前,請先在Airflow的用戶界面中解除每個管道的暫停狀態。

讓我們來逐一了解一下這些管道:
spark_static_data_v1_skeleton: Hello DAG
這是一個最簡單的有向無環圖,其中沒有使用Spark,只包含一個用于打印信息的Python任務。如果該任務的狀態顯示為綠色,說明Airflow的調度器和工作者都運行正常。[2026-04-09 22:00:01] INFO - 任務操作符:
spark_static_data_v2_submit:Spark提交任務
該任務通過SparkSubmitOperator來提交一個PySpark作業,該作業會將靜態數據集寫入到Iceberg表中。由于不進行分區處理,因此每次運行都會覆蓋之前的數據。
在Nessie目錄中,這個任務的記錄如下:
類型:ICEBERG_TABLE
元數據存儲位置:s3://warehouse/default/static_data_e7e43123-95a7-44d2-b6d5-67c9c7aa4321/metadata/00000-08a5a2db-6f12-4f21-b2a9-de3d9123fbd3.metadata.json
spark_partitioned_data_v1:Spark分區處理
這個任務在步驟2的基礎上增加了基于時間的 Partitioning功能。分區的鍵值是根據預定的執行時間來確定的,因此每次運行都會將數據寫入到對應的(ds, hr, min)分區中,而不會影響之前的數據。
在RustFS中,示例文件的路徑如下:warehouse/default/static_data_partitioned_b172c66f-722b-44f3-bbee-069355753ff6/data/ds=2026-03-28/hr=23/min=15/00000-4-7a196a47-2ac0-4023-af68-ca10487fccb2-0-00001.parquet
scraper_pipeline_v1:數據抓取管道
這就是完整的數據采集流程。Airflow會向Scrapredis發送任務請求,Scrapworker會調用Binance的API并將原始數據寫入到RustFS中,最后Airflow會將處理結果發布到Nessie目錄中。
每次運行時,都會獲取以下數據:https://api.binance.com/api/v3/trades?symbol=BTCUSDT&limit=10
環境搭建
這是一個使用Docker Compose構建的單節點開發環境。這個基礎配置結構清晰,可以通過針對性的修改輕松擴展到生產環境中。
-
在生產環境中進行部署時,需要為每個組件配置高可用性、持久化存儲管理措施,并加強安全性防護。
-
所有使用的鏡像都固定為特定版本,這樣可以避免在多次拉取鏡像時出現意外問題。
-
所有的容器都共享一個名為
data-platform的外部Docker網絡,這樣各個服務就可以使用容器的名稱作為主機名進行通信了。 -
一個
init.sh腳本會在數據文件夾中創建所需的目錄結構,并負責初始化Docker網絡。
RustFS
RustFS是這一技術架構中的對象存儲層。Nessie的REST目錄模式嚴重依賴于與S3兼容的終端節點;如果將其配置在本地文件系統上運行,那么在系統啟動時Nessie的健康檢查就會失敗,從而導致目錄初始化過程中出現錯誤。對于新的部署環境而言,REST目錄模式是推薦的選擇,因為該模式能夠實現憑證的管理以及多引擎之間的協同工作。
對于那些希望自行托管且兼容S3的存儲系統而言,MinIO無疑是理想的選擇;但后來它采用了更為嚴格的許可協議。RustFS則是另一種開源替代方案,這種系統是用Rust語言編寫的,并且能夠利用本地磁盤進行數據存儲。
在寫入數據時,Spark會通過S3FileIO將Parquet格式的文件直接上傳到RustFS中。同時,Nessie也會同步提交相應的表元數據,因此數據與元數據的狀態要么會一起得到更新,要么根本不會發生任何變化。這就是Apache Iceberg的核心優勢:它能夠確保數據文件及其元數據在更新時始終保持原子性。
對于生產環境或云部署來說,像AWS S3、Google Cloud Storage或Azure Blob Storage這樣的托管型對象存儲服務才是更合適的選擇。而在大規模應用場景中,也可以考慮使用SeaweedFS、Ceph/RGW或Garage這類自行托管的存儲解決方案。
注意事項:
-
桶的創建:當RustFS通過健康檢查后,會自動運行一個使用
amazon/aws-cli工具編寫的rustfs-init腳本來創建s3://warehouse這個桶。因此,你無需手動創建這個桶。 -
權限設置:RustFS在容器內部以uid=10001的身份運行。在容器啟動之前,宿主目錄(
data/rustfs/data和data/rustfs/applogs)必須屬于這個用戶賬戶,否則系統會默默地失敗。init.sh腳本通過sudo chown -R 10001:10001命令來處理權限設置問題。 -
鏡像的固定:在構建Docker容器時,需要將
rustfs/rustfs:1.0.0-alpha.85-glibc這個鏡像固定下來。在升級之前,請先確認uid值是否發生了變化:可以通過docker run --rm --entrypoint id rustfs/rustfs:命令來進行驗證。如果uid值發生了變化,就需要重新運行init.sh腳本或手動調整權限設置。 -
Spark的數據寫入方式:Spark會通過S3FileIO將數據文件直接上傳到RustFS中。而Nessie僅負責管理表元數據,并不會代理數據傳輸的過程。這兩者之間的交互發生在數據提交時,而不是在數據寫入階段。
Nessie
Nessie用于記錄倉庫中所有表的列表、這些表的數據文件以及它們的結構信息。如果沒有Nessie,Spark就無法準確了解倉庫中實際存儲了哪些數據。
Hive Metastore提供了基于Thrift的API,多年來一直被用作元數據管理的標準工具。它通過底層的數據庫為元數據更新提供事務支持,但這些事務僅限于元數據層面,底層的數據文件并不參與這些事務操作,因此不同表之間的數據歷史記錄也無法被關聯起來。
Apache Iceberg通過原子級的表提交機制解決了數據和元數據管理之間的矛盾。而Nessie在此基礎上更進一步:它將元數據管理系統視為一個Git倉庫,每一次對表的修改都相當于一次提交操作。用戶可以對多個表進行分支操作、添加標簽,或者原子性地回滾之前的更改。
Spark通過Nessie的Iceberg REST端點來讀寫表元數據。目錄狀態會被保存到Postgres中,因此即使容器重新啟動,這些信息也不會丟失。
命名空間初始化
與Hive Metastore不同,Nessie不會自動創建命名空間。如果嘗試將表寫入一個不存在的命名空間,那么在數據已經寫入RustFS之后,這個操作將會失敗,從而導致一些文件沒有對應的目錄條目而成為“孤兒文件”。命名空間屬于結構元數據,因此它們的初始化需要在一次性的配置步驟中完成,而不能通過流程化的方式來實現。
Nessie將Iceberg目錄元數據存儲在s3://warehouse/路徑下。而Iceberg表數據則存儲在由命名空間生成的路徑中,例如,對于default命名空間,其數據存儲路徑為s3://warehouse/default/。
S3憑證配置問題
Nessie的S3憑證字段不接受純字符串作為輸入值(這可能是出于安全考慮)。即使對于本地憑證,也必須使用格式為urn:nessie-secret:quarkus:的秘密URI。
此外,對于那些包含連字符的Quarkus屬性名稱來說,SCREAMING_SNAKE_CASE這種環境變量命名規則會導致這些屬性被忽略,系統會自動使用默認值(而默認值往往會導致問題)。正確的做法是在compose環境配置塊中直接使用點表示法來指定屬性名,這樣Quarkus就能直接識別這些屬性而無需進行任何轉換:
nessie.catalog.service.s3.default-options.access-key: "urn:nessie-secret:quarkus:nessie_catalog.secrets.access-key"
nessiecatalog.secrets.access-key.name: rustfsadmin
nessie/catalog.secrets.access-key.secret: rustfsadmin
Nessie的健康檢查
一旦RustFS的配置問題得到解決,通過Nessie的健康檢查URL(http://localhost:9090/q/health)應該會返回如下響應:
{
"status": "UP",
"checks": [
{
"name": "MongoDB連接健康檢查",
"status": "UP"
},
{
"name": "倉庫對象存儲系統狀態檢查",
"status": "UP",
"data": {
"warehouse.warehouse.status": "UP"
}
},
{
"name": "數據庫連接健康檢查",
"status": "UP",
"data": {
"": "UP"
}
}
]
}
盡管這個系統并不使用MongoDB,但在響應中仍然會顯示MongoDB連接的健康檢查結果。這是因為Quarkus內置了一個探針,無論存儲類型如何,都會自動注冊并執行這個探針。如果配置了JDBC,實際上并不會真正連接到MongoDB,因此顯示“UP”狀態只是一種占位符響應。
目錄端點與管理接口
Nessie提供了兩種不同的API。用于訪問Iceberg目錄信息的REST端點位于/iceberg路徑下,Spark和Trino都是通過這個端點來與Nessie進行交互的。而Nessie的管理API則位于/api/v2路徑下,該接口用于執行分支操作、查看提交歷史記錄以及檢查表結構等信息。這兩種API是不能互相替代的。
# Iceberg REST API
http://localhost:19120/iceberg/v1/main/namespaces
http://localhost:19120/iceberg/v1/config
# Nessie管理API
http://localhost:19120/api/v2/config
注意事項:
-
對于任何非AWS S3的端點,都必須設置
path-style-access: true。而region這個參數只是AWS SDK內部使用的一個占位符。 -
Nessie的內部端口9000會被重新映射為9090,這樣就可以避免與占用9000和9001端口的RustFS發生沖突。
轉發路徑
Nessie是一個無狀態的REST服務,因此可以通過負載均衡來擴展讀取請求的處理能力,而各節點之間無需進行任何協調。數據的持久性完全依賴于后端存儲系統。
Spark
作為一款分布式計算引擎,Apache Spark非常適合用于執行那些需要長時間運行的任務。在當前的配置中,Spark會執行由Airflow提交的PySpark作業,通過Nessie REST目錄來讀寫Iceberg表格,并使用S3FileIO將數據文件直接寫入RustFS。Spark以獨立模式運行,包含一個master節點和多個worker節點,這些配置都是通過spark-defaults.conf文件來設置的。
在啟動Spark之前,需要準備兩個JAR文件,并將它們放在data/spark/jars/目錄中:
-
iceberg-spark-runtime-3.5_2.12:這個JAR文件實現了Spark與Iceberg之間的集成,包括SparkCatalog、DataFrameWriterV2、SQL擴展功能以及所有與表格格式相關的邏輯。 -
iceberg-aws-bundle:這個JAR包包含了AWS SDK v2以及Iceberg的S3FileIO組件,后者用于將數據文件寫入RustFS。不過,Spark的基礎鏡像中只預裝了Hadoop AWS(SDK v1),而這個捆綁包則提供了S3FileIO所需要的SDK v2類庫。
Spark使用自定義的Dockerfile來安裝Python 3.12,在首次使用之前請先構建相應的鏡像:
cd spark
docker compose build
docker compose up -d
關于PySpark作業的具體配置,會在Airflow的相關部分進行介紹,我們會詳細說明每個DAG結構及其對應的Spark腳本。
在提交任何會寫入Iceberg表格的Spark作業之前,必須確保目標命名空間在Nessie中已經存在。與Hive Metastore不同,Nessie不會自動創建命名空間。如果嘗試向一個不存在的命名空間寫入數據,那么在數據已經被寫入RustFS之后,就會導致一些文件無法被正確記錄到目錄中,從而形成“孤兒文件”。
在運行任何管道之前,請先創建default這個命名空間:
# 這時Nessie應該已經啟動并運行起來了
curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
-H "Content-Type: application/json" \
-d '{"namespace": ["default"]}'
{
"namespace" : [ "default" ],
"properties" : { }
}
驗證命名空間是否創建成功:
curl http://localhost:19120/iceberg/v1/main/namespaces
目錄結構不匹配:不同查詢引擎會顯示不同的表格列表
如果由Spark創建的表在Trino中無法被看到,那么很可能是因為目錄配置不匹配所致。當Spark使用NessieCatalog進行配置,而Trino則使用Iceberg REST目錄時,這兩種系統會維護不同的元數據視圖——它們不會共享相同的表狀態。因此,這兩種引擎都必須指向同一個目錄地址:http://nessie:19120/iceberg。
注意事項:
-
工作節點內存配置:工作節點被配置為
SPARK_WORKER_MEMORY: 8g。Spark的默認設置是1g內存,這種配置足以完成表的注冊操作,但不足以讓作業在無需排隊等待的情況下正常運行。因此需要根據主機可用的內存資源來調整這一參數。 -
遠程簽名功能:
remote-signing-enabled: false。Nessie的REST目錄支持通過IAM/STS機制進行身份驗證,但由于當前環境中并未啟用這一集成功能,因此明確禁用了遠程簽名功能,以避免請求失敗。 -
配置更改需要重新啟動系統:Docker文件中的綁定掛載機制會在容器啟動時緩存相應的inode信息。因此,即使修改了
spark-defaults.conf文件,也只有在重啟Spark以及Airflow工作節點之后,這些更改才會生效。在客戶端模式下,Airflow工作節點實際上就是Spark驅動程序(負責在作業提交時讀取配置文件),因此也需要重新啟動它。 -
Jupyter Notebook:該部署環境包含了基于PySpark的Jupyter實例,用戶可以使用它來針對Iceberg表進行臨時查詢操作。這個Jupyter實例會連接到同一個Spark集群和Nessie目錄,因此任何通過管道創建的表都可以立即被查詢到。
?? 警告:Spark工作節點與Airflow工作節點(即驅動程序)必須使用相同版本的Python。PySpark會在運行時強制檢查這一點,如果兩個節點使用的Python版本不同,系統會立即出現故障。本部署環境中的Spark鏡像使用了自定義的Dockerfile來安裝Python 3.12版本,這個版本與Airflow的基礎鏡像相匹配。因此,在升級其中任何一個組件時,都必須確保它們的版本保持一致。
Apache Airflow
Apache Airflow使得工作流程的創建、調度和監控變得更加便捷。在這個案例中,Airflow負責處理數據的批量導入任務,但也可以被擴展用于流式處理等場景。
Airflow的各種組件在結構上更類似于官方文檔中描述的DAG處理器架構。

關鍵組成部分包括:
-
DAG處理器會持續解析DAG文件,并將其序列化到元數據數據庫中。
-
調度器會從元數據數據庫中讀取信息,判斷何時應該執行某個DAG任務,然后創建相應的任務實例,并通過Redis隊列將它們發送給CeleryExecutor進行處理。
-
Celery工作節點會接收并執行這些任務。對于使用
SparkSubmitOperator的任務來說,該工作節點會變成Spark驅動程序,從而將作業提交到Spark集群中執行。 -
執行器會在Spark工作節點上運行,它們會將處理結果直接寫入RustFS文件系統中,并將表元數據信息更新到Nessie目錄中。Airflow還會將任務的執行結果記錄回元數據數據庫中。
Airflow使用自定義的Dockerfile來安裝Java 17以及其他相關組件。在首次使用之前,請先構建相應的鏡像:
cd airflow-docker
docker compose build
docker compose up -d
管道配置
所有管道文件都需要被保存在airflow-docker/dags文件夾中,這樣dag處理器才能從元數據數據庫中讀取這些管道信息。這里提供了四個復雜程度不同的管道示例。
-
step1_hello_dag.py:這是一個沒有依賴關系的單任務管道,僅僅是一個用于打印信息的Python函數而已。 -
step2_spark_submit.py:通過SparkSubmitOperator提交一個PySpark作業。該作業會通過Nessie目錄將數據寫入到Iceberg表中。 -
step3_spark_partitioned.py:在步驟2的基礎上增加了基于時間的數據分區功能。調度時間會被傳遞給PySpark腳本。- 為了確保作業能夠正確執行,基于時間的分區值會從
data_interval_start中計算得出。
- 為了確保作業能夠正確執行,基于時間的分區值會從
-
scraper_pipeline:這是一個用于數據采集的實用管道。它通過Redis隊列scrapredis與外部任務執行器scrapworker進行通信。- 要使這個管道正常工作,
scrapredis和scrapworker都必須處于運行狀態。
- 要使這個管道正常工作,
部署模式與驅動程序配置
最初使用的SparkSubmitOperator配置將deploy_mode設置為“cluster”,這意味著驅動程序會在Spark集群上運行,而不是在提交作業的機器上運行。但在獨立的Spark集群上使用這種配置會立即導致錯誤:
對于獨立運行的Spark集群而言,目前不支持將“deploy_mode”設置為“cluster”。
在YARN和Kubernetes環境下,Python應用程序才能使用集群模式進行部署。將deploy_mode設置為“client”可以解決這個問題,但這樣一來,驅動程序就會在Airflow工作節點的容器中運行,因此工作節點需要具備與Spark容器相同的環境配置。
總的來說,需要對Airflow工作節點進行以下三項修改:
-
需要在
/opt/spark/user-jars/路徑下添加Iceberg和Nessie相關的JAR文件。 -
需要修改
spark-defaults.conf文件,以便其中包含目錄配置、擴展名設置以及相關JAR文件的路徑信息。 -
需要將
SPARK_CONF_DIR設置為/opt/spark/conf。如果不進行這個設置,通過pip安裝的PySpark會忽略系統中的配置文件,從而導致程序無法正常運行。
為了解決這些問題,我們在airflow-docker/docker-compose.yaml文件中將這三項配置添加到了x-airflow-common配置項中,這樣所有Airflow服務都能繼承這些配置。
environment:
SPARK_CONF_DIR: /opt/spark/conf
volumes:
- ../data/spark/jars:/opt/spark/user-jars:ro
- ../spark/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf:ro
被設置為NULL的分區值
當第三個管道(使用Spark進行數據分區處理)首次運行時,數據確實被正確地存儲到了RustFS中,但當查詢Iceberg分區的元數據時,卻發現以下情況:
+------------------+----------+
| 分區 | 文件數量 |
+------------------+----------+
|{NULL, NULL, NULL}| 2|
+------------------+----------+
最初的腳本使用了Spark的DataSource V1 API:
df.write.format("iceberg").mode("overwrite").saveAsTable(table)
該腳本利用了Spark的V1 DataFrame寫入API,并指定了“iceberg”格式。這種寫法會直接加載數據到表中,而不會經過Iceberg的目錄結構進行存儲。因此,雖然Iceberg將數據文件成功存儲到了磁盤上,但元數據中的分區值卻被設置為了NULL。
正確的解決方案是使用Iceberg的原生DataFrameWriterV2 API:
df.writeTo(table).overwritePartitions()
這種寫法會通過Iceberg的官方存儲路徑進行數據傳輸,會根據實際的列值來計算分區信息,并將這些信息正確地寫入元數據中。overwritePartitions()這個方法只會覆蓋DataFrame中存在的分區數據。如果重新運行相同任務的調度程序,那么分區的值將會被更新,而其他分區的數據則不會受到影響。
?? 注意:后續使用V2 API進行的寫操作并不會自動修改那些已經被設置為NULL的分區元數據。對于那些只包含錯誤數據的表格來說,最簡單的恢復方法就是先執行`DROP TABLE`命令,然后再重新創建表格并寫入正確的數據。
Scrapredis
Scrapredis是一個專門用于處理Redis任務的實例,它位于Airflow和Scrapworker之間,充當作業隊列的角色。它與Airflow內部使用的Redis是分開的——后者僅用于CeleryExecutor任務的管理和調度。這種分離設計使得我們可以獨立地管理、擴展或替換Scrapredis,而不會影響到Airflow的整體運行。
這種架構不僅適用于數據抓取任務,任何需要擁有獨立生命周期、資源配置或速率限制機制的外部進程都可以通過這種方式進行集成:Airflow負責推送作業任務,外部處理程序會接收這些任務并執行它們,最后Airflow會定期檢查執行結果。
Scrapredis的工作流程如下:
- Airflow將作業數據推送到隊列中:
QUEUE_KEY = "scrapworker:jobs"
client.lpush(QUEUE_KEY, json.dumps(payload)
- Scrapworker會從隊列中取出下一個待處理的作業:
while True: _, payload = client.blpop(redis_cfg["queue_key"])
- 任務執行完成后,Scrapworker會將結果以及對應的
信息寫回Redis中: client.set(status_key, json.dumps({"status": "finished", "worker_id": worker_id, "s3_path": job["s3_path"]}), ex=TERMINAL_TTL)
wait_for_completion任務會持續檢測該狀態鍵的變化。當檢測成功后,publish_nessie_signal會獲取s3_path>,并將相應的信號數據寫入Nessie數據庫中。Scrapworker
Scrapworker是一個使用Scrapy爬蟲框架來抓取請求中所有頁面的Python應用程序。由于URL或客戶端特定的速率限制機制,它與Airflow是解耦設計的。簡單來說,它可以被視為一種接收并執行來自Airflow的任務的外部工作進程。
它的職責是下載數據并將其存儲到對象存儲系統RustFS中。而Nessie目錄的更新操作則是通過另一個獨立的Airflow任務來完成的。
固定信號表
Scrapworker會將原始JSON數據寫入RustFS,而不是直接將抓取到的數據以Iceberg列的形式存儲。隨后,該流程會向由Nessie管理的Iceberg表格中發布一條簡潔的信號記錄。
這種信號結構的格式是固定的且非常簡單(包含
run_id、endpoint、s3_path、ds、hr、min和published_at等字段),無論抓取到什么類型的數據,這個結構都不會發生變化。如果將抓取到的數據直接以Iceberg列的形式存儲,那么Scrapworker就需要負責處理不同端點之間的數據結構轉換問題。但這并不符合理想的設計方案。實際上,這種數據結構的管理職責應該由后續的處理流程來承擔:
Scrapworker → RustFS中的原始文件 + Iceberg表格中的信號記錄(由Pipeline處理) Airflow任務 → 通過s3_path讀取原始數據,應用相應的數據結構,然后生成結構化的Iceberg表格后續的處理流程能夠了解數據的來源和數據結構,因此它們才是處理類型轉換、空值處理以及數據分區布局等問題的合適位置。Scrapworker則應該保持其通用性和簡潔性——同樣的代碼可以用于處理任何類型的端點數據,而無需進行任何修改。
為什么信號發布需要作為一個獨立的Airflow任務來執行
Scrapworker會將數據寫入RustFS,并在Redis中設置
status: finished狀態,同時記錄下s3_path>信息。而另一個獨立的Airflow任務則會讀取這些狀態信息,并將信號記錄發布到Nessie數據庫中。這種分離的設計是有意為之的。如果Scrapworker在寫入RustFS之后直接將數據發布到Nessie,那么這兩種操作就會面臨相同的失敗風險。例如,如果在RustFS中的寫操作成功了,但Nessie數據庫出現故障,那么數據就會丟失,既沒有信號記錄可供參考,也沒有可行的恢復機制。此時,唯一的解決辦法就是重新進行爬取操作,但這顯然不具備冪等性。
通過這種分離的設計,即使其中一個環節出現故障,其他環節也能正常運行。Nessie數據庫的故障只會觸發Airflow系統中信號發布任務的重試操作,而不會導致重新爬取數據或重復執行相同的爬取流程。RustFS和Nessie數據庫的故障都可以獨立地進行恢復處理。
注意事項:
-
原始抓取到的文件會被直接寫入
s3://warehouse/raw/路徑,這個路徑完全不在Nessie的管理范圍內。Iceberg層中的任何系統都不會訪問這個路徑。 -
Scrapworker使用的信號表存儲在專門的
scraper命名空間中。在Scrapworker首次運行之前,需要先創建這個命名空間。
curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
-H "Content-Type: application/json" \
-d '{"namespace": ["scraper"]}'
后續發展方向
我們構建的這個技術棧是一個功能完備的數據采集層。它能夠可靠地接收數據,將這些數據存儲在帶有版本控制的目錄中,并為后續的開發工作提供基礎。從這里開始,有兩個發展方向值得考慮。
擴展功能
這些改進是在現有技術棧的基礎上進行的,它們能夠在不添加新組件的情況下提升系統的穩定性。
數據采集的可靠性:目前,Scrapworker在遇到故障時會將狀態設置為status: failed,這會導致Airflow需要重新觸發整個數據處理流程。如果加入客戶端端的速率限制機制以及針對每個端點的重試邏輯,并引入退避策略,那么爬取任務就會具備更好的自我恢復能力,即使某個頁面的獲取失敗,也可以獨立地嘗試再次獲取數據,而無需通知Airflow。
配置驗證:如果在config.yaml文件中配置錯誤,相關端點在運行時可能會默默地出現故障,而且這種問題往往會在爬取任務進行到較后期才被發現。如果在系統啟動時調用validate_config()函數,就可以在任何任務開始執行之前檢查是否存在缺失的必要字段,比如offset_param或response_map。隨著端點數量的增加,這一功能變得越來越重要。
可觀測性:Airflow提供的警報機制和SLA監控功能可以在數據處理流程偏離計劃或任務耗時超過預期時及時發出警告。信號表在這一方面也非常有用。一個簡單的監控工具,只要在指定的時間窗口內檢查是否存在預期的信號記錄,就能完成SLA監控任務,而且這種監控方式完全不需要依賴外部工具。
添加新的功能層
這些是新添加的功能,它們是在現有數據采集基礎之上構建的。
轉換層:由數據采集層生成的原始Iceberg表格是后續轉換步驟的輸入數據。使用dbt或Spark SQL可以讀取這些原始數據,為它們應用相應的數據結構,清理數據類型,然后將處理后的結構化表格存儲到另一個命名空間中。這就是ELT模型中的“轉換”環節,也是當數據采集過程穩定下來后自然而然要進行的下一步操作。
分析功能:Trino已經融入到了這個技術棧中,并且已經實現了部分集成。將其與Nessie完全連接起來,就可以對所有的Iceberg表格執行SQL查詢了。如果再添加Superset,就可以獲得可視化分析功能,而無需對數據采集流程進行任何修改。
支持更多類型的數據源:當前的技術棧主要支持一種數據采集方式:即通過定時的Airflow流程來觸發外部HTTP爬蟲程序。不過,這個基礎架構同樣適用于基于pull機制的數據源,比如使用CDC從數據庫中獲取數據;同時也適用于基于push機制的數據源,比如通過Kafka接收事件流數據。無論數據是如何傳入系統的,Iceberg表格和Nessie目錄都會作為數據的存儲和處理平臺。
管理功能:Iceberg和Nessie為系統提供了必要的管理基礎,包括數據快照的創建、數據結構的演變跟蹤、提交歷史的記錄以及數據追溯功能。而更高層次的管理功能,比如訪問控制、數據質量檢查、數據來源追蹤以及數據結構強制執行等,則需要通過額外的組件來實現。這些新增功能并不會取代現有的系統架構,而是建立在現有基礎之上進行的擴展。