ติดตั้งและทดลองใช้งาน dagster

ติดตั้งและทดลองใช้งาน dagster

การใช้งาน Dagster ต้องมีการ config Dagster Instance ก่อน สำหรับ Storages ในการเก็บข้อมูล Run Storage, Event Storage และ Schedule Storage ซึ่งสามารถใช้งานได้กับ Sqlite DB, Postgres DB และ MySQL DB การใช้งานร่วมกับ Sqlite จะมีข้อจำกัดเรื่อง Assets page จะไม่สามารถใช้งานได้(อ่านเรื่องนี้เพิ่มได้ที่ SqliteEventLogStorage ) ส่วนการใช้งานร่วมกับ MySQL ดูเหมือนว่า version 0.12.8 จะยังไม่สมบูรณ์ จะยังมีการแจ้งเตือนว่าเป็น Experimental ดังนั้นจึงเลือกใช้ Storages เป็น Postgres น่าจะเป็นตัวเลือกที่ดีในตอนนี้

ก่อนอื่นให้ติดตั้ง postgres ก่อน สามารถดูรายละเอียดการติดตั้ง postgres บน docker ได้ที่ link นี้

เมื่อติดตั้ง postgres เรียบร้อยแล้วให้เข้าไป config environment variables โดยในแต่ละ os จะมีการ config ต่างกัน ถ้าใน macos ปกติจะเข้าไปเพิ่ม variables ไว้ที่ ~/.bash_profile หรือ ~/.zshrc สำหรับเครื่องที่ใช้งาน terminal ร่วมกับ zsh

จากนั้นให้สร้าง folder ใหม่ขึ้นมาเพื่อใช้สำหรับเก็บ code และ config ต่างๆ ซึ้งจะ config ไว้ที่ DAGSTER_HOME

ใช้คำสั่ง vi เพื่อเข้าไปแก้ไข ไฟล์ .zshrc

vi ~/.zshrc

จากนั้นเพิ่ม variables ที่ Dagster ต้องการ ถ้าต้องการรายละเอียดส่วนนี้เพิ่มเติมให้ไปดูที่เรื่อง Dagster Instance

# Default local behavior
export DAGSTER_HOME="/{[path-to-dagster-home}"

export DAGSTER_PG_USERNAME=root
export DAGSTER_PG_PASSWORD=password
export DAGSTER_PG_HOST=localhost
export DAGSTER_PG_DB=dagster_db

export DAGSTER_COMPUTE_LOG_PATH="/Users/{your-path}/log"

จาก variables ที่เรา config ไปตัวแปล DAGSTER_HOME จะเป็นตัวแปลพื้นฐาน

ในการเริ่มการทำงานของ Dagster เมื่อเริ่มใช้งาน Dagster process เช่น ใน Dagit หรือ Dagster CLI commands ตัว Dagster จะพยายามโหลด instance ถ้ากำหนด environment variable ที่ชื่อ DAGSTER_HOME ไว้  ตัว Dagster จะมองหา config file ที่ $DAGSTER_HOME/dagster.yaml ต่อไป ไฟล์นี้จะมีการกำหนดค่าที่จำเป็นแต่ละรายการที่ประกอบขึ้นเป็น instance

สร้างไฟล์ dagster.yaml เอาไว้ใน DAGSTER_HOME

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

run_coordinator:
  module: dagster.core.run_coordinator
  class: DefaultRunCoordinator

compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: LocalComputeLogManager
  config:
    base_dir: DAGSTER_COMPUTE_LOG_PATH

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432

telemetry:
  enabled: false

เพิ่มเติมในส่วนนี้ จะมีตัวแปลที่ชื่อ DAGSTER_COMPUTE_LOG_PATH ส่วนนี้จะเกิดขึ้นหลังจากที่เรารันใช้งาน dagster ครั้งแรก

จากนั้น เพื่อให้สามารถมั่วได้ไม่กระทบกับงานอื่นให้ติดตั้ง anaconda ก่อน เพื่อใช้งาน virtual environments หรือจะใช้งานจาก virtual environments ของ python เองก็ได้ หลังจากติดตั้งเรียบร้อยแล้ว

ต่อไป สร้าง virtual environment ขึ้นมาใหม่ตามนี้

# Dagster requires Python 3.6+. It is tested on Python 3.8, 3.7, 3.6.
conda create -n dagster python=3.7

เมื่อสร้าง environment เรียบร้อยแล้ว ให้ติดตั้ง modules ตามนี้

pip install dagster dagit requests

สิ่งที่ติดตั้งไปทั้ง 3 ตัวนี้คือ

  • Dagster: core programming model และ abstraction stack; stateless, single-node, single-process และ multi-process execution engines; และ a CLI tool for driving those engines.
  • Dagit: UI สำหรับ developing and operating Dagster pipelines, including a DAG browser, a type-aware config editor, and a live execution interface.
  • Requests: not part of Dagster. Our examples will use it to download data from the internet.

จากนั้นให้สร้าง code python ที่เราจะใช้ทดลอง ตังชื่อไฟล์ว่า hello_world.py

"""isort:skip_file"""

# start_solid_marker
import requests
import csv
from dagster import pipeline, solid


@solid
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")

    return cereals


# end_solid_marker


# start_pipeline_marker
@pipeline
def hello_cereal_pipeline():
    hello_cereal()


# end_pipeline_marker

# start_execute_marker
from dagster import execute_pipeline

if __name__ == "__main__":
    result = execute_pipeline(hello_cereal_pipeline)

เราจะทดลองใช้งาน Dagster แบบผ่าน Dagit UI แต่ก่อนอื่นเราต้องไปเปิดการทำงาน Dagster Daemon ซะก่อน โดยใช้คำสั่ง

dagster-daemon run

จากนั้นจะได้ผลลัพแบบนี้

$ dagster-daemon run
2021-09-03 16:29:35 - dagster-daemon - INFO - instance is configured with the following daemons: ['BackfillDaemon', 'SchedulerDaemon', 'SensorDaemon']
2021-09-03 16:29:35 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-09-03 16:29:35 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-09-03 16:29:35 - BackfillDaemon - INFO - No backfill jobs requested.

หลังจากนั้นให้ใช้คำสั่งนี้ เพื่อแสดงการทำงาน pipeline ใน Dagit

dagit -f hello_world.py

จากนั้นจะได้ผลลัพแบบนี้

$ dagit -f hello_world.py
Loading repository...
Serving on http://127.0.0.1:3000 in process 40377

เมื่อเข้าไปที่ http://127.0.0.1:3000 ถ้าไม่มีอะไรผิดพลาด จะได้หน้าตาประมาณนี้

ถ้าเข้าไปดูที่ Instance status จะเห็นว่า demon แต่ละตัวกำลังทำงานอยู่

ถ้าต้องการดู tutorial เพิ่มเติม เข้าไปที่ A Single-Solid Pipeline

ในการ deploy เพื่อใช้งานจริง ดูรายละเอียดได้ที่ Deployment และ Workspaces