Dagster / Modes and Resources

Dagster / Modes and Resources

Resources จัดเตรียมวิธีในการจัดการ external APIs ร่วมกับ modes สามารถใช้ในสภาพแวดล้อมของการ execution ที่แตกต่างกันหลายแบบสำหรับ pipeline

Relevant APIs

  • @resource ใช้ decorator เพื่อกำหนดว่าเป็น resources ตัว decorated function นี้คือการเรียก resource_fn โดยจะ return ResourceDefinition
  • ResourceDefinition คือ Class สำหรับ resource ตัว Class นี้แทบไม่ต้อง initialize เพื่อใช้งานตรงๆ เลย การใช้งานให้ใช้ @resource มันจะ return ResourceDefinition กลับมา
  • ModeDefinition Class นี้ใช้กำหนด pipeline mode
  • InitResourceContext object ตัวนี้จะทำหน้าที่เตรียม resource ในระหว่าง initialization ซึ่ง object ตัวนี้มีต้องการ resource, config, และข้อมูลอื่นๆ
  • build_init_resource_context เป็น function ที่ใช้สร้าง InitResourceContext นอก execution มีวัตถุประสงค์เพื่อใช้ในการทำ resource testing

Overview

คุณสามารถใช้ resources ในการเข้าถึงคุณสมบัติของ execution environments ของ solid ในระหว่างที่ pipeline กำลัง execution คุณสามารถใช้ modes ผูกเข้ากับ resources (และข้อมูลสภาพแวดล้อมอื่นๆ) ไปยัง pipeline เพื่อให้ทรัพยากรเหล่านั้นสามารถใช้ได้กับ solids ภายใน pipeline

Why Use Resources and Modes

เมื่อต้องเข้าใช้งาน resources จากภายนอกร่วมกับ modes จะมีความสะดวกมากขึ้น

  • Pluggable: คุณสามารถ map resource เข้ากับ key ใน mode นึงและ map resource อื่นๆ เข้ากับ key อื่นๆ ไว้ใน mode อื่นๆ ส่วนนี้จะมีประโยชน์มากถ้ามีต้องเชื่อมต่อข้อมูลจากภายนอกหลายๆ ตัวใน production แต่ไม่ได้ใช้ในการ testing คุณสามารถระบุโหมดต่างๆ สำหรับแต่ละกรณีการดำเนินการได้ เช่นแบบแรกใน production มีการเชื่อมต่อ resource กับภายนอกหลายๆ ตัว และอีกตัวใช้เพื่อ testing จะใช้ข้อมูลจากภายใน แต่ map key เดียวกัน สำหรับข้อมูลเพิ่มเติมเกี่ยวกับความสามารถนี้ ดูที่ Separating Business Logic from Environments
  • Pipeline Scoped: เนื่องจาก resources มีการกำหนดขอบเขตของ pipeline หากคุณจัดเตรียม resource ให้กับโหมด ทรัพยากรนั้นจะพร้อมใช้งานกับทุกๆ solid ใน pipeline
  • Configurable: Resources สามารถกำหนดค่า, โดยใช้ configuration system
  • Dependencies: Resources สารมารถเปลี่ยนแปลงตาม resources อื่นๆ ซึ่งทำให้สามารถแสดง external environment objects ที่อาศัยข้อมูล external environment อื่นๆ สำหรับการ initialization ได้อย่างสมบูรณ์

Defining a Resource

การกำหนด resource จะใช้ @resource decorator ครอบฟังก์ชันที่รับ init_context เป็นพารามิเตอร์แรก ซึ่งก็คือการ InitResourceContext จากฟังก์ชันนี้ จะ return หรือ yield วัตถุที่คุณต้องการให้เป็น resource

class ExternalCerealFetcher:
    def fetch_new_cereals(self, start_ts, end_ts):
        pass


@resource
def cereal_fetcher(init_context):
    return ExternalCerealFetcher()

Accessing Resources in Solids

Solid จะใช้ resource keys ในการเข้าถึง resources เช่น:

CREATE_TABLE_1_QUERY = "create table_1 as select * from table_0"


@solid(required_resource_keys={"database"})
def solid_requires_resources(context):
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)

Testing Resource Initialization

คุณสามารถทดสอบการ initialization ของ resource โดยเรียกใช้ resource definition การ run จะทำภายใต้ decorated function

@resource
def my_resource(_):
    return "foo"


def test_my_resource():
    assert my_resource(None) == "foo"

หาก resource ของคุณต้องการ resource หรือการ config คุณสามารถใช้ InitResourceContext object โดยใช้ build_init_resource_context function.

@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
    return init_context.resources.foo, init_context.resource_config["bar"]


from dagster import build_init_resource_context


def test_my_resource_with_context():
    init_context = build_init_resource_context(
        resources={"foo": "foo_str"}, config={"bar": "bar_str"}
    )
    assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")

หาก resource ของคุณคือ context manager, คุณสามารถใช้งานได้โดยใช้ with syntax ของ python

from contextlib import contextmanager


@resource
@contextmanager
def my_cm_resource(_):
    yield "foo"


def test_cm_resource():
    with my_cm_resource(None) as initialized_resource:
        assert initialized_resource == "foo"

Defining a Mode

ในการกำหนด mode จะสร้างด้วย ModeDefinition ซึ่งใน resource definition แต่ละรายการที่ให้ไว้กับ mode ควรถูก map กับ key ไม่ซ้ำ

mode_def_ab = ModeDefinition(
    "ab_mode",
    resource_defs={
        "a": resource_a,
        "b": resource_b,
    },
)

Providing Modes to a Pipeline

Mode สามารถใช้งานกับ pipeline ด้วย mode_defs argument ใน @pipeline decorator

@pipeline(mode_defs=[mode_def_ab, mode_def_c])
def pipeline_with_mode():
    basic_solid()

Selecting a Mode during Execution

Python API

เมื่อ execute pipeline ใช้ใช้ execute_pipeline, คุณสามารถสลับ mode ได้ด้วย mode name ด้วย parameter "mode"

execute_pipeline(pipeline_with_mode, mode="ab_mode")

In Dagit

เมื่อเปิดการทำงาน pipeline ด้วย Dagit Playground คุณสามารถเลือก mode จากดรอปดาวน์ตัวเลือก mode:

Dagster CLI

เมื่อเปิด pipeline ผ่าน CLI คุณสามารถใช้ตัวเลือก -d เพื่อระบุโหมดได้

$ dagster pipeline execute -d prod_mode my_pipeline

Examples

Resource Configuration

ResourceDefinitions สามารถ config schema ซึ่งช่วยให้คุณ customize behavior ขณะรันไทม์ผ่าน pipeline configuration

ตัวอย่างเช่น สมมติว่าเราต้องการส่ง connection string ไปยังทรัพยากร DatabaseConnection ของเรา

class DatabaseConnection:
    def __init__(self, connection: str):
        self.connection = connection


@resource(config_schema={"connection": str})
def db_resource(init_context):
    connection = init_context.resource_config["connection"]
    return DatabaseConnection(connection)

Resource to Resource Dependencies

Resources สามารถเข้าถึง Resources อื่นได้ ด้วยการใช required_resource_keys parameter กับ @resource decorator เพื่อว่าจะให้เข้าถึง Resource ตัวไหน การเข้าถึง Resource จำเป็นต้องผ่าน context object ที่ส่งผ่านมาจาก wrapped function

@resource
def foo_resource(_):
    return "foo"

@resource(required_resource_keys={"foo"})
def emit_foo(init_context):
    return init_context.resources.foo

โปรดทราบว่า keys ที่ระบุจำเป็นต้องอยู่ในโหมดเดียวกับ resources ที่ต้องการและ dependencies ระหว่าง resources ไม่สามารถวนซ้ำได้