본문 바로가기

Data Engineering/Spark

Spark Section2 - Apache Spark SQL과 Dataframe(데이터프레임)

0. INDEX

  1. 아파치 스파크 SQL과 데이타프레임(Dataframe) 기본 - 이론
  2. Dataframe을 예제로 알아보기
  3. CSV 파일을 Dataframe으로 읽어오기
  4. Dataframe으로 단어 세어보기
  5. Dataframe StructType에 대해 알아보기
  6. Dataframe으로 데이터 총합 구하기
  7. Broadcast(브로드캐스트)의 소개와 브로드캐스트 조인 방법 알아보기
  8. Dataframe Graph 알아보기
  9. Dataframe missing data(빈 데이터)와 date(시간포맷) 다루는 방법 알아보기
  10. Dataframe Join(데이터 병합)하는 모든 방법

1. 아파치 스파크 SQL과 데이타프레임(Dataframe) 기본

Spark SQL은 Apache Spark 모듈 중 테이블과 같은 구조화된 데이터를 다루는 데 활용되는 모듈이다. 이것에 사용되는 데이터 구조는 RDD에서 확장된 개념인 DataFrame이라는 데이터 타입(클래스)를 사용한다.

 

[Dataframe 특징]

  • 테이블 형태이며, 스키마를 가진다.
  • 직접 쿼리(SQL문) 혹은 dataframe API(withColume(), groupBy()..) 통해 원하는 형태로 가공할 수 있다.
  • JSON, CSV, Avro, Parquet, ORC 등의 여러 포맷의 데이터를 읽고 쓸 수 있다.
  • JDBC 연결이 가능하다.(JDBC를 통해 연동된 DB의 데이터를 데이터프레임으로 읽거나, 쓸 수 있다)
  • Tabluau와 같은 BI 툴과도 연동이 가능하다.(Tableau에서 spark 클러스터와 연동해서 sparksql을 사용하여 추출한 데이터를 가져오고 바로 시각화하여 보는 기능도 있다)
  • Hive와 연동하여 하둡 내 데이터를 SQL 형식으로 질의할 수 있다. 즉, spark 코드에서 HQL 방식으로 코드를 짤 수 있다. 다만 driver 자체는 spark이므로 데이터 프로세싱 방식은 spark 방식대로 동작한다.
    이때 spark와 Hive MetaStore를 연동해서 Hive 쿼리를 해석하고 최종적으로 spark 엔진에 의해 처리된다.
  • RDD보다 DataFrame을 많이 사용하는 추세이다.
  • 데이터프레임도 RDD처럼 Immutable, Lazy Evaluation 특성을 가진다.

tableau with sparksql

 

Hive와 spark 연동

 

[Spark SQL 방식(직접 sql문 수행) vs Dataframe API 방식]

  • Spark SQL 방식(임시테이블 생성 필요)
# human_df contains 'human' data
human_df.createOrReplaceTempView("human")

result_df = spark.sql("""
    select city, count(*) as `count`, avg(age) as `avg age`
    from human
    where age <= 40
    group by city
    order by 2 desc
""")
result_df.show()
  • Dataframe API 방식
# human_df contains 'human' data
import pyspark.sql.functions as F

result_df = human_df.filter(human_df["age"] <= 40) \
                    .groupby(["city"]) \
                    .agg(F.count(human_df["id"]).alias("count"), F.avg(human_df["age"]).alias("avg age")) \
                    .sort("count", ascending=False)
result_df.show()

 

결론적으로 두 방식 모두 처리하는 방식이 동일하며 성능면에서 큰 차이가 없다. (https://velog.io/@datastsea/spark-sql-vs-dataframe-api 참조)

 

성능면에서는 차이가 없을지라도 사용자 입장에서는 분명 아래와 같은 장단점이 존재하기 때문에 상황에 맞게 두 방식 모두 자유자재로 이용할 수 있는 것이 최선이라고 본다.

  Spark SQL 방식 DataFrame API 방식
가독성 좋음 별로(문맥 파악이 어려움)
모듈화 별로 좋음
디테일 별로 좋음(쿼리 문법 외 다양한 함수 활용)
유지보수 별로(전체 쿼리를 바꿔야 할 수도) 좋음(모듈화되어 있어서)
에러확인 별로 좋음

 

[RDD vs DataFrame]

RDD를 사용하여 코드를 짤 때 셔플링 등의 물리적 관점에서 성능적인 부분이 숙련도에 따라 차이가 많이 날 수 있어서 사람마다 코드 성능의 차이를 무시할 수 없었다. 반면에 데이터프레임에서는 아래 두 가지 Optimizer를 통해 최적화하여 데이터를 반환해주기 때문에 성능 상 어느정도 평준화된 코드를 짤 수 있어서 더 편리해졌다.

  • Q) RDD 지연평가를 통한 실행계획 최적화와 데이터프레임에서의 엔진을 통한 최적화의 차이는 무엇인가?(견해)
    지연평가는 사용자의 코드에서 필요한 연산만을 실행함으로써(불필요한 컬럼은 제외하는 등) 성능을 향상시키는 개념이고, 아래의 최적화 엔진에서는 추가적으로 사용자가 수행한 연산보다 더 좋은 방법이 있다면 개선해서 최적화하는 개념인 것으로 보임.

[스파크 쿼리 최적화에 사용되는 엔진:  Catalyst와 Tungsten]

스파크 쿼리 동작방식

1) Catalyst Optimizer: 실행 계획 최적화 옵티마이저

  1. 먼저 논리 최적화 단계에서는 사용자가 작성한 쿼리를 파악해 최적화 가능한 부분을 찾아내서 논리적으로 동등하지만 더 좋은 성능의 방식으로 변환한다.
  2. 이후 물리 최적화 단계에서는 실제로 어떻게 클러스터 위에서 최적화 될지 정의하는 부분인데 데이터의 셔플링, 파티셔닝, 정렬 등의 최적화를 하고 최종적으로 하나의 실행계획을 선정하여 데이터가 처리된다.

2) Tungsten Execution Engine: 실행시간과 메모리 사용 최적화

  1. 메모리 중심의 데이터 처리를 위한 엔진으로, 메모리 사용량을 최소화하고 CPU 사용량을 최적화하여 성능을 향상시킨다.

 

(참고 블로그)

tableau spark 연동)

https://www.google.com/search?q=tableau+using+sparksql&oq=tableau+using+sparksql&gs_lcrp=EgZjaHJvbWUyBggAEEUYOTIHCAEQIRigATIHCAIQIRigAdIBCDY3ODhqMGo3qAIAsAIA&sourceid=chrome&ie=UTF-8#fpstate=ive&vld=cid:2e0a0d14,vid:wZwY2r3aUIo,st:0

 

🔎 tableau using sparksql: Google 검색

 

www.google.com

스파크 쿼리 동작방식)

https://magpienote.tistory.com/210

 

[Apache Spark]Spark RDD의 한계점 및 Dataframe, SQL 등장, 최적화 원리

RDD Shuffling & partition이해하기 Shuffling이란 썪는다는 의미이다. Partition은 부분으로 나눈다는 의미도 된다. RDD가 분산처리를 하기 위해 데이터를 Shuffling하고 Partition을 나눠서 데이터 연산 처리를

magpienote.tistory.com

rdd vs dataframe vs sparksql)

https://mjs1995.tistory.com/221

 

spark 최적화

실무에 사용한 데이터 엔지니어링 스킬에 대한 정리내용입니다. 개인적인 기록을 위해 작성하였습니다. https://github.com/mjs1995/muse-data-engineer/blob/main/doc/Batch%20Processing/spark_optimization.md GitHub - mjs1995

mjs1995.tistory.com


2. Dataframe을 예제로 알아보기

# 3-2. RDD로부터 데이터프레임 생성 | SQL 방식 vs 데이터프레임 API 방식
from pyspark.sql import (
    Row,
    SparkSession)
from pyspark.sql.functions import col, asc, desc

def parse_line(line: str):
    fields = line.split('|') # |
    return Row(
        name=str(fields[0]),
        country=str(fields[1]),
        email=str(fields[2]),
        compensation=int(fields[3]))

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
lines = spark.sparkContext.textFile("file:///home/jovyan/work/sample/income.txt")
# 파일 내용
# Kam Long|Dominica|VinThomas@example.taobao|137611
# Jamey Warner|Botswana|badlittleduck@test.gf|134999
# ...

income_data = lines.map(parse_line)

# Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
# SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)[source]
schema_income = spark.createDataFrame(data=income_data).cache()

# Creates or replaces a local temporary view with this DataFrame.
schema_income.createOrReplaceTempView("income")

# returns the dataframe -> SQL 방식
medium_income_df = spark.sql(
    "SELECT * FROM income WHERE compensation >= 70000 AND compensation <= 100000")
medium_income_df.show()
# |              name|             country|               email|compensation|
# +------------------+--------------------+--------------------+------------+
# |  Willian Cummings|             Senegal|    areus@test.canon|       77369|
# |      Clarita Gill|             Ecuador| tomaslau@test.games|       86986|
# ..

# for문을 통해 한 행씩 iterator 가능
for income_data in medium_income_df.collect():
    # print(income_data)
    print(income_data.name)

# use function instead of sql function -> DATAFRAME API 방식
schema_income.groupBy("country").count().orderBy(col("count").desc()).show()
  • Pyspark.sql.SparkSession: SparkContext보다 상위 버전의 Spark Job 실행을 위한 진입점으로, 구조적인 형태의 데이터 처리를 위해 사용된다. SparkContext에 더해 데이터 원본으로부터 데이터프레임을 생성하거나 sql 쿼리를 실행하하여 결과를 데이터프레임 형태로 반환할 수 있다. SparkSession는 SparkContext를 포함하기 때문에 SparkSession이 열려 있다면 SparkContext를 바로 사용할 수 있다.
  • pyspark.sql.Row: DataFrame의 각 행을 표현하는 클래스로, Row 객체를 출력하면 해당 객체에 포함된 속성과 값이 키-값 쌍으로 출력됨.(ex. Row(name='John', age=30, city='New York'))
  • spark.createDataFrame(): 리스트, 튜플, 딕셔너리, RDD를 입력으로 받아, 데이터프레임을 생성할 수 있음.
  • createOrReplaceTempView("테이블명"): 데이터프레임을 SQL쿼리에서 사용할 수 있도록 임시 뷰로 등록 가능(해당 세션에서만 사용됨)
  • show(): 데이터프레임의 내용을 출력하는 메서드로 action 연산임.(기본값이 20행까지 출력)

3. CSV 파일을 Dataframe으로 읽어오기

# 3-3. read.csv()
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    round as rnd
)

spark = SparkSession.builder.appName("sql_import_csv").getOrCreate()
csv_file_path = "file:///home/jovyan/work/sample/age.csv"
# 파일내용
# 	name	age	country
# 1	Neville Hardy	56	Niue
# 2	Dacia Cohen	74	Falkland Islands (Malvinas)

# header option: either csv has header or not(default: header = false)
# inferSchema: either all columns are str or not
data = spark.read.option("header", "true")\
            .option("inferSchema", "true")\
            .csv(csv_file_path)
# |             name|age|             country|
# +-----------------+---+--------------------+
# |    Neville Hardy| 56|                Niue|
# |      Dacia Cohen| 74|Falkland Islands ...|
# ...

# # show schema
data.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
#  |-- country: string (nullable = true)

# show column name with data
data.select("name", "age").show()

# filter the data for age of 20 above
data.filter(data.age > 20).show()

# group by age and aggregates for count
data.groupBy("age").count().show()

# custom arithmetic
data.select(data.name, data.age, data.age - 10).show()

# column alias
data.select(data.name, col("age").alias("age1")).show()

# average
data.select(data.name, data.age, data.country)\
        .groupBy("country")\
        .avg("age").show()

# average & sort
data.select(data.name, data.age, data.country)\
        .groupBy("country")\
        .avg("age").sort("avg(age)").show()

# average & round
data.select(data.name, data.age, data.country)\
        .groupBy("country")\
        .agg(rnd(avg("age"), 2).alias("avg_age")).show()
  • textFile(): 텍스트 파일을 읽어들여 텍스트 파일 내 각 라인을 RDD의 요소로 생성하는 메서드로 비정형 텍스트를 처리할 때 사용한다.
  • read.csv(): CSV 파일을 읽어들여 DataFrame을 생성한다. 스키마 지정이 필요할 경우 수동으로 옵션 설정도 가능하다. textFile()이 sparkContext를 통해 RDD를 생성하는 것과 차이가 있으며, 구조화된 데이터를 처리할 때 사용한다.(sparkSession.read를 통해 pyspark.sql.DataFrameReader 객체를 생성하는데 이 객체는 여러 데이터 포맷에서 데이터를 읽는 기능이 있다. 그중 해당 객체 내 csv() 메서드를 사용하면 csv 파일을 읽어서 데이터프레임을 생성하게 된다)

4. Dataframe으로 단어 세어보기

# 3-4. split -> explode
from pyspark.sql import (
    functions,
    Row,
    SparkSession
)

spark = SparkSession.builder.appName("df_wordcount").getOrCreate()

# functions.explode(col)
# Returns a new row for each element in the given array or map
df = spark.createDataFrame([
        Row(a=1,
            intlist=[1,2,3],
            mapfield={"a": "b"}
           )])

df.select(functions.explode(df.intlist).alias("anInt")).collect()
# output: [Row(anInt=1), Row(anInt=2), Row(anInt=3)]

# functions.split(str, pattern, limit=-1)
# Splits str around matches of the given pattern.
df = spark.createDataFrame([
        Row(word="hello world and pyspark")])
df.select(functions.split(df.word, ' ').alias("word")).collect() # [Row(word=['hello', 'world', 'and', 'pyspark'])]

csv_file_path = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
df = spark.read.text(csv_file_path)

# word counting
words = df.select(
    functions.explode(
        functions.split(df.value, ' ')).alias("word"))
words.show()
# +-----------+
# |       word|
# +-----------+
# |      Lorem|
# ...
word_counts = words.groupBy("word").count().orderBy(functions.col("count").desc())
word_counts.show()
# +----------+-----+
# |      word|count|
# +----------+-----+
# |       the|    6|
# ...
  • explode(): DataFrame의 값이 배열 형식으로 들어간 컬럼을 풀어서 새로운 행을 생성하는 함수이다. 배열 컬럼의 각 요소를 새로운 행으로 확장하여 결과적으로 원래 DataFrame보다 더 많은 행을 가진 새로운 DataFrame을 생성하게 된다.

5. Dataframe StructType에 대해 알아보기

# 3-5. StructType, StructField
from pyspark.sql import (
    functions as f,
    Row,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_struct").getOrCreate()

# types.StructField(name, dataType, nullable=True, metadata=None)
table_schema = t.StructType([
    t.StructField("country", t.StringType(), True),
    t.StructField("temperature", t.FloatType(), True),
    t.StructField("observed_date", t.StringType(), True)])

csv_file_path = "file:///home/jovyan/work/sample/temp_with_date.csv"
# 파일내용
# 	Guam	-25	2022-03-25
# 1	Guam	39	2022-02-18
# 2	Serbia	-35	2022-08-31

df = spark.read.schema(table_schema).csv(csv_file_path)
# df.printSchema()

data = df.select("country", "temperature", "observed_date")

min_temperature = data.groupBy("country").min("temperature")

# min_temperature.show()

# # celsius to fahrenheit: (0°C × 9/5) + 32 
f_temperature = data.withColumn(
                    "temperature",
                    (f.col("temperature") * 9 / 5) + 32)\
                .select("country", "temperature")
f_temperature.show()
  • types.StructType: 구조화된 데이터 형식을 정의하는 클래스로, DataFrame의 스키마를 정의하는 데 사용된다. 내부의 각 필드는 StructField 객체로 구성된다.
  • types.StructField: 스키마에서 하나의 열을 나타내는 클래스로 각 열은 이름, 데이터 유형, 추가 속성으로 구성된다.
    (ex. class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None))
  • withColumn(): DataFrame에서 새로운 열을 추가하거나 기존 열을 대체하기 위해 사용되는 메서드.

6. Dataframe으로 데이터 총합 구하기

# 3-6. groupBy, agg
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_total").getOrCreate()
table_schema = t.StructType([
    t.StructField("customer_name", t.StringType(), True),
    t.StructField("product_id", t.IntegerType(), True),
    t.StructField("price", t.IntegerType(), True)])

csv_file_path = "file:///home/jovyan/work/sample/product.csv"
# 파일내용
# 	Whitney Duncan	2265	899
# 1	Ebonie Hudson	2509	762
# 2	Luise Warren	1995	657
# ...

df = spark.read.schema(table_schema).csv(csv_file_path)
customer_spent = df.groupBy("customer_name")\
                    .agg(
                        f.round(
                            f.sum("price"),
                            2
                        ).alias("cost"))
                    
# customer_spent.show()
sorted_customer_spent = customer_spent.orderBy(f.col("cost").desc())
sorted_customer_spent.show()
# +-----------------+----+
# |    customer_name|cost|
# +-----------------+----+
# |     Damion Wolfe|1397|
# | Benedict Frazier| 998|
# ...

7. Broadcast(브로드캐스트)의 소개와 브로드캐스트 조인 방법 알아보기

# 3-7. broadcast, udf
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_most_interviewed").getOrCreate()
table_schema = t.StructType([
    t.StructField("interviwer_id", t.StringType(), False),
    t.StructField("occupation_id", t.StringType(), False),
    t.StructField("rating", t.IntegerType(), False)])

csv_file_path = "file:///home/jovyan/work/sample/like.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)
# 1	13727	2030	2
# 2	59892	3801	1
# ..

# 직업별 사람수
interviwer_count = df.groupBy("occupation_id").count().orderBy(f.desc("count"))

for d in interviwer_count.select("occupation_id", f.col("count").alias("cnt")).collect():
    print(f"{d.occupation_id}: {d.cnt}")

# But, What if we want to know what occupation_id is?  
# 1100: engineer
# 2030: developer
# 3801: painter
# 3021: chemistry teacher
# 9382: priest
meta = {
    "1100": "engineer",
    "2030": "developer",
    "3801": "painter",
    "3021": "chemistry teacher",
    "9382": "priest"
}
occupation_dict = spark.sparkContext.broadcast(meta)

def get_occupation_name(occupation_id: str) -> str:
    return occupation_dict.value[occupation_id]

occupation_lookup_udf = f.udf(get_occupation_name) # 정의한 파이썬 함수를 udf로 감싸서 spark 함수로 만든다
occupation_with_name = interviwer_count.withColumn("occupation_name", occupation_lookup_udf(f.col("occupation_id")))
occupation_with_name.show(10)
# +-------------+-----+-----------------+
# |occupation_id|count|  occupation_name|
# +-------------+-----+-----------------+
# |         1100|  217|         engineer|
  • sparkContext.broadcast(): Spark에서 브로드캐스트 변수를 생성하는 메서드로, 데이터를 클러스터의 모든 노드에 전파한다. 브로드캐스트 변수는 읽기 전용이며, 클러스터의 모든 task에서 공유된다.
  • functions.udf(): 사용자 정의 함수(UDF)를 생성하는 PySpark 메서드로, Python 함수를 PySpark에서 사용할 수 있는 사용자 정의 함수로 변환한다. UDF를 만들 때는 일반적으로 파이썬 함수를 정의하고 이를 functions.udf()로 wrapping하여 PySpark의 UDF로 변환한다.
    • UDF 사용 이유?
      Spark 내부 최적화 엔진이 udf를 인식하여 데이터를 효율적으로 처리하고 입출력 타입을 지정하여 코드 안정성 확보할 수 있다.

[브로드캐스트]

broadcast는 효율적인 데이터 전송을 위해 워커노드 간 데이터 전송을 최소화하여 성능을 향상시키는 방법으로, 일반적으로 작은 크기의 데이터를 모든 워커노드에 복사하여 전송하므로 노드 간의 통신 비용을 줄일 수 있게 된다. 브로드캐스트된 데이터는 아래처럼 executor가 실행되는 container 외부에 존재함으로써 워커 노드 내 여러 executor에서 접근이 가능하다.

또한 udf를 사용하여 조인하는 것을 브로드캐스트 조인이라고 부르는데, 브로드캐스트에 사용되는 데이터(위 예시에서 meta 변수) 크기가 크다면 브로드캐스트용 메모리 부하로 인해 해당 데이터 크기가 작은 경우에 활용하는 방식이다.

Broadcast 이용 모식도


8. Dataframe Graph 알아보기

# 3-8 : 히어로들의 관계 | collect_set, concat_ws, coalesce
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

# Attribution 3.0 Unported (CC BY 3.0)
# https://www.kaggle.com/datasets/csanhueza/the-marvel-universe-social-network

spark = SparkSession.builder.appName("df_most_popular").getOrCreate()
csv_file_path = "file:///home/jovyan/work/sample/hero-network.csv"
# 파일내용
# 	hero1	hero2
# 1	LITTLE, ABNER	PRINCESS ZANDA
# 2	LITTLE, ABNER	BLACK PANTHER/T'CHAL
# ...

# read file
df = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true").csv(csv_file_path)

# pyspark.sql.functions.collect_set(col): Aggregate function: returns a set of objects with duplicate elements eliminated.
data = df.groupBy("hero1")\
            .agg(
                f.collect_set("hero2").alias("connection"))\
            .withColumnRenamed("hero1", "hero")
# data.show()
# pyspark.sql.functions.concat_ws(sep, *cols): Concatenates multiple input string columns together into a single string column, using the given separator.
data = data.withColumn("connection", f.concat_ws(",", f.col("connection")))
data.show()
# |                hero|          connection|
# +--------------------+--------------------+
# |             ABCISSA|ELSIE DEE,FURY, C...|
# |ABSORBING MAN | MUTA|DRAX | MUTANT X-V...|
# ..

# DataFrame.coalesce(numPartitions): Returns a new DataFrame that has exactly numPartitions partitions.
data.coalesce(1).write.option("header", True).csv("output")

# load the file
csv_file_path = "file:///home/jovyan/work/output"
df = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .csv(csv_file_path)
# df.show()

# pyspark.sql.functions.size(col): Collection function: returns the length of the array or map stored in the column.
df = df.withColumn(
        "connection_size",
        f.size(
            f.split(f.col("connection"), ",")))\
        .orderBy(f.desc("connection_size"))
df.show()
# |                hero|          connection|connection_size|
# +--------------------+--------------------+---------------+
# |     CAPTAIN AMERICA|URICH, DORIS,ARMA...|           1795|
# |SPIDER-MAN/PETER PAR|MAGMA II/JONATHAN...|           1737|
#     ..

most_popular_hero = df.select("hero").first()
print(most_popular_hero.hero)
    • functions.collect_set(): 그룹화된 열의 고유한 값을 수집하여 집합(set)으로 반환한다. set 자료구조를 생각하면 된다.
    • functions.concat_ws(): 지정된 구분자를 사용하여 리스트 내 문자열들을 결합하는 메서드로, 리스트 원소들을 특정구분자로 구분하여 하나의 문자열로 변환할 수 있다. (format. functions.concat_ws(delimiter, *cols))
    • coalesce(): 데이터프레임의 파티션(데이터를 물리적으로 나누는 단위) 수를 설정하는 메서드이다. 해당 메서드를 통해 파티션 수를 줄여 셔플링 횟수를 적게 하는 데 활용될 수 있다. coalesce(2) 이면 2개의 파티션으로 설정하여 데이터를 분산 저장한다는 의미다.

9. Dataframe missing data(빈 데이터)와 date(시간포맷) 다루는 방법 알아보기

데이터가 비어있는 경우, drop() 을 통해 해당 row를 제거하는 방식과 fill()을 통해 빈 데이터를 특정 값으로(ex. 평균값) 채워넣어서 해결할 수 있다.

# 3-9. 빈데이터 처리 및 시간 파싱 | year, month, 
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_missing_data").getOrCreate()
df = spark.read.csv(
    "file:///home/jovyan/work/sample/null_data.csv", header=True, inferSchema=True)
df.show()
# +----+----------+------+
# |  id|occupation|salary|
# +----+----------+------+
# |1000|  engineer|100000|
# |2000|      NULL|  NULL|
# ...

# DataFrame.na: Returns a DataFrameNaFunctions for handling missing values.
# DataFrame.dropna(how='any', thresh=None, subset=None)[source]: Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.
#   how: 'any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
#   thresh: default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
#   subset: optional list of column names to consider.

df.na.drop(how="any").show()
df.na.drop(thresh=2).show()
df.na.drop(subset=["salary"]).show()

df.printSchema()

# fill string
df.na.fill("engineer").show()

# fill integer
df.na.fill(0).show()

# fill the subset
df.na.fill("NA", subset=["occupation"]).show()

# fill the mean value
mean_value = df.select(f.mean(df['salary'])).collect()

# print(mean_value[0][0])

df.na.fill(mean_value[0][0], subset=["salary"]).show()


# Date parsing
spark = SparkSession.builder.appName("df_manage_date").getOrCreate()
df = spark.read.csv(
    "file:///home/jovyan/work/sample/date_parsing.csv", header=True, inferSchema=True)

# show year
df.select(f.year('date')).show()

# show month
df.select(f.month('date')).show()

# show day
df.select(f.dayofmonth('date').alias('day')).show()
df.select(f.dayofyear('date').alias('day')).show()

df = df.withColumn("year", f.year('date')).groupBy("year").mean("number").withColumnRenamed("avg(number)", "avg")
# df.show()
df.select("year", f.format_number("avg", 2).alias("avg")).show()
# |year|     avg|
# +----+--------+
# |2022|2,540.67|
# |2021|2,195.68|

 


10. Dataframe Join(데이터 병합)하는 모든 방법

[예시 테이블]

[left] - df_salary
+----+------+--------------+
|  id|salary|    department|
+----+------+--------------+
|1000|150000|      engineer|
|2000|240000|       manager|
|3000|120000|human resource|
|6000|100000|         sales|
+----+------+--------------+

[right] - df_user
+----+-------------+--------+
|  id|         name| company|
+----+-------------+--------+
|1000|Neville Hardy|   Apple|
|2000|  Dacia Cohen|Alphabet|
|3000|    Elois Cox|  Neflix|
|4000| Junita Meyer|    Meta|
|5000| Cleora Banks|  Amazon|
+----+-------------+--------+
  • inner join: 공통된 key에 대해서만 조인
    +----+-------------+--------+----+------+--------------+
    |  id|         name| company|  id|salary|    department|
    +----+-------------+--------+----+------+--------------+
    |1000|Neville Hardy|   Apple|1000|150000|      engineer|
    |2000|  Dacia Cohen|Alphabet|2000|240000|       manager|
    |3000|    Elois Cox|  Neflix|3000|120000|human resource|
    +----+-------------+--------+----+------+--------------+
  • multiple join: 2개 이상의 조건에 대해 조인
  • full outer join: 각 테이블에서 공통되지 않는 key에 대해서도 모두 출력(해당 key가 없는 타 테이블에서는 null로 매칭)
    +----+-------------+--------+----+------+--------------+
    |  id|         name| company|  id|salary|    department|
    +----+-------------+--------+----+------+--------------+
    |1000|Neville Hardy|   Apple|1000|150000|      engineer|
    |2000|  Dacia Cohen|Alphabet|2000|240000|       manager|
    |3000|    Elois Cox|  Neflix|3000|120000|human resource|
    |4000| Junita Meyer|    Meta|NULL|  NULL|          NULL|
    |5000| Cleora Banks|  Amazon|NULL|  NULL|          NULL|
    |NULL|         NULL|    NULL|6000|100000|         sales|
    +----+-------------+--------+----+------+--------------+
  • left(right) join: 왼쪽(오른쪽)에 있는 테이블을 기준으로 조인(기준 테이블에 존재하는 데이터가 추가로 조인)
    [left join]
    +----+-------------+--------+----+------+--------------+
    |  id|         name| company|  id|salary|    department|
    +----+-------------+--------+----+------+--------------+
    |1000|Neville Hardy|   Apple|1000|150000|      engineer|
    |2000|  Dacia Cohen|Alphabet|2000|240000|       manager|
    |3000|    Elois Cox|  Neflix|3000|120000|human resource|
    |4000| Junita Meyer|    Meta|NULL|  NULL|          NULL|
    |5000| Cleora Banks|  Amazon|NULL|  NULL|          NULL|
    +----+-------------+--------+----+------+--------------+
    [right join]
    +----+-------------+--------+----+------+--------------+
    |  id|         name| company|  id|salary|    department|
    +----+-------------+--------+----+------+--------------+
    |1000|Neville Hardy|   Apple|1000|150000|      engineer|
    |2000|  Dacia Cohen|Alphabet|2000|240000|       manager|
    |3000|    Elois Cox|  Neflix|3000|120000|human resource|
    |NULL|         NULL|    NULL|6000|100000|         sales|
    +----+-------------+--------+----+------+--------------+
  • left semi join: inner join 결과에서 왼쪽 테이블 컬럼만 출력한 결과
    +----+-------------+--------+
    |  id|         name| company|
    +----+-------------+--------+
    |1000|Neville Hardy|   Apple|
    |2000|  Dacia Cohen|Alphabet|
    |3000|    Elois Cox|  Neflix|
    +----+-------------+--------+

  • left anti join: 왼쪽 테이블 - left semi join 결과
    +----+------------+-------+
    |  id|        name|company|
    +----+------------+-------+
    |4000|Junita Meyer|   Meta|
    |5000|Cleora Banks| Amazon|
    +----+------------+-------+

  • SQL join: 임시테이블을 만들어서 sql문으로 조인하는 방식
# 3-10. 조인
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_join").getOrCreate()

# user data
user_data = [
    ["1000", "Neville Hardy", "Apple"],
    ["2000", "Dacia Cohen", "Alphabet"],
    ["3000", "Elois Cox", "Neflix"],
    ["4000", "Junita Meyer", "Meta"],
    ["5000", "Cleora Banks", "Amazon"]]

user_col = ['id', 'name', 'company']
df_user = spark.createDataFrame(data=user_data, schema=user_col)
df_user.show()

# salary data
salary_data = [
    ["1000", "150000", "engineer"],
    ["2000", "240000", "manager"],
    ["3000", "120000", "human resource"],
    ["6000", "100000", "sales"]]

salary_col = ['id', 'salary', 'department']
df_salary = spark.createDataFrame(data=salary_data, schema=salary_col)
df_salary.show()

# inner join: join the two dataframes on common key columns.
# dataframe1.join(dataframe2,dataframe1.column_name ==  dataframe2.column_name,”inner”)
print("== inner join ==")
df_user.join(df_salary,
               df_user.id == df_salary.id,
               "inner").show()

# inner join, then filter
df_user.join(df_salary,
               df_user.id == df_salary.id,
               "inner").filter(df_user.id == 1000).show()

# inner join, then where
df_user.join(df_salary,
               df_user.id == df_salary.id,
               "inner").where(df_user.id == 1000).show()

# multiple join with &
df_user.join(df_salary,
               (df_user.id == df_salary.id) & (df_user.id == 1000)
            ).show()

# full outer join: join the two dataframes with all matching and non-matching rows
print("== full outer join ==")
df_user.join(df_salary, 
               df_user.id == df_salary.id, 
               "fullouter").show()

# left join:  joins by returning all rows from the first dataframe and only matched rows from the second one
print("== left join ==")
df_user.join(df_salary, 
               df_user.id == df_salary.id, 
               "left").show()

# right join: joins by returning all rows from the second dataframe and only matched rows from the first one
print("== right join ==")
df_user.join(df_salary, 
               df_user.id == df_salary.id, 
               "right").show()

# left semi join: join all rows from the first dataframe and return only matched rows from the second one
print("== left semi join ==")
df_user.join(df_salary, 
               df_user.id == df_salary.id, 
               "leftsemi").show()

# left anti join: join returns only columns from the first dataframe for non-matched records of the second dataframe
print("== left anti join ==")
df_user.join(df_salary, 
               df_user.id == df_salary.id, 
               "leftanti").show()

# SQL join
df_user.createOrReplaceTempView("user")
df_salary.createOrReplaceTempView("salary")

spark.sql("SELECT * FROM user, salary WHERE user.id == salary.id").show()

spark.sql("SELECT * FROM user INNER JOIN salary ON user.id == salary.id").show()