prefect_base

https://docs.prefect.io/latest/
Prefect๋ Python ๊ธฐ๋ฐ ์ํฌํ๋ก ๊ด๋ฆฌ ์์คํ ์ ๋๋ค. Prefect๋ฅผ ์ฌ์ฉํ๋ฉด ๋ก๊น , ์ฌ์๋, ๋์ ๋งคํ, ์บ์ฑ, ์คํจ ์๋ฆผ ๋ฑ์ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ฝ๊ฒ ์ถ๊ฐํ ์ ์์ต๋๋ค

https://discourse.prefect.io/t/what-are-the-components-of-prefect-2-0-architecture/909
Prefect๋ Dask ์์ ๊ตฌ์ถ๋์์ผ๋ฉฐ Dask๋ฅผ ์ฌ์ฉํ์ฌ ๋ถ์ฐ ํ๊ฒฝ์์ Prefect ์ํฌํ๋ก์ ์คํ์ ์์ฝํ๊ณ ๊ด๋ฆฌํฉ๋๋ค.
Prefect๋ ์ํฌํ๋ก์ ์ผ์ ์ ์ฒ๋ฆฌ ํ๊ณ Dask๋ ๊ฐ ์ํฌํ๋ก ๋ด ์์ ์ ์ผ์ ๋ฐ ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.
์์ ์์ฝ: Dask๋ ์ํฌํ๋ก์ฐ ๋ด์์ ๋ชจ๋ ์์ ์์ฝ์ ์ฒ๋ฆฌํ๋ฏ๋ก Prefect๋ Dask๊ฐ ๋ฐ๋ฆฌ์ด ๋๊ธฐ ์๊ฐ์ผ๋ก ์์ฝํ๋ ๋ ์์ ์์ ์ ์ฅ๋ คํ ์ ์์ต๋๋ค.
Dataflow: Dask๊ฐ ์์ ๊ฐ์ ์ ์ ํ ์ ๋ณด ์ง๋ ฌํ ๋ฐ ํต์ ์ ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ Prefect๋ "๋ฐ์ดํฐ ํ๋ฆ"์ ์ผ๊ธ ํจํด์ผ๋ก ์ง์ํ ์ ์์ต๋๋ค.
๋ถ์ฐ ๊ณ์ฐ: Dask๋ ํด๋ฌ์คํฐ์ ์์ ์์๊ฒ ์์ ํ ๋น์ ์ฒ๋ฆฌํ์ฌ ์ฌ์ฉ์๊ฐ ์ต์ํ์ ์ค๋ฒํค๋๋ก ๋ถ์ฐ ๊ณ์ฐ์ ์ด์ ์ ์ฆ์ ์คํํ ์ ์๋๋ก ํฉ๋๋ค.
๋ณ๋ ฌ์ฑ: ํด๋ฌ์คํฐ์์ ์คํํ๋ ๋ก์ปฌ์์ ์คํํ๋ Dask๋ ์ ๋ฐ์์ ๋ณ๋ ฌ ์์ ์คํ์ ์ ๊ณตํฉ๋๋ค.
Task
๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์์ ์ค์ ๋ก ์ํ๋๋ ์์ ๋จ์์ ๋๋ค. Task๋ ๋จ์ผ ์์ ์ ์ํํ๋ฉฐ, ๋ค๋ฅธ Task์ ์์กดํ๋ ๊ฒฝ์ฐ๊ฐ ๋ง์ต๋๋ค.
์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ ์ถ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ์์ฑํ ์ ์์ผ๋ฉฐ, ์คํ ๊ฐ๋ฅํ ์ฝ๋๋ก ๊ตฌ์ฑ๋ฉ๋๋ค
- ArgumentDescription
nameTask์ ์ด๋ฆ์ ๋๋ค. ์ด๋ฆ์ ์ง์ ํ์ง ์์ผ๋ฉด ํจ์์ ์ด๋ฆ์ด ์ฌ์ฉ๋ฉ๋๋ค.
descriptionTask์ ๋ํ ์ค๋ช ์ ๋๋ค. docstring์์ ๊ฐ์ ธ์ต๋๋ค.
tagsTask์ ๋ํ ํ๊ทธ์ ๋๋ค. ์คํ์ prefect.context.tags์ ์ถ๊ฐ๋ฉ๋๋ค.
cache_key_fnTask ๊ฒฐ๊ณผ๋ฅผ ์บ์ฑํ ๋, ๊ฒฐ๊ณผ๋ฅผ ์บ์ํ๊ธฐ ์ํ ํค๋ฅผ ์์ฑํ๋ ํจ์์ ๋๋ค. ํจ์๋ kwargs๋ฅผ ์ธ์๋ก ๋ฐ์, ๋ฌธ์์ด์ ๋ฐํํฉ๋๋ค.
cache_expirationTask ๊ฒฐ๊ณผ ์บ์์ ๋ง๋ฃ ๊ธฐ๊ฐ์ ๋๋ค.
task_run_nameTask ์คํ์ ์คํ ์ด๋ฆ์ ๋๋ค. ํค์๋ ์ธ์๋ค์ ๋ณ์๋ก ์ฌ์ฉํ์ฌ ์ด๋ฆ์ ์์ฑํฉ๋๋ค.
retriesTask ์คํ ์คํจ์ ์ฌ์๋ ํ์์ ๋๋ค.
retry_delay_secondsTask ์คํ ์คํจ์ ์ฌ์๋๋ฅผ ๋๊ธฐํ๋ ์๊ฐ์ ๋๋ค.
versionTask์ ๋ฒ์ ์ ๋ณด์ ๋๋ค.
from prefect import flow, task @task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1)) def fetch(dataset_url: str) -> pd.DataFrame: """Read taxi data from web into pandas DataFrame""" df = pd.read_csv(dataset_url) return df @flow() def etl_web_to_gcs()->None: """Main ETL function""" color = "yellow" year = 2021 month = 1 dataset_file = f"{color}_tripdata_{year}-{month:02}" dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz" df = fetch(dataset_url) if __name__ == '__main__': etl_web_to_gcs()
Flow

๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ ์ํ๊ณ ์คํํ๊ธฐ์ํ ์ถ์ํ๋ ๊ฐ๋ ์ ๋๋ค.
Prefect Flow๋ ๋จ์ผ ํ์คํฌ ๋๋ ์ฌ๋ฌ ํ์คํฌ๋ก ๊ตฌ์ฑ๋ ์ ์์ผ๋ฉฐ, ์ด๋ฌํ ํ์คํฌ๋ ์ผ๋ จ์ ํ์ด์ฌ ํจ์๋ก ์์ฑ๋ฉ๋๋ค.
Flow๋ ๋ค์ํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ํจํด์ ์ง์ํฉ๋๋ค.
- ArgumentDescription
descriptionFlow์ ๋ํ ๋ฌธ์์ด ์ค๋ช ์ ์ ๋ ฅํฉ๋๋ค. ์ ๋ ฅํ์ง ์์ผ๋ฉด ๋ฐ์ฝ๋ ์ดํธ๋ ํจ์์ docstring์์ ์ค๋ช ์ ๊ฐ์ ธ์ต๋๋ค.
nameFlow์ ์ด๋ฆ์ ์ ๋ ฅํฉ๋๋ค. ์ ๋ ฅํ์ง ์์ผ๋ฉด ํจ์์ ์ด๋ฆ์์ ์ถ๋ก ๋ฉ๋๋ค.
retriesFlow ์คํ ์คํจ ์ ์ฌ์๋ํ ํ์๋ฅผ ์ง์ ํฉ๋๋ค. ๊ธฐ๋ณธ๊ฐ์ 0์ ๋๋ค.
retry_delay_secondsFlow ์คํ ์คํจ ํ ์ฌ์๋ ์ ๋๊ธฐํ ์๊ฐ์ ์ง์ ํฉ๋๋ค. retries๊ฐ 0์ด ์๋ ๊ฒฝ์ฐ์๋ง ์ ์ฉ๋ฉ๋๋ค.
flow_run_nameFlow ์คํ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค. ์ด ์ด๋ฆ์ Flow ๋งค๊ฐ๋ณ์๋ฅผ ๋ณ์๋ก ์ฌ์ฉํ์ฌ ๋ฌธ์์ด ํ ํ๋ฆฟ์ผ๋ก ์ ๊ณตํ ์ ์์ต๋๋ค. ํจ์๋ฅผ ๋ฐํํ๋ ํจ์๋ก ์ ๊ณตํ ์๋ ์์ต๋๋ค.
task_runnerFlow ๋ด์์ task ์คํ์ ์ฌ์ฉํ task runner๋ฅผ ์ ํํฉ๋๋ค. ์ง์ ํ์ง ์์ผ๋ฉด ConcurrentTaskRunner๊ฐ ์ฌ์ฉ๋ฉ๋๋ค.
timeout_secondsFlow ์ต๋ ์คํ ์๊ฐ์ ์ด ๋จ์๋ก ์ง์ ํฉ๋๋ค. ์ง์ ๋ ์๊ฐ์ด ์ด๊ณผ๋๋ฉด Flow๊ฐ ์คํจ๋ก ํ์๋ฉ๋๋ค. Flow ์คํ์ ๋ค์ task๊ฐ ํธ์ถ๋ ๋๊น์ง ๊ณ์๋ฉ๋๋ค.
validate_parametersFlow ๋งค๊ฐ๋ณ์๊ฐ Pydantic์ ํตํด ๊ฒ์ฆ๋๋์ง ์ฌ๋ถ๋ฅผ ์ง์ ํฉ๋๋ค. ๊ธฐ๋ณธ๊ฐ์ True์ ๋๋ค.
versionFlow ๋ฒ์ ์ ์ง์ ํฉ๋๋ค. ์ง์ ํ์ง ์์ผ๋ฉด ๋ํ๋ ํจ์๊ฐ ํฌํจ๋ ํ์ผ์ ํด์ ๊ฐ์ ์ด์ฉํ์ฌ ๋ฒ์ ์ ์์ฑํ๋ ค ์๋ํฉ๋๋ค. ํ์ผ์ ์ฐพ์ ์ ์๋ ๊ฒฝ์ฐ ๋ฒ์ ์ null์ด ๋ฉ๋๋ค.
Blocks
๋ธ๋ก์ ๊ตฌ์ฑ ์ ์ฅ์ ํ์ฑํํ๊ณ ์ธ๋ถ ์์คํ ๊ณผ ์ํธ ์์ฉํ๊ธฐ ์ํ ์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํ๋ Prefect ๋ด์ ๊ธฐ๋ณธ ์์์ ๋๋ค.
๋ธ๋ก์ ์ฌ์ฉํ๋ฉด AWS, GitHub, Slack ๋ฐ Prefect๋ก ์ค์ผ์คํธ๋ ์ด์ ํ๋ ค๋ ๊ธฐํ ์์คํ ๊ณผ ๊ฐ์ ์๋น์ค๋ก ์ธ์ฆํ๊ธฐ ์ํ ์๊ฒฉ ์ฆ๋ช ์ ์์ ํ๊ฒ ์ ์ฅํ ์ ์์ต๋๋ค.
prefect ๋ด๋ถ์์ Block๋ฅผ ๋๋ฌ ๋ค์ํ ์ปค๋ฅํฐ๋ฅผ ํ์ธํฉ๋๋ค.

Google Cloud Platform ์ฉ Prefect ์ปค๋ฅํฐ๋ฅผ ๋ฑ๋กํฉ๋๋ค.
prefect block register -m prefect_gcp

from prefect import flow, task from prefect_gcp.cloud_storage import GcsBucket @task() def write_gcs(path: Path) -> None: """Upload local parquet file to GCS""" gcs_block = GcsBucket.load("zoom-gcs") gcs_block.upload_from_path(from_path=path, to_path=path) returnPrefect built-in blocks
- BlockSlugDescription
Azure
azureAzure Datalake ๋ฐ Azure Blob Storage์์ ํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ธ๋ก์ ๋๋ค.
Date Time
date-time๋ ์ง ๋ฐ ์๊ฐ์ ๋ํ๋ด๋ ๋ธ๋ก์ ๋๋ค.
Docker Container]
docker-container์ปจํ ์ด๋์์ ๋ช ๋ น์ ์คํํ๋ ๋ธ๋ก์ ๋๋ค.
Docker Registry
docker-registryDocker ๋ ์ง์คํธ๋ฆฌ์ ์ฐ๊ฒฐํ๋ ๋ธ๋ก์ผ๋ก Docker Engine์ ์ฐ๊ฒฐํ ์ ์์ด์ผ ํฉ๋๋ค.
GCS
gcsGoogle Cloud Storage์ ํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ธ๋ก์ ๋๋ค.
GitHub
github๊ณต๊ฐ GitHub ๋ฆฌํฌ์งํ ๋ฆฌ์ ์ ์ฅ๋ ํ์ผ๊ณผ ์ํธ ์์ฉํ๋ ๋ธ๋ก์ ๋๋ค.
JSON
jsonJSON ๋ฐ์ดํฐ๋ฅผ ๋ํ๋ด๋ ๋ธ๋ก์ ๋๋ค.
Kubernetes Cluster Config
kubernetes-cluster-configKubernetes ํด๋ฌ์คํฐ์ ์ํธ ์์ฉํ๊ธฐ ์ํ ๊ตฌ์ฑ์ ์ ์ฅํ๋ ๋ธ๋ก์ ๋๋ค.
Kubernetes Job
kubernetes-jobKubernetes Job์ผ๋ก ๋ช ๋ น์ ์คํํ๋ ๋ธ๋ก์ ๋๋ค.
Local File System
local-file-system๋ก์ปฌ ํ์ผ ์์คํ ์ ํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ธ๋ก์ ๋๋ค
Microsoft Teams Webhook
ms-teams-webhook์ ๊ณต๋ Microsoft Teams ์นํ ์ ์ฌ์ฉํ์ฌ ์๋ฆผ์ ๋ณด๋ด๋ ๋ธ๋ก์ ๋๋ค.
Opsgenie Webhook
opsgenie-webhook์ ๊ณต๋ Opsgenie ์นํ ์ ์ฌ์ฉํ์ฌ ์๋ฆผ์ ๋ณด๋ด๋ ๋ธ๋ก์ ๋๋ค.
Pager Duty Webhook
pager-duty-webhook์ ๊ณต๋ PagerDuty ์นํ ์ ์ฌ์ฉํ์ฌ ์๋ฆผ์ ๋ณด๋ด๋ ๋ธ๋ก์ ๋๋ค.
Process
process์ ํ๋ก์ธ์ค์์ ๋ช ๋ น์ ์คํํ๋ ๋ธ๋ก์ ๋๋ค.
Remote File System
remote-file-system์๊ฒฉ ํ์ผ ์์คํ ์ ํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ธ๋ก์ผ๋ก fsspec์์ ์ง์ํ๋ ๋ชจ๋ ์๊ฒฉ ํ์ผ ์์คํ ์ ์ง์ํฉ๋๋ค..
S3
s3AWS S3์ ํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ธ๋ก์ ๋๋ค.
Secret
secret๋น๋ฐ ๊ฐ์ผ๋ก์ ๋ก๊ทธ๋๊ฑฐ๋ UI์ ํ์๋ ๋ ๊ฐ๋ ค์ง๋ ๋ธ๋ก์ ๋๋ค.
Slack Webhook
slack-webhook์ ๊ณต๋ Slack ์นํ ์ ์ฌ์ฉํ์ฌ ์๋ฆผ์ ๋ณด๋ด๋ ๋ธ๋ก์ ๋๋ค.
SMB
smbSMB ๊ณต์ ์ ํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ธ๋ก์ ๋๋ค.
String
string๋ฌธ์์ด ๋ฐ์ดํฐ๋ฅผ ๋ํ๋ด๋ ๋ธ๋ก์ ๋๋ค.
Twilio SMS
twilio-smsTwilio SMS๋ฅผ ํตํด ์๋ฆผ์ ๋ณด๋ด๋ ๋ธ๋ก์ ๋๋ค.
Webhook
webhook์นํ ์ ํธ์ถํ๋ ๋ธ๋ก์ ๋๋ค.
Deployment

deployment๋ ์คํธ๋ฆผ์ ์บก์ํํ๊ณ API๋ฅผ ํตํด ์ผ์ ์ ์์ฝํ๊ฑฐ๋ ์์ํ ์ ์๋ ์๋ฒ ์ธก ์ํฐํฉํธ์ ๋๋ค. flow๋ ์ฌ๋ฌ Deployment์ ์ํ ์ ์์ผ๋ฉฐ ํ๋ก๊ทธ๋๋ฐํ๋ ๋ฐ ํ์ํ ๋ชจ๋ ๊ฒ์ด ํฌํจ๋ ๋ฉํ๋ฐ์ดํฐ๊ฐ ์๋ ์ปจํ ์ด๋๋ผ๊ณ ๋งํ ์ ์์ต๋๋ค. ๋ช ๋ น์ค์ด๋ Python์ผ๋ก ๋ง๋ค ์ ์์ต๋๋ค.
Prefect ์ํฌํ๋ก์ ๋ํ ๋ฐฐํฌ ์์ฑ์ Prefect API๋ฅผ ํตํด ์ํฌํ๋ก๋ฅผ ๊ด๋ฆฌํ๊ณ Prefect ์์ด์ ํธ์์ ์๊ฒฉ์ผ๋ก ์คํํ ์ ์๋๋ก ์ํฌํ๋ก ์ฝ๋, ์ค์ ๋ฐ ์ธํ๋ผ ๊ตฌ์ฑ์ ํจํค์งํ๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค.
Prefect CLI ๋๋ UI๋ฅผ ์ฌ์ฉํ์ฌ ์์ฑ, ์ ๋ฐ์ดํธ, ์ค์ง ๋ฐ ์ญ์ ํ ์ ์์ต๋๋ค. Deployment์ ๋ํ ์ํ ๋ฐ ๋ก๊ทธ ์ ๋ณด๋ Prefect Cloud ๋๋ Prefect Server UI์์ ๋ณผ ์ ์์ผ๋ฉฐ, ์ํ๋ ๊ฒฝ์ฐ ๋ค์ด๋ก๋ํ์ฌ ๊ฒ์ฌํ ์ ์์ต๋๋ค.
Deployment๋ฅผ ์ฌ์ฉํ๋ฉด Flow๋ฅผ ์ฝ๊ฒ ์คํํ๊ณ ๊ด๋ฆฌํ ์ ์์ผ๋ฉฐ, ์ฝ๋ ๋ณ๊ฒฝ์ด๋ ์ ๋ฐ์ดํธ๋ฅผ ์ฝ๊ฒ ๋ฐ์ํ ์ ์์ต๋๋ค.
prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"
๋ฐฐํฌ ๊ฒฐ๊ณผ๋ก yaml ํ์ผ์ด ์์ฑ๋์์ต๋๋ค.



Refereence
https://docs.prefect.io/latest/
https://examples.dask.org/applications/prefect-etl.html
Last updated

