admin管理员组文章数量:1026989
Spark中的checkpoint机制
一.Spark Core中的checkpoint
def main(args: Array[String]) {val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]").getOrCreate()val sc = spark.sparkContextsc.setCheckpointDir("checkpoint")val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))val pairs = sc.parallelize(data, 3)pairs.cache()pairs.checkpoint()println(pairs.count)
}
二.Spark Streaming中的checkpoint
checkpoint主要保存:
- 1.metadata(一些配置)
- 2.RDD数据(保存状态)
1.无状态
def main(args: Array[String]): Unit = {val checkpointDirectory = "offset/checkpoints"def functionToCreateContext(): StreamingContext = {val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2)) ssc.checkpoint(checkpointDirectory)val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop000:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "my-spark-group-1","auto.offset.reset" -> "earliest","enable.automit" -> (false: java.lang.Boolean))val topics = Array("my-topic")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))words.print()ssc}val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)ssc.start()ssc.awaitTermination()
}
2.自定义有状态
用到updateStateByKey函数进行状态保存。
val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1)).updateStateByKey[Int](updateFunction _)
words.print()def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval old = runningCount.getOrElse(0)Some(newCount+old)
}
checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。
- 元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
- 所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。
UUID文件夹下就是保存状态的RDD数据。
3.窗口函数中的有状态和无状态
单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。
4.问答
Q1:streaming中checkpoint是在何时做的?
A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。
Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?
A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。
在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。
其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。
Checkpoint主要包含的信息如下:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll
具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。
三.Spark Structured Streaming中的checkpoint
def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getName).config("spark.sql.shuffle.partitions", 10).getOrCreate()import spark.implicits._val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hadoop000:9092").option("subscribe", "my-topic").load().selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")).map(word => (word, 1))//.groupBy("value")//.count().writeStream.outputMode("append").format("console").option("checkpointLocation","sss/chk").start().awaitTermination()
}
内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。
offsets文件夹下每个文件都是一个batch的偏移量。
{"batchWatermarkMs":0,"batchTimestampMs":1647251960388,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}
稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。
但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。
sss中append模式改为complete模式则会报错。
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。
.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")
个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。
四.checkpoint比较
五.参考文章
Spark中的checkpoint机制_程研板的博客-CSDN博客_spark的checkpoint机制
Spark中的checkpoint机制
一.Spark Core中的checkpoint
def main(args: Array[String]) {val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]").getOrCreate()val sc = spark.sparkContextsc.setCheckpointDir("checkpoint")val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))val pairs = sc.parallelize(data, 3)pairs.cache()pairs.checkpoint()println(pairs.count)
}
二.Spark Streaming中的checkpoint
checkpoint主要保存:
- 1.metadata(一些配置)
- 2.RDD数据(保存状态)
1.无状态
def main(args: Array[String]): Unit = {val checkpointDirectory = "offset/checkpoints"def functionToCreateContext(): StreamingContext = {val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2)) ssc.checkpoint(checkpointDirectory)val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop000:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "my-spark-group-1","auto.offset.reset" -> "earliest","enable.automit" -> (false: java.lang.Boolean))val topics = Array("my-topic")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))words.print()ssc}val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)ssc.start()ssc.awaitTermination()
}
2.自定义有状态
用到updateStateByKey函数进行状态保存。
val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1)).updateStateByKey[Int](updateFunction _)
words.print()def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval old = runningCount.getOrElse(0)Some(newCount+old)
}
checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。
- 元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
- 所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。
UUID文件夹下就是保存状态的RDD数据。
3.窗口函数中的有状态和无状态
单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。
4.问答
Q1:streaming中checkpoint是在何时做的?
A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。
Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?
A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。
在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。
其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。
Checkpoint主要包含的信息如下:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll
具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。
三.Spark Structured Streaming中的checkpoint
def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getName).config("spark.sql.shuffle.partitions", 10).getOrCreate()import spark.implicits._val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hadoop000:9092").option("subscribe", "my-topic").load().selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")).map(word => (word, 1))//.groupBy("value")//.count().writeStream.outputMode("append").format("console").option("checkpointLocation","sss/chk").start().awaitTermination()
}
内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。
offsets文件夹下每个文件都是一个batch的偏移量。
{"batchWatermarkMs":0,"batchTimestampMs":1647251960388,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}
稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。
但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。
sss中append模式改为complete模式则会报错。
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。
.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")
个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。
四.checkpoint比较
五.参考文章
Spark中的checkpoint机制_程研板的博客-CSDN博客_spark的checkpoint机制
本文标签: Spark中的checkpoint机制
版权声明:本文标题:Spark中的checkpoint机制 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/IT/1694671162a254873.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论