pyspark 针对Elasticsearch的读写操作

摘要:
1.在Spark和Elasticsearch之间创建连接为了读写Elasticsearch,您需要添加Elasticearch的依赖包。有三种方法可以添加依赖包(org.lasticsearch_elasticsearch-spark-202.11-6.8.7.jar):1)将依赖包直接放置在安装spark目录下的jars目录中;2) 提交任务时,使用sparksubmit

1.创建spark与Elasticsearch的连接

为了对Elasticsearch进行读写操作,需要添加Elasticsearch的依赖包,其中,添加依赖包(org.elasticsearch_elasticsearch-spark-20_2.11-6.8.7.jar)有下面的三种方式:

1)将依赖包直接放在安装spark目录下面的jars目录下,即可;

2) 在提交任务时,利用spark submit --jars 的方式

3)在创建spark对象时,添加依赖,如下图所示

spark = SparkSession 
.builder
.appName('es connection')
.config('spark.jars.packages', "org.elasticsearch_elasticsearch-spark-20_2.11-6.8.7")
.getOrCreate()

2.spark 读取Elasticsearch的数据

df3 = spark.read 
.format("org.elasticsearch.spark.sql")
.option("es.nodes", '节点')
.option('es.port', '端口')
.option("es.resource", '索引/索引类型')
.option('es.query', '?q=*')
.option('es.nodes.wan.only','true')
.option("es.nodes.discovery", "false")
.option("es.index.auto.create", "true")
.option("es.write.ignore_exception", "true")
.option("es.read.ignore_exception","true")
.load()

3.spark 写入elasticsearch

df.write.format('org.elasticsearch.spark.sql') 
.option('es.nodes', '节点')
.option('es.port', '9200')
.option('es.nodes.wan.only', 'true')
.option("es.nodes.discovery", "false")
.option('es.resource', '索引/索引类型')
.save(mode='append')

备注:

当spark读写elasticsearch的过程中,elasticsearch包含Array类型的字段,就会出现下面错误:

pyspark 针对Elasticsearch的读写操作第1张

无法将List类型数据写入到es, 或者从es读出list类型数据

解决方案:

在option 中添加一个es.read.field.as.array.include属性,value为list Schema的字段名

免责声明:文章转载自《pyspark 针对Elasticsearch的读写操作》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇chomre常用快捷键mac升级node后还是原来版本问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

ELK常见错误分析(转)

ELK 常见错误处理 ELK 这里就不介绍了,如何安装请参考博客之前的文章。在这里感谢ttlsa团队,同时,我很荣幸能加入到ttlsa团队中,分享点滴,凉白开说发文章有红包,期待这篇群主能给多少红包。哈哈。 好了,不闲扯,下面总结下ELK使用过程中遇到的常见问题以及解决方案。 1. Kibana No Default Index Pattern War...

layui在open弹出层回显,解决动态select数据回显问题

//监听数据表格工具条         table.on('tool(contentList)', function(obj){ //注:tool是工具条事件名,test是table原始容器的属性 lay-filter="对应的值"             var data = obj.data //获得当前行数据                 ,lay...

springboot2.2.6 elasticsearch 6.8.7 多条件查询、高亮显示、分页

最近做了一个爬虫项目,需要把数据存入ES中,在网上找资料的过程中挺辛苦的,大部分文章上来就是贴代码,没有讲springboot和es之间版本关系,而本身ES更新真的是快,坑是真的多(自我学习能力不强,见谅),很多方法在新版本中都被弃用,最后冷静下来,也算是终于解决了各种问题吧。 博客园... ` @Autowired private El...

Elasticsearch logstsh同步mysql数据到ES中

1、准备: 1) 启动前面搭建的ES集群, 192.168.127.130,192.168.127.128,192.168.127.129 2) 准备要同步的数据库和数据 数据库所在的服务器IP为192.168.1.104 数据库端口为3306 数据库名shop,表名items items的表结构如下 items表的数据如下 这里有两条数据,实际环境会...

ElasticSearch的基本原理与用法

一、简介 ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。 这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.co...

SkyWalking8.3.0安装

一:下载SkyWalking文件 访问github地址:https://github.com/apache/skywalking 官方给出的文档地址:https://github.com/apache/skywalking/tree/master/docs/en 安装直接去:https://github.com/apache/skywalking/tags...