// Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved. package etcd import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "time" ) /****************************************************************************** * * @Function Name : *----------------------------------------------------------------------------- * * @Description : * * @Function Parameters: * * @Return Value : * * @Author : Lou Wenzhi * * @Date : 2021/3/25 9:02 * ******************************************************************************/ // 分布式锁(TXN事务) type JobLock struct { // etcd客户端 Client *clientv3.Client Kv clientv3.KV Lease clientv3.Lease JobName string // 任务名 CancelFunc context.CancelFunc // 用于终止自动续租 LeaseId clientv3.LeaseID // 租约ID IsLocked bool // 是否上锁成功 } var( G_jobLock *JobLock ) //初始化任务锁 func InitJobLock() (err error) { var ( config clientv3.Config client *clientv3.Client kv clientv3.KV lease clientv3.Lease ) //创建config config = clientv3.Config{ Endpoints:[]string{"127.0.0.1:2379"}, DialTimeout: 5000 * time.Millisecond, } //创建client if client, err = clientv3.New(config); err != nil { fmt.Println(err) } //创建kv, lease kv = clientv3.NewKV(client) lease = clientv3.NewLease(client) //赋值单例 G_jobLock = &JobLock{ Client:client, Kv:kv, Lease:lease, } return } //尝试上锁 func (jobLock *JobLock) TryLock(jobName string) (err error){ var( ctx context.Context cancelFunc context.CancelFunc leaseGrantRes *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID KeepResChan <- chan *clientv3.LeaseKeepAliveResponse txn clientv3.Txn lockKey string txnRes *clientv3.TxnResponse ) //创建上下文 ctx, cancelFunc = context.WithCancel(context.TODO()) //创建续租 leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5) //续租id leaseId = leaseGrantRes.ID //保持续租 if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{ fmt.Println(err) goto FAIL } //每隔1秒续约续租 go func() { var keepRes *clientv3.LeaseKeepAliveResponse for{ select { case keepRes = <- KeepResChan: //如果续约失败 if keepRes == nil{ goto END } } time.Sleep(1*time.Second) } END: }() //创建事务txn txn = jobLock.Kv.Txn(context.TODO()) //锁路径 lockKey = jobName //事务枪锁 txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)). Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))). Else(clientv3.OpGet(lockKey)) //提交事务 if txnRes, err = txn.Commit(); err != nil{ fmt.Println("提交创建锁失败", err) goto FAIL } //如果抢锁失败 if !txnRes.Succeeded{ err = fmt.Errorf("锁被占用") goto FAIL } jobLock.CancelFunc = cancelFunc jobLock.LeaseId = leaseId jobLock.IsLocked = true return FAIL: //释放上下文,取消续约 cancelFunc() jobLock.Lease.Revoke(ctx, leaseId) //释放租约 return } //释放锁 func (jobLock *JobLock)UnLock() { if jobLock.IsLocked{ jobLock.CancelFunc() //取消自动续租协程 jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约 } }