Dagster / Solids and Pipelines

Dagster / Solids and Pipelines

# Solids and Pipelines

Solids และ Pipelines คือการสร้าง blocks ของ Dagster code. ใช้สิ่งเหล่านี้เพื่อกำหนด orchestration graphs. ส่วนนี้ครอบคลุมถึงวิธีการกำหนดและใช้งานทั้ง solids และ pipelines


Solids

Solids เป็นหน่วยหน้าที่ของงานใน Dagster ใน solid แต่ละ unit จะมีความรับผิดชอบคืออ่านข้อมูล นำเข้า ดำเนินการ และส่งออกข้อมูล ในการทำงานเมื่อมี solid หลายตัวจะมีการทำงานที่ต้องเชื่อมต่อกันด้วย Pipeline

# Solid Relevant APIs

@solid

การกำหนดว่า function ตัวไหนเป็น solid จะใช้ decorator เป็นตัวกำหนด ตัว decorated function จะเรียกว่า compute_fn ตัว decorator จะ returns SolidDefinition

InputDefinition

ตัว InputDefinition จะกำหนด inputs ของ solid compute function สิ่งเหล่านี้ถูกกำหนดไว้ในอาร์กิวเมนต์ input_defs ของ @solid decorator

วิธีปกติในการกำหนดอินพุตคือการเพิ่มอาร์กิวเมนต์ให้กับฟังก์ชัน decorator

@solid
def my_input_solid(abc, xyz):
    pass

คุณสามารถใช้ Dagster Type เพื่อจัดเตรียมฟังก์ชันที่ตรวจสอบอินพุตของ Solid ทุกครั้งที่ Solid ทำงาน ในกรณีนี้ คุณใช้ InputDefinitions ที่สอดคล้องกับอาร์กิวเมนต์ของ

MyDagsterType = DagsterType(type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType")


@solid(input_defs=[InputDefinition(name="abc", dagster_type=MyDagsterType)])
def my_typed_input_solid(abc):
    pass

OutputDefinition

OutputDefinition คือตัวกำหนด outputs ของ solid compute function สิ่งเหล่านี้ถูกกำหนดไว้ในอาร์กิวเมนต์ output_defs ให้กับ @solid decorator

เมื่อคุณมีเอาต์พุตเดียว คุณสามารถคืนค่าเอาต์พุตได้โดยตรง

@solid
def my_output_solid():
    return 5

เมื่อคุณมีเอาต์พุตมากกว่าหนึ่งรายการ คุณต้องให้อินสแตนซ์ของคลาสเอาต์พุตเพื่อแก้ความกำกวมระหว่างเอาต์พุต

@solid(
    output_defs=[
        OutputDefinition(name="first_output"),
        OutputDefinition(name="second_output"),
    ],
)
def my_multi_output_solid():
    yield Output(5, output_name="first_output")
    yield Output(6, output_name="second_output")

SolidExecutionContext

SolidExecutionContext เป็น optional ที่สามารถกำหนดได้โดยใส่ไว้ใน argument ตัวแรก SolidExecutionContext จะทำให้ solid function สามารถเข้าถึง Dagster system information ตัวอย่างเช่น resources, logging, ข้อมูลพื้นฐานของ Dagster instance

ตัวอย่าง ในการ access logger และ log info message ออกมา

@solid(config_schema={"name": str})
def context_solid(context):
    name = context.solid_config["name"]
    context.log.info(f"My name is {name}")

SolidDefinition

SolidDefinition คือ Class ของ solids ส่วนนี้จะไม่ถูเรียกใช้ตรงๆ แต่เมื่อมีการ ใช้งาน @solid จะมีการ return SolidDefinition

Solid Configuration

คือการ parameter ของ solid โดยใช้ config_schema ทำให้สามารถกำหนดค่าและกำหนดพารามิเตอร์ได้ สามารถดูการกำหนดค่าและคำอธิบายได้ ที่ Config Schema

ดังนั้น การกำหนดค่า solid สามารถใช้เพื่อระบุลักษณะการทำงานที่เป็นของ solid ในขณะใช้งานจริง ทำให้ solid มีความยืดหยุ่นและนำกลับมาใช้ใหม่ได้

ตัวอย่างเช่น เราสามารถกำหนด solid ให้ไปรับค่า API endpoint โดยผ่านการกำหนดค่าใน config

@solid(config_schema={"api_endpoint": str})
def my_configurable_solid(context):
    api_endpoint = context.solid_config["api_endpoint"]
    data = requests.get(f"{api_endpoint}/data").json()
    return data

Patterns

Solid Factory

เป็น Pattern ในการสร้างตัวช่วยสร้าง generate solids

def x_solid(
    arg,
    name="default_name",
    input_defs=None,
    **kwargs,
):
    """
    Args:
        args (any): One or more arguments used to generate the nwe solid
        name (str): The name of the new solid.
        input_defs (list[InputDefinition]): Any input definitions for the new solid. Default: None.

    Returns:
        function: The new solid.
    """

    @solid(name=name, input_defs=input_defs or [InputDefinition("start", Nothing)], **kwargs)
    def _x_solid(context):
        # Solid logic here
        pass

    return _x_solid

Pipeline

pipeline คือ set ของ solid ที่มีส่วนที่ต้องพึงพาข้อมูลของ solid อื่นๆ ร่วมกันเช่น solid C ต้องมีการใช้ข้อมูลของ solid A และ solid B

# Pipeline Relevant APIs

@pipeline

การกำหนด pipeline จะใช้ decorator function @pipeline

PipelineDefinition

เป็น class สำหรับ Pipeline ไม่จำเป็นต้อง initialize เพื่อใช้งาน class นี้โดยตรง แต่เมื่อมีการใช้งาน @pipeline จะถูก return ด้วย PipelineDefinition

ModeDefinition

โหมดช่วยให้คุณเปลี่ยนพฤติกรรมไปป์ไลน์ระหว่างสภาพแวดล้อมการใช้งานที่แตกต่างกัน สำหรับข้อมูลเพิ่มเติม โปรดดูส่วนโหมด

PresetDefinition

Presets อนุญาตให้คุณกำหนดการกำหนดค่าไปป์ไลน์ล่วงหน้า

การกำหนด pipeline

ใน decorated function body เราใช้การเรียกฟังก์ชันเพื่อระบุโครงสร้างระหว่าง solids และ pipeline

ในตัวอย่างนี้ add_one solid จะอาศัยผลลัพของ return_one solid เนื่องจากมีการพึ่งพาข้อมูลนี้อยู่ จะเห็นว่า add_one solid จะดำเนินการหลังจาก return_one ประมวลผลเสร็จและส่งผลลัพออกมา

@solid
def return_one(context):
    return 1


@solid
def add_one(context, number: int):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    add_one(return_one())

Aliases and Tags

Solid aliases

คุณสามารถใช้ solid definition ดียวกันได้หลายครั้งใน pipeline เดียวกัน

@pipeline
def multiple_usage_pipeline():
    add_one(add_one(return_one()))

ในการแยกความแตกต่างระหว่างการเรียกใช้สองรายการของ add_one Dagster จะใช้ aliases ชื่อของ solid เป็น add_one และ add_one_2 โดยอัตโนมัติ

คุณยังสามารถกำหนด alias ได้โดยใช้ .alias method ในการเรียกใช้ solid

@pipeline
def alias_pipeline():
    add_one.alias("second_addition")(add_one(return_one()))

Solid Tags

ส่วนนี้จะคล้ายกับ alias คุณยังสามารถกำหนด solid tags ในการเรียกใช้ solid ได้ด้วย

@pipeline
def tag_pipeline():
    add_one.tag({"my_tag": "my_value"})(add_one(return_one()))

Pipeline Configuration

Pipeline Modes

การกำหนด mode ของ Pipeline จะกำหนดโดยระบุ ModeDefinitions แล้วเรียกใช้ต่อไปใน pipeline สำหรับข้อมูลเพิ่มเติมเกี่ยวกับ Modes ดูเพิ่มเติมที่นี่

dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")


@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
    my_solid()

Pipeline Presets

คุณสามารถกำหนดการกำหนดค่าล่วงหน้าก่อน Pipeline จะดำเนินการได้ PresetDefinition ช่วยให้คุณทำได้โดยการระบุค่าการกำหนดค่า ณ เวลาที่กำหนด Pipeline

@pipeline(
    preset_defs=[
        PresetDefinition(
            name="one",
            run_config={"solids": {"add_one": {"inputs": {"number": 1}}}},
        ),
        PresetDefinition(
            name="two",
            run_config={"solids": {"add_one": {"inputs": {"number": 2}}}},
        ),
    ]
)
def my_presets_pipeline():
    add_one()

ใน Dagit คุณสามารถเลือกและ load preset ใน Playground

ในการ run pipeline ใน script หรือในการ test คุณสามารถกำหนดชื่อ preset ทาง preset argument ใน Python API

def run_pipeline():
    execute_pipeline(my_presets_pipeline, preset="one")

หรือจะกำหนดใน Dagster CLI dagster pipeline execute --preset ในการ run pipeline แบบระบุชื่อ preset name

dagster pipeline execute my_presets_pipeline --preset one

Pipeline Tags

pipeline สามารถระบุชุดของ tag ที่ถูกตั้งค่าโดยอัตโนมัติบนผลลัพธ์ในการ run pipeline

@pipeline(tags={"my_tag": "my_value"})
def my_tags_pipeline():
    my_solid()

การกำหนด tag my_tag ใน pipeline การเรียกใช้ pipelineใดๆ ที่สร้างโดยใช้ pipeline นี้จะมีแท็กเดียวกันด้วย

Pipeline Examples

มี DAG structures หลายแบบที่สามารถแสดงโดยใช้ pipelines ส่วนนี้จะครอบคลุมรูปแบบพื้นฐานสองถึงสามแบบที่คุณสามารถใช้เพื่อสร้าง pipelines ที่ซับซ้อนมากขึ้น

Linear Dependencies

โครงสร้าง pipeline แบบที่ง่ายที่สุดคือ linear pipeline จะมีการ return 1 output จาก root solid และส่งผ่านข้อมูลผ่านอินพุตและเอาต์พุตเดี่ยว

from dagster import pipeline, solid


@solid
def return_one(context) -> int:
    return 1


@solid
def add_one(context, number: int) -> int:
    return number + 1


@pipeline
def linear_pipeline():
    add_one(add_one(add_one(return_one())))

Multiple Inputs

ตัว solid ที่มี single output สามารถส่งค่าไปให้ได้หลาย solid inputs ในตัวอย่างนี้ output จาก solid ตัวแรกจะส่งออกไปที่ solid 2 ตัวที่ต่างกัน จากนั้น outputs ที่ออกมาจะถูกนำมารวมกันและส่งไปที่ solid ตัวสุดท้าย

from dagster import pipeline, solid


@solid
def return_one(context) -> int:
    return 1


@solid
def add_one(context, number: int):
    return number + 1


@solid
def adder(context, a: int, b: int) -> int:
    return a + b


@pipeline
def inputs_and_outputs_pipeline():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)

Conditional Branching

solid จะเริ่มทำงานเมื่อมี input เข้ามาแล้วเท่านั้น ซึ่งเราสามารถใช้ behavior นี้มากำหนดให้ solid ทำงานแบบมีเงื่อนไขได้

ในตัวอย่างนี้ ผลลัพธ์ของ branching_solid จะถูกส่งไปที่ตัว branch_1 หรือ branch_2 ตัวใดตัวหนึ่ง ขึ้นอยู่กับผลลัพธ์ที่ได้ของ branching_solid

import random

from dagster import Output, OutputDefinition, pipeline, solid


@solid(
    output_defs=[
        OutputDefinition(name="branch_1", is_required=False),
        OutputDefinition(name="branch_2", is_required=False),
    ]
)
def branching_solid():
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@solid
def branch_1_solid(_input):
    pass


@solid
def branch_2_solid(_input):
    pass


@pipeline
def branching_pipeline():
    branch_1, branch_2 = branching_solid()
    branch_1_solid(branch_1)
    branch_2_solid(branch_2)

Fixed Fan-in Pipeline

หากคุณมี fixed set of solids โดยที่ solids จะ return type ออกมาแบบเดียวกัน คุณมาสามารถเก็บรวบรวม outputs ทั้งหมดมาเก็บไว้ใน list และส่งไปที่ solid ตัวเดียวได้

solid ปลายทางจะทำงานก็ต่อเมื่อเอาต์พุตทั้งหมดสร้างสำเร็จโดย solid ต้นทาง

from typing import List

from dagster import pipeline, solid


@solid
def return_one() -> int:
    return 1


@solid
def sum_fan_in(nums: List[int]) -> int:
    return sum(nums)


@pipeline
def fan_in_pipeline():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias(f"return_one_{i}")())
    sum_fan_in(fan_outs)

ในตัวอย่างนี้ จะมี 10 solids ซึ่งทั้งหมดจะมี output ที่มีค่าเป็นเลข 1 ส่งออกไปเก็บไว้ใน list เมื่อทำงานครบแล้ว sum_fan_in จะรับค่าและนำไป sum ค่าทั้งหมด

# Dynamic Mapping & Collect

ในหลายกรณี มี structure ของ pipeline ที่ถูกกำหนดไว้ล่วงหน้าก่อน execution แต่ Dagster รองรับการสร้าง pipeline ที่ไม่ได้กำหนดโครงสร้างสุดท้ายจนกว่าจะรันไทม์ สิ่งนี้มีประโยชน์สำหรับโครงสร้าง pipeline ที่คุณต้องการรัน instance แยกจากกันของ solid สำหรับแต่ละรายการในเอาต์พุตที่แน่นอน

ในตัวอย่างนี้ เรามี solid files_in_directory ที่กำหนด DynamicOutputDefinition เรา map ผ่านเอาต์พุตแบบไดนามิกซึ่งจะทำให้ downstream dependencies ถูกโคลนสำหรับ DynamicOutput แต่ละรายการที่ให้ผลลัพธ์ ตัว copy ของ downstream สามารถระบุได้โดย mapping_key  ที่ส่งค่าออกมาเป็น DynamicOutput เมื่อเสร็จทั้งหมดแล้ว เราจะได้ผลลัพธ์ทั้งหมดของ process_file และส่งต่อไปที่ summarize_directory

import os
from typing import List

from dagster import DynamicOutput, DynamicOutputDefinition, Field, pipeline, solid
from dagster.utils import file_relative_path


@solid(
    config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
    output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
    path = context.solid_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@solid
def process_file(path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@solid
def summarize_directory(sizes: List[int]) -> int:
    # simple example of totalling sizes
    return sum(sizes)


@pipeline
def process_directory():
    file_results = files_in_directory().map(process_file)
    summarize_directory(file_results.collect())

Order-based Dependencies (Nothing dependencies)

ปกติแล้วเส้นทางการทำงานใน Dagster จะพึ่งพาข้อมูลเป็นหลัก การพึ่งพาข้อมูลหมายถึงแต่ละ input ของ solid ขึ้นอยู่กับ output ที่ออกมาของ solid

ถ้าคุณมี solid ที่ชื่อ Solid A ที่ไม่ขึ้นกับ output ของ solid อื่น ที่ Solid B ในทางทฤษฎีไม่ควรมีเหตุผลให้ Solid A ทำงานหลังจาก Solid B ในกรณีส่วนใหญ่ ทั้ง 2 solid ควรจะทำงานขนานกัน อย่างไรก็ตาม ในบางกรณีจำเป็นที่จะต้องมีลำดับการทำงานที่ชัดเจน

ถ้าคุณต้องการ model ลำดับการทำงานที่ชัดเจน คุณสามารถใช้ Dagster type ที่ชื่อ Nothing ในการกำหนด input definition ที่ตัว solid ที่ Dagster type "nothing" นี้จะบอกตัว solid ว่าไม่ต้องส่งอะไรออกไประหว่าง solid

from dagster import InputDefinition, Nothing, pipeline, solid


@solid
def create_table_1():
    get_database_connection().execute("create table_1 as select * from some_source_table")


@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2():
    get_database_connection().execute("create table_2 as select * from table_1")


@pipeline
def nothing_dependency_pipeline():
    create_table_2(start=create_table_1())

Nothing type inputs จะไม่มี parameter ใน function เพราะไม่มีการส่งผ่านข้อมูล เมื่อเชื่อมต่อ dependencies แนะนำให้ใช้ keyword args เพื่อป้องกันการปะปนกับตำแหน่งอื่นๆ

Dagster ยังจัดเตรียม advanced abstractions เพื่อจัดการกับ dependencies และ IO ถ้าต้องการหา model อื่นๆ เพิ่มเติมเพื่อใช้แก้ปัญหาในการใช้ external storages, ดูต่อที่ IOManagers.

Patterns

# Constructing PipelineDefinitions

ในกรณีที่ต้องการกำหนด PipelineDefinition object โดยตรงให้กำหนด

  • pipeline name เช่น
    name="one_plus_one_pipeline"
  • list ของ solid definitions เช่น
    solid_defs=[return_one, add_one]
  • dependency structure เช่น
    dependencies={"add_one":{"number":DependencyDefinition("return_one")}}

dependency structure จะประกาศ dependency ของ input ของ Solid แต่ละตัวบน output ของ Solid ตัวอื่นที่อยู่ใน pipeline ที่ key ตัวนอกสุดของ dependency dictionary คือ string ชื่อของ solid ถ้าคุณใช้ solid aliases ดูให้แน่ใจว่ามี solid aliases อยู่จริงๆ ส่วน value ของ dependency dictionary จะเป็น dictionary ที่มีการ map ชื่อ input ด้วย DependencyDefinition

one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)

# Pipeline DSL

ถ้าต้องการสร้าง pipeline ด้วย YAML file ประโยชน์ของการสร้างแบบนี้คือ สะดวกในการย้ายไปยัง Dagster workflow อื่น

ตัวอย่างเช่น

pipeline:
  name: some_example
  description: blah blah blah
  solids:
    - def: add_one
      alias: A
    - def: add_one
      alias: B
      deps:
        num:
          solid: A
    - def: add_two
      alias: C
      deps:
        num:
          solid: A
    - def: subtract
      deps:
        left:
          solid: B
        right:
          solid: C

คุณสามารถสร้าง PipelineDefinition จาก YAML นี้:

@solid
def add_one(num: int) -> int:
    return num + 1


@solid
def add_two(num: int) -> int:
    return num + 2


@solid
def subtract(left: int, right: int) -> int:
    return left + right


def construct_pipeline_with_yaml(yaml_file, solid_defs):
    yaml_data = load_yaml_from_path(yaml_file)
    solid_def_dict = {s.name: s for s in solid_defs}

    deps = {}

    for solid_yaml_data in yaml_data["pipeline"]["solids"]:
        check.invariant(solid_yaml_data["def"] in solid_def_dict)
        def_name = solid_yaml_data["def"]
        alias = solid_yaml_data.get("alias", def_name)
        solid_deps_entry = {}
        for input_name, input_data in solid_yaml_data.get("deps", {}).items():
            solid_deps_entry[input_name] = DependencyDefinition(
                solid=input_data["solid"], output=input_data.get("output", "result")
            )
        deps[SolidInvocation(name=def_name, alias=alias)] = solid_deps_entry

    return PipelineDefinition(
        name=yaml_data["pipeline"]["name"],
        description=yaml_data["pipeline"].get("description"),
        solid_defs=solid_defs,
        dependencies=deps,
    )


def define_dep_dsl_pipeline():
    return construct_pipeline_with_yaml(
        file_relative_path(__file__, "example.yaml"), [add_one, add_two, subtract]
    )


@repository
def define_repository():
    return {"pipelines": {"some_example": define_dep_dsl_pipeline}}

# Pipeline Execution

Dagster ให้ทางเลือกหลายทางในการ execute pipelines

Relevant APIs

execute_pipeline วิธีการในการ execute a pipeline แบบซิงโครนัส โดยทั่วไปจะใช้วิธี running scripts หรือ testing

Overview

มีหลายวิธีในการ execute pipelines ส่วนนี้จะอธิบายหลายวิธีการแต่ละแบบในการ execute pipeline : Dagit, Dagster CLI, หรือ Python APIs.

คุณยังสามารถ launch pipelines ด้วยวิธีอื่น:

  • Schedules สามารถใช้เพื่อเปิดรันในช่วงเวลาที่กำหนดได้
  • Sensors อนุญาตให้คุณเรียกใช้งานตามการเปลี่ยนแปลงสถานะภายนอก

Executing a Pipeline

from dagster import pipeline, solid


@solid
def return_one():
    return 1


@solid
def add_two(i: int):
    return i + 2


@solid
def multi_three(i: int):
    return i * 3


@pipeline
def my_pipeline():
    multi_three(add_two(return_one()))

Dagit

Dagster มาพร้อมกับอินเทอร์เฟซบนเว็บสำหรับการดูและโต้ตอบกับ pipeline และวัตถุประสงค์อื่นๆ

การแสดง pipeline ใน Dagit คุณสามารถใช้ dagit command:

dagit -f my_pipeline.py

จากนั้นไปที่ http://localhost:3000 เพื่อเริ่มใช้ Dagit:

คลิกที่ "Playground" tab แล้วกดที่ "Launch Execution" button ในการ execute the pipeline แล้วคุณจะเห็น Dagit เปิดการทำงาน pipeline:

Dagit Playground ยังมีตัวแก้ไขการกำหนดค่าเพื่อให้คุณสร้างการกำหนดค่าแบบโต้ตอบได้ ดูรายละเอียดใน Dagit

Dagster CLI

dagster CLI จะรวมทั้ง dagster pipeline execute สำหรับ direct execution และ dagster pipeline launch สำหรับเปิดการทำงานแบบอะซิงโครนัสที่ใช้กับ run launcher ที่ instance ของคุณ

ในการ execute pipeline ของคุณโดยตรง คุณสามารถเรียกใช้:

dagster pipeline execute -f my_pipeline.py

Python APIs

Dagster มี Python API สำหรับการดำเนินการที่เป็นประโยชน์เมื่อเขียน tests หรือเขียน scripts

method execute_pipeline จะทำหน้าที่ executes pipeline และ return PipelineExecutionResult.

from dagster import execute_pipeline

if __name__ == "__main__":
    result = execute_pipeline(my_pipeline)

คุณสามารถค้นหาเอกสาร API ฉบับเต็มได้ใน Execution API และเรียนรู้เพิ่มเติมเกี่ยวกับกรณีการใช้งานการทดสอบใน Testing.

Executing Pipeline Subset

Dagster รองรับวิธี run subset ของ pipeline เรียกว่า Solid Selection

Solid Selection Syntax

เพื่อระบุ solid selection, Dagster มี query syntax พื้นฐาน มันทำงานดังนี้:

  • query จะประกอบไปด้วย list
  • แต่ละส่วนใน query สามารถเป็นชื่อของ solid ซึ่งในกรณีนี้จะเป็นการ select solid ตรงๆ
  • แต่ละส่วนใน query สามารถชื่อของ solid ที่นำหน้าด้วย * ในกรณีนี้ solid upstream dependencies จะถูก select ทั้งหมด
  • แต่ละส่วนใน query สามารถชื่อของ solid ที่ต่อท้ายด้วย * ในกรณีนี้ solid downstream dependencies จะถูก select ทั้งหมด
  • แต่ละส่วนใน query สามารถชื่อของ solid ที่ต่อท้ายด้วย + ที่มีได้หลายตัว ในกรณีนี้จะเป็นการ select solid จะกระโดดไปตามจำนวน +
  • แต่ละส่วนใน query สามารถชื่อของ solid ที่นำหน้าด้วย + ในกรณีนี้จะเป็นการ select solid จะกระโดดไปที่ parents

ถามว่างงมั้ย งง ถ้างงไปอ่านที่นี่แล้วทดลองเอง

Clause examples

  • some_solid: select "some_solid" itself
  • *some_solid: select "some_solid" and all ancestors (upstream dependencies).
  • some_solid*: select "some_solid" and all descendants (downstream dependencies).
  • *some_solid*: select "some_solid" and all of its ancestors and descendants.
  • +some_solid: select "some_solid" and its direct parents.
  • some_solid+++: select "some_solid" and its children, its children's children, and its children's children's children.

Specifying Solid Selection

คุณสามารถใช้ syntax selection นี้ในอาร์กิวเมนต์ solid_selection ใน execute_pipeline:

execute_pipeline(my_pipeline, solid_selection=["*add_two"])

ในทำนองเดียวกัน คุณสามารถใช้ solid selection ได้เหมือนกันใน Dagit Playground:

Pipeline Execution: Examples

Multiprocessing Execution

คุณอาจต้องการ execute solids ในแบบ parallel สามารถทำได้โดย multiprocess executor, ซึ่งจะ execute แต่ละ solid ใน sub process ตัวของมันเอง

multiprocess executor มีอยู่ใน default mode คุณสามารถใช้โดยระบุใน config

execution:
  multiprocess:

สังเกตว่าเพราะ solid จะ execute process แบบแยกการทำงาน ค่าที่ส่งผ่านระหว่าง solids ยังต้องจัดเก็บในลักษณะที่สามารถเข้าถึงได้ข้าม process ตัวอย่างเช่น คุณสามารถกำหนดค่า fs_io_manager เป็น IO manager ใน pipeline ทั้งหมด เป็นค่าเดียวกัน

@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def parallel_pipeline():
    total(return_one(), return_one(), return_one(), return_one())

ดูเพิ่มเติมสำหรับ IO Managers เกี่ยวกับการกำหนดค่า IO Managers

นอกจากนี้ การใช้ multiprocess executor จะมีส่วนร่วม reconstructing pipeline ใน process อื่น ดังนั้น Python APIs จึงจำเป็นที่จะต้องมีความสามารถ reconstruct instance ของ pipeline ได้ งานส่วนนี้จะได้รับการจัดการสำหรับคุณเมื่อใช้ Dagit หรือ Dagster CLI

ในทำนองเดียวกัน  Python APIs ใช้ DagsterInstance ชั่วคราวโดยค่าเริ่มต้นเพื่อหลีกเลี่ยงการรายงานการทดสอบรันไปยัง instance เมื่อใช้ Python API สำหรับ production run ให้ตั้งค่าอินสแตนซ์โดยใช้ instance=DagsterInstance.get() ในการ config ค่า default สำหรับ loading behavior แต่ถ้าใช้งาน Dagit หรือ Dagster CLI ระบบจะจัดการให้เอง

ดังนั้นถ้าคุณ executing pipeline โดยใช้ Python APIs การ execution method ของคุณจะเป็นลักษณะนี้

def execute_multiprocessing():
    from dagster import reconstructable, DagsterInstance

    execute_pipeline(
        # A ReconstructablePipeline is necessary to load the pipeline in child processes.
        # reconstructable() is a utility function that captures where the
        # PipelineDefinition came from.
        reconstructable(parallel_pipeline),
        run_config={
            # This section controls how the run will be executed.
            # The multiprocess executor runs each solid in its own sub process.
            "execution": {"multiprocess": {}},
        },
        # The default instance for this API is an in memory ephemeral one.
        # To allow the multiple processes to coordinate we use one here
        # backed by a temporary directory.
        instance=DagsterInstance.local_temp(),
    )

# Composite Solids

Dagster ได้เตรียม Composite Solids, ซึ่งก็คือ unit ของ abstraction สำหรับการสร้าง solid จาก solid อื่นๆ

Relevant APIs

@composite_solid การกำหนด composite solid จะใช้ decorator function

Overview

Solid จะเชื่อมโยงเข้าด้วยกันโดยกำหนด dependencies ระหว่างอินพุตและเอาต์พุต การกำหนดความเชื่อมโยงกันมักจะทำที่ระดับ Pipeline level

Composite solids ยังช่วยให้คุณกำหนด Solid ที่เชื่อมต่อกัน เพื่อสร้าง Solid ใหม่ สิ่งนี้มีประโยชน์สำหรับ:

  • การจัดระเบียบ graph ขนาดใหญ่หรือ graph ที่ซับซ้อน
  • ขจัดความซับซ้อน (Abstracting away complexity)
  • การ Wrapping solids แล้วนำมา re-usable โดยกำหนดข้อมูลใหม่ (domain-specific information)

การ Refactoring DAG ของ solids โดยใช้ composites เป็นไปในลักษณะที่คล้ายคลึงกันมากกับการปรับโครงสร้างโค้ดด้วยฟังก์ชัน การกำหนด composite solid คล้ายกับการกำหนด pipeline, แต่ composite solids สามารถกำหนด mapping information เพื่อควบคุมข้อมูล กำหนดค่าการเข้าและออก ภายใน graph ของ solid

Defining a Composite Solid

ในการกำหนด composite solid จะใช้ @composite_solid decorator เราจะใช้การเรียก function ภายใน decorated function body เพื่อระบุโครงสร้าง dependency ระหว่าง solids ที่ประกอบเป็น composite solid

ในตัวอย่างนี้ add_one solid จะอาศัย output ของ return_one solid เนื่องจากมีการพึ่งพาข้อมูลนี้อยู่ return_one solid จะดำเนินการหลังจาก add_one ทำงานสำเร็จและปล่อยเอาต์พุตที่ต้องการออกมา

from dagster import InputDefinition, composite_solid, pipeline, solid


@solid
def return_one():
    return 1


@solid
def add_one(number: int):
    return number + 1


@solid
def multiply_by_three(number: int):
    return number * 3


@composite_solid(input_defs=[InputDefinition("number", int)])
def add_one_times_three_solid(number):
    return multiply_by_three(add_one(number))


@pipeline
def my_pipeline():
    add_one_times_three_solid(return_one())

Composite Solid Configuration

โดยค่าเริ่มต้น การ config schema สำหรับ solid แต่ละตัวใน composite solid ถูกยกขึ้นไปที่ composite solid เอง ภายใต้ solid key

ในตัวอย่างนี้ คุณมี solids สองตัวที่ทั้งคู่ใช้ config และถูกหุ้มด้วย composite solid

@solid(config_schema={"n": int})
def add_n(context, number: int):
    return number + context.solid_config["n"]


@solid(config_schema={"m": int})
def multiply_by_m(context, number: int):
    return number * context.solid_config["m"]


@composite_solid(input_defs=[InputDefinition("number", int)])
def add_n_times_m_solid(number):
    return multiply_by_m(add_n(number))

ในการ run pipeline ด้วย composite solid คุณจะต้องระบุการกำหนดค่าสำหรับทั้ง add_n และ multiply_by_m ผ่าน composite solid:

solids:
  add_n_times_m_solid:
    inputs:
      number: 0
    solids:
      add_n:
        config:
          n: 3
      multiply_by_m:
        config:
          m: 2

Configuration Mapping

Composite solid ยังสามารถกำหนด config schema เมื่อ Composite solid กำหนด config schema มันจะต้องกำหนด config_mapping_fn ที่จะถูกนำไป map ใน composite solids config และจะนำไป wrapped solids' config

def config_mapping_fn(config):
    x = config["x"]
    return {"add_n": {"config": {"n": x}}, "multiply_by_m": {"config": {"m": x}}}


@composite_solid(
    config_fn=config_mapping_fn,
    config_schema={"x": int},
    input_defs=[InputDefinition("number", int)],
)
def add_x_multiply_by_x(number):
    return multiply_by_m(add_n(number))


@pipeline
def my_pipeline_config_mapping():
    add_x_multiply_by_x(return_one())

ในตัวอย่างนี้ composite solid จะมีเพียงหนึ่งฟิลด์ใน config schema : x. config mapping function นำการกำหนดค่าที่จัดเตรียมไว้ให้กับ composite solid และแมปกับ wrapped solids

solids:
  add_x_multiply_by_x:
    config:
      x: 1

Examples

Multiple Outputs

เมื่อมี Multiple Outputs จาก composite solid, คุณจะต้องกำหนดผลลัพธ์ที่ maps และ return dictionary โดยที่ keys คือชื่อเอาต์พุตและ values คือค่าเอาต์พุต

from dagster import OutputDefinition


@solid
def echo(i):
    print(i)


@solid
def one() -> int:
    return 1


@solid
def hello() -> str:
    return "hello"


@composite_solid(output_defs=[OutputDefinition(int, "x"), OutputDefinition(str, "y")])
def composite_multi_outputs():
    x = one()
    y = hello()
    return {"x": x, "y": y}


@pipeline
def my_pipeline_multi_outputs():
    x, y = composite_multi_outputs()
    echo(x)
    echo(y)

# Solid Events and Exceptions

ภายใน body ของ solid มีความเป็นไปได้ที่จะสื่อสารกับ Dagster framework ไม่ว่าจะเกิด event หรือเกิด exception เนื้อหานี้จะอธิบายความเป็นไปได้ต่างๆ เหล่านี้และสถานการณ์ที่คุณอาจจะได้ใช้งาน

Relevant APIs

  • Output Dagster event used to yield an output from a solid
  • AssetMaterialization Dagster event indicating that a solid has materialized an asset.
  • ExpectationResult Dagster event representing the result of a data quality check
  • Failure Dagster exception indicating that a failure has occurred
  • RetryRequested Dagster exception requesting the step to be retried

Overview

ภายในตัว solid จะมี โครงสร้างของ events อยู่หลายแบบ ประกอบด้วย types หลายประเภท กิจกรรมเหล่านี้จะถูกประมวลผลโดย Dagster และบันทึกไว้ใน event log พร้อมกับบริบทเพิ่มเติมเกี่ยวกับ solid ที่ปล่อยออกมา

นอกจากนี้ยังเป็น exceptions ของ Dagster ได้อีกด้วย ซึ่งใน exceptions แต่ละแบบจะบ่งบอกรายละเอียดที่ทำให้ framework หยุดทำงานและ action อื่นๆ ที่เกิดขึ้น

Event Metadata

บางครั้งเมื่อเราแนบข้อมูลบางอย่างไปกับ event หรือ exception ที่ไม่ตรงกับ basic parameters ผ่านไปยัง EventMetadataEntry object ใน dagster จะมี interface สำหรับการระบุข้อมูล metadata นี้เพื่อรองรับ events หลายๆ แบบ โดยขึ้นอยู่กับประเภทของข้อมูล

AssetMaterialization, ExpectationResult, และ Failure objects แต่ละตัวจะยอมรับ metadata parameter ซึ่ง maps string labels เข้ากับ structured values. ส่วนเอาต์พุตยังยอมรับ parameter นี้ด้วย แม้ว่าฟังก์ชันนี้กำลังอยู่ในขั้นทดลองอาจเปลี่ยนแปลงได้ในอนาคต

dagster ยังรองรับ metadata types ที่มีประโยชน์หลายแบบ รวมถึงประเภทข้อมูลพื้นฐาน (EventMetadataEntry.float, EventMetadataEntry.int, EventMetadataEntry.text) รวมไปถึงข้อมูลที่ซับซ้อนเช่น markdown และ json (EventMetadataEntry.md, EventMetadataEntry.json)

ลองดู Docs API ของ EventMetadataEntry สำหรับรายละเอียดเพิ่มเติม

# Events

ผลลัพธ์ของอีเวนท์ภายในตัว solid คือทางที่ใช้ในการสื่อสารกับ Dagster framework event ที่สำคัญที่สุดต่อการทำงานของ Dagster คือ Output ของ event ซึ่งช่วยให้สามารถส่งข้อมูลเอาต์พุตจาก solid หนึ่งไปยังอีกอันหนึ่งได้ อย่างไรก็ตาม Dagster ยังจัดเตรียม interfaces สำหรับสื่อสารกับ external assets และการตรวจสอบคุณภาพของข้อมูลในระหว่างที่ solid กำลังทำงานไว้ด้วย

Outputs

เนื่องจากการ return ค่าจาก solid เป็นส่วนพื้นฐานของการสร้าง data pipeline ตัว Dagster จะมี interfaces ที่ช่วยให้การเขียน code ส่วนนี้ง่ายขึ้นเล็กน้อย

สำหรับ solid ที่มี single output ก็สามารถ return ค่าออกมาตรงๆ จาก compute_fn ซึ่งจะ convert ออกมาเป็น Dagster Output event โดยชื่อ output default คือ result

@solid
def my_simple_return_solid(context):
    return 1

ตัวอย่างข้างบนอาจเป็นวิธีที่ง่ายที่สุดในการ return ค่าจาก function แต่เมื่อคุณมี outputs หลายรายการที่กำหนดไว้ใน solid ของคุณ หรือต้องการแสดงข้อมูลเพิ่มเติมที่ไม่ใช่เอาต์พุตจากเนื้อความของ solid ของคุณ การ return ค่าที่กำหนดเองไม่ใช่ตัวเลือกอีกต่อไป ในกรณีนี้ คุณจะต้องกำหนดค่า Output events เอง โดยสารมารถทำได้ตามตัวอย่างนี้

@solid
def my_simple_yield_solid(context):
    yield Output(1)

หรือถ้าต้องการกำหนดค่า output name เองที่ไม่ใช่ result ทำได้ตามตัวอย่างนี้

@solid(
    output_defs=[
        OutputDefinition(name="my_output"),
    ]
)
def my_named_yield_solid(context):
    yield Output(1, output_name="my_output")

เอกสารเพิ่มเติม Solid Outputs

# Attaching Metadata to Outputs (Experimental)

ถ้ามีข้อมูลเพิ่มเติมที่ต้องการเพิ่มเข้าไปในเอาท์พุตที่ต้องการส่งออกไป คุณอาจเลือกที่จะแสดงข้อมูลนั้นโดยส่งผ่าน  parameter ที่ชื่อ metadata สามารถทำได้โดย map key/value ที่ชื่อ metadata

@solid
def my_metadata_output(context):
    df = get_some_data()
    yield Output(
        df,
        metadata={
            "text_metadata": "Text-based metadata for this event",
            "dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
            "raw_count": len(df),
            "size (bytes)": calculate_bytes(df),
        },
    )

# Asset Materializations

AssetMaterialization events จะเป็นตัวบอก Dagster ว่าคุณจะเขียข้อมูลบางส่วนและส่งออกไปยังระบบภายนอก (external system) ตัวอย่างที่ใช้ประจำก็อย่างเช่นเขียนข้อมูลลง database

โดยทั่วไปแล้วคุณต้องการส่งอีเว้นออกไปตรงตรงที่ระบบภายนอก (external system) อีเว้น AssetMaterialization ที่สร้างขึ้นจะต้องกำหนด asset_key ซึ้งจะเป็น key ที่ใช้ระบุตัวตนและการมีอยู่ของ asset

@solid
def my_asset_solid(context):
    df = get_some_data()
    store_to_s3(df)
    yield AssetMaterialization(
        asset_key="s3.my_asset",
        description="A df I stored in s3",
    )

    result = do_some_transform(df)
    yield Output(result)

หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับ assets และเมื่อมีการใช้ event จะเกิดอะไรต่อไป ดูเพิ่มเติมได้ที่เอกสาร Asset Catalog

# Attaching Metadata to Asset Materializations

Attaching metadata ใน Asset Materializations เป็น ส่วนสำคัญในการติดตามการทำงานเมื่อเวลาผ่านไป การทำงานของ functions นี้โดยพื้นฐานแล้วจะเหมือนกับ events อื่นๆ ที่ยอมรับ metadata parameter ทำให้คุณสามารถแนบ labels และ values ที่มีโครงสร้างที่จะแสดงได้

@solid
def my_metadata_materialization_solid(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(
        asset_key="my_dataset",
        description="Persisted result to storage",
        metadata={
            "text_metadata": "Text-based metadata for this event",
            "path": EventMetadata.path(remote_storage_path),
            "dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
            "size (bytes)": calculate_bytes(df),
        },
    )
    yield Output(remote_storage_path)

# Expectation Results

Solid สามารถปล่อย event ที่มีโครงสร้างเพื่อแสดงผลลัพธ์ของการทดสอบคุณภาพข้อมูล โดย data quality event class คือ ExpectationResult เพื่อสร้างผลลัพธ์ที่คาดหวัง เราสามารถให้ผลลัพ ExpectationResult event ใน solid ของเรา

@solid
def my_expectation_solid(context, df):
    do_some_transform(df)
    yield ExpectationResult(success=len(df) > 0, description="ensure dataframe has rows")
    yield Output(df)

# Attaching Metadata to Expectation Results

เหมือนกับ event types อื่นๆ ใน Dagster มีหลากหลาย type ใน metadata ที่เกี่ยวข้องกับ expectation result event ผ่าน EventMetadataEntry class แต่ละ event ที่เกินขึ้นที่มีใน metadata จะแสดงใน event log
ตัวอย่างนี้แสดงให้เห็นรายการ metadata ที่มี type ที่ต่างกันถูกแนบเข้ามาที่ expectation result:

@solid
def my_metadata_expectation_solid(context, df):
    df = do_some_transform(df)
    yield ExpectationResult(
        success=len(df) > 0,
        description="ensure dataframe has rows",
        metadata={
            "text_metadata": "Text-based metadata for this event",
            "dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
            "raw_count": len(df),
            "size (bytes)": calculate_bytes(df),
        },
    )
    yield Output(df)

# Exceptions

Dagster มี exception class เอาไว้ให้ใช้ในตอน execution solid ได้ behaviorที่จะเกิดหลังจากเกิด exception ขึ้นอยู่กับ exception ที่คุณใช้ รายละเอียด exception จะมีตามนี้

# Failures

Failure exception จะใช้ในกรณีที่เกิดปัญหาและต้องการให้หยุดการ execution สิ่งนี้อาจมีประโยชน์หาก ในตัว solid สามารถ detect issue ที่เกิดขึ้น แต่ตัว issue ไม่ทำให้เกิด error ขึ้นมา แต่ยังคงสร้างปัญหาหากยังดำเนินการต่อไป

@solid
def my_failure_solid():
    path = "/path/to/files"
    my_files = get_files(path)
    if len(my_files) == 0:
        raise Failure(
            description="No files to process",
            metadata={
                "filepath": EventMetadata.path(path),
                "dashboard_url": EventMetadata.url("http://mycoolsite.com/failures"),
            },
        )
    return some_calculation(my_files)

# Attaching Metadata to Failures

ที่ตัว raise Failure สามารถ attach EventMetadataEntry เข้าไปได้ด้วย

@solid
def my_failure_metadata_solid():
    path = "/path/to/files"
    my_files = get_files(path)
    if len(my_files) == 0:
        raise Failure(
            description="No files to process",
            metadata={
                "filepath": EventMetadata.path(path),
                "dashboard_url": EventMetadata.url("http://mycoolsite.com/failures"),
            },
        )
    return some_calculation(my_files)

# Retry Requests

RetryRequested exception จะมีประโยชน์เมื่อเจอกับ failures ที่เป็น failures ที่ไม่คงที่บางครั้ง failures บางครั้งไม่ failures

สามารถกำหนด configure จำนวนครั้งในการลองใหม่ด้วยพารามิเตอร์ max_retries

@solid
def my_retry_solid():
    try:
        result = flaky_operation()
    except:
        raise RetryRequested(max_retries=3)
    return result

# Solid Hooks

Solid hook ให้เรากำหนดนโยบายการจัดการ success และ failure บน solids

Relevant APIs

  • @failure_hook เป็น decorator ที่กำหนด callback เมื่อ solid failure.
  • @success_hook เป็น decorator ที่กำหนด callback เมื่อ solid success.
  • HookContext context object จะใช้ใน hook function
  • build_hook_context เป็นตัวสร้าง HookContext จะทำนอก execution มีวัตถุประสงค์เพื่อใช้ในการทดสอบ hook

# Overview

@success_hook หรือ @failure_hook decorated function จะเรียกว่า solid hook ตัว Solid hook ถูกออกแบบมาเพื่อวัตถุประสงค์การใช้งานทั่วไป จะเป็นอะไรก็ได้ที่คุณอยากทำที่ solid level

การกำหนด Solid Hook

from dagster import HookContext, failure_hook, success_hook


@success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: HookContext):
    message = f"Solid {context.solid.name} finished successfully"
    context.resources.slack.chat.post_message(channel="#foo", text=message)


@failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: HookContext):
    message = f"Solid {context.solid.name} failed"
    context.resources.slack.chat.post_message(channel="#foo", text=message)

# Hook context

อย่างที่คุณอาจสังเกตเห็น hook function จะมี 1 argument คือ HookContext คุณสมบัติที่มีอยู่ในบริบทนี้คือ:

  • context.log: loggers
  • context.solid: solid ที่เกี่ยวข้องกับ hook
  • context.solid_config: config เฉพาะที่เกี่ยวข้อง solid
  • context.step: execution ขั้นตอนที่ก่อให้เกิด hook.
  • context.resources: resources ที่ hook สามารถใช้ได้

# การใช้งาน Hooks

Dagster ให้วิธีการต่างๆ ในการ trigger solid hooks

# การใช้ hook บนทุก solid ใน pipeline

ตัวอย่างเช่น คุณต้องการส่งข้อความ slack message ไปที่ channel เมื่อ solid fails ใน pipeline ในกรณีนี้เราจะเราจะใช้งาน hook ที่ pipeline ซึ่งจะเป็นการเปิดใช้งาน hook ในทุก solid instance ที่อยู่ใน pipeline

การใช้งาน hook ที่สร้างขึ้นมาในชื่อ "slack_message_on_failure" โดยวางไว้บน decorated function ที่ชื่อ "pipeline" ของ "notif_all" จากนั้น slack messages จะถูกส่งเมื่อทุก solid ที่อยู่ใน pipeline เกิดการ fails

@slack_message_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
    # the hook "slack_message_on_failure" is applied on every solid instance within this pipeline
    a()
    b()

# การใช้งาน hook ใน solid

บางครั้งการใช้งาน pipeline จะต้องทำงาน share กับส่วนงานอื่น หรือต้องการ alert ที่ตำแหน่งงานที่มีความสำคัญสูง ดังนั้น dagster ได้มีวิธีการในการ set up hooks ใน solid instances ซึ่งช่วยให้คุณกำหนด policies ในการใช้งาน solid ได้

@pipeline(mode_defs=mode_defs)
def selective_notif():
    # only solid "a" triggers hooks: a slack message will be sent when it fails or succeeds
    a.with_hooks({slack_message_on_failure, slack_message_on_success})()
    # solid "b" won't trigger any hooks
    b()

ในกรณีนี้ "b" ที่เป็นของแข็งจะไม่ trigger hook ใดๆ ในขณะที่ "a" ที่เป็น solid ไม่ว่าจะ fails หรือ succeeds จะส่ง slack message

# Testing Hooks

คุณสามารถทดสอบการทำงานของ hook โดยเรียกใช้ข้อกำหนดของ hook โดยจะเรียกใช้ decorated function คุณสามารถสร้าง context เพื่อเตรียมเรียกใช้ โดยใช้ build_hook_context function

from dagster import build_hook_context


@success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
    context.resources.my_conn.send("foo")


def test_my_success_hook():
    my_conn = mock.MagicMock()
    # construct HookContext with mocked ``my_conn`` resource.
    context = build_hook_context(resources={"my_conn": my_conn})

    my_success_hook(context)

    assert my_conn.send.call_count == 1

Examples

เข้าถึง failure information ใน hook ที่เกิด failure

ในหลายกรณี คุณอาจต้องการทราบรายละเอียดเกี่ยวกับ failure ของ solid คุณสามารถส่ง exception object ใน solid ที่ล้มเหลวผ่านคุณสมบัติ solid_exception บน HookContext:

from dagster import HookContext, failure_hook
import traceback


@failure_hook
def my_failure_hook(context: HookContext):
    solid_exception: BaseException = context.solid_exception
    # print stack trace of exception
    traceback.print_tb(solid_exception.__traceback__)

Patterns

Environment-specific hooks using modes

Hook ใช้ resource keys ในการเข้าถึง resources หบังจากเรียกใช้ resource key ใน set ของ required_resource_keys ในตัว hook สามารถเข้าถึง resource ผ่านทาง resources attribute ของ context object

นอกจากนี้ยังช่วยให้คุณสามารถสลับค่า resource ในโหมดต่างๆ ตัวอย่างเช่น คุณสามารถส่ง slack messages เมื่อคุณอยู่ในโหมด "prod" และ mock slack resource เมื่ออยู่ในโหมด "dev"

ในกรณีนี้ เราสามารถ mock slack_resource ใน "dev" โหมด ด้วยการใช้ helper function ResourceDefinition.hardcoded_resource() มันจึงไม่ส่ง slack messages เมื่อคุณกำลัง พัฒนา pipeline

mode_defs = [
    ModeDefinition(
        "dev",
        resource_defs={
            "slack": ResourceDefinition.hardcoded_resource(
                slack_resource_mock, "do not send messages in dev"
            )
        },
    ),
    ModeDefinition("prod", resource_defs={"slack": slack_resource}),
]

แต่เมื่อสลับไปที่ "prod" โหมด เราสามารถส่ง slack token ไปใน run_config และเปิดใช้งานการส่งข้อความไปยัง slack channel เมื่อมีการเรียกใช้ hook

resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token

จากนั้นเราก็สามารถ execute pipeline  ที่มี config นี้ผ่าน Python API, CLI, หรือ Dagit UI นี่คือตัวอย่างการใช้ Python API

if __name__ == "__main__":
    with open(
        file_relative_path(__file__, "prod.yaml"),
        "r",
    ) as fd:
        run_config = yaml.safe_load(fd.read())
    result = execute_pipeline(notif_all, run_config=run_config, mode="prod")

Pipeline-level hooks

เมื่อคุณเพิ่ม hook decorator ใน pipeline, hook จะเพิ่มเข้าไปใน solid แต่ละตัวมี่อยู่ใน pipeline,  hook จะไม่ได้ติดตามทุก event ที่เกิดขึ้นใน pipeline แต่จะติดตามเฉพาะ success หรือ failure ใน solid เท่านั้น

คุณอาจพบว่าจำเป็นต้องตั้งค่า pipeline-level policies เช่น คุณอาจต้องการเรียกใช้โค้ดสำหรับทุกๆ ความล้มเหลวของ pipeline

Dagster มีวิธีสร้าง sensor ที่ตอบสนองต่อ failure events ของ pipeline สามารถดูรายละเอียดได้ที่ Pipeline failure sensor ในเนื้อหา Sensors


** เนื้อหาทั้งหมดมาจาก https://docs.dagster.io/ เขียนไว้เพื่อทำความเข้าใจ เนื้อหาบางจุดใช้ google translate แปล ดังนั้นถ้าอ่านแล้วไม่เข้าใจให้ไปอ่านที่ docs ของ dagster จะดีที่สุด **