Spark Streaming vs Structured Streaming

2024-02-27

đã sinh ra Spark Streaming lại sinh ra Structured Streaming

So sánh

  Spark Streaming Structured Streaming
import là 1 package riêng spark-streaming_2.12 + spark-streaming-kafka-0-10_2.12 có sẵn trong spark-sql_2.13 từ 2.x
api DStream API from Spark RDDs Dataframe, Dataset APIs -> dùng được sql. Dataframe thì kiểu jh cũng optimize hơn rdd của bên trái
cơ chế micro batch theo duration k có khái niệm batch, các data mới đến được append vào một unbounded result table.
data late chỉ xử lí theo thời gian mà spark nhận được data đó. với các dữ liệu đến trễ do latency thì coi như mất với batch đó có thể xử lí dữ liệu event-time (các dữ liệu tới trễ). đây cũng là 1 trong những feature chính của structure streaming
End to end guarantees cả 2 đều có cơ chế checkpoint có cơ chế restricted sinks : hỗ trợ exactly once tốt hơn
sink không hạn chế bao nhiêu sink. foreachRDD foreachBatch

*sink: kết quả của stream operator được gửi tới một nơi gọi là sink (sink - destination of a streaming operation). ví dụ: storage, print console. đặc điểm của sink là tính imdempotent

Code

Spark Streaming
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate
hello world

(hello,1)
(world,1)
Structure Streaming


import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();



// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();




apache spark
apache hadoop

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+

Structure streaming trigger

Spark streaming: fix duration

Structure streaming: Khoảng thời gian mà nó processing. mặc định là cứ xong micro-batch trước là thực hiện.

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

...

Structure streaming append row

Spark streaming: chia theo batch của input và sử lí như spark bình thường (convert streaming về batch)

Structure streaming: append vào 1 table

Structure streaming window late

Spark streaming: window length(3), sliding interval(2)

Structure streaming: late được cập nhật lại

Vấn đề: data quá trễ (tính đến đơn vị ngày) thì có thể dữ liệu đã k còn trên ram -> watermarking in spark 2.1.

Ref

medium

spark streaming

strcuture streaming

trigger