ลองใช้ Apache Airflow

Airflow 2 ต.ค. 2020

ช่วงที่ผ่านมาได้มีโอกาศ ทำงาน data มากขึ้น ทั้งการเอาข้อมูลจากหลายๆ แหล่งมาใช้ร่วมกัน แม้กระทั่ง การทำ sitemap ก่อนหน้านี้จะใช้วิธี cronjob แต่หลังจากทำงานมาซักพักแล้ว เจอกับปัญหา ถ้าต้องการเก็บ log หรือเข้าไปตรวจดูว่าไงระบบทำงานหรือไม่จะยากพอสมควร ต้องเขียน log เอง หรือแม่กระทั่ง ให้ระบบส่ง line มาบอกว่าเริ่มงานแล้ว หรือจบงานแล้ว ที่จริงเรื่องการให้ระบบ line มาก็ดีชอบมาก แต่ถ้าต้องเข้าไปแก้ ให้มีการทำงานต่อเนื่อง เพิ่มขึ้นตอนเข้าไป พอเวลาผ่านไปก็จะเจอปัญหา จำไม่ได้แล้วว่าไงทำอะไรไว้บ้าง :)

ตอนนี้เห็นหลายๆ ที่ใช้ airflow วันนี้เลยลองซักหน่อย

เริ่มจากเลือก Airflow Docker ว่าจะใช้ตัวไหนดี ลองหลายตัว ไปเจอตัวที่น่าสนใจคือ

https://hub.docker.com/r/bitnami/airflow

ตัวนี้เขียนรายละเอียดดี update เป็น ver. ล่าสุดอยู่เสมอด้วย

Start setup airflow with docker

เริ่มจากไป download docker-compose มาซะก่อน ตามนี้

$ curl -LO https://raw.githubusercontent.com/bitnami/bitnami-docker-airflow/master/docker-compose.yml

แล้วจากนั้นจะต้องแก้ให้ docker-compose มันรู้จัก path ที่เราจะเก็บไฟซะหน่อยเผื่อทำงานไปแล้วเกิดอะไรขึ้น จะได้ไม่ต้อง เริ่มต้นใหม่

ในไฟล์ docker-compose.yml template ใน demo ให้แก้ data volumes เป็น path ตามี่เราต้องการ

version: '2'
services:
  postgresql:
    image: 'bitnami/postgresql:latest'
    environment:
      - POSTGRESQL_DATABASE=bitnami_airflow
      - POSTGRESQL_USERNAME=bn_airflow
      - POSTGRESQL_PASSWORD=bitnami1
    volumes:
      - /path/to/airflow-persistence:/bitnami/postgresql
  redis:
    image: 'bitnami/redis:latest'
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
    volumes:
      - /path/to/airflow-persistence:/bitnami
  airflow-worker:
    image: bitnami/airflow-worker:latest
    environment:
      - AIRFLOW_FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - AIRFLOW_EXECUTOR=CeleryExecutor
      - AIRFLOW_DATABASE_NAME=bitnami_airflow
      - AIRFLOW_DATABASE_USERNAME=bn_airflow
      - AIRFLOW_DATABASE_PASSWORD=bitnami1
      - AIRFLOW_LOAD_EXAMPLES=yes
    volumes:
      - /path/to/airflow-persistence:/bitnami
  airflow-scheduler:
    image: bitnami/airflow-scheduler:latest
    environment:
      - AIRFLOW_FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - AIRFLOW_EXECUTOR=CeleryExecutor
      - AIRFLOW_DATABASE_NAME=bitnami_airflow
      - AIRFLOW_DATABASE_USERNAME=bn_airflow
      - AIRFLOW_DATABASE_PASSWORD=bitnami1
      - AIRFLOW_LOAD_EXAMPLES=yes
    volumes:
      - /path/to/airflow-persistence:/bitnami
  airflow:
    image: bitnami/airflow:latest
    environment:
      - AIRFLOW_FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - AIRFLOW_EXECUTOR=CeleryExecutor
      - AIRFLOW_DATABASE_NAME=bitnami_airflow
      - AIRFLOW_DATABASE_USERNAME=bn_airflow
      - AIRFLOW_DATABASE_PASSWORD=bitnami1
      - AIRFLOW_PASSWORD=bitnami123
      - AIRFLOW_USERNAME=user
      - [email protected]
    ports:
      - '8080:8080'
    volumes:
      - /path/to/airflow-persistence:/bitnami
      - /path/to/dags:/opt/bitnami/airflow/dags
docker-compose.yml

แก้ตรงนี้ /path/to/airflow-persistence นะ

ถ้าไม่ต้องการ load example ให้เปลี่ยนจาก

AIRFLOW_LOAD_EXAMPLES=yes

เป็น

AIRFLOW_LOAD_EXAMPLES=no

การทำงานของ worker ใน airflow จะมีอยู่ 3 แบบคือ

  • Sequential Executor – ทำตามลำดับ
  • Local Executor – ทำงานขนานกันได้
  • Celery Executor – Scale ให้ Worker ช่วยๆกันทำงานได้

เช่น ถ้าเลือกใช้งาน Celery Executor จะเป็นการทำงานแบบ Vertical Scale พูดง่ายๆ คือ มีแค่เครื่องเดียวทำงานด้วย cpu จองเครื่องล้วนๆ

หลังจากแก้ docker-compose.yml ตามที่ต้องการแล้วให้

$ docker-compose up -d

ถ้า docker ps จะได้ list container ตามนี้

docker ps แสดงรายการ containers ในเครื่อง

จากนั้นให้ลอง http://127.0.0.1:8080

ตัวอย่าง dags กับ hello airflow ที่เพิ่มเข้าไปใหม่

ที่นี้มาลองเขียน code สร้าง job เข้าไป

# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

# other packages
from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 10, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

def source1_by_csv():
    print("call source1_by_csv")

def source2_by_csv():
    print("call source2_by_csv")



def get_hdfs_config():
    #return HDFS configuration parameters required to store data into HDFS.
    return None

config = get_hdfs_config()

dag = DAG(
  dag_id='01_hello_airflow', 
  description='Simple test DAG',
  default_args=default_args)

src1_csv = PythonOperator(
  task_id='source1_by_csv', 
  python_callable=source1_by_csv, 
  dag=dag)

src2_csv = PythonOperator(
  task_id='source2_by_csv', 
  python_callable=source2_by_csv, 
  dag=dag)


src1_csv >> src2_csv
01_hello_airflow.py

ก่อนหน้านี้ใน docker-compose.yml ที่บรรทัดสุดท้ายได้เพิ่ม volumes สำหรับเก็บ ไฟล์ python ไว้ ที่ /path/to/dags:/opt/bitnami/airflow/dags

ให้เอาไฟล์ python job ที่ลองเขียนไปไว้ที่ /path/to/dags

ถ้าลองเข้าไปที่ docker container ที่ image name "airflow_airflow_1"

$ docker exec -it airflow_airflow_1 bash
cmd
มีไฟล์เข้าไปใน /opt/bitnami/airflow/dags แล้วนะ

จะมีไฟล์เข้าไปอยู่ที่ /opt/bitnami/airflow/dags

หรือสามรถ list dags ทั้งหมดได้จาก cmd

$ airflow list_dags
cmd
เมื่อ cmd list_dags

** ในกรณีที่เอาไฟล์เข้าไปถูกต้องแล้วแต่ airflow ไม่ยอมแสดงผลที่หน้าเว็บ ให้ลอง update airflow scheduler โดย

$ airflow scheduler
cmd
เมื่อ cmd airflow scheduler

ถ้าไม่มีอะไรผิดพลาดเมื่อ reload หน้าเว็บจะมีรายการแสดงขึ้นมา โดยการเดาคิดว่าหน้าเว็บถูก cache จาก redis ทำให้ไม่แสดงผลตามไฟล์ที่เพิ่มเข้าไปใหม่

Start Airflow services เราจะเพิ่ม --daemon flag สำหรับ run processes แบบ daemons, จำทำให้ระบบยังทำงงานแม้ว่าไงจะออก หน้าจอไปแล้ว

$ airflow webserver --daemon
$ airflow scheduler --daemon
$ airflow worker --daemon
cmd


Architecture ของตัว Airflow

ประกอบด้วย 3 ส่วน

  • Web Server – คือ front-end web สำหรับใช้จัดการและ monitor scheduler / task รวมไปถึง log การทำงาน
  • Scheduler – เอาไว้จัดการงาน ส่งให้ worker ต่างๆ ทำงานต่อ
  • Worker – ตัวที่ทำงานจริง โดยสามารถกำหนดให้มัน Scale ได้

เราสามารถจัดการ Scale worker ได้โดย configuration ที่ไฟล์  {AIRFLOW_HOME}/airflow.cfg

ตัวอย่าง

workers = 4
executor = CeleryExecutor

Airflow and Executor node

ตัวอย่าง Single Node Airflow

ภาพจาก http://site.clairvoyantsoft.com/SETTING-APACHE-AIRFLOW-CLUSTER/

Multi-Node (Cluster) Airflow

ตัวอย่าง Multi-Node Airflow

ภาพจาก http://site.clairvoyantsoft.com/SETTING-APACHE-AIRFLOW-CLUSTER/

จากการ setup airflow ในจะเห็นว่า airflow จะแยกชิ้นส่วนต่างๆ ออกจากกัน

  • postgresql
  • redis
  • worker
  • scheduler
  • airflow (Web Server)

ดังนั้น Multi-Node จะเป็นการ Cluster Worker node

Benefits การ ทำ Multi-Node

Higher Availability หมายถึง ระบบจะทำงานได้ตลอดไม่ว่าจะเกิดอะไนขึ้น เช่น ในกรณีที่มี Node หนึ่ง down หรือถูกทำให้ offline, Cluster จะยังคงใช้งานได้ และ tasks จะยังคงถูกดำเนินการต่อไป
Distributed Processing หมายถึง การประมวลข้อมูลแบบกระจาย เช่น หากคุณมี workflow การทำงานที่ต้องใช้หน่วยความจำมากๆ แต่ละ tasks จะกระจายกันทำงาน ไปในทุก cluster ทำให้การทำงาน ในแต่ละ task ทำงานได้เร็วขึ้น

Scaling Workers

จะมีการ scale อยู่ 2 แบบ
Horizontally - ไม่รู้ ยังไม่ได้ลอง
Vertically - สามารถ scale cluster แบบ vertically โดยเพิ่ม celeryd daemons ที่ใช้งานในแต่ละ node ได้โดยการ แก้ไข config  "celeryd_concurrency" ได้ที่ {AIRFLOW_HOME}/airflow.cfg เช่น

celeryd_concurrency = 30

ถ้ามี task มากขึ้นอาจจะต้องไปเพิ่มขนาดของ instances ที่ใช้งานเพื่อให้ support order ที่มากขึ้น celeryd processes ปริมาณงานที่ต้องทำจะขึ้นอยู่กับ memory และ cpu ที่ใช้งานในแต่ละ cluster

อ่านเพิ่มเติม:
http://site.clairvoyantsoft.com/SETTING-APACHE-AIRFLOW-CLUSTER/
https://hub.docker.com/r/bitnami/airflow

แท็ก

Onyx

Just a middle-aged programmer, Can do many things but not the most.