Elasticsearch的乐观并发控制和分片管理

摘要:
1.乐观的并发控制首先,我们需要明确Elasticsearch的三个特性:分布式:当创建、删除或更新文档时,必须将新版本的文档复制到集群中的其他节点;并发:这些复制请求将并行发送;异步:这些复制请求以无序的顺序到达目标。因此,Elasticsearch需要确保文档的旧版本不会覆盖新版本。Elasticserve使用_版本字段来确保文档以正确的顺序执行。如果文档的旧版本在新版本之后到达,则可以忽略它。
1. 乐观并发控制

  首先,需要明确Elasticsearch的三个特性:

  • 分布式的:当文档创建,删除或更新的时候,新版本的文档必须被复制到集群中的其他节点;
  • 并发的:这些复制请求将被并行发送;
  • 异步的:这些复制请求到达目的地的顺序是乱的.

  因此,Elasticsearch需要保证文档的旧版本不会覆盖新版本.Elasticserch通过_version字段来确保并更以正确的顺序得到执行.如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

2. 分片管理

2.1 动态索引

采用Luence的per-segment search机制,...

2.2 近实时搜索

通过refresh操作,默认每秒自动刷新,文件系统缓存,...

2.3 持久化变更

flush,translog...

2.4 段合并

optimize...

3. 集群管理

3.1 主节点的作用

  • 决定分配哪一个shard给哪一个节点
  • 何时移动shard
  • 维护全局集群状态,并通知所有节点

3.2 节点间的互相发现

  Elasticsearch中各节点使用zen discovery机制互相发现,该机制包含了ping模块找到其他节点.为了检测节点故障,同时有两个ping进程运行,一是主节点ping其他节点,及其他节点ping主节点.该过程中相关参数有:

  • ping_interval: 节点进行ping的周期,默认1s
  • ping_timeout: 等待ping回应的时间,默认30s
  • ping_retries: ping失败或者超时的次数,默认3

  每当有一个新节点加入集群后,它将向主节点发送加入请求,超时时间(discovery.zen.join_timeout)默认为ping_timeout的20倍.

3.3 主节点崩溃后

  • 若候选主节点数不低于最小值,则重新选举主节点(默认超时时间为3s,可设置discovery.zen.ping_timeout调整),对于正在进行的写入操作影响不大;
  • 若候选主节点数低于最小值,则默认阻塞写请求,允许读请求,但如果该节点与其他节点分离,则可能读到部分的陈旧数据.
    另外,可设置discovery.zen.no_master_block(默认为write)为all,阻塞所有读写请求.
    需要注意的是,无论如何设置,基于节点的api仍可用,比如集群指标,节点指标,节点信息等api.

3.4 数据节点崩溃后

  • 若副分片失效,主分片将向主节点申请重新分配副本分片;
  • 若主分片失效,正在进行的写操作最多等待1min(默认)让主分片将某副分片提升为主分片.
4. 扩容,缩容及移动分片

4.1 扩容及缩容方法

扩容,即增加新的数据节点,需将

  • cluster.name设为当前集群名;
  • discovery.zen.ping.unicast.hosts中写入任一台集群中运行正常的节点ip,建议将之写成所有候选主节点的ip.

缩容,即删除一些数据节点,需确保待删节点中的所有分片在其他节点中存在备份.

4.2 分配分片

当集群扩容或缩容时,Elasticsearch将会自动在节点间迁移分片,以使集群保持平衡。

同时,Elasticsearch也支持使用reroute命令手动分配分片.

Elasticsearch支持以下几种调整分片的命令:

  • move: 在节点间移动分片,index设置索引名,shard设置分片,from_nodes设置移出分片节点,to_nodes设置移入分片节点
  • cancel: 取消某分片的分配,index设置索引名,shard设置分片,node为取消分配的节点,可接受allow_primary参数允许取消主分片的分配
  • allocate_replica: 将未分片的副本分配给某节点,index设置索引名,shard设置分片,node为分配的节点
  • allocate_stale_primary: 将主分片分配给存在旧备份的节点,index设置索引名,shard设置分片,node为分配的节点,另外需指定accept_data_loss为true
  • allocate_empty_primary: 将空的主分片分配给节点,index设置索引名,shard设置分片,node为分配的节点,另外需指定accept_data_loss为true

关于分片分配的设置:

  集群级的分片分配配置

  • cluster.routing.allocation.enable: 控制可分配的分片种类
  • cluster.routing.allocation.node_concurrent_incoming_recoveries: 控制一个节点上同时进行的分片移出的进程数
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries: 控制一个节点上同时进行的分片移入的进程数
  • cluster.routing.allocation.node_initial_primaries_recoveries: 控制前两个配置的总和
  • cluster.routing.allocation.node_initial_primaries_recoveries: 控制一个节点上同时进行的主分片恢复的进程数

  分片分配过滤设置

  • cluster.routing.allocation.include.{attribute}: 将索引分配给任意的给定节点

  • cluster.routing.allocation.require.{attribute}: 将索引分配给所有的给定节点
  • cluster.routing.allocation.exclude.{attribute}: 不将索引分配给所有的给定节点

5 写(Write)操作的实现原理

当向协调节点发送写入文档的请求时,将执行:

  1. 协调节点将文档ID路由到对应的主分片上. 路由算法: shard = hash(document_id) % (num_of_primary_shards), hash算法使用murmur3.
  2. 相应主分片所在节点收到请求时,将请求写入translog,并将数据发送到Index Buffer中.
  3. Refresh: 内存缓冲区定时刷新(默认1s),将数据刷新到文件系统缓冲中的新段.此时文档已经可以被搜索到.
  4. Flush: 触发lucene commit,文件系统缓冲每隔30min,或者translog太大(>512M),清空translog,将内容写入新的文件分段.

注意:

  • 每次执行index,bulk,delete,update后,都会触发flush,可能影响性能.设置"index.translog.durability":"async" 和 "index.translog.sync_interval":30s (默认是5s)可提升性能.
  • 如果请求在主分片上成功,则请求将并行发送到副本分片。只有在所有主分片和副本分片上的translog被fsync后,客户端才会收到请求成功的确认。

Elasticsearch的乐观并发控制和分片管理第1张

6 删(Delete)及更新(Update)操作的实现原理

Elasticsearch中的文档都是不可变的.所以删除和更新操作也是基于写实现的.磁盘中的段中有一个.del文件.

  • 执行删除时,即在文件中将之标记为已删除.此文档可能仍然能被搜索到,但会从结果中过滤掉。当分段合并时,在.del文件中标记为已删除的文档不会被包括在新的合并段中。
  • 执行更新时,旧版本在.del文件中被标记为已删除,并且新版本在新的分段中编入索引。旧版本可能仍然与搜索查询匹配,但是从结果中将其过滤掉。
7 线程池

线程池类型

fixed  

固定线程池处理请求,并挂起没有线程处理的请求.如果请求没有线程处理,并且队列满了,将会抛弃该请求。

size默认为5*cores,queue_size默认为-1

scaling

可变大小的pool,大小根据负载在core到max间,同样keep_alive参数指定了闲置线程被回收的时间。

fixed_auto_queue_size

和fixed类似,queue_size的值基于Little’s Law的计算自动调整.

size默认为5*cores

queue_size初始队列大小

min_queue_size队列大小最小值

max_queue_size队列大小最大值

auto_queue_frame_size测量时的操作数,应该足够大.

target_response_time任务执行时间超过该值,将会拒绝该任务.

cached(es 5.0之后已经不存在此类型)

5.0之前是generic线程池(generic操作,比如后台节点发现)的类型。无限制的线程池,为每个请求创建一个线程。这种线程池是为了防止请求被阻塞或者拒绝,其中的每个线程都有一个超时时间(keep_alive),默认5分钟,一旦超时就会回收/终止。

注意:5.0之后generic线程池已经变成scaling类型.

几种重要的线程池

generic  generic操作  scaling类型

index  index/delete操作  fixed类型,size为可用处理器数,queue_size为200,线程池最大值为可用处理器数+1

search  count/search/suggest操作  fixed_auto_queue_size类型,size为int((# of available_processors * 3) / 2) + 1,初始队列1000

get  get操作  fixed类型,size为可用处理器数,queue_size为1000

bulk  bulk操作   fixed类型,size为可用处理器数,queue_size为200,线程池最大值为可用处理器数+1

snapshot  snapshot/restore操作  scaling类型,keep_alive为5m,最大值min(5, (# of available processors)/2)

warmer  warm-up操作  scaling类型,keep_alive为5m,最大值为min(5, (# of available processors)/2)

refresh  refresh操作  scaling类型,keep_alive为5m,最大值为min(10, (# of available processors)/2)

listener  java客户端执行操作时,并且Listener Thread设为true  scaling类型.最大值为min(10, (# of available processors)/2)

8 搜索问题

结果震荡

搜索同一query,结果ES返回的顺序却不尽相同,这就是请求轮询到不同分片,而未设置排序条件,相同相关性评分情况下,由于评分采用的算法时TF(term frequency)和IDF(inverst document frequecy) 算出的总分在不同的shard上时不一样的,那么就造成了默认按照_score的分数排序,导致会出现结果不一致的情况。查询分析时将所有的请求发送到所有的shard上去。可用设置preference为字符串或者primary shard插叙等来解决该问题。

搜索类型

QUERY_THEN_FEATCH  向索引的所有分片(shard)都发出查询请求,各分片返回的时候把元素文档(document)和计算后的排名信息一起返回。

QUERY_AND_FEATCH  第一步,先向所有的shard发出请求,各分片只返回排序和排名相关的信息(不包括文档document),然后按照各分片返回的分数进行重新排序和排名,取前size个文档。第二步,去相关的shard取document。这种方式返回的document与用户要求的size是相等的。

DFS_QUERY_THEN_FEATCH  这种方式比第一种方式多了一个初始化散发(initial scatter)步骤,有这一步,据说可以更精确控制搜索打分和排名。

DFS_QUERY_AND_FEATCH  比第2种方式多了一个初始化散发(initial scatter)步骤。

初始化散发其实就是在进行真正的查询之前,先把各个分片的词频率和文档频率收集一下,然后进行词搜索的时候,各分片依据全局的词频率和文档频率进行搜索和排名。

 

免责声明:文章转载自《Elasticsearch的乐观并发控制和分片管理》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇perl语言入门学习笔记Java单体应用下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

解决elasticsearch报错报错“java.lang.IllegalArgumentException: Rejecting mapping update to [这里是索引名称保密] as the final mapping would have more than 1 type: [_doc, log]"”

某日在研究kibana的索引生命周期功能,感觉对于我们现在几千个索引蛮有用途,之前都是写个删除脚本呢,放到定时任务进行删除。 通过新建一个生命后期策略,设置日期15日,并通过索引模板匹配到测试索引的时候,第二天发现该索引无任何数据,显示索引大小283B,文档数0,似乎是不接受数据了 通过查看elasticsearch的logs_server.json日志,...

Elasticsearch入门和基本使用

1. 什么是Elasticsearch? Elasticsearch,分布式,高性能,高可用,可伸缩的搜索和分析系统;Elastic 是 Lucene 的封装,提供了 REST API 的操作接口,开箱即用。用于快速存储,搜索和海量数据分析; 2. Elasticsearch的优点 1)横向可扩展性:只需要增加一台机器,添加一些配置即可; 2)分片机制提供...

x-pack本地安装方式

一.首先下载本地安装包,我使用的ELK是5.6.1版本: https://artifacts.elastic.co/downloads 二.进入到elasticsearch/bin(所有节点)和kibana/bin安装x-pack:(都是非root) bin/elasticsearch-plugin install file:///绝对路径/x-pack....

Springboot学习笔记(一)-线程池的简化及使用

工作中经常涉及异步任务,通常是使用多线程技术,比如线程池ThreadPoolExecutor,它的执行规则如下: 在Springboot中对其进行了简化处理,只需要配置一个类型为java.util.concurrent.TaskExecutor或其子类的bean,并在配置类或直接在程序入口类上声明注解@EnableAsync。 调用也简单,在由Sprin...

Elasticsearch Java高级客户端

1.  概述 Java REST Client 有两种风格: Java Low Level REST Client :用于Elasticsearch的官方低级客户端。它允许通过http与Elasticsearch集群通信。将请求编排和响应反编排留给用户自己处理。它兼容所有的Elasticsearch版本。(PS:学过WebService的话,对编排与反编...

MySQL MySql连接数与线程池

MySql连接数与线程池 by:授客 QQ:1033553122   连接数1、  查看允许的最大并发连接数 SHOW VARIABLES LIKE 'max_connections';   2、  修改最大连接数 方法1:临时生效 SET GLOBAL max_connections=200; SET语法参考: http://dev.my...