embulk

Embulk & Digdag

Embulk

  • 일반적인 데이터 처리 μ›Œν¬ν”Œλ‘œμš°

    • μˆ˜μ§‘(Ingest/Collect) - μ• ν”Œλ¦¬μΌ€μ΄μ…˜ 둜그, μœ μ € 속성 정보, κ΄‘κ³ μ˜ 인상, μ„œλ“œνŒŒμΉ˜μΏ ν‚€

    • μ „μ²˜λ¦¬(Enrich) - λ΄‡μ˜ μ•‘μ„ΈμŠ€ 둜그 μ œμ™Έ. IP μ£Όμ†Œλ‘œ μœ„μΉ˜ 정보 μΆ”κ°€, user-agent의 ꡬ쑰화, λ§ˆμŠ€ν„° 데이터λ₯Ό μ‚¬μš©ν•΄μ„œ λ‘œκ·Έμ— μœ μ € 속성 μΆ”κ°€

    • λΆ„λ₯˜, 집계, 뢄석(Model) - λ°μ΄ν„°λ² μ΄μŠ€μ— μΆ”κ°€, 뢄석 처리 μ‹œμŠ€ν…œμœΌλ‘œ 전솑, μ••μΆ•ν•΄μ„œ μŠ€ν† λ¦¬μ§€μ— μ €μž₯(μ•„μΉ΄μ΄λΈŒ), 톡계 λ°μ΄ν„°λ‘œ 기둝

    • ν™œμš©(Utilize) - μΆ”μ²œ μ—”μ§„ API의 μ°Έμ‘° 데이터, μ‹€μ‹œκ°„ 거래, BI μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μ‚¬μš©ν•œ μ‹œκ°ν™”

  • FluentdλŠ” μ€€μ‹€μ‹œκ°„μ˜ 둜그 ν™œμš©μ„ ν•  λ•Œ 첫 μž₯애물인 둜그 μˆ˜μ§‘μ˜ 과제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄ κ°œλ°œλ˜μ—ˆμŠ΅λ‹ˆλ‹€. 각쒅 λ°μ΄ν„°μ˜ μž…λ ₯κ³Ό μˆ˜μ§‘μ„ Input ν”ŒλŸ¬κ·ΈμΈμœΌλ‘œ μ§€μ›ν•˜κ³ , 데이터 가곡을 Filter ν”ŒλŸ¬κ·ΈμΈμœΌλ‘œ μ²˜λ¦¬ν•˜λ©°, Output ν”ŒλŸ¬κ·ΈμΈμœΌλ‘œ μ—¬λŸ¬ κ°€μ§€ λ―Έλ“€μ›¨μ–΄λ‚˜ μŠ€ν† λ¦¬μ§€λ‘œ μ €μž₯ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

    • 2κ°€μ§€ 과제

      • Fluentd의 μ„€μ • 파일의 κ±°λŒ€ν™”μ™€ 가끔 μΌμ–΄λ‚˜λŠ” 사업적인 사양 변경에 따라 μ„€μ • 파일의 λΌμΈμˆ˜κ°€ μ¦κ°€ν•˜μ—¬ μœ μ§€λ³΄μˆ˜κ°€ νž˜λ“€κ²Œ λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

      • 슀트리밍 μ²˜λ¦¬μ— νŠΉν™”λœ FluentdλŠ” μ •κΈ°μ μœΌλ‘œ λ²Œν¬λ‘œλ“œλ₯Ό ν•˜μ—¬ 데이터 처리 μ›Œν¬ν”Œλ‘œμš°λ₯Ό λ§Œλ“€κΈ°μ—λŠ” μ ν•©ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

  • 데이터 μ›Œν¬ν”Œλ‘œμš°λ₯Ό μ§€μ›ν•˜λŠ” 도ꡬ

    • 슀트리밍 데이터 컬렉터 - fluentd - μ•‘μ„ΈμŠ€ 둜그 / μ•±λ‘œκ·Έ / μ„œλ²„λ‘œκ·Έ

    • 벌크 데이터 λ‘œλ” - embulk - csv파일/ S3 / MySQL / PostgreWQL λ“±

    • μ›Œν¬ν”Œλ‘œμš° 관리 - digdag - ETL 처리의 μžλ™ν™”

  • Embulk

    • image

    • EmbulkλŠ” Fluentd와 같이 Input/Filter/Output ν”ŒλŸ¬κ·ΈμΈμ„ μ‘°ν•©ν•΄μ„œ μ„€μ • νŒŒμΌμ„ μ •μ˜ν•©λ‹ˆλ‹€. 병렬 λΆ„μ‚° μ²˜λ¦¬μ— λŒ€μ‘ν•œ μ„±λŠ₯κ³Ό μž¬μ‹œλ„ μ œμ–΄ 등에 μ•ˆμ •μ„±μ΄ μš°μˆ˜ν•œ 데이터 전솑 νŒŒμ΄ν”„λΌμΈμ„ λ§Œλ“€ 수 μžˆμŠ΅λ‹ˆλ‹€.

    • λ‹€λŸ‰μ˜ 데이터λ₯Ό 효율적으둜 μ½μ–΄μ„œ CPU μ½”μ–΄λ₯Ό μ΅œλŒ€ν•œ μ‚¬μš©ν•΄μ„œ 배치 μ²˜λ¦¬ν•˜λŠ” 데 νŠΉν™”λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€.

    • λ°μ΄ν„°λ² μ΄μŠ€μ™€ μŠ€ν† λ¦¬μ§€μ—μ„œ 데이터λ₯Ό μ½μ–΄μ„œ, μž„μ˜μ˜ 처리λ₯Ό ν•œ 뒀에 λ‹€λ₯Έ 보관 μž₯μ†Œλ‘œ λ³΄λ‚΄λŠ” λ°μ΄ν„°μ˜ λŒ€μš©λŸ‰μ²˜λ¦¬μ— νŠΉν™”λœ ETL 처리 도ꡬ

    • Fluentd와 λ‹€λ₯Έ νŠΉμ§•μœΌλ‘œ 고속성과 νŠΈλžœμž­μ…˜ μ œμ–΄, μŠ€ν‚€λ§ˆλ₯Ό μ‚¬μš©ν•œ λ°μ΄ν„°μ˜ 검사 κΈ°λŠ₯이 μžˆμŠ΅λ‹ˆλ‹€.

    • λŒ€μš©λŸ‰μ˜ 데이터λ₯Ό 마이크둜 배치처리둜 Redshift, BigQuery, Elasticsearch둜 μ €μž₯ν•˜λŠ” 경우라면 Fluentd λ³΄λ‹€λŠ” λ³‘λ ¬μ²˜λ¦¬, μ²˜λ¦¬λŸ‰, μ €μž₯타이밍을 자유둭게 μ»¨νŠΈλ‘€ν•  수 μžˆλŠ” Embulk둜 μ €μž₯ν•˜λŠ” 편이 ν™•μ‹€νžˆ μ•ˆμ •μ μž…λ‹ˆλ‹€.

Digdag

  • image

  • DigdagλŠ” μ›Œν¬ν”Œλ‘œμš°μ˜ μ •μ˜λ₯Ό μ„€μ • 파일둜 ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€. Embulk와 μž„μ˜μ˜ μ…ΈμŠ€ν¬λ¦½νŠΈμ— μž„μ˜μ˜ λ³€μˆ˜λ₯Ό λ„£μ–΄κ°€λ©°, 의쑴 κ΄€κ³„μˆœμœΌλ‘œ 직렬 및 병렬 처리둜 Job을 μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • μ›Œν¬ν”Œλ‘œμš°κ΄€λ¦¬ λ„κ΅¬λ‘œμ„œ ETL 처리의 μžλ™ν™”μ— 도움이 λ©λ‹ˆλ‹€.

  • μ—¬λŸ¬ λ‹¨κ³„μ—μ„œμ˜ 처리의 μ˜μ‘΄κ΄€κ³„μ™€ μˆœμ„œ, 병렬싀행 등을 ν”„λ‘œκ·Έλž¨ κ°€λŠ₯ν•œ YAML μ„€μ • νŒŒμΌμ„ 톡해 μ œμ–΄ν•  수 μžˆλŠ” μ•„ν‚€ν…μ²˜

  • μ—¬λŸ¬ 개의 λ°μ΄ν„°μ†ŒμŠ€λ‘œλΆ€ν„° 병렬 λ˜λŠ” 직렬둜 데이터λ₯Ό 읽고, λ‚ μ§œλ³„λ‘œ ν…Œμ΄λΈ”μ„ λ§Œλ“€κ³  μ €μž₯ν•˜λ©° 지속적인 1μ°¨ 집계λ₯Ό ν•œ 뒀에 κ·Έ κ²°κ³Όλ₯Ό μ €μž₯ν•˜λŠ” 처리λ₯Ό μ§κ΄€μ μœΌλ‘œ μ„€μ • νŒŒμΌμ— μ„€μ •ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • κΈ°λ³ΈκΈ°λŠ₯

    • μž‘μ—…μ„ μ˜μ‘΄κ΄€κ³„μˆœμœΌλ‘œ μ‹€ν–‰

    • κ³Όκ±°λΆ„μ˜ 일괄싀행(backfill)

    • μ •κΈ° μ‹€ν–‰

    • μ‹œκ°„ λ“±μ˜ λ³€μˆ˜λ₯Ό ν¬ν•¨ν•΄μ„œ μ‹€ν–‰

    • 파일이 μƒμ„±λ˜λ©΄ μ‹€ν–‰

  • μ—λŸ¬μ²˜λ¦¬

    • μ‹€νŒ¨ν•˜λ©΄ 톡지

    • μ‹€νŒ¨ν•œ μœ„μΉ˜μ—μ„œ μž¬μ‹œμž‘

  • μƒνƒœ κ°μ‹œ

    • μ‹€ν–‰ μ‹œκ°„μ΄ 일정 이상이면 톡지

    • μž‘μ—…μ˜ μ‹€ν–‰μ‹œκ°„μ˜ μ‹œκ°ν™”

    • μ‹€ν–‰ 둜그의 μˆ˜μ§‘κ³Ό μ €μž₯

  • 고속화

    • μž‘μ—…μ„ λ³‘λ ¬λ‘œ μ‹€ν–‰

    • λ™μ‹œ μ‹€ν–‰ μž‘μ—… 개수의 μ œμ–΄

  • κ°œλ°œμ§€μ›

  • μ›Œν¬ν”Œλ‘œμš°μ˜ 버전 관리

    • GUI둜 μ›Œν¬ν”Œλ‘œμš° 개발

    • μ •κΈ°μ²˜λ¦¬λ₯Ό κ°„λ‹¨ν•˜κ²Œ μ‹€ν–‰ν•  수 μžˆλŠ” 라이브러리

    • Docker 이미지λ₯Ό μ‚¬μš©ν•΄μ„œ μž‘μ—… μ‹€ν–‰

  • Digdagμ—λŠ” μŠ€μΌ€μ€„λŸ¬λ₯Ό λ‚΄μž₯ν•˜κ³  μžˆμ–΄μ„œ 데λͺ¬μœΌλ‘œ λ™μž‘ν•˜λŠ” μ„œλ²„ λͺ¨λ“œμ™€ μ»€λ§¨λ“œλΌμΈμ—μ„œ μž„μ˜λ‘œ μ‹€ν–‰ν•˜λŠ” 둜컬 λͺ¨λ“œμ˜ 2κ°€μ§€κ°€ μžˆμŠ΅λ‹ˆλ‹€. μ„œλ²„ λͺ¨λ“œμ—μ„œλŠ” κΈ°λ°€ 정보와 ν΄λ¦¬μ‹œ νŒŒμΌμ„ μ»€λ§¨λ“œλΌμΈμœΌλ‘œ λ“±λ‘ν•©λ‹ˆλ‹€.

  • Digdag을 μ‹€ν–‰ν•˜λŠ” μ»€λ§¨λ“œ

    • digdag run stage1_load_assets.dig

  • 병렬 μ‹€ν–‰ 수의 μƒν•œμ„ μ„€μ •ν•˜μ—¬ Digdag의 μ›Œν¬ν”Œλ‘œμš°λ₯Ό μ‹€ν–‰ν•©λ‹ˆλ‹€.

    • digdag run example.dig --max-task-threads 4

  • νŽΈλ¦¬ν•œ μ˜€νΌλ ˆμ΄ν„°

    • 파일이 λ‚˜νƒ€λ‚  λ•ŒκΉŒμ§€ κ³„μ†ν•˜λŠ” s3_wait>:와 gcs_wait>:λΌλŠ” μ˜€νΌλ ˆμ΄ν„°

    • AWS S3에 파일이 생성될 λ•ŒκΉŒμ§€ κΈ°λ‹€λ Έλ‹€κ°€ 생기면 λ‹€μŒ νƒœμŠ€ν¬λ‘œ κ°€μ„œ Redshiftμ—μ„œ κ°€μ Έμ˜€λŠ” Digdag의 μ„€μ •. BigQueryμ—μ„œ κ°€μ Έμ˜€λŠ” 것도 bq_load>: μ˜€νΌλ ˆμ΄ν„°λ₯Ό μ‚¬μš©ν•˜μ—¬ λ§Œλ“€ 수 μžˆμŠ΅λ‹ˆλ‹€.

    • rb>: μ˜€νΌλ ˆμ΄ν„°μ™€ py>: μ˜€νΌλ ˆμ΄ν„Έλ₯΄ μ‚¬μš©ν•˜λ©΄ 데이터λ₯Ό μ½λŠ” 것 외에도 λ°μ΄ν„°μ˜ 가곡도 ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

    • $ cat tasks/__init__.py
      # coding:utf-8
      import digdag
      import pandas as pd
      
      class Convert(object):
        def __init(self):
          pass
      
        def transform(self, session_time = None, query_result=' 0'):
          input = pd.read_csv(digdag.env.params['input_csv']
          user_info = pd.read_csv(digdag.env.params['user_info']))
          combined = pd.merge(input, user_info, how='left', on=['id'])
          combined.to_csv(digdag.env.params['output_csv'], index=False, encoding='utf-8')
      
      # Python으둜 데이터 가곡 처리λ₯Ό ν•˜λŠ” 예제
      $ cat conver_csv.dig
      _export : 
        input_csv: path/to/input.csv
        output_csv: path/to/output.csv
        user_info: path/to/users.csv
        td:
          database: www_access
      
      +extract:
        td>: queries/sample_query.sql
        download_file: ${input_csv}
      
      +transform:
        py>: Convert.transform
      
      +load:
        # 남은 csv νŒŒμΌμ„ 전솑함(Embulk 섀정은 μƒλž΅)
        sh>: embulk run embulk_load_csv_bigquery.yml.liquid

Reference

  • http://www.yes24.com/Product/Goods/64464997

  • https://github.com/treasure-data/embulk

  • https://github.com/treasure-data/digdag

Last updated