Dagster / Solids and Pipelines
# Solids and Pipelines
Solids และ Pipelines คือการสร้าง blocks ของ Dagster code. ใช้สิ่งเหล่านี้เพื่อกำหนด orchestration graphs. ส่วนนี้ครอบคลุมถึงวิธีการกำหนดและใช้งานทั้ง solids และ pipelines
Solids
Solids เป็นหน่วยหน้าที่ของงานใน Dagster ใน solid แต่ละ unit จะมีความรับผิดชอบคืออ่านข้อมูล นำเข้า ดำเนินการ และส่งออกข้อมูล ในการทำงานเมื่อมี solid หลายตัวจะมีการทำงานที่ต้องเชื่อมต่อกันด้วย Pipeline

# Solid Relevant APIs
@solid
การกำหนดว่า function ตัวไหนเป็น solid จะใช้ decorator เป็นตัวกำหนด ตัว decorated function จะเรียกว่า compute_fn ตัว decorator จะ returns SolidDefinition
InputDefinition
ตัว InputDefinition
จะกำหนด inputs ของ solid compute function สิ่งเหล่านี้ถูกกำหนดไว้ในอาร์กิวเมนต์ input_defs ของ @solid decorator
วิธีปกติในการกำหนดอินพุตคือการเพิ่มอาร์กิวเมนต์ให้กับฟังก์ชัน decorator
@solid
def my_input_solid(abc, xyz):
pass
คุณสามารถใช้ Dagster Type เพื่อจัดเตรียมฟังก์ชันที่ตรวจสอบอินพุตของ Solid ทุกครั้งที่ Solid ทำงาน ในกรณีนี้ คุณใช้ InputDefinitions ที่สอดคล้องกับอาร์กิวเมนต์ของ
MyDagsterType = DagsterType(type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType")
@solid(input_defs=[InputDefinition(name="abc", dagster_type=MyDagsterType)])
def my_typed_input_solid(abc):
pass
OutputDefinition
OutputDefinition คือตัวกำหนด outputs ของ solid compute function สิ่งเหล่านี้ถูกกำหนดไว้ในอาร์กิวเมนต์ output_defs ให้กับ @solid decorator
เมื่อคุณมีเอาต์พุตเดียว คุณสามารถคืนค่าเอาต์พุตได้โดยตรง
@solid
def my_output_solid():
return 5
เมื่อคุณมีเอาต์พุตมากกว่าหนึ่งรายการ คุณต้องให้อินสแตนซ์ของคลาสเอาต์พุตเพื่อแก้ความกำกวมระหว่างเอาต์พุต
@solid(
output_defs=[
OutputDefinition(name="first_output"),
OutputDefinition(name="second_output"),
],
)
def my_multi_output_solid():
yield Output(5, output_name="first_output")
yield Output(6, output_name="second_output")
SolidExecutionContext
SolidExecutionContext เป็น optional ที่สามารถกำหนดได้โดยใส่ไว้ใน argument ตัวแรก SolidExecutionContext จะทำให้ solid function สามารถเข้าถึง Dagster system information ตัวอย่างเช่น resources, logging, ข้อมูลพื้นฐานของ Dagster instance
ตัวอย่าง ในการ access logger และ log info message ออกมา
@solid(config_schema={"name": str})
def context_solid(context):
name = context.solid_config["name"]
context.log.info(f"My name is {name}")
SolidDefinition
SolidDefinition คือ Class ของ solids ส่วนนี้จะไม่ถูเรียกใช้ตรงๆ แต่เมื่อมีการ ใช้งาน @solid
จะมีการ return SolidDefinition
Solid Configuration
คือการ parameter ของ solid โดยใช้ config_schema ทำให้สามารถกำหนดค่าและกำหนดพารามิเตอร์ได้ สามารถดูการกำหนดค่าและคำอธิบายได้ ที่ Config Schema
ดังนั้น การกำหนดค่า solid สามารถใช้เพื่อระบุลักษณะการทำงานที่เป็นของ solid ในขณะใช้งานจริง ทำให้ solid มีความยืดหยุ่นและนำกลับมาใช้ใหม่ได้
ตัวอย่างเช่น เราสามารถกำหนด solid ให้ไปรับค่า API endpoint โดยผ่านการกำหนดค่าใน config
@solid(config_schema={"api_endpoint": str})
def my_configurable_solid(context):
api_endpoint = context.solid_config["api_endpoint"]
data = requests.get(f"{api_endpoint}/data").json()
return data
Patterns
Solid Factory
เป็น Pattern ในการสร้างตัวช่วยสร้าง generate solids
def x_solid(
arg,
name="default_name",
input_defs=None,
**kwargs,
):
"""
Args:
args (any): One or more arguments used to generate the nwe solid
name (str): The name of the new solid.
input_defs (list[InputDefinition]): Any input definitions for the new solid. Default: None.
Returns:
function: The new solid.
"""
@solid(name=name, input_defs=input_defs or [InputDefinition("start", Nothing)], **kwargs)
def _x_solid(context):
# Solid logic here
pass
return _x_solid
Pipeline
pipeline คือ set ของ solid ที่มีส่วนที่ต้องพึงพาข้อมูลของ solid อื่นๆ ร่วมกันเช่น solid C ต้องมีการใช้ข้อมูลของ solid A และ solid B

# Pipeline Relevant APIs
@pipeline
การกำหนด pipeline จะใช้ decorator function @pipeline
PipelineDefinition
เป็น class สำหรับ Pipeline ไม่จำเป็นต้อง initialize เพื่อใช้งาน class นี้โดยตรง แต่เมื่อมีการใช้งาน @pipeline
จะถูก return ด้วย PipelineDefinition
ModeDefinition
โหมดช่วยให้คุณเปลี่ยนพฤติกรรมไปป์ไลน์ระหว่างสภาพแวดล้อมการใช้งานที่แตกต่างกัน สำหรับข้อมูลเพิ่มเติม โปรดดูส่วนโหมด
PresetDefinition
Presets อนุญาตให้คุณกำหนดการกำหนดค่าไปป์ไลน์ล่วงหน้า
การกำหนด pipeline
ใน decorated function body เราใช้การเรียกฟังก์ชันเพื่อระบุโครงสร้างระหว่าง solids และ pipeline
ในตัวอย่างนี้ add_one
solid จะอาศัยผลลัพของ return_one solid เนื่องจากมีการพึ่งพาข้อมูลนี้อยู่ จะเห็นว่า add_one
solid จะดำเนินการหลังจาก return_one ประมวลผลเสร็จและส่งผลลัพออกมา
@solid
def return_one(context):
return 1
@solid
def add_one(context, number: int):
return number + 1
@pipeline
def one_plus_one_pipeline():
add_one(return_one())
Aliases and Tags
Solid aliases
คุณสามารถใช้ solid definition ดียวกันได้หลายครั้งใน pipeline เดียวกัน
@pipeline
def multiple_usage_pipeline():
add_one(add_one(return_one()))
ในการแยกความแตกต่างระหว่างการเรียกใช้สองรายการของ add_one Dagster จะใช้ aliases ชื่อของ solid เป็น add_one และ add_one_2 โดยอัตโนมัติ
คุณยังสามารถกำหนด alias ได้โดยใช้ .alias method ในการเรียกใช้ solid
@pipeline
def alias_pipeline():
add_one.alias("second_addition")(add_one(return_one()))
Solid Tags
ส่วนนี้จะคล้ายกับ alias คุณยังสามารถกำหนด solid tags ในการเรียกใช้ solid ได้ด้วย
@pipeline
def tag_pipeline():
add_one.tag({"my_tag": "my_value"})(add_one(return_one()))
Pipeline Configuration
Pipeline Modes
การกำหนด mode ของ Pipeline จะกำหนดโดยระบุ ModeDefinitions
แล้วเรียกใช้ต่อไปใน pipeline สำหรับข้อมูลเพิ่มเติมเกี่ยวกับ Modes ดูเพิ่มเติมที่นี่
dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")
@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
my_solid()
Pipeline Presets
คุณสามารถกำหนดการกำหนดค่าล่วงหน้าก่อน Pipeline จะดำเนินการได้ PresetDefinition
ช่วยให้คุณทำได้โดยการระบุค่าการกำหนดค่า ณ เวลาที่กำหนด Pipeline
@pipeline(
preset_defs=[
PresetDefinition(
name="one",
run_config={"solids": {"add_one": {"inputs": {"number": 1}}}},
),
PresetDefinition(
name="two",
run_config={"solids": {"add_one": {"inputs": {"number": 2}}}},
),
]
)
def my_presets_pipeline():
add_one()
ใน Dagit คุณสามารถเลือกและ load preset ใน Playground

ในการ run pipeline ใน script หรือในการ test คุณสามารถกำหนดชื่อ preset ทาง preset
argument ใน Python API
def run_pipeline():
execute_pipeline(my_presets_pipeline, preset="one")
หรือจะกำหนดใน Dagster CLI dagster pipeline execute --preset
ในการ run pipeline แบบระบุชื่อ preset name
dagster pipeline execute my_presets_pipeline --preset one
Pipeline Tags
pipeline สามารถระบุชุดของ tag ที่ถูกตั้งค่าโดยอัตโนมัติบนผลลัพธ์ในการ run pipeline
@pipeline(tags={"my_tag": "my_value"})
def my_tags_pipeline():
my_solid()
การกำหนด tag my_tag ใน pipeline การเรียกใช้ pipelineใดๆ ที่สร้างโดยใช้ pipeline นี้จะมีแท็กเดียวกันด้วย
Pipeline Examples
มี DAG structures หลายแบบที่สามารถแสดงโดยใช้ pipelines ส่วนนี้จะครอบคลุมรูปแบบพื้นฐานสองถึงสามแบบที่คุณสามารถใช้เพื่อสร้าง pipelines ที่ซับซ้อนมากขึ้น
Linear Dependencies
โครงสร้าง pipeline แบบที่ง่ายที่สุดคือ linear pipeline จะมีการ return 1 output จาก root solid และส่งผ่านข้อมูลผ่านอินพุตและเอาต์พุตเดี่ยว

from dagster import pipeline, solid
@solid
def return_one(context) -> int:
return 1
@solid
def add_one(context, number: int) -> int:
return number + 1
@pipeline
def linear_pipeline():
add_one(add_one(add_one(return_one())))
Multiple Inputs
ตัว solid ที่มี single output สามารถส่งค่าไปให้ได้หลาย solid inputs ในตัวอย่างนี้ output จาก solid ตัวแรกจะส่งออกไปที่ solid 2 ตัวที่ต่างกัน จากนั้น outputs ที่ออกมาจะถูกนำมารวมกันและส่งไปที่ solid ตัวสุดท้าย

from dagster import pipeline, solid
@solid
def return_one(context) -> int:
return 1
@solid
def add_one(context, number: int):
return number + 1
@solid
def adder(context, a: int, b: int) -> int:
return a + b
@pipeline
def inputs_and_outputs_pipeline():
value = return_one()
a = add_one(value)
b = add_one(value)
adder(a, b)
Conditional Branching
solid จะเริ่มทำงานเมื่อมี input เข้ามาแล้วเท่านั้น ซึ่งเราสามารถใช้ behavior นี้มากำหนดให้ solid ทำงานแบบมีเงื่อนไขได้

ในตัวอย่างนี้ ผลลัพธ์ของ branching_solid จะถูกส่งไปที่ตัว branch_1 หรือ branch_2 ตัวใดตัวหนึ่ง ขึ้นอยู่กับผลลัพธ์ที่ได้ของ branching_solid
import random
from dagster import Output, OutputDefinition, pipeline, solid
@solid(
output_defs=[
OutputDefinition(name="branch_1", is_required=False),
OutputDefinition(name="branch_2", is_required=False),
]
)
def branching_solid():
num = random.randint(0, 1)
if num == 0:
yield Output(1, "branch_1")
else:
yield Output(2, "branch_2")
@solid
def branch_1_solid(_input):
pass
@solid
def branch_2_solid(_input):
pass
@pipeline
def branching_pipeline():
branch_1, branch_2 = branching_solid()
branch_1_solid(branch_1)
branch_2_solid(branch_2)
Fixed Fan-in Pipeline
หากคุณมี fixed set of solids โดยที่ solids จะ return type ออกมาแบบเดียวกัน คุณมาสามารถเก็บรวบรวม outputs ทั้งหมดมาเก็บไว้ใน list และส่งไปที่ solid ตัวเดียวได้

solid ปลายทางจะทำงานก็ต่อเมื่อเอาต์พุตทั้งหมดสร้างสำเร็จโดย solid ต้นทาง
from typing import List
from dagster import pipeline, solid
@solid
def return_one() -> int:
return 1
@solid
def sum_fan_in(nums: List[int]) -> int:
return sum(nums)
@pipeline
def fan_in_pipeline():
fan_outs = []
for i in range(0, 10):
fan_outs.append(return_one.alias(f"return_one_{i}")())
sum_fan_in(fan_outs)
ในตัวอย่างนี้ จะมี 10 solids ซึ่งทั้งหมดจะมี output ที่มีค่าเป็นเลข 1 ส่งออกไปเก็บไว้ใน list เมื่อทำงานครบแล้ว sum_fan_in จะรับค่าและนำไป sum ค่าทั้งหมด
# Dynamic Mapping & Collect
ในหลายกรณี มี structure ของ pipeline ที่ถูกกำหนดไว้ล่วงหน้าก่อน execution แต่ Dagster รองรับการสร้าง pipeline ที่ไม่ได้กำหนดโครงสร้างสุดท้ายจนกว่าจะรันไทม์ สิ่งนี้มีประโยชน์สำหรับโครงสร้าง pipeline ที่คุณต้องการรัน instance แยกจากกันของ solid สำหรับแต่ละรายการในเอาต์พุตที่แน่นอน
ในตัวอย่างนี้ เรามี solid files_in_directory ที่กำหนด DynamicOutputDefinition
เรา map ผ่านเอาต์พุตแบบไดนามิกซึ่งจะทำให้ downstream dependencies ถูกโคลนสำหรับ DynamicOutput
แต่ละรายการที่ให้ผลลัพธ์ ตัว copy ของ downstream สามารถระบุได้โดย mapping_key ที่ส่งค่าออกมาเป็น DynamicOutput
เมื่อเสร็จทั้งหมดแล้ว เราจะได้ผลลัพธ์ทั้งหมดของ process_file และส่งต่อไปที่ summarize_directory
import os
from typing import List
from dagster import DynamicOutput, DynamicOutputDefinition, Field, pipeline, solid
from dagster.utils import file_relative_path
@solid(
config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
path = context.solid_config["path"]
dirname, _, filenames = next(os.walk(path))
for file in filenames:
yield DynamicOutput(
value=os.path.join(dirname, file),
# create a mapping key from the file name
mapping_key=file.replace(".", "_").replace("-", "_"),
)
@solid
def process_file(path: str) -> int:
# simple example of calculating size
return os.path.getsize(path)
@solid
def summarize_directory(sizes: List[int]) -> int:
# simple example of totalling sizes
return sum(sizes)
@pipeline
def process_directory():
file_results = files_in_directory().map(process_file)
summarize_directory(file_results.collect())
Order-based Dependencies (Nothing dependencies)
ปกติแล้วเส้นทางการทำงานใน Dagster จะพึ่งพาข้อมูลเป็นหลัก การพึ่งพาข้อมูลหมายถึงแต่ละ input ของ solid ขึ้นอยู่กับ output ที่ออกมาของ solid
ถ้าคุณมี solid ที่ชื่อ Solid A ที่ไม่ขึ้นกับ output ของ solid อื่น ที่ Solid B ในทางทฤษฎีไม่ควรมีเหตุผลให้ Solid A ทำงานหลังจาก Solid B ในกรณีส่วนใหญ่ ทั้ง 2 solid ควรจะทำงานขนานกัน อย่างไรก็ตาม ในบางกรณีจำเป็นที่จะต้องมีลำดับการทำงานที่ชัดเจน
ถ้าคุณต้องการ model ลำดับการทำงานที่ชัดเจน คุณสามารถใช้ Dagster type ที่ชื่อ Nothing
ในการกำหนด input definition ที่ตัว solid ที่ Dagster type "nothing" นี้จะบอกตัว solid ว่าไม่ต้องส่งอะไรออกไประหว่าง solid
from dagster import InputDefinition, Nothing, pipeline, solid
@solid
def create_table_1():
get_database_connection().execute("create table_1 as select * from some_source_table")
@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2():
get_database_connection().execute("create table_2 as select * from table_1")
@pipeline
def nothing_dependency_pipeline():
create_table_2(start=create_table_1())
Nothing
type inputs จะไม่มี parameter ใน function เพราะไม่มีการส่งผ่านข้อมูล เมื่อเชื่อมต่อ dependencies แนะนำให้ใช้ keyword args เพื่อป้องกันการปะปนกับตำแหน่งอื่นๆ
Dagster ยังจัดเตรียม advanced abstractions เพื่อจัดการกับ dependencies และ IO ถ้าต้องการหา model อื่นๆ เพิ่มเติมเพื่อใช้แก้ปัญหาในการใช้ external storages, ดูต่อที่ IOManagers.
Patterns
# Constructing PipelineDefinitions
ในกรณีที่ต้องการกำหนด PipelineDefinition
object โดยตรงให้กำหนด
- pipeline name เช่น
name="one_plus_one_pipeline" - list ของ solid definitions เช่น
solid_defs=[return_one, add_one] - dependency structure เช่น
dependencies={"add_one":{"number":DependencyDefinition("return_one")}}
dependency structure จะประกาศ dependency ของ input ของ Solid แต่ละตัวบน output ของ Solid ตัวอื่นที่อยู่ใน pipeline ที่ key ตัวนอกสุดของ dependency dictionary คือ string ชื่อของ solid ถ้าคุณใช้ solid aliases ดูให้แน่ใจว่ามี solid aliases อยู่จริงๆ ส่วน value ของ dependency dictionary จะเป็น dictionary ที่มีการ map ชื่อ input ด้วย DependencyDefinition
one_plus_one_pipeline_def = PipelineDefinition(
name="one_plus_one_pipeline",
solid_defs=[return_one, add_one],
dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
# Pipeline DSL
ถ้าต้องการสร้าง pipeline ด้วย YAML file ประโยชน์ของการสร้างแบบนี้คือ สะดวกในการย้ายไปยัง Dagster workflow อื่น
ตัวอย่างเช่น
pipeline:
name: some_example
description: blah blah blah
solids:
- def: add_one
alias: A
- def: add_one
alias: B
deps:
num:
solid: A
- def: add_two
alias: C
deps:
num:
solid: A
- def: subtract
deps:
left:
solid: B
right:
solid: C
คุณสามารถสร้าง PipelineDefinition จาก YAML นี้:
@solid
def add_one(num: int) -> int:
return num + 1
@solid
def add_two(num: int) -> int:
return num + 2
@solid
def subtract(left: int, right: int) -> int:
return left + right
def construct_pipeline_with_yaml(yaml_file, solid_defs):
yaml_data = load_yaml_from_path(yaml_file)
solid_def_dict = {s.name: s for s in solid_defs}
deps = {}
for solid_yaml_data in yaml_data["pipeline"]["solids"]:
check.invariant(solid_yaml_data["def"] in solid_def_dict)
def_name = solid_yaml_data["def"]
alias = solid_yaml_data.get("alias", def_name)
solid_deps_entry = {}
for input_name, input_data in solid_yaml_data.get("deps", {}).items():
solid_deps_entry[input_name] = DependencyDefinition(
solid=input_data["solid"], output=input_data.get("output", "result")
)
deps[SolidInvocation(name=def_name, alias=alias)] = solid_deps_entry
return PipelineDefinition(
name=yaml_data["pipeline"]["name"],
description=yaml_data["pipeline"].get("description"),
solid_defs=solid_defs,
dependencies=deps,
)
def define_dep_dsl_pipeline():
return construct_pipeline_with_yaml(
file_relative_path(__file__, "example.yaml"), [add_one, add_two, subtract]
)
@repository
def define_repository():
return {"pipelines": {"some_example": define_dep_dsl_pipeline}}
# Pipeline Execution
Dagster ให้ทางเลือกหลายทางในการ execute pipelines
Relevant APIs
execute_pipeline
วิธีการในการ execute a pipeline แบบซิงโครนัส โดยทั่วไปจะใช้วิธี running scripts หรือ testing
Overview
มีหลายวิธีในการ execute pipelines ส่วนนี้จะอธิบายหลายวิธีการแต่ละแบบในการ execute pipeline : Dagit, Dagster CLI, หรือ Python APIs.
คุณยังสามารถ launch pipelines ด้วยวิธีอื่น:
- Schedules สามารถใช้เพื่อเปิดรันในช่วงเวลาที่กำหนดได้
- Sensors อนุญาตให้คุณเรียกใช้งานตามการเปลี่ยนแปลงสถานะภายนอก
Executing a Pipeline
from dagster import pipeline, solid
@solid
def return_one():
return 1
@solid
def add_two(i: int):
return i + 2
@solid
def multi_three(i: int):
return i * 3
@pipeline
def my_pipeline():
multi_three(add_two(return_one()))
Dagit
Dagster มาพร้อมกับอินเทอร์เฟซบนเว็บสำหรับการดูและโต้ตอบกับ pipeline และวัตถุประสงค์อื่นๆ
การแสดง pipeline ใน Dagit คุณสามารถใช้ dagit
command:
dagit -f my_pipeline.py
จากนั้นไปที่ http://localhost:3000 เพื่อเริ่มใช้ Dagit:

คลิกที่ "Playground" tab แล้วกดที่ "Launch Execution" button ในการ execute the pipeline แล้วคุณจะเห็น Dagit เปิดการทำงาน pipeline:

Dagit Playground ยังมีตัวแก้ไขการกำหนดค่าเพื่อให้คุณสร้างการกำหนดค่าแบบโต้ตอบได้ ดูรายละเอียดใน Dagit
Dagster CLI
dagster CLI จะรวมทั้ง dagster pipeline execute
สำหรับ direct execution และ dagster pipeline launch
สำหรับเปิดการทำงานแบบอะซิงโครนัสที่ใช้กับ run launcher ที่ instance ของคุณ
ในการ execute pipeline ของคุณโดยตรง คุณสามารถเรียกใช้:
dagster pipeline execute -f my_pipeline.py
Python APIs
Dagster มี Python API สำหรับการดำเนินการที่เป็นประโยชน์เมื่อเขียน tests หรือเขียน scripts
method execute_pipeline
จะทำหน้าที่ executes pipeline และ return PipelineExecutionResult
.
from dagster import execute_pipeline
if __name__ == "__main__":
result = execute_pipeline(my_pipeline)
คุณสามารถค้นหาเอกสาร API ฉบับเต็มได้ใน Execution API และเรียนรู้เพิ่มเติมเกี่ยวกับกรณีการใช้งานการทดสอบใน Testing.
Executing Pipeline Subset
Dagster รองรับวิธี run subset ของ pipeline เรียกว่า Solid Selection
Solid Selection Syntax
เพื่อระบุ solid selection, Dagster มี query syntax พื้นฐาน มันทำงานดังนี้:
- query จะประกอบไปด้วย list
- แต่ละส่วนใน query สามารถเป็นชื่อของ solid ซึ่งในกรณีนี้จะเป็นการ select solid ตรงๆ
- แต่ละส่วนใน query สามารถชื่อของ solid ที่นำหน้าด้วย * ในกรณีนี้ solid upstream dependencies จะถูก select ทั้งหมด
- แต่ละส่วนใน query สามารถชื่อของ solid ที่ต่อท้ายด้วย * ในกรณีนี้ solid downstream dependencies จะถูก select ทั้งหมด
- แต่ละส่วนใน query สามารถชื่อของ solid ที่ต่อท้ายด้วย + ที่มีได้หลายตัว ในกรณีนี้จะเป็นการ select solid จะกระโดดไปตามจำนวน +
- แต่ละส่วนใน query สามารถชื่อของ solid ที่นำหน้าด้วย + ในกรณีนี้จะเป็นการ select solid จะกระโดดไปที่ parents
ถามว่างงมั้ย งง ถ้างงไปอ่านที่นี่แล้วทดลองเอง
Clause examples
some_solid
: select "some_solid" itself*some_solid
: select "some_solid" and all ancestors (upstream dependencies).some_solid*
: select "some_solid" and all descendants (downstream dependencies).*some_solid*
: select "some_solid" and all of its ancestors and descendants.+some_solid
: select "some_solid" and its direct parents.some_solid+++
: select "some_solid" and its children, its children's children, and its children's children's children.
Specifying Solid Selection
คุณสามารถใช้ syntax selection นี้ในอาร์กิวเมนต์ solid_selection ใน execute_pipeline
:
execute_pipeline(my_pipeline, solid_selection=["*add_two"])
ในทำนองเดียวกัน คุณสามารถใช้ solid selection ได้เหมือนกันใน Dagit Playground:

Pipeline Execution: Examples
Multiprocessing Execution
คุณอาจต้องการ execute solids ในแบบ parallel สามารถทำได้โดย multiprocess executor, ซึ่งจะ execute แต่ละ solid ใน sub process ตัวของมันเอง
multiprocess executor มีอยู่ใน default mode คุณสามารถใช้โดยระบุใน config
execution:
multiprocess:
สังเกตว่าเพราะ solid จะ execute process แบบแยกการทำงาน ค่าที่ส่งผ่านระหว่าง solids ยังต้องจัดเก็บในลักษณะที่สามารถเข้าถึงได้ข้าม process ตัวอย่างเช่น คุณสามารถกำหนดค่า fs_io_manager
เป็น IO manager ใน pipeline ทั้งหมด เป็นค่าเดียวกัน
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def parallel_pipeline():
total(return_one(), return_one(), return_one(), return_one())
ดูเพิ่มเติมสำหรับ IO Managers เกี่ยวกับการกำหนดค่า IO Managers
นอกจากนี้ การใช้ multiprocess executor จะมีส่วนร่วม reconstructing pipeline ใน process อื่น ดังนั้น Python APIs จึงจำเป็นที่จะต้องมีความสามารถ reconstruct instance ของ pipeline ได้ งานส่วนนี้จะได้รับการจัดการสำหรับคุณเมื่อใช้ Dagit หรือ Dagster CLI
ในทำนองเดียวกัน Python APIs ใช้ DagsterInstance ชั่วคราวโดยค่าเริ่มต้นเพื่อหลีกเลี่ยงการรายงานการทดสอบรันไปยัง instance เมื่อใช้ Python API สำหรับ production run ให้ตั้งค่าอินสแตนซ์โดยใช้ instance=DagsterInstance.get() ในการ config ค่า default สำหรับ loading behavior แต่ถ้าใช้งาน Dagit หรือ Dagster CLI ระบบจะจัดการให้เอง
ดังนั้นถ้าคุณ executing pipeline โดยใช้ Python APIs การ execution method ของคุณจะเป็นลักษณะนี้
def execute_multiprocessing():
from dagster import reconstructable, DagsterInstance
execute_pipeline(
# A ReconstructablePipeline is necessary to load the pipeline in child processes.
# reconstructable() is a utility function that captures where the
# PipelineDefinition came from.
reconstructable(parallel_pipeline),
run_config={
# This section controls how the run will be executed.
# The multiprocess executor runs each solid in its own sub process.
"execution": {"multiprocess": {}},
},
# The default instance for this API is an in memory ephemeral one.
# To allow the multiple processes to coordinate we use one here
# backed by a temporary directory.
instance=DagsterInstance.local_temp(),
)
# Composite Solids
Dagster ได้เตรียม Composite Solids, ซึ่งก็คือ unit ของ abstraction สำหรับการสร้าง solid จาก solid อื่นๆ
Relevant APIs
@composite_solid
การกำหนด composite solid จะใช้ decorator function
Overview
Solid จะเชื่อมโยงเข้าด้วยกันโดยกำหนด dependencies ระหว่างอินพุตและเอาต์พุต การกำหนดความเชื่อมโยงกันมักจะทำที่ระดับ Pipeline level
Composite solids ยังช่วยให้คุณกำหนด Solid ที่เชื่อมต่อกัน เพื่อสร้าง Solid ใหม่ สิ่งนี้มีประโยชน์สำหรับ:
- การจัดระเบียบ graph ขนาดใหญ่หรือ graph ที่ซับซ้อน
- ขจัดความซับซ้อน (Abstracting away complexity)
- การ Wrapping solids แล้วนำมา re-usable โดยกำหนดข้อมูลใหม่ (domain-specific information)
การ Refactoring DAG ของ solids โดยใช้ composites เป็นไปในลักษณะที่คล้ายคลึงกันมากกับการปรับโครงสร้างโค้ดด้วยฟังก์ชัน การกำหนด composite solid คล้ายกับการกำหนด pipeline, แต่ composite solids สามารถกำหนด mapping information เพื่อควบคุมข้อมูล กำหนดค่าการเข้าและออก ภายใน graph ของ solid
Defining a Composite Solid
ในการกำหนด composite solid จะใช้ @composite_solid
decorator เราจะใช้การเรียก function ภายใน decorated function body เพื่อระบุโครงสร้าง dependency ระหว่าง solids ที่ประกอบเป็น composite solid
ในตัวอย่างนี้ add_one
solid จะอาศัย output ของ return_one solid เนื่องจากมีการพึ่งพาข้อมูลนี้อยู่ return_one solid จะดำเนินการหลังจาก add_one ทำงานสำเร็จและปล่อยเอาต์พุตที่ต้องการออกมา
from dagster import InputDefinition, composite_solid, pipeline, solid
@solid
def return_one():
return 1
@solid
def add_one(number: int):
return number + 1
@solid
def multiply_by_three(number: int):
return number * 3
@composite_solid(input_defs=[InputDefinition("number", int)])
def add_one_times_three_solid(number):
return multiply_by_three(add_one(number))
@pipeline
def my_pipeline():
add_one_times_three_solid(return_one())
Composite Solid Configuration
โดยค่าเริ่มต้น การ config schema สำหรับ solid แต่ละตัวใน composite solid ถูกยกขึ้นไปที่ composite solid เอง ภายใต้ solid
key
ในตัวอย่างนี้ คุณมี solids สองตัวที่ทั้งคู่ใช้ config และถูกหุ้มด้วย composite solid
@solid(config_schema={"n": int})
def add_n(context, number: int):
return number + context.solid_config["n"]
@solid(config_schema={"m": int})
def multiply_by_m(context, number: int):
return number * context.solid_config["m"]
@composite_solid(input_defs=[InputDefinition("number", int)])
def add_n_times_m_solid(number):
return multiply_by_m(add_n(number))
ในการ run pipeline ด้วย composite solid คุณจะต้องระบุการกำหนดค่าสำหรับทั้ง add_n
และ multiply_by_m
ผ่าน composite solid:
solids:
add_n_times_m_solid:
inputs:
number: 0
solids:
add_n:
config:
n: 3
multiply_by_m:
config:
m: 2
Configuration Mapping
Composite solid ยังสามารถกำหนด config schema เมื่อ Composite solid กำหนด config schema มันจะต้องกำหนด config_mapping_fn ที่จะถูกนำไป map ใน composite solids config และจะนำไป wrapped solids' config
def config_mapping_fn(config):
x = config["x"]
return {"add_n": {"config": {"n": x}}, "multiply_by_m": {"config": {"m": x}}}
@composite_solid(
config_fn=config_mapping_fn,
config_schema={"x": int},
input_defs=[InputDefinition("number", int)],
)
def add_x_multiply_by_x(number):
return multiply_by_m(add_n(number))
@pipeline
def my_pipeline_config_mapping():
add_x_multiply_by_x(return_one())
ในตัวอย่างนี้ composite solid จะมีเพียงหนึ่งฟิลด์ใน config schema : x
. config mapping function นำการกำหนดค่าที่จัดเตรียมไว้ให้กับ composite solid และแมปกับ wrapped solids
solids:
add_x_multiply_by_x:
config:
x: 1
Examples
Multiple Outputs
เมื่อมี Multiple Outputs จาก composite solid, คุณจะต้องกำหนดผลลัพธ์ที่ maps และ return dictionary โดยที่ keys คือชื่อเอาต์พุตและ values คือค่าเอาต์พุต
from dagster import OutputDefinition
@solid
def echo(i):
print(i)
@solid
def one() -> int:
return 1
@solid
def hello() -> str:
return "hello"
@composite_solid(output_defs=[OutputDefinition(int, "x"), OutputDefinition(str, "y")])
def composite_multi_outputs():
x = one()
y = hello()
return {"x": x, "y": y}
@pipeline
def my_pipeline_multi_outputs():
x, y = composite_multi_outputs()
echo(x)
echo(y)
# Solid Events and Exceptions
ภายใน body ของ solid มีความเป็นไปได้ที่จะสื่อสารกับ Dagster framework ไม่ว่าจะเกิด event หรือเกิด exception เนื้อหานี้จะอธิบายความเป็นไปได้ต่างๆ เหล่านี้และสถานการณ์ที่คุณอาจจะได้ใช้งาน
Relevant APIs
Output
Dagster event used to yield an output from a solidAssetMaterialization
Dagster event indicating that a solid has materialized an asset.ExpectationResult
Dagster event representing the result of a data quality checkFailure
Dagster exception indicating that a failure has occurredRetryRequested
Dagster exception requesting the step to be retried
Overview
ภายในตัว solid จะมี โครงสร้างของ events อยู่หลายแบบ ประกอบด้วย types หลายประเภท กิจกรรมเหล่านี้จะถูกประมวลผลโดย Dagster และบันทึกไว้ใน event log พร้อมกับบริบทเพิ่มเติมเกี่ยวกับ solid ที่ปล่อยออกมา
นอกจากนี้ยังเป็น exceptions ของ Dagster ได้อีกด้วย ซึ่งใน exceptions แต่ละแบบจะบ่งบอกรายละเอียดที่ทำให้ framework หยุดทำงานและ action อื่นๆ ที่เกิดขึ้น
Event Metadata
บางครั้งเมื่อเราแนบข้อมูลบางอย่างไปกับ event หรือ exception ที่ไม่ตรงกับ basic parameters ผ่านไปยัง EventMetadataEntry
object ใน dagster จะมี interface สำหรับการระบุข้อมูล metadata นี้เพื่อรองรับ events หลายๆ แบบ โดยขึ้นอยู่กับประเภทของข้อมูล
AssetMaterialization
, ExpectationResult
, และ Failure
objects แต่ละตัวจะยอมรับ metadata
parameter ซึ่ง maps string labels เข้ากับ structured values. ส่วนเอาต์พุตยังยอมรับ parameter นี้ด้วย แม้ว่าฟังก์ชันนี้กำลังอยู่ในขั้นทดลองอาจเปลี่ยนแปลงได้ในอนาคต
dagster ยังรองรับ metadata types ที่มีประโยชน์หลายแบบ รวมถึงประเภทข้อมูลพื้นฐาน (EventMetadataEntry.float
, EventMetadataEntry.int
, EventMetadataEntry.text
) รวมไปถึงข้อมูลที่ซับซ้อนเช่น markdown และ json (EventMetadataEntry.md
, EventMetadataEntry.json
)
ลองดู Docs API ของ EventMetadataEntry
สำหรับรายละเอียดเพิ่มเติม
# Events
ผลลัพธ์ของอีเวนท์ภายในตัว solid คือทางที่ใช้ในการสื่อสารกับ Dagster framework event ที่สำคัญที่สุดต่อการทำงานของ Dagster คือ Output
ของ event ซึ่งช่วยให้สามารถส่งข้อมูลเอาต์พุตจาก solid หนึ่งไปยังอีกอันหนึ่งได้ อย่างไรก็ตาม Dagster ยังจัดเตรียม interfaces สำหรับสื่อสารกับ external assets และการตรวจสอบคุณภาพของข้อมูลในระหว่างที่ solid กำลังทำงานไว้ด้วย
Outputs
เนื่องจากการ return ค่าจาก solid เป็นส่วนพื้นฐานของการสร้าง data pipeline ตัว Dagster จะมี interfaces ที่ช่วยให้การเขียน code ส่วนนี้ง่ายขึ้นเล็กน้อย
สำหรับ solid ที่มี single output ก็สามารถ return ค่าออกมาตรงๆ จาก compute_fn ซึ่งจะ convert ออกมาเป็น Dagster Output
event โดยชื่อ output default คือ result
@solid
def my_simple_return_solid(context):
return 1
ตัวอย่างข้างบนอาจเป็นวิธีที่ง่ายที่สุดในการ return ค่าจาก function แต่เมื่อคุณมี outputs หลายรายการที่กำหนดไว้ใน solid ของคุณ หรือต้องการแสดงข้อมูลเพิ่มเติมที่ไม่ใช่เอาต์พุตจากเนื้อความของ solid ของคุณ การ return ค่าที่กำหนดเองไม่ใช่ตัวเลือกอีกต่อไป ในกรณีนี้ คุณจะต้องกำหนดค่า Output
events เอง โดยสารมารถทำได้ตามตัวอย่างนี้
@solid
def my_simple_yield_solid(context):
yield Output(1)
หรือถ้าต้องการกำหนดค่า output name เองที่ไม่ใช่ result ทำได้ตามตัวอย่างนี้
@solid(
output_defs=[
OutputDefinition(name="my_output"),
]
)
def my_named_yield_solid(context):
yield Output(1, output_name="my_output")
เอกสารเพิ่มเติม Solid Outputs
# Attaching Metadata to Outputs (Experimental)
ถ้ามีข้อมูลเพิ่มเติมที่ต้องการเพิ่มเข้าไปในเอาท์พุตที่ต้องการส่งออกไป คุณอาจเลือกที่จะแสดงข้อมูลนั้นโดยส่งผ่าน parameter ที่ชื่อ metadata สามารถทำได้โดย map key/value ที่ชื่อ metadata
@solid
def my_metadata_output(context):
df = get_some_data()
yield Output(
df,
metadata={
"text_metadata": "Text-based metadata for this event",
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"raw_count": len(df),
"size (bytes)": calculate_bytes(df),
},
)
# Asset Materializations
AssetMaterialization
events จะเป็นตัวบอก Dagster ว่าคุณจะเขียข้อมูลบางส่วนและส่งออกไปยังระบบภายนอก (external system) ตัวอย่างที่ใช้ประจำก็อย่างเช่นเขียนข้อมูลลง database
โดยทั่วไปแล้วคุณต้องการส่งอีเว้นออกไปตรงตรงที่ระบบภายนอก (external system) อีเว้น AssetMaterialization
ที่สร้างขึ้นจะต้องกำหนด asset_key ซึ้งจะเป็น key ที่ใช้ระบุตัวตนและการมีอยู่ของ asset
@solid
def my_asset_solid(context):
df = get_some_data()
store_to_s3(df)
yield AssetMaterialization(
asset_key="s3.my_asset",
description="A df I stored in s3",
)
result = do_some_transform(df)
yield Output(result)
หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับ assets และเมื่อมีการใช้ event จะเกิดอะไรต่อไป ดูเพิ่มเติมได้ที่เอกสาร Asset Catalog
# Attaching Metadata to Asset Materializations
Attaching metadata ใน Asset Materializations เป็น ส่วนสำคัญในการติดตามการทำงานเมื่อเวลาผ่านไป การทำงานของ functions นี้โดยพื้นฐานแล้วจะเหมือนกับ events อื่นๆ ที่ยอมรับ metadata
parameter ทำให้คุณสามารถแนบ labels และ values ที่มีโครงสร้างที่จะแสดงได้
@solid
def my_metadata_materialization_solid(context):
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetMaterialization(
asset_key="my_dataset",
description="Persisted result to storage",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": EventMetadata.path(remote_storage_path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
)
yield Output(remote_storage_path)
# Expectation Results
Solid สามารถปล่อย event ที่มีโครงสร้างเพื่อแสดงผลลัพธ์ของการทดสอบคุณภาพข้อมูล โดย data quality event class คือ ExpectationResult
เพื่อสร้างผลลัพธ์ที่คาดหวัง เราสามารถให้ผลลัพ ExpectationResult event ใน solid ของเรา
@solid
def my_expectation_solid(context, df):
do_some_transform(df)
yield ExpectationResult(success=len(df) > 0, description="ensure dataframe has rows")
yield Output(df)
# Attaching Metadata to Expectation Results
เหมือนกับ event types อื่นๆ ใน Dagster มีหลากหลาย type ใน metadata ที่เกี่ยวข้องกับ expectation result event ผ่าน EventMetadataEntry
class แต่ละ event ที่เกินขึ้นที่มีใน metadata จะแสดงใน event log
ตัวอย่างนี้แสดงให้เห็นรายการ metadata ที่มี type ที่ต่างกันถูกแนบเข้ามาที่ expectation result:
@solid
def my_metadata_expectation_solid(context, df):
df = do_some_transform(df)
yield ExpectationResult(
success=len(df) > 0,
description="ensure dataframe has rows",
metadata={
"text_metadata": "Text-based metadata for this event",
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"raw_count": len(df),
"size (bytes)": calculate_bytes(df),
},
)
yield Output(df)
# Exceptions
Dagster มี exception class เอาไว้ให้ใช้ในตอน execution solid ได้ behaviorที่จะเกิดหลังจากเกิด exception ขึ้นอยู่กับ exception ที่คุณใช้ รายละเอียด exception จะมีตามนี้
# Failures
Failure
exception จะใช้ในกรณีที่เกิดปัญหาและต้องการให้หยุดการ execution สิ่งนี้อาจมีประโยชน์หาก ในตัว solid สามารถ detect issue ที่เกิดขึ้น แต่ตัว issue ไม่ทำให้เกิด error ขึ้นมา แต่ยังคงสร้างปัญหาหากยังดำเนินการต่อไป
@solid
def my_failure_solid():
path = "/path/to/files"
my_files = get_files(path)
if len(my_files) == 0:
raise Failure(
description="No files to process",
metadata={
"filepath": EventMetadata.path(path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/failures"),
},
)
return some_calculation(my_files)
# Attaching Metadata to Failures
ที่ตัว raise Failure สามารถ attach EventMetadataEntry
เข้าไปได้ด้วย
@solid
def my_failure_metadata_solid():
path = "/path/to/files"
my_files = get_files(path)
if len(my_files) == 0:
raise Failure(
description="No files to process",
metadata={
"filepath": EventMetadata.path(path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/failures"),
},
)
return some_calculation(my_files)
# Retry Requests
RetryRequested
exception จะมีประโยชน์เมื่อเจอกับ failures ที่เป็น failures ที่ไม่คงที่บางครั้ง failures บางครั้งไม่ failures
สามารถกำหนด configure จำนวนครั้งในการลองใหม่ด้วยพารามิเตอร์ max_retries
@solid
def my_retry_solid():
try:
result = flaky_operation()
except:
raise RetryRequested(max_retries=3)
return result
# Solid Hooks
Solid hook ให้เรากำหนดนโยบายการจัดการ success และ failure บน solids
Relevant APIs
@failure_hook
เป็น decorator ที่กำหนด callback เมื่อ solid failure.@success_hook
เป็น decorator ที่กำหนด callback เมื่อ solid success.HookContext
context object จะใช้ใน hook functionbuild_hook_context
เป็นตัวสร้างHookContext
จะทำนอก execution มีวัตถุประสงค์เพื่อใช้ในการทดสอบ hook
# Overview
@success_hook
หรือ @failure_hook
decorated function จะเรียกว่า solid hook ตัว Solid hook ถูกออกแบบมาเพื่อวัตถุประสงค์การใช้งานทั่วไป จะเป็นอะไรก็ได้ที่คุณอยากทำที่ solid level
การกำหนด Solid Hook
from dagster import HookContext, failure_hook, success_hook
@success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: HookContext):
message = f"Solid {context.solid.name} finished successfully"
context.resources.slack.chat.post_message(channel="#foo", text=message)
@failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: HookContext):
message = f"Solid {context.solid.name} failed"
context.resources.slack.chat.post_message(channel="#foo", text=message)
# Hook context
อย่างที่คุณอาจสังเกตเห็น hook function จะมี 1 argument คือ HookContext
คุณสมบัติที่มีอยู่ในบริบทนี้คือ:
context.log
: loggerscontext.solid
: solid ที่เกี่ยวข้องกับ hookcontext.solid_config
: config เฉพาะที่เกี่ยวข้อง solidcontext.step
: execution ขั้นตอนที่ก่อให้เกิด hook.context.resources
: resources ที่ hook สามารถใช้ได้
# การใช้งาน Hooks
Dagster ให้วิธีการต่างๆ ในการ trigger solid hooks
# การใช้ hook บนทุก solid ใน pipeline
ตัวอย่างเช่น คุณต้องการส่งข้อความ slack message ไปที่ channel เมื่อ solid fails ใน pipeline ในกรณีนี้เราจะเราจะใช้งาน hook ที่ pipeline ซึ่งจะเป็นการเปิดใช้งาน hook ในทุก solid instance ที่อยู่ใน pipeline
การใช้งาน hook ที่สร้างขึ้นมาในชื่อ "slack_message_on_failure" โดยวางไว้บน decorated function ที่ชื่อ "pipeline" ของ "notif_all" จากนั้น slack messages จะถูกส่งเมื่อทุก solid ที่อยู่ใน pipeline เกิดการ fails
@slack_message_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
# the hook "slack_message_on_failure" is applied on every solid instance within this pipeline
a()
b()
# การใช้งาน hook ใน solid
บางครั้งการใช้งาน pipeline จะต้องทำงาน share กับส่วนงานอื่น หรือต้องการ alert ที่ตำแหน่งงานที่มีความสำคัญสูง ดังนั้น dagster ได้มีวิธีการในการ set up hooks ใน solid instances ซึ่งช่วยให้คุณกำหนด policies ในการใช้งาน solid ได้
@pipeline(mode_defs=mode_defs)
def selective_notif():
# only solid "a" triggers hooks: a slack message will be sent when it fails or succeeds
a.with_hooks({slack_message_on_failure, slack_message_on_success})()
# solid "b" won't trigger any hooks
b()
ในกรณีนี้ "b" ที่เป็นของแข็งจะไม่ trigger hook ใดๆ ในขณะที่ "a" ที่เป็น solid ไม่ว่าจะ fails หรือ succeeds จะส่ง slack message
# Testing Hooks
คุณสามารถทดสอบการทำงานของ hook โดยเรียกใช้ข้อกำหนดของ hook โดยจะเรียกใช้ decorated function คุณสามารถสร้าง context เพื่อเตรียมเรียกใช้ โดยใช้ build_hook_context
function
from dagster import build_hook_context
@success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
context.resources.my_conn.send("foo")
def test_my_success_hook():
my_conn = mock.MagicMock()
# construct HookContext with mocked ``my_conn`` resource.
context = build_hook_context(resources={"my_conn": my_conn})
my_success_hook(context)
assert my_conn.send.call_count == 1
Examples
เข้าถึง failure information ใน hook ที่เกิด failure
ในหลายกรณี คุณอาจต้องการทราบรายละเอียดเกี่ยวกับ failure ของ solid คุณสามารถส่ง exception object ใน solid ที่ล้มเหลวผ่านคุณสมบัติ solid_exception บน HookContext
:
from dagster import HookContext, failure_hook
import traceback
@failure_hook
def my_failure_hook(context: HookContext):
solid_exception: BaseException = context.solid_exception
# print stack trace of exception
traceback.print_tb(solid_exception.__traceback__)
Patterns
Environment-specific hooks using modes
Hook ใช้ resource keys ในการเข้าถึง resources หบังจากเรียกใช้ resource key ใน set ของ required_resource_keys ในตัว hook สามารถเข้าถึง resource ผ่านทาง resources attribute ของ context object
นอกจากนี้ยังช่วยให้คุณสามารถสลับค่า resource ในโหมดต่างๆ ตัวอย่างเช่น คุณสามารถส่ง slack messages เมื่อคุณอยู่ในโหมด "prod" และ mock slack resource เมื่ออยู่ในโหมด "dev"
ในกรณีนี้ เราสามารถ mock slack_resource ใน "dev" โหมด ด้วยการใช้ helper function ResourceDefinition.hardcoded_resource()
มันจึงไม่ส่ง slack messages เมื่อคุณกำลัง พัฒนา pipeline
mode_defs = [
ModeDefinition(
"dev",
resource_defs={
"slack": ResourceDefinition.hardcoded_resource(
slack_resource_mock, "do not send messages in dev"
)
},
),
ModeDefinition("prod", resource_defs={"slack": slack_resource}),
]
แต่เมื่อสลับไปที่ "prod" โหมด เราสามารถส่ง slack token ไปใน run_config และเปิดใช้งานการส่งข้อความไปยัง slack channel เมื่อมีการเรียกใช้ hook
resources:
slack:
config:
token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
จากนั้นเราก็สามารถ execute pipeline ที่มี config นี้ผ่าน Python API, CLI, หรือ Dagit UI นี่คือตัวอย่างการใช้ Python API
if __name__ == "__main__":
with open(
file_relative_path(__file__, "prod.yaml"),
"r",
) as fd:
run_config = yaml.safe_load(fd.read())
result = execute_pipeline(notif_all, run_config=run_config, mode="prod")
Pipeline-level hooks
เมื่อคุณเพิ่ม hook decorator ใน pipeline, hook จะเพิ่มเข้าไปใน solid แต่ละตัวมี่อยู่ใน pipeline, hook จะไม่ได้ติดตามทุก event ที่เกิดขึ้นใน pipeline แต่จะติดตามเฉพาะ success หรือ failure ใน solid เท่านั้น
คุณอาจพบว่าจำเป็นต้องตั้งค่า pipeline-level policies เช่น คุณอาจต้องการเรียกใช้โค้ดสำหรับทุกๆ ความล้มเหลวของ pipeline
Dagster มีวิธีสร้าง sensor ที่ตอบสนองต่อ failure events ของ pipeline สามารถดูรายละเอียดได้ที่ Pipeline failure sensor ในเนื้อหา Sensors
** เนื้อหาทั้งหมดมาจาก https://docs.dagster.io/ เขียนไว้เพื่อทำความเข้าใจ เนื้อหาบางจุดใช้ google translate แปล ดังนั้นถ้าอ่านแล้วไม่เข้าใจให้ไปอ่านที่ docs ของ dagster จะดีที่สุด **