|
|
- // Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved.
- package etcd
-
- import (
- "context"
- "fmt"
- clientv3 "go.etcd.io/etcd/client/v3"
- "time"
- )
-
- /******************************************************************************
- *
- * @Function Name :
- *-----------------------------------------------------------------------------
- *
- * @Description :
- *
- * @Function Parameters:
- *
- * @Return Value :
- *
- * @Author : Lou Wenzhi
- *
- * @Date : 2021/3/25 9:02
- *
- ******************************************************************************/
-
- // 分布式锁(TXN事务)
- type SnrJobLock struct {
- // etcd客户端
- Client *clientv3.Client
- Kv clientv3.KV
- Lease clientv3.Lease
- JobName string // 任务名
- CancelFunc context.CancelFunc // 用于终止自动续租
- LeaseId clientv3.LeaseID // 租约ID
- IsLocked bool // 是否上锁成功
- }
-
- var(
- Snr_jobLock *SnrJobLock
- )
-
- //初始化任务锁
- func InitSnrJobLock() (err error) {
- var (
- config clientv3.Config
- client *clientv3.Client
- kv clientv3.KV
- lease clientv3.Lease
-
- )
- //创建config
- config = clientv3.Config{
- Endpoints:[]string{"127.0.0.1:2379"},
- DialTimeout: 5000 * time.Millisecond,
- }
- //创建client
- if client, err = clientv3.New(config); err != nil {
- fmt.Println(err)
- }
- //创建kv, lease
- kv = clientv3.NewKV(client)
- lease = clientv3.NewLease(client)
- //赋值单例
- Snr_jobLock = &SnrJobLock{
- Client:client,
- Kv:kv,
- Lease:lease,
- }
- return
- }
-
-
- //尝试上锁
- func (jobLock *SnrJobLock) TryLock(jobName string) (err error){
-
- var(
- ctx context.Context
- cancelFunc context.CancelFunc
- leaseGrantRes *clientv3.LeaseGrantResponse
- leaseId clientv3.LeaseID
- KeepResChan <- chan *clientv3.LeaseKeepAliveResponse
- txn clientv3.Txn
- lockKey string
- txnRes *clientv3.TxnResponse
-
- )
- //创建上下文
- ctx, cancelFunc = context.WithCancel(context.TODO())
- //创建续租
- leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5)
- //续租id
- leaseId = leaseGrantRes.ID
- //保持续租
- if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{
- fmt.Println(err)
- goto FAIL
- }
- //每隔1秒续约续租
- go func() {
- var keepRes *clientv3.LeaseKeepAliveResponse
- for{
- select {
- case keepRes = <- KeepResChan:
- //如果续约失败
- if keepRes == nil{
- goto END
- }
- }
- time.Sleep(1*time.Second)
- }
- END:
- }()
-
- //创建事务txn
- txn = jobLock.Kv.Txn(context.TODO())
- //锁路径
- lockKey = jobName
- //事务枪锁
- txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
- Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
- Else(clientv3.OpGet(lockKey))
-
- //提交事务
- if txnRes, err = txn.Commit(); err != nil{
- fmt.Println("提交创建锁失败", err)
- goto FAIL
-
- }
- //如果抢锁失败
- if !txnRes.Succeeded{
- err = fmt.Errorf("锁被占用")
- goto FAIL
- }
-
- jobLock.CancelFunc = cancelFunc
- jobLock.LeaseId = leaseId
- jobLock.IsLocked = true
-
- return
-
- FAIL:
- //释放上下文,取消续约
- cancelFunc()
- jobLock.Lease.Revoke(ctx, leaseId) //释放租约
- return
- }
-
-
- //释放锁
- func (jobLock *SnrJobLock)UnLock() {
- if jobLock.IsLocked{
- jobLock.CancelFunc() //取消自动续租协程
- jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约
- }
- }
|