admin管理员组

文章数量:1026989

Spark中的checkpoint机制

一.Spark Core中的checkpoint

def main(args: Array[String]) {val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]").getOrCreate()val sc = spark.sparkContextsc.setCheckpointDir("checkpoint")val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))val pairs = sc.parallelize(data, 3)pairs.cache()pairs.checkpoint()println(pairs.count)
}


二.Spark Streaming中的checkpoint

checkpoint主要保存:

  • 1.metadata(一些配置)
  • 2.RDD数据(保存状态)

1.无状态

def main(args: Array[String]): Unit = {val checkpointDirectory = "offset/checkpoints"def functionToCreateContext(): StreamingContext = {val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2))  ssc.checkpoint(checkpointDirectory)val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop000:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "my-spark-group-1","auto.offset.reset" -> "earliest","enable.automit" -> (false: java.lang.Boolean))val topics = Array("my-topic")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))words.print()ssc}val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)ssc.start()ssc.awaitTermination()
}

2.自定义有状态

用到updateStateByKey函数进行状态保存。

val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1)).updateStateByKey[Int](updateFunction _)  
words.print()def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval old = runningCount.getOrElse(0)Some(newCount+old)
}

checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。

  •     元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
  •     所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。

UUID文件夹下就是保存状态的RDD数据。

3.窗口函数中的有状态和无状态

单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。

4.问答

Q1:streaming中checkpoint是在何时做的?

A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。

Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?

A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。

在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。

其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。

Checkpoint主要包含的信息如下:

val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll

具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。

三.Spark Structured Streaming中的checkpoint

def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getName).config("spark.sql.shuffle.partitions", 10).getOrCreate()import spark.implicits._val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hadoop000:9092").option("subscribe", "my-topic").load().selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")).map(word => (word, 1))//.groupBy("value")//.count().writeStream.outputMode("append").format("console").option("checkpointLocation","sss/chk").start().awaitTermination()
}

 

内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。

offsets文件夹下每个文件都是一个batch的偏移量。

{"batchWatermarkMs":0,"batchTimestampMs":1647251960388,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}

稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。
但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。

sss中append模式改为complete模式则会报错。

.groupBy("value")
.count()
.writeStream
.outputMode("complete")

 使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。

.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")

个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。

四.checkpoint比较


五.参考文章

Spark中的checkpoint机制_程研板的博客-CSDN博客_spark的checkpoint机制

Spark中的checkpoint机制

一.Spark Core中的checkpoint

def main(args: Array[String]) {val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]").getOrCreate()val sc = spark.sparkContextsc.setCheckpointDir("checkpoint")val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))val pairs = sc.parallelize(data, 3)pairs.cache()pairs.checkpoint()println(pairs.count)
}


二.Spark Streaming中的checkpoint

checkpoint主要保存:

  • 1.metadata(一些配置)
  • 2.RDD数据(保存状态)

1.无状态

def main(args: Array[String]): Unit = {val checkpointDirectory = "offset/checkpoints"def functionToCreateContext(): StreamingContext = {val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2))  ssc.checkpoint(checkpointDirectory)val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop000:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "my-spark-group-1","auto.offset.reset" -> "earliest","enable.automit" -> (false: java.lang.Boolean))val topics = Array("my-topic")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))words.print()ssc}val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)ssc.start()ssc.awaitTermination()
}

2.自定义有状态

用到updateStateByKey函数进行状态保存。

val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1)).updateStateByKey[Int](updateFunction _)  
words.print()def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval old = runningCount.getOrElse(0)Some(newCount+old)
}

checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。

  •     元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
  •     所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。

UUID文件夹下就是保存状态的RDD数据。

3.窗口函数中的有状态和无状态

单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。

4.问答

Q1:streaming中checkpoint是在何时做的?

A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。

Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?

A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。

在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。

其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。

Checkpoint主要包含的信息如下:

val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll

具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。

三.Spark Structured Streaming中的checkpoint

def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getName).config("spark.sql.shuffle.partitions", 10).getOrCreate()import spark.implicits._val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hadoop000:9092").option("subscribe", "my-topic").load().selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")).map(word => (word, 1))//.groupBy("value")//.count().writeStream.outputMode("append").format("console").option("checkpointLocation","sss/chk").start().awaitTermination()
}

 

内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。

offsets文件夹下每个文件都是一个batch的偏移量。

{"batchWatermarkMs":0,"batchTimestampMs":1647251960388,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}

稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。
但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。

sss中append模式改为complete模式则会报错。

.groupBy("value")
.count()
.writeStream
.outputMode("complete")

 使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。

.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")

个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。

四.checkpoint比较


五.参考文章

Spark中的checkpoint机制_程研板的博客-CSDN博客_spark的checkpoint机制

本文标签: Spark中的checkpoint机制