Browse Source

2021/4/1 scheduler加上了分布式锁

master
娄文智 3 years ago
parent
commit
d17b1f29f4
8 changed files with 112 additions and 34 deletions
  1. +6
    -1
      cmd/scheduler/main.go
  2. +2
    -2
      db/Pln_custorder.go
  3. +37
    -0
      db/Pln_seqdata_landing.go
  4. +19
    -0
      db/Pln_workorder_intstatus.go
  5. +1
    -23
      seq/Scheduler.go
  6. +16
    -2
      seq/Seqtask.go
  7. +30
    -5
      service/Workline.go
  8. +1
    -1
      tod/Todtask.go

+ 6
- 1
cmd/scheduler/main.go View File

@ -7,6 +7,7 @@ import (
"leit.com/leit_seat_aps/common"
"leit.com/leit_seat_aps/config"
"leit.com/leit_seat_aps/db"
"leit.com/leit_seat_aps/etcd"
"leit.com/leit_seat_aps/glog"
"leit.com/leit_seat_aps/seq"
"log"
@ -102,7 +103,11 @@ func imain() {
glog.Infoln("初始化数据库", "InitMssqlDb return fail")
return
}
//创建etcd连接
if err = etcd.InitJobLock(); err != nil {
glog.Infoln("初始化EtCD连接", "InitJobLock return fail")
return
}
// 生产派工
go seq.Scheduler()


+ 2
- 2
db/Pln_custorder.go View File

@ -129,12 +129,12 @@ func (t *Pln_custorder) Update(fields string) (err error) {
func (t *Pln_custorder) UpdateToFields(session *xorm.Session, fields string) (err error) {
//判断对应的工单,是否全是26状态,如果存在,则不更新
var plc Pln_workorder
ok, err := session.Table("pln_workorder").Where("finr = ? and custordernr = ? and status < ?", G_FINR, t.Custordernr, 26).Get(&plc)
ok, err := session.Table("pln_workorder").Where("finr = ? and custordernr = ? and status < ?", G_FINR, t.Custordernr, common.WO_STATUS_RELEASED).Get(&plc)
if err != nil {
return err
}
if !ok {
if _, err = session.Table("pln_custorder").Where("finr = ? and custordernr = ? and status = ?", G_FINR, t.Custordernr, 20).Cols(fields).Update(t); err != nil {
if _, err = session.Table("pln_custorder").Where("finr = ? and custordernr = ? and status = ?", G_FINR, t.Custordernr, common.WO_STATUS_PLANNED).Cols(fields).Update(t); err != nil {
return
}
}


+ 37
- 0
db/Pln_seqdata_landing.go View File

@ -162,3 +162,40 @@ func (t *Pln_seqdata_landing) GetUnparsedLandingData() (seqlandlsttab []Pln_seqd
}
return
}
//获取指定零件族已解析的最大的OEMSEQ号,不含0和9999999999
func (t *Pln_seqdata_landing) GetPFParsedMaxOemseq(projid, pfid string) (oemseq int, err error) {
var seqlandlsttab = []Pln_seqdata_landing{}
e := G_DbEngine
if err = e.Where("finr = ? and projnr = ? and partfamilyid = ? and parsed = ? and oemseq > ? and oemseq < ?",
G_FINR, projid, pfid, 0, 0, 9999999999).Desc("oemseq").Limit(1, 0).Find(&seqlandlsttab); err != nil {
return
}
if len(seqlandlsttab) > 0 {
oemseq = common.ValueToInt(seqlandlsttab[0].Oemseq, 0)
}else{
oemseq = 0
}
return
}
// 获取它的版本1是否已解析
func (t *Pln_seqdata_landing) IsFirstVersionParsed() (is bool, err error) {
var seqlandlsttab = []Pln_seqdata_landing{}
e := G_DbEngine
if err = e.Where("finr = ? and oemordernr = ? and partfamilyid = ? and vin = ? and version = ?",
G_FINR, t.Oemordernr, t.Partfamilyid, t.Vin, 1).Asc("version").Desc("oemseq").Limit(1, 0).Find(&seqlandlsttab); err != nil {
return
}
if len(seqlandlsttab) > 0 {
if seqlandlsttab[0].Parsed > 0 {
is = true
}else{
is = false
}
return
}else{
is = false
}
return
}

+ 19
- 0
db/Pln_workorder_intstatus.go View File

@ -3,6 +3,7 @@ package db
import (
"fmt"
"github.com/go-xorm/xorm"
"leit.com/leit_seat_aps/common"
"xorm.io/core"
)
@ -61,6 +62,24 @@ func (t *Pln_workorder_intstatus) Add() error {
return nil
}
//更新指定字段
func (t *Pln_workorder_intstatus) Insert(session *xorm.Session) (err error) {
countrole := new(Pln_workorder_intstatus)
affw, err := session.Table("pln_workorder_intstatus").ID(core.PK{G_FINR, t.Workordernr}).Count(countrole)
if err != nil {
return err
}
if affw > 0 {
return nil
}
_, err = session.Table("pln_workorder_intstatus").Insert(t)
if err != nil {
fmt.Printf("err is :%v",err)
return err
}
return
}
func (t *Pln_workorder_intstatus) Del() bool {
e := G_DbEngine
_, err := e.ID(core.PK{G_FINR, t.Workordernr}).Delete(&Pln_workorder_intstatus{})


+ 1
- 23
seq/Scheduler.go View File

@ -7,7 +7,6 @@ import (
"leit.com/leit_seat_aps/glog"
"leit.com/leit_seat_aps/service"
"strings"
"sync"
"time"
)
@ -62,7 +61,7 @@ func RunScheduler() (err error) {
return
}
var lock sync.RWMutex
// 产线的排序派工服务
// 1. 读取调度到产线的生产订单,状态 = 20
// 2. 按照 swet 和 Oemseq 排序
@ -72,22 +71,12 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) {
err error
workline db.Workline
)
// 如果产线是混线排序,解析混线的参数
//if bl_wl.MixSort {
// if err = bl_wl.ParseMixRule(); err != nil {
// glog.InfoExtln("排序调度", "ParseMixRule err: ", err)
// // 报错并返回
// return
// }
//}
// 循环加载产线的待派工的任务,并依次处理
for {
lock.Lock()
//查询产线信息
if workline, err = bl_wl.SelectWorkline(); err != nil {
glog.InfoExtln("排序调度", "LoadPlannedTask err: ", err)
// 报错并返回
lock.Unlock()
return
}
bl_wl.SortMode = workline.Taskqueuesortway
@ -111,11 +100,9 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) {
if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok {
glog.InfoExtln("排序调度", "GetTaskToReleaseByQty2 err: ", err)
time.Sleep(5 * time.Second)
lock.Unlock()
continue
}
// 报错并返回
lock.Unlock()
return
}
// 派工订单
@ -124,18 +111,15 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) {
errstr := err.Error()
if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok {
time.Sleep(1 * time.Second)
lock.Unlock()
continue
}
// 报错并返回
lock.Unlock()
return
}
case common.LINE_REL_BY_OEMSEQ:
if bl_wolst,err = bl_wl.GetTaskToReleaseByTime(); err != nil {
glog.InfoExtln("排序调度", "GetTaskToReleaseByTime err: ", err)
// 报错并返回
lock.Unlock()
return
}
// 派工订单
@ -144,18 +128,15 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) {
errstr := err.Error()
if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok {
time.Sleep(1 * time.Second)
lock.Unlock()
continue
}
// 报错并返回
lock.Unlock()
return
}
case common.LINE_REL_BY_DURATION:
if err = bl_wl.GetTaskToReleaseByDuration(&bl_wolst); err != nil {
glog.InfoExtln("排序调度", "GetTaskToReleaseByDuration err: ", err)
// 报错并返回
lock.Unlock()
return
}
// 派工订单
@ -164,16 +145,13 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) {
errstr := err.Error()
if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok {
time.Sleep(1 * time.Second)
lock.Unlock()
continue
}
// 报错并返回
lock.Unlock()
return
}
}
time.Sleep(2 * time.Second)
lock.Unlock()
}
}

+ 16
- 2
seq/Seqtask.go View File

@ -192,8 +192,8 @@ func ParseSeqOrder() {
continue
}
// 校验客户订单的OEMSEQ是否连续(基于零件族获取最大的已计划的客户订单的SEQ)
if bl_co.Custordertab.Orderinfo != "TCAR" && seqlandtablst[j].Oemseq < 9999999999 && seqProj.Projecttab.Seq_verify_sequence > 0 && seqProj.IsPartFamilyNeedToContinue(bl_co.Custordertab.Partfamilyid) {
// 校验客户订单的OEMSEQ是否连续(基于零件族获取最大的已计划的客户订单的SEQ),版本1校验,其它版本不校验
if seqlandtablst[j].Oemseq > 0 && seqlandtablst[j].Oemseq < 9999999999 && seqlandtablst[j].Version == 1 && seqProj.Projecttab.Seq_verify_sequence > 0 && seqProj.IsPartFamilyNeedToContinue(bl_co.Custordertab.Partfamilyid) {
// 读取项目各零件族最大的OEMSEQ号
if err = seqProj.GetPartFamilyMaxOemseq(); err != nil {
glog.InfoExtln("解析并读取SEQ", "加载项目零件族最大OEMSEQ号失败: ", err)
@ -212,6 +212,20 @@ func ParseSeqOrder() {
continue
}
}
// 对于版本号大于1的则需要校验它的版本1是否已解析,如果没有解析,则也不解析
if seqlandtablst[j].Oemseq > 0 && seqlandtablst[j].Oemseq < 9999999999 && seqlandtablst[j].Version > 1 {
if ok, err = seqlandtablst[j].IsFirstVersionParsed(); err != nil {
glog.InfoExtln("解析并读取SEQ", "获取客户订单SEQ的解析版本1数据失败!", seqlandtablst[j].Oemordernr, err)
etcd.G_jobLock.UnLock()
return
}
if !ok {
glog.InfoExtln("解析并读取SEQ", fmt.Sprintf("当前客户订单%s的SEQ解析版本1尚未解析!", seqlandtablst[j].Oemordernr))
etcd.G_jobLock.UnLock()
continue
}
}
//开启事务
session := db.G_DbEngine.NewSession()
// 基于SEQ保存客户订单版本信息(用于追溯)


+ 30
- 5
service/Workline.go View File

@ -6,6 +6,7 @@ import (
"fmt"
"leit.com/leit_seat_aps/common"
"leit.com/leit_seat_aps/db"
"leit.com/leit_seat_aps/etcd"
"leit.com/leit_seat_aps/glog"
"strconv"
"strings"
@ -1079,7 +1080,7 @@ func (bl_wl *BL_Workline) ReleaseTasks(bl_wolst []BL_WorkOrder, se *SchedEngine)
looptime int
)
looptime = 0
LOOP:
LOOPTIME:
// 获取产线最大的排序号
if err = db.G_DbEngine.Where("finr = ? and worklineid = ?", db.G_FINR,
bl_wl.WorklineId).Desc("schedseq").Limit(1, 0).Find(&wotablst); err != nil {
@ -1093,13 +1094,24 @@ LOOP:
}else {
glog.InfoExtln("排序调度", "LOOP起始排序号 = ", bl_wl.WorklineId, startseq)
time.Sleep(100 * time.Millisecond)
goto LOOP
goto LOOPTIME
}
// 遍历下达订单,依次更新内部排序号和状态
glog.InfoExtln("排序调度", "起始排序号 = ", bl_wl.WorklineId, startseq)
fmt.Println(bl_wl.WorklineId, "起始排序号 = ", startseq)
for i = 0; i < len(bl_wolst); i++ {
/* 获取锁 */
LOOP:
err = etcd.G_jobLock.TryLock("lock")
if err != nil {
fmt.Println("seq groutine lock fail!")
time.Sleep(10 * time.Millisecond)
goto LOOP
}
//开启事务
session := db.G_DbEngine.NewSession()
wotab = db.Pln_workorder{}
wotab = bl_wolst[i].Workordertab
wotab.Schedseq = startseq
@ -1107,8 +1119,11 @@ LOOP:
wotab.Lastmodif = common.Date(time.Now().Unix(), "YYYYMMDDHHmmss")
// 处理生产订单的拣料零件
if err = wotab.Update("schedseq,status,pickstatus,lastmodif"); err != nil {
if err = wotab.UpdateFields(session,"schedseq,status,pickstatus,lastmodif"); err != nil {
glog.InfoExtln("排序调度", "err is : ", err)
session.Rollback()
session.Close()
etcd.G_jobLock.UnLock()
return
}
//下达的订单放到缓存数据表里,为接口数据做准备
@ -1121,21 +1136,31 @@ LOOP:
intstatus.Lastmodif = common.Date(time.Now().Unix(), "YYYYMMDDHHmmss")
intstatus.Credatuz = common.Date(time.Now().Unix(), "YYYYMMDDHHmmss")
intstatus.Lastuser = "scheduler"
err = intstatus.Add()
err = intstatus.Insert(session)
if err != nil {
glog.InfoExtln("排序调度", "err is : ", err)
session.Rollback()
session.Close()
etcd.G_jobLock.UnLock()
return
}
startseq = startseq + 1
// 更新客户订单状态
cotab = db.Pln_custorder{Finr: db.G_FINR, Custordernr: wotab.Custordernr, Status: common.WO_STATUS_RELEASED}
err = cotab.Update("status")
err = cotab.UpdateToFields(session,"status")
if err != nil {
glog.InfoExtln("排序调度", "err is : ", err)
session.Rollback()
session.Close()
etcd.G_jobLock.UnLock()
return
}
session.Commit()
session.Close()
etcd.G_jobLock.UnLock()
}
return
}


+ 1
- 1
tod/Todtask.go View File

@ -360,7 +360,7 @@ func ParseTodCustorderNew() {
//添加邮件预警
mailsubject = "SEQ+1的客户订单已下达"
mailbody = "项目号: " + todProj.Projectid + "\r\n客户订单号: " + bl_co.Oemordernr + "\r\nErrorinfo : 已下达,SEQ+1更新出错!"
if err != nil {
if err = SendTodErrorEmail(projtablst[i], mailsubject, mailbody); err != nil {
glog.InfoExtln("发送邮件", "err is :", err)
}
}


Loading…
Cancel
Save