flink-SQL

摘要:
TableAPI和SQL绑定在flash tableMaven工件中。对于批查询,您需要添加:<dependency><groupId>org。阿帕奇。flinkflink scala_2.111.5.0TableAPI和SQL程序结构flink的批处理和流处理TableAPI和SQL程序遵循相同的模式;因此,我们只需要使用一种方法来证明我们可以执行Flink的SQL语句。首先,您需要获得SQL执行环境:有两种方法://*******************************************/STREAMINQUERY//***********************valsEnv=StreamExecutionEnvironment。getExecutionEnvironment//为流查询创建TableEnvironment TableEnv=TableEnvironment=TableEnvironment。GetTableEnvironment可以通过GetTableEnvironment获取TableEnvironment;此TableEnvironment是TableAPI和SQL集成的核心概念。输出表可用于将表API或SQL查询的结果发送到外部系统。可以从各种来源注册输入表:-现有的“表”对象,通常是表API或SQL查询的结果。可以向TableSink注册输出表。

Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.5.0</version>
</dependency>

另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.5.0</version>
</dependency>

Table API和SQL程序的结构

Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;

所以我们只需要使用一种来演示即可

要想执行flink的SQL语句,首先需要获取SQL的执行环境:

两种方式(batch和streaming):

// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

- 在内部目录中注册一个表
- 注册外部目录
- 执行SQL查询
- 注册用户定义的(标量,表格或聚合)函数
- 转换DataStream或DataSet成Table
- 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

在内部目录中注册一个表

TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格输出表格

输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统

输入表可以从各种来源注册:

- 现有`Table`对象,通常是表API或SQL查询的结果。
- `TableSource`,它访问外部数据,例如文件,数据库或消息传递系统。
- `DataStream`或`DataSet`来自DataStream或DataSet程序。

输出表可以使用注册TableSink

注册一个表

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)

// Table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("projectedTable ").select(...)

注册一个tableSource

TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...)

// get a TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env) 
// create a TableSource
 val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
 // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)

注册一个tableSink

注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

例子:

 //创建batch执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //创建table环境用于batch查询
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    //加载外部数据
    val csvTableSource = CsvTableSource.builder()
      .path("data1.csv")//文件路径
      .field("id" , Types.INT)//第一列数据
      .field("name" , Types.STRING)//第二列数据
      .field("age" , Types.INT)//第三列数据
      .fieldDelimiter(",")//列分隔符,默认是","
      .lineDelimiter("
")//换行符
      .ignoreFirstLine()//忽略第一行
      .ignoreParseErrors()//忽略解析错误
      .build()
    //将外部数据构建成表
    tableEnvironment.registerTableSource("tableA" , csvTableSource)
    //TODO 1:使用table方式查询数据
    val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'")
    //将数据写出去
    table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE))
    //TODO 2:使用sql方式
    //    val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2")
////    //将数据写出去
//    sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
    env.execute()

免责声明:文章转载自《flink-SQL》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇3springboot:springboot配置文件(外部配置加载顺序、自动配置原理,@Conditional)bochs使用指南下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Oracle执行计划

Oracle数据库查看执行计划 基于ORACLE的应用系统很多性能问题,是由应用系统SQL性能低劣引起的,所以,SQL的性能优化很重要,分析与优化SQL的性能我们一般通过查看该SQL的执行计划,本文就如何看懂执行计划,以及如何通过分析执行计划对SQL进行优化做相应说明。 一、什么是执行计划(explain plan) 执行计划:一条查询语句在ORACLE中...

索引长度过长 ERROR 1071 (42000): Specified key was too long; max key length is 767 bytes

1.发现问题  今天在修改innodb表的某个列的长度时,报如下错误:   [html]view plaincopy  print? alter table test2 modify column id varchar(500);   ERROR 1071 (42000): Specified key was too long; max key ...

mysql 修改大表字段,报错ERROR 1878 (HY000): Temporary file write failure. 用pt-online-schema-change

在线上一个表上执行了alter 增加字段操作,报异常:ERROR 1878 (HY000): Temporary file write failure. 初步怀疑表太大,临时空间不够。 1.查了下表的大小将近28G,索引18G,mysql配置的tmp缓存目录只有2G select data_length,index_length   from tables...

laravel中migration 数据迁移

简介 数据库迁移就像是数据库的版本控制,可以让你的团队轻松修改并共享应用程序的数据库结构。迁移通常与 Laravel 的数据库结构生成器配合使用,让你轻松地构建数据库结构。如果你曾经试过让同事手动在数据库结构中添加字段,那么数据库迁移可以让你不再需要做这样的事情。 Laravel Schema facade 对所有 Laravel 支持的数据库系统提供了创...

oracle hint

Hint概述基于代价的优化器是很聪明的,在绝大多数情况下它会选择正确的优化器,减轻了DBA的负担。但有时它也聪明反被聪明误,选择了很差的执行计划,使某个语句的执行变得奇慢无比。 此时就需要DBA进行人为的干预,告诉优化器使用我们指定的存取路径或连接类型生成执行计划,从 而使语句高效的运行。例如,如果我们认为对于一个特定的语句,执行全表扫描要比执行索引扫描更...

SQLite学习笔记

安装 在Windows上安装SQLite。 访问官网下载下Precompliled Binaries for Windows的两个压缩包。 创建sqlite文件夹,路径不要包含中文,把压缩包的内容解压到文件夹中。再将这个文件添加进PATH环境变量中。 此时打开命令行窗口输入sqlite3,将会出现对应提示,表示安装完成。 SQLite命令 在命令行窗口输...