From e0b9c808b9a289e5695eecc0a917cb559ac5513c Mon Sep 17 00:00:00 2001 From: zhangxin Date: Mon, 12 Jul 2021 15:53:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8F=90=E4=BA=A4=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E5=92=8C=E5=85=B3=E9=97=ADsession=E7=9A=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 15 +++++++++------ services/etl/implments/DB.service.impl.go | 1 + services/etl/implments/TaskHead.service.impl.go | 13 ++++++++++--- web/controllers/etl/etl.go | 2 +- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/main.go b/main.go index 2da503d..057e4b8 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,9 @@ import ( "LAPP_ETL/infra/driver" "LAPP_ETL/infra/logger" meta "LAPP_ETL/meta/etl" + "LAPP_ETL/scheduler" "LAPP_ETL/utils" + "LAPP_ETL/web/middleware" "LAPP_ETL/web/routes" "fmt" "github.com/kardianos/service" @@ -116,11 +118,11 @@ func appMain() { log.Fatal("init db engine failed, error:", err) } - //start scheduler - //err = scheduler.Start() - //if err != nil { - // log.Fatal("start scheduler failed error:", err) - //} + // start scheduler + err = scheduler.Start() + if err != nil { + log.Fatal("start scheduler failed error:", err) + } // register app service to etcd err = RegisterAppService() if err != nil { @@ -131,7 +133,7 @@ func appMain() { app := iris.New() routes.Hub(app) app.HandleDir("/public", "./web/public") - //middleware.PreSettring(app) + middleware.PreSettring(app) // init iris config irisConfig := iris.DefaultConfiguration() @@ -152,6 +154,7 @@ func appMain() { func InitDBEngine() error { session := db.AppEngine.NewSession() + defer session.Close() dao := dal.NewDBDAO(session) dbLi, err := dao.Select(nil, []grmi.Field{meta.DB_ID}) if err != nil { diff --git a/services/etl/implments/DB.service.impl.go b/services/etl/implments/DB.service.impl.go index 7bc6e68..6c4528e 100644 --- a/services/etl/implments/DB.service.impl.go +++ b/services/etl/implments/DB.service.impl.go @@ -195,6 +195,7 @@ func (impl *DBServiceImplement) UpdateOne(entity *model.DB) error { _ = session.Rollback() return err } + _ = session.Commit() return nil } diff --git a/services/etl/implments/TaskHead.service.impl.go b/services/etl/implments/TaskHead.service.impl.go index f305221..5179b86 100644 --- a/services/etl/implments/TaskHead.service.impl.go +++ b/services/etl/implments/TaskHead.service.impl.go @@ -267,7 +267,7 @@ func (impl *TaskHeadServiceImplement) UpdateOne(entity *model.TaskHead) error { return grmi.NewBusinessError("不存在指定记录!") } lstLi := entity.TaskLstLi - if len(lstLi) == 0 { + if len(lstLi) == 0 && entity.ExtractType != model.EXTRACT_TYPE_SHELL { _ = session.Rollback() return grmi.NewBusinessError("未获取到任务详细信息") } @@ -477,7 +477,7 @@ func (impl *TaskHeadServiceImplement) Update(entities *[]model.TaskHead) error { * @Reference LAPP_ETL/services/etl/TaskHeadService.SelectWithDetail * ******************************************************************************/ -func (impl *TaskHeadServiceImplement) SelectWithDetail() ([]model.TaskHead, error) { +func (impl *TaskHeadServiceImplement) SelectWithDetail(stage string) ([]model.TaskHead, error) { engine := db.AppEngine session := engine.NewSession() defer session.Close() @@ -486,7 +486,14 @@ func (impl *TaskHeadServiceImplement) SelectWithDetail() ([]model.TaskHead, erro triggerDao := dal.NewTriggerDAO(session) shellDao := dal.NewShellDAO(session) etcdDao := dal.NewETCDLockDAO(session) - taskLi, err := dao.Select([]grmi.Predicate{meta.TaskHead_Status.NewPredicate(grmi.Equal, model.TASK_STATUS_ON)}, []grmi.Field{meta.TaskHead_TaskId}) + var taskLi []model.TaskHead + var err error + if stage == model.STAGE_EXTRACT_DATA { + taskLi, err = dao.Select([]grmi.Predicate{meta.TaskHead_Status.NewPredicate(grmi.Equal, model.TASK_STATUS_ON)}, []grmi.Field{meta.TaskHead_TaskId}) + + } else { + taskLi, err = dao.Select([]grmi.Predicate{meta.TaskHead_Status.NewPredicate(grmi.Equal, model.TASK_STATUS_ON), meta.TaskHead_CtlParam3.NewPredicate(grmi.NotEqual, model.LOAD_NOT_NEED)}, []grmi.Field{meta.TaskHead_TaskId}) + } if err != nil { return nil, err } diff --git a/web/controllers/etl/etl.go b/web/controllers/etl/etl.go index d937ba5..3a122de 100644 --- a/web/controllers/etl/etl.go +++ b/web/controllers/etl/etl.go @@ -33,7 +33,7 @@ func RegisterRoutes() { // Database插入多条 // RegisterInsertDatabase(database, "/insert", serviceOfDatabase.Insert) // Database删除一条 - // RegisterDeleteOneDatabase(database, "/deleteone", serviceOfDatabase.DeleteOne) + RegisterDeleteOneDB(database, "/deleteone", serviceOfDatabase.DeleteOne) // Database删除多条 // RegisterDeleteDatabase(database, "/delete", serviceOfDatabase.Delete) // Database查询多条