es-09 Spark Integration

Integrating Elasticsearch with Spark is fairly straightforward: the elasticsearch-hadoop connector wraps everything that is needed, so you can call its helper methods directly.
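
For orientation, the whole round trip is only a few lines. The sketch below is illustrative rather than project code: the index name blog/doc and the sample document are made up, and it assumes the elasticsearch-spark dependency from the pom below is on the classpath.

import org.apache.spark.{SparkConf, SparkContext}
// this import adds saveToEs / esRDD as implicit methods
import org.elasticsearch.spark._

object QuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("es-spark-quickstart")
      .setMaster("local[*]")
      .set("es.nodes", "10.124.147.22")
      .set("es.port", "9200")
    val sc = new SparkContext(conf)

    // write: an RDD of Maps (or case classes) is indexed directly
    sc.makeRDD(Seq(Map("title" -> "hello", "views" -> 1))).saveToEs("blog/doc")

    // read: returns an RDD of (documentId, fieldMap) pairs
    sc.esRDD("blog/doc").collect().foreach(println)

    sc.stop()
  }
}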

Version compatibility notes:

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html

Maven dependency notes:

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html

1, Maven configuration:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xiaoniubigdata</artifactId>
        <groupId>com.wenbronk</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>spark06-es</artifactId>
    <properties>
        <spark.version>2.3.1</spark.version>
        <spark.scala.version>2.11</spark.scala.version>
        <scala.version>2.11.12</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.3.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2, Using the RDD API

1), read

package com.wenbronk.spark.es.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Read data from ES
  */
object ReadMain {
  def main(args: Array[String]): Unit = {
    // val sparkconf = new SparkConf().setAppName("read-es").setMaster("local[4]")
    // val spark = new SparkContext(sparkconf)

    val sparkSession = SparkSession.builder()
      .appName("read-es-rdd")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    val spark = sparkSession.sparkContext

    // custom query; the implicit esRDD method comes from the es package
    import org.elasticsearch.spark._
    // read documents as (id, fieldMap) tuples
    val esreadRdd: RDD[(String, collection.Map[String, AnyRef])] = spark.esRDD("macsearch_fileds/mac",
      """
        |{
        |  "query": {
        |    "match_all": {}
        |  }
        |}
      """.stripMargin)

    val value: RDD[(Option[AnyRef], Int)] = esreadRdd.map(_._2.get("mac"))
      .map(mac => (mac, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2)
    val tuples: Array[(Option[AnyRef], Int)] = value.collect()
    tuples.foreach(println)

    esreadRdd.saveAsTextFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")
    sparkSession.close()
  }
}

2), readJson

package com.wenbronk.spark.es.rdd
import org.apache.spark.sql.SparkSession
object ReadJsonMain {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("read-es-rdd")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    val spark = sparkSession.sparkContext

    // read documents as raw JSON strings, with a query attached
    import org.elasticsearch.spark._
    val esJsonRdd = spark.esJsonRDD("macsearch_fileds/mac",
      """
        |{
        |  "query": {
        |    "match_all": {}
        |  }
        |}
      """.stripMargin)

    esJsonRdd.map(_._2).saveAsTextFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")
    sparkSession.close()
  }
}

3), write

package com.wenbronk.spark.es.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark
object WriteMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("write-spark-es")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    // each line is expected to be a JSON document (as written by ReadJsonMain above)
    val df: RDD[String] = spark.sparkContext.textFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    import org.elasticsearch.spark._
    // alternatives: df.saveToEs("spark/docs") or EsSpark.saveToEs(df, "spark/docs")
    EsSpark.saveJsonToEs(df, "spark/json")
    spark.close()
  }
}

4), Writing to multiple indices

package com.wenbronk.spark.es.rdd
import org.apache.spark.sql.SparkSession
object WriteMultiIndex {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-spark-multiindex")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    val sc = spark.sparkContext

    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    import org.elasticsearch.spark._
    // custom metadata can be attached to each document; here only the id is supplied,
    // and the media_type field of each document decides which index it lands in
    sc.makeRDD(Seq((1, game), (2, book), (3, cd))).saveToEsWithMeta("my-collection-{media_type}/doc")
    spark.close()
  }
}
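
If no per-document metadata is needed, the same dynamic index pattern also works on a plain RDD of documents, as in the elasticsearch-hadoop documentation. The snippet below is a sketch that reuses the sc and the three Maps from the example above; it only requires that the media_type field exists in every document:

    import org.elasticsearch.spark._
    // each document is routed by its own media_type value:
    // my-collection-game, my-collection-book, my-collection-music
    sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")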

3, Spark Streaming

1), write

package com.wenbronk.spark.es.stream
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming
import scala.collection.mutable

object WriteStreamingMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-spark-streaming-write").setMaster("local[4]")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "10.124.147.22")
    // the port defaults to 9200; SparkConf.set only takes strings, so use conf.set("es.port", "9200") to change it

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    val rdd = sc.makeRDD(Seq(numbers, airports))
    val microbatches = mutable.Queue(rdd)
    val dstream: InputDStream[Map[String, Any]] = ssc.queueStream(microbatches)

    // alternatives:
    // import org.elasticsearch.spark.streaming._
    // dstream.saveToEs("sparkstreaming/doc")
    // with a document id taken from a field:
    // EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))

    // the DStream holds Maps, so saveToEs is the right call here
    // (saveJsonToEs is only for DStreams of JSON strings)
    EsSparkStreaming.saveToEs(dstream, "sparkstreaming/doc")

    ssc.start()
    ssc.awaitTermination()
  }
}

2), Writing with metadata (the same call also works on RDDs; see the sketch after the code below)

package com.wenbronk.spark.es.stream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object WriteStreamMeta {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-spark-streaming-write").setMaster("local[4]")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "10.124.147.22")
    // the port defaults to 9200; SparkConf.set only takes strings, so pass "9200" if it needs changing

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // (id, document) pairs: the key becomes the document id
    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches = mutable.Queue(airportsRDD)

    import org.elasticsearch.spark.streaming._
    ssc.queueStream(microbatches).saveToEsWithMeta("airports/2015")
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Use several kinds of metadata per document
    */
  def main1(args: Array[String]): Unit = {
    // note: the elasticsearch-hadoop docs define these keys via the Metadata enum
    // (org.elasticsearch.spark.rdd.Metadata.ID etc.)
    val ID = "id"
    val TTL = "ttl"
    val VERSION = "version"

    val conf = new SparkConf().setAppName("es-spark-streaming-write").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // metadata Maps do not have to contain the same keys for every document
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2, VERSION -> "23")
    val sfoMeta = Map(ID -> 3)

    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val microbatches = mutable.Queue(airportsRDD)

    import org.elasticsearch.spark.streaming._
    ssc.queueStream(microbatches).saveToEsWithMeta("airports/2015")
    ssc.start()
    ssc.awaitTermination()
  }
}
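
As the heading says, saveToEsWithMeta is not limited to streaming: once org.elasticsearch.spark._ is imported, the same call is available on pair RDDs. A minimal sketch, assuming a SparkContext sc and the airport Maps from above:

    import org.elasticsearch.spark._
    // (metadata, document) pairs; the metadata can be a plain id or a Map with id/version/... keys
    sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))).saveToEsWithMeta("airports/2015")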

4, Using Spark SQL

1), read

package com.wenbronk.spark.es.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
object ESSqlReadMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-sql-read")
      .config("es.index.auto.create", true)
      // translate Spark SQL filters into the ES query DSL
      .config("pushdown", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // full load without a query:
    // val df: DataFrame = spark.read.format("es").load("macsearch_fileds/mac")
    import org.elasticsearch.spark.sql._
    val df = spark.esDF("macsearch_fileds/mac",
      """
        |{
        |  "query": {
        |    "match_all": {}
        |  }
        |}
      """.stripMargin)

    // inspect the schema and register a temp view for SQL
    df.printSchema()
    df.createOrReplaceTempView("macseach_fileds")
    val dfSql: DataFrame = spark.sql(
      """
        |select
        |  mac,
        |  count(mac) con
        |from macseach_fileds
        |group by mac
        |order by con desc
      """.stripMargin)

    dfSql.show()

    // save to a local file
    df.write.json("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/sql/json")
    spark.stop()
  }
}
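
To see what the pushdown option buys, the same index can also be loaded through the generic reader, in which case DataFrame filters are translated into ES queries instead of being evaluated in Spark. A sketch that reuses the spark session above and assumes the mac field exists in the index:

    import org.apache.spark.sql.functions.col

    val pushed = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("pushdown", "true")
      .load("macsearch_fileds/mac")
      .filter(col("mac").isNotNull)
      .select("mac")

    // with pushdown enabled (the default) the filter above is executed by Elasticsearch
    pushed.show(10)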

2), write

package com.wenbronk.spark.es.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL
object ESSqlWriteMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-sql-write")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    val df: DataFrame = spark.read.format("json").load("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/sql/json")
    df.show()

    // the implicit API can also write the DataFrame directly:
    // import org.elasticsearch.spark.sql._
    // df.saveToEs("spark/people")
    EsSparkSQL.saveToEs(df, "spark/people")
    spark.close()
  }
}
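
If the documents should keep stable ids instead of auto-generated ones, es.mapping.id can point at a column. A sketch reusing df from above and assuming it has an id column (adjust the column name to the actual data):

    // use the "id" column (assumed to exist) as the Elasticsearch document _id
    EsSparkSQL.saveToEs(df, "spark/people", Map("es.mapping.id" -> "id"))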

5, Structured Streaming

I am not very familiar with Structured Streaming yet, so this part is left here to revisit later.

package com.wenbronk.spark.es.structstream
import org.apache.spark.sql.SparkSession
object StructStreamWriteMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structstream-es-write")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // note: a streaming file source normally needs an explicit schema,
    // e.g. spark.readStream.schema(mySchema).json(path)
    val df = spark.readStream
      .format("json")
      .load("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    // the "es" sink needs the target index/type passed to start();
    // "spark/json" (example name) matches the index used in the RDD write example above
    val query = df.writeStream
      .option("checkpointLocation", "/save/location")
      .format("es")
      .start("spark/json")

    query.awaitTermination()
    spark.close()
  }
}
