Spark python集成

摘要:
Sparkpython集成1.介绍Spark支持python语言。对于大量SQL操作,您可以直接将python文件提交给Spark运行,而无需编译。这非常简单和方便,但其性能比scala或java慢。对于常规任务,可以使用Python编写它们。对于特殊任务,建议使用scala编写。˃˃spark.sql。显示结果如图所示:3.Win7+Spark+hive+python集成Windows使用Spark的pyspark访问hive数据仓库。

Spark python集成

1、介绍

Spark支持python语言,对于大量的SQL类型的操作,不需要编译,可以直接提交python文件给spark来运行,因此非常简单方便,但是性能要比scala或java慢。对于常规任务,可以使用python来编写,特殊任务还是建议scala编写。

2、使用pyspark启动spark shell(centos)

2.1 启动pyspark

$>spark/bin/pyspark --master spark://s101:7077

spark_048

使用python实现word count

>>>rdd1 = sc.textFile("/user/centos/data/1.txt")
>>>rdd2 = rdd1.flatMap(lambda e : e.split(" "))
>>>rdd3 = rdd2.map(lambda e : (e , 1))
>>>rdd4 = rdd3.reduceByKey(lambda a,b : a + b)
>>>rdd4.collect()

2.2 自定义函数问题

在hive中注册的自定义函数在spark sql中需要删除重新注册,pyspark中也是一样的。

2.2.1 使用driver端jar注册

在client部署模式可以使用。

#注意 python是区分大小写的,False和True是关键字
>>>spark.sql("use umeng_big11").show(1000,False)

#删除原有的函数
>>>spark.sql("drop function forklogs").show()

#添加jar包
>>>spark.sql("add jar /soft/hive/lib/umeng_hive.jar").show()

#注册函数
>>>spark.sql("create function forklogs as 'com.oldboy.umeng.hive.udf.ForkLogUDTF'")
.show()
2.2.2 使用--jars进行注册

在cluster部署模式下,driver运行在哪个worker不一定,因此可采用该中方式来注册函数。

  1. 使用--jars参数启动pyspark

    $>pyspark --master yarn --jars /soft/hive/lib/umeng_hive.jar
    
  2. 删除原来的函数

    >>>spark.sql("drop function umeng_big11.forklogs").show()
    
  3. 注册函数

    >>>spark.sql("use umeng_big11").show()
    >>>spark.sql("create function forklogs as 'com.oldboy.umeng.hive.udf.ForkLogUDTF'")
    .show()
    

2.3 使用函数

函数注册后,就可以在sql中进行使用。

>>>spark.sql("select forklogs(servertimestr ,clienttimems , clientip , log) from raw_logs limit 1").show(100, False)

结果如图:

spark_049

3、win7 + spark + hive + python集成

windows上使用spark的pyspark访问hive数据仓库。

3.1 安装spark软件包

解压即可!

spark_050

3.2 复制mysql驱动到spark/lib下

hive元数据存在了mysql中,因此需要将mysql的驱动程序复制到sparkjars目录下,以便能够连接到mysql,否则报出JDO事务异常之类的消息。

spark_051

3.3 复制hadoop的配置目录到是spark的conf下

spar启动时需要访问hadoop集群,我们使用的hadoop的ha配置模式,因此赋值ha目录到spark下。

spark_052

3.4 复制hadoop和hive的配置文件到spark conf下

core-site.xml + hdfs-site.xml + hive-site.xml文件复制到spark的conf目录下。

spark_053

3.5 在pyspark脚本中添加HADOOP_CONF_DIR环境变量,指向hadoop配置目录

spark的pyspark.cmd调用的是pyspark2.cmd,因此在pyspark2.cmd设置即可。

@echo off

rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem    http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

##################    设置hadoop配置目录的环境变量   ##################
set HADOOP_CONF_DIR=D:downloadsspark-2.1.0-bin-hadoop2.7hadoop_ha

rem Figure out where the Spark framework is installed
set SPARK_HOME=%~dp0..

call "%SPARK_HOME%inload-spark-env.cmd"
set _SPARK_CMD_USAGE=Usage: binpyspark.cmd [options]

rem Figure out which Python to use.
if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
  set PYSPARK_DRIVER_PYTHON=python
  if not [%PYSPARK_PYTHON%] == [] set PYSPARK_DRIVER_PYTHON=%PYSPARK_PYTHON%
)

set PYTHONPATH=%SPARK_HOME%python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%pythonlibpy4j-0.10.4-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%pythonpysparkshell.py

call "%SPARK_HOME%inspark-submit2.cmd" pyspark-shell-main --name "PySparkShell" %*

截图如下:

spark_054

3.6 启动pyspark,连接到yarn

cmd>pyspark --master yarn

启动成功后,如下图所示:

spark_055

spark_056

3.7 使用如下命令,查看操作结果

>>>spark.sql("show databases").show(1000 ,False)

执行结果如图:

spark_057

3.7 总结

windows下配置方式主要就是三个要素,mysql驱动程序、hadoop的配置目录与环境变量指定,还有就是注意namenode的standby问题。如果长时间启动不起来,查看是否是namenode standby了!!!

4、IDEA下开发pyspark程序

在windows的idea集成使用python访问hive数据库,先在windows上安装python和spark。注意,在进行该工作前,一定要搞定步骤(3)。

4.1 创建java模块

4.2 项目结构中引入python支持

  1. 点击project structure

    spark_058

  2. 选中模块,右键添加python

    spark_058

  3. 指定python解释器

    spark_058

  4. 结果如图

    spark_058

4.3 运行配置中指定环境变量

idea下执行spark的python,主要指定SPARK_HOME和PYTHONPATH环境变量。

  1. 打开运行配置窗口

    spark_058

  2. 点击环境变量按钮

    spark_058

  3. 按照如下添加环境变量值

    SPARK_HOME=D:downloadsspark-2.1.0-bin-hadoop2.7
    PYTHONPATH=D:downloadsspark-2.1.0-bin-hadoop2.7python
    

    如图所示:

    spark_058

4.4 导入spark的pyspark.zip包

spark的pytyon核心库位于pyspark.zip包中,比如SparkSession等。因此需要在模块中进行导入。

  1. 选择模块依赖部分的+,选择“JAR or directories...”按钮

    spark_058

  2. 定位python.zip文件

    spark_058

  3. 完成后,效果如下:

    spark_058

4.5 编写Python程序

4.5.1 创建python文件

spark_058

spark_058

4.5.2 输入如下代码
# -*-coding:utf-8-*-

from pyspark.sql import *

if __name__ == '__main__':
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    arr = spark.sql("show databases").collect()
    for x in arr:
        print  x
4.5.3 运行python文件

spark_058

4.5.4 运行结果如下

spark_058

4.5.5 处理自定义函数

对于自定义的hive函数,需要drop掉后,重新注册,注册时需要add jar,代码如下:

# -*-coding:utf-8-*-

from pyspark import SparkContext, SparkConf
from pyspark.sql import *

if __name__ == '__main__':
    #注意 appName千万不要有空格!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    spark = SparkSession.builder.enableHiveSupport().appName("pydemo").getOrCreate()
    spark.sql("show databases").show()
    
    #使用库
    spark.sql("use umeng_big11").show()
    
    #删除函数
    dropfunc = "drop function forklogs"
    spark.sql(dropfunc).show
    
    #添加jar包
    addjar = "add jar D:\big11_umeng\out\artifacts\umeng_hive_jar\umeng_hive.jar"
    spark.sql(addjar).show
    
    #创建函数
    regfunc = "create function forklogs as 'com.oldboy.umeng.hive.udf.ForkLogUDTF'"
    spark.sql(regfunc).show

    #调用自定义函数
    spark.sql("select forklogs(servertimestr , clienttimems , clientip , log) from raw_logs limit 1").show(1,False)

执行结果如下:

spark_058

免责声明:文章转载自《Spark python集成》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇ssh访问流程linux shell 命令获取字符串/文件的MD5值下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Python3 tkinter基础 Listbox Scrollbar 创建垂直滚动条

         Python : 3.7.0          OS : Ubuntu 18.04.1 LTS         IDE : PyCharm 2018.2.4       Conda : 4.5.11    typesetting : Markdown   code """ @Author : 行初心 @Date : 18...

CentOS8安装JDK8并配置环境变量

1、找到JDK下载地址 https://www.oracle.com/java/technologies/javase-downloads.html 在Oracle的网站下载东西都要登录,登录之后即可开始下载,也可以去下载别准备好的资源 以下的2、3、4步,可以任选一种安装方式 2、安装.tar.gz格式的JDK 2.1、解压 在/usr/local目...

Spark History Server配置使用

Spark history Server产生背景 以standalone运行模式为例,在运行Spark Application的时候,Spark会提供一个WEBUI列出应用程序的运行时信息;但该WEBUI随着Application的完成(成功/失败)而关闭,也就是说,Spark Application运行完(成功/失败)后,将无法查看Application...

3Python脚本在linux环境下头文件解释

#!/usr/bin/python到底是什么意思有这句的,加上执行权限后,可以直接用 ./ 执行,不然会出错,因为找不到 python 解释器。 #!/usr/bin/python 是告诉操作系统执行这个脚本的时候,调用 /usr/bin 下的 python 解释器。 #!/usr/bin/env python 这种用法是为了防止操作系统用户没有将 pyt...

pip 安装

一、说明 CentOS6.5自带python环境为2.6,公司的python环境为2.7. 为了避免出现以后代码出现版本差异,所以把自带的2 .6版本升级到了2.7,过程十分曲折。。。。 中途遇到的问题和解决方法请点击:Python安装时遇到的问题 二、安装步骤 1、下载安装包 官方下载地址为:https://www.python.org/download...

解决win7 安装完jdk7后,再安装jdk8出现的问题 has value '1.8', but '1.7' is required.

电脑装了jdk8,JAVA_HOME也是设置的8.不删除8变回7.改了JAVA_HOME,并且path值里的C:ProgramDataOracleJavajavapath也删了运行java -version,报错Error: Registry key ‘SoftwareJavaSoftJava Runtime Environment’CurrentVers...