spark_optimization
์ต์ ํ ํฌ์ธํธ
Spark ์๋ ์ต์ ํ ๊ธฐ๋ฅ๋ค(optimizer) ์ ๊ฐ์ถ๊ณ ์์ต๋๋ค.
1.x ๋ฒ์ ์์๋ Rule-Based Optimizer๋ง ๊ฐ๊ณ ์์์ต๋๋ค.
2.x ๋ฒ์ ์์ Cost-Based Optimizer๊ฐ ์ถ๊ฐ๋์์ต๋๋ค.
3.x ๋ฒ์ ์๋ Adaptive Query Execution(AQE) ์ถ๊ฐ๋์์ต๋๋ค.
AQE(Adaptive Query Execution)
- https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
์คํ ์๊ฐ ๋์ ์ฟผ๋ฆฌ์ ์คํ ๊ณํ์ ๋์ ์ผ๋ก ์ฌ์กฐ์ ํ๊ณ ์ต์ ํํฉ๋๋ค. AQE๋ ์คํ ๊ณํ์ ์ต์ ํํ๊ธฐ ์ํด ๋ฐํ์ ํต๊ณ๋ฅผ ํ์ฉํฉ๋๋ค.
๋์ ์ ํ ํํฐ์ ํตํฉ(Dynamically coalescing shuffle partitions)
์ ํ ์ดํ์ ์์ฑ๋ ํํฐ์ ์ ํฌ๊ธฐ๊ฐ ๋๋ฌด ์๊ฑฐ๋ ๋ถ๊ท ํ์ ์ธ ๊ฒฝ์ฐ, AQE๋ ์ด๋ฌํ ํํฐ์ ๋ค์ ๋์ ์ผ๋ก ํตํฉํ์ฌ ํํฐ์ ์๋ฅผ ์ค์ ๋๋ค. ์ด๋ฅผ ํตํด ์๋์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ง ํํฐ์ ์ ๋ํ ์ค๋ฒํค๋๋ฅผ ์ค์ด๊ณ ์ ์ฒด ์ฟผ๋ฆฌ์ ์คํ ์๋๋ฅผ ํฅ์์ํต๋๋ค.
๋์ ์ ํ ์กฐ์ธ ์ ๋ต(Dynamically switching join strategies)
์กฐ์ธ ์ฐ์ฐ์ ์คํํ๊ธฐ ์ ์ ์ค์ ๋ฐ์ดํฐ ํฌ๊ธฐ๋ฅผ ํ์ธํ๊ณ , ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ฐ์ฅ ์ ํฉํ ์กฐ์ธ ์ ๋ต (์: broadcast join, sort-merge join)์ผ๋ก ๋์ ์ผ๋ก ์ ํํฉ๋๋ค.
์คํ ์กฐ์ธ์ ๋์ ์ผ๋ก ์ต์ ํ(Dynamically optimizing skew joins)
์กฐ์ธ ์ฐ์ฐ ์ ํ์ชฝ์ ๋ฐ์ดํฐ ๋ถํฌ๊ฐ ๋งค์ฐ ๋ถ๊ท ํ์ ์ธ ๊ฒฝ์ฐ (์ฆ, ์คํ๊ฐ ์๋ ๊ฒฝ์ฐ), AQE๋ ์ด ์คํ๋ฅผ ์ธ์ํ๊ณ ํด๋น ํํฐ์ ์ ์ธ๋ถํํ์ฌ ์กฐ์ธ ์ฑ๋ฅ์ ํฅ์์ํต๋๋ค.
์ต์ ํ ํฌ์ธํธ
์ฝ๋ ์์ค์ ์ค๊ณ(์: RDD์ DataFrame ์ค ํ๋๋ฅผ ์ ํํจ)
๋ณด๊ด์ฉ ๋ฐ์ดํฐ
์กฐ์ธ
์ง๊ณ
๋ฐ์ดํฐ ์ ์ก
์ ํ๋ฆฌ์ผ์ด์ ๋ณ ์์ฑ
์ต์คํํฐ ํ๋ก์ธ์ค์ JVM
์์ปค ๋ ธ๋
ํด๋ฌ์คํฐ์ ๋ฐฐํฌ ํ๊ฒฝ ์์ฑ
DataFrame vs SQL vs Dataset vs RDD
๋ชจ๋ ์ธ์ด์์ DataFrame, Dataset ๊ทธ๋ฆฌ๊ณ SQL์ ์๋๋ ๋์ผํฉ๋๋ค. DataFrame์ ์ด๋ค ์ธ์ด์์ ์ฌ์ฉํ๋๋ผ๋ ์ฑ๋ฅ์ ๋์ผํฉ๋๋ค.
ํ์ด์ฌ์ด๋ R์ ์ฌ์ฉํด UDF๋ฅผ ์ ์ํ๋ฉด ์ฑ๋ฅ ์ ํ๊ฐ ๋ฐ์ํ ์ ์์ผ๋ฏ๋ก ์๋ฐ์ ์ค์นผ๋ผ๋ฅผ ์ฌ์ฉํด UDF๋ฅผ ์ ์ํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
ํ์ด์ฌ์์ RDD ์ฝ๋๋ฅผ ์คํํ๋ค๋ฉด ํ์ด์ฌ ํ๋ก์ธ์ค๋ฅผ ์ค๊ฐ๋ ๋ง์ ๋ฐ์ดํฐ๋ฅผ ์ง๋ ฌํํด์ผํฉ๋๋ค., ๋งค์ฐ ํฐ ๋ฐ์ดํฐ๋ฅผ ์ง๋ ฌํํ๋ฉด ์์ฒญ๋ ๋น์ฉ์ด ๋ฐ์ํ๊ณ ์์ ์ฑ๊น์ง ๋จ์ด์ง ์ ์์ต๋๋ค.
RDD
https://phoenixnap.com/kb/resilient-distributed-datasets
DataFrame์ ๊ฐ ๋ ์ฝ๋๋ ์คํค๋ง๋ฅผ ์๊ณ ์๋ ํ๋๋ก ๊ตฌ์ฑ๋ ๊ตฌ์กฐํ๋ ๋ก์ฐ์ธ ๋ฐ๋ฉด, RDD์ ๋ ์ฝ๋๋ ๊ทธ์ ํ๋ก๊ทธ๋๋จธ๊ฐ ์ ํํ ์๋ฐ, ์ค์นผ๋ผ, ํ์ด์ฌ์ ๊ฐ์ฒด์ผ๋ฟ RDD API๋ Dataset๊ณผ ์ ์ฌํ์ง๋ง RDD๋ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ ์์ง์ ์ฌ์ฉํด ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ฑฐ๋ ๋ค๋ฃจ์ง ์์ต๋๋ค. RDD์ Dataset ์ฌ์ด์ ์ ํ์ ๋งค์ฐ ์ฌ์ฐ๋ฏ๋ก ๋ API๋ฅผ ๋ชจ๋ ์ฌ์ฉํด ๊ฐ API์ ์ฅ์ ์ ๋์์ ํ์ฉํฉ๋๋ค.
Low Level API - Resilient Distributed Dataset (RDD)์ ์คํํฌ ํ๋ก๊ทธ๋๋ฐ์ ๊ธฐ๋ณธ์ ๋๋ค. ์ฃผ๋ก RDD๋ ํด๋ฌ์คํฐ์ ๋ ธ๋(์์ ์) ๊ฐ์ ๋ถํ ๋ ์์ ๋ชจ์์ผ๋ก ๋ ธ๋์์ ๋ณ๋ ฌ ์์ ์ ์ฝ๊ฒ ์ ๊ณตํฉ๋๋ค.
https://www.databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
https://phoenixnap.com/kb/rdd-vs-dataframe-vs-dataset
DataFrame
https://www.nvidia.com/ko-kr/ai-data-science/spark-ebook/introduction-spark-processing/#p3-s1
Spark SQL ๋ฐ DataFrame API๋ Spark SQL์ ์ต์ ํ๋ ์คํ ์์ง์ ํตํด ์ฌ์ฉ ํธ์์ฑ, ๊ณต๊ฐ ํจ์จ์ฑ ๋ฐ ์ฑ๋ฅ ํฅ์์ ์ ๊ณตํฉ๋๋ค.
๋ชจ๋์(์ผ๋ช ์ฌ์ฌ์ฉ ๊ฐ๋ฅ)์ผ๋ก ์ฝ๊ฒ ๋จ์ ํ ์คํธ ๊ฐ๋ฅ.์ฝ๊ธฐ๊ฐ ์ฌ์ฐ๋ฉฐ ๋ ผ๋ฆฌ๊ฐ ๋ถํด๋๊ณ ์บก์ํ๋ฉ๋๋ค.
DataFrame์ด Catalyst ์ตํฐ๋ง์ด์ ๋ฅผ ํต๊ณผํ์ฌ Spark SQL ์ฟผ๋ฆฌ์ ์ ์ฌํ ์ต์ ํ๋ ์คํ์ ๊ฐ๋ฅํ๊ฒ ํ๋ค๋ ๊ฒ์ ๋๋ค.
Spark sql
https://www.nvidia.com/ko-kr/ai-data-science/spark-ebook/spark-sql-dataframes/
Spark SQL์ ํ์ํ ์ด๋ง ์ค์บํ๊ณ , ์์ถ์ ์๋์ผ๋ก ์กฐ์ ํ๋ฉฐ, ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ ์ต์ํํ๋ฉฐ, JVM ๊ฐ๋น์ง ์์ง์ ์ต์ํํ๋ ๋ฐ ์ต์ ํ๋ ๋ฉ๋ชจ๋ฆฌ ๋ด ์ด ๊ธฐ๋ฐ ํ์์ ์ฌ์ฉํ์ฌ DataFrame์ ์บ์(dataFrame.cache ํธ์ถ ์)ํฉ๋๋ค.
์ปดํฉํธ, ๋ชจ๋ ๋ ผ๋ฆฌ๊ฐ ํจ๊ป, ๋จ์ ํ ์คํธ๋ ๋ ์ฌ์ํ์ง๋ง ๋ถ๊ฐ๋ฅํ์ง๋ ์์ต๋๋ค.์ฝ๊ธฐ ์ฌ์ฐ๋ฉฐ ์๋ง๋ ์ฝ๊ฒ ์ฌ์ฌ์ฉํ์ง ๋ชปํ ๊ฒ์ ๋๋ค
DataFrame ์ฌ์ฉ์๋ค์ SparkSQL ์ฌ์ฉ์ ๋ํด ๋ฌด์์ ๋ฐฐ์ธ ์ ์๋์
๋๋ก๋ ๋ณต์กํ ๋ณํ์ ๊ฐ๋ ์ฑ์ด ๋ ์ฝ๊ฒ ๋ฐ๋ฅด๊ณ SQL๋ก ๋ง์์ ๊ฐ์ ์ ์์ต๋๋ค.
SparkSQL์์ ์ฌ์ฉํ ์ ์๋ CTE์ ๊ฐ์ ๋ฉ์ง ๊ธฐ๋ฅ ์ฌ์ฉ ๊ฐ๋ฅํฉ๋๋ค.
์ผ๋ถ ๋ณํ์ SparkSQL์์ ๋ ์ฝ๊ณ ๋ ์งง์ ์์ ์ฝ๋์ผ ์ ์์ต๋๋ค
SparkSQL ์ฌ์ฉ์๋ DataFrame์ ์ฌ์ฉํ์ฌ ๋ฐฐ์ธ ์ ์๋์
๋จ์ ํ ์คํธ์ ์ฉ์ดํจ๊ณผ ์ค์์ฑ
์ฝ๋๋ฅผ ๋ ๋ชจ๋ํํ๊ณ ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ๊ฒ ๋ง๋ค๊ธฐ
SQL์ ์ฌ์ฉํ๋ ๋์ ๋ ๋์ ์ฝ๋๋ฅผ ์์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ฐฐ์ฐ๊ณ ํ๋ก๊ทธ๋๋ฐ์ ์ต์ํด์ง๋ ๊ฒ์ ๋๋ค.
from pyspark.sql import DataFrame, SparkSession import spark.sql.functions as F def read_sales_data(uri: str = 's3a://sales-data/customer-orders/2021/*') -> DataFrame: df = spark.read.parquet(uri) return df def define_product(input_df: DataFrame) -> DataFrame: output_df = input_df.withColumn('product', F.when( F.col('product_id').isin([1,2,3,4]), F.lit('product_one')).otherwise(F.lit('product_two') ) ) return output_df def agg_sales_by_product(input_df: DataFrame, gb: str = 'product', ag: str = 'order_amount') -> DataFrame: output_df = input_df.groupBy(gb).agg(F.sum(F.col(ag).alias('sales')) return output_df df = read_sales_data() products = define_product(df) metrics = agg_sales_by_product(products)
from pyspark.sql import DataFrame, SparkSession import spark.sql.functions as F def read_sales_data(uri: str = 's3a://sales-data/customer-orders/2021/*') -> DataFrame: df = spark.read.parquet(uri) return df def run_sql(input_df: DataFrame) -> DataFrame: input_df.createOrReplaceTempView('tmp_sales') df = spark.sql(""" SELECT CASE WHEN product_id IS IN (1,2,3,4) THEN 'product_one' ELSE 'product_two' END as product, SUM(order_amount) as sales FROM tmp_sales GROUP BY CASE WHEN product_id IS IN (1,2,3,4) THEN 'product_one' ELSE 'product_two' END """) return df df = read_sales_data() metrics = run_sql(products)
ํ
์ด๋ธ ํํฐ์
๋
https://selectfrom.dev/apache-spark-partitioning-bucketing-3fd350816911
ํํฐ์ ์ ๋ฐ์ดํฐ๋ฅผ ์ง์ญํํ๊ณ ๋คํธ์ํฌ ๋ ธ๋์์ ๋ฐ์ดํฐ ์ ํ๋ง์ ์ค์ฌ ๋ณํ ์์ ์ ์ฃผ์ ๊ตฌ์ฑ ์์์ธ ๋คํธ์ํฌ ๋๊ธฐ ์๊ฐ์ ์ค์ฌ ์๋ฃ ์๊ฐ์ ์ค์ด๋ ๋ฐ ๋์์ด ๋ฉ๋๋ค.
https://selectfrom.dev/apache-spark-partitioning-bucketing-3fd350816911
ํ ์ด๋ธ ํํฐ์ ๋์ ๋ฐ์ดํฐ์ ๋ ์ง ํ๋๋ฅผ ๊ฐ์ ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๊ฐ๋ณ ๋๋ ํฐ๋ฆฌ์ ํ์ผ์ ์ ์ฅํ๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค.
์ฟผ๋ฆฌ์์ ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ์์ฃผ ํํฐ๋งํ๋ค๋ฉด ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ํํฐ์ ์ ์์ฑํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
ํํฐ์ ๋์ ์ฌ์ฉํ๋ฉด ์ฟผ๋ฆฌ์์ ์ฝ์ด์ผ ํ๋ ๋ฐ์ดํฐ์์ ํฌ๊ฒ ์ค์ผ ์ ์์ด ์ฟผ๋ฆฌ๋ฅผ ํจ์ฌ ๋น ๋ฅด๊ฒ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
ํํฐ์ ๋์ ํ ๋ ๋๋ฌด ์์ ๋จ์๋ก ๋ถํ ํ๋ฉด ์์ ํฌ๊ธฐ์ ํ์ผ์ด ๋๋์ผ๋ก ์์ฑ๋ ์ ์๊ณ ์ ์ฅ์ ์์คํ ์์ ์ ์ฒด ํ์ผ์ ๋ชฉ๋ก์ ์ฝ์ ๋ ์ค๋ฒํค๋๊ฐ ๋ฐ์ํฉ๋๋ค.
๋ฐ์ดํฐ๋ฅผ ํํฐ์ ์ด๋ ๋ฒ์ผ์ผ๋ก ๊ตฌ์ฑํ๋ ค๋ฉด ํ์ผ ์์ ์ ์ฅํ๋ ค๋ ํ์ผ ํฌ๊ธฐ๋ ๊ณ ๋ คํด์ผํฉ๋๋ค.
์์ ํ์ผ์ด ๋ง์ ๊ฒฝ์ฐ ํ์ผ ๋ชฉ๋ก ์กฐํ์ ํ์ผ ์ฝ๊ธฐ ๊ณผ์ ์์ ๋ถํ๊ฐ ๋ฐ์ํฉ๋๋ค.
ํธ๋ ์ด๋ ์คํ๋ฅผ ๊ฐ์ํด์ผํฉ๋๋ค.
์ ๋ ฅ ๋ฐ์ดํฐ ํ์ผ์ด ์ต์ ์์ญ ๋ฉ๊ฐ๋ฐ์ดํธ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๋๋ก ํ์ผ์ ํฌ๊ธฐ๋ฅผ ์กฐ์ ํ๋๊ฒ์ด ์ข์ต๋๋ค.
๋ฒ์ผํ
https://selectfrom.dev/apache-spark-partitioning-bucketing-3fd350816911
Bucketing ์ ์์ ์ฑ๋ฅ์ ์ต์ ํํ๋ ๋ฐ ์ฌ์ฉ๋๋ Spark ๋ฐ Hive์ ๊ธฐ์ ์ ๋๋ค. ๋ฒํทํ ๋ฒํท( ํด๋ฌ์คํฐ๋ง ์ด )์์ ๋ฐ์ดํฐ ํํฐ์ ๋์ ๊ฒฐ์ ํ๊ณ ๋ฐ์ดํฐ ์ ํ์ ๋ฐฉ์งํฉ๋๋ค. ํ๋ ์ด์์ ๋ฒํท ์ด ๊ฐ์ ๋ฐ๋ผ ๋ฐ์ดํฐ๊ฐ ๋ฏธ๋ฆฌ ์ ์๋ ๋ฒํท ์์ ํ ๋น๋ฉ๋๋ค.
https://www.nvidia.com/ko-kr/ai-data-science/spark-ebook/spark-sql-dataframes/
ํ์ผ ๋ถํ ๋ฐ ๋ฒํท์ Spark SQL์์ ์ผ๋ฐ์ ์ธ ์ต์ ํ ๊ธฐ์ ์ ๋๋ค. ํ์ผ์ด๋ ๋๋ ํ ๋ฆฌ์์ ๋ฐ์ดํฐ๋ฅผ ๋ฏธ๋ฆฌ ์ง๊ณํ์ฌ ๋ฐ์ดํฐ ์๊ณก ๋ฐ ๋ฐ์ดํฐ ์ถ์๋ฅผ ์ค์ด๋ ๋ฐ ๋์์ด ๋ ์ ์์ต๋๋ค
๋ถํ ๊ฐ๋ฅํ ํฌ๋งท์ ์ฌ์ฉํ๋ฉด ์ฌ๋ฌ ํ์คํฌ๊ฐ ํ์ผ์ ์๋ก ๋ค๋ฅธ ๋ถ๋ถ์ ๋์์ ์ฝ์ ์ ์์
๋ฐ์ดํฐ๋ฅผ ๋ฒ์ผํ ํ๋ฉด ์คํํฌ๋ ์ฌ์ฉ์๊ฐ ์กฐ์ธ์ด๋ ์ง๊ณ๋ฅผ ์ํํ๋ ๋ฐฉ์์ ๋ฐ๋ผ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ ๋ถํ (pre-partition)ํ ์ ์์ต๋๋ค.
๋ฐ์ดํฐ๋ฅผ ํ๋ ๊ฐ ํํฐ์ ์ ์น์ฐ์น์ง ์๊ณ ์ ์ฒด ํํฐ์ ์ ๊ท ๋ฑํ๊ฒ ๋ถ์ฐ์ํฌ ์ ์์ต๋๋ค.
Hive vs Spark
https://selectfrom.dev/apache-spark-partitioning-bucketing-3fd350816911
Hive์์๋ ์์ฑํด์ผ ํ๋ ํ์ผ ์๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ ๋ฆฌ๋์๊ฐ ํ์ํฉ๋๋ค. Spark ๋ฒํทํ ์๋ ๋ฆฌ๋์๊ฐ ์์ต๋๋ค. ๋ฐ๋ผ์ ์์ ์์ ๋ฐ๋ผ n๊ฐ์ ํ์ผ์ ์์ฑํ๊ฒ ๋ฉ๋๋ค.
ํ์ผ ๊ธฐ๋ฐ ์ฅ๊ธฐ ๋ฐ์ดํฐ ์ ์ฅ์
๋ฐ์ดํฐ๋ฅผ ๋ฐ์ด๋๋ฆฌ ํํ๋ก ์ ์ฅํ๋ ค๋ฉด ๊ตฌ์กฐ์ API๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
CSV ๊ฐ์ ํ์ผ์ ๊ตฌ์กฐํ๋์ด ์๋ ๊ฒ์ฒ๋ผ ๋ณด์ด์ง๋ง ํ์ฑ ์๋๊ฐ ์์ฃผ ๋๋ฆฌ๊ณ ์์ธ ์ํฉ์ด ์์ฃผ ๋ฐ์ํฉ๋๋ค.
ํ์ผ์ด๋ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ด๋๋ฆฌ ํ์ผ์ ์ปฌ๋ผ ์งํฅ ๋ฐฉ์์ผ๋ก ์ ์ฅํฉ๋๋ค.
์ฟผ๋ฆฌ์์ ์ฌ์ฉํ์ง ์๋ ๋ฐ์ดํฐ๋ฅผ ๋น ๋ฅด๊ฒ ๊ฑด๋๋ธ ์ ์๋๋ก ๋ช ๊ฐ์ง ํต๊ณ๋ฅผ ํจ๊ป ์ ์ฅํฉ๋๋ค.
์
ํ ์ค์
https://engineering.linkedin.com/blog/2020/introducing-magnet
์คํํฌ์ ์ธ๋ถ ์ ํ ์๋น์ค๋ฅผ ์ค์ ํ๋ฉด ๋จธ์ ์์ ์คํ๋๋ ์ต์คํํฐ๊ฐ ๋ฐ์ ์ํฉ์์๋(์:๊ฐ๋น์ง ์ปฌ๋ ์ ์ํ) ์๊ฒฉ ๋จธ์ ์์ ์ ํ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ์ ์์ผ๋ฏ๋ก ์ฑ๋ฅ์ ๋์ผ ์ ์์ต๋๋ค.
ํํฐ์ ์๊ฐ ๋๋ฌด ์ ์ผ๋ฉด ์์์ ๋ ธ๋๋ง ์์ ์ ์ํํ๊ธฐ ๋๋ฌธ์ ๋ฐ์ดํฐ ์น์ฐ์นจ ํ์์ด ๋ฐ์ํฉ๋๋ค.
ํํฐ์ ์๊ฐ ๋๋ฌด ๋ง์ผ๋ฉด ํํฐ์ ์ ์ฒ๋ฆฌํ๊ธฐ ์ํ ํ์คํฌ๋ฅผ ๋ง์ด ์คํํด์ผ ํ๋ฏ๋ก ๋ถํ๊ฐ ๋ฐ์ํฉ๋๋ค.
์ ํ์ ์ํํ ๋๋ ๊ฒฐ๊ณผ ํํฐ์ ๋น ์ต์ ์์ญ ๋ฉ๊ฐ๋ฐ์ดํธ์ ๋ฐ์ดํฐ๊ฐ ํฌํจ๋์ด์ผ ํฉ๋๋ค.
๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ๊ณผ ๊ฐ๋น์ง ์ปฌ๋ ์
์คํํฌ ์ก ์คํ ๊ณผ์ ์ค์ ์ต์คํํฐ๋ ๋๋ผ์ด๋ฒ ๋จธ์ ์ ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ถ์กฑํ๊ฑฐ๋ ๋ฉ๋ชจ๋ฆฌ ์๋ฐ(memory pressure)์ผ๋ก ์ธํด ํ์คํฌ๋ฅผ ์๋ฃํ์ง ๋ชปํ ์ ์์ต๋๋ค.
์ ํ๋ฆฌ์ผ์ด์ ์คํ ์ค์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋๋ฌด ๋ง์ด ์ฌ์ฉํ ๊ฒฝ์ฐ
๊ฐ๋น์ง ์ปฌ๋ ์ ์ด ์์ฃผ ์ํ๋๋ ๊ฒฝ์ฐ
JVM ๋ด์์ ๊ฐ์ฒด๊ฐ ๋๋ฌด ๋ง์ด ์์ฑ๋์ด ๋ ์ด์ ์ฌ์ฉํ์ง ์๋ ๊ฐ์ฒด๋ฅผ ๊ฐ๋น์ง ์ปฌ๋ ์ ์ด ์ ๋ฆฌํ๋ฉด์ ์คํ ์๋๊ฐ ๋๋ ค์ง๋ ๊ฒฝ์ฐ
๊ฐ๋น์ง ์ปฌ๋ ์ ์ ํ๋
https://www.databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
๊ฐ๋น์ง ์ปฌ๋ ์ ์ ๋ฐ์ ๋น๋์ ์์ ์๊ฐ์ ๋ํ ํต๊ณ๋ฅผ ๋ชจ์ผ๋ ๊ฒ, spark.executor.extraJavaOptions ์์ฑ์ ์คํํฌ JVM ์ต์ ์ผ๋ก -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ๊ฐ์ ์ถ๊ฐํด ํต๊ณ๋ฅผ ๋ชจ์ ์ ์์ต๋๋ค.
์์ฑ๊ฐ์ ์ค์ ํ ๋ค์ ์คํํฌ ์ก์ ์คํํ๋ฉด ๊ฐ๋น์ง ์ปฌ๋ ์ ์ด ๋ฐ์ํ ๋๋งค๋ค ์์ปค์ ๋ก๊ทธ์ ๋ฉ์์ง๊ฐ ์ถ๋ ฅ๋ฉ๋๋ค.
๋ก๊ทธ๋ ๋๋ผ์ด๋ฒ๊ฐ ์๋ ์์ปค ๋ ธ๋์ stdout ํ์ผ์ ์ ์ฅ๋ฉ๋๋ค.
์๋ฐ ํ ๊ณต๊ฐ์ Young ์์ญ๊ณผ Old ์์ญ์ผ๋ก ๋๋์ด์ง, Young ์์ญ์ ์๋ช ์ด ์งง์ ๊ฐ์ฒด๋ฅผ ์ ์งํ๊ณ ๋ฐ๋ฉด Old ์์ญ์ ์ค๋ ์ด์ ์๋ ๊ฐ์ฒด๋ฅผ ๋์์ผ๋ก ํฉ๋๋ค.
Young ์์ญ์ Eden, Survivor1, Survivor2 ์ธ ์์ญ์ผ๋ก ๋ค์ ๋๋ฉ๋๋ค.
๊ฐ๋น์ง ์ปฌ๋ ์ ์ํ ์ ์ฐจ
Eden ์์ญ์ด ๊ฐ๋ ์ฐจ๋ฉด Eden ์์ญ์ ๋ํ ๋ง์ด๋ ๊ฐ๋น์ง ์ปฌ๋ ์ (minor garbage collection)์ด ์คํ๋จ, Eden ์์ญ์์ ์ด์๋จ์ ๊ฐ์ฒด์ Survivor1 ์์ญ์ ๊ฐ์ฒด๋ Survivor2 ์์ญ์ผ๋ก ๋ณต์ ๋ฉ๋๋ค.
๋ Survivor ์์ญ์ ๊ต์ฒดํฉ๋๋ค.
๊ฐ์ฒด๊ฐ ์์ฃผ ์ค๋๋์๊ฑฐ๋ Survivor2 ์์ญ์ด ๊ฐ๋ ์ฐจ๋ฉด ๊ฐ์ฒด๋ Old ์์ญ์ผ๋ก ์ฎ๊ฒจ์ง๋๋ค.
๋ง์ง๋ง์ผ๋ก Old ์์ญ์ด ๊ฑฐ์ ๊ฐ๋ ์ฐจ๋ฉด ํ ๊ฐ๋น์ง ์ปฌ๋ ์ (full garbage collection)์ด ๋ฐ์ํฉ๋๋ค. ํ ๊ฐ๋น์ง ์ปฌ๋ ์ ์ ํ ๊ณต๊ฐ์ ๋ชจ๋ ๊ฐ์ฒด๋ฅผ ์ถ์ ํด ์ฐธ์กฐ ์ ๋ณด๊ฐ ์๋ ๊ฐ์ฒด๋ค์ ์ ๊ฑฐํ๊ณ ๋๋จธ์ง ๊ฐ์ฒด๋ฅผ ๋น๊ณณ์ผ๋ก ์ฎ๊ธฐ๋ ์์ ์ ์ํํฉ๋๋ค. ํ ๊ฐ๋น์ง ์ปฌ๋ ์ ์ ๊ฐ์ฅ ๋๋ฆฐ ๊ฐ๋น์ง ์ปฌ๋ ์ ์ฐ์ฐ์ ๋๋ค.
์๋ช ์ด ๊ธด ์บ์ ๋ฐ์ดํฐ์ ์ Old ์์ญ์ ์ ์ฅํจ, Young ์์ญ์์ ์๋ช ์ด ์งง์ ๋ชจ๋ ๊ฐ์ฒด๋ฅผ ๋ณด๊ดํ ์ ์๋๋ก ์ถฉ๋ถํ ๊ณต๊ฐ์ ์ ์งํฉ๋๋ค.
์คํํฌ ์ก์ ์ฑ๋ฅ ํ๋ ๋ฐฉ๋ฒ
์์ฑ๊ฐ์ ์ค์ ํ๊ฑฐ๋ ๋ฐํ์ ํ๊ฒฝ์ ๋ณ๊ฒฝํด ๊ฐ์ ์ ์ผ๋ก ์ฑ๋ฅ์ ๋์ ๋๋ค. ์ ์ฒด ์คํํฌ ์ ํ๋ฆฌ์ผ์ด์ ์ด๋ ์คํํฌ ์ก์ ์ํฅ์ ๋ฏธ์นฉ๋๋ค.
๊ฐ๋ณ ์คํํฌ ์ก, ์คํ ์ด์ง, ํ์คํฌ ์ฑ๋ฅ ํ๋์ ์๋ํ๊ฑฐ๋ ์ฝ๋ ์ค๊ณ๋ฅผ ๋ณ๊ฒฝํด ์ง์ ์ ์ผ๋ก ์ฑ๋ฅ์ ๋์ผ ์ ์์, ์ ํ๋ฆฌ์ผ์ด์ ์ ํน์ ์์ญ์๋ง ์ํฅ์ ์ฃผ๋ฏ๋ก ์ ์ฒด ์คํํฌ ์ ํ๋ฆฌ์ผ์ด์ ์ด๋ ์คํํฌ ์ก์๋ ์ํฅ์ ๋ฏธ์น์ง ์์ต๋๋ค.
๋์ ํ ๋น
์คํํฌ ์ํฌ๋ก๋์ ๋ฐ๋ผ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ฐจ์งํ ์์์ ๋์ ์ผ๋ก ์กฐ์ ํ๋ ๋ฉ์ปค๋์ฆ์ ์ ๊ณตํฉ๋๋ค.
์ฌ์ฉ์ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ ์ด์ ์ฌ์ฉํ์ง ์๋ ์์์ ํด๋ฌ์คํฐ์ ๋ฐํํ๊ณ ํ์ํ ๋ ๋ค์ ์์ฒญํ ์ ์์ต๋๋ค.
spark.dynamicAllocation.enabled ์์ฑ๊ฐ์ true๋ก ์ค์
๋ฐ์ดํฐ ์ง์ญ์ฑ(data locality)
http://www.datascienceassn.org/content/data-locality-hpc-vs-hadoop-vs-spark
๊ธฐ๋ณธ์ ์ผ๋ก ๋คํธ์ํฌ๋ฅผ ํตํด ๋ฐ์ดํฐ ๋ธ๋ก์ ๊ตํํ์ง ์๊ณ ํน์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ง ๋ ธ๋์์ ๋์ํ ์ ์๋๋ก ์ง์ ํ๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค.
์ง์ ์ ์ธ ์ฑ๋ฅ ํฅ์ ๊ธฐ๋ฒ
๋ณ๋ ฌํ
spark.defaul.parallelism๊ณผ spark.sql.shuffle.partitions์ ๊ฐ์ ํด๋ฌ์คํฐ ์ฝ์ด ์์ ๋ฐ๋ผ ์ค์ ํฉ๋๋ค.
์คํ ์ด์ง์์ ์ฒ๋ฆฌํด์ผํ ๋ฐ์ดํฐ์์ด ๋งค์ฐ ๋ง๋ค๋ฉด ํด๋ฌ์คํฐ์ CPU ์ฝ์ด๋น ์ต์ 2~3๊ฐ์ ํ์คํฌ๋ฅผ ํ ๋นํฉ๋๋ค.
ํํฐ์ ์ฌ๋ถ๋ฐฐ์ ๋ณํฉ
ํํฐ์ ์ฌ๋ถ๋ฐฐ ๊ณผ์ ์ ์ ํ์ ์๋ฐํจ, ํด๋ฌ์คํฐ ์ ์ฒด์ ๊ฑธ์ณ ๋ฐ์ดํฐ๊ฐ ๊ท ๋ฑํ๊ฒ ๋ถ๋ฐฐ๋๋ฏ๋ก ์ก์ ์ ์ฒด ์คํ ๋จ๊ณ๋ฅผ ์ต์ ํํฉ๋๋ค.
์ผ๋ฐ์ ์ผ๋ก ๊ฐ๋ฅํ ํ ์ ์ ์์ ๋ฐ์ดํฐ๋ฅผ ์ ํํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
์ ํ ๋์ ๋์ผ ๋ ธ๋์ ํํฐ์ ์ ํ๋๋ก ํฉ์น๋ coalesce ๋ฉ์๋๋ฅผ ์คํํด DataFrame์ด๋ RDD์ ์ ์ฒด ํํฐ์ ์๋ฅผ ๋จผ์ ์ค์์ผ ํจ, ์ด๋ณด๋ค ๋๋ฆฐ repartition ๋ฉ์๋๋ ๋ถํ๋ฅผ ๋ถ์ฐํ๊ธฐ ์ํด ๋คํธ์ํฌ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ํ๋งํฉ๋๋ค.
ํํฐ์ ์ฌ๋ถ๋ฐฐ๋ ์กฐ์ธ์ด๋ cache ๋ฉ์๋๋ฅผ ํธ์ถ ์ ๋งค์ฐ ์ ์ฉํฉ๋๋ค.
ํํฐ์ ์ฌ๋ถ๋ฐฐ ๊ณผ์ ์ ๋ถํ๋ฅผ ์ ๋ฐํ์ง๋ง ์ ํ๋ฆฌ์ผ์ด์ ์ ์ ์ฒด์ ์ธ ์ฑ๋ฅ๊ณผ ์คํํฌ ์ก์ ๋ณ๋ ฌ์ฑ์ ๋์ผ ์ ์์์ ๊ธฐ์ตํด์ผํฉ๋๋ค.
ํฅ์๋ ํํฐ๋ง
์ฌ์ฉ์ ์ ์ ํํฐ์ ๋
์ก์ด ์ฌ์ ํ ๋๋ฆฌ๊ฒ ๋์ํ๊ฑฐ๋ ๋ถ์์ ํ๋ค๋ฉด RDD๋ฅผ ์ด์ฉํ ์ฌ์ฉ์ ์ ์ ํํฐ์ ๋ ๊ธฐ๋ฒ์ ์ ์ฉํฉ๋๋ค.
์ฌ์ฉ์ ์ ์ ํจ์(UDF)
์์ ๋ฐ์ดํฐ ์ ์ฅ์(์บ์ฑ)
https://livebook.manning.com/book/spark-in-action-with-examples-in-java/16-cache-and-checkpoint-enhancing-spark-s-performances/v-14/185
์ ํ๋ฆฌ์ผ์ด์ ์์ ๊ฐ์ ๋ฐ์ดํฐ์ ์ ๊ณ์ํด์ ์ฌ์ฌ์ฉํ๋ค๋ฉด ์บ์ฑ์ ์ฌ์ฉํด ์ต์ ํํ ์ ์์ต๋๋ค.
์บ์ฑ์ ํด๋ฌ์คํฐ์ ์ต์คํํฐ ์ ๋ฐ์ ๊ฑธ์ณ ๋ง๋ค์ด์ง ์์ ์ ์ฅ์(๋ฉ๋ชจ๋ฆฌ๋ ๋์คํฌ)์ DataFrame, ํ ์ด๋ธ ๋๋ RDD๋ฅผ ๋ณด๊ดํด ๋น ๋ฅด๊ฒ ์ ๊ทผํ ์ ์๋๋ก ํฉ๋๋ค
์บ์ฑ์ด ํ์ํ ์ํฉ : ์คํํฌ์ ๋ํํ ์ธ์ ์ด๋ ์คํ ๋์ผ๋ก ์ ํ๋ฆฌ์ผ์ด์ ์์ ํน์ ๋ฐ์ดํฐ์ (DataFrame ๋๋ RDD)์ ๋ค์ ์ฌ์ฉํ๋ ค ํ ๊ฒฝ์ฐ
์บ์ฑ์ ์ง์ฐ ์ฐ์ฐ, ๋ฐ์ดํฐ์ ์ ๊ทผํด์ผ ์บ์ฑ์ด ์ผ์ด๋ฉ๋๋ค.
DF1 = spark.read.format("csv")\ .option("inferSchema", "true")\ .option("header","true")\ .load("/data/flight-data/csv/2015-summary.csv") DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect() DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect() DF4 = DF1.groupBy("count").count().coolect() DF1.cache() DF1.count()
DataFrame์ cache ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด ์คํํฌ๋ ์ต์ด ์ฐ์ฐ ์ ๋ฐ์ดํฐ๋ฅผ ๋ฉ๋ชจ๋ฆฌ๋ ๋์คํฌ์ ์ ์ฅํจ, ๋ค์ ์บ์ฑ๋ DataFrame์ ์ฌ์ฉํ๋ ์ฟผ๋ฆฌ๋ฅผ ์ํํ๋ฉด ์๋ณธ ํ์ผ์ ์ฝ๋ ๋์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ฐธ์กฐํฉ๋๋ค.
์บ์ฑ์ ์ง์ฐ ์ฒ๋ฆฌ, ์คํํฌ๋ DataFrame์ ์ก์ ์ ์คํํ๋ ์์ ์ ๋ฐ์ดํฐ๋ฅผ ์บ์ฑํฉ๋๋ค.
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect() DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect() DF4 = DF1.groupBy("count").count().collect()
์บ์ฑ์ ๋์ผํ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๋ฒ ์ ๊ทผํ๋ ๋ฐ๋ณต์ ์ธ ๋จธ์ ๋ฌ๋ ์ํฌ๋ก๋์๋ ๋งค์ฐ ์ ํฉํฉ๋๋ค.
์คํํฌ์ cache ๋ช ๋ น์ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํฉ๋๋ค.
ํด๋ฌ์คํฐ ์ ์ฒด ๋ฉ๋ชจ๋ฆฌ๊ฐ ๊ฐ๋ ์ฐผ๋ค๋ฉด ๋ฐ์ดํฐ์ ์ ์ผ๋ถ ๋ฐ์ดํฐ๋ง ์บ์ฑํฉ๋๋ค.
์ ๊ตํ ์ ์ด๋ฅผ ์ํด persist ๋ฉ์๋๋ฅผ ์ฌ์ฉํจ, persist ๋ฉ์๋๋ ๋ฐ์ดํฐ ์บ์ ์์ญ(๋ฉ๋ชจ๋ฆฌ, ๋์คํฌ ๋๋ ๋ ๋ค)์ ์ง์ ํ๋ StorageLevel ๊ฐ์ฒด๋ฅผ ํ๋ผ๋ฏธํฐ๋ก ์ฌ์ฉํฉ๋๋ค.
์กฐ์ธ
๋๋ฑ ์กฐ์ธ์ ์ต์ ํํ๊ธฐ ๊ฐ์ฅ ์ฌ์ฐ๋ฏ๋ก ์ฐ์ ์ ์ผ๋ก ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
์กฐ์ธ ์์ ๋ณ๊ฒฝ์ ๋ด๋ถ ์กฐ์ธ์ ์ฌ์ฉํด ํํฐ๋งํ๋ ๊ฒ๊ณผ ๋์ผํ ํจ๊ณผ๋ฅผ ๋๋ฆผ, ์์ ์ฑ๊ณผ ์ต์ ํ๋ฅผ ์ํด ์นดํ ์์ ์กฐ์ธ์ด๋ ์ ์ฒด ์ธ๋ถ ์กฐ์ธ์ ์ฌ์ฉ์ ์ต๋ํ ํผํด์ผ ํฉ๋๋ค.
๋ฐ์ดํฐ๋ฅผ ์ ์ ํ๊ฒ ๋ฒ์ผํ ํ๋ฉด ์กฐ์ธ ์ํ ์ ๊ฑฐ๋ํ ์์ ์ ํ์ด ๋ฐ์ํ์ง ์๋๋ก ๋ฏธ๋ฆฌ ๋ฐฉ์งํ ์ ์์ต๋๋ค.
์ง๊ณ
์ง๊ณ ์ ์ ์ถฉ๋ถํ ๋ง์ ์์ ํํฐ์ ์ ๊ฐ์ง ์ ์๋๋ก ๋ฐ์ดํฐ๋ฅผ ํํฐ๋งํ๋ ๊ฒ์ด ์ต์ ์ ๋ฐฉ๋ฒ์ ๋๋ค.
Reference
https://www.amazon.com/Spark-Definitive-Guide-Processing-Simple/dp/1491912219
https://sparkbyexamples.com/spark/spark-performance-tuning/?amp=1#udf
https://mageswaran1989.medium.com/spark-optimizations-for-advanced-users-spark-cheat-sheet-d74464618c20
Last updated