Browse Source

添加通过接口执行数据抽取任务

pull/1/head
zhangxin 3 years ago
parent
commit
80f000a4eb
1 changed files with 42 additions and 5 deletions
  1. +42
    -5
      services/etl/implments/TaskHead.service.impl.go

+ 42
- 5
services/etl/implments/TaskHead.service.impl.go View File

@ -10,7 +10,6 @@ import (
"LAPP_ETL/infra/db" "LAPP_ETL/infra/db"
meta "LAPP_ETL/meta/etl" meta "LAPP_ETL/meta/etl"
model "LAPP_ETL/models/etl" model "LAPP_ETL/models/etl"
svr "LAPP_ETL/services/etl"
"LAPP_ETL/utils" "LAPP_ETL/utils"
"fmt" "fmt"
"strconv" "strconv"
@ -373,6 +372,7 @@ func (impl *TaskHeadServiceImplement) Select(urlParameters map[string]string) (i
} }
dao := dal.NewTaskHeadDAO(session) dao := dal.NewTaskHeadDAO(session)
etcdDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr) etcdDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr)
defer etcdDao.RevokeLease()
keys, err := etcdDao.GetTask() keys, err := etcdDao.GetTask()
if err != nil { if err != nil {
return nil, err return nil, err
@ -643,6 +643,9 @@ func (impl *TaskHeadServiceImplement) ExecuteTask(taskId int) error {
defer session.Close() defer session.Close()
taskDao := dal.NewTaskHeadDAO(session) taskDao := dal.NewTaskHeadDAO(session)
triggerDao := dal.NewTriggerDAO(session) triggerDao := dal.NewTriggerDAO(session)
lstDao := dal.NewTaskLstDAO(session)
shellDao := dal.NewShellDAO(session)
etcdDao := dal.NewETCDLockDAO(session)
task, err := taskDao.SelectOne(taskId) task, err := taskDao.SelectOne(taskId)
if err != nil { if err != nil {
return err return err
@ -650,10 +653,26 @@ func (impl *TaskHeadServiceImplement) ExecuteTask(taskId int) error {
if task == nil { if task == nil {
return grmi.NewBusinessError("不存在对应的任务") return grmi.NewBusinessError("不存在对应的任务")
} }
triggerLi, err := triggerDao.Select([]grmi.Predicate{meta.Trigger_TaskId.NewPredicate(grmi.Equal, taskId)}, nil)
lstLi, err := lstDao.Select([]grmi.Predicate{meta.TaskLst_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.TaskLst_TaskId})
if err != nil {
return err
}
task.TaskLstLi = lstLi
shellLi, err := shellDao.Select([]grmi.Predicate{meta.Shell_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.Shell_TaskId})
if err != nil {
return err
}
task.ShellLi = shellLi
lockLi, err := etcdDao.Select([]grmi.Predicate{meta.ETCDLock_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.ETCDLock_TaskId})
if err != nil { if err != nil {
return err return err
} }
task.ETCDLockLi = lockLi
triggerLi, err := triggerDao.Select([]grmi.Predicate{meta.Trigger_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.Trigger_TaskId})
if err != nil {
return err
}
task.TriggerLi = triggerLi
if len(triggerLi) == 0 { if len(triggerLi) == 0 {
return grmi.NewBusinessError("不存在对应的触发器") return grmi.NewBusinessError("不存在对应的触发器")
} }
@ -667,7 +686,25 @@ func (impl *TaskHeadServiceImplement) ExecuteTask(taskId int) error {
if !exist { if !exist {
return grmi.NewBusinessError("不支持该种触发器") return grmi.NewBusinessError("不支持该种触发器")
} }
extractService := svr.NewExtractService()
go extractService.StandardExtract(task)
global.TaskExtractStatusMap.Lock.Lock()
_, exist = global.TaskExtractStatusMap.Map[taskId]
if !exist {
global.TaskExtractStatusMap.Map[taskId] = false
}
if global.TaskExtractStatusMap.Map[taskId] {
global.TaskExtractStatusMap.Lock.Unlock()
return grmi.NewBusinessError("任务正在进行")
}
global.TaskExtractStatusMap.Map[taskId] = true
global.TaskExtractStatusMap.Lock.Unlock()
go func() {
extractService := NewExtractServiceImplement()
extractService.StandardExtract(task)
global.TaskExtractStatusMap.Lock.Lock()
global.TaskExtractStatusMap.Map[taskId] = false
global.TaskExtractStatusMap.Lock.Unlock()
}()
return nil return nil
}
}

Loading…
Cancel
Save