ลองใช้ Apache Airflow
ช่วงที่ผ่านมาได้มีโอกาศ ทำงาน data มากขึ้น ทั้งการเอาข้อมูลจากหลายๆ แหล่งมาใช้ร่วมกัน แม้กระทั่ง การทำ sitemap ก่อนหน้านี้จะใช้วิธี cronjob แต่หลังจากทำงานมาซักพักแล้ว เจอกับปัญหา ถ้าต้องการเก็บ log หรือเข้าไปตรวจดูว่าไงระบบทำงานหรือไม่จะยากพอสมควร ต้องเขียน log เอง หรือแม่กระทั่ง ให้ระบบส่ง line มาบอกว่าเริ่มงานแล้ว หรือจบงานแล้ว ที่จริงเรื่องการให้ระบบ line มาก็ดีชอบมาก แต่ถ้าต้องเข้าไปแก้ ให้มีการทำงานต่อเนื่อง เพิ่มขึ้นตอนเข้าไป พอเวลาผ่านไปก็จะเจอปัญหา จำไม่ได้แล้วว่าไงทำอะไรไว้บ้าง :)
ตอนนี้เห็นหลายๆ ที่ใช้ airflow วันนี้เลยลองซักหน่อย
เริ่มจากเลือก Airflow Docker ว่าจะใช้ตัวไหนดี ลองหลายตัว ไปเจอตัวที่น่าสนใจคือ
https://hub.docker.com/r/bitnami/airflow
ตัวนี้เขียนรายละเอียดดี update เป็น ver. ล่าสุดอยู่เสมอด้วย
Start setup airflow with docker
เริ่มจากไป download docker-compose มาซะก่อน ตามนี้
$ curl -LO https://raw.githubusercontent.com/bitnami/bitnami-docker-airflow/master/docker-compose.yml
แล้วจากนั้นจะต้องแก้ให้ docker-compose มันรู้จัก path ที่เราจะเก็บไฟซะหน่อยเผื่อทำงานไปแล้วเกิดอะไรขึ้น จะได้ไม่ต้อง เริ่มต้นใหม่
ในไฟล์ docker-compose.yml template ใน demo ให้แก้ data volumes เป็น path ตามี่เราต้องการ
version: '2'
services:
postgresql:
image: 'bitnami/postgresql:latest'
environment:
- POSTGRESQL_DATABASE=bitnami_airflow
- POSTGRESQL_USERNAME=bn_airflow
- POSTGRESQL_PASSWORD=bitnami1
volumes:
- /path/to/airflow-persistence:/bitnami/postgresql
redis:
image: 'bitnami/redis:latest'
environment:
- ALLOW_EMPTY_PASSWORD=yes
volumes:
- /path/to/airflow-persistence:/bitnami
airflow-worker:
image: bitnami/airflow-worker:latest
environment:
- AIRFLOW_FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- AIRFLOW_EXECUTOR=CeleryExecutor
- AIRFLOW_DATABASE_NAME=bitnami_airflow
- AIRFLOW_DATABASE_USERNAME=bn_airflow
- AIRFLOW_DATABASE_PASSWORD=bitnami1
- AIRFLOW_LOAD_EXAMPLES=yes
volumes:
- /path/to/airflow-persistence:/bitnami
airflow-scheduler:
image: bitnami/airflow-scheduler:latest
environment:
- AIRFLOW_FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- AIRFLOW_EXECUTOR=CeleryExecutor
- AIRFLOW_DATABASE_NAME=bitnami_airflow
- AIRFLOW_DATABASE_USERNAME=bn_airflow
- AIRFLOW_DATABASE_PASSWORD=bitnami1
- AIRFLOW_LOAD_EXAMPLES=yes
volumes:
- /path/to/airflow-persistence:/bitnami
airflow:
image: bitnami/airflow:latest
environment:
- AIRFLOW_FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- AIRFLOW_EXECUTOR=CeleryExecutor
- AIRFLOW_DATABASE_NAME=bitnami_airflow
- AIRFLOW_DATABASE_USERNAME=bn_airflow
- AIRFLOW_DATABASE_PASSWORD=bitnami1
- AIRFLOW_PASSWORD=bitnami123
- AIRFLOW_USERNAME=user
- [email protected]
ports:
- '8080:8080'
volumes:
- /path/to/airflow-persistence:/bitnami
- /path/to/dags:/opt/bitnami/airflow/dags
แก้ตรงนี้ /path/to/airflow-persistence นะ
ถ้าไม่ต้องการ load example ให้เปลี่ยนจาก
AIRFLOW_LOAD_EXAMPLES=yes
เป็น
AIRFLOW_LOAD_EXAMPLES=no
การทำงานของ worker ใน airflow จะมีอยู่ 3 แบบคือ
- Sequential Executor – ทำตามลำดับ
- Local Executor – ทำงานขนานกันได้
- Celery Executor – Scale ให้ Worker ช่วยๆกันทำงานได้
เช่น ถ้าเลือกใช้งาน Celery Executor จะเป็นการทำงานแบบ Vertical Scale พูดง่ายๆ คือ มีแค่เครื่องเดียวทำงานด้วย cpu จองเครื่องล้วนๆ
หลังจากแก้ docker-compose.yml ตามที่ต้องการแล้วให้
$ docker-compose up -d
ถ้า docker ps จะได้ list container ตามนี้

จากนั้นให้ลอง http://127.0.0.1:8080

ที่นี้มาลองเขียน code สร้าง job เข้าไป
# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# other packages
from datetime import datetime
from datetime import timedelta
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2020, 10, 1),
'email_on_failure': False,
'email_on_retry': False,
'schedule_interval': '@daily',
'retries': 1,
'retry_delay': timedelta(seconds=5),
}
def source1_by_csv():
print("call source1_by_csv")
def source2_by_csv():
print("call source2_by_csv")
def get_hdfs_config():
#return HDFS configuration parameters required to store data into HDFS.
return None
config = get_hdfs_config()
dag = DAG(
dag_id='01_hello_airflow',
description='Simple test DAG',
default_args=default_args)
src1_csv = PythonOperator(
task_id='source1_by_csv',
python_callable=source1_by_csv,
dag=dag)
src2_csv = PythonOperator(
task_id='source2_by_csv',
python_callable=source2_by_csv,
dag=dag)
src1_csv >> src2_csv
ก่อนหน้านี้ใน docker-compose.yml ที่บรรทัดสุดท้ายได้เพิ่ม volumes สำหรับเก็บ ไฟล์ python ไว้ ที่ /path/to/dags:/opt/bitnami/airflow/dags
ให้เอาไฟล์ python job ที่ลองเขียนไปไว้ที่ /path/to/dags
ถ้าลองเข้าไปที่ docker container ที่ image name "airflow_airflow_1"
$ docker exec -it airflow_airflow_1 bash

จะมีไฟล์เข้าไปอยู่ที่ /opt/bitnami/airflow/dags
หรือสามรถ list dags ทั้งหมดได้จาก cmd
$ airflow list_dags

** ในกรณีที่เอาไฟล์เข้าไปถูกต้องแล้วแต่ airflow ไม่ยอมแสดงผลที่หน้าเว็บ ให้ลอง update airflow scheduler โดย
$ airflow scheduler

ถ้าไม่มีอะไรผิดพลาดเมื่อ reload หน้าเว็บจะมีรายการแสดงขึ้นมา โดยการเดาคิดว่าหน้าเว็บถูก cache จาก redis ทำให้ไม่แสดงผลตามไฟล์ที่เพิ่มเข้าไปใหม่
Start Airflow services เราจะเพิ่ม --daemon
flag สำหรับ run processes แบบ daemons, จำทำให้ระบบยังทำงงานแม้ว่าไงจะออก หน้าจอไปแล้ว
$ airflow webserver --daemon
$ airflow scheduler --daemon
$ airflow worker --daemon
Architecture ของตัว Airflow
ประกอบด้วย 3 ส่วน
- Web Server – คือ front-end web สำหรับใช้จัดการและ monitor scheduler / task รวมไปถึง log การทำงาน
- Scheduler – เอาไว้จัดการงาน ส่งให้ worker ต่างๆ ทำงานต่อ
- Worker – ตัวที่ทำงานจริง โดยสามารถกำหนดให้มัน Scale ได้
เราสามารถจัดการ Scale worker ได้โดย configuration ที่ไฟล์ {AIRFLOW_HOME}/airflow.cfg
ตัวอย่าง
workers = 4
executor = CeleryExecutor
Airflow and Executor node
ตัวอย่าง Single Node Airflow

ภาพจาก http://site.clairvoyantsoft.com/SETTING-APACHE-AIRFLOW-CLUSTER/
Multi-Node (Cluster) Airflow
ตัวอย่าง Multi-Node Airflow

ภาพจาก http://site.clairvoyantsoft.com/SETTING-APACHE-AIRFLOW-CLUSTER/
จากการ setup airflow ในจะเห็นว่า airflow จะแยกชิ้นส่วนต่างๆ ออกจากกัน
- postgresql
- redis
- worker
- scheduler
- airflow (Web Server)
ดังนั้น Multi-Node จะเป็นการ Cluster Worker node
Benefits การ ทำ Multi-Node
Higher Availability หมายถึง ระบบจะทำงานได้ตลอดไม่ว่าจะเกิดอะไนขึ้น เช่น ในกรณีที่มี Node หนึ่ง down หรือถูกทำให้ offline, Cluster จะยังคงใช้งานได้ และ tasks จะยังคงถูกดำเนินการต่อไป
Distributed Processing หมายถึง การประมวลข้อมูลแบบกระจาย เช่น หากคุณมี workflow การทำงานที่ต้องใช้หน่วยความจำมากๆ แต่ละ tasks จะกระจายกันทำงาน ไปในทุก cluster ทำให้การทำงาน ในแต่ละ task ทำงานได้เร็วขึ้น
Scaling Workers
จะมีการ scale อยู่ 2 แบบ
Horizontally - ไม่รู้ ยังไม่ได้ลอง
Vertically - สามารถ scale cluster แบบ vertically โดยเพิ่ม celeryd daemons ที่ใช้งานในแต่ละ node ได้โดยการ แก้ไข config "celeryd_concurrency" ได้ที่ {AIRFLOW_HOME}/airflow.cfg เช่น
celeryd_concurrency = 30
ถ้ามี task มากขึ้นอาจจะต้องไปเพิ่มขนาดของ instances ที่ใช้งานเพื่อให้ support order ที่มากขึ้น celeryd processes ปริมาณงานที่ต้องทำจะขึ้นอยู่กับ memory และ cpu ที่ใช้งานในแต่ละ cluster
อ่านเพิ่มเติม:
http://site.clairvoyantsoft.com/SETTING-APACHE-AIRFLOW-CLUSTER/
https://hub.docker.com/r/bitnami/airflow