Flink 写数据到MySql (JDBC Sink)

摘要:
POM文件<artifactId>flink-scala_2.11<org.apache.flink<燧石连接器-kafka-0.11 _ 2.11</artifactId>版本>5.1.44</版本></依赖性>

POM 文件

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>

代码:

package com.kpwong.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

object JDBCSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val socketDS: DataStream[String] = env.socketTextStream("hadoop202",9999)

    //sink

    socketDS.addSink(new MyJDBCSink())


    env.execute()
  }

}
//注意 必须继承富函数类,这样才有生命周期管理函数
class MyJDBCSink extends  RichSinkFunction[String]{
  //定义SQL 链接 预编译器
  var conn: Connection =_
  var insertSql: PreparedStatement=_

  //创建链接和预编译语句
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test","root","000000")

    insertSql= conn.prepareStatement("insert into flink(socket) values (?)")

  }

  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {

//    super.invoke(value, context)
    insertSql.setString(1,value)
    insertSql.execute()
  }

  override def close(): Unit = {
//    super.close()
    insertSql.close()
    conn.close()
  }

}

测试 :socket输入输入数据

Flink 写数据到MySql (JDBC Sink)第1张

 MySql 接受数据:

Flink 写数据到MySql (JDBC Sink)第2张

免责声明:文章转载自《Flink 写数据到MySql (JDBC Sink)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇vc++编程之在程序中加入网址链接Huawei-R&amp;amp;S-网络工程师实验笔记20190607-STP生成树协议(基本配置、桥优先级、根桥选举、根端口、路径开销、边缘端口)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

meter标签

meter meter元素标签用来表示范围已知且可度量的等级标量或分数值,如磁盘使用量比例、关键词匹配程度等。需要注意的是,meter不可以用来表示那些没有已知范围的任意值,例如重量、高度,除非已经设定了它们值的范围。meter元素共有6个属性: Value:表示当前标量的实际值;如果不做指定,那么meter标签中的第一个数字就会被认为是其当前实际值,例如...

配置Jenkins 实现自动发布maven项目至weblogic(svn+maven+weblogic12c)

Jenkins安装完成之后,需要我们对其配置,然后才可以实现自动部署项目。 前提 防火墙开放weblogic的7001端口 Linux(CentOS):firewall-cmd --zone=public --add-port=7001/tcp --permanent --zone:作用域 --add-port:添加端口 --permanent:永久...

git add --all 为啥不能添加空文件夹,这样设计的初衷是

git add --all 为啥不能添加空文件夹,这样设计的初衷是? 好多项目还得弄个假文件在空文件夹里面占位 这个算设计失误吧,见 https://git.wiki.kernel.org/index.php/GitFaq#Can_I_add_empty_directories.3FCurrently the design of the git inde...

【Maven】CentOS7使用Nexus3搭建maven私服

一、简介   Maven是一个采用纯Java编写的开源项目管理工具, Maven采用了一种被称之为Project Object Model(POM)概念来管理项目,所有的项目配置信息都被定义在一个叫做POM.xml的文件中, 通过该文件Maven可以管理项目的整个生命周期,包括清除、编译,测试,报告、打包、部署等等。目前Apache下绝大多数项目都已经采用...

uniapp将时间日期格式化的组件unidateformat的用法

uniapp开发时,我们需要将数据库里取到的时间戳格式化为某个格式的日期时间形式,uniapp官方插件市场的uni-dateformat组件即可解决。 uniapp官方插件地址及详细用法介绍:https://ext.dcloud.net.cn/plugin?id=3279 刚开始用时,显示日期时间为1970年,只要给数据库里的值乘以1000即可, 如,我们...

easyui扩展行默认展开 以及 去除滚动条

 问题背景: 在做打印页面的时候,要求有详细的默认展开显示。    遇到的问题: 1)在用扩展行的时候,grid的所有行都添加了展开收起的图标,(第二行没有明细)如下  2)默认展示有详细行的时候,内容被滚动条遮挡(影响打印)    3) 解决方法:在easyui扩展行的 onLoadSuccess 函数里进行处理  1 $list.datag...