admin管理员组文章数量:1024074
I would like to use KinesisMessageDrivenChannelAdapter
to read records from a Kinesis stream. When starting the consumer application for the very first time, I would like it to receive all records that already exist in the stream. On subsequent starts though, the application should continue reading from the latest checkpointed sequence number coming from DynamoDb.
Is my assumption correct that adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
provides this behaviour?
I would like to use KinesisMessageDrivenChannelAdapter
to read records from a Kinesis stream. When starting the consumer application for the very first time, I would like it to receive all records that already exist in the stream. On subsequent starts though, the application should continue reading from the latest checkpointed sequence number coming from DynamoDb.
Is my assumption correct that adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
provides this behaviour?
1 Answer
Reset to default 0That works only for new consumers in the group. If there is already a checkpoint for this consumer group and that shard, then we go like this:
if (this.shardOffset.isReset()) {
this.checkpointer.remove();
}
else {
String checkpoint = this.checkpointer.getCheckpoint();
if (checkpoint != null) {
this.shardOffset.setSequenceNumber(checkpoint);
this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
}
So, it is going to consume from a stored checkpoint.
If you'd like to use that trimHorizon
, then you call KinesisMessageDrivenChannelAdapter.resetCheckpoints()
.
I would like to use KinesisMessageDrivenChannelAdapter
to read records from a Kinesis stream. When starting the consumer application for the very first time, I would like it to receive all records that already exist in the stream. On subsequent starts though, the application should continue reading from the latest checkpointed sequence number coming from DynamoDb.
Is my assumption correct that adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
provides this behaviour?
I would like to use KinesisMessageDrivenChannelAdapter
to read records from a Kinesis stream. When starting the consumer application for the very first time, I would like it to receive all records that already exist in the stream. On subsequent starts though, the application should continue reading from the latest checkpointed sequence number coming from DynamoDb.
Is my assumption correct that adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
provides this behaviour?
1 Answer
Reset to default 0That works only for new consumers in the group. If there is already a checkpoint for this consumer group and that shard, then we go like this:
if (this.shardOffset.isReset()) {
this.checkpointer.remove();
}
else {
String checkpoint = this.checkpointer.getCheckpoint();
if (checkpoint != null) {
this.shardOffset.setSequenceNumber(checkpoint);
this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
}
So, it is going to consume from a stored checkpoint.
If you'd like to use that trimHorizon
, then you call KinesisMessageDrivenChannelAdapter.resetCheckpoints()
.
本文标签:
版权声明:本文标题:spring integration - What is the effect of streamInitialSequence in case there are already checkpoints in DynamoDB? - Stack Over 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/questions/1745608064a2158841.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论