본문 바로가기

Data Engineering/Spark

Spark Section1 - Apache Spark RDD 특징 및 예제

0. INDEX

      1. Spark 환경 구성
      2. word count 예제
      3. Key Value Pair / Average Example 평균값 구하기 예제
      4. Filter와 Min/Max를 이용해 데이터 필터링 및 최대, 최소값 구하기 예제
      5. Map vs Flatmap 차이점

(더 자세한 RDD의 특징은 https://cyoo.tistory.com/26 참조)


1. Spark 환경 구성

1.Local 환경에 도커 설치 후 Spark 환경이 구축된 Jupyter 노트북 이미지를 가져와서 환경을 구축했다.

# docker run으로 jupyter 노트북 서버 실행
docker run -it --rm -p 8888:8888 -v C:\Users\Cyoo\study\spark:/home/jovyan/work --user root -e NB_GID=100 -e GRANT_SUDO=yes -e GRANT_SUDO=yes jupyter/pyspark-notebook

2. 웹 브라우저에서 Jupyter 노트북에 접속.(http://127.0.0.1:8888)


2. Word Count 예제

# 첫번째 예제
import pyspark

test_file = "file:///home/jovyan/work/sample/word.txt"
# sc = pyspark.SparkContext('local[*]')
sc = pyspark.SparkContext.getOrCreate();

text_file = sc.textFile(test_file)
# hello world
# hello world
# hello world
# hello world
# hello world
# hello world

tmp1 = text_file.flatMap(lambda line: line.split(" "))
tmp2 = tmp1.map(lambda word: (word, 1))
counts = tmp2.reduceByKey(lambda a, b: a + b)

print(tmp1.collect()) # ['hello', 'world', 'hello', 'world', 'hello', 'world', 'hello', 'world', 'hello', 'world', 'hello', 'world']
print(tmp2.collect()) # [('hello', 1), ('world', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)]
print(counts.collect()) # [('hello', 6), ('world', 6)]

 

  • pyspark.SparkContext(): spark 어플리케이션의 진입점으로 실행환경을 설정하고 리소스매니저, 워커노드에서 실행되는 executor와 통신하는 주체이다. 또한 RDD를 생성하는 데 사용된다. 하나의 spark 어플리케이션에는 하나의 sparkContext가 필요하며 스파크 2.0부터는 sparkSession(내부적으로 sparkContext  사용)를 사용한다.
  • pyspark.SparkContext().getOrCreate(): 이미 존재하는 SparkContext가 있는 경우 해당 SparkContext를 반환하고, 없는 경우 새로운 SparkContext를 생성한다.
  • collect(): RDD(Resilient Distributed Dataset)의 모든 요소를 수집하여 로컬 컬렉션으로 반환하는 메서드로, 분산된 데이터를 클러스터에서 로컬 머신으로 수집하는 데 사용된다.(대규모 데이터셋의 경우에는 모든 데이터를 수집하는 collect() 대신 take(), first() 등의 메서드를 사용하여 일부 데이터를 가져오는 것이 테스트 차원에서 좋다)
# 두번째 예제
import collections
import pyspark

test_file = "file:///home/jovyan/work/sample/grade.txt"
# sc = pyspark.SparkContext('local[*]')
sc = pyspark.SparkContext.getOrCreate();

text_file = sc.textFile(test_file)
# 파일내용
# tom 70
# sara 80
# joon 100
# kevin 90
# John 90

grade = text_file.map(lambda line: line.split(" ")[1])
print(grade.collect()) #['70', '80', '100', '90', '90']

# Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
grade_count = grade.countByValue() 
print(type(grade_count)) # <class 'collections.defaultdict'>
print(grade_count) # defaultdict(<class 'int'>, {'70': 1, '80': 1, '100': 1, '90': 2})
print(grade_count.items()) # dict_items([('70', 1), ('80', 1), ('100', 1), ('90', 2)])

for grade, count in sorted(grade_count.items(), key=lambda item: item[1], reverse=True):
    print(f"{grade}: {count}")
# 90: 2
# 70: 1
# 80: 1
# 100: 1
  • countByValue(): print 시 rdd에 대한 정보가 아닌 데이터가 바로 출력되는 것을 통해 action 연산임을 알 수 있다.

3. Key Value Pair / Average Example 평균값 구하기 예제

# 2-2-1
import pyspark

sc = pyspark.SparkContext.getOrCreate();
# Key / Value RDD

# creating Key / Value RDD
rdd = sc.parallelize(["a", "b", "a"])
total_by_brand = rdd.map(lambda brand: (brand, 1)) #[('a', 1), ('b', 1), ('a', 1)]

# reduceByKey(): Merge the values for each key using an associative and commutative reduce function.
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
sorted(rdd.reduceByKey(add).collect()) # [('a', 2), ('b', 1)]

# groupByKey(): Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect()) # [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect()) # [('a', [1, 1]), ('b', [1])]

# sortByKey(): Sorts this RDD, which is assumed to consist of (key, value) pairs.
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first() # ('1', 3)

# keys(), values(): Create a RDD of keys or just values
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.keys() # ['a', 'b', 'a']

# join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect()) # [('a', (1, 2)), ('a', (1, 3))]

# Efficiency is the key for performance!!! 
# if you only need values, use mapValues() or flatMapValues() -> 셔플링을 줄이는 것이 포인트!
  • sc.parallelize(): 주어진 데이터를 RDD로 변환하며 필요 시 여러 파티션으로 분할하여 분산 데이터셋으로 만든다. 이렇게 만들어진 RDD는 클러스터의 여러 노드에 분산되어 저장된다.
  • first(): RDD의 첫 번째 요소를 반환하는 action 연산.
#2-2-2
import pyspark 

sc = pyspark.SparkContext.getOrCreate();
test_file = "file:///home/jovyan/work/sample/house_price.csv"
# 파일내용
# 	seoul	10000	3
# 1	seoul	10000	5
# 2	seoul	40000	7
# 3	busan	5000	7
# ...

def parse_line(line: str):
    city, price, count = line.split(',')
    return (int(price), int(count))

lines = sc.textFile(test_file)
price_count = lines.map(parse_line) # [(10000, 3), (10000, 5), (40000, 7), (5000, 7), (4000, 2), (9000, 4), (5000, 7), (4000, 2), (8000, 9)]

sum_of_count = price_count.mapValues(lambda count: (count, 1))\
                .reduceByKey(lambda a, b: (int(a[0]) + int(b[0]), int(a[1]) + int(b[1]))) 
# ('10000', (3, 1)), ('10000', (5, 1)) ...
# [('10000', (8, 2)), ('4000', (4, 2)), ('9000', ('4', 1)), ('8000', ('9', 1)), ('40000', ('7', 1)), ('5000', (14, 2))]

avg_by_count = sum_of_count.mapValues(lambda total_count: int(total_count[0]) / total_count[1])
results = avg_by_count.collect()
print(results) # [(10000, 4.0), (40000, 7.0), (5000, 7.0), (4000, 2.0), (9000, 4.0), (8000, 9.0)]

4. Filter와 Min/Max를 이용해 데이터 필터링 및 최대, 최소값 구하기 예제

# 2-3
# filter
# Return a new RDD containing only the elements that satisfy a predicate.
import pyspark

sc = pyspark.SparkContext.getOrCreate();
test_file = "file:///home/jovyan/work/sample/temperature.csv"
# 	record_id	month	day	year	AverageTemperatureFahr	AverageTemperatureUncertaintyFahr	City	country_id	Country	Latitude	Longitude
# 1	474376	01	01	1853	NA	NA	Auckland	NEW	New Zealand	36.17S	175.03E
# 2	474377	02	01	1853	NA	NA	Auckland	NEW	New Zealand	36.17S	175.03E
# ...

def get_data(line, header):
    if line != header:
        col = line.split(',')
        city = col[6].strip("\"")
        avg_temp_fahr = col[4]
        yield (city, avg_temp_fahr)

lines = sc.textFile(test_file)

# get header string
header = lines.first()

parsed_line = lines.flatMap(lambda line: get_data(line, header))

# filter NA values
filtered_line = parsed_line.filter(lambda x: "NA" not in x[1])

# finding min temperature
min_temp = filtered_line.reduceByKey(lambda x, y: min(float(x), float(y)))
final_list = min_temp.collect();
for city, temperature in final_list:
    print(f"{city}: {temperature}")
# Auckland: 49.856
# Canoas: 50.009
# ...

5. Map vs Flatmap 차이점

# 2-4
# map vs. flatMap

# map transformation applies a function to each row in a DataFrame/Dataset and returns the new transformed Dataset.
# 1 => 1
# flatMap transformation flattens the DataFrame/Dataset after applying the function on every element and returns a new transformed Dataset. The returned Dataset will return more rows than the current DataFrame. It is also referred to as a one-to-many transformation function
# 1 => Many
# One of the use cases of flatMap() is to flatten column which contains arrays, list, or any nested collection

import pyspark

sc = pyspark.SparkContext.getOrCreate();
rdd = sc.parallelize([("name", "joe,sarah,tom"), ("car", "hyundai")])
result = rdd.map(lambda x: x[1].split(","))
print(result.collect()) # [['joe', 'sarah', 'tom'], ['hyundai']]

rdd = sc.parallelize([("name", "joe,sarah,tom"), ("car", "hyundai")])
result = rdd.flatMap(lambda x: x[1].split(","))
print(result.collect()) # ['joe', 'sarah', 'tom', 'hyundai']

test_file = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
# 파일내용
# Lorem Ipsum is simply dummy text of the ...

lines = sc.textFile(test_file)
words = lines.flatMap(lambda x: x.split())
# print(words.collect())
word_count = words.countByValue()
print(word_count) #  {'Lorem': 4, 'Ipsum': 3, 'is': 1, ...
for word, count in word_count.items():
    print(f"{word}: {count}")
    
# # How about sort by key?
word_count = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
sorted_word_count = word_count.map(lambda x: (x[1], x[0])).sortByKey()
for word, count in sorted_word_count.collect():
    print(f"{word}: {count}")
# 1: industry's
# 1: standard
# ...
  • map: rdd1과 rdd2가 1대1 대응(rdd2는 rdd1의 가공 후)
  • flatmap: 1대 다수로 대응됨, flatmap이 적용되면 결과값이 하나의 리스트로 펼쳐져서 결과가 반환됨.