From d17b1f29f4499ba3f32f2c4e9607c7da9461c933 Mon Sep 17 00:00:00 2001 From: louwenzhi Date: Wed, 28 Apr 2021 12:09:36 +0800 Subject: [PATCH] =?UTF-8?q?2021/4/1=20=20=20=20scheduler=E5=8A=A0=E4=B8=8A?= =?UTF-8?q?=E4=BA=86=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/scheduler/main.go | 7 ++++++- db/Pln_custorder.go | 4 ++-- db/Pln_seqdata_landing.go | 37 +++++++++++++++++++++++++++++++++++ db/Pln_workorder_intstatus.go | 19 ++++++++++++++++++ seq/Scheduler.go | 24 +---------------------- seq/Seqtask.go | 18 +++++++++++++++-- service/Workline.go | 35 ++++++++++++++++++++++++++++----- tod/Todtask.go | 2 +- 8 files changed, 112 insertions(+), 34 deletions(-) diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index 462695b..ac040a3 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -7,6 +7,7 @@ import ( "leit.com/leit_seat_aps/common" "leit.com/leit_seat_aps/config" "leit.com/leit_seat_aps/db" + "leit.com/leit_seat_aps/etcd" "leit.com/leit_seat_aps/glog" "leit.com/leit_seat_aps/seq" "log" @@ -102,7 +103,11 @@ func imain() { glog.Infoln("初始化数据库", "InitMssqlDb return fail") return } - + //创建etcd连接 + if err = etcd.InitJobLock(); err != nil { + glog.Infoln("初始化EtCD连接", "InitJobLock return fail") + return + } // 生产派工 go seq.Scheduler() diff --git a/db/Pln_custorder.go b/db/Pln_custorder.go index 4b7ef6e..4d277a9 100644 --- a/db/Pln_custorder.go +++ b/db/Pln_custorder.go @@ -129,12 +129,12 @@ func (t *Pln_custorder) Update(fields string) (err error) { func (t *Pln_custorder) UpdateToFields(session *xorm.Session, fields string) (err error) { //判断对应的工单,是否全是26状态,如果存在,则不更新 var plc Pln_workorder - ok, err := session.Table("pln_workorder").Where("finr = ? and custordernr = ? and status < ?", G_FINR, t.Custordernr, 26).Get(&plc) + ok, err := session.Table("pln_workorder").Where("finr = ? and custordernr = ? and status < ?", G_FINR, t.Custordernr, common.WO_STATUS_RELEASED).Get(&plc) if err != nil { return err } if !ok { - if _, err = session.Table("pln_custorder").Where("finr = ? and custordernr = ? and status = ?", G_FINR, t.Custordernr, 20).Cols(fields).Update(t); err != nil { + if _, err = session.Table("pln_custorder").Where("finr = ? and custordernr = ? and status = ?", G_FINR, t.Custordernr, common.WO_STATUS_PLANNED).Cols(fields).Update(t); err != nil { return } } diff --git a/db/Pln_seqdata_landing.go b/db/Pln_seqdata_landing.go index 651ce00..7ca60b0 100644 --- a/db/Pln_seqdata_landing.go +++ b/db/Pln_seqdata_landing.go @@ -162,3 +162,40 @@ func (t *Pln_seqdata_landing) GetUnparsedLandingData() (seqlandlsttab []Pln_seqd } return } + +//获取指定零件族已解析的最大的OEMSEQ号,不含0和9999999999 +func (t *Pln_seqdata_landing) GetPFParsedMaxOemseq(projid, pfid string) (oemseq int, err error) { + var seqlandlsttab = []Pln_seqdata_landing{} + e := G_DbEngine + if err = e.Where("finr = ? and projnr = ? and partfamilyid = ? and parsed = ? and oemseq > ? and oemseq < ?", + G_FINR, projid, pfid, 0, 0, 9999999999).Desc("oemseq").Limit(1, 0).Find(&seqlandlsttab); err != nil { + return + } + if len(seqlandlsttab) > 0 { + oemseq = common.ValueToInt(seqlandlsttab[0].Oemseq, 0) + }else{ + oemseq = 0 + } + return +} + +// 获取它的版本1是否已解析 +func (t *Pln_seqdata_landing) IsFirstVersionParsed() (is bool, err error) { + var seqlandlsttab = []Pln_seqdata_landing{} + e := G_DbEngine + if err = e.Where("finr = ? and oemordernr = ? and partfamilyid = ? and vin = ? and version = ?", + G_FINR, t.Oemordernr, t.Partfamilyid, t.Vin, 1).Asc("version").Desc("oemseq").Limit(1, 0).Find(&seqlandlsttab); err != nil { + return + } + if len(seqlandlsttab) > 0 { + if seqlandlsttab[0].Parsed > 0 { + is = true + }else{ + is = false + } + return + }else{ + is = false + } + return +} diff --git a/db/Pln_workorder_intstatus.go b/db/Pln_workorder_intstatus.go index 58d835e..86e97df 100644 --- a/db/Pln_workorder_intstatus.go +++ b/db/Pln_workorder_intstatus.go @@ -3,6 +3,7 @@ package db import ( "fmt" + "github.com/go-xorm/xorm" "leit.com/leit_seat_aps/common" "xorm.io/core" ) @@ -61,6 +62,24 @@ func (t *Pln_workorder_intstatus) Add() error { return nil } +//更新指定字段 +func (t *Pln_workorder_intstatus) Insert(session *xorm.Session) (err error) { + countrole := new(Pln_workorder_intstatus) + affw, err := session.Table("pln_workorder_intstatus").ID(core.PK{G_FINR, t.Workordernr}).Count(countrole) + if err != nil { + return err + } + if affw > 0 { + return nil + } + _, err = session.Table("pln_workorder_intstatus").Insert(t) + if err != nil { + fmt.Printf("err is :%v",err) + return err + } + return +} + func (t *Pln_workorder_intstatus) Del() bool { e := G_DbEngine _, err := e.ID(core.PK{G_FINR, t.Workordernr}).Delete(&Pln_workorder_intstatus{}) diff --git a/seq/Scheduler.go b/seq/Scheduler.go index c25ef2d..58e9067 100644 --- a/seq/Scheduler.go +++ b/seq/Scheduler.go @@ -7,7 +7,6 @@ import ( "leit.com/leit_seat_aps/glog" "leit.com/leit_seat_aps/service" "strings" - "sync" "time" ) @@ -62,7 +61,7 @@ func RunScheduler() (err error) { return } -var lock sync.RWMutex + // 产线的排序派工服务 // 1. 读取调度到产线的生产订单,状态 = 20 // 2. 按照 swet 和 Oemseq 排序 @@ -72,22 +71,12 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) { err error workline db.Workline ) - // 如果产线是混线排序,解析混线的参数 - //if bl_wl.MixSort { - // if err = bl_wl.ParseMixRule(); err != nil { - // glog.InfoExtln("排序调度", "ParseMixRule err: ", err) - // // 报错并返回 - // return - // } - //} // 循环加载产线的待派工的任务,并依次处理 for { - lock.Lock() //查询产线信息 if workline, err = bl_wl.SelectWorkline(); err != nil { glog.InfoExtln("排序调度", "LoadPlannedTask err: ", err) // 报错并返回 - lock.Unlock() return } bl_wl.SortMode = workline.Taskqueuesortway @@ -111,11 +100,9 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) { if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok { glog.InfoExtln("排序调度", "GetTaskToReleaseByQty2 err: ", err) time.Sleep(5 * time.Second) - lock.Unlock() continue } // 报错并返回 - lock.Unlock() return } // 派工订单 @@ -124,18 +111,15 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) { errstr := err.Error() if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok { time.Sleep(1 * time.Second) - lock.Unlock() continue } // 报错并返回 - lock.Unlock() return } case common.LINE_REL_BY_OEMSEQ: if bl_wolst,err = bl_wl.GetTaskToReleaseByTime(); err != nil { glog.InfoExtln("排序调度", "GetTaskToReleaseByTime err: ", err) // 报错并返回 - lock.Unlock() return } // 派工订单 @@ -144,18 +128,15 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) { errstr := err.Error() if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok { time.Sleep(1 * time.Second) - lock.Unlock() continue } // 报错并返回 - lock.Unlock() return } case common.LINE_REL_BY_DURATION: if err = bl_wl.GetTaskToReleaseByDuration(&bl_wolst); err != nil { glog.InfoExtln("排序调度", "GetTaskToReleaseByDuration err: ", err) // 报错并返回 - lock.Unlock() return } // 派工订单 @@ -164,16 +145,13 @@ func ScheduleTaskToRelease(bl_wl service.BL_Workline, se *service.SchedEngine) { errstr := err.Error() if ok := strings.Contains(errstr, "mssql: Transaction (Process ID"); ok { time.Sleep(1 * time.Second) - lock.Unlock() continue } // 报错并返回 - lock.Unlock() return } } time.Sleep(2 * time.Second) - lock.Unlock() } } diff --git a/seq/Seqtask.go b/seq/Seqtask.go index 2d728e2..08f0a58 100644 --- a/seq/Seqtask.go +++ b/seq/Seqtask.go @@ -192,8 +192,8 @@ func ParseSeqOrder() { continue } - // 校验客户订单的OEMSEQ是否连续(基于零件族获取最大的已计划的客户订单的SEQ) - if bl_co.Custordertab.Orderinfo != "TCAR" && seqlandtablst[j].Oemseq < 9999999999 && seqProj.Projecttab.Seq_verify_sequence > 0 && seqProj.IsPartFamilyNeedToContinue(bl_co.Custordertab.Partfamilyid) { + // 校验客户订单的OEMSEQ是否连续(基于零件族获取最大的已计划的客户订单的SEQ),版本1校验,其它版本不校验 + if seqlandtablst[j].Oemseq > 0 && seqlandtablst[j].Oemseq < 9999999999 && seqlandtablst[j].Version == 1 && seqProj.Projecttab.Seq_verify_sequence > 0 && seqProj.IsPartFamilyNeedToContinue(bl_co.Custordertab.Partfamilyid) { // 读取项目各零件族最大的OEMSEQ号 if err = seqProj.GetPartFamilyMaxOemseq(); err != nil { glog.InfoExtln("解析并读取SEQ", "加载项目零件族最大OEMSEQ号失败: ", err) @@ -212,6 +212,20 @@ func ParseSeqOrder() { continue } } + // 对于版本号大于1的则需要校验它的版本1是否已解析,如果没有解析,则也不解析 + if seqlandtablst[j].Oemseq > 0 && seqlandtablst[j].Oemseq < 9999999999 && seqlandtablst[j].Version > 1 { + if ok, err = seqlandtablst[j].IsFirstVersionParsed(); err != nil { + glog.InfoExtln("解析并读取SEQ", "获取客户订单SEQ的解析版本1数据失败!", seqlandtablst[j].Oemordernr, err) + etcd.G_jobLock.UnLock() + return + } + if !ok { + glog.InfoExtln("解析并读取SEQ", fmt.Sprintf("当前客户订单%s的SEQ解析版本1尚未解析!", seqlandtablst[j].Oemordernr)) + etcd.G_jobLock.UnLock() + continue + } + } + //开启事务 session := db.G_DbEngine.NewSession() // 基于SEQ保存客户订单版本信息(用于追溯) diff --git a/service/Workline.go b/service/Workline.go index d3e1410..c296be7 100644 --- a/service/Workline.go +++ b/service/Workline.go @@ -6,6 +6,7 @@ import ( "fmt" "leit.com/leit_seat_aps/common" "leit.com/leit_seat_aps/db" + "leit.com/leit_seat_aps/etcd" "leit.com/leit_seat_aps/glog" "strconv" "strings" @@ -1079,7 +1080,7 @@ func (bl_wl *BL_Workline) ReleaseTasks(bl_wolst []BL_WorkOrder, se *SchedEngine) looptime int ) looptime = 0 -LOOP: +LOOPTIME: // 获取产线最大的排序号 if err = db.G_DbEngine.Where("finr = ? and worklineid = ?", db.G_FINR, bl_wl.WorklineId).Desc("schedseq").Limit(1, 0).Find(&wotablst); err != nil { @@ -1093,13 +1094,24 @@ LOOP: }else { glog.InfoExtln("排序调度", "LOOP起始排序号 = ", bl_wl.WorklineId, startseq) time.Sleep(100 * time.Millisecond) - goto LOOP + goto LOOPTIME } // 遍历下达订单,依次更新内部排序号和状态 glog.InfoExtln("排序调度", "起始排序号 = ", bl_wl.WorklineId, startseq) fmt.Println(bl_wl.WorklineId, "起始排序号 = ", startseq) for i = 0; i < len(bl_wolst); i++ { + /* 获取锁 */ + LOOP: + err = etcd.G_jobLock.TryLock("lock") + if err != nil { + fmt.Println("seq groutine lock fail!") + time.Sleep(10 * time.Millisecond) + goto LOOP + } + //开启事务 + session := db.G_DbEngine.NewSession() + wotab = db.Pln_workorder{} wotab = bl_wolst[i].Workordertab wotab.Schedseq = startseq @@ -1107,8 +1119,11 @@ LOOP: wotab.Lastmodif = common.Date(time.Now().Unix(), "YYYYMMDDHHmmss") // 处理生产订单的拣料零件 - if err = wotab.Update("schedseq,status,pickstatus,lastmodif"); err != nil { + if err = wotab.UpdateFields(session,"schedseq,status,pickstatus,lastmodif"); err != nil { glog.InfoExtln("排序调度", "err is : ", err) + session.Rollback() + session.Close() + etcd.G_jobLock.UnLock() return } //下达的订单放到缓存数据表里,为接口数据做准备 @@ -1121,21 +1136,31 @@ LOOP: intstatus.Lastmodif = common.Date(time.Now().Unix(), "YYYYMMDDHHmmss") intstatus.Credatuz = common.Date(time.Now().Unix(), "YYYYMMDDHHmmss") intstatus.Lastuser = "scheduler" - err = intstatus.Add() + err = intstatus.Insert(session) if err != nil { glog.InfoExtln("排序调度", "err is : ", err) + session.Rollback() + session.Close() + etcd.G_jobLock.UnLock() return } startseq = startseq + 1 // 更新客户订单状态 cotab = db.Pln_custorder{Finr: db.G_FINR, Custordernr: wotab.Custordernr, Status: common.WO_STATUS_RELEASED} - err = cotab.Update("status") + err = cotab.UpdateToFields(session,"status") if err != nil { glog.InfoExtln("排序调度", "err is : ", err) + session.Rollback() + session.Close() + etcd.G_jobLock.UnLock() return } + session.Commit() + session.Close() + etcd.G_jobLock.UnLock() } + return } diff --git a/tod/Todtask.go b/tod/Todtask.go index 683946a..413a40b 100644 --- a/tod/Todtask.go +++ b/tod/Todtask.go @@ -360,7 +360,7 @@ func ParseTodCustorderNew() { //添加邮件预警 mailsubject = "SEQ+1的客户订单已下达" mailbody = "项目号: " + todProj.Projectid + "\r\n客户订单号: " + bl_co.Oemordernr + "\r\nErrorinfo : 已下达,SEQ+1更新出错!" - if err != nil { + if err = SendTodErrorEmail(projtablst[i], mailsubject, mailbody); err != nil { glog.InfoExtln("发送邮件", "err is :", err) } }