Data Engineering/Spark

Spark Section5 - Apache Spark Streaming, 실시간 데이터 처리 라이브러리

아르밋우 2024. 4. 13. 19:42

0. INDEX

  1. Streaming(스트리밍) 소개
  2. Streaming 예제로 알아보기
  3. Structured Streaming(구조화된 스트리밍) 예제로 알아보기
  4. Structured Streaming Output Mode(아웃풋 모드)에 대해 자세히 알아보기
  5. Streaming Input sources, Processing Model and Trigger settings에 대해 이해하기
  6. Streaming Fault Tolerance(결함 감내)에 대해서 알아보기

1. Streaming(스트리밍) 소개 

spark streaming은 spark SQL & Dataframe처럼 spark core위에 올라가 있는 라이브러리로, 실시간 데이터를 처리하기 위한 라이브러리이다. 사용하는 데이터 방식에 따라 구 버전인 RDD를 베이스로 하는 spark streaming과 신버전인 데이터프레임을 베이스로 하는 structured streaming 이렇게 두 가지가 있다.

 

[spark streaming동작 방식]

아래와 같이 kafka, hdfs 등의 실시간 데이터 소스에서 spark streaming으로 데이터를 push하면 spark streaming에서 데이터를 처리하고 그 결과 데이터를 hdfs, db에 적재할 수 있다.

 

소스 > 데이터처리 > 저장

 

[데이터 처리 방식]

- Dstream의 경우) RDD 방식

1. 실시간 데이터 소스로부터 spark streaming으로 전달된 데이터 스트림을 여러 batch들로 잘게 쪼갠다. 이때 잘게 쪼개진 배치들 각각을 Dstream(Discreted stream)이라고 한다.(RDD의 sequence로 볼 수 있음) 예를 들어 1초 간격으로 잘라지게 설정할 수 있다.(micro 단위의 배치 처리이기 때문에 완전 실시간이라고 보기엔 어렵다)

2. 이러한 batch들을 spark 엔진에서 처리된다.

데이터 처리 과정

 - structured stream의 경우) - Dstream 이후에 나왔으며, 데이터프레임 단위로 사용.

1. input 데이터가 지속적으로 unbounded table(무한히 증가하는 하나의 커다란 데이터셋)에 add 된다. 즉, 데이터가 연속적으로 추가되는 하나의 테이블로서 input 데이터를 다룬다. 이때 들어오는 데이터는 각각 행으로 추가된다고 생각하면 된다. 이렇게 추가되는 데이터는 저장장치에 기록된다.

2. 데이터를 처리할 때 여러가지 이유로 데이터가 들어온 시각, 데이터가 생성된 시각이 다를 수 있다. spark streaming에서는 데이터가 생성된 시각(timestamp)를 기준으로 데이터를 처리한다. 따라서 데이터가 늦게 spark streaming으로 들어오게 되도 일관되게 처리가 가능하다.

3. sliding 타임마다 각 윈도우 시간 범위에 들어온 데이터가 집계된다. 그래서 특정시간에 들어온 데이터는 여러 윈도우에 의해 중복 집계될 수 있다.

4. spark streaming에서는 데이터가 생성된 시각과 spark streaming에 인입된 시각 차이가 클 경우에도 해당 인입된 데이터를 생성된 시각을 포함하는 윈도우에 집계한다. 따라서 느즈막히 들어온 데이터라도 데이터 처리에 문제가 되지 않는다.

 

[spark streaming 내부(물리적) 동작 방식]

아래는 참고 블로그들을 통해 정리해 본 sparkstreaming의 전체적인 동작방식이다. 틀릴 수 있으니 이 점 참고 바랍니다.

receiver 생성 후 리시버가 데이터 블록 정보 회신

 

1. streamingContext가 생성되면 입력소스당( kafka 등) receiver 하나를 executor에 실행시킨다. 이때 receiver는 데이터를 전달받고 이를 메모리에 저장하는 역할을 한다.

2. 데이터(Dstream)가 receiver에 인입되면 이를 다른 executor에 복제한다.

3. 해당 블록 정보를 streamingcontext driver의 network input tracker에 전달한다.

job 생성, 스케줄링, 실행

 

4. 해당 데이터 정보는 driver 내 dstream graph에 업데이트된다.

5. 스케줄러가 dstream graph를 주기적으로 쿼리해 job을 생성하고 실행을 위해 job 매니저에 전달한다.

스케줄러가 sparkContext에 job 전달

6. job 매니저는 해당 job을 sparkContext에 전달하여 기본 spark 동작방식대로 실행한다. 


2. Streaming 예제로 알아보기

[docker 환경에서 사전 준비 작업]

1. 아래 코드를 통해 주피터노트북 서버 start.

2. 해당 주피터 컨테이너에 cmd를 통해 docker exec -it 로 접속.

3. spark streaming은 데이터 소스가 될 서버와 연결이 필요한 클라이언트 app이기 때문에 해당 데이터 소스 역할을 할 소켓 서버를 주피터 컨테이너 안에 설치.

4. 소켓 서버 start.

# docker run -it --rm -p 8888:8888 \
#     -v /Users/seungjoonlee/git/pyspark:/home/jovyan/work \
#     --user root \
#     -e NB_GID=100 \
#     -e GRANT_SUDO=yes \
#     -e GRANT_SUDO=yes jupyter/pyspark-notebook

# # apt update
# apt update

# # netcat install
# apt-get install netcat

# # start netcat server
# nc -lk 9999

# # # run the sparkstreaming in terminal
# # /usr/local/spark/bin/spark-submit /home/jovyan/work/spark_streaming.py

 

[코드 예제]

# 5-2. 스트리밍 어플리케이션 기본 예제
# 소켓 서버에서 텍스트를 사용자가 write하면 10초 단위로 wordcounting 하여 출력하는 예제
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time


# Create a local StreamingContext with two working thread and batch interval of 5 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
# 10 sec interval
ssc = StreamingContext(sc, 10) # 10초 간격으로 

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("127.0.0.1", 9999) 

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
# ssc.awaitTermination()  # Wait for the computation to terminate

while True:
    print("Streaming application is running...")
    # 다른 작업 수행, 예: 로깅, 파일 작성, 외부 서비스 호출 등

    # 스트리밍 애플리케이션이 실행 중이면 추가 작업을 수행할 수 있음

    # 일정 시간 동안 대기하거나 조건을 확인하여 종료
    # 예를 들어, 1초마다 체크하고 애플리케이션 종료 조건이 충족되면 break
    time.sleep(1)  # 1초마다 갱신

 

스트리밍 컨텍스트 설정 시에는 core를 반드시 2개 이상으로 설정해줘야 한다. 하나의 JVM에서는 데이터 소스(소켓 서버 등)에서 데이터를 받아와야 하고, 다른 JVM에서는 데이터 처리를 해야하기 때문에 최소 두 대가 필요하다.

 

  • streamingContext(): SparkContext를 내부적으로 포함하며 스트리밍 데이터를 처리하기 위한 객체이다. streamingContext가 시작되면 새로운 스트리밍 계산은 추가로 설정할 수 없다. 또한 streamingContext가 중단되면 객체가 완전히 종료되고 연쇄적으로 sparkContext 역시 종료된다(설정값으로 sparkContext는 종료 안할 수 있음). 이 경우 재실행을 위해서는 객체를 새로 생성해야 한다.
  • streamingContext().start(): 스트리밍 컨텍스트를 시작하는 메서드로 실제 데이터 스트림을 수신하고 처리를 시작하는 지점이다.
  • streamingContext().socketTextStream(): TCP 소켓을 통해 들어오는 텍스트 스트림을 처리하기 위해 사용되는 메서드로 지정된 호스트와 포트에서 데이터를 수신하여 아래와 같이 DStream을 생성 및 반환한다. 
<class 'pyspark.streaming.dstream.DStream'>

 

  • pprint(): DataFrame이 보기 좋게 형식화되어 출력된다.
  • streamingContext().awaitTermination(): start()은 논블로킹 함수로, 제출한 spark 코드가 비동기적으로 수행되게 하는데 스트리밍 어플리케이션이 코드 상에서 블로킹 되어 있는 채로 있길 원한다면 awaitTermination()을 통해 어플리케이션이 종료될 때까지 대기할 수 있다. 이를 통한 이점으로 애플리케이션의 실행 상태를 모니터링하거나 종료 시점 관리가 편할 것 같다.

3. Structured Streaming(구조화된 스트리밍) 예제로 알아보기

5-2 예제와 다르게 이번에는 구조화된 스트리밍에 대해 알아본다. 여기서는 sparkContext가 아닌 sparkSession 객체 기반으로 데이터프레임 형태로 데이터를 처리하는 어플리케이션을 실행하게 된다.

 

[코드예제]

# 5-3. 구조화된 스트리밍 사용 예제
# word counting을 구조화된 스트리밍 방식으로 연산하는 예제
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# query.awaitTermination()

 

  • readStream: 스트리밍 데이터를 읽어들여 DataFrame(무제한으로 커질 수 있는 데이터프레임)을 생성하는 메서드이다. .format() 메서드를 추가로 사용하여 데이터 형식을 지정하고, .option() 메서드를 사용하여 데이터 소스 위치 정보를 설정한다. 마지막으로 .load() 메서드를 사용하여 데이터를 읽어들인다.
  • writeStream: 스트리밍 데이터프레임을 지정된 형식으로 저장하는 메서드로, 스트리밍 데이터를 처리한 결과를 파일 시스템, Kafka, 소켓 등의 대상에 저장할 수 있다. readStream과 마찬가지로 .format() 메서드를 사용하여 저장 형식을 지정하고, outputMode()는 출력 방식을 지정하는데 append, complete, update 등의 방식이 있다. 마지막으로 .start() 메서드를 호출하여 스트리밍 처리를 시작한다.

왼쪽: 주피터 컨테이너 실행 터미널 console / 오른쪽: 주피터 내부 소켓 서버


4. Structured Streaming Output Mode(아웃풋 모드)에 대해 자세히 알아보기

위 5-3 예제의 출력모드는 complete였다. 출력모드 종류는 아래와 같다.

  • append: 새로운 추가된 데이터만 출력하는 모드로, 스트리밍 DataFrame이 증분적으로 갱신되는 경우에만 사용이 가능한 출력모드이다. 즉, append 모드는 위 5-3 예제처럼 aggregation 로직과 같은 상태 유지 연산을 할 수 없기 때문에 5-3 예제에서는 append 모드 사용 시 에러가 발생한다.
    에러메세지
  • update: update하게 된 데이터 행만 출력하는 모드이다.
    hello를 데이터로 보내면 hello에 대해서만 연산이 수행된다.
  • complete: 모든 결과 데이터를 출력하며, 리소스가 많이 드는 모드이다.

5. Streaming Input sources, Processing Model and Trigger settings에 대해 이해하기

[Streaming inptut source]

Streaming 데이터 소스 종류로는 socket, rate, file(파일 수정 시각을 기반으로 데이터를 읽어온다. 이때 파일 내용은 변하면 안되기 때문에 파일은 생성 이후에 mv 명령어를 통해 spark streaming에서 읽는 디렉토리에 옮겨지도록 자동화가 필요하다), kafka(0.10.0 버전 이후) 소스를 이용할 수 있다.

 

[Processing Model]

하나의 micro-batch가 Spark SQL 엔진에 제출되고 실행계획이 세워짐

0. spark streaming 프로세스는 read -> transform -> sink 이렇게 세 단계로 크게 나눠지며, 이를 micro-batch라고 한다. 이러한 micro-batch(하나의 job으로 볼 수 있음)가 주기적으로 반복되면서 실행되게 되는 컨셉이다.

1. 위 그림과 같이 sink의 start() 단계에서 spark driver가 micro-batch 코드를 spark SQL 엔진에 제출한다.

2. 그러면 spark SQL 엔진은 해당 코드를 분석하여 최적화하는 과정을 거쳐 실행계획을 세운다.

실행계획 생성 후 백그라운드 프로세스가 wating하며 micro-batch 작업을 수행함

3. 이후 micro-batch 로직은 백그라운드 프로세스에 의해 실행된다. 백그라운드로 실행되기 때문에 기존에 spark 배치 프로세스를 실행했을 때처럼 작업 실행 과정이 cli에 나타나지 않는다.(spark ui에서 확인할 수 있다)

4. 하나의 micro-batch process를 수행한 후에는 spark streaming 백그라운드 프로세스는 죽지 않고 그 다음 input 데이터가 들어올 떄까지 대기하다가 들어오면 위 0~3 작업을 반복 수행한다. 이때 이전 배치에서 사용된 인풋 데이터는 더 이상 필요하지 않으므로 버려진다.

 

 

 [Trigger Setting]

그렇다면 spark streaming의 백그라운드 프로세스는 어떤 기준을 가지고 input 데이터가 들어오면 trigger가 되는 것일까?

input 데이터 양 기준일지, 특정 시간 주기를 가지고 동작하는지 등 여러가지 방식이 존재할 수 있는데 이러한 기준을 사용자가 아래와 같이 설정할 수 있다.

  1. Unspecified: default로 적용된 기준으로, 현재 배치가 끝나면 다음 배치를 수행한다. 이때 다음 input 데이터가 들어오지 않았으면 들어올 때까지 기다린다.
  2. Time Interval: 예를 들어 5분 주기로 배치를 돌린다. 이전 배치주기가 interval보다 길면 해당 배치가 끝나자마자 다음 배치가 시작된다. 데이터가 인입되지 않으면 배치는 시작하지 않는다.
  3. One Time: 기존에 사용하던 배치 방식과 같다.
  4. Continuous: 새롭게 적용된 방식으로, milli second 단위의 실시간 스트리밍을 지원한다. 이 방식은 주기가 매우 짧기 때문에 micro-batch 방식은 적절하지 않아, continuous trigger 방식을 채택하는 아직 실험적인 부분으로 명시되어 있다.

[요약]

  • spark streaming 코드를 짤 때 micro-batch 과정에 대해서만 코드 상에 존재한다.
  • 새로운 데이터가 유입되어 매번 해당 micro-batch 로직을 수행하는 것은(코드 상에 존재하지 않는) spark streaming 백그라운드 프로세스에 의해 관리된다.
  • spark streaming 백그라운드 프로세스의 trigger 기준은 커스터마이징 가능하다.

[예제 코드]

# 5-5. 구조화된 스트리밍 사용 예제
# 특정 디렉토리 내 파일을 읽어들여 데이터처리를 하는 streaming spark 작업 
from pyspark.sql import SparkSession

# generate the data
# fake data --format=ndjson --limit 1000 city domain event=event.value.action > streaming_sample/sample.json
spark = SparkSession \
    .builder \
    .appName("StructuredStreamingSum") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .config("maxFilesPerTrigger", 1) \
    .getOrCreate()

# create dataframe reparesenting the stream of input lines from connection to localhost:9999
df = spark \
    .readStream \
    .format("json") \
    .option("path", "streaming_sample") \
    .load()

shorten_df = df.select("city", "event")

query = shorten_df \
            .writeStream \
            .format("json") \
            .option("path", "streaming_output") \
            .option("checkpointLocation", "checkpoint") \
            .outputMode("append") \
            .trigger(processingTime='5 seconds') \
            .start()

query.awaitTermination()
  • config("spark.streaming.stopGracefullyOnShutdown", "true"): 스트리밍 작업을 graceful하게 종료하기 위해 설정하는 값으로, 스트리밍 작업이 종료될 때 데이터 손실을 최소화할 수 있다. 스파크 스트리밍 작업은 계속해서 데이터를 처리하고 있기 때문에 예기치 않은 이유로 스트리밍 작업을 중지해야 할 때 해당 설정을 하지 않으면 처리 중인 작업을 완료하지 않고 즉시 종료하기 때문에 데이터 손실이 발생할 수 있다.
  • config("maxFilesPerTrigger", 1): 각 트리거링 당 읽을 파일의 최대 수를 설정한다. 해당 설정을 하는 이유는, 무턱대고 한번에 많은 개수로 설정하게 되면 많은 파일을 읽어들이는 데 디스크 I/O가 크게 발생하거나 해당 작업을 위한 리소스 사용량 등 작업 처리에 장애가 발생할 수 있다. 따라서 파일이 지속적으로 쌓이는 경우에는 특히, 적당한 개수로 해당 값을 설정하여  파일을 처리하는 것이 효율적일 것이다.
  • option("checkpointLocation", "checkpoint"): 스트리밍 처리의 상태를 유지하기 위한 체크포인트 위치를 설정.
  • trigger(processingTime='5 seconds'): 처리 간격을 설정함. 

6. Streaming Fault Tolerance(결함 감내)에 대해서 알아보기

[spark streaming은 언젠가 한번은 멈출 수 밖에 없다]

spark streaming을 사용하다 보면 잘못 정의된 input 데이터가 들어오거나 여러가지 exception 오류의 발생 또는 앱을 업그레이드 하는 등의 유지보수 차원에서도 잠시 중단해야 되는 경우가 생긴다. 이러한 상황에서도 spark streaming은 input 데이터를 놓치지 않고 중복 없이 데이터를 생성해야 할 것이다.

 

spark streaming의 이러한 "exactly Once" 특징은 API 수준에서 이미 달성된다. 내부적으로 어떻게 달성되는 것일까? 이는 매 micro-batch마다 checkpoint(디렉토리 내 여러 폴더로 관리됨)를 통해 Read Position, state information(group by와 같은 연산 시 이전 배치에서 sum 해놓은 정보를 저장해 놓음으로써 재연산하는 낭비를 줄일 수 있게 함)을 기록함으로써 달성한다.

매 micro-batch 시마다 checkpoint로 시점을 기록한다.

그럼 불완전하게 끝난 micro-batch는 spark streaming을 재시작하면 무조건 exact-once를 달성할 수 있을까? 그렇지 않다. 달성하기 위해서는 아래 조건이 갖춰져야 한다.

 

[[exactly-once 달성 조건]]

  1. 같은 checkpoint에서 재시작되어야 한다. 만약 해당 checkpoint 디렉토리가 제거되었거나 같은 쿼리가 다른 checkpoint 디렉토리에서 수행되면 안 된다.
  2. 해당 micro-batch를 다시 돌리기 위해 과거의 소스 데이터를 가져올 수 있어야 한다. 즉 재현성이 있는 kafka와 같은 소스가 있어야 한다. 소켓같은 데이터 소스는 그래서 안 된다.
  3. 같은 input이 들어왔을 때 같은 output을 내는 일관된 로직이어야 한다. 예를 들어 시간 의존적인 로직이 존재한다면 output이 달라질 수 있어서 안 된다.
  4. ?중복을 무시하기 위해 이를 인지할 수 있는 output sink여야 한다. 그렇지 않으면 과거 데이터를 새롭게 업데이트 할 수도 있기 때문이다

[코드를 중간에 수정해서 spark streaming을 지속해도 되는가?]

spark streaming 실행 중에 버그가 발견되어 코드를 수정해야 할 상황에서는 그냥 수정 후 재가동하면 문제가 없을까?

코드를 수정할 때 filter같은 것들은 spark에 의해 감지되지 않을 거라(filter에 의해 output 데이터프레임의 형태가 달라지지 않음 ) 상관없다. 하지만 예를 들어 group by하는 컬럼명을 바꾸게 된다면 이전의 state information와 충돌이 일어나(즉 spark에 의해 감지됨) exception 에러가 나타날 것이다. 따라서 checkpoint가 더 이상 필요하지 않으며 spark streaming 실행 시 방해가 되는 정보이기 때문에 이럴 경우에는, checkpoint를 지우고 실행해야 할 것이다. 이처럼 중간에 코드를 수정하는 것은 product 환경에서는 주의해야 한다.

 

[요약]

  1. spark streaming에서 checkpoint를 통해 fault tolerance를 달성한다. 따라서 checkpoint를 사용할 수 있도록 코드에서 명시해야 할 것 같다.
  2. spark streaming 중간에 코드를 수정하여 재가동할 때에는 로직 상 변경으로 corrupt가 발생하지 않도록 주의해야 한다. 



참고)

- streaming 소개)

https://qkqhxla1.tistory.com/1143

 

spark streaming(dstream, structured streaming) 정리 + 삽질

spark streaming 에 대해 읽고 간단히 정리한다. spark streaming에는 dstream과 structured streaming의 두가지가 있다. 1. dstream (spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers) dstream의 input

qkqhxla1.tistory.com

https://seaweed-one.tistory.com/45

 

[Spark] Spark Streaming 의 종류 (DStream, Structured Streaming)

안녕하세요. 씨위드입니다. 오늘은 Spark Streaming의 종류에 대하여 알아보겠습니다. Spark Streaming은 DStream과 Structured Streaming으로 나눠집니다. 두 스트리밍에는 어떤 차이점이 있는지 알아보기 위하

seaweed-one.tistory.com

- streamingContext 동작방식)

https://techmagie.wordpress.com/tag/spark/

 

Spark – World of BigData

Whats wrong with Hadoop ? Around a near decade ago Hadoop became the de-facto standard for batch processing unstructured data in the form of Map-Reduce jobs.Over the years people developed layers of other services/tools like Oozie for workflow management ,

techmagie.wordpress.com

https://www.cnblogs.com/itboys/p/7771086.html

 

Spark Streaming实战 - 大葱拌豆腐 - 博客园

1.Storm 和 SparkStreaming区别 Storm 纯实时的流式处理,来一条数据就立即进行处理 SparkStreaming 微批处理,每次处理的都是一批非常小的数据 Storm支持动态调整并行度(动态的资源分配),SparkStreaming(

www.cnblogs.com

https://cloud.tencent.com/developer/article/1157534

 

SparkStreaming 入门-腾讯云开发者社区-腾讯云

1. 基本原理 其实在 SparkStreaming 中和之前的Core不同的就是他会把任务分成批次的进行处理,也就是我们需要设置间隔多久计算一次。 我们从网络,文件系统,Kafka 等等数据源产生的地方获取数据

cloud.tencent.com

https://alphahackerhan.tistory.com/13

 

스파크 스트리밍, Spark Streaming

1. 배경 모바일 디바이스와 더불에 IoT 시대로 들어서면서, 데이터는 양만큼이나 빠른 속도가 중요해졌다. 빅데이터가 만족시켜야 하는 3가지 요소, (3V, Volume, Velocity, Variety), 중 속도(Velocity)의 중

alphahackerhan.tistory.com

- spark streaming process model)

https://www.youtube.com/watch?v=koh7UOucMb0

- spark streaming fault tolerance)

https://www.youtube.com/watch?v=RxnkYTwMcis&t=27s