prefect_base

  • image

    • https://docs.prefect.io/latest/

  • PrefectλŠ” Python 기반 μ›Œν¬ν”Œλ‘œ 관리 μ‹œμŠ€ν…œμž…λ‹ˆλ‹€. Prefectλ₯Ό μ‚¬μš©ν•˜λ©΄ λ‘œκΉ…, μž¬μ‹œλ„, 동적 λ§€ν•‘, 캐싱, μ‹€νŒ¨ μ•Œλ¦Ό 등을 데이터 νŒŒμ΄ν”„λΌμΈμ— μ‰½κ²Œ μΆ”κ°€ν•  수 μžˆμŠ΅λ‹ˆλ‹€

  • image

    • https://discourse.prefect.io/t/what-are-the-components-of-prefect-2-0-architecture/909

  • PrefectλŠ” Dask μœ„μ— κ΅¬μΆ•λ˜μ—ˆμœΌλ©° Daskλ₯Ό μ‚¬μš©ν•˜μ—¬ λΆ„μ‚° ν™˜κ²½μ—μ„œ Prefect μ›Œν¬ν”Œλ‘œμ˜ 싀행을 μ˜ˆμ•½ν•˜κ³  κ΄€λ¦¬ν•©λ‹ˆλ‹€.

  • PrefectλŠ” μ›Œν¬ν”Œλ‘œμ˜ 일정을 처리 ν•˜κ³  DaskλŠ” 각 μ›Œν¬ν”Œλ‘œ λ‚΄ μž‘μ—… 의 일정 및 λ¦¬μ†ŒμŠ€ 관리λ₯Ό μ²˜λ¦¬ν•©λ‹ˆλ‹€.

    • μž‘μ—… μ˜ˆμ•½: DaskλŠ” μ›Œν¬ν”Œλ‘œμš° λ‚΄μ—μ„œ λͺ¨λ“  μž‘μ—… μ˜ˆμ•½μ„ μ²˜λ¦¬ν•˜λ―€λ‘œ PrefectλŠ” Daskκ°€ λ°€λ¦¬μ΄ˆ λŒ€κΈ° μ‹œκ°„μœΌλ‘œ μ˜ˆμ•½ν•˜λŠ” 더 μž‘μ€ μž‘μ—…μ„ μž₯λ €ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

    • Dataflow: Daskκ°€ μž‘μ—… κ°„μ˜ μ μ ˆν•œ 정보 직렬화 및 톡신을 μ²˜λ¦¬ν•˜κΈ° λ•Œλ¬Έμ— PrefectλŠ” "데이터 흐름"을 일급 νŒ¨ν„΄μœΌλ‘œ 지원할 수 μžˆμŠ΅λ‹ˆλ‹€.

    • λΆ„μ‚° 계산: DaskλŠ” ν΄λŸ¬μŠ€ν„°μ˜ μž‘μ—…μžμ—κ²Œ μž‘μ—… 할당을 μ²˜λ¦¬ν•˜μ—¬ μ‚¬μš©μžκ°€ μ΅œμ†Œν•œμ˜ μ˜€λ²„ν—€λ“œλ‘œ λΆ„μ‚° κ³„μ‚°μ˜ 이점을 μ¦‰μ‹œ μ‹€ν˜„ν•  수 μžˆλ„λ‘ ν•©λ‹ˆλ‹€.

    • 병렬성: ν΄λŸ¬μŠ€ν„°μ—μ„œ μ‹€ν–‰ν•˜λ“  λ‘œμ»¬μ—μ„œ μ‹€ν–‰ν•˜λ“  DaskλŠ” μ„ λ°˜μ—μ„œ 병렬 μž‘μ—… 싀행을 μ œκ³΅ν•©λ‹ˆλ‹€.

Task

  • 데이터 νŒŒμ΄ν”„λΌμΈμ—μ„œ μ‹€μ œλ‘œ μˆ˜ν–‰λ˜λŠ” μž‘μ—… λ‹¨μœ„μž…λ‹ˆλ‹€. TaskλŠ” 단일 μž‘μ—…μ„ μˆ˜ν–‰ν•˜λ©°, λ‹€λ₯Έ Task에 μ˜μ‘΄ν•˜λŠ” κ²½μš°κ°€ λ§ŽμŠ΅λ‹ˆλ‹€.

  • μž…λ ₯ 데이터λ₯Ό λ°›μ•„μ„œ 좜λ ₯ 데이터λ₯Ό 생성할 수 있으며, μ‹€ν–‰ κ°€λŠ₯ν•œ μ½”λ“œλ‘œ κ΅¬μ„±λ©λ‹ˆλ‹€

  • Argument
    Description

    name

    Task의 μ΄λ¦„μž…λ‹ˆλ‹€. 이름을 μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ ν•¨μˆ˜μ˜ 이름이 μ‚¬μš©λ©λ‹ˆλ‹€.

    description

    Task에 λŒ€ν•œ μ„€λͺ…μž…λ‹ˆλ‹€. docstringμ—μ„œ κ°€μ Έμ˜΅λ‹ˆλ‹€.

    tags

    Task에 λŒ€ν•œ νƒœκ·Έμž…λ‹ˆλ‹€. μ‹€ν–‰μ‹œ prefect.context.tags에 μΆ”κ°€λ©λ‹ˆλ‹€.

    cache_key_fn

    Task κ²°κ³Όλ₯Ό 캐싱할 λ•Œ, κ²°κ³Όλ₯Ό μΊμ‹œν•˜κΈ° μœ„ν•œ ν‚€λ₯Ό μƒμ„±ν•˜λŠ” ν•¨μˆ˜μž…λ‹ˆλ‹€. ν•¨μˆ˜λŠ” kwargsλ₯Ό 인자둜 λ°›μ•„, λ¬Έμžμ—΄μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€.

    cache_expiration

    Task κ²°κ³Ό μΊμ‹œμ˜ 만료 κΈ°κ°„μž…λ‹ˆλ‹€.

    task_run_name

    Task μ‹€ν–‰μ‹œ μ‹€ν–‰ μ΄λ¦„μž…λ‹ˆλ‹€. ν‚€μ›Œλ“œ μΈμžλ“€μ„ λ³€μˆ˜λ‘œ μ‚¬μš©ν•˜μ—¬ 이름을 μƒμ„±ν•©λ‹ˆλ‹€.

    retries

    Task μ‹€ν–‰ μ‹€νŒ¨μ‹œ μž¬μ‹œλ„ νšŸμˆ˜μž…λ‹ˆλ‹€.

    retry_delay_seconds

    Task μ‹€ν–‰ μ‹€νŒ¨μ‹œ μž¬μ‹œλ„λ₯Ό λŒ€κΈ°ν•˜λŠ” μ‹œκ°„μž…λ‹ˆλ‹€.

    version

    Task의 버전 μ •λ³΄μž…λ‹ˆλ‹€.

  • from prefect import flow, task
    
    @task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
    def fetch(dataset_url: str) -> pd.DataFrame:
        """Read taxi data from web into pandas DataFrame"""
        df = pd.read_csv(dataset_url)
        return df
    
    @flow()
    def etl_web_to_gcs()->None:
        """Main ETL function"""
        color = "yellow"
        year = 2021
        month = 1 
        dataset_file = f"{color}_tripdata_{year}-{month:02}"
        dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"
    
        df = fetch(dataset_url)
    
    if __name__ == '__main__':
        etl_web_to_gcs()

Flow

  • image

  • 데이터 νŒŒμ΄ν”„λΌμΈμ„ μ •μ˜ν•˜κ³  μ‹€ν–‰ν•˜κΈ°μœ„ν•œ μΆ”μƒν™”λœ κ°œλ…μž…λ‹ˆλ‹€.

  • Prefect FlowλŠ” 단일 νƒœμŠ€ν¬ λ˜λŠ” μ—¬λŸ¬ νƒœμŠ€ν¬λ‘œ ꡬ성될 수 있으며, μ΄λŸ¬ν•œ νƒœμŠ€ν¬λŠ” 일련의 파이썬 ν•¨μˆ˜λ‘œ μž‘μ„±λ©λ‹ˆλ‹€.

  • FlowλŠ” λ‹€μ–‘ν•œ 데이터 νŒŒμ΄ν”„λΌμΈ νŒ¨ν„΄μ„ μ§€μ›ν•©λ‹ˆλ‹€.

  • Argument
    Description

    description

    Flow에 λŒ€ν•œ λ¬Έμžμ—΄ μ„€λͺ…을 μž…λ ₯ν•©λ‹ˆλ‹€. μž…λ ₯ν•˜μ§€ μ•ŠμœΌλ©΄ λ°μ½”λ ˆμ΄νŠΈλœ ν•¨μˆ˜μ˜ docstringμ—μ„œ μ„€λͺ…을 κ°€μ Έμ˜΅λ‹ˆλ‹€.

    name

    Flow의 이름을 μž…λ ₯ν•©λ‹ˆλ‹€. μž…λ ₯ν•˜μ§€ μ•ŠμœΌλ©΄ ν•¨μˆ˜μ˜ μ΄λ¦„μ—μ„œ μΆ”λ‘ λ©λ‹ˆλ‹€.

    retries

    Flow μ‹€ν–‰ μ‹€νŒ¨ μ‹œ μž¬μ‹œλ„ν•  횟수λ₯Ό μ§€μ •ν•©λ‹ˆλ‹€. 기본값은 0μž…λ‹ˆλ‹€.

    retry_delay_seconds

    Flow μ‹€ν–‰ μ‹€νŒ¨ ν›„ μž¬μ‹œλ„ μ „ λŒ€κΈ°ν•  μ‹œκ°„μ„ μ§€μ •ν•©λ‹ˆλ‹€. retriesκ°€ 0이 μ•„λ‹Œ κ²½μš°μ—λ§Œ μ μš©λ©λ‹ˆλ‹€.

    flow_run_name

    Flow μ‹€ν–‰ 이름을 μ§€μ •ν•©λ‹ˆλ‹€. 이 이름은 Flow λ§€κ°œλ³€μˆ˜λ₯Ό λ³€μˆ˜λ‘œ μ‚¬μš©ν•˜μ—¬ λ¬Έμžμ—΄ ν…œν”Œλ¦ΏμœΌλ‘œ μ œκ³΅ν•  수 μžˆμŠ΅λ‹ˆλ‹€. ν•¨μˆ˜λ₯Ό λ°˜ν™˜ν•˜λŠ” ν•¨μˆ˜λ‘œ μ œκ³΅ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€.

    task_runner

    Flow λ‚΄μ—μ„œ task 싀행에 μ‚¬μš©ν•  task runnerλ₯Ό μ„ νƒν•©λ‹ˆλ‹€. μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ ConcurrentTaskRunnerκ°€ μ‚¬μš©λ©λ‹ˆλ‹€.

    timeout_seconds

    Flow μ΅œλŒ€ μ‹€ν–‰ μ‹œκ°„μ„ 초 λ‹¨μœ„λ‘œ μ§€μ •ν•©λ‹ˆλ‹€. μ§€μ •λœ μ‹œκ°„μ΄ 초과되면 Flowκ°€ μ‹€νŒ¨λ‘œ ν‘œμ‹œλ©λ‹ˆλ‹€. Flow 싀행은 λ‹€μŒ taskκ°€ 호좜될 λ•ŒκΉŒμ§€ κ³„μ†λ©λ‹ˆλ‹€.

    validate_parameters

    Flow λ§€κ°œλ³€μˆ˜κ°€ Pydantic을 톡해 κ²€μ¦λ˜λŠ”μ§€ μ—¬λΆ€λ₯Ό μ§€μ •ν•©λ‹ˆλ‹€. 기본값은 Trueμž…λ‹ˆλ‹€.

    version

    Flow 버전을 μ§€μ •ν•©λ‹ˆλ‹€. μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ λž˜ν•‘λœ ν•¨μˆ˜κ°€ ν¬ν•¨λœ 파일의 ν•΄μ‹œ 값을 μ΄μš©ν•˜μ—¬ 버전을 μƒμ„±ν•˜λ € μ‹œλ„ν•©λ‹ˆλ‹€. νŒŒμΌμ„ 찾을 수 μ—†λŠ” 경우 버전은 null이 λ©λ‹ˆλ‹€.

Blocks

  • 블둝은 ꡬ성 μ €μž₯을 ν™œμ„±ν™”ν•˜κ³  μ™ΈλΆ€ μ‹œμŠ€ν…œκ³Ό μƒν˜Έ μž‘μš©ν•˜κΈ° μœ„ν•œ μΈν„°νŽ˜μ΄μŠ€λ₯Ό μ œκ³΅ν•˜λŠ” Prefect λ‚΄μ˜ κΈ°λ³Έ μš”μ†Œμž…λ‹ˆλ‹€.

  • 블둝을 μ‚¬μš©ν•˜λ©΄ AWS, GitHub, Slack 및 Prefect둜 μ˜€μΌ€μŠ€νŠΈλ ˆμ΄μ…˜ν•˜λ €λŠ” 기타 μ‹œμŠ€ν…œκ³Ό 같은 μ„œλΉ„μŠ€λ‘œ μΈμ¦ν•˜κΈ° μœ„ν•œ 자격 증λͺ…을 μ•ˆμ „ν•˜κ²Œ μ €μž₯ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • prefect λ‚΄λΆ€μ—μ„œ Blockλ₯Ό 눌러 λ‹€μ–‘ν•œ 컀λ„₯ν„°λ₯Ό ν™•μΈν•©λ‹ˆλ‹€.

  • image

  • Google Cloud Platform 용 Prefect 컀λ„₯ν„°λ₯Ό λ“±λ‘ν•©λ‹ˆλ‹€.

    • prefect block register -m prefect_gcp

    • image

  • from prefect import flow, task
    from prefect_gcp.cloud_storage import GcsBucket
    
    @task()
    def write_gcs(path: Path) -> None:
        """Upload local parquet file to GCS"""
        gcs_block = GcsBucket.load("zoom-gcs")
        gcs_block.upload_from_path(from_path=path, to_path=path)
        return
  • Prefect built-in blocks

    • Block
      Slug
      Description

      Azure

      azure

      Azure Datalake 및 Azure Blob Storageμ—μ„œ 파일둜 데이터λ₯Ό μ €μž₯ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Date Time

      date-time

      λ‚ μ§œ 및 μ‹œκ°„μ„ λ‚˜νƒ€λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Docker Container]

      docker-container

      μ»¨ν…Œμ΄λ„ˆμ—μ„œ λͺ…령을 μ‹€ν–‰ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Docker Registry

      docker-registry

      Docker λ ˆμ§€μŠ€νŠΈλ¦¬μ— μ—°κ²°ν•˜λŠ” λΈ”λ‘μœΌλ‘œ Docker Engine에 μ—°κ²°ν•  수 μžˆμ–΄μ•Ό ν•©λ‹ˆλ‹€.

      GCS

      gcs

      Google Cloud Storage에 파일둜 데이터λ₯Ό μ €μž₯ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      GitHub

      github

      곡개 GitHub 리포지토리에 μ €μž₯된 파일과 μƒν˜Έ μž‘μš©ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      JSON

      json

      JSON 데이터λ₯Ό λ‚˜νƒ€λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Kubernetes Cluster Config

      kubernetes-cluster-config

      Kubernetes ν΄λŸ¬μŠ€ν„°μ™€ μƒν˜Έ μž‘μš©ν•˜κΈ° μœ„ν•œ ꡬ성을 μ €μž₯ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Kubernetes Job

      kubernetes-job

      Kubernetes Job으둜 λͺ…령을 μ‹€ν–‰ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Local File System

      local-file-system

      둜컬 파일 μ‹œμŠ€ν…œμ— 파일둜 데이터λ₯Ό μ €μž₯ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€

      Microsoft Teams Webhook

      ms-teams-webhook

      제곡된 Microsoft Teams 웹훅을 μ‚¬μš©ν•˜μ—¬ μ•Œλ¦Όμ„ λ³΄λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Opsgenie Webhook

      opsgenie-webhook

      제곡된 Opsgenie 웹훅을 μ‚¬μš©ν•˜μ—¬ μ•Œλ¦Όμ„ λ³΄λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Pager Duty Webhook

      pager-duty-webhook

      제곡된 PagerDuty 웹훅을 μ‚¬μš©ν•˜μ—¬ μ•Œλ¦Όμ„ λ³΄λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Process

      process

      μƒˆ ν”„λ‘œμ„ΈμŠ€μ—μ„œ λͺ…령을 μ‹€ν–‰ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Remote File System

      remote-file-system

      원격 파일 μ‹œμŠ€ν…œμ— 파일둜 데이터λ₯Ό μ €μž₯ν•˜λŠ” λΈ”λ‘μœΌλ‘œ fsspecμ—μ„œ μ§€μ›ν•˜λŠ” λͺ¨λ“  원격 파일 μ‹œμŠ€ν…œμ„ μ§€μ›ν•©λ‹ˆλ‹€..

      S3

      s3

      AWS S3에 파일둜 데이터λ₯Ό μ €μž₯ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Secret

      secret

      λΉ„λ°€ κ°’μœΌλ‘œμ„œ λ‘œκ·Έλ˜κ±°λ‚˜ UI에 ν‘œμ‹œλ  λ•Œ κ°€λ €μ§€λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Slack Webhook

      slack-webhook

      제곡된 Slack 웹훅을 μ‚¬μš©ν•˜μ—¬ μ•Œλ¦Όμ„ λ³΄λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      SMB

      smb

      SMB κ³΅μœ μ— 파일둜 데이터λ₯Ό μ €μž₯ν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      String

      string

      λ¬Έμžμ—΄ 데이터λ₯Ό λ‚˜νƒ€λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Twilio SMS

      twilio-sms

      Twilio SMSλ₯Ό 톡해 μ•Œλ¦Όμ„ λ³΄λ‚΄λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

      Webhook

      webhook

      웹훅을 ν˜ΈμΆœν•˜λŠ” λΈ”λ‘μž…λ‹ˆλ‹€.

Deployment

  • image

  • deploymentλŠ” μŠ€νŠΈλ¦Όμ„ μΊ‘μŠν™”ν•˜κ³  APIλ₯Ό 톡해 일정을 μ˜ˆμ•½ν•˜κ±°λ‚˜ μ‹œμž‘ν•  수 μžˆλŠ” μ„œλ²„ μΈ‘ μ•„ν‹°νŒ©νŠΈμž…λ‹ˆλ‹€. flowλŠ” μ—¬λŸ¬ Deployment에 속할 수 있으며 ν”„λ‘œκ·Έλž˜λ°ν•˜λŠ” 데 ν•„μš”ν•œ λͺ¨λ“  것이 ν¬ν•¨λœ 메타데이터가 μžˆλŠ” μ»¨ν…Œμ΄λ„ˆλΌκ³  말할 수 μžˆμŠ΅λ‹ˆλ‹€. λͺ…λ Ήμ€„μ΄λ‚˜ Python으둜 λ§Œλ“€ 수 μžˆμŠ΅λ‹ˆλ‹€.

  • Prefect μ›Œν¬ν”Œλ‘œμ— λŒ€ν•œ 배포 생성은 Prefect APIλ₯Ό 톡해 μ›Œν¬ν”Œλ‘œλ₯Ό κ΄€λ¦¬ν•˜κ³  Prefect μ—μ΄μ „νŠΈμ—μ„œ μ›κ²©μœΌλ‘œ μ‹€ν–‰ν•  수 μžˆλ„λ‘ μ›Œν¬ν”Œλ‘œ μ½”λ“œ, μ„€μ • 및 인프라 ꡬ성을 νŒ¨ν‚€μ§•ν•˜λŠ” 것을 μ˜λ―Έν•©λ‹ˆλ‹€.

  • Prefect CLI λ˜λŠ” UIλ₯Ό μ‚¬μš©ν•˜μ—¬ 생성, μ—…λ°μ΄νŠΈ, 쀑지 및 μ‚­μ œν•  수 μžˆμŠ΅λ‹ˆλ‹€. Deployment에 λŒ€ν•œ μƒνƒœ 및 둜그 μ •λ³΄λŠ” Prefect Cloud λ˜λŠ” Prefect Server UIμ—μ„œ λ³Ό 수 있으며, μ›ν•˜λŠ” 경우 λ‹€μš΄λ‘œλ“œν•˜μ—¬ 검사할 수 μžˆμŠ΅λ‹ˆλ‹€.

  • Deploymentλ₯Ό μ‚¬μš©ν•˜λ©΄ Flowλ₯Ό μ‰½κ²Œ μ‹€ν–‰ν•˜κ³  관리할 수 있으며, μ½”λ“œ λ³€κ²½μ΄λ‚˜ μ—…λ°μ΄νŠΈλ₯Ό μ‰½κ²Œ λ°˜μ˜ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"

    • image

  • 배포 결과둜 yaml 파일이 μƒμ„±λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

    • image

  • image

  • image

  • image

Refereence

  • https://docs.prefect.io/latest/

  • https://examples.dask.org/applications/prefect-etl.html

Last updated