Dagster Schedules
Schedules #
Dagster จะมี scheduler ที่สามารถใช้สั่งงานตามช่วงเวลา
Relevant APIs
@daily_schedule
Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายวัน@hourly_schedule
Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายวชั่วโมง@weekly_schedule
Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายสัปดาห์@monthly_schedule
Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายเดือน@schedule
Decorator เป็นตัวที่กำหนด schedule ให้ executes ตามที่กำหนดตารางเวลาใน cron สิ่งสำคัญที่ควรทราบคือ schedule ที่สร้างขึ้นโดยใช้ schedule นี้คือ non-partition based schedulesScheduleEvaluationContext
คือ context ที่ส่งค่า schedule definition ไป execution functionbuild_schedule_context
เป็น function ที่สร้าง ScheduleEvaluationContextScheduleDefinition
คือ Class ของ schedules คุณแทบไม่ต้อง initialize Class นี้โดยตรง การใช้งานคือการใช้หนึ่งใน decorators ด้านบนนี้
Overview
Schedule คือ definition ใน Dagster ที่ใช้ execute แต่ละ pipeline ตามที่มีการกำหนดช่วงเวลา ทุกครั้งที่มีการประเมินกำหนดการจะเรียกว่า tick ตัว schedule definition มีหน้าที่สร้าง run configuration สำหรับ pipeline ในแต่ละ tick
ในแต่ละ schedule
- จะมี Target ที่ single pipeline
- function ที่กำหนดจะ returns run configuration สำหรับ pipeline ที่กำหนด
- Optionally, กำหนด tags, mode และการเลือก solid สำหรับ pipeline เป้าหมาย
- Optionally, กำหนด
should_execute
function, ที่สามารถใช้ข้ามบาง tick
scheduler ใน Dagster ในการทำงานส่วนนี้จะเป็นส่วนหนึ่งของ dagster-daemon process เมื่อคุณกำหนด schedule ดู dagster-daemon สำหรับคำแนะนำเกี่ยวกับวิธีการรัน daemon เพื่อดำเนินการตาม schedule ของคุณ
Defining a schedule
มี schedules สองประเภทใน Dagster:
- Partition-based
- Non-partition-based
Partition-based schedules เป็นตัวเลือกที่ดีเมื่อ pipeline ของคุณทำงานกับข้อมูลที่แบ่งพาร์ติชั่นตามเวลา ( เช่น pipeline ที่เพิ่มข้อมูลเข้าไปที่ละส่วนแบบรายวันในฐานข้อมูล ) Non-partition-based schedules เป็นตัวเลือกที่ดีเมื่อคุณต้องการ run แต่ละ operation แบบ fixed ช่วงเวลาที่แน่นอน (ตัวอย่างเช่น pipeline ที่สร้างตารางเดิมขึ้นใหม่ในแต่ละการดำเนินการ )
Partition-based schedules
Partition-based schedules จะสร้างอยู่ภายใต้ PartitionSetDefinition
ต่อการดำเนิดการ ตัว scheduler จะทำให้แน่ใจว่ามีการ run executed ในทุกๆ partition ของแต่ละ partition set นับตั้งแต่เวลาที่สั่งให้ schedule เปิดการทำงาน
ตัวอย่าง, ถ้าเราต้องการสร้าง schedule ที่ทำงานเวลา 11.00 น. ทุกวัน เราสามารถใช้ @daily_schedule
decorator เพื่อสร้าง partition-based daily schedule
@daily_schedule(
pipeline_name="my_pipeline",
start_date=datetime.datetime(2021, 1, 1),
execution_time=datetime.time(11, 0),
execution_timezone="US/Central",
)
def my_daily_schedule(date):
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
decorated schedule function จะรับค่า datetime และ return ค่า run config สำหรับ pipeline run ที่เกี่ยวข้องกับ partition นั้น จากนั้น scheduler จะมีการตรวจดูให้แน่ใจว่า schedule function ได้รับการประเมินเพียงครั้งเดียวสำหรับทุก partition เพื่อสร้าง run config ซึ่งก็คือการสร้างและเรียกใช้งาน pipeline
Partition-based schedules จะ require start_date เพื่อที่จะบ่งบอกว่า เมื่อชุดของ partitions เริ่มทำงาน ตัว scheduler จะสั่งให้เริ่มทำงานหลังจาก start_date เมื่อ schedule ถูกเปิดการทำงาน คุณสามารถเริ่มการทำงานระหว่าง start_date และเมื่อสั่งให้ schedule ทำงานด้วย backfill
โดยปกติแล้ว พาร์ติชั่นที่ใช้สำหรับ run จะเป็น partition ที่เร็วกว่า partition ที่มีเวลาปัจจุบัน เมื่อดูจากการใช้งาน ETL ในกรณีทั่วไป ตัวอย่างเช่น daily schedule จะ fill ข้อมูลของเมื่อวานเข้าไปใน partition และ monthly schedule จะ fill ข้อมูลของเดือนที่แล้วเข้าไปใน partition คุณสามารถปรับแต่ง behavior นี้ได้โดยการเปลี่ยน partition_days_offset parameter สำหรับ daily schedule ค่าเริ่มต้นของพารามิเตอร์นี้คือ 1 ซึ่งหมายความว่า scheduler จะทำงานย้อนหลัง 1 วันเพื่อกำหนดพาร์ทิชั่น การตั้งค่าเป็น 0 จะทำให้ schedule มีการ fill ข้อมูลในวันปัจจุบัน และถ้าเพิ่มค่าเข้าไปมากกว่า 1 จะเป็นกำหนดให้ fill ข้อมูลเข้าไปย้อนกลับไปตามตัวเลขที่กำหนด ยังมี parameter อื่นที่มีชื่อคล้ายกันสามารถกำหนดให้ทำงานในช่วงเวลาอื่นๆ อีกด้วย
Non-partition-based schedules
เมื่อคุณต้องการเรียกใช้ schedule แบบ fixed ช่วงเวลาและไม่ต้องการทำงานแบบ partition คุณสามารถใช้ @schedule
decorator ในการกำหนด schedule
ตัวอย่างเช่น schedule นี้ทำงานเวลา 01:00 AM. ใน US/Central ของทุกวัน:
@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_schedule():
return {"solids": {"process_data": {"config": {"dataset_name": "my_dataset"}}}}
คุณสามารถส่งผ่าน context argument เข้าไปที่ decorated function เพื่อรับ run information เช่น การกำหนดค่า scheduled แบบกำหนดเวลา
@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_execution_time_schedule(context):
date = context.scheduled_execution_time.strftime("%Y-%m-%d")
return {
"solids": {
"process_data": {"config": {"dataset_name": "my_dataset", "execution_date": date}}
}
}
Timezones
คุณสามารถปรับแต่ง timezone ที่ schedule executes ได้โดยการตั้งค่า execution_timezone parameter ใน schedule ได้ตาม tz timezone ถ้าใน schedule ไม่ได้กำหนด timezone ระบบจะทำงานใน UTC
ตัวอย่างเช่น schedule ต่อไปนี้ดำเนินการทุกวันเวลา 9AM. ตามเวลา US/Pacific:
@daily_schedule(
pipeline_name="my_data_pipeline",
start_date=datetime.datetime(2020, 1, 1),
execution_time=datetime.time(9, 0),
execution_timezone="US/Pacific",
)
def my_timezone_schedule(date):
return {
"solids": {
"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}}
}
}
Daylight Savings Time
หัวข้อสำคัญของเรื่องนี้คือ
- Daylight Savings คืออะไร
Daylight Savings ใช้ในยุโรป อเมริกา และอีกหลายๆประเทศค่ะ มันจะปรับเวลาให้เร็วขึ้น 1 ชม ในช่วงหน้าร้อนและจะปรับกลับเหมือนเดิมในช่วงหน้าหนาว เหตุผลหลักๆก็คือประหยัดพลังงานไฟฟ้า ช่วงหน้าร้อนพระอาทิตย์จะขึ้นเร็ว คนจะได้รีบตื่นมาใช้ชีวิต ทำภาระกิจโดยใช้แสงสว่างให้คุ้มค่ามากที่สุด ที่มาที่นี่ - อ่านรายละเอียดทั้งหมดได้ที่ Daylight Savings Time
Running the scheduler
เพื่อให้ schedule ของคุณทำงาน ส่วนนี้จะต้องถูก start ขึ้นมาก่อน วิธีที่ง่ายที่สุดในการ start และ stop schedules คือ เข้าไปที่ Dagit Schedules page คุณยังสามารถ start และ stop schedule ด้วย commands "dagster schedule start" และ "dagster schedule stop"
เมื่อ schedule start เรียบร้อยแล้ว ถ้า dagster-daemon process ของคุณกำลังทำงานอยู่ในการ deployment ของคุณ schedule จะเริ่มทำงานทันที
เข้าไปที่ Troubleshooting section ถ้าหาก start "dagster schedule" แล้วแต่ schedule ไม่ทำงาน
Testing Schedules
Testing Partition Schedules
เพื่อที่จะทำ unit test ในการทำ partition schedule คุณสามารถเรียกใช้โดยตรงโดยการกำหนด datetime ส่วนนี้จะ return run config ที่สร้างโดย decorated function สำหรับ date ที่กำหนด การกำหนดค่านี้สามารถตรวจสอบกับ pipeline ได้โดยใช้ฟังก์ชัน validate_run_config
@hourly_schedule(
pipeline_name="test_pipeline",
start_date=datetime.datetime(2020, 1, 1),
)
def hourly_schedule_to_test(date):
return {
"solids": {
"process_data_for_date": {
"config": {
"date": date.strftime("%Y-%m-%d %H"),
}
}
}
}
from dagster import validate_run_config
def test_hourly_schedule():
run_config = hourly_schedule_to_test(datetime.datetime(2020, 1, 1))
assert validate_run_config(pipeline_for_test, run_config)
Testing Cron Schedules
Cron schedules สามารถทดสอบได้โดยการเรียกใช้โดยตรง เมื่อใช้เรียกใช้ schedule จะ return run config ซึ่งสามารถตรวจสอบได้โดยใช้ฟังก์ชัน validate_run_config
@schedule(cron_schedule="* * * * *", pipeline_name="my_pipeline_on_cron")
def my_cron_schedule():
return {}
from dagster import validate_run_config
def test_my_cron_schedule():
run_config = my_cron_schedule()
assert validate_run_config(my_pipeline_on_cron, run_config)
ถ้าคุณใช้ context parameter ใน decorated function จากนั้น build_schedule_context จะสามารถสร้าง ScheduleEvaluationContext เพื่อใช้งานต่อไป
@schedule(cron_schedule="0 1 * * *", pipeline_name="pipeline_for_test")
def my_schedule_uses_context(context):
date_str = context.scheduled_execution_time.strftime("%Y-%m-%d")
return {
"solids": {
"process_data_for_date": {
"config": {
"date": date_str,
}
}
}
}
from dagster import build_schedule_context, validate_run_config
def test_my_cron_schedule_with_context():
context = build_schedule_context(scheduled_execution_time=datetime.datetime(2020, 1, 1))
run_config = my_schedule_uses_context(context)
assert validate_run_config(pipeline_for_test, run_config)
Examples
Hourly partition-based schedule
@hourly_schedule(
pipeline_name="my_pipeline",
start_date=datetime.datetime(2020, 1, 1),
execution_time=datetime.time(hour=0, minute=25),
execution_timezone="US/Central",
)
def my_hourly_schedule(date):
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H")}}}}
Daily partition-based schedule
@daily_schedule(
pipeline_name="my_pipeline",
start_date=datetime.datetime(2020, 1, 1),
execution_time=datetime.time(hour=9, minute=0),
execution_timezone="US/Central",
)
def my_daily_schedule(date):
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
Weekly partition-based schedule
@weekly_schedule(
pipeline_name="my_pipeline",
start_date=datetime.datetime(2020, 1, 1),
execution_day_of_week=1, # Monday
execution_timezone="US/Central",
)
def my_weekly_schedule(date):
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
Monthly partition-based schedule
@monthly_schedule(
pipeline_name="my_pipeline",
start_date=datetime.datetime(2020, 1, 1),
execution_timezone="US/Central",
execution_day_of_month=15,
execution_time=datetime.time(hour=9, minute=0),
)
def my_monthly_schedule(date):
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m")}}}}
Patterns
Using a preset in a schedule definition
ถ้าคุณได้มีการกำหนด preset สำหรับ pipeline ที่คุณต้องการกำหนดเวลาการทำงาน คุณสามารถแยก attributes ที่จำเป็นสำหรับ preset และส่วงมันไปที่ schedule decorator
ในตัวอย่างนี้ เราใช้ solid_selection
, mode
, tags
, และ run_config
โดยตรงจาก preset
@daily_schedule(
start_date=datetime.datetime(2020, 1, 1),
pipeline_name="my_pipeline",
solid_selection=preset.solid_selection,
mode=preset.mode,
tags_fn_for_date=lambda _: preset.tags,
)
def my_preset_schedule(_date):
return preset.run_config
ถ้าคุณอาจต้องปรับเปลี่ยน preset ของ run config ที่มีข้อมูลเกี่ยวกับ date partition:
import copy
@daily_schedule(
start_date=datetime.datetime(2020, 1, 1),
pipeline_name="my_pipeline",
solid_selection=preset.solid_selection,
mode=preset.mode,
tags_fn_for_date=lambda _: preset.tags,
)
def my_modified_preset_schedule(date):
modified_run_config = copy.deepcopy(preset.run_config)
modified_run_config["solids"]["process_data_for_date"]["config"]["date"] = date.strftime(
"%Y-%m-%d"
)
return modified_run_config
หากคุณพบว่าตัวเองกำลังใช้ preset ในการ generate schedule definitions บ่อยๆ คุณสามารถใช้ helper function ที่คล้ายกับฟังก์ชันนี้เพื่อตั้งค่า preset และ return schedule
def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date):
preset = pipeline.get_preset(preset_name)
if not preset:
raise Exception(
"Preset {preset_name} was not found "
"on pipeline {pipeline_name}".format(
preset_name=preset_name, pipeline_name=pipeline.name
)
)
@daily_schedule(
start_date=start_date,
pipeline_name=pipeline.name,
solid_selection=preset.solid_selection,
mode=preset.mode,
tags_fn_for_date=lambda _: preset.tags,
)
def my_schedule(_date):
return preset.run_config
return my_schedule
Troubleshooting
ถ้าหากพบปัญหาให้ลองทำตาม step ตาม section นี้ Troubleshooting