ติดตั้งและทดลองใช้งาน dagster
การใช้งาน Dagster ต้องมีการ config Dagster Instance ก่อน สำหรับ Storages ในการเก็บข้อมูล Run Storage, Event Storage และ Schedule Storage ซึ่งสามารถใช้งานได้กับ Sqlite DB, Postgres DB และ MySQL DB การใช้งานร่วมกับ Sqlite จะมีข้อจำกัดเรื่อง Assets page จะไม่สามารถใช้งานได้(อ่านเรื่องนี้เพิ่มได้ที่ SqliteEventLogStorage ) ส่วนการใช้งานร่วมกับ MySQL ดูเหมือนว่า version 0.12.8 จะยังไม่สมบูรณ์ จะยังมีการแจ้งเตือนว่าเป็น Experimental ดังนั้นจึงเลือกใช้ Storages เป็น Postgres น่าจะเป็นตัวเลือกที่ดีในตอนนี้
ก่อนอื่นให้ติดตั้ง postgres ก่อน สามารถดูรายละเอียดการติดตั้ง postgres บน docker ได้ที่ link นี้
เมื่อติดตั้ง postgres เรียบร้อยแล้วให้เข้าไป config environment variables โดยในแต่ละ os จะมีการ config ต่างกัน ถ้าใน macos ปกติจะเข้าไปเพิ่ม variables ไว้ที่ ~/.bash_profile หรือ ~/.zshrc สำหรับเครื่องที่ใช้งาน terminal ร่วมกับ zsh
จากนั้นให้สร้าง folder ใหม่ขึ้นมาเพื่อใช้สำหรับเก็บ code และ config ต่างๆ ซึ้งจะ config ไว้ที่ DAGSTER_HOME
ใช้คำสั่ง vi เพื่อเข้าไปแก้ไข ไฟล์ .zshrc
vi ~/.zshrc
จากนั้นเพิ่ม variables ที่ Dagster ต้องการ ถ้าต้องการรายละเอียดส่วนนี้เพิ่มเติมให้ไปดูที่เรื่อง Dagster Instance
# Default local behavior
export DAGSTER_HOME="/{[path-to-dagster-home}"
export DAGSTER_PG_USERNAME=root
export DAGSTER_PG_PASSWORD=password
export DAGSTER_PG_HOST=localhost
export DAGSTER_PG_DB=dagster_db
export DAGSTER_COMPUTE_LOG_PATH="/Users/{your-path}/log"
จาก variables ที่เรา config ไปตัวแปล DAGSTER_HOME จะเป็นตัวแปลพื้นฐาน
ในการเริ่มการทำงานของ Dagster เมื่อเริ่มใช้งาน Dagster process เช่น ใน Dagit หรือ Dagster CLI commands ตัว Dagster จะพยายามโหลด instance ถ้ากำหนด environment variable ที่ชื่อ DAGSTER_HOME ไว้ ตัว Dagster จะมองหา config file ที่ $DAGSTER_HOME/dagster.yaml ต่อไป ไฟล์นี้จะมีการกำหนดค่าที่จำเป็นแต่ละรายการที่ประกอบขึ้นเป็น instance
สร้างไฟล์ dagster.yaml เอาไว้ใน DAGSTER_HOME
run_launcher:
module: dagster.core.launcher
class: DefaultRunLauncher
run_coordinator:
module: dagster.core.run_coordinator
class: DefaultRunCoordinator
compute_logs:
module: dagster.core.storage.local_compute_log_manager
class: LocalComputeLogManager
config:
base_dir: DAGSTER_COMPUTE_LOG_PATH
run_storage:
module: dagster_postgres.run_storage
class: PostgresRunStorage
config:
postgres_db:
username:
env: DAGSTER_PG_USERNAME
password:
env: DAGSTER_PG_PASSWORD
hostname:
env: DAGSTER_PG_HOST
db_name:
env: DAGSTER_PG_DB
port: 5432
event_log_storage:
module: dagster_postgres.event_log
class: PostgresEventLogStorage
config:
postgres_db:
username:
env: DAGSTER_PG_USERNAME
password:
env: DAGSTER_PG_PASSWORD
hostname:
env: DAGSTER_PG_HOST
db_name:
env: DAGSTER_PG_DB
port: 5432
schedule_storage:
module: dagster_postgres.schedule_storage
class: PostgresScheduleStorage
config:
postgres_db:
username:
env: DAGSTER_PG_USERNAME
password:
env: DAGSTER_PG_PASSWORD
hostname:
env: DAGSTER_PG_HOST
db_name:
env: DAGSTER_PG_DB
port: 5432
telemetry:
enabled: false
เพิ่มเติมในส่วนนี้ จะมีตัวแปลที่ชื่อ DAGSTER_COMPUTE_LOG_PATH ส่วนนี้จะเกิดขึ้นหลังจากที่เรารันใช้งาน dagster ครั้งแรก
จากนั้น เพื่อให้สามารถมั่วได้ไม่กระทบกับงานอื่นให้ติดตั้ง anaconda ก่อน เพื่อใช้งาน virtual environments หรือจะใช้งานจาก virtual environments ของ python เองก็ได้ หลังจากติดตั้งเรียบร้อยแล้ว
ต่อไป สร้าง virtual environment ขึ้นมาใหม่ตามนี้
# Dagster requires Python 3.6+. It is tested on Python 3.8, 3.7, 3.6.
conda create -n dagster python=3.7
เมื่อสร้าง environment เรียบร้อยแล้ว ให้ติดตั้ง modules ตามนี้
pip install dagster dagit requests
สิ่งที่ติดตั้งไปทั้ง 3 ตัวนี้คือ
- Dagster: core programming model และ abstraction stack; stateless, single-node, single-process และ multi-process execution engines; และ a CLI tool for driving those engines.
- Dagit: UI สำหรับ developing and operating Dagster pipelines, including a DAG browser, a type-aware config editor, and a live execution interface.
- Requests: not part of Dagster. Our examples will use it to download data from the internet.
จากนั้นให้สร้าง code python ที่เราจะใช้ทดลอง ตังชื่อไฟล์ว่า hello_world.py
"""isort:skip_file"""
# start_solid_marker
import requests
import csv
from dagster import pipeline, solid
@solid
def hello_cereal(context):
response = requests.get("https://docs.dagster.io/assets/cereal.csv")
lines = response.text.split("\n")
cereals = [row for row in csv.DictReader(lines)]
context.log.info(f"Found {len(cereals)} cereals")
return cereals
# end_solid_marker
# start_pipeline_marker
@pipeline
def hello_cereal_pipeline():
hello_cereal()
# end_pipeline_marker
# start_execute_marker
from dagster import execute_pipeline
if __name__ == "__main__":
result = execute_pipeline(hello_cereal_pipeline)
เราจะทดลองใช้งาน Dagster แบบผ่าน Dagit UI แต่ก่อนอื่นเราต้องไปเปิดการทำงาน Dagster Daemon ซะก่อน โดยใช้คำสั่ง
dagster-daemon run
จากนั้นจะได้ผลลัพแบบนี้
$ dagster-daemon run
2021-09-03 16:29:35 - dagster-daemon - INFO - instance is configured with the following daemons: ['BackfillDaemon', 'SchedulerDaemon', 'SensorDaemon']
2021-09-03 16:29:35 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-09-03 16:29:35 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-09-03 16:29:35 - BackfillDaemon - INFO - No backfill jobs requested.
หลังจากนั้นให้ใช้คำสั่งนี้ เพื่อแสดงการทำงาน pipeline ใน Dagit
dagit -f hello_world.py
จากนั้นจะได้ผลลัพแบบนี้
$ dagit -f hello_world.py
Loading repository...
Serving on http://127.0.0.1:3000 in process 40377
เมื่อเข้าไปที่ http://127.0.0.1:3000 ถ้าไม่มีอะไรผิดพลาด จะได้หน้าตาประมาณนี้

ถ้าเข้าไปดูที่ Instance status จะเห็นว่า demon แต่ละตัวกำลังทำงานอยู่

ถ้าต้องการดู tutorial เพิ่มเติม เข้าไปที่ A Single-Solid Pipeline
ในการ deploy เพื่อใช้งานจริง ดูรายละเอียดได้ที่ Deployment และ Workspaces