服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - 编程技术 - 一篇带给你etcd与分布式锁

一篇带给你etcd与分布式锁

2021-05-19 23:48今日头条编程技术之道 编程技术

可以提供分布式锁功能的组件有多种,但是每一种都有自己的脾气与性格。至于选择哪一种组件,则要看数据对业务的重要性,数据要求强一致性推荐支持CP的etcd、zookeeper,数据允许少量丢失、不要求强一致性的推荐支持AP的Redis。

一篇带给你etcd与分布式锁

1. 实现分布式锁的组件们

 

分布式系统中,常用于实现分布式锁的组件有:Redis、zookeeper、etcd,下面针对各自的特性进行对比:

一篇带给你etcd与分布式锁

由上图可以看出三种组件各自的特点,其中对于分布式锁来说至关重要的一点是要求CP。但是,Redis集群却不支持CP,而是支持AP。虽然,官方也给出了redlock的方案,但由于需要部署多个实例(超过一半实例成功才视为成功),部署、维护比较复杂。所以在对一致性要求很高的业务场景下(电商、银行支付),一般选择使用zookeeper或者etcd。对比zookeeper与etcd,如果考虑性能、并发量、维护成本来看。由于etcd是用Go语言开发,直接编译为二进制可执行文件,并不依赖其他任何东西,则更具有优势。本文,则选择etcd来讨论某些观点。

2. 对于分布式锁来说AP为什么不好

 

在CAP理论中,由于分布式系统中多节点通信不可避免出现网络延迟、丢包等问题一定会造成网络分区,在造成网络分区的情况下,一般有两个选择:CP or AP。

① 选择AP模型实现分布式锁时,client在通过集群主节点加锁成功之后,则立刻会获取锁成功的反馈。此时,在主节点还没来得及把数据同步给从节点时发生down机的话,系统会在从节点中选出一个节点作为新的主节点,新的主节点没有老的主节点对应的锁数据,导致其他client可以在新的主节点上拿到相同的锁。这个时候,就会导致多个进程/线程/协程来操作相同的临界资源数据,从而引发数据不一致性等问题。

② 选择CP模型实现分布式锁,只有在主节点把数据同步给大于1/2的从节点之后才被视为加锁成功。此时,主节点由于某些原因down机,系统会在从节点中选取出来数据比较新的一个从节点作为新的主节点,从而避免数据丢失等问题。

所以,对于分布式锁来说,在对数据有强一致性要求的场景下,AP模型不是一个好的选择。如果可以容忍少量数据丢失,出于维护成本等因素考虑,AP模型的Redis可优先选择。

3. 分布式锁的特点以及操作

 

对于分布式锁来说,操作的动作包含:

  1. 获取锁
  2. 释放锁
  3. 业务处理过程中过程中,另起线程/协程进行锁的续约

一篇带给你etcd与分布式锁

4. 关于etcd

 

一篇带给你etcd与分布式锁

官方文档永远是最好的学习资料,官方介绍etcd如是说:

  • 分布式系统使用etcd作为配置管理、服务发现和协调分布式工作的一致键值存储。许多组织使用etcd来实现生产系统,如容器调度器、服务发现服务和分布式数据存储。使用etcd的常见分布式模式包括leader选举、分布式锁和监视机器活动。
  • Distributed systems use etcd as a consistent key-value store for configuration management, service discovery, and coordinating distributed work. Many organizations use etcd to implement production systems such as container schedulers, service discovery services, and distributed data storage. Common distributed patterns using etcd include leader election, distributed locks, and monitoring machine liveness.
  • https://etcd.io/docs/v3.4/learning/why/

分布式锁仅是etcd可以实现众多功能中的一项,服务注册与发现在etcd中用的则会更多。

官方也对众多组件进行了对比,并整理如下:

一篇带给你etcd与分布式锁

通过对比可以看出各自的特点,至于具体选择哪一款,你心中可能也有了自己的答案。

5. etcd实现分布式锁的相关接口

 

对于分布式锁,主要用到etcd对应的添加、删除、续约接口。

  1. // KV:键值相关操作 
  2. type KV interface { 
  3.     // 存放. 
  4.     Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) 
  5.     // 获取. 
  6.     Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) 
  7.     // 删除. 
  8.     Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) 
  9.     // 压缩rev指定版本之前的历史数据. 
  10.     Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) 
  11.     // 通用的操作执行命令,可用于操作集合的遍历。Put/Get/Delete也是基于Do. 
  12.     Do(ctx context.Context, op Op) (OpResponse, error) 
  13.     // 创建一个事务,只支持If/Then/Else/Commit操作. 
  14.     Txn(ctx context.Context) Txn 
  15.  
  16.  
  17. // Lease:租约相关操作 
  18. type Lease interface { 
  19.     // 分配一个租约. 
  20.     Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) 
  21.     // 释放一个租约. 
  22.     Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) 
  23.     // 获取剩余TTL时间. 
  24.     TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) 
  25.     // 获取所有租约. 
  26.     Leases(ctx context.Context) (*LeaseLeasesResponse, error) 
  27.     // 续约保持激活状态. 
  28.     KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) 
  29.     // 仅续约激活一次. 
  30.     KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) 
  31.     // 关闭续约激活的功能. 
  32.     Close() error 

 6. etcd实现分布式锁代码示例

 

  1. package main 
  2.  
  3. import ( 
  4.     "context" 
  5.     "fmt" 
  6.     "go.etcd.io/etcd/clientv3" 
  7.     "time" 
  8.  
  9. var conf clientv3.Config 
  10.  
  11. // 锁结构体 
  12. type EtcdMutex struct { 
  13.     Ttl int64//租约时间 
  14.  
  15.     Conf   clientv3.Config    //etcd集群配置 
  16.     Key    string//etcd的key 
  17.     cancel context.CancelFunc //关闭续租的func 
  18.  
  19.     txn     clientv3.Txn 
  20.     lease   clientv3.Lease 
  21.     leaseID clientv3.LeaseID 
  22.  
  23. // 初始化锁 
  24. func (em *EtcdMutex) init() error { 
  25.     var err error 
  26.     var ctx context.Context 
  27.  
  28.     client, err := clientv3.New(em.Conf) 
  29.     if err != nil { 
  30.         return err 
  31.     } 
  32.  
  33.     em.txn = clientv3.NewKV(client).Txn(context.TODO()) 
  34.     em.lease = clientv3.NewLease(client) 
  35.     leaseResp, err := em.lease.Grant(context.TODO(), em.Ttl) 
  36.  
  37.     if err != nil { 
  38.         return err 
  39.     } 
  40.  
  41.     ctx, em.cancel = context.WithCancel(context.TODO()) 
  42.     em.leaseID = leaseResp.ID 
  43.     _, err = em.lease.KeepAlive(ctx, em.leaseID) 
  44.  
  45.     return err 
  46.  
  47. // 获取锁 
  48. func (em *EtcdMutex) Lock() error { 
  49.     err := em.init() 
  50.     if err != nil { 
  51.         return err 
  52.     } 
  53.  
  54.     // LOCK 
  55.     em.txn.If(clientv3.Compare(clientv3.CreateRevision(em.Key), "=", 0)). 
  56.         Then(clientv3.OpPut(em.Key"", clientv3.WithLease(em.leaseID))).Else() 
  57.  
  58.     txnResp, err := em.txn.Commit() 
  59.     if err != nil { 
  60.         return err 
  61.     } 
  62.  
  63.     // 判断txn.if条件是否成立 
  64.     if !txnResp.Succeeded { 
  65.         return fmt.Errorf("抢锁失败"
  66.     } 
  67.  
  68.     returnnil 
  69.  
  70. //释放锁 
  71. func (em *EtcdMutex) UnLock() { 
  72.     // 租约自动过期,立刻过期 
  73.     // cancel取消续租,而revoke则是立即过期 
  74.     em.cancel() 
  75.     em.lease.Revoke(context.TODO(), em.leaseID) 
  76.  
  77.     fmt.Println("释放了锁"
  78.  
  79. // groutine1 
  80. func try2lock1() { 
  81.     eMutex1 := &EtcdMutex{ 
  82.         Conf: conf, 
  83.         Ttl:  10, 
  84.         Key:  "lock"
  85.     } 
  86.  
  87.     err := eMutex1.Lock() 
  88.     if err != nil { 
  89.         fmt.Println("groutine1抢锁失败"
  90.         return 
  91.     } 
  92.     defer eMutex1.UnLock() 
  93.  
  94.     fmt.Println("groutine1抢锁成功"
  95.     time.Sleep(10 * time.Second
  96.  
  97. // groutine2 
  98. func try2lock2() { 
  99.     eMutex2 := &EtcdMutex{ 
  100.         Conf: conf, 
  101.         Ttl:  10, 
  102.         Key:  "lock"
  103.     } 
  104.  
  105.     err := eMutex2.Lock() 
  106.     if err != nil { 
  107.         fmt.Println("groutine2抢锁失败"
  108.         return 
  109.     } 
  110.  
  111.     defer eMutex2.UnLock() 
  112.     fmt.Println("groutine2抢锁成功"
  113.  
  114. // 测试代码 
  115. func EtcdRunTester() { 
  116.     conf = clientv3.Config{ 
  117.         Endpoints:   []string{"127.0.0.1:2379"}, 
  118.         DialTimeout: 5 * time.Second
  119.     } 
  120.  
  121.     // 启动两个协程竞争锁 
  122.     go try2lock1() 
  123.     go try2lock2() 
  124.  
  125.     time.Sleep(300 * time.Second

 总结

 

可以提供分布式锁功能的组件有多种,但是每一种都有自己的脾气与性格。至于选择哪一种组件,则要看数据对业务的重要性,数据要求强一致性推荐支持CP的etcd、zookeeper,数据允许少量丢失、不要求强一致性的推荐支持AP的Redis。

原文链接:https://www.toutiao.com/i6963570087278182923/

延伸 · 阅读

精彩推荐