admin管理员组文章数量:1026989
Flink字符串数据流转换数据类型(Row)流及Row的源码
文章目录
- 1.Row定义
- 2.常用方法
- 2.1.构造函数
- 2.2.getArity()
- 2.3.getField(int pos)
- 2.4.setField(int pos, Object value)
- 2.5. Row of(Object... values)
- 2.6. copy(Row row)
- 2.7.project(Row row, int[] fields)
- 2.8.Row join(Row first, Row... remainings)
- 3.利用Row对象将流数据转成初始化动态表的数据类型
- 3.1.说明
- 3.2.直接接收String类型数据流的问题
- 3.3.采用String流 转 Row流
- 3.3.1.整体代码
- 3.3.2.genTypeInformation
- 3.3.3.LineSplitter
- 4.总结
- 4.1.示例dmeo代码的模拟数据源
- 2021.12.2 补充
1.Row定义
本质上是一个对象数组。
/** The array to store actual values. */
private final Object[] fields;
2.常用方法
2.1.构造函数
赋予对象数组长度。
public Row(int arity) {this.fields = new Object[arity];
}
2.2.getArity()
相当于获取length
public Object getField(int pos) {return fields[pos];
}
2.3.getField(int pos)
获取指定的对象数组fields的对象;
public Object getField(int pos) {return fields[pos];
}
2.4.setField(int pos, Object value)
public void setField(int pos, Object value) {fields[pos] = value;
}
2.5. Row of(Object… values)
封装的快捷的构造函数
官方举例:
/*** Creates a new Row and assigns the given values to the Row's fields.* This is more convenient than using the constructor.** <p>For example:** <pre>* Row.of("hello", true, 1L);}* </pre>* instead of* <pre>* Row row = new Row(3);* row.setField(0, "hello");* row.setField(1, true);* row.setField(2, 1L);* </pre>**/
2.6. copy(Row row)
复制并返回一个row。
public static Row copy(Row row) {final Row newRow = new Row(row.fields.length);System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);return newRow;
}
2.7.project(Row row, int[] fields)
官方的解释是:
以不是深复制的方式复制创建一个新的row对象。
实际上是:可选复制部分的row对象【截取】,int[] fiedls便是截取(原先row)部分的index。
public static Row project(Row row, int[] fields) {final Row newRow = new Row(fields.length);for (int i = 0; i < fields.length; i++) {newRow.fields[i] = row.fields[fields[i]];}return newRow;
}
2.8.Row join(Row first, Row… remainings)
合并两个row对象。
3.利用Row对象将流数据转成初始化动态表的数据类型
3.1.说明
Flink的数据类型以及序列化。
DataStream - String流 转 Row流
在生成FlinkSQL动态表的时候,如果按照kafka数据源的String类型进行创建String数据流:
3.2.直接接收String类型数据流的问题
提示数据流:DataStream 和 动态表结构”no,name,balance”参数对应补上。
3.3.采用String流 转 Row流
3.3.1.整体代码
3.3.2.genTypeInformation
将数据源的数据类型转换为实际的数据类型,返回的是TypeInformation[]。
举例:
// TypeInformation为字段对应的数据类型public static TypeInformation[] getTypeInformation(String srcFieldTypes){String[] tokens = srcFieldTypes.split(",");TypeInformation[] fieldTypes = new TypeInformation[tokens.length];for(int i=0, len=tokens.length; i<len; i++){switch(tokens[i].toLowerCase()){case "string" :fieldTypes[i] = Types.STRING;break;case "int" :fieldTypes[i] = Types.INT;break;case "integer" :fieldTypes[i] = Types.INT;break;case "long" :fieldTypes[i] = Types.LONG;break;case "double" :fieldTypes[i] = Types.DOUBLE;break;case "timestamp" :fieldTypes[i] = Types.SQL_TIMESTAMP;break;default:break;}}return fieldTypes;}
3.3.3.LineSplitter
是一个FlatMapFunction<String, Row>实现flatMaP算子的方法,主要是将:kafka数据源的String分割切换为对象数组Row,并将默认的切割对象String匹配上面的getTypeInformationd的类型数据,采用switch,举例:
// kafka消息一行分隔成多个字段, 再封装成Row类型public static final class LineSplitter implements FlatMapFunction<String, Row> {private static final long serialVersionUID = 1L;public void flatMap(String line, Collector<Row> out) {System.out.println("读取数据: "+line);try {String[] fields = line.split(",,"); // kafka 字段分隔符 splitCharint len = fields.length;Row rsRow = new Row(len);if(len>0){for(int i=0; i<len; i++){if( i>=len || fields[i]==null ||fields[i].length()==0){continue;}// 将kafka消息的每个字段转成 flink表格里一行的字段switch (srcFieldTypesArr[i]) {case "long":rsRow.setField(i, Long.valueOf(fields[i]));break;case "string":rsRow.setField(i,fields[i]);break;case "timestamp":SimpleDateFormat simdate1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date1 = simdate1.parse(fields[i]);Timestamp sqlDate = new Timestamp(date1.getTime());rsRow.setField(i,sqlDate);break;default: rsRow.setField(i, fields[i]);break;}}}//System.out.println("数据输入: "+rsRow);out.collect(rsRow);} catch (Exception e) {e.printStackTrace(); } }}
4.总结
总结
至此,kafka发送生产的数据源String类型,便转换为了带数据类型的动态FlinkSQL表,SQL面板要求只需要验证格式:即Table的正确产生即可。
4.1.示例dmeo代码的模拟数据源
2021.12.2 补充
我以为我把Row类的常用方法列出来,讲明白如何转换成Row对象。大家可以自行查看对应的API对号入座实操实现对应的Row流,毕竟一篇文章是一个整体,没有多余的地方。这倒是我的疏忽,为此附上getTypeInformation方法源码以及自定义flatMap算子的LineSplitter代码以及文尾的一个简单的Row测试代码:
row对象简单实现:
void rowTest() {String str = "1,长臂人猿,666"; String[] fields = str.split(",");int len = fields.length; Row row = new Row(len);for(int i=0;i<len;i++) {row.setField(i, fields[i]);}System.out.println("完整的Row:" + row.toString());System.out.println("第二个元素(对应的是name:)" + row.getField(1) + " Row总长:" + row.getArity());System.out.println(row.getField(1).getClass());}
Flink字符串数据流转换数据类型(Row)流及Row的源码
文章目录
- 1.Row定义
- 2.常用方法
- 2.1.构造函数
- 2.2.getArity()
- 2.3.getField(int pos)
- 2.4.setField(int pos, Object value)
- 2.5. Row of(Object... values)
- 2.6. copy(Row row)
- 2.7.project(Row row, int[] fields)
- 2.8.Row join(Row first, Row... remainings)
- 3.利用Row对象将流数据转成初始化动态表的数据类型
- 3.1.说明
- 3.2.直接接收String类型数据流的问题
- 3.3.采用String流 转 Row流
- 3.3.1.整体代码
- 3.3.2.genTypeInformation
- 3.3.3.LineSplitter
- 4.总结
- 4.1.示例dmeo代码的模拟数据源
- 2021.12.2 补充
1.Row定义
本质上是一个对象数组。
/** The array to store actual values. */
private final Object[] fields;
2.常用方法
2.1.构造函数
赋予对象数组长度。
public Row(int arity) {this.fields = new Object[arity];
}
2.2.getArity()
相当于获取length
public Object getField(int pos) {return fields[pos];
}
2.3.getField(int pos)
获取指定的对象数组fields的对象;
public Object getField(int pos) {return fields[pos];
}
2.4.setField(int pos, Object value)
public void setField(int pos, Object value) {fields[pos] = value;
}
2.5. Row of(Object… values)
封装的快捷的构造函数
官方举例:
/*** Creates a new Row and assigns the given values to the Row's fields.* This is more convenient than using the constructor.** <p>For example:** <pre>* Row.of("hello", true, 1L);}* </pre>* instead of* <pre>* Row row = new Row(3);* row.setField(0, "hello");* row.setField(1, true);* row.setField(2, 1L);* </pre>**/
2.6. copy(Row row)
复制并返回一个row。
public static Row copy(Row row) {final Row newRow = new Row(row.fields.length);System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);return newRow;
}
2.7.project(Row row, int[] fields)
官方的解释是:
以不是深复制的方式复制创建一个新的row对象。
实际上是:可选复制部分的row对象【截取】,int[] fiedls便是截取(原先row)部分的index。
public static Row project(Row row, int[] fields) {final Row newRow = new Row(fields.length);for (int i = 0; i < fields.length; i++) {newRow.fields[i] = row.fields[fields[i]];}return newRow;
}
2.8.Row join(Row first, Row… remainings)
合并两个row对象。
3.利用Row对象将流数据转成初始化动态表的数据类型
3.1.说明
Flink的数据类型以及序列化。
DataStream - String流 转 Row流
在生成FlinkSQL动态表的时候,如果按照kafka数据源的String类型进行创建String数据流:
3.2.直接接收String类型数据流的问题
提示数据流:DataStream 和 动态表结构”no,name,balance”参数对应补上。
3.3.采用String流 转 Row流
3.3.1.整体代码
3.3.2.genTypeInformation
将数据源的数据类型转换为实际的数据类型,返回的是TypeInformation[]。
举例:
// TypeInformation为字段对应的数据类型public static TypeInformation[] getTypeInformation(String srcFieldTypes){String[] tokens = srcFieldTypes.split(",");TypeInformation[] fieldTypes = new TypeInformation[tokens.length];for(int i=0, len=tokens.length; i<len; i++){switch(tokens[i].toLowerCase()){case "string" :fieldTypes[i] = Types.STRING;break;case "int" :fieldTypes[i] = Types.INT;break;case "integer" :fieldTypes[i] = Types.INT;break;case "long" :fieldTypes[i] = Types.LONG;break;case "double" :fieldTypes[i] = Types.DOUBLE;break;case "timestamp" :fieldTypes[i] = Types.SQL_TIMESTAMP;break;default:break;}}return fieldTypes;}
3.3.3.LineSplitter
是一个FlatMapFunction<String, Row>实现flatMaP算子的方法,主要是将:kafka数据源的String分割切换为对象数组Row,并将默认的切割对象String匹配上面的getTypeInformationd的类型数据,采用switch,举例:
// kafka消息一行分隔成多个字段, 再封装成Row类型public static final class LineSplitter implements FlatMapFunction<String, Row> {private static final long serialVersionUID = 1L;public void flatMap(String line, Collector<Row> out) {System.out.println("读取数据: "+line);try {String[] fields = line.split(",,"); // kafka 字段分隔符 splitCharint len = fields.length;Row rsRow = new Row(len);if(len>0){for(int i=0; i<len; i++){if( i>=len || fields[i]==null ||fields[i].length()==0){continue;}// 将kafka消息的每个字段转成 flink表格里一行的字段switch (srcFieldTypesArr[i]) {case "long":rsRow.setField(i, Long.valueOf(fields[i]));break;case "string":rsRow.setField(i,fields[i]);break;case "timestamp":SimpleDateFormat simdate1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date1 = simdate1.parse(fields[i]);Timestamp sqlDate = new Timestamp(date1.getTime());rsRow.setField(i,sqlDate);break;default: rsRow.setField(i, fields[i]);break;}}}//System.out.println("数据输入: "+rsRow);out.collect(rsRow);} catch (Exception e) {e.printStackTrace(); } }}
4.总结
总结
至此,kafka发送生产的数据源String类型,便转换为了带数据类型的动态FlinkSQL表,SQL面板要求只需要验证格式:即Table的正确产生即可。
4.1.示例dmeo代码的模拟数据源
2021.12.2 补充
我以为我把Row类的常用方法列出来,讲明白如何转换成Row对象。大家可以自行查看对应的API对号入座实操实现对应的Row流,毕竟一篇文章是一个整体,没有多余的地方。这倒是我的疏忽,为此附上getTypeInformation方法源码以及自定义flatMap算子的LineSplitter代码以及文尾的一个简单的Row测试代码:
row对象简单实现:
void rowTest() {String str = "1,长臂人猿,666"; String[] fields = str.split(",");int len = fields.length; Row row = new Row(len);for(int i=0;i<len;i++) {row.setField(i, fields[i]);}System.out.println("完整的Row:" + row.toString());System.out.println("第二个元素(对应的是name:)" + row.getField(1) + " Row总长:" + row.getArity());System.out.println(row.getField(1).getClass());}
本文标签: Flink字符串数据流转换数据类型(Row)流及Row的源码
版权声明:本文标题:Flink字符串数据流转换数据类型(Row)流及Row的源码 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/IT/1694650684a254527.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论