admin管理员组文章数量:1031982
StreamExecutionEnvironment
1.定义
StreamExecutionEnvironment是Flink中用于定义和执行流处理程序的主要类。它提供了一系列函数和方法来配置流处理程序的执行环境(例如并行度、checkpoint、时间特性),并将其部署到Flink集群中运行。
2.主要功能和设计思路
- 提供编程接口 StreamExecutionEnvironment提供了Java和Scala编程接口,使得开发者可以使用自己熟悉的编程语言来定义和执行流处理程序。
- 定义数据源 StreamExecutionEnvironment提供了多种定义数据源的方法,包括从文件、socket、Kafka等读取数据,也可以通过自定义数据源来读取数据。
- 定义数据处理操作 StreamExecutionEnvironment提供了多种数据处理操作,包括map、filter、reduce、keyBy、window等,可以根据需求进行灵活配置和组合。
- 定义数据输出 StreamExecutionEnvironment提供了多种数据输出方法,包括将数据输出到文件、数据库、Kafka等,也可以通过自定义输出方法来输出数据。
- 定义并行度 StreamExecutionEnvironment可以根据数据处理任务的特点和集群资源的情况来动态调整并行度,以提高程序的执行效率。
- 集成第三方库 StreamExecutionEnvironment集成了多种第三方库,如Apache Kafka、Apache Cassandra、Elasticsearch等,方便开发者使用这些库来进行数据处理和存储。
- 支持流式迭代 StreamExecutionEnvironment支持流式迭代,即在数据流中对数据进行多次迭代处理,以支持迭代算法和机器学习等应用场景。
- 支持事件时间处理 StreamExecutionEnvironment支持事件时间处理,即在数据流中根据事件发生的时间进行数据处理和聚合,以支持实时数据分析和监控等应用场景。
3.使用示例
4.源代码核心方法剖析
4.1方法说明
- 构造函数 StreamExecutionEnvironment有多个构造函数,其中默认构造函数是私有的,不能从外部直接创建StreamExecutionEnvironment实例。另外一个常用的构造函数是fromXXX()系列方法,例如fromElements()、fromCollection()、fromParallelCollection(),用于创建DataStream的执行环境。
- setParallelism()方法 用于设置任务的并行度,即任务中并行执行的任务数。默认情况下,Flink会根据系统资源和任务的特性自动设置并行度,但用户也可以通过setParallelism()方法手动设置并行度。
- enableCheckpointing()方法 用于启用和配置checkpoint机制。checkpoint是一种容错机制,用于将任务执行过程中的中间结果保存到外部存储中,以便在任务失败时恢复任务执行进度。enableCheckpointing()方法可以设置checkpoint的间隔时间、超时时间、最大并发数等参数。
- setStreamTimeCharacteristic()方法 用于设置任务的时间特性,即任务如何处理事件时间和处理时间。Flink支持三种时间特性:ProcessingTime、EventTime和IngestionTime。ProcessingTime表示任务处理数据时使用的本地系统时间;EventTime表示数据本身携带的时间戳信息,用于实现基于事件时间的数据处理;IngestionTime表示数据进入系统的时间,用于实现基于系统时间的数据处理。
- addSource()方法 用于向任务中添加数据源。addSource()方法可以接受多种类型的数据源,例如Kafka、Socket、文件、集合等。
- addSink()方法 用于向任务中添加数据汇。addSink()方法可以接受多种类型的数据汇,例如Kafka、Socket、文件、集合等。
- execute()方法 用于启动流处理任务的执行。execute()方法会将所有的Transformation操作和数据源、数据汇等组件构建成一个StreamGraph,并将StreamGraph转换为JobGraph,最终提交给Flink集群执行。
4.2部分核心方法源码示例
StreamExecutionEnvironment的代码比较复杂,这里只提供其中一部分的示例代码作为参考:
代码语言:javascript代码运行次数:0运行复制public class StreamExecutionEnvironment {
//默认的执行环境并行度
private final int defaultLocalParallelism;
//默认的执行环境
private final ExecutorService defaultExecutorService;
//配置文件
private final Configuration configuration;
//执行环境的ID
private final String executorId;
//用户自定义的类加载器
private final ClassLoader userClassLoader;
//数据源注册中心
private final SourceFunctionRegistry sourceFunctionRegistry;
//转换器注册中心
private final StreamOperatorFactory<?> operatorFactory;
//配置信息
private final CheckpointConfig checkpointConfig;
//时间特性
private final TimeCharacteristic timeCharacteristic;
//状态后端
private final StateBackend stateBackend;
/**
* 构造方法,用于创建一个StreamExecutionEnvironment对象
*
* @param executorService 默认的执行环境
* @param configuration 配置文件
* @param userClassLoader 用户自定义的类加载器
* @param defaultLocalParallelism 默认的执行环境并行度
* @param executorId 执行环境的ID
*/
public StreamExecutionEnvironment(
ExecutorService executorService,
Configuration configuration,
ClassLoader userClassLoader,
int defaultLocalParallelism,
String executorId) {
this.defaultLocalParallelism = defaultLocalParallelism;
this.defaultExecutorService = executorService;
this.configuration = configuration == null ? new Configuration() : configuration;
this.executorId = executorId != null ? executorId : UUID.randomUUID().toString();
this.userClassLoader = userClassLoader == null ? getClass().getClassLoader() : userClassLoader;
this.sourceFunctionRegistry = new SourceFunctionRegistry();
this.operatorFactory = new StreamOperatorFactory<>();
this.checkpointConfig = new CheckpointConfig();
this.timeCharacteristic = TimeCharacteristic.ProcessingTime;
this.stateBackend = null;
}
/**
* 获取数据流处理的默认并行度
*
* @return 默认并行度
*/
public int getDefaultLocalParallelism() {
return defaultLocalParallelism;
}
/**
* 获取配置文件
*
* @return 配置文件
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* 获取执行环境的ID
*
* @return 执行环境的ID
*/
public String getId() {
return executorId;
}
/**
* 获取用户自定义的类加载器
*
* @return 用户自定义的类加载器
*/
public ClassLoader getUserClassLoader() {
return userClassLoader;
}
/**
* 获取数据源注册中心
*
* @return 数据源注册中心
*/
public SourceFunctionRegistry getSourceFunctionRegistry() {
return sourceFunctionRegistry;
}
/**
* 获取转换器注册中心
*
* @return 转换器注册中心
*/
public StreamOperatorFactory<?> getOperatorFactory() {
return operatorFactory;
}
/**
* 获取检查点配置
*
* @return 检查点配置
*/
public CheckpointConfig getCheckpointConfig() {
return checkpointConfig;
}
/**
* 获取时间特性
*
* @return 时间特性
*/
public TimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
/**
* 获取状态后端
*
* @return 状态后端
*/
public StateBackend getStateBackend() {
return stateBackend;
}
/**
* 设置执行环境的默认并行度
*
* @param parallelism 并行度
*/
public void setDefaultLocalParallelism(int parallelism) {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, parallelism);
}
/**
* 获取一个DataStream对象
*
* @param source 数据源
* @param type 数据类型
* @param <T> 数据类型
* @return DataStream对象
*/
public <T> DataStream<T> addSource(SourceFunction<T> source, TypeInformation<T> type) {
// 创建SourceTransformation对象,表示对数据源进行转换操作
SourceTransformation<T> transform = new SourceTransformation<>(source, "Source", type, defaultLocalParallelism);
// 将SourceTransformation对象添加到转换器注册中心中
operatorFactory.addOperator(transform);
// 返回转换后的DataStream对象
return new DataStream<>(this, getNewNodeId(), transform.getOutputType());
}
/**
* 获取一个DataStream对象
*
* @param source 数据源
* @param <T> 数据类型
* @return DataStream对象
*/
public <T> DataStreamSource<T> addSource(SourceFunction<T> source) {
return addSource(source, TypeExtractor.createTypeInfo(SourceFunction.class, source.getClass(), 0));
}
/**
* 获取一个Table对象
*
* @return Table对象
*/
public TableEnvironment createTableEnvironment() {
// 创建TableEnvironment对象
return TableEnvironment.create(configuration, executorComponents);
}
// 其他操作的实现略
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-03-11,如有侵权请联系 cloudcommunity@tencent 删除数据数据处理flink对象教程StreamExecutionEnvironment
1.定义
StreamExecutionEnvironment是Flink中用于定义和执行流处理程序的主要类。它提供了一系列函数和方法来配置流处理程序的执行环境(例如并行度、checkpoint、时间特性),并将其部署到Flink集群中运行。
2.主要功能和设计思路
- 提供编程接口 StreamExecutionEnvironment提供了Java和Scala编程接口,使得开发者可以使用自己熟悉的编程语言来定义和执行流处理程序。
- 定义数据源 StreamExecutionEnvironment提供了多种定义数据源的方法,包括从文件、socket、Kafka等读取数据,也可以通过自定义数据源来读取数据。
- 定义数据处理操作 StreamExecutionEnvironment提供了多种数据处理操作,包括map、filter、reduce、keyBy、window等,可以根据需求进行灵活配置和组合。
- 定义数据输出 StreamExecutionEnvironment提供了多种数据输出方法,包括将数据输出到文件、数据库、Kafka等,也可以通过自定义输出方法来输出数据。
- 定义并行度 StreamExecutionEnvironment可以根据数据处理任务的特点和集群资源的情况来动态调整并行度,以提高程序的执行效率。
- 集成第三方库 StreamExecutionEnvironment集成了多种第三方库,如Apache Kafka、Apache Cassandra、Elasticsearch等,方便开发者使用这些库来进行数据处理和存储。
- 支持流式迭代 StreamExecutionEnvironment支持流式迭代,即在数据流中对数据进行多次迭代处理,以支持迭代算法和机器学习等应用场景。
- 支持事件时间处理 StreamExecutionEnvironment支持事件时间处理,即在数据流中根据事件发生的时间进行数据处理和聚合,以支持实时数据分析和监控等应用场景。
3.使用示例
4.源代码核心方法剖析
4.1方法说明
- 构造函数 StreamExecutionEnvironment有多个构造函数,其中默认构造函数是私有的,不能从外部直接创建StreamExecutionEnvironment实例。另外一个常用的构造函数是fromXXX()系列方法,例如fromElements()、fromCollection()、fromParallelCollection(),用于创建DataStream的执行环境。
- setParallelism()方法 用于设置任务的并行度,即任务中并行执行的任务数。默认情况下,Flink会根据系统资源和任务的特性自动设置并行度,但用户也可以通过setParallelism()方法手动设置并行度。
- enableCheckpointing()方法 用于启用和配置checkpoint机制。checkpoint是一种容错机制,用于将任务执行过程中的中间结果保存到外部存储中,以便在任务失败时恢复任务执行进度。enableCheckpointing()方法可以设置checkpoint的间隔时间、超时时间、最大并发数等参数。
- setStreamTimeCharacteristic()方法 用于设置任务的时间特性,即任务如何处理事件时间和处理时间。Flink支持三种时间特性:ProcessingTime、EventTime和IngestionTime。ProcessingTime表示任务处理数据时使用的本地系统时间;EventTime表示数据本身携带的时间戳信息,用于实现基于事件时间的数据处理;IngestionTime表示数据进入系统的时间,用于实现基于系统时间的数据处理。
- addSource()方法 用于向任务中添加数据源。addSource()方法可以接受多种类型的数据源,例如Kafka、Socket、文件、集合等。
- addSink()方法 用于向任务中添加数据汇。addSink()方法可以接受多种类型的数据汇,例如Kafka、Socket、文件、集合等。
- execute()方法 用于启动流处理任务的执行。execute()方法会将所有的Transformation操作和数据源、数据汇等组件构建成一个StreamGraph,并将StreamGraph转换为JobGraph,最终提交给Flink集群执行。
4.2部分核心方法源码示例
StreamExecutionEnvironment的代码比较复杂,这里只提供其中一部分的示例代码作为参考:
代码语言:javascript代码运行次数:0运行复制public class StreamExecutionEnvironment {
//默认的执行环境并行度
private final int defaultLocalParallelism;
//默认的执行环境
private final ExecutorService defaultExecutorService;
//配置文件
private final Configuration configuration;
//执行环境的ID
private final String executorId;
//用户自定义的类加载器
private final ClassLoader userClassLoader;
//数据源注册中心
private final SourceFunctionRegistry sourceFunctionRegistry;
//转换器注册中心
private final StreamOperatorFactory<?> operatorFactory;
//配置信息
private final CheckpointConfig checkpointConfig;
//时间特性
private final TimeCharacteristic timeCharacteristic;
//状态后端
private final StateBackend stateBackend;
/**
* 构造方法,用于创建一个StreamExecutionEnvironment对象
*
* @param executorService 默认的执行环境
* @param configuration 配置文件
* @param userClassLoader 用户自定义的类加载器
* @param defaultLocalParallelism 默认的执行环境并行度
* @param executorId 执行环境的ID
*/
public StreamExecutionEnvironment(
ExecutorService executorService,
Configuration configuration,
ClassLoader userClassLoader,
int defaultLocalParallelism,
String executorId) {
this.defaultLocalParallelism = defaultLocalParallelism;
this.defaultExecutorService = executorService;
this.configuration = configuration == null ? new Configuration() : configuration;
this.executorId = executorId != null ? executorId : UUID.randomUUID().toString();
this.userClassLoader = userClassLoader == null ? getClass().getClassLoader() : userClassLoader;
this.sourceFunctionRegistry = new SourceFunctionRegistry();
this.operatorFactory = new StreamOperatorFactory<>();
this.checkpointConfig = new CheckpointConfig();
this.timeCharacteristic = TimeCharacteristic.ProcessingTime;
this.stateBackend = null;
}
/**
* 获取数据流处理的默认并行度
*
* @return 默认并行度
*/
public int getDefaultLocalParallelism() {
return defaultLocalParallelism;
}
/**
* 获取配置文件
*
* @return 配置文件
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* 获取执行环境的ID
*
* @return 执行环境的ID
*/
public String getId() {
return executorId;
}
/**
* 获取用户自定义的类加载器
*
* @return 用户自定义的类加载器
*/
public ClassLoader getUserClassLoader() {
return userClassLoader;
}
/**
* 获取数据源注册中心
*
* @return 数据源注册中心
*/
public SourceFunctionRegistry getSourceFunctionRegistry() {
return sourceFunctionRegistry;
}
/**
* 获取转换器注册中心
*
* @return 转换器注册中心
*/
public StreamOperatorFactory<?> getOperatorFactory() {
return operatorFactory;
}
/**
* 获取检查点配置
*
* @return 检查点配置
*/
public CheckpointConfig getCheckpointConfig() {
return checkpointConfig;
}
/**
* 获取时间特性
*
* @return 时间特性
*/
public TimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
/**
* 获取状态后端
*
* @return 状态后端
*/
public StateBackend getStateBackend() {
return stateBackend;
}
/**
* 设置执行环境的默认并行度
*
* @param parallelism 并行度
*/
public void setDefaultLocalParallelism(int parallelism) {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, parallelism);
}
/**
* 获取一个DataStream对象
*
* @param source 数据源
* @param type 数据类型
* @param <T> 数据类型
* @return DataStream对象
*/
public <T> DataStream<T> addSource(SourceFunction<T> source, TypeInformation<T> type) {
// 创建SourceTransformation对象,表示对数据源进行转换操作
SourceTransformation<T> transform = new SourceTransformation<>(source, "Source", type, defaultLocalParallelism);
// 将SourceTransformation对象添加到转换器注册中心中
operatorFactory.addOperator(transform);
// 返回转换后的DataStream对象
return new DataStream<>(this, getNewNodeId(), transform.getOutputType());
}
/**
* 获取一个DataStream对象
*
* @param source 数据源
* @param <T> 数据类型
* @return DataStream对象
*/
public <T> DataStreamSource<T> addSource(SourceFunction<T> source) {
return addSource(source, TypeExtractor.createTypeInfo(SourceFunction.class, source.getClass(), 0));
}
/**
* 获取一个Table对象
*
* @return Table对象
*/
public TableEnvironment createTableEnvironment() {
// 创建TableEnvironment对象
return TableEnvironment.create(configuration, executorComponents);
}
// 其他操作的实现略
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-03-11,如有侵权请联系 cloudcommunity@tencent 删除数据数据处理flink对象教程本文标签: StreamExecutionEnvironment
版权声明:本文标题:StreamExecutionEnvironment 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/jiaocheng/1747895995a2224256.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论