admin管理员组文章数量:1031961
使用 Flink 和 Kafka 构建数据管道
1. 概述
Apache Flink是一个流处理框架,可以很容易地与Java一起使用。Apache Kafka是一个支持高容错的分布式流处理系统。
在本教程中,我们将了解如何使用这两种技术构建数据管道。
2. 安装
要安装和配置 Apache Kafka,请参考官方指南。安装后,我们可以使用以下命令创建名为flink_input和 flink_output 的新主题:
代码语言:javascript代码运行次数:0运行复制 bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_output
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_inputCopy
在本教程中,我们将使用 Apache Kafka 的默认配置和默认端口。
3. Flink使用
Apache Flink允许实时流处理技术。该框架允许使用多个第三方系统作为流源或接收器。
在 Flink 中 – 有各种可用的连接器:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
要将 Flink 添加到我们的项目中,我们需要包含以下 Maven 依赖项:
代码语言:javascript代码运行次数:0运行复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.5.0</version>
</dependency>Copy
添加这些依赖项将允许我们使用和生成 Kafka 主题。你可以在Maven Central 上找到当前版本的 Flink。
4. 消费者
为了使用Flink从Kafka中消费数据,我们需要提供一个主题和一个Kafka地址。我们还应该提供一个组id,用于保存偏移量,这样我们就不会总是从头读取整个数据。
让我们创建一个静态方法,使FlinkKafkaConsumer的创建更容易:
代码语言:javascript代码运行次数:0运行复制public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
topic, new SimpleStringSchema(), props);
return consumer;
}Copy
此方法采用topic、kafkaAddress 和kafkaGroup 作为输入参数来创建FlinkKafkaConsumer,它将使用给定主题中的数据作为String,因为我们使用了SimpleStringSchema来解码数据。
类名中的数字011指的是卡夫卡版本。
5. 生产者
为了向 Kafka 生成数据,需要提供我们要使用的 Kafka 地址和主题。同样,可以创建一个静态方法,帮助我们为不同的主题创建生产者:
代码语言:javascript代码运行次数:0运行复制public static FlinkKafkaProducer011<String> createStringProducer(
String topic, String kafkaAddress){
return new FlinkKafkaProducer011<>(kafkaAddress,
topic, new SimpleStringSchema());
}Copy
此方法仅将topic和kafkaAddress作为参数,因为在生成 Kafka 主题时无需提供组 id。
6. 字符串流处理
当我们有一个完全工作的消费者和生产者时,我们可以尝试处理来自 Kafka 的数据,然后将我们的结果保存回 Kafka。可在此处找到可用于流处理的函数的完整列表。
在此示例中,我们将每个 Kafka 条目中的单词大写,然后将其写回 Kafka。
为此,我们需要创建一个自定义的 MapFunction:
代码语言:javascript代码运行次数:0运行复制public class WordsCapitalizer implements MapFunction<String, String> {
@Override
public String map(String s) {
return s.toUpperCase();
}
}Copy
创建函数后,我们可以在流处理中使用它:
代码语言:javascript代码运行次数:0运行复制public static void capitalize() {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment
.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
inputTopic, address, consumerGroup);
DataStream<String> stringInputStream = environment
.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
outputTopic, address);
stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
}Copy
应用程序将从flink_input主题读取数据,对流执行操作,然后将结果保存到 Kafka 中的flink_output主题。
我们已经看到了如何使用 Flink 和 Kafka 处理字符串。但通常需要对自定义对象执行操作。我们将在下一章中看到如何做到这一点。
7. 自定义对象反序列化
以下类表示包含有关发件人和收件人的信息的简单邮件:
代码语言:javascript代码运行次数:0运行复制@JsonSerialize
public class InputMessage {
String sender;
String recipient;
LocalDateTime sentAt;
String message;
}Copy
以前,我们使用SimpleStringSchema对来自 Kafka 的消息进行反序列化,但现在我们希望将数据直接反序列化为自定义对象。
为此,我们需要一个自定义的反序列化模式:
代码语言:javascript代码运行次数:0运行复制public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {
static ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
@Override
public InputMessage deserialize(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, InputMessage.class);
}
@Override
public boolean isEndOfStream(InputMessage inputMessage) {
return false;
}
@Override
public TypeInformation<InputMessage> getProducedType() {
return TypeInformation.of(InputMessage.class);
}
}Copy
我们在这里假设消息在 Kafka 中以 JSON 形式保存。
由于我们有一个类型为 LocalDateTime 的字段,我们需要指定JavaTimeModule,它负责将LocalDateTime 对象映射到 JSON。
Flink 模式不能有不可序列化的字段,因为所有运算符(如模式或函数)都在作业开始时序列化。
Apache Spark中也有类似的问题。此问题的已知修复之一是将字段初始化为静态,就像我们对上面的 ObjectMapper所做的那样。这不是最漂亮的解决方案,但它相对简单并且可以完成工作。
该方法 isEndOfStream可用于特殊情况,即仅在收到某些特定数据之前处理流。但在我们的情况下不需要它。
8. 自定义对象序列化
现在,假设我们希望系统有可能创建消息备份。希望该过程是自动化的,并且每个备份应由一整天内发送的消息组成。
此外,备份消息应分配唯一 ID。
为此,我们可以创建以下类:
代码语言:javascript代码运行次数:0运行复制public class Backup {
@JsonProperty("inputMessages")
List<InputMessage> inputMessages;
@JsonProperty("backupTimestamp")
LocalDateTime backupTimestamp;
@JsonProperty("uuid")
UUID uuid;
public Backup(List<InputMessage> inputMessages,
LocalDateTime backupTimestamp) {
this.inputMessages = inputMessages;
this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID();
}
}Copy
请注意,UUID 生成机制并不完美,因为它允许重复。但是,这对于此示例的范围来说已经足够了。
我们希望将备份对象作为 JSON 保存到 Kafka,因此我们需要创建SerializationSchema:
代码语言:javascript代码运行次数:0运行复制public class BackupSerializationSchema
implements SerializationSchema<Backup> {
ObjectMapper objectMapper;
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
}
try {
return objectMapper.writeValueAsString(backupMessage).getBytes();
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
logger.error("Failed to parse JSON", e);
}
return new byte[0];
}
}Copy
9. 时间戳消息
由于我们要为每天的所有消息创建备份,因此消息需要时间戳。
Flink 提供了三种不同的时间特征EventTime、ProcessingTime 和IngestionTime。
在我们的例子中,我们需要使用消息的发送时间,因此我们将使用EventTime。
要使用EventTime,我们需要一个TimestampAssigner,它将从我们的输入数据中提取时间戳:
代码语言:javascript代码运行次数:0运行复制public class InputMessageTimestampAssigner
implements AssignerWithPunctuatedWatermarks<InputMessage> {
@Override
public long extractTimestamp(InputMessage element,
long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(InputMessage lastElement,
long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1500);
}
}Copy
我们需要将LocalDateTime转换为EpochSecond,因为这是 Flink 所期望的格式。分配时间戳后,所有基于时间的操作都将使用sentAt字段中的时间进行操作。
由于 Flink 期望时间戳以毫秒为单位,而toEpochSecond() 以秒为单位返回时间,我们需要将其乘以 1000,因此 Flink 将正确创建窗口。
Flink 定义了水印(Watermarks)的概念。水印在数据未按发送顺序到达的情况下很有用。水印定义允许处理元素的最大延迟。
时间戳低于水印的元素根本不会被处理。
10. 创建时间窗口
为了确保我们的备份只收集一天内发送的消息,我们可以在流上使用timeWindowAll方法,该方法会将消息拆分为窗口。
但是,我们仍然需要聚合来自每个窗口的消息,并将它们作为备份返回。
为此,我们需要一个自定义的AggregateFunction:
代码语言:javascript代码运行次数:0运行复制public class BackupAggregator
implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(
InputMessage inputMessage,
List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
return new Backup(inputMessages, LocalDateTime.now());
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages,
List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}Copy
11. 聚合备份
在分配适当的时间戳并实现我们的AggregateFunction 之后,我们终于可以获取我们的 Kafka 输入并处理它:
代码语言:javascript代码运行次数:0运行复制public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "192.168.99.100:9092";
StreamExecutionEnvironment environment
= StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
= createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer.assignTimestampsAndWatermarks(
new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer
= createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream
= environment.addSource(flinkKafkaConsumer);
inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-27,如有侵权请联系 cloudcommunity@tencent 删除javakafkaflink教程数据管道使用 Flink 和 Kafka 构建数据管道
1. 概述
Apache Flink是一个流处理框架,可以很容易地与Java一起使用。Apache Kafka是一个支持高容错的分布式流处理系统。
在本教程中,我们将了解如何使用这两种技术构建数据管道。
2. 安装
要安装和配置 Apache Kafka,请参考官方指南。安装后,我们可以使用以下命令创建名为flink_input和 flink_output 的新主题:
代码语言:javascript代码运行次数:0运行复制 bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_output
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic flink_inputCopy
在本教程中,我们将使用 Apache Kafka 的默认配置和默认端口。
3. Flink使用
Apache Flink允许实时流处理技术。该框架允许使用多个第三方系统作为流源或接收器。
在 Flink 中 – 有各种可用的连接器:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
要将 Flink 添加到我们的项目中,我们需要包含以下 Maven 依赖项:
代码语言:javascript代码运行次数:0运行复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.5.0</version>
</dependency>Copy
添加这些依赖项将允许我们使用和生成 Kafka 主题。你可以在Maven Central 上找到当前版本的 Flink。
4. 消费者
为了使用Flink从Kafka中消费数据,我们需要提供一个主题和一个Kafka地址。我们还应该提供一个组id,用于保存偏移量,这样我们就不会总是从头读取整个数据。
让我们创建一个静态方法,使FlinkKafkaConsumer的创建更容易:
代码语言:javascript代码运行次数:0运行复制public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
topic, new SimpleStringSchema(), props);
return consumer;
}Copy
此方法采用topic、kafkaAddress 和kafkaGroup 作为输入参数来创建FlinkKafkaConsumer,它将使用给定主题中的数据作为String,因为我们使用了SimpleStringSchema来解码数据。
类名中的数字011指的是卡夫卡版本。
5. 生产者
为了向 Kafka 生成数据,需要提供我们要使用的 Kafka 地址和主题。同样,可以创建一个静态方法,帮助我们为不同的主题创建生产者:
代码语言:javascript代码运行次数:0运行复制public static FlinkKafkaProducer011<String> createStringProducer(
String topic, String kafkaAddress){
return new FlinkKafkaProducer011<>(kafkaAddress,
topic, new SimpleStringSchema());
}Copy
此方法仅将topic和kafkaAddress作为参数,因为在生成 Kafka 主题时无需提供组 id。
6. 字符串流处理
当我们有一个完全工作的消费者和生产者时,我们可以尝试处理来自 Kafka 的数据,然后将我们的结果保存回 Kafka。可在此处找到可用于流处理的函数的完整列表。
在此示例中,我们将每个 Kafka 条目中的单词大写,然后将其写回 Kafka。
为此,我们需要创建一个自定义的 MapFunction:
代码语言:javascript代码运行次数:0运行复制public class WordsCapitalizer implements MapFunction<String, String> {
@Override
public String map(String s) {
return s.toUpperCase();
}
}Copy
创建函数后,我们可以在流处理中使用它:
代码语言:javascript代码运行次数:0运行复制public static void capitalize() {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment
.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
inputTopic, address, consumerGroup);
DataStream<String> stringInputStream = environment
.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
outputTopic, address);
stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
}Copy
应用程序将从flink_input主题读取数据,对流执行操作,然后将结果保存到 Kafka 中的flink_output主题。
我们已经看到了如何使用 Flink 和 Kafka 处理字符串。但通常需要对自定义对象执行操作。我们将在下一章中看到如何做到这一点。
7. 自定义对象反序列化
以下类表示包含有关发件人和收件人的信息的简单邮件:
代码语言:javascript代码运行次数:0运行复制@JsonSerialize
public class InputMessage {
String sender;
String recipient;
LocalDateTime sentAt;
String message;
}Copy
以前,我们使用SimpleStringSchema对来自 Kafka 的消息进行反序列化,但现在我们希望将数据直接反序列化为自定义对象。
为此,我们需要一个自定义的反序列化模式:
代码语言:javascript代码运行次数:0运行复制public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {
static ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
@Override
public InputMessage deserialize(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, InputMessage.class);
}
@Override
public boolean isEndOfStream(InputMessage inputMessage) {
return false;
}
@Override
public TypeInformation<InputMessage> getProducedType() {
return TypeInformation.of(InputMessage.class);
}
}Copy
我们在这里假设消息在 Kafka 中以 JSON 形式保存。
由于我们有一个类型为 LocalDateTime 的字段,我们需要指定JavaTimeModule,它负责将LocalDateTime 对象映射到 JSON。
Flink 模式不能有不可序列化的字段,因为所有运算符(如模式或函数)都在作业开始时序列化。
Apache Spark中也有类似的问题。此问题的已知修复之一是将字段初始化为静态,就像我们对上面的 ObjectMapper所做的那样。这不是最漂亮的解决方案,但它相对简单并且可以完成工作。
该方法 isEndOfStream可用于特殊情况,即仅在收到某些特定数据之前处理流。但在我们的情况下不需要它。
8. 自定义对象序列化
现在,假设我们希望系统有可能创建消息备份。希望该过程是自动化的,并且每个备份应由一整天内发送的消息组成。
此外,备份消息应分配唯一 ID。
为此,我们可以创建以下类:
代码语言:javascript代码运行次数:0运行复制public class Backup {
@JsonProperty("inputMessages")
List<InputMessage> inputMessages;
@JsonProperty("backupTimestamp")
LocalDateTime backupTimestamp;
@JsonProperty("uuid")
UUID uuid;
public Backup(List<InputMessage> inputMessages,
LocalDateTime backupTimestamp) {
this.inputMessages = inputMessages;
this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID();
}
}Copy
请注意,UUID 生成机制并不完美,因为它允许重复。但是,这对于此示例的范围来说已经足够了。
我们希望将备份对象作为 JSON 保存到 Kafka,因此我们需要创建SerializationSchema:
代码语言:javascript代码运行次数:0运行复制public class BackupSerializationSchema
implements SerializationSchema<Backup> {
ObjectMapper objectMapper;
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());
}
try {
return objectMapper.writeValueAsString(backupMessage).getBytes();
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
logger.error("Failed to parse JSON", e);
}
return new byte[0];
}
}Copy
9. 时间戳消息
由于我们要为每天的所有消息创建备份,因此消息需要时间戳。
Flink 提供了三种不同的时间特征EventTime、ProcessingTime 和IngestionTime。
在我们的例子中,我们需要使用消息的发送时间,因此我们将使用EventTime。
要使用EventTime,我们需要一个TimestampAssigner,它将从我们的输入数据中提取时间戳:
代码语言:javascript代码运行次数:0运行复制public class InputMessageTimestampAssigner
implements AssignerWithPunctuatedWatermarks<InputMessage> {
@Override
public long extractTimestamp(InputMessage element,
long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(InputMessage lastElement,
long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1500);
}
}Copy
我们需要将LocalDateTime转换为EpochSecond,因为这是 Flink 所期望的格式。分配时间戳后,所有基于时间的操作都将使用sentAt字段中的时间进行操作。
由于 Flink 期望时间戳以毫秒为单位,而toEpochSecond() 以秒为单位返回时间,我们需要将其乘以 1000,因此 Flink 将正确创建窗口。
Flink 定义了水印(Watermarks)的概念。水印在数据未按发送顺序到达的情况下很有用。水印定义允许处理元素的最大延迟。
时间戳低于水印的元素根本不会被处理。
10. 创建时间窗口
为了确保我们的备份只收集一天内发送的消息,我们可以在流上使用timeWindowAll方法,该方法会将消息拆分为窗口。
但是,我们仍然需要聚合来自每个窗口的消息,并将它们作为备份返回。
为此,我们需要一个自定义的AggregateFunction:
代码语言:javascript代码运行次数:0运行复制public class BackupAggregator
implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(
InputMessage inputMessage,
List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
return new Backup(inputMessages, LocalDateTime.now());
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages,
List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}Copy
11. 聚合备份
在分配适当的时间戳并实现我们的AggregateFunction 之后,我们终于可以获取我们的 Kafka 输入并处理它:
代码语言:javascript代码运行次数:0运行复制public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "192.168.99.100:9092";
StreamExecutionEnvironment environment
= StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
= createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer.assignTimestampsAndWatermarks(
new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer
= createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream
= environment.addSource(flinkKafkaConsumer);
inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-27,如有侵权请联系 cloudcommunity@tencent 删除javakafkaflink教程数据管道本文标签: 使用 Flink 和 Kafka 构建数据管道
版权声明:本文标题:使用 Flink 和 Kafka 构建数据管道 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/jiaocheng/1747899795a2224832.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论