Dagster Schedules

Dagster Schedules

Schedules #

Dagster จะมี scheduler ที่สามารถใช้สั่งงานตามช่วงเวลา

Relevant APIs

  • @daily_schedule Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายวัน
  • @hourly_schedule Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายวชั่วโมง
  • @weekly_schedule Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายสัปดาห์
  • @monthly_schedule Decorator ตัวนี้จะใช้กำหนดให้ทำงานแบบรายเดือน
  • @schedule Decorator เป็นตัวที่กำหนด schedule ให้ executes ตามที่กำหนดตารางเวลาใน cron สิ่งสำคัญที่ควรทราบคือ schedule ที่สร้างขึ้นโดยใช้ schedule นี้คือ non-partition based schedules
  • ScheduleEvaluationContext คือ context ที่ส่งค่า schedule definition ไป execution function
  • build_schedule_context เป็น function ที่สร้าง ScheduleEvaluationContext
  • ScheduleDefinition คือ Class ของ schedules คุณแทบไม่ต้อง initialize Class นี้โดยตรง การใช้งานคือการใช้หนึ่งใน decorators ด้านบนนี้

Overview

Schedule คือ definition ใน Dagster ที่ใช้ execute แต่ละ pipeline ตามที่มีการกำหนดช่วงเวลา ทุกครั้งที่มีการประเมินกำหนดการจะเรียกว่า tick ตัว schedule definition มีหน้าที่สร้าง run configuration สำหรับ pipeline ในแต่ละ tick

ในแต่ละ schedule

  • จะมี Target ที่ single pipeline
  • function ที่กำหนดจะ returns run configuration สำหรับ pipeline ที่กำหนด
  • Optionally, กำหนด tags, mode และการเลือก solid สำหรับ pipeline เป้าหมาย
  • Optionally, กำหนด should_execute function, ที่สามารถใช้ข้ามบาง tick

scheduler ใน Dagster ในการทำงานส่วนนี้จะเป็นส่วนหนึ่งของ dagster-daemon process เมื่อคุณกำหนด schedule ดู dagster-daemon สำหรับคำแนะนำเกี่ยวกับวิธีการรัน daemon เพื่อดำเนินการตาม schedule ของคุณ

Defining a schedule

มี schedules สองประเภทใน Dagster:

  • Partition-based
  • Non-partition-based

Partition-based schedules เป็นตัวเลือกที่ดีเมื่อ pipeline ของคุณทำงานกับข้อมูลที่แบ่งพาร์ติชั่นตามเวลา ( เช่น pipeline ที่เพิ่มข้อมูลเข้าไปที่ละส่วนแบบรายวันในฐานข้อมูล ) Non-partition-based schedules เป็นตัวเลือกที่ดีเมื่อคุณต้องการ run แต่ละ operation แบบ fixed ช่วงเวลาที่แน่นอน (ตัวอย่างเช่น pipeline ที่สร้างตารางเดิมขึ้นใหม่ในแต่ละการดำเนินการ )

Partition-based schedules

Partition-based schedules จะสร้างอยู่ภายใต้ PartitionSetDefinition ต่อการดำเนิดการ ตัว scheduler จะทำให้แน่ใจว่ามีการ run executed ในทุกๆ partition ของแต่ละ partition set นับตั้งแต่เวลาที่สั่งให้ schedule เปิดการทำงาน

ตัวอย่าง, ถ้าเราต้องการสร้าง schedule ที่ทำงานเวลา 11.00 น. ทุกวัน เราสามารถใช้ @daily_schedule decorator เพื่อสร้าง partition-based daily schedule

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2021, 1, 1),
    execution_time=datetime.time(11, 0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

decorated schedule function จะรับค่า datetime และ return ค่า run config สำหรับ pipeline run ที่เกี่ยวข้องกับ partition นั้น จากนั้น scheduler จะมีการตรวจดูให้แน่ใจว่า schedule function ได้รับการประเมินเพียงครั้งเดียวสำหรับทุก partition เพื่อสร้าง run config ซึ่งก็คือการสร้างและเรียกใช้งาน pipeline

Partition-based schedules จะ require start_date เพื่อที่จะบ่งบอกว่า เมื่อชุดของ partitions เริ่มทำงาน ตัว scheduler จะสั่งให้เริ่มทำงานหลังจาก start_date เมื่อ schedule ถูกเปิดการทำงาน คุณสามารถเริ่มการทำงานระหว่าง start_date และเมื่อสั่งให้ schedule ทำงานด้วย backfill

โดยปกติแล้ว พาร์ติชั่นที่ใช้สำหรับ run จะเป็น partition ที่เร็วกว่า partition ที่มีเวลาปัจจุบัน เมื่อดูจากการใช้งาน ETL ในกรณีทั่วไป ตัวอย่างเช่น daily schedule จะ fill ข้อมูลของเมื่อวานเข้าไปใน partition และ monthly schedule  จะ fill ข้อมูลของเดือนที่แล้วเข้าไปใน partition คุณสามารถปรับแต่ง behavior นี้ได้โดยการเปลี่ยน partition_days_offset parameter สำหรับ daily schedule ค่าเริ่มต้นของพารามิเตอร์นี้คือ 1 ซึ่งหมายความว่า scheduler จะทำงานย้อนหลัง 1 วันเพื่อกำหนดพาร์ทิชั่น การตั้งค่าเป็น 0 จะทำให้ schedule มีการ fill ข้อมูลในวันปัจจุบัน และถ้าเพิ่มค่าเข้าไปมากกว่า 1 จะเป็นกำหนดให้ fill ข้อมูลเข้าไปย้อนกลับไปตามตัวเลขที่กำหนด ยังมี parameter อื่นที่มีชื่อคล้ายกันสามารถกำหนดให้ทำงานในช่วงเวลาอื่นๆ อีกด้วย

Non-partition-based schedules

เมื่อคุณต้องการเรียกใช้ schedule แบบ fixed ช่วงเวลาและไม่ต้องการทำงานแบบ partition คุณสามารถใช้ @schedule decorator ในการกำหนด schedule

ตัวอย่างเช่น schedule นี้ทำงานเวลา 01:00 AM. ใน US/Central ของทุกวัน:

@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_schedule():
    return {"solids": {"process_data": {"config": {"dataset_name": "my_dataset"}}}}

คุณสามารถส่งผ่าน context argument เข้าไปที่ decorated function เพื่อรับ run information เช่น การกำหนดค่า scheduled แบบกำหนดเวลา

@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_execution_time_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {
        "solids": {
            "process_data": {"config": {"dataset_name": "my_dataset", "execution_date": date}}
        }
    }

Timezones

คุณสามารถปรับแต่ง timezone ที่ schedule executes ได้โดยการตั้งค่า execution_timezone parameter ใน schedule ได้ตาม tz timezone ถ้าใน schedule ไม่ได้กำหนด timezone ระบบจะทำงานใน UTC

ตัวอย่างเช่น schedule ต่อไปนี้ดำเนินการทุกวันเวลา 9AM. ตามเวลา US/Pacific:

@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(9, 0),
    execution_timezone="US/Pacific",
)
def my_timezone_schedule(date):
    return {
        "solids": {
            "process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}}
        }
    }

Daylight Savings Time

หัวข้อสำคัญของเรื่องนี้คือ

  • Daylight Savings คืออะไร
    Daylight Savings ใช้ในยุโรป อเมริกา และอีกหลายๆประเทศค่ะ มันจะปรับเวลาให้เร็วขึ้น 1 ชม ในช่วงหน้าร้อนและจะปรับกลับเหมือนเดิมในช่วงหน้าหนาว เหตุผลหลักๆก็คือประหยัดพลังงานไฟฟ้า ช่วงหน้าร้อนพระอาทิตย์จะขึ้นเร็ว คนจะได้รีบตื่นมาใช้ชีวิต ทำภาระกิจโดยใช้แสงสว่างให้คุ้มค่ามากที่สุด ที่มาที่นี่
  • อ่านรายละเอียดทั้งหมดได้ที่ Daylight Savings Time

Running the scheduler

เพื่อให้ schedule ของคุณทำงาน ส่วนนี้จะต้องถูก start ขึ้นมาก่อน วิธีที่ง่ายที่สุดในการ start และ stop schedules คือ เข้าไปที่ Dagit Schedules page คุณยังสามารถ start และ stop schedule ด้วย commands "dagster schedule start" และ "dagster schedule stop"

เมื่อ schedule start เรียบร้อยแล้ว ถ้า dagster-daemon process ของคุณกำลังทำงานอยู่ในการ deployment ของคุณ schedule จะเริ่มทำงานทันที
เข้าไปที่ Troubleshooting section ถ้าหาก start "dagster schedule" แล้วแต่ schedule ไม่ทำงาน

Testing Schedules

Testing Partition Schedules

เพื่อที่จะทำ unit test ในการทำ partition schedule คุณสามารถเรียกใช้โดยตรงโดยการกำหนด datetime ส่วนนี้จะ return run config ที่สร้างโดย decorated function สำหรับ date ที่กำหนด การกำหนดค่านี้สามารถตรวจสอบกับ pipeline ได้โดยใช้ฟังก์ชัน validate_run_config

@hourly_schedule(
    pipeline_name="test_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
)
def hourly_schedule_to_test(date):
    return {
        "solids": {
            "process_data_for_date": {
                "config": {
                    "date": date.strftime("%Y-%m-%d %H"),
                }
            }
        }
    }


from dagster import validate_run_config


def test_hourly_schedule():
    run_config = hourly_schedule_to_test(datetime.datetime(2020, 1, 1))
    assert validate_run_config(pipeline_for_test, run_config)

Testing Cron Schedules

Cron schedules สามารถทดสอบได้โดยการเรียกใช้โดยตรง เมื่อใช้เรียกใช้ schedule จะ return run config ซึ่งสามารถตรวจสอบได้โดยใช้ฟังก์ชัน validate_run_config

@schedule(cron_schedule="* * * * *", pipeline_name="my_pipeline_on_cron")
def my_cron_schedule():
    return {}


from dagster import validate_run_config


def test_my_cron_schedule():
    run_config = my_cron_schedule()
    assert validate_run_config(my_pipeline_on_cron, run_config)

ถ้าคุณใช้ context parameter ใน decorated function จากนั้น build_schedule_context จะสามารถสร้าง ScheduleEvaluationContext เพื่อใช้งานต่อไป

@schedule(cron_schedule="0 1 * * *", pipeline_name="pipeline_for_test")
def my_schedule_uses_context(context):
    date_str = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {
        "solids": {
            "process_data_for_date": {
                "config": {
                    "date": date_str,
                }
            }
        }
    }


from dagster import build_schedule_context, validate_run_config


def test_my_cron_schedule_with_context():
    context = build_schedule_context(scheduled_execution_time=datetime.datetime(2020, 1, 1))
    run_config = my_schedule_uses_context(context)
    assert validate_run_config(pipeline_for_test, run_config)

Examples

Hourly partition-based schedule

@hourly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=0, minute=25),
    execution_timezone="US/Central",
)
def my_hourly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H")}}}}

Daily partition-based schedule

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=9, minute=0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

Weekly partition-based schedule

@weekly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_day_of_week=1,  # Monday
    execution_timezone="US/Central",
)
def my_weekly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

Monthly partition-based schedule

@monthly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_timezone="US/Central",
    execution_day_of_month=15,
    execution_time=datetime.time(hour=9, minute=0),
)
def my_monthly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m")}}}}

Patterns

Using a preset in a schedule definition

ถ้าคุณได้มีการกำหนด preset สำหรับ pipeline ที่คุณต้องการกำหนดเวลาการทำงาน คุณสามารถแยก attributes ที่จำเป็นสำหรับ preset และส่วงมันไปที่ schedule decorator

ในตัวอย่างนี้ เราใช้ solid_selection, mode, tags, และ run_configโดยตรงจาก preset

@daily_schedule(
    start_date=datetime.datetime(2020, 1, 1),
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_preset_schedule(_date):
    return preset.run_config

ถ้าคุณอาจต้องปรับเปลี่ยน preset ของ run config ที่มีข้อมูลเกี่ยวกับ date partition:

import copy


@daily_schedule(
    start_date=datetime.datetime(2020, 1, 1),
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_modified_preset_schedule(date):
    modified_run_config = copy.deepcopy(preset.run_config)
    modified_run_config["solids"]["process_data_for_date"]["config"]["date"] = date.strftime(
        "%Y-%m-%d"
    )
    return modified_run_config

หากคุณพบว่าตัวเองกำลังใช้ preset ในการ generate schedule definitions บ่อยๆ คุณสามารถใช้ helper function ที่คล้ายกับฟังก์ชันนี้เพื่อตั้งค่า preset และ return schedule

def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date):
    preset = pipeline.get_preset(preset_name)
    if not preset:
        raise Exception(
            "Preset {preset_name} was not found "
            "on pipeline {pipeline_name}".format(
                preset_name=preset_name, pipeline_name=pipeline.name
            )
        )

    @daily_schedule(
        start_date=start_date,
        pipeline_name=pipeline.name,
        solid_selection=preset.solid_selection,
        mode=preset.mode,
        tags_fn_for_date=lambda _: preset.tags,
    )
    def my_schedule(_date):
        return preset.run_config

    return my_schedule

Troubleshooting

ถ้าหากพบปัญหาให้ลองทำตาม step ตาม section นี้ Troubleshooting