diff --git a/cmd/bbaasn/main.go b/cmd/bbaasn/main.go index 6555b83..c2f7341 100644 --- a/cmd/bbaasn/main.go +++ b/cmd/bbaasn/main.go @@ -7,6 +7,7 @@ import ( "leit.com/leit_seat_aps/common" "leit.com/leit_seat_aps/config" "leit.com/leit_seat_aps/db" + "leit.com/leit_seat_aps/etcd" "leit.com/leit_seat_aps/glog" "leit.com/leit_seat_aps/msg" "log" @@ -102,7 +103,11 @@ func imain(){ glog.Infoln("初始化数据库", "InitMssqlDb return fail") return } - + //创建snretcd连接 + if err = etcd.InitSnrJobLock(); err != nil { + glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail") + return + } // 为发运单生成独立的ASN,基于发运单消息 go msg.Asn() //生成发运单 diff --git a/cmd/calloff/main.go b/cmd/calloff/main.go index daa0fbf..ded7205 100644 --- a/cmd/calloff/main.go +++ b/cmd/calloff/main.go @@ -108,6 +108,12 @@ func imain() { glog.Infoln("初始化EtCD连接", "InitJobLock return fail") return } + + //创建snretcd连接 + if err = etcd.InitSnrJobLock(); err != nil { + glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail") + return + } // Calloff文件解析和保存,读取Calloff信息并生成发运包装 go calloff.ParseCalloffNew() diff --git a/cmd/msg/main.go b/cmd/msg/main.go index 1edae5d..e506320 100644 --- a/cmd/msg/main.go +++ b/cmd/msg/main.go @@ -7,6 +7,7 @@ import ( "leit.com/leit_seat_aps/common" "leit.com/leit_seat_aps/config" "leit.com/leit_seat_aps/db" + "leit.com/leit_seat_aps/etcd" "leit.com/leit_seat_aps/glog" "leit.com/leit_seat_aps/msg" "leit.com/leit_seat_aps/seq" @@ -104,6 +105,12 @@ func imain() { glog.Infoln("初始化数据库", "InitMssqlDb return fail") return } + + //创建snretcd连接 + if err = etcd.InitSnrJobLock(); err != nil { + glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail") + return + } // 运行二级件需求计算服务 go tier2.RunDemandCollectorTask() //msg消息推送 diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index ac040a3..9e284ed 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -108,6 +108,11 @@ func imain() { glog.Infoln("初始化EtCD连接", "InitJobLock return fail") return } + //创建snretcd连接 + if err = etcd.InitSnrJobLock(); err != nil { + glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail") + return + } // 生产派工 go seq.Scheduler() diff --git a/cmd/seq/main.go b/cmd/seq/main.go index 4cc9659..25b5971 100644 --- a/cmd/seq/main.go +++ b/cmd/seq/main.go @@ -108,6 +108,11 @@ func imain(){ glog.Infoln("初始化EtCD连接", "InitJobLock return fail") return } + //创建snretcd连接 + if err = etcd.InitSnrJobLock(); err != nil { + glog.Infoln("初始化EtCD连接", "InitSnrJobLock return fail") + return + } //测试SEQ文件解析和保存 go seq.ParseSeqNew() diff --git a/common/Utils.go b/common/Utils.go index 4dd8be1..6342544 100644 --- a/common/Utils.go +++ b/common/Utils.go @@ -22,7 +22,6 @@ import ( "unsafe" ) - type HWND uintptr const ( @@ -81,7 +80,12 @@ func ConvInt2FormatString(input, ilen int) (retstring string) { retstring = string(input) } else { igap = ilen - len(strconv.Itoa(input)) - retstring = strings.Repeat("0", igap) + strconv.Itoa(input) + if igap > 0 { + retstring = strings.Repeat("0", igap) + strconv.Itoa(input) + } else { + retstring = strconv.Itoa(input) + } + } return } @@ -472,6 +476,7 @@ func CreateCaptcha() string { func CreateCaptchaFive() string { return fmt.Sprintf("%05v", rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(100000)) } + /*! username 发送者邮件 password 授权码 diff --git a/etcd/snretcd.go b/etcd/snretcd.go new file mode 100644 index 0000000..8dd5d20 --- /dev/null +++ b/etcd/snretcd.go @@ -0,0 +1,157 @@ +// Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved. +package etcd + +import ( + "context" + "fmt" + clientv3 "go.etcd.io/etcd/client/v3" + "time" +) + +/****************************************************************************** + * + * @Function Name : + *----------------------------------------------------------------------------- + * + * @Description : + * + * @Function Parameters: + * + * @Return Value : + * + * @Author : Lou Wenzhi + * + * @Date : 2021/3/25 9:02 + * + ******************************************************************************/ + +// 分布式锁(TXN事务) +type SnrJobLock struct { + // etcd客户端 + Client *clientv3.Client + Kv clientv3.KV + Lease clientv3.Lease + JobName string // 任务名 + CancelFunc context.CancelFunc // 用于终止自动续租 + LeaseId clientv3.LeaseID // 租约ID + IsLocked bool // 是否上锁成功 +} + +var( + Snr_jobLock *SnrJobLock +) + +//初始化任务锁 +func InitSnrJobLock() (err error) { + var ( + config clientv3.Config + client *clientv3.Client + kv clientv3.KV + lease clientv3.Lease + + ) + //创建config + config = clientv3.Config{ + Endpoints:[]string{"127.0.0.1:2379"}, + DialTimeout: 5000 * time.Millisecond, + } + //创建client + if client, err = clientv3.New(config); err != nil { + fmt.Println(err) + } + //创建kv, lease + kv = clientv3.NewKV(client) + lease = clientv3.NewLease(client) + //赋值单例 + Snr_jobLock = &SnrJobLock{ + Client:client, + Kv:kv, + Lease:lease, + } + return +} + + +//尝试上锁 +func (jobLock *SnrJobLock) TryLock(jobName string) (err error){ + + var( + ctx context.Context + cancelFunc context.CancelFunc + leaseGrantRes *clientv3.LeaseGrantResponse + leaseId clientv3.LeaseID + KeepResChan <- chan *clientv3.LeaseKeepAliveResponse + txn clientv3.Txn + lockKey string + txnRes *clientv3.TxnResponse + + ) + //创建上下文 + ctx, cancelFunc = context.WithCancel(context.TODO()) + //创建续租 + leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5) + //续租id + leaseId = leaseGrantRes.ID + //保持续租 + if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{ + fmt.Println(err) + goto FAIL + } + //每隔1秒续约续租 + go func() { + var keepRes *clientv3.LeaseKeepAliveResponse + for{ + select { + case keepRes = <- KeepResChan: + //如果续约失败 + if keepRes == nil{ + goto END + } + } + time.Sleep(1*time.Second) + } + END: + }() + + //创建事务txn + txn = jobLock.Kv.Txn(context.TODO()) + //锁路径 + lockKey = jobName + //事务枪锁 + txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)). + Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))). + Else(clientv3.OpGet(lockKey)) + + //提交事务 + if txnRes, err = txn.Commit(); err != nil{ + fmt.Println("提交创建锁失败", err) + goto FAIL + + } + //如果抢锁失败 + if !txnRes.Succeeded{ + err = fmt.Errorf("锁被占用") + goto FAIL + } + + jobLock.CancelFunc = cancelFunc + jobLock.LeaseId = leaseId + jobLock.IsLocked = true + + return + +FAIL: + //释放上下文,取消续约 + cancelFunc() + jobLock.Lease.Revoke(ctx, leaseId) //释放租约 + return +} + + +//释放锁 +func (jobLock *SnrJobLock)UnLock() { + if jobLock.IsLocked{ + jobLock.CancelFunc() //取消自动续租协程 + jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约 + } +} \ No newline at end of file diff --git a/service/Calloffproject.go b/service/Calloffproject.go index 99a45c0..da111f6 100644 --- a/service/Calloffproject.go +++ b/service/Calloffproject.go @@ -534,6 +534,7 @@ func (cop *CalloffProject) CreatePackOrderByCheckSeqWO(session *xorm.Session, cf // 4. 获取包装箱流水号( 默认取多个模板最后一个模板为参照) if bl_pak.Paktmpltab.Boxsn != "" { + glog.InfoExtln("流水号","bl_pak.Paktmpltab.Boxsn",bl_pak.Paktmpltab.Boxsn) if boxid, err = SN_GetNextSnrBySession(bl_pak.Paktmpltab.Boxsn, session); err != nil { return } diff --git a/service/Snrtab.go b/service/Snrtab.go index abf5547..463e613 100644 --- a/service/Snrtab.go +++ b/service/Snrtab.go @@ -6,6 +6,7 @@ import ( "leit.com/leit_seat_aps/common" "leit.com/leit_seat_aps/db" "leit.com/leit_seat_aps/etcd" + "leit.com/leit_seat_aps/glog" "strconv" "strings" "time" @@ -182,7 +183,8 @@ func SN_GetNextSnrBySession(snr string, session *xorm.Session) (retsnr string, e nextsnr = snrtab.Nextnr } } - + glog.InfoExtln("流水号","nextsnr",nextsnr) + glog.InfoExtln("流水号","snrtab.Length",snrtab.Length) // 组合返回值 layout = []byte(snrtab.Identifierlayout) for i = 0; i < len(layout); i++ { @@ -228,14 +230,13 @@ func SN_GetNextSnrBySession(snr string, session *xorm.Session) (retsnr string, e // 获取指定序列的下一个流水号(异步获取) func SN_SyncGetNextSnr(snr string) (retsnr string, err error) { /* 获取锁 */ -LOOP: - err = etcd.G_jobLock.TryLock("msgid") +LOOPSNR: + err = etcd.Snr_jobLock.TryLock("msgid") if err != nil { fmt.Println("msgid groutine lock fail!") time.Sleep(5 * time.Millisecond) - goto LOOP + goto LOOPSNR } - defer etcd.G_jobLock.UnLock() var ( snrtab db.Snrtab lastupdtime time.Time @@ -248,6 +249,7 @@ LOOP: snrtab = db.Snrtab{} if _, err = db.G_DbEngine.Id(core.PK{db.G_FINR, snr}).Get(&snrtab); err != nil { + etcd.Snr_jobLock.UnLock() return } snrtab.Clipped() @@ -256,6 +258,7 @@ LOOP: nextsnr = snrtab.Nextnr } else { if lastupdtime, err = common.DateParse(snrtab.Lastnrcreate, "YmdHis"); err != nil { + etcd.Snr_jobLock.UnLock() return } // 获取下一序列值 @@ -298,7 +301,8 @@ LOOP: nextsnr = snrtab.Nextnr } } - + glog.InfoExtln("流水号","nextsnr",nextsnr) + glog.InfoExtln("流水号","snrtab.Length",snrtab.Length) // 组合返回值 layout = []byte(snrtab.Identifierlayout) for i = 0; i < len(layout); i++ { @@ -334,8 +338,9 @@ LOOP: sql = "update `snrtab` set nextnr = ?, lastnrcreate = ?, lastmodif = ? where finr = ? AND snrid = ?" if _, err = db.G_DbEngine.Exec(sql, snrtab.Nextnr, snrtab.Lastnrcreate, snrtab.Lastmodif, db.G_FINR, snrtab.Snrid); err != nil { + etcd.Snr_jobLock.UnLock() return } - + etcd.Snr_jobLock.UnLock() return } \ No newline at end of file