admin管理员组

文章数量:1026989

Flink字符串数据流转换数据类型(Row)流及Row的源码

文章目录

  • 1.Row定义
  • 2.常用方法
    • 2.1.构造函数
    • 2.2.getArity()
    • 2.3.getField(int pos)
    • 2.4.setField(int pos, Object value)
    • 2.5. Row of(Object... values)
    • 2.6. copy(Row row)
    • 2.7.project(Row row, int[] fields)
    • 2.8.Row join(Row first, Row... remainings)
  • 3.利用Row对象将流数据转成初始化动态表的数据类型
    • 3.1.说明
    • 3.2.直接接收String类型数据流的问题
    • 3.3.采用String流 转 Row流
      • 3.3.1.整体代码
      • 3.3.2.genTypeInformation
      • 3.3.3.LineSplitter
  • 4.总结
    • 4.1.示例dmeo代码的模拟数据源
  • 2021.12.2 补充

1.Row定义

本质上是一个对象数组。
/** The array to store actual values. */
private final Object[] fields;

2.常用方法

2.1.构造函数

赋予对象数组长度。

public Row(int arity) {this.fields = new Object[arity];
}

2.2.getArity()

相当于获取length

public Object getField(int pos) {return fields[pos];
}

2.3.getField(int pos)

获取指定的对象数组fields的对象;

public Object getField(int pos) {return fields[pos];
}

2.4.setField(int pos, Object value)

public void setField(int pos, Object value) {fields[pos] = value;
}

2.5. Row of(Object… values)

封装的快捷的构造函数
官方举例:

	/*** Creates a new Row and assigns the given values to the Row's fields.* This is more convenient than using the constructor.** <p>For example:** <pre>*     Row.of("hello", true, 1L);}* </pre>* instead of* <pre>*     Row row = new Row(3);*     row.setField(0, "hello");*     row.setField(1, true);*     row.setField(2, 1L);* </pre>**/

2.6. copy(Row row)

复制并返回一个row。

public static Row copy(Row row) {final Row newRow = new Row(row.fields.length);System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);return newRow;
}

2.7.project(Row row, int[] fields)

官方的解释是:
以不是深复制的方式复制创建一个新的row对象。
实际上是:可选复制部分的row对象【截取】,int[] fiedls便是截取(原先row)部分的index。

public static Row project(Row row, int[] fields) {final Row newRow = new Row(fields.length);for (int i = 0; i < fields.length; i++) {newRow.fields[i] = row.fields[fields[i]];}return newRow;
}

2.8.Row join(Row first, Row… remainings)

合并两个row对象。

3.利用Row对象将流数据转成初始化动态表的数据类型

3.1.说明

Flink的数据类型以及序列化。

DataStream - String流 转 Row流
在生成FlinkSQL动态表的时候,如果按照kafka数据源的String类型进行创建String数据流:

3.2.直接接收String类型数据流的问题

提示数据流:DataStream 和 动态表结构”no,name,balance”参数对应补上。

3.3.采用String流 转 Row流

3.3.1.整体代码

3.3.2.genTypeInformation

将数据源的数据类型转换为实际的数据类型,返回的是TypeInformation[]。

举例:

	// TypeInformation为字段对应的数据类型public static TypeInformation[] getTypeInformation(String srcFieldTypes){String[] tokens = srcFieldTypes.split(",");TypeInformation[] fieldTypes = new TypeInformation[tokens.length];for(int i=0, len=tokens.length; i<len; i++){switch(tokens[i].toLowerCase()){case  "string" :fieldTypes[i] = Types.STRING;break;case  "int" :fieldTypes[i] = Types.INT;break;case  "integer" :fieldTypes[i] = Types.INT;break;case  "long" :fieldTypes[i] = Types.LONG;break;case  "double" :fieldTypes[i] = Types.DOUBLE;break;case  "timestamp" :fieldTypes[i] =  Types.SQL_TIMESTAMP;break;default:break;}}return fieldTypes;}

3.3.3.LineSplitter

是一个FlatMapFunction<String, Row>实现flatMaP算子的方法,主要是将:kafka数据源的String分割切换为对象数组Row,并将默认的切割对象String匹配上面的getTypeInformationd的类型数据,采用switch,举例:

 // kafka消息一行分隔成多个字段, 再封装成Row类型public static final class LineSplitter implements FlatMapFunction<String, Row> {private static final long serialVersionUID = 1L;public void flatMap(String line, Collector<Row> out) {System.out.println("读取数据: "+line);try {String[] fields = line.split(",,");		// kafka 字段分隔符 splitCharint len = fields.length;Row rsRow = new Row(len);if(len>0){for(int i=0; i<len; i++){if( i>=len || fields[i]==null ||fields[i].length()==0){continue;}// 将kafka消息的每个字段转成 flink表格里一行的字段switch (srcFieldTypesArr[i]) {case "long":rsRow.setField(i, Long.valueOf(fields[i]));break;case "string":rsRow.setField(i,fields[i]);break;case "timestamp":SimpleDateFormat simdate1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date1 = simdate1.parse(fields[i]);Timestamp sqlDate = new Timestamp(date1.getTime());rsRow.setField(i,sqlDate);break;default: rsRow.setField(i, fields[i]);break;}}}//System.out.println("数据输入: "+rsRow);out.collect(rsRow);} catch (Exception e) {e.printStackTrace(); }            }}

4.总结

总结
至此,kafka发送生产的数据源String类型,便转换为了带数据类型的动态FlinkSQL表,SQL面板要求只需要验证格式:即Table的正确产生即可。

4.1.示例dmeo代码的模拟数据源

2021.12.2 补充

我以为我把Row类的常用方法列出来,讲明白如何转换成Row对象。大家可以自行查看对应的API对号入座实操实现对应的Row流,毕竟一篇文章是一个整体,没有多余的地方。这倒是我的疏忽,为此附上getTypeInformation方法源码以及自定义flatMap算子的LineSplitter代码以及文尾的一个简单的Row测试代码:

row对象简单实现:

	void rowTest() {String str = "1,长臂人猿,666";		String[] fields = str.split(",");int len = fields.length;		Row row = new Row(len);for(int i=0;i<len;i++) {row.setField(i, fields[i]);}System.out.println("完整的Row:" + row.toString());System.out.println("第二个元素(对应的是name:)" + row.getField(1) + "       Row总长:" + row.getArity());System.out.println(row.getField(1).getClass());}

Flink字符串数据流转换数据类型(Row)流及Row的源码

文章目录

  • 1.Row定义
  • 2.常用方法
    • 2.1.构造函数
    • 2.2.getArity()
    • 2.3.getField(int pos)
    • 2.4.setField(int pos, Object value)
    • 2.5. Row of(Object... values)
    • 2.6. copy(Row row)
    • 2.7.project(Row row, int[] fields)
    • 2.8.Row join(Row first, Row... remainings)
  • 3.利用Row对象将流数据转成初始化动态表的数据类型
    • 3.1.说明
    • 3.2.直接接收String类型数据流的问题
    • 3.3.采用String流 转 Row流
      • 3.3.1.整体代码
      • 3.3.2.genTypeInformation
      • 3.3.3.LineSplitter
  • 4.总结
    • 4.1.示例dmeo代码的模拟数据源
  • 2021.12.2 补充

1.Row定义

本质上是一个对象数组。
/** The array to store actual values. */
private final Object[] fields;

2.常用方法

2.1.构造函数

赋予对象数组长度。

public Row(int arity) {this.fields = new Object[arity];
}

2.2.getArity()

相当于获取length

public Object getField(int pos) {return fields[pos];
}

2.3.getField(int pos)

获取指定的对象数组fields的对象;

public Object getField(int pos) {return fields[pos];
}

2.4.setField(int pos, Object value)

public void setField(int pos, Object value) {fields[pos] = value;
}

2.5. Row of(Object… values)

封装的快捷的构造函数
官方举例:

	/*** Creates a new Row and assigns the given values to the Row's fields.* This is more convenient than using the constructor.** <p>For example:** <pre>*     Row.of("hello", true, 1L);}* </pre>* instead of* <pre>*     Row row = new Row(3);*     row.setField(0, "hello");*     row.setField(1, true);*     row.setField(2, 1L);* </pre>**/

2.6. copy(Row row)

复制并返回一个row。

public static Row copy(Row row) {final Row newRow = new Row(row.fields.length);System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);return newRow;
}

2.7.project(Row row, int[] fields)

官方的解释是:
以不是深复制的方式复制创建一个新的row对象。
实际上是:可选复制部分的row对象【截取】,int[] fiedls便是截取(原先row)部分的index。

public static Row project(Row row, int[] fields) {final Row newRow = new Row(fields.length);for (int i = 0; i < fields.length; i++) {newRow.fields[i] = row.fields[fields[i]];}return newRow;
}

2.8.Row join(Row first, Row… remainings)

合并两个row对象。

3.利用Row对象将流数据转成初始化动态表的数据类型

3.1.说明

Flink的数据类型以及序列化。

DataStream - String流 转 Row流
在生成FlinkSQL动态表的时候,如果按照kafka数据源的String类型进行创建String数据流:

3.2.直接接收String类型数据流的问题

提示数据流:DataStream 和 动态表结构”no,name,balance”参数对应补上。

3.3.采用String流 转 Row流

3.3.1.整体代码

3.3.2.genTypeInformation

将数据源的数据类型转换为实际的数据类型,返回的是TypeInformation[]。

举例:

	// TypeInformation为字段对应的数据类型public static TypeInformation[] getTypeInformation(String srcFieldTypes){String[] tokens = srcFieldTypes.split(",");TypeInformation[] fieldTypes = new TypeInformation[tokens.length];for(int i=0, len=tokens.length; i<len; i++){switch(tokens[i].toLowerCase()){case  "string" :fieldTypes[i] = Types.STRING;break;case  "int" :fieldTypes[i] = Types.INT;break;case  "integer" :fieldTypes[i] = Types.INT;break;case  "long" :fieldTypes[i] = Types.LONG;break;case  "double" :fieldTypes[i] = Types.DOUBLE;break;case  "timestamp" :fieldTypes[i] =  Types.SQL_TIMESTAMP;break;default:break;}}return fieldTypes;}

3.3.3.LineSplitter

是一个FlatMapFunction<String, Row>实现flatMaP算子的方法,主要是将:kafka数据源的String分割切换为对象数组Row,并将默认的切割对象String匹配上面的getTypeInformationd的类型数据,采用switch,举例:

 // kafka消息一行分隔成多个字段, 再封装成Row类型public static final class LineSplitter implements FlatMapFunction<String, Row> {private static final long serialVersionUID = 1L;public void flatMap(String line, Collector<Row> out) {System.out.println("读取数据: "+line);try {String[] fields = line.split(",,");		// kafka 字段分隔符 splitCharint len = fields.length;Row rsRow = new Row(len);if(len>0){for(int i=0; i<len; i++){if( i>=len || fields[i]==null ||fields[i].length()==0){continue;}// 将kafka消息的每个字段转成 flink表格里一行的字段switch (srcFieldTypesArr[i]) {case "long":rsRow.setField(i, Long.valueOf(fields[i]));break;case "string":rsRow.setField(i,fields[i]);break;case "timestamp":SimpleDateFormat simdate1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date1 = simdate1.parse(fields[i]);Timestamp sqlDate = new Timestamp(date1.getTime());rsRow.setField(i,sqlDate);break;default: rsRow.setField(i, fields[i]);break;}}}//System.out.println("数据输入: "+rsRow);out.collect(rsRow);} catch (Exception e) {e.printStackTrace(); }            }}

4.总结

总结
至此,kafka发送生产的数据源String类型,便转换为了带数据类型的动态FlinkSQL表,SQL面板要求只需要验证格式:即Table的正确产生即可。

4.1.示例dmeo代码的模拟数据源

2021.12.2 补充

我以为我把Row类的常用方法列出来,讲明白如何转换成Row对象。大家可以自行查看对应的API对号入座实操实现对应的Row流,毕竟一篇文章是一个整体,没有多余的地方。这倒是我的疏忽,为此附上getTypeInformation方法源码以及自定义flatMap算子的LineSplitter代码以及文尾的一个简单的Row测试代码:

row对象简单实现:

	void rowTest() {String str = "1,长臂人猿,666";		String[] fields = str.split(",");int len = fields.length;		Row row = new Row(len);for(int i=0;i<len;i++) {row.setField(i, fields[i]);}System.out.println("完整的Row:" + row.toString());System.out.println("第二个元素(对应的是name:)" + row.getField(1) + "       Row总长:" + row.getArity());System.out.println(row.getField(1).getClass());}

本文标签: Flink字符串数据流转换数据类型(Row)流及Row的源码