// Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved.
|
|
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
conf "leit.com/leit_seat_aps/config"
|
|
"time"
|
|
)
|
|
|
|
/******************************************************************************
|
|
*
|
|
* @Function Name :
|
|
*-----------------------------------------------------------------------------
|
|
*
|
|
* @Description :
|
|
*
|
|
* @Function Parameters:
|
|
*
|
|
* @Return Value :
|
|
*
|
|
* @Author : Lou Wenzhi
|
|
*
|
|
* @Date : 2021/3/25 9:02
|
|
*
|
|
******************************************************************************/
|
|
|
|
// 分布式锁(TXN事务)
|
|
type JobLock struct {
|
|
// etcd客户端
|
|
Client *clientv3.Client
|
|
Kv clientv3.KV
|
|
Lease clientv3.Lease
|
|
JobName string // 任务名
|
|
CancelFunc context.CancelFunc // 用于终止自动续租
|
|
LeaseId clientv3.LeaseID // 租约ID
|
|
IsLocked bool // 是否上锁成功
|
|
}
|
|
|
|
var(
|
|
G_jobLock *JobLock
|
|
)
|
|
|
|
//初始化任务锁
|
|
func InitJobLock() (err error) {
|
|
var (
|
|
config clientv3.Config
|
|
client *clientv3.Client
|
|
kv clientv3.KV
|
|
lease clientv3.Lease
|
|
|
|
)
|
|
//创建config
|
|
config = clientv3.Config{
|
|
Endpoints:[]string{"127.0.0.1:2379"},
|
|
DialTimeout: 5000 * time.Millisecond,
|
|
}
|
|
//创建client
|
|
if client, err = clientv3.New(config); err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
//创建kv, lease
|
|
kv = clientv3.NewKV(client)
|
|
lease = clientv3.NewLease(client)
|
|
//赋值单例
|
|
G_jobLock = &JobLock{
|
|
Client:client,
|
|
Kv:kv,
|
|
Lease:lease,
|
|
}
|
|
return
|
|
}
|
|
|
|
|
|
//尝试上锁
|
|
func (jobLock *JobLock) TryLock(jobName string) (err error){
|
|
|
|
var(
|
|
ctx context.Context
|
|
cancelFunc context.CancelFunc
|
|
leaseGrantRes *clientv3.LeaseGrantResponse
|
|
leaseId clientv3.LeaseID
|
|
KeepResChan <- chan *clientv3.LeaseKeepAliveResponse
|
|
txn clientv3.Txn
|
|
lockKey string
|
|
txnRes *clientv3.TxnResponse
|
|
|
|
)
|
|
//创建上下文
|
|
ctx, cancelFunc = context.WithCancel(context.TODO())
|
|
//创建续租
|
|
leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5)
|
|
//续租id
|
|
leaseId = leaseGrantRes.ID
|
|
//保持续租
|
|
if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{
|
|
fmt.Println(err)
|
|
goto FAIL
|
|
}
|
|
//每隔1秒续约续租
|
|
go func() {
|
|
var keepRes *clientv3.LeaseKeepAliveResponse
|
|
for{
|
|
select {
|
|
case keepRes = <- KeepResChan:
|
|
//如果续约失败
|
|
if keepRes == nil{
|
|
goto END
|
|
}
|
|
}
|
|
time.Sleep(1*time.Second)
|
|
}
|
|
END:
|
|
}()
|
|
|
|
//创建事务txn
|
|
txn = jobLock.Kv.Txn(context.TODO())
|
|
//锁路径
|
|
lockKey = jobName
|
|
//事务枪锁
|
|
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
|
|
Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
|
|
Else(clientv3.OpGet(lockKey))
|
|
|
|
//提交事务
|
|
if txnRes, err = txn.Commit(); err != nil{
|
|
fmt.Println("提交创建锁失败", err)
|
|
goto FAIL
|
|
|
|
}
|
|
//如果抢锁失败
|
|
if !txnRes.Succeeded{
|
|
err = fmt.Errorf("锁被占用")
|
|
goto FAIL
|
|
}
|
|
|
|
jobLock.CancelFunc = cancelFunc
|
|
jobLock.LeaseId = leaseId
|
|
jobLock.IsLocked = true
|
|
|
|
return
|
|
|
|
FAIL:
|
|
//释放上下文,取消续约
|
|
cancelFunc()
|
|
jobLock.Lease.Revoke(ctx, leaseId) //释放租约
|
|
return
|
|
}
|
|
|
|
|
|
//释放锁
|
|
func (jobLock *JobLock)UnLock() {
|
|
if jobLock.IsLocked{
|
|
jobLock.CancelFunc() //取消自动续租协程
|
|
jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约
|
|
}
|
|
}
|
|
|
|
|
|
// AppEtcdClient is the package-level etcd client assigned by InitEtcdClient.
var AppEtcdClient *clientv3.Client
|
|
|
|
func InitEtcdClient() (err error) {
|
|
AppEtcdClient, err = clientv3.New(clientv3.Config{
|
|
Endpoints: conf.AppConfig.ETCD.Addrs,
|
|
DialTimeout: time.Duration(conf.AppConfig.ETCD.Timeout) * time.Second,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|