Dagster / Modes and Resources
Resources จัดเตรียมวิธีในการจัดการ external APIs ร่วมกับ modes สามารถใช้ในสภาพแวดล้อมของการ execution ที่แตกต่างกันหลายแบบสำหรับ pipeline
Relevant APIs
@resource
ใช้ decorator เพื่อกำหนดว่าเป็น resources ตัว decorated function นี้คือการเรียก resource_fn โดยจะ returnResourceDefinition
ResourceDefinition
คือ Class สำหรับ resource ตัว Class นี้แทบไม่ต้อง initialize เพื่อใช้งานตรงๆ เลย การใช้งานให้ใช้@resource
มันจะ returnResourceDefinition
กลับมาModeDefinition
Class นี้ใช้กำหนด pipeline modeInitResourceContext
object ตัวนี้จะทำหน้าที่เตรียม resource ในระหว่าง initialization ซึ่ง object ตัวนี้มีต้องการ resource, config, และข้อมูลอื่นๆbuild_init_resource_context
เป็น function ที่ใช้สร้างInitResourceContext
นอก execution มีวัตถุประสงค์เพื่อใช้ในการทำ resource testing
Overview
คุณสามารถใช้ resources ในการเข้าถึงคุณสมบัติของ execution environments ของ solid ในระหว่างที่ pipeline กำลัง execution คุณสามารถใช้ modes ผูกเข้ากับ resources (และข้อมูลสภาพแวดล้อมอื่นๆ) ไปยัง pipeline เพื่อให้ทรัพยากรเหล่านั้นสามารถใช้ได้กับ solids ภายใน pipeline
Why Use Resources and Modes
เมื่อต้องเข้าใช้งาน resources จากภายนอกร่วมกับ modes จะมีความสะดวกมากขึ้น
- Pluggable: คุณสามารถ map resource เข้ากับ key ใน mode นึงและ map resource อื่นๆ เข้ากับ key อื่นๆ ไว้ใน mode อื่นๆ ส่วนนี้จะมีประโยชน์มากถ้ามีต้องเชื่อมต่อข้อมูลจากภายนอกหลายๆ ตัวใน production แต่ไม่ได้ใช้ในการ testing คุณสามารถระบุโหมดต่างๆ สำหรับแต่ละกรณีการดำเนินการได้ เช่นแบบแรกใน production มีการเชื่อมต่อ resource กับภายนอกหลายๆ ตัว และอีกตัวใช้เพื่อ testing จะใช้ข้อมูลจากภายใน แต่ map key เดียวกัน สำหรับข้อมูลเพิ่มเติมเกี่ยวกับความสามารถนี้ ดูที่ Separating Business Logic from Environments
- Pipeline Scoped: เนื่องจาก resources มีการกำหนดขอบเขตของ pipeline หากคุณจัดเตรียม resource ให้กับโหมด ทรัพยากรนั้นจะพร้อมใช้งานกับทุกๆ solid ใน pipeline
- Configurable: Resources สามารถกำหนดค่า, โดยใช้ configuration system
- Dependencies: Resources สารมารถเปลี่ยนแปลงตาม resources อื่นๆ ซึ่งทำให้สามารถแสดง external environment objects ที่อาศัยข้อมูล external environment อื่นๆ สำหรับการ initialization ได้อย่างสมบูรณ์
Defining a Resource
การกำหนด resource จะใช้ @resource
decorator ครอบฟังก์ชันที่รับ init_context เป็นพารามิเตอร์แรก ซึ่งก็คือการ InitResourceContext
จากฟังก์ชันนี้ จะ return หรือ yield วัตถุที่คุณต้องการให้เป็น resource
class ExternalCerealFetcher:
def fetch_new_cereals(self, start_ts, end_ts):
pass
@resource
def cereal_fetcher(init_context):
return ExternalCerealFetcher()
Accessing Resources in Solids
Solid จะใช้ resource keys ในการเข้าถึง resources เช่น:
CREATE_TABLE_1_QUERY = "create table_1 as select * from table_0"
@solid(required_resource_keys={"database"})
def solid_requires_resources(context):
context.resources.database.execute_query(CREATE_TABLE_1_QUERY)
Testing Resource Initialization
คุณสามารถทดสอบการ initialization ของ resource โดยเรียกใช้ resource definition การ run จะทำภายใต้ decorated function
@resource
def my_resource(_):
return "foo"
def test_my_resource():
assert my_resource(None) == "foo"
หาก resource ของคุณต้องการ resource หรือการ config คุณสามารถใช้ InitResourceContext
object โดยใช้ build_init_resource_context
function.
@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
return init_context.resources.foo, init_context.resource_config["bar"]
from dagster import build_init_resource_context
def test_my_resource_with_context():
init_context = build_init_resource_context(
resources={"foo": "foo_str"}, config={"bar": "bar_str"}
)
assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")
หาก resource ของคุณคือ context manager, คุณสามารถใช้งานได้โดยใช้ with
syntax ของ python
from contextlib import contextmanager
@resource
@contextmanager
def my_cm_resource(_):
yield "foo"
def test_cm_resource():
with my_cm_resource(None) as initialized_resource:
assert initialized_resource == "foo"
Defining a Mode
ในการกำหนด mode จะสร้างด้วย ModeDefinition
ซึ่งใน resource definition แต่ละรายการที่ให้ไว้กับ mode ควรถูก map กับ key ไม่ซ้ำ
mode_def_ab = ModeDefinition(
"ab_mode",
resource_defs={
"a": resource_a,
"b": resource_b,
},
)
Providing Modes to a Pipeline
Mode สามารถใช้งานกับ pipeline ด้วย mode_defs argument ใน @pipeline
decorator
@pipeline(mode_defs=[mode_def_ab, mode_def_c])
def pipeline_with_mode():
basic_solid()
Selecting a Mode during Execution
Python API
เมื่อ execute pipeline ใช้ใช้ execute_pipeline
, คุณสามารถสลับ mode ได้ด้วย mode name ด้วย parameter "mode"
execute_pipeline(pipeline_with_mode, mode="ab_mode")
In Dagit
เมื่อเปิดการทำงาน pipeline ด้วย Dagit Playground คุณสามารถเลือก mode จากดรอปดาวน์ตัวเลือก mode:

Dagster CLI
เมื่อเปิด pipeline ผ่าน CLI คุณสามารถใช้ตัวเลือก -d เพื่อระบุโหมดได้
$ dagster pipeline execute -d prod_mode my_pipeline
Examples
Resource Configuration
ResourceDefinitions
สามารถ config schema ซึ่งช่วยให้คุณ customize behavior ขณะรันไทม์ผ่าน pipeline configuration
ตัวอย่างเช่น สมมติว่าเราต้องการส่ง connection string ไปยังทรัพยากร DatabaseConnection ของเรา
class DatabaseConnection:
def __init__(self, connection: str):
self.connection = connection
@resource(config_schema={"connection": str})
def db_resource(init_context):
connection = init_context.resource_config["connection"]
return DatabaseConnection(connection)
Resource to Resource Dependencies
Resources สามารถเข้าถึง Resources อื่นได้ ด้วยการใช required_resource_keys
parameter กับ @resource
decorator เพื่อว่าจะให้เข้าถึง Resource ตัวไหน การเข้าถึง Resource จำเป็นต้องผ่าน context object ที่ส่งผ่านมาจาก wrapped function
@resource
def foo_resource(_):
return "foo"
@resource(required_resource_keys={"foo"})
def emit_foo(init_context):
return init_context.resources.foo
โปรดทราบว่า keys ที่ระบุจำเป็นต้องอยู่ในโหมดเดียวกับ resources ที่ต้องการและ dependencies ระหว่าง resources ไม่สามารถวนซ้ำได้