Browse Source

snr 分布式锁

master
娄文智 3 years ago
parent
commit
a84e90c518
9 changed files with 206 additions and 10 deletions
  1. +6
    -1
      cmd/bbaasn/main.go
  2. +6
    -0
      cmd/calloff/main.go
  3. +7
    -0
      cmd/msg/main.go
  4. +5
    -0
      cmd/scheduler/main.go
  5. +5
    -0
      cmd/seq/main.go
  6. +7
    -2
      common/Utils.go
  7. +157
    -0
      etcd/snretcd.go
  8. +1
    -0
      service/Calloffproject.go
  9. +12
    -7
      service/Snrtab.go

+ 6
- 1
cmd/bbaasn/main.go View File

@ -7,6 +7,7 @@ import (
"leit.com/leit_seat_aps/common"
"leit.com/leit_seat_aps/config"
"leit.com/leit_seat_aps/db"
"leit.com/leit_seat_aps/etcd"
"leit.com/leit_seat_aps/glog"
"leit.com/leit_seat_aps/msg"
"log"
@ -102,7 +103,11 @@ func imain(){
glog.Infoln("初始化数据库", "InitMssqlDb return fail")
return
}
//创建snretcd连接
if err = etcd.InitSnrJobLock(); err != nil {
glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail")
return
}
// 为发运单生成独立的ASN,基于发运单消息
go msg.Asn()
//生成发运单


+ 6
- 0
cmd/calloff/main.go View File

@ -108,6 +108,12 @@ func imain() {
glog.Infoln("初始化EtCD连接", "InitJobLock return fail")
return
}
//创建snretcd连接
if err = etcd.InitSnrJobLock(); err != nil {
glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail")
return
}
// Calloff文件解析和保存,读取Calloff信息并生成发运包装
go calloff.ParseCalloffNew()


+ 7
- 0
cmd/msg/main.go View File

@ -7,6 +7,7 @@ import (
"leit.com/leit_seat_aps/common"
"leit.com/leit_seat_aps/config"
"leit.com/leit_seat_aps/db"
"leit.com/leit_seat_aps/etcd"
"leit.com/leit_seat_aps/glog"
"leit.com/leit_seat_aps/msg"
"leit.com/leit_seat_aps/seq"
@ -104,6 +105,12 @@ func imain() {
glog.Infoln("初始化数据库", "InitMssqlDb return fail")
return
}
//创建snretcd连接
if err = etcd.InitSnrJobLock(); err != nil {
glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail")
return
}
// 运行二级件需求计算服务
go tier2.RunDemandCollectorTask()
//msg消息推送


+ 5
- 0
cmd/scheduler/main.go View File

@ -108,6 +108,11 @@ func imain() {
glog.Infoln("初始化EtCD连接", "InitJobLock return fail")
return
}
//创建snretcd连接
if err = etcd.InitSnrJobLock(); err != nil {
glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail")
return
}
// 生产派工
go seq.Scheduler()


+ 5
- 0
cmd/seq/main.go View File

@ -108,6 +108,11 @@ func imain(){
glog.Infoln("初始化EtCD连接", "InitJobLock return fail")
return
}
//创建snretcd连接
if err = etcd.InitSnrJobLock(); err != nil {
glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail")
return
}
//测试SEQ文件解析和保存
go seq.ParseSeqNew()


+ 7
- 2
common/Utils.go View File

@ -22,7 +22,6 @@ import (
"unsafe"
)
type HWND uintptr
const (
@ -81,7 +80,12 @@ func ConvInt2FormatString(input, ilen int) (retstring string) {
retstring = string(input)
} else {
igap = ilen - len(strconv.Itoa(input))
retstring = strings.Repeat("0", igap) + strconv.Itoa(input)
if igap > 0 {
retstring = strings.Repeat("0", igap) + strconv.Itoa(input)
} else {
retstring = strconv.Itoa(input)
}
}
return
}
@ -472,6 +476,7 @@ func CreateCaptcha() string {
func CreateCaptchaFive() string {
return fmt.Sprintf("%05v", rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(100000))
}
/*!
username 发送者邮件
password 授权码


+ 157
- 0
etcd/snretcd.go View File

@ -0,0 +1,157 @@
// Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved.
package etcd
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
/******************************************************************************
*
* @Function Name :
*-----------------------------------------------------------------------------
*
* @Description :
*
* @Function Parameters:
*
* @Return Value :
*
* @Author : Lou Wenzhi
*
* @Date : 2021/3/25 9:02
*
******************************************************************************/
// 分布式锁(TXN事务)
type SnrJobLock struct {
// etcd客户端
Client *clientv3.Client
Kv clientv3.KV
Lease clientv3.Lease
JobName string // 任务名
CancelFunc context.CancelFunc // 用于终止自动续租
LeaseId clientv3.LeaseID // 租约ID
IsLocked bool // 是否上锁成功
}
var(
Snr_jobLock *SnrJobLock
)
//初始化任务锁
func InitSnrJobLock() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
)
//创建config
config = clientv3.Config{
Endpoints:[]string{"127.0.0.1:2379"},
DialTimeout: 5000 * time.Millisecond,
}
//创建client
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
}
//创建kv, lease
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
//赋值单例
Snr_jobLock = &SnrJobLock{
Client:client,
Kv:kv,
Lease:lease,
}
return
}
//尝试上锁
func (jobLock *SnrJobLock) TryLock(jobName string) (err error){
var(
ctx context.Context
cancelFunc context.CancelFunc
leaseGrantRes *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
KeepResChan <- chan *clientv3.LeaseKeepAliveResponse
txn clientv3.Txn
lockKey string
txnRes *clientv3.TxnResponse
)
//创建上下文
ctx, cancelFunc = context.WithCancel(context.TODO())
//创建续租
leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5)
//续租id
leaseId = leaseGrantRes.ID
//保持续租
if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{
fmt.Println(err)
goto FAIL
}
//每隔1秒续约续租
go func() {
var keepRes *clientv3.LeaseKeepAliveResponse
for{
select {
case keepRes = <- KeepResChan:
//如果续约失败
if keepRes == nil{
goto END
}
}
time.Sleep(1*time.Second)
}
END:
}()
//创建事务txn
txn = jobLock.Kv.Txn(context.TODO())
//锁路径
lockKey = jobName
//事务枪锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet(lockKey))
//提交事务
if txnRes, err = txn.Commit(); err != nil{
fmt.Println("提交创建锁失败", err)
goto FAIL
}
//如果抢锁失败
if !txnRes.Succeeded{
err = fmt.Errorf("锁被占用")
goto FAIL
}
jobLock.CancelFunc = cancelFunc
jobLock.LeaseId = leaseId
jobLock.IsLocked = true
return
FAIL:
//释放上下文,取消续约
cancelFunc()
jobLock.Lease.Revoke(ctx, leaseId) //释放租约
return
}
//释放锁
func (jobLock *SnrJobLock)UnLock() {
if jobLock.IsLocked{
jobLock.CancelFunc() //取消自动续租协程
jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约
}
}

+ 1
- 0
service/Calloffproject.go View File

@ -534,6 +534,7 @@ func (cop *CalloffProject) CreatePackOrderByCheckSeqWO(session *xorm.Session, cf
// 4. 获取包装箱流水号( 默认取多个模板最后一个模板为参照)
if bl_pak.Paktmpltab.Boxsn != "" {
glog.InfoExtln("流水号","bl_pak.Paktmpltab.Boxsn",bl_pak.Paktmpltab.Boxsn)
if boxid, err = SN_GetNextSnrBySession(bl_pak.Paktmpltab.Boxsn, session); err != nil {
return
}


+ 12
- 7
service/Snrtab.go View File

@ -6,6 +6,7 @@ import (
"leit.com/leit_seat_aps/common"
"leit.com/leit_seat_aps/db"
"leit.com/leit_seat_aps/etcd"
"leit.com/leit_seat_aps/glog"
"strconv"
"strings"
"time"
@ -182,7 +183,8 @@ func SN_GetNextSnrBySession(snr string, session *xorm.Session) (retsnr string, e
nextsnr = snrtab.Nextnr
}
}
glog.InfoExtln("流水号","nextsnr",nextsnr)
glog.InfoExtln("流水号","snrtab.Length",snrtab.Length)
// 组合返回值
layout = []byte(snrtab.Identifierlayout)
for i = 0; i < len(layout); i++ {
@ -228,14 +230,13 @@ func SN_GetNextSnrBySession(snr string, session *xorm.Session) (retsnr string, e
// 获取指定序列的下一个流水号(异步获取)
func SN_SyncGetNextSnr(snr string) (retsnr string, err error) {
/* 获取锁 */
LOOP:
err = etcd.G_jobLock.TryLock("msgid")
LOOPSNR:
err = etcd.Snr_jobLock.TryLock("msgid")
if err != nil {
fmt.Println("msgid groutine lock fail!")
time.Sleep(5 * time.Millisecond)
goto LOOP
goto LOOPSNR
}
defer etcd.G_jobLock.UnLock()
var (
snrtab db.Snrtab
lastupdtime time.Time
@ -248,6 +249,7 @@ LOOP:
snrtab = db.Snrtab{}
if _, err = db.G_DbEngine.Id(core.PK{db.G_FINR, snr}).Get(&snrtab); err != nil {
etcd.Snr_jobLock.UnLock()
return
}
snrtab.Clipped()
@ -256,6 +258,7 @@ LOOP:
nextsnr = snrtab.Nextnr
} else {
if lastupdtime, err = common.DateParse(snrtab.Lastnrcreate, "YmdHis"); err != nil {
etcd.Snr_jobLock.UnLock()
return
}
// 获取下一序列值
@ -298,7 +301,8 @@ LOOP:
nextsnr = snrtab.Nextnr
}
}
glog.InfoExtln("流水号","nextsnr",nextsnr)
glog.InfoExtln("流水号","snrtab.Length",snrtab.Length)
// 组合返回值
layout = []byte(snrtab.Identifierlayout)
for i = 0; i < len(layout); i++ {
@ -334,8 +338,9 @@ LOOP:
sql = "update `snrtab` set nextnr = ?, lastnrcreate = ?, lastmodif = ? where finr = ? AND snrid = ?"
if _, err = db.G_DbEngine.Exec(sql, snrtab.Nextnr, snrtab.Lastnrcreate, snrtab.Lastmodif, db.G_FINR, snrtab.Snrid); err != nil {
etcd.Snr_jobLock.UnLock()
return
}
etcd.Snr_jobLock.UnLock()
return
}

Loading…
Cancel
Save