spark之单词统计

摘要:
importjava.util.Arrays;\words.txt“);call(Stringline)throwsException{String[]words=line.split(”“);returnArrays.asList(words).iiterator();call(Stringword)throwsException{returnnewTuple2<
spark之单词统计

1.Java语言开发单词统计

package com.wordCountdemo2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// todo: Java语言开发spark的单词统计程序
public class JavaWordCount {
    public static void main(String[] args) {
        // 1.创建SparkConf对象
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
        // 2.构建JavaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // 3.读取数据文件
        JavaRDD<String> data = jsc.textFile("F:\words.txt");
        // 4.切分每一行获取所有单词
        JavaRDD<String> wordsJavaRDD = data.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words).iterator();
            }
        });
        // 5. 每个单词计数1
        JavaPairRDD<String, Integer> wordAndOne = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // 6.相同单词出现的累加1
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        });
        // 按照单词出现次数排序
        // 按照单词出现次数降序:(单词,次数)-->(次数,单词).sortByKey --> (单词,次数)
        JavaPairRDD<Integer, String> reverseJavaPairRDD = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2, t._1);
            }
        });
        JavaPairRDD<String, Integer> sortedRDD = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<String, Integer>(t._2, t._1);
            }
        });
        // 7.收集打印
        List<Tuple2<String, Integer>> finalResult = sortedRDD.collect();
        for (Tuple2<String, Integer> t: finalResult) {
            System.out.println("单词:" + t._1 + "	次数:"+ t._2);
        }
        jsc.stop();
    }
}

2.RDD概述

2.1RDD

  • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。
    Dataset:一个数据集合,用于存放数据的。
    Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
    Resilient:RDD中的数据可以存储在内存中或者磁盘中。

2.2RDD 五大属性

1.一个RDD有很多分区,每一个分区内部包含了该RDD的部分数据。spark中任务以task线程方式运行,一个分区后期就对应spark任务中的一个task线程。
2.它会作用在每一个分区中的函数。
3.一个RDD会依赖于其他多个RDD,这里涉及到RDD与RDD之间依赖关系,后期spark任务的容错机制就是根据这个特性而来,通过这个容错机制可以进行数据恢复。
4.KEY-VALUE类型的RDD有分区概念(必须要产生shuffle),它决定了数据后期会流入到哪一个分区中。(可选项)
	spark中分的RDD分区函数有2种:
		第一种是hashPartitioner分区,其本质就是使用key.hashcode % 分区数 = 分区号(默认分区)
		第二种是rangePartitioner分区,基于一定范围进行分区。
5.一组最优的数据块位置,数据的本地性或数据位置最优(可选项)
	spark后期任务的计算会优先考虑存有数据的节点开启计算任务。

3.基于spark的单词统计程序剖析RDD的五大属性

  • 启动spark-shell
spark-shell --master spark://linux01:7077 --executor-memory 1g --total-executor-cores 4
  • 加载数据
sc.textFile("/a.txt")
  • wordcount统计
sc.textFile("/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveTextFile("/out1")
  • 整个RDD过程:

spark之单词统计第1张

  • spark单词统计任务划分和任务提交流程
1.通过IDEA开发程序,构建object,然后编写main方法中的代码。
2.把程序打成jar包提交到集群中运行
3.提交任务: spark-submit 
4.Driver 向 Master 注册申请计算资源
5.Master通知指定的worker启动executor进程(计算资源)
6.worker上的executor进程向Driver端进行反向注册申请task
7.Driver内部开始运行客户端程序中的main方法,在main方法中构建了SparkContext对象,它内部分别构建了DAGSchudler和 TaskSchduler 
8.程序最后调用action操作,然后按照RDD的一系列操作顺序生成一张DAG有向无环图,然后把DAG有向无环图传递给DAGSchduler
9.DAGSchduler 获取到DAG有向无环图之后,按照宽依赖进行stage的划分,每一个stage内部有很多可以并行运行的task,把每一个stage中这些并行运行的task封装在一个taskSet集合中,然后把一个一个的taskSet集合提交给TaskSchduler
10.TaskSchduler拿到一个一个的taskSet集合之后,按照stage与stage之间的依赖关系,前面stage中的task任务先运行,依次遍历每一个taskSet集合,取出每一个task,然后提交到worker节点上的executor进程中运行
11.所有task运行完成之后,Driver向master发送注销请求,接下来Master通知worker释放计算资源,也就是把对应的executor进程关闭掉。
  • application、job、stage、 task之间关系
1.一个application程序包含了客户端写的代码和任务在运行的时候需要的资源信息
2.客户端代码中会有多个action操作,一个action操作就是一个job
3.一个job中会涉及到rdd大量的转换操作,一个job对应一个DAG有向无环图,这些操作中可能存在大量的宽依赖,后期是按照宽依赖去划分stage,也就是说一个job会有很多个stage。
4.每一个stage内部是根据rdd有很多分区,一个分区对应一个task,每一个task内部就存在很多个可以并行运行的task。

总结:一个aplication应用程序有很多个job,一个job又有很多个stage,每一个stage内部又有很多个task。 所以最后一个application应用程序有很多个task在运行

4.RDD创建方式

  • 通过已存在的scala集合构建:
val rdd1 = sc.parallelize(List(1,2,3,4,5))
val rdd2 = sc.parallelize(Array("hadoop","hive", "spark"))
  • 外部数据读取
val rdd3 = sc.textFile("/words.txt")
rdd3.collect
  • 从一个rdd进行转换之后生成一个新的rdd
val rdd4 = rdd1.flatMap(_.split(" "))

5.RDD的算子(方法)分类介绍

  • 1.trainsformation (转换)

    • 可以实现把一个rdd转换生成一个新的rdd,它延迟加载,不会立即执行。

    • map/flatMap/reduceByKey等

    • 常用的Transformation:

      map(func)	返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
      filter(func)	返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
      flatMap(func)	类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
      mapPartitions(func)	类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
      mapPartitionsWithIndex(func)	类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是
      (Int, Interator[T]) => Iterator[U]
      union(otherDataset)	对源RDD和参数RDD求并集后返回一个新的RDD
      intersection(otherDataset)	对源RDD和参数RDD求交集后返回一个新的RDD
      distinct([numTasks]))	对源RDD进行去重后返回一个新的RDD
      groupByKey([numTasks])		在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
      reduceByKey(func, [numTasks])	在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
      sortByKey([ascending], [numTasks])	在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
      sortBy(func,[ascending], [numTasks])	与sortByKey类似,但是更灵活
      join(otherDataset, [numTasks])	在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
      cogroup(otherDataset, [numTasks])	在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
      coalesce(numPartitions)		减少 RDD 的分区数到指定值。
      repartition(numPartitions)	重新给 RDD 分区
      repartitionAndSortWithinPartitions(partitioner)
      	重新给 RDD 分区,并且每个分区内以记录的 key 排序
      
  • 2.action(动作)

    • 它会真正触发任务运行

    • collect/saveAsTextFile等

    • 常用action:

      reduce(func)	reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
      collect()	在驱动程序中,以数组的形式返回数据集的所有元素
      count()	返回RDD的元素个数
      first()	返回RDD的第一个元素(类似于take(1))
      take(n)	返回一个由数据集的前n个元素组成的数组
      takeOrdered(n, [ordering])	返回自然顺序或者自定义顺序的前 n 个元素
      saveAsTextFile(path)	将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
      saveAsSequenceFile(path) 	将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
      saveAsObjectFile(path) 	将数据集的元素,以 Java 序列化的方式保存到指定的目录下
      countByKey()	针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
      foreach(func)	在数据集的每一个元素上,运行函数func
      foreachPartition(func)	在数据集的每一个分区上,运行函数func
      

    6.RDD算子操作练习

    val rdd1 = sc.parallelize(List(4,5,6,7,8,9,10))
    // 1.乘10
    rdd1.map(_*10).collect
    // 2.过滤大于50
    rdd1.map(_*10).filter(_>50).collect
    // 3.切分
    val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
    rdd1.flatMap(_.split(" ")).collect
    // 4.交集
    val rdd1 = sc.parallelize(List(1,2,3,4))
    val rdd2 = sc.parallelize(List(3,4,5,6))
    rdd1.intersection(rdd2).collect
    // 5.求并集
    rdd1.union(rdd2).collect
    // 6.去重
    rdd1.union(rdd2).distinct.collect
    // 7.join
    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    scala> rdd1.join(rdd2).collect
    // res11: Array[(String, (Int, Int))] = Array((tom,(1,1)), (jerry,(3,2)))
    // groupByKey 对应value是一个迭代器
    rdd1.union(rdd2).groupByKey.collect
    res15: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
    // 8.cogroup 如果没有就会给一个空的迭代器
    rdd1.cogroup(rdd2).collect
    // 9.reduce
    val rdd1 = sc.parallelize(List(1,2,3,4))
    rdd1.reduce(_+_)
    // 10.获取分区数
    rdd1.partitions.length
    // 11.指定分区数
    val rdd2 = sc.parallelize(List("1","2","3","4","5"), 5)
    // 12.reduceByKey, sortByKey
    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
    val rdd3 = rdd1.union(rdd2)
    //按key进行聚合
    val rdd4 = rdd3.reduceByKey(_ + _)
    rdd4.collect
    //按value的降序排序
    val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
    rdd5.collect
    // 13.repartition、coalesce
    val rdd1 = sc.parallelize(1 to 10,3)
    //利用repartition改变rdd1分区数
    //减少分区
    rdd1.repartition(2).partitions.size
    //增加分区
    rdd1.repartition(4).partitions.size
    //利用coalesce改变rdd1分区数
    //减少分区
    rdd1.coalesce(2).partitions.size
    /*
    repartition:  重新分区, 有shuffle
    coalesce:    合并分区 / 减少分区 	默认不shuffle   
    默认 coalesce 不能扩大分区数量。除非添加true的参数,或者使用repartition。
    
    适用场景:
    1、如果要shuffle,都用 repartition
    2、不需要shuffle,仅仅是做分区的合并,coalesce
    3、repartition常用于扩大分区。
    
    扩大分区的目的: 提升并行度。
    filter之后,可以用coalesce来合并分区。
    
    */
    // 14.map、mapPartitions
    //通过并行化生成rdd
    val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
    
    //map实现对rdd1里的每一个元素乘2然后排序
    val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
    rdd2.collect
    
    //mapPartitions实现对rdd1里的每一个元素乘2然后排序
    val rdd3 = rdd1.mapPartitions(iter => iter.map(_*2)).sortBy(x => x, true)
    rdd3.collect
    // 15.foreach、foreachPartition
    //通过并行化生成rdd
    val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
    
    //foreach实现对rdd1里的每一个元素乘10然后打印输出
    rdd1.foreach(println(_ * 10))
    
    //foreachPartition实现对rdd1里的每一个元素乘10然后打印输出
    rdd1.foreachPartition(iter => iter.foreach(println(_ * 10)))
    
    foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。
    foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。
    
    
  • 总结:
    一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。

6.案例:实现点击流日志分析案例

  • PV统计
package com.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// TODO: 利用spark 实现点击流日志分析 pageview
object PV {
  def main(args: Array[String]): Unit = {
    // 构建sparkConf对象
    val sparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
    // 构建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // 读取数据文件
    val data: RDD[String] = sc.textFile("J:\javaBigData\data\spark\spark_day02\资料\运营商日志")
    // 4.统计PV
    val pv: Long = data.count()
    println("PV", pv)
    sc.stop()
  }
}

  • UV统计
package com.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UV {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    val data: RDD[String] = sc.textFile("J:\javaBigData\data\spark\spark_day02\资料\运营商日志")
    // 切分每一行获取第一个元素,也就是ip
    val ips:RDD[String] = data.map(x => x.split(" ")(0))
    // 去重
    val distinctRDD:RDD[String] = ips.distinct()
    val uv:Long = distinctRDD.count()
    println("uv", uv)
    sc.stop()
  }
}

  • TopN
package com.rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//TODO: 利用spark实现TopN页面访问最多前N位
object TopN {
  def main(args: Array[String]): Unit = {
    // 构建sparkConf对象
    val sparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
    // 构建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // 读取数据文件
    val data: RDD[String] = sc.textFile("J:\javaBigData\data\spark\spark_day02\资料\运营商日志")
    // 切分每一行,过滤数据,获取页面地址
    val urlRDD:RDD[String] = data.filter(x => x.split(" ").length > 10).map(x => x.split(" ")(10))
    // 每次访问次数合并
    val result:RDD[(String,Int)] =urlRDD.map((_, 1)).reduceByKey(_+_)
    // 按照次数排序
    val sortedRDD: RDD[(String, Int)] = result.sortBy(_._2,false)
    // 筛选出 - 的数据
    val filterRDD:RDD[(String, Int)] = sortedRDD.filter(x => x._1.length > 10)
    // 取出前5位
    val top5:Array[(String,Int)] = filterRDD.take(5)
    top5.foreach(println)
  }
}

7.使用foreach和foreachpartition

  • foreachpartition以分区方式创建连接。以数据刷入mysql为例

  • 插入数据库需要在pom.xml配置mysql驱动

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>
  • foreach

    package com.Data2Mysql
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Data2Mysql {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        sc.setLogLevel("warn")
        val data: RDD[String] = sc.textFile("J:\javaBigData\data\spark\spark_day02\资料\person")
        // 切分每一行
        val personRDD:RDD[(String,String,String)] = data.map(x => x.split(" ")).map(x => (x(0), x(1), x(2)))
        
        // 保存mysql表中
        personRDD.foreach(line => {
          // 把数据插入mysql表中
          // 1.获取连接
          val connection: Connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/spark", "root", "123")
          // 2.定义插入数据sql语句
          val sql = "insert into person(id, name, age) values (?,?,?)"
          // 3.获取PreParedStatement
          try {
            val ps: PreparedStatement = connection.prepareStatement(sql)
            // 获取数据 给?号赋值
            ps.setString(1,line._1)
            ps.setString(2,line._2)
            ps.setString(3,line._3)
            ps.execute()
          } catch {
            case e:Exception => e.printStackTrace()
          } finally {
            if (connection!=null) {
              connection.close()
            }
          }
        })
      }
    }
    
    
  • foreachpartition 以分区方式写入数据减少连接

    // 以分区为单位建立连接
        personRDD.foreachPartition(iter => {
          // 1.获取连接
          val connection: Connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/spark", "root", "123")
          // 2.定义插入数据sql语句
          val sql = "insert into person(id, name, age) values (?,?,?)"
          try {
            val ps: PreparedStatement = connection.prepareStatement(sql)
            iter.foreach(line => {
              ps.setString(1,line._1)
              ps.setString(2,line._2)
              ps.setString(3,line._3)
              ps.execute()
            })
          } catch {
            case e:Exception => e.printStackTrace()
          } finally {
            if (connection!=null) {
              connection.close()
            }
          }
    
    
        })
    

免责声明:文章转载自《spark之单词统计》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Navicat使用常见的两个问题及解决方法,提高开发效率OpenStack 虚机网卡的创建过程下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

EF结合SqlBulkCopy实现高效的批量数据插入 |EF插件EntityFramework.Extended实现批量更新和删除

原文链接:http://blog.csdn.net/fanbin168/article/details/51485969   批量插入 (17597条数据批量插入耗时1.7秒)   using System;   using System.Collections.Generic;   using System.Linq;   usi...

mysql字符集设置

1.1mysql字符集知识: 概述:字符集就是一套文字符号及其编码,比较规则的集合。 Mysql数据库字符集包括字符集(character)和校对规则(collation)这两个概念。其中字符集用于定义mysql数据库的存储方式,而校对规则是定义字符串的比较方式,并且字符集和校对规则是一对多的关系 查看mysql可用的字符集的命令是show charact...

APM系统SkyWalking介绍

  公司最近在构建服务化平台,需要上线APM系统,本篇文章简单的介绍SkyWalking APM APM全称Application Performance Management应用性能管理,目的是通过各种探针采集数据,收集关键指标,同时搭配数据呈现以实现对应用程序性能管理和故障管理的系统化解决方案 Zabbix、Premetheus、open-falco...

mysql按照年月日查询,导出每日数据数量

mysql没有提供unix时间戳的专门处理函数,所以,如果遇到时间分组,而你用的又是整型unix时间戳,则只有转化为mysql的其他日期类型!FROM_UNIXTIM()将unix时间戳转为datetime等日期型!一、年度查询查询 本年度的数据SELECT *FROM blog_articleWHERE year( FROM_UNIXTIME( Blo...

ArcGIS中加载百度地图

现在Google,Bing,OpenStreetMap....众多优秀的免费地图资源都被伟大的祖国给挡在墙外了,这些数据能够提 供真实的GPS数据时多么的难能可贵啊,可惜没有办法,偶也是爱国人士,钓鱼岛是中国的,日本也迟早是中国的!!      在此背景下,国内百度地图水涨船高,一步一个脚印发展迅速,最近也是发现其数据库服务器速度比其他的都快,因此很多东西...

TCP 的那些事儿(下)

本文转载自TCP 的那些事儿(下) 导语 这篇文章是下篇,所以如果你对TCP不熟悉的话,还请你先看看上篇《TCP的那些事儿(上)》 上篇中,我们介绍了TCP的协议头、状态机、数据重传中的东西。但是TCP要解决一个很大的事,那就是要在一个网络根据不同的情况来动态调整自己的发包的速度,小则让自己的连接更稳定,大则让整个网络更稳定。在你阅读下篇之前,你需要做好准...