Hands-on Flink from 0 to 1 (28): Flink SQL Tutorial (Part 2)

Abstract:
After creating the project, the first thing to do is write a Flink version of Hello World to verify the environment. We then register a Kafka source table through DDL, query it with Flink SQL, convert the result back to a DataStream and print it, and finally sink the data into MySQL. The MySQL table has to be created by hand first, since Flink cannot create it for us. For more on the DataStream API, keep an eye on the companion Flink DataStream series.

From Kafka to MySQL

Create a Java Project

  • The simplest way is to follow the official site and run curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0 on the command line. With that approach a few packages still have to be added by hand, so feel free to copy my pom.xml from the appendix: it already includes the commonly used dependencies and excludes the conflicting ones. Note that for local testing you should comment out the provided scopes, otherwise classes will be missing at runtime; alternatively, tick Include dependencies with "Provided" scope under Run -> Edit Configurations. It is also worth dropping a log4j configuration file into the resources directory, which makes it much easier to read the logs when chasing a problem.

  • Once the project is created, the first thing to do is, naturally, a Flink version of Hello World. Create a test class and type in the following code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> dataStream = env.fromElements("Hello World");

    dataStream.print();

    env.execute("test");

    Check the console:

     4> Hello World

    We got exactly the result we wanted, but what is that 4> prefix? The 4 means the record was emitted by the fourth parallel subtask. You might object that you never specified any parallelism, so how did the data end up in the fourth subtask? In local mode Flink starts as many parallel subtasks as your CPU has cores. Append setParallelism(1) to an operator and it will run as a single subtask. That completes our DataStream version of Hello World, whose main purpose was to verify that the environment works; now on to today's topic, from Kafka to MySQL. By the way, if you want to dig deeper into the DataStream API, keep an eye on the companion Flink DataStream series (which I haven't started writing yet).
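    For example, here is a minimal sketch (using nothing beyond the Hello World code above) that pins the print sink to a single subtask so the n> prefix disappears:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // with parallelism 1 the console shows just "Hello World", without the "4>" prefix
    env.fromElements("Hello World").print().setParallelism(1);

    env.execute("test");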

Create the Kafka Source Table

Without further ado, here is the code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


public class FlinkSql02 {
    public static final String  KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_behavior (
" +
            "    user_id BIGINT,
" +
            "    item_id BIGINT,
" +
            "    category_id BIGINT,
" +
            "    behavior STRING,
" +
            "    ts TIMESTAMP(3)
" +
            ") WITH (
" +
            "    'connector.type' = 'kafka',  -- 指定连接类型是kafka
" +
            "    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致
" +
            "    'connector.topic' = 'mykafka', -- 之前创建的topic 
" +
            "    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度
" +
            "    'connector.startup-mode' = 'earliest-offset',  --指定从最早消费
" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址
" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址
" +
            "    'format.type' = 'json'  -- json格式,和topic中的消息格式保持一致
" +
            ")";
    public static void main(String[] args) throws Exception {
        // create the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // build EnvironmentSettings and pick the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        // create the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // register the Kafka source table via DDL
        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);

        // run the query
        Table table = tEnv.sqlQuery("select * from user_behavior");

        // convert back to a DataStream and print it
        tEnv.toAppendStream(table, Row.class).print().setParallelism(1);

        // start the job; this line is essential!
        env.execute("test");

    }
}

Now for the exciting part, the test: right-click, Run! and check the console:

543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00

Yep, exactly the same records I pushed into Kafka earlier. Looks good!
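By the way, if your topic is still empty, the sketch below is one way to push a test record using the kafka-clients dependency that is already in the pom. The class name SendTestData and the sample values are made up for illustration, and the ts string is only an assumption about the timestamp format the json format will accept, so adjust it to whatever your topic actually holds.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // same broker address as in the DDL
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // one JSON record matching the user_behavior schema (illustrative values; hypothetical ts format)
        String json = "{\"user_id\": 543462, \"item_id\": 1715, \"category_id\": 1464116,"
                + " \"behavior\": \"pv\", \"ts\": \"2017-11-26T01:00:00Z\"}";
        producer.send(new ProducerRecord<>("mykafka", json));
        producer.flush();
        producer.close();
    }
}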

If you hit an exception like Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in ..., go carefully over your DDL and check whether a property is missing or misspelled. The connector section of the official Flink documentation lists the valid options to compare against, or leave a comment below and we can work it out together.

Create the MySQL Result Table

  • First create the table in MySQL; after all, Flink cannot create it for you automatically yet, so you have to fend for yourself:
CREATE TABLE `user_behavior` (
  `user_id` bigint(20) DEFAULT NULL,
  `item_id` bigint(20) DEFAULT NULL,
  `behavior` varchar(255) DEFAULT NULL,
  `category_id` bigint(20) DEFAULT NULL,
  `ts` timestamp(6) NULL DEFAULT NULL
)

After the table has been created on the MySQL side, go back to the code, register the MySQL result table, and insert the data read from Kafka into it. The complete code, including the construction of the Kafka source table, follows.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


public class FlinkSql02 {
    public static final String  KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_behavior (
" +
            "    user_id BIGINT,
" +
            "    item_id BIGINT,
" +
            "    category_id BIGINT,
" +
            "    behavior STRING,
" +
            "    ts TIMESTAMP(3)
" +
            ") WITH (
" +
            "    'connector.type' = 'kafka',  -- 指定连接类型是kafka
" +
            "    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致
" +
            "    'connector.topic' = 'mykafka', -- 之前创建的topic 
" +
            "    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度
" +
            "    'connector.startup-mode' = 'earliest-offset',  --指定从最早消费
" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址
" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址
" +
            "    'format.type' = 'json'  -- json格式,和topic中的消息格式保持一致
" +
            ")";

    public static final String MYSQL_TABLE_SINK_DDL = "" +
            "CREATE TABLE `user_behavior_mysql` (\n" +
            "  `user_id` bigint,\n" +
            "  `item_id` bigint,\n" +
            "  `behavior` varchar,\n" +
            "  `category_id` bigint,\n" +
            "  `ts` timestamp(3)\n" +
            ") WITH (\n" +
            "  'connector.type' = 'jdbc',  -- connector type\n" +
            "  'connector.url' = 'jdbc:mysql://localhost:3306/mysql',  -- jdbc url\n" +
            "  'connector.table' = 'user_behavior',  -- target table name in MySQL\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver',  -- driver class; optional, derived from the jdbc url if omitted\n" +
            "  'connector.username' = 'root',  -- username\n" +
            "  'connector.password' = '123456',  -- password\n" +
            "  'connector.write.flush.max-rows' = '5000',  -- flush once this many rows are buffered\n" +
            "  'connector.write.flush.interval' = '2s'  -- or once this much time has passed; whichever condition is met first triggers the write\n" +
            ")";
    public static void main(String[] args) throws Exception {
        // create the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // build EnvironmentSettings and pick the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        // create the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // register the Kafka source table via DDL
        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);

        // register the MySQL result table via DDL
        tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);

        // insert the data read from the Kafka table into the MySQL table
        tEnv.sqlUpdate("insert into user_behavior_mysql select user_id,item_id,behavior,category_id,ts from user_behavior");

        // start the job; this line is essential!
        env.execute("test");

    }
}

Open Navicat and check whether the data has been written to MySQL correctly.

user_id   item_id   behavior   category_id   ts
543462    1715      pv         1464116       2017-11-26 01:00:00.000
543462    1715      pv         1464116       2017-11-26 01:00:00.000
543462    1715      pv         1464116       2017-11-26 01:00:00.000
543462    1715      pv         1464116       2017-11-26 01:00:00.000

Success! The data also matches what we have in Kafka. You can additionally use the Java Kafka client from the previous chapter to cross-check consistency, so I won't repeat that here (a quick MySQL-side check from code is sketched below). That wraps up this Flink SQL trip. In the next chapter we will build on this and cover the common aggregation queries as well as the dimension-table joins that Flink SQL currently supports natively. Some readers have also said they don't want to know only how but also why; we will have a separate series on Flink internals for that.
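Here is that MySQL-side check: a minimal JDBC sketch that assumes the same localhost url, credentials, and table as in the sink DDL, with CheckMysqlSink being just a placeholder class name.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckMysqlSink {
    public static void main(String[] args) throws Exception {
        // same url, user and password as in MYSQL_TABLE_SINK_DDL
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mysql", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select user_id, item_id, behavior, category_id, ts from user_behavior")) {
            while (rs.next()) {
                System.out.println(rs.getLong("user_id") + "," + rs.getLong("item_id") + ","
                        + rs.getString("behavior") + "," + rs.getLong("category_id") + ","
                        + rs.getTimestamp("ts"));
            }
        }
    }
}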

Appendix

pom.xml

    
    <properties>
        <flink.version>1.10.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink modules -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>javassist</artifactId>
                    <groupId>org.javassist</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-parser-combinators_2.11</artifactId>
                    <groupId>org.scala-lang.modules</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <artifactId>force-shading</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.5</version>
        </dependency>

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.5.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.10.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.4.Final</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <artifactSet>
                                <excludes>
                                    <exclude>junit:junit</exclude>
                                </excludes>
                            </artifactSet>

                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

It's a bit messy and I haven't bothered to tidy it up; just copy it over and use it.

log4j.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/' >

    <appender name="myConsole" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern"
                   value="[%d{dd HH:mm:ss,SSS} %-5p] [%t] %c{2} - %m%n" />
        </layout>
        <!-- filter that limits which log levels are output -->
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="info" />
            <param name="levelMax" value="error" />
            <param name="AcceptOnMatch" value="true" />
        </filter>
    </appender>

    <!-- settings for a specific logger; additivity controls whether the default inheritance applies -->
    <logger name="com.runway.bssp.activeXdemo" additivity="false">
        <appender-ref ref="myConsole" />
    </logger>

    <!-- root logger settings -->
    <root>
        <priority value ="debug"/>
        <appender-ref ref="myConsole"/>
    </root>
</log4j:configuration>

Remember to put it under the resources directory, not somewhere else.

 
