Flink--Converting a Table into a DataStream or DataSet

A Table can be converted into a DataStream or a DataSet. This makes it possible to run custom DataStream or DataSet programs on the result of a Table API or SQL query.

Converting a Table into a DataStream

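There are two modes for converting a Table into a DataStream:

1: Append Mode

Usable only when the table is modified solely by INSERT changes, i.e. it is append-only and previously emitted rows are never updated or deleted.

2: Retract Mode

Usable for any table. Every change is emitted together with a boolean flag: true for an INSERT (add) message, false for a DELETE (retract) message.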
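The following is a minimal plain-Scala model of the two modes, written without any Flink dependency so that it runs on its own. It models the retract encoding, in which `(true, row)` adds one copy of `row` to the current result and `(false, row)` removes one copy. `RetractModel`, `materialize` and `isAppendOnly` are hypothetical names used only for illustration; they are not part of the Flink API.

```scala
// Plain-Scala model of consuming a retract stream; no Flink classes are used.
// RetractModel, materialize and isAppendOnly are hypothetical illustration names.
object RetractModel {
  // Fold (flag, row) messages into a multiset of rows (row -> number of copies).
  // true = add (INSERT) one copy, false = retract (DELETE) one copy.
  def materialize[T](changes: Seq[(Boolean, T)]): Map[T, Int] =
    changes.foldLeft(Map.empty[T, Int]) { case (state, (isAdd, row)) =>
      val count = state.getOrElse(row, 0) + (if (isAdd) 1 else -1)
      // Drop rows whose copies have all been retracted.
      if (count == 0) state - row else state.updated(row, count)
    }

  // Append mode fits only if every message is an add (the table is append-only).
  def isAppendOnly[T](changes: Seq[(Boolean, T)]): Boolean =
    changes.forall { case (isAdd, _) => isAdd }
}
```

For example, the messages `(true, ("Hello", 1))`, `(false, ("Hello", 1))`, `(true, ("Hello", 2))` materialize to the single row `("Hello", 2)`, and `isAppendOnly` returns `false` for them because the second message is a retraction; such a sequence can only be consumed through a retract stream.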

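Syntax:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)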

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] =
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
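A retract stream becomes necessary once the table is updating, for example a per-key count. In that case an update to a result row is emitted as a retraction of the old row followed by an add of the new row. The sketch below simulates this message pattern in plain Scala for a hypothetical `GROUP BY word` count over a sequence of words; `WordCountRetract` and `changelog` are illustrative names, and the code models the encoding rather than reproducing Flink's internal implementation.

```scala
// Plain-Scala simulation of the retract messages produced by an updating per-word count.
// WordCountRetract and changelog are hypothetical names; no Flink API is involved.
object WordCountRetract {
  type Msg = (Boolean, (String, Long))

  def changelog(words: Seq[String]): Seq[Msg] =
    words.foldLeft((Map.empty[String, Long], Vector.empty[Msg])) {
      case ((counts, out), word) =>
        val updated = counts.getOrElse(word, 0L) + 1L
        val msgs: Vector[Msg] = counts.get(word) match {
          // First occurrence of the key: a plain add of the new result row.
          case None => Vector((true, (word, updated)))
          // Later occurrences: retract the previous row, then add the updated one.
          case Some(prev) => Vector((false, (word, prev)), (true, (word, updated)))
        }
        (counts.updated(word, updated), out ++ msgs)
    }._2
}
```

For `Seq("Hello", "World", "Hello")` it produces `(true,(Hello,1))`, `(true,(World,1))`, `(false,(Hello,1))`, `(true,(Hello,2))`. Because the stream contains a `false` message, a table like this cannot be converted with `toAppendStream`; `toRetractStream` has to be used.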

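Example:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object TableTODataSet_DataStream {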
  def main(args: Array[String]): Unit = {
    // Build sample data to be converted into a Table
    val data = List(
      Peoject(1L, 1, "Hello"),
      Peoject(2L, 2, "Hello"),
      Peoject(3L, 3, "Hello"),
      Peoject(4L, 4, "Hello"),
      Peoject(5L, 5, "Hello"),
      Peoject(6L, 6, "Hello"),
      Peoject(7L, 7, "Hello World"),
      Peoject(8L, 8, "Hello World"),
      Peoject(8L, 8, "Hello World"),
      Peoject(20L, 20, "Hello World"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val stream = env.fromCollection(data)
    val table: Table = tEnv.fromDataStream(stream)
    // Convert the Table into a DataStream in append mode (works here because the table is append-only)
    val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table)
    // Convert the Table into a retract stream: true marks an add message, false marks a retract message
    val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table)
    retractStream.print()
    env.execute()

  }
}

case class Peoject(user: Long, index: Int, content: String)

Converting a Table into a DataSet

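Syntax:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)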

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

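Example:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

case class Peoject(user: Long, index: Int, content: String)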

object TableTODataSet{
  def main(args: Array[String]): Unit = {

    // Build sample data to be converted into a Table
    val data = List(
      Peoject(1L, 1, "Hello"),
      Peoject(2L, 2, "Hello"),
      Peoject(3L, 3, "Hello"),
      Peoject(4L, 4, "Hello"),
      Peoject(5L, 5, "Hello"),
      Peoject(6L, 6, "Hello"),
      Peoject(7L, 7, "Hello World"),
      Peoject(8L, 8, "Hello World"),
      Peoject(8L, 8, "Hello World"),
      Peoject(20L, 20, "Hello World"))
    // Initialize the environment and load the data as a Table
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    val collection: DataSet[Peoject] = env.fromCollection(data)
    val table: Table = tableEnvironment.fromDataSet(collection)
    // Convert the Table into a DataSet
    val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table)
    toDataSet.print()
    // No env.execute() here: in the DataSet API, print() already triggers execution
  }
}

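Disclaimer: This article, "Flink--Converting a Table into a DataStream or DataSet", is reprinted for learning and reference only. If you have questions about the content, please contact this site.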
