embulk

Embulk & Digdag

Embulk

  • ์ผ๋ฐ˜์ ์ธ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์›Œํฌํ”Œ๋กœ์šฐ

    • ์ˆ˜์ง‘(Ingest/Collect) - ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๋กœ๊ทธ, ์œ ์ € ์†์„ฑ ์ •๋ณด, ๊ด‘๊ณ ์˜ ์ธ์ƒ, ์„œ๋“œํŒŒ์น˜์ฟ ํ‚ค

    • ์ „์ฒ˜๋ฆฌ(Enrich) - ๋ด‡์˜ ์•ก์„ธ์Šค ๋กœ๊ทธ ์ œ์™ธ. IP ์ฃผ์†Œ๋กœ ์œ„์น˜ ์ •๋ณด ์ถ”๊ฐ€, user-agent์˜ ๊ตฌ์กฐํ™”, ๋งˆ์Šคํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•ด์„œ ๋กœ๊ทธ์— ์œ ์ € ์†์„ฑ ์ถ”๊ฐ€

    • ๋ถ„๋ฅ˜, ์ง‘๊ณ„, ๋ถ„์„(Model) - ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ถ”๊ฐ€, ๋ถ„์„ ์ฒ˜๋ฆฌ ์‹œ์Šคํ…œ์œผ๋กœ ์ „์†ก, ์••์ถ•ํ•ด์„œ ์Šคํ† ๋ฆฌ์ง€์— ์ €์žฅ(์•„์นด์ด๋ธŒ), ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ๋กœ ๊ธฐ๋ก

    • ํ™œ์šฉ(Utilize) - ์ถ”์ฒœ ์—”์ง„ API์˜ ์ฐธ์กฐ ๋ฐ์ดํ„ฐ, ์‹ค์‹œ๊ฐ„ ๊ฑฐ๋ž˜, BI ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‚ฌ์šฉํ•œ ์‹œ๊ฐํ™”

  • Fluentd๋Š” ์ค€์‹ค์‹œ๊ฐ„์˜ ๋กœ๊ทธ ํ™œ์šฉ์„ ํ•  ๋•Œ ์ฒซ ์žฅ์• ๋ฌผ์ธ ๋กœ๊ทธ ์ˆ˜์ง‘์˜ ๊ณผ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ๊ฐœ๋ฐœ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๊ฐ์ข… ๋ฐ์ดํ„ฐ์˜ ์ž…๋ ฅ๊ณผ ์ˆ˜์ง‘์„ Input ํ”Œ๋Ÿฌ๊ทธ์ธ์œผ๋กœ ์ง€์›ํ•˜๊ณ , ๋ฐ์ดํ„ฐ ๊ฐ€๊ณต์„ Filter ํ”Œ๋Ÿฌ๊ทธ์ธ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋ฉฐ, Output ํ”Œ๋Ÿฌ๊ทธ์ธ์œผ๋กœ ์—ฌ๋Ÿฌ ๊ฐ€์ง€ ๋ฏธ๋“ค์›จ์–ด๋‚˜ ์Šคํ† ๋ฆฌ์ง€๋กœ ์ €์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • 2๊ฐ€์ง€ ๊ณผ์ œ

      • Fluentd์˜ ์„ค์ • ํŒŒ์ผ์˜ ๊ฑฐ๋Œ€ํ™”์™€ ๊ฐ€๋” ์ผ์–ด๋‚˜๋Š” ์‚ฌ์—…์ ์ธ ์‚ฌ์–‘ ๋ณ€๊ฒฝ์— ๋”ฐ๋ผ ์„ค์ • ํŒŒ์ผ์˜ ๋ผ์ธ์ˆ˜๊ฐ€ ์ฆ๊ฐ€ํ•˜์—ฌ ์œ ์ง€๋ณด์ˆ˜๊ฐ€ ํž˜๋“ค๊ฒŒ ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

      • ์ŠคํŠธ๋ฆฌ๋ฐ ์ฒ˜๋ฆฌ์— ํŠนํ™”๋œ Fluentd๋Š” ์ •๊ธฐ์ ์œผ๋กœ ๋ฒŒํฌ๋กœ๋“œ๋ฅผ ํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์›Œํฌํ”Œ๋กœ์šฐ๋ฅผ ๋งŒ๋“ค๊ธฐ์—๋Š” ์ ํ•ฉํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

  • ๋ฐ์ดํ„ฐ ์›Œํฌํ”Œ๋กœ์šฐ๋ฅผ ์ง€์›ํ•˜๋Š” ๋„๊ตฌ

    • ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ ์ปฌ๋ ‰ํ„ฐ - fluentd - ์•ก์„ธ์Šค ๋กœ๊ทธ / ์•ฑ๋กœ๊ทธ / ์„œ๋ฒ„๋กœ๊ทธ

    • ๋ฒŒํฌ ๋ฐ์ดํ„ฐ ๋กœ๋” - embulk - csvํŒŒ์ผ/ S3 / MySQL / PostgreWQL ๋“ฑ

    • ์›Œํฌํ”Œ๋กœ์šฐ ๊ด€๋ฆฌ - digdag - ETL ์ฒ˜๋ฆฌ์˜ ์ž๋™ํ™”

  • Embulk

    • image

    • Embulk๋Š” Fluentd์™€ ๊ฐ™์ด Input/Filter/Output ํ”Œ๋Ÿฌ๊ทธ์ธ์„ ์กฐํ•ฉํ•ด์„œ ์„ค์ • ํŒŒ์ผ์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค. ๋ณ‘๋ ฌ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ์— ๋Œ€์‘ํ•œ ์„ฑ๋Šฅ๊ณผ ์žฌ์‹œ๋„ ์ œ์–ด ๋“ฑ์— ์•ˆ์ •์„ฑ์ด ์šฐ์ˆ˜ํ•œ ๋ฐ์ดํ„ฐ ์ „์†ก ํŒŒ์ดํ”„๋ผ์ธ์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • ๋‹ค๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ํšจ์œจ์ ์œผ๋กœ ์ฝ์–ด์„œ CPU ์ฝ”์–ด๋ฅผ ์ตœ๋Œ€ํ•œ ์‚ฌ์šฉํ•ด์„œ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐ ํŠนํ™”๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

    • ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ์Šคํ† ๋ฆฌ์ง€์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์„œ, ์ž„์˜์˜ ์ฒ˜๋ฆฌ๋ฅผ ํ•œ ๋’ค์— ๋‹ค๋ฅธ ๋ณด๊ด€ ์žฅ์†Œ๋กœ ๋ณด๋‚ด๋Š” ๋ฐ์ดํ„ฐ์˜ ๋Œ€์šฉ๋Ÿ‰์ฒ˜๋ฆฌ์— ํŠนํ™”๋œ ETL ์ฒ˜๋ฆฌ ๋„๊ตฌ

    • Fluentd์™€ ๋‹ค๋ฅธ ํŠน์ง•์œผ๋กœ ๊ณ ์†์„ฑ๊ณผ ํŠธ๋žœ์žญ์…˜ ์ œ์–ด, ์Šคํ‚ค๋งˆ๋ฅผ ์‚ฌ์šฉํ•œ ๋ฐ์ดํ„ฐ์˜ ๊ฒ€์‚ฌ ๊ธฐ๋Šฅ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

    • ๋Œ€์šฉ๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋งˆ์ดํฌ๋กœ ๋ฐฐ์น˜์ฒ˜๋ฆฌ๋กœ Redshift, BigQuery, Elasticsearch๋กœ ์ €์žฅํ•˜๋Š” ๊ฒฝ์šฐ๋ผ๋ฉด Fluentd ๋ณด๋‹ค๋Š” ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ, ์ฒ˜๋ฆฌ๋Ÿ‰, ์ €์žฅํƒ€์ด๋ฐ์„ ์ž์œ ๋กญ๊ฒŒ ์ปจํŠธ๋กคํ•  ์ˆ˜ ์žˆ๋Š” Embulk๋กœ ์ €์žฅํ•˜๋Š” ํŽธ์ด ํ™•์‹คํžˆ ์•ˆ์ •์ ์ž…๋‹ˆ๋‹ค.

Digdag

  • image

  • Digdag๋Š” ์›Œํฌํ”Œ๋กœ์šฐ์˜ ์ •์˜๋ฅผ ์„ค์ • ํŒŒ์ผ๋กœ ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. Embulk์™€ ์ž„์˜์˜ ์…ธ์Šคํฌ๋ฆฝํŠธ์— ์ž„์˜์˜ ๋ณ€์ˆ˜๋ฅผ ๋„ฃ์–ด๊ฐ€๋ฉฐ, ์˜์กด ๊ด€๊ณ„์ˆœ์œผ๋กœ ์ง๋ ฌ ๋ฐ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋กœ Job์„ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ์›Œํฌํ”Œ๋กœ์šฐ๊ด€๋ฆฌ ๋„๊ตฌ๋กœ์„œ ETL ์ฒ˜๋ฆฌ์˜ ์ž๋™ํ™”์— ๋„์›€์ด ๋ฉ๋‹ˆ๋‹ค.

  • ์—ฌ๋Ÿฌ ๋‹จ๊ณ„์—์„œ์˜ ์ฒ˜๋ฆฌ์˜ ์˜์กด๊ด€๊ณ„์™€ ์ˆœ์„œ, ๋ณ‘๋ ฌ์‹คํ–‰ ๋“ฑ์„ ํ”„๋กœ๊ทธ๋žจ ๊ฐ€๋Šฅํ•œ YAML ์„ค์ • ํŒŒ์ผ์„ ํ†ตํ•ด ์ œ์–ดํ•  ์ˆ˜ ์žˆ๋Š” ์•„ํ‚คํ…์ฒ˜

  • ์—ฌ๋Ÿฌ ๊ฐœ์˜ ๋ฐ์ดํ„ฐ์†Œ์Šค๋กœ๋ถ€ํ„ฐ ๋ณ‘๋ ฌ ๋˜๋Š” ์ง๋ ฌ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ณ , ๋‚ ์งœ๋ณ„๋กœ ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค๊ณ  ์ €์žฅํ•˜๋ฉฐ ์ง€์†์ ์ธ 1์ฐจ ์ง‘๊ณ„๋ฅผ ํ•œ ๋’ค์— ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ์ €์žฅํ•˜๋Š” ์ฒ˜๋ฆฌ๋ฅผ ์ง๊ด€์ ์œผ๋กœ ์„ค์ • ํŒŒ์ผ์— ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๊ธฐ๋ณธ๊ธฐ๋Šฅ

    • ์ž‘์—…์„ ์˜์กด๊ด€๊ณ„์ˆœ์œผ๋กœ ์‹คํ–‰

    • ๊ณผ๊ฑฐ๋ถ„์˜ ์ผ๊ด„์‹คํ–‰(backfill)

    • ์ •๊ธฐ ์‹คํ–‰

    • ์‹œ๊ฐ„ ๋“ฑ์˜ ๋ณ€์ˆ˜๋ฅผ ํฌํ•จํ•ด์„œ ์‹คํ–‰

    • ํŒŒ์ผ์ด ์ƒ์„ฑ๋˜๋ฉด ์‹คํ–‰

  • ์—๋Ÿฌ์ฒ˜๋ฆฌ

    • ์‹คํŒจํ•˜๋ฉด ํ†ต์ง€

    • ์‹คํŒจํ•œ ์œ„์น˜์—์„œ ์žฌ์‹œ์ž‘

  • ์ƒํƒœ ๊ฐ์‹œ

    • ์‹คํ–‰ ์‹œ๊ฐ„์ด ์ผ์ • ์ด์ƒ์ด๋ฉด ํ†ต์ง€

    • ์ž‘์—…์˜ ์‹คํ–‰์‹œ๊ฐ„์˜ ์‹œ๊ฐํ™”

    • ์‹คํ–‰ ๋กœ๊ทธ์˜ ์ˆ˜์ง‘๊ณผ ์ €์žฅ

  • ๊ณ ์†ํ™”

    • ์ž‘์—…์„ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰

    • ๋™์‹œ ์‹คํ–‰ ์ž‘์—… ๊ฐœ์ˆ˜์˜ ์ œ์–ด

  • ๊ฐœ๋ฐœ์ง€์›

  • ์›Œํฌํ”Œ๋กœ์šฐ์˜ ๋ฒ„์ „ ๊ด€๋ฆฌ

    • GUI๋กœ ์›Œํฌํ”Œ๋กœ์šฐ ๊ฐœ๋ฐœ

    • ์ •๊ธฐ์ฒ˜๋ฆฌ๋ฅผ ๊ฐ„๋‹จํ•˜๊ฒŒ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

    • Docker ์ด๋ฏธ์ง€๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ž‘์—… ์‹คํ–‰

  • Digdag์—๋Š” ์Šค์ผ€์ค„๋Ÿฌ๋ฅผ ๋‚ด์žฅํ•˜๊ณ  ์žˆ์–ด์„œ ๋ฐ๋ชฌ์œผ๋กœ ๋™์ž‘ํ•˜๋Š” ์„œ๋ฒ„ ๋ชจ๋“œ์™€ ์ปค๋งจ๋“œ๋ผ์ธ์—์„œ ์ž„์˜๋กœ ์‹คํ–‰ํ•˜๋Š” ๋กœ์ปฌ ๋ชจ๋“œ์˜ 2๊ฐ€์ง€๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์„œ๋ฒ„ ๋ชจ๋“œ์—์„œ๋Š” ๊ธฐ๋ฐ€ ์ •๋ณด์™€ ํด๋ฆฌ์‹œ ํŒŒ์ผ์„ ์ปค๋งจ๋“œ๋ผ์ธ์œผ๋กœ ๋“ฑ๋กํ•ฉ๋‹ˆ๋‹ค.

  • Digdag์„ ์‹คํ–‰ํ•˜๋Š” ์ปค๋งจ๋“œ

    • digdag run stage1_load_assets.dig

  • ๋ณ‘๋ ฌ ์‹คํ–‰ ์ˆ˜์˜ ์ƒํ•œ์„ ์„ค์ •ํ•˜์—ฌ Digdag์˜ ์›Œํฌํ”Œ๋กœ์šฐ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    • digdag run example.dig --max-task-threads 4

  • ํŽธ๋ฆฌํ•œ ์˜คํผ๋ ˆ์ดํ„ฐ

    • ํŒŒ์ผ์ด ๋‚˜ํƒ€๋‚  ๋•Œ๊นŒ์ง€ ๊ณ„์†ํ•˜๋Š” s3_wait>:์™€ gcs_wait>:๋ผ๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ

    • AWS S3์— ํŒŒ์ผ์ด ์ƒ์„ฑ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ธ๋‹ค๊ฐ€ ์ƒ๊ธฐ๋ฉด ๋‹ค์Œ ํƒœ์Šคํฌ๋กœ ๊ฐ€์„œ Redshift์—์„œ ๊ฐ€์ ธ์˜ค๋Š” Digdag์˜ ์„ค์ •. BigQuery์—์„œ ๊ฐ€์ ธ์˜ค๋Š” ๊ฒƒ๋„ bq_load>: ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • rb>: ์˜คํผ๋ ˆ์ดํ„ฐ์™€ py>: ์˜คํผ๋ ˆ์ดํ„ธ๋ฅด ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๋Š” ๊ฒƒ ์™ธ์—๋„ ๋ฐ์ดํ„ฐ์˜ ๊ฐ€๊ณต๋„ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • $ cat tasks/__init__.py
      # coding:utf-8
      import digdag
      import pandas as pd
      
      class Convert(object):
        def __init(self):
          pass
      
        def transform(self, session_time = None, query_result=' 0'):
          input = pd.read_csv(digdag.env.params['input_csv']
          user_info = pd.read_csv(digdag.env.params['user_info']))
          combined = pd.merge(input, user_info, how='left', on=['id'])
          combined.to_csv(digdag.env.params['output_csv'], index=False, encoding='utf-8')
      
      # Python์œผ๋กœ ๋ฐ์ดํ„ฐ ๊ฐ€๊ณต ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๋Š” ์˜ˆ์ œ
      $ cat conver_csv.dig
      _export : 
        input_csv: path/to/input.csv
        output_csv: path/to/output.csv
        user_info: path/to/users.csv
        td:
          database: www_access
      
      +extract:
        td>: queries/sample_query.sql
        download_file: ${input_csv}
      
      +transform:
        py>: Convert.transform
      
      +load:
        # ๋‚จ์€ csv ํŒŒ์ผ์„ ์ „์†กํ•จ(Embulk ์„ค์ •์€ ์ƒ๋žต)
        sh>: embulk run embulk_load_csv_bigquery.yml.liquid

Reference

  • http://www.yes24.com/Product/Goods/64464997

  • https://github.com/treasure-data/embulk

  • https://github.com/treasure-data/digdag

Last updated