diff --git a/etcd/etcdsync.go b/etcd/etcdsync.go new file mode 100644 index 0000000..a448105 --- /dev/null +++ b/etcd/etcdsync.go @@ -0,0 +1,157 @@ +// Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved. +package etcd + +import ( + "context" + "fmt" + clientv3 "go.etcd.io/etcd/client/v3" + "time" +) + +/****************************************************************************** + * + * @Function Name : + *----------------------------------------------------------------------------- + * + * @Description : + * + * @Function Parameters: + * + * @Return Value : + * + * @Author : Lou Wenzhi + * + * @Date : 2021/3/25 9:02 + * + ******************************************************************************/ + +// 分布式锁(TXN事务) +type JobLock struct { + // etcd客户端 + Client *clientv3.Client + Kv clientv3.KV + Lease clientv3.Lease + JobName string // 任务名 + CancelFunc context.CancelFunc // 用于终止自动续租 + LeaseId clientv3.LeaseID // 租约ID + IsLocked bool // 是否上锁成功 +} + +var( + G_jobLock *JobLock +) + +//初始化任务锁 +func InitJobLock() (err error) { + var ( + config clientv3.Config + client *clientv3.Client + kv clientv3.KV + lease clientv3.Lease + + ) + //创建config + config = clientv3.Config{ + Endpoints:[]string{"127.0.0.1:2379"}, + DialTimeout: 5000 * time.Millisecond, + } + //创建client + if client, err = clientv3.New(config); err != nil { + fmt.Println(err) + } + //创建kv, lease + kv = clientv3.NewKV(client) + lease = clientv3.NewLease(client) + //赋值单例 + G_jobLock = &JobLock{ + Client:client, + Kv:kv, + Lease:lease, + } + return +} + + +//尝试上锁 +func (jobLock *JobLock) TryLock(jobName string) (err error){ + + var( + ctx context.Context + cancelFunc context.CancelFunc + leaseGrantRes *clientv3.LeaseGrantResponse + leaseId clientv3.LeaseID + KeepResChan <- chan *clientv3.LeaseKeepAliveResponse + txn clientv3.Txn + lockKey string + txnRes *clientv3.TxnResponse + + ) + //创建上下文 + ctx, cancelFunc = context.WithCancel(context.TODO()) + //创建续租 + leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5) + //续租id + leaseId = leaseGrantRes.ID + //保持续租 + if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{ + fmt.Println(err) + goto FAIL + } + //每隔1秒续约续租 + go func() { + var keepRes *clientv3.LeaseKeepAliveResponse + for{ + select { + case keepRes = <- KeepResChan: + //如果续约失败 + if keepRes == nil{ + goto END + } + } + time.Sleep(1*time.Second) + } + END: + }() + + //创建事务txn + txn = jobLock.Kv.Txn(context.TODO()) + //锁路径 + lockKey = jobName + //事务枪锁 + txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)). + Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))). + Else(clientv3.OpGet(lockKey)) + + //提交事务 + if txnRes, err = txn.Commit(); err != nil{ + fmt.Println("提交创建锁失败", err) + goto FAIL + + } + //如果抢锁失败 + if !txnRes.Succeeded{ + err = fmt.Errorf("锁被占用") + goto FAIL + } + + jobLock.CancelFunc = cancelFunc + jobLock.LeaseId = leaseId + jobLock.IsLocked = true + + return + +FAIL: + //释放上下文,取消续约 + cancelFunc() + jobLock.Lease.Revoke(ctx, leaseId) //释放租约 + return +} + + +//释放锁 +func (jobLock *JobLock)UnLock() { + if jobLock.IsLocked{ + jobLock.CancelFunc() //取消自动续租协程 + jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约 + } +}