Browse Source

添加提交代码和关闭session的代码

pull/2/head
zhangxin 3 years ago
parent
commit
e0b9c808b9
4 changed files with 21 additions and 10 deletions
  1. +9
    -6
      main.go
  2. +1
    -0
      services/etl/implments/DB.service.impl.go
  3. +10
    -3
      services/etl/implments/TaskHead.service.impl.go
  4. +1
    -1
      web/controllers/etl/etl.go

+ 9
- 6
main.go View File

@ -9,7 +9,9 @@ import (
"LAPP_ETL/infra/driver"
"LAPP_ETL/infra/logger"
meta "LAPP_ETL/meta/etl"
"LAPP_ETL/scheduler"
"LAPP_ETL/utils"
"LAPP_ETL/web/middleware"
"LAPP_ETL/web/routes"
"fmt"
"github.com/kardianos/service"
@ -116,11 +118,11 @@ func appMain() {
log.Fatal("init db engine failed, error:", err)
}
//start scheduler
//err = scheduler.Start()
//if err != nil {
// log.Fatal("start scheduler failed error:", err)
//}
// start scheduler
err = scheduler.Start()
if err != nil {
log.Fatal("start scheduler failed error:", err)
}
// register app service to etcd
err = RegisterAppService()
if err != nil {
@ -131,7 +133,7 @@ func appMain() {
app := iris.New()
routes.Hub(app)
app.HandleDir("/public", "./web/public")
//middleware.PreSettring(app)
middleware.PreSettring(app)
// init iris config
irisConfig := iris.DefaultConfiguration()
@ -152,6 +154,7 @@ func appMain() {
func InitDBEngine() error {
session := db.AppEngine.NewSession()
defer session.Close()
dao := dal.NewDBDAO(session)
dbLi, err := dao.Select(nil, []grmi.Field{meta.DB_ID})
if err != nil {


+ 1
- 0
services/etl/implments/DB.service.impl.go View File

@ -195,6 +195,7 @@ func (impl *DBServiceImplement) UpdateOne(entity *model.DB) error {
_ = session.Rollback()
return err
}
_ = session.Commit()
return nil
}


+ 10
- 3
services/etl/implments/TaskHead.service.impl.go View File

@ -267,7 +267,7 @@ func (impl *TaskHeadServiceImplement) UpdateOne(entity *model.TaskHead) error {
return grmi.NewBusinessError("不存在指定记录!")
}
lstLi := entity.TaskLstLi
if len(lstLi) == 0 {
if len(lstLi) == 0 && entity.ExtractType != model.EXTRACT_TYPE_SHELL {
_ = session.Rollback()
return grmi.NewBusinessError("未获取到任务详细信息")
}
@ -477,7 +477,7 @@ func (impl *TaskHeadServiceImplement) Update(entities *[]model.TaskHead) error {
* @Reference LAPP_ETL/services/etl/TaskHeadService.SelectWithDetail
*
******************************************************************************/
func (impl *TaskHeadServiceImplement) SelectWithDetail() ([]model.TaskHead, error) {
func (impl *TaskHeadServiceImplement) SelectWithDetail(stage string) ([]model.TaskHead, error) {
engine := db.AppEngine
session := engine.NewSession()
defer session.Close()
@ -486,7 +486,14 @@ func (impl *TaskHeadServiceImplement) SelectWithDetail() ([]model.TaskHead, erro
triggerDao := dal.NewTriggerDAO(session)
shellDao := dal.NewShellDAO(session)
etcdDao := dal.NewETCDLockDAO(session)
taskLi, err := dao.Select([]grmi.Predicate{meta.TaskHead_Status.NewPredicate(grmi.Equal, model.TASK_STATUS_ON)}, []grmi.Field{meta.TaskHead_TaskId})
var taskLi []model.TaskHead
var err error
if stage == model.STAGE_EXTRACT_DATA {
taskLi, err = dao.Select([]grmi.Predicate{meta.TaskHead_Status.NewPredicate(grmi.Equal, model.TASK_STATUS_ON)}, []grmi.Field{meta.TaskHead_TaskId})
} else {
taskLi, err = dao.Select([]grmi.Predicate{meta.TaskHead_Status.NewPredicate(grmi.Equal, model.TASK_STATUS_ON), meta.TaskHead_CtlParam3.NewPredicate(grmi.NotEqual, model.LOAD_NOT_NEED)}, []grmi.Field{meta.TaskHead_TaskId})
}
if err != nil {
return nil, err
}


+ 1
- 1
web/controllers/etl/etl.go View File

@ -33,7 +33,7 @@ func RegisterRoutes() {
// Database插入多条
// RegisterInsertDatabase(database, "/insert", serviceOfDatabase.Insert)
// Database删除一条
// RegisterDeleteOneDatabase(database, "/deleteone", serviceOfDatabase.DeleteOne)
RegisterDeleteOneDB(database, "/deleteone", serviceOfDatabase.DeleteOne)
// Database删除多条
// RegisterDeleteDatabase(database, "/delete", serviceOfDatabase.Delete)
// Database查询多条


Loading…
Cancel
Save