您的当前位置:首页正文

大数据-183 Elasticsearch - 原理剖析 - 并发冲突处理机制剖析、分布式数据一致性剖析

2024-11-08 来源:个人技术集锦

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Elasticsearch 倒排索引 原理剖析
  • Elasticsearch SkipList 原理剖析

详解并发冲突

在电商场景下,工作流程为:

  • 读取商品信息,包括库存数量
  • 用户下单购买
  • 更新商品信息,将库存数减一

如果是多线程操作,就可能有多个线程并发的去执行上述的3步骤流程,假如此时有两个人都来读取商品数据,两个线程并发的服务于两个人,同时在进行商品库存数据的修改,假设库存为100件,正确的情况:线程A将库存-1,设置为99件,线程B读取99再-1,设置为98件。但是如果A和B都是读取的99件,那么后续就会出现数量错误的问题。

解决方案

悲观锁

每次去拿数据的时候认为都会被人修改,所有每次拿数据的时候都会加锁,以防止别人修改,直到操作完成后,再释放锁,才会被别人拿去执行。
常见的关系型数据库,就用到了如行锁、表锁、写锁,都是在操作之前的锁。

  • 悲观锁优点:方便直接加锁,对外透明,不需要额外的操作。
  • 悲观锁缺点:并发能力低,同一时间只能有一个操作。

乐观锁

乐观锁不加锁,每个线程都可以任意操作。比如每条文档中都有一个version字段,新建文档后为1,每次修改都进行累加,线程A和B都拿到version为1,等写入时会和ES中的版本号进行比较,如果相等则写入成功,失败则重新读取数据再-1,再进行对比,如果相等则写入成功。

Elasticsearch的乐观锁

Elasticsearch的后台都是多线程异步的,多个请求之间是乱序的,可能后修改的先到,先修改的后到。
Elasticsearch的多线程异步是基于自己的_version版本号进行乐观锁并发控制的。
在后修改的先到时,比较版本号,版本号相同则修改成功,而当先修改的后到时,也会比较版本号,如果不相等就再次读取新的数据来修改,直到成功。
删除操作的时候,也会对这条数据的版本号+1,在删除一个Document之后,可以从一个侧面证明,它不是立即物理上删除的,因为它的一些版本号信息还保留着。先删除一条Document,再重新创建这套Document,其实会在delete version基础之上,再把version加1。

Elasticsearch的乐观锁测试

新建一条数据:

PUT /wzk_lock_index/_doc/1
{
  "test_field": "wzkicu"
}

执行结果如下图所示,可以看到 _version 是1。

假设我们现在有A和B两个客户端同时拿到了数据,想要进行更新:

# A 更新
PUT /wzk_lock_index/_doc/1
{
  "test_field": "client1 update"
}

可以看到执行结果,顺利更新了,_version变成了2:

此时B在同一时间要进行更新:

# B 更新
PUT /wzk_lock_index/_doc/1?if_seq_no=0&if_primary_term=1
{
  "test_field": "client2 update"
}

可以看到,此时报错了:

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[1]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",
        "index_uuid": "BBxoVVVqSw2TxtU-vPd-NA",
        "shard": "0",
        "index": "wzk_lock_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",
    "index_uuid": "BBxoVVVqSw2TxtU-vPd-NA",
    "shard": "0",
    "index": "wzk_lock_index"
  },
  "status": 409
}

对应的截图如下所示:

这说明我们的乐观锁是生效的,阻止了并发的问题。
我们需要重新 GET请求获取当前的版本信息:

GET /wzk_lock_index/_doc/1

获取到当前的 version 是 2、seq_no是1,primary_term是1:

B客户端重新发起更新:

# B 再次更新
PUT /wzk_lock_index/_doc/1?if_seq_no=1&if_primary_term=1
{
  "test_field": "client2 update"
}

我们可以看到执行成功了:

分布式数据一致性

在分布式环境下,一致性指的是多个数据副本是否能保持一致的特性。在一致性条件下,系统在执行数据更新操作之后能够从一致性状态到另一个一致性状态,对系统的一个数据更新成功之后,如果所有用户都能够读取到最新的值,该系统就被认为具有强一致性。

ES5.0以前的一致性

  • consistency
  • one(primary shard)
  • all(all shard)
  • quorum(default)

我们在发送任何一个增删改查的时候,比如PUT时,都可以带上一个consistency参数,指明我们想要的写一致性是什么?比如:

PUT /index/indextype/id?consistency=quorum

quorum机制

写之前必须确保shard可用:

# 当num_of_replicas > 1时才生效
int((primary shard + number_of_replicas) / 2) + 1

比如:1个primary shard,3个replica,那么quorum=((1 + 3 ) / 2) + 1 = 3。
如果这时只有两台机器的话:

Timeout机制

quorum不齐全时,会wait(等待)1分钟
默认是1分钟,但是可以通过timeout去手动调整,默认单位是毫秒。
等待期间,期望活跃的shard数量可以增加,最后无法满足shard数量就timeout。
我们在写操作的时候,可以加一个timeout参数,比如:

# 当quorum不齐全的时候 ES的timeout时长
PUT /index/_doc/id?timeout=30s

ES5.0以后的一致性

在ES5.0以后,原先执行PUT带consistency=all/quorum参数的,都会报错,提示语法错误。
原因是consistency检查是在PUT之前做的,然而,虽然检查的时候,shard满足quorum,但是真正primary shard写到replica之前,仍然会出现shard挂掉,但Update API也会返回Successd,因此,这个检查不能保证replica成功写入,甚至这个primary shard是否能成功写入也未必能保证。
因此,修改了语法,用了 wait_for_active_shards,这个更加清楚一些:

PUT /index/_doc/1?wait_for_active_shards=2&timeout=10s
{
  "xxx": "xxx"
}
显示全文