Browse Source

更新

master
娄文智 3 years ago
parent
commit
21aeb77540
1 changed files with 157 additions and 0 deletions
  1. +157
    -0
      etcd/etcdsync.go

+ 157
- 0
etcd/etcdsync.go View File

@ -0,0 +1,157 @@
// Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved.
package etcd
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
/******************************************************************************
*
* @Function Name :
*-----------------------------------------------------------------------------
*
* @Description :
*
* @Function Parameters:
*
* @Return Value :
*
* @Author : Lou Wenzhi
*
* @Date : 2021/3/25 9:02
*
******************************************************************************/
// 分布式锁(TXN事务)
type JobLock struct {
// etcd客户端
Client *clientv3.Client
Kv clientv3.KV
Lease clientv3.Lease
JobName string // 任务名
CancelFunc context.CancelFunc // 用于终止自动续租
LeaseId clientv3.LeaseID // 租约ID
IsLocked bool // 是否上锁成功
}
var(
G_jobLock *JobLock
)
//初始化任务锁
func InitJobLock() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
)
//创建config
config = clientv3.Config{
Endpoints:[]string{"127.0.0.1:2379"},
DialTimeout: 5000 * time.Millisecond,
}
//创建client
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
}
//创建kv, lease
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
//赋值单例
G_jobLock = &JobLock{
Client:client,
Kv:kv,
Lease:lease,
}
return
}
//尝试上锁
func (jobLock *JobLock) TryLock(jobName string) (err error){
var(
ctx context.Context
cancelFunc context.CancelFunc
leaseGrantRes *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
KeepResChan <- chan *clientv3.LeaseKeepAliveResponse
txn clientv3.Txn
lockKey string
txnRes *clientv3.TxnResponse
)
//创建上下文
ctx, cancelFunc = context.WithCancel(context.TODO())
//创建续租
leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5)
//续租id
leaseId = leaseGrantRes.ID
//保持续租
if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{
fmt.Println(err)
goto FAIL
}
//每隔1秒续约续租
go func() {
var keepRes *clientv3.LeaseKeepAliveResponse
for{
select {
case keepRes = <- KeepResChan:
//如果续约失败
if keepRes == nil{
goto END
}
}
time.Sleep(1*time.Second)
}
END:
}()
//创建事务txn
txn = jobLock.Kv.Txn(context.TODO())
//锁路径
lockKey = jobName
//事务枪锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet(lockKey))
//提交事务
if txnRes, err = txn.Commit(); err != nil{
fmt.Println("提交创建锁失败", err)
goto FAIL
}
//如果抢锁失败
if !txnRes.Succeeded{
err = fmt.Errorf("锁被占用")
goto FAIL
}
jobLock.CancelFunc = cancelFunc
jobLock.LeaseId = leaseId
jobLock.IsLocked = true
return
FAIL:
//释放上下文,取消续约
cancelFunc()
jobLock.Lease.Revoke(ctx, leaseId) //释放租约
return
}
//释放锁
func (jobLock *JobLock)UnLock() {
if jobLock.IsLocked{
jobLock.CancelFunc() //取消自动续租协程
jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约
}
}

Loading…
Cancel
Save