From 80f000a4eb3ed321899a2fe7dc35289fc6bc46a1 Mon Sep 17 00:00:00 2001 From: zhangxin Date: Thu, 15 Jul 2021 10:33:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=80=9A=E8=BF=87=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=89=A7=E8=A1=8C=E6=95=B0=E6=8D=AE=E6=8A=BD=E5=8F=96?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../etl/implments/TaskHead.service.impl.go | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/services/etl/implments/TaskHead.service.impl.go b/services/etl/implments/TaskHead.service.impl.go index 0494ac4..6fb603d 100644 --- a/services/etl/implments/TaskHead.service.impl.go +++ b/services/etl/implments/TaskHead.service.impl.go @@ -10,7 +10,6 @@ import ( "LAPP_ETL/infra/db" meta "LAPP_ETL/meta/etl" model "LAPP_ETL/models/etl" - svr "LAPP_ETL/services/etl" "LAPP_ETL/utils" "fmt" "strconv" @@ -373,6 +372,7 @@ func (impl *TaskHeadServiceImplement) Select(urlParameters map[string]string) (i } dao := dal.NewTaskHeadDAO(session) etcdDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr) + defer etcdDao.RevokeLease() keys, err := etcdDao.GetTask() if err != nil { return nil, err @@ -643,6 +643,9 @@ func (impl *TaskHeadServiceImplement) ExecuteTask(taskId int) error { defer session.Close() taskDao := dal.NewTaskHeadDAO(session) triggerDao := dal.NewTriggerDAO(session) + lstDao := dal.NewTaskLstDAO(session) + shellDao := dal.NewShellDAO(session) + etcdDao := dal.NewETCDLockDAO(session) task, err := taskDao.SelectOne(taskId) if err != nil { return err @@ -650,10 +653,26 @@ func (impl *TaskHeadServiceImplement) ExecuteTask(taskId int) error { if task == nil { return grmi.NewBusinessError("不存在对应的任务") } - triggerLi, err := triggerDao.Select([]grmi.Predicate{meta.Trigger_TaskId.NewPredicate(grmi.Equal, taskId)}, nil) + lstLi, err := lstDao.Select([]grmi.Predicate{meta.TaskLst_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.TaskLst_TaskId}) + if err != nil { + return err + } + task.TaskLstLi = lstLi + shellLi, err := shellDao.Select([]grmi.Predicate{meta.Shell_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.Shell_TaskId}) + if err != nil { + return err + } + task.ShellLi = shellLi + lockLi, err := etcdDao.Select([]grmi.Predicate{meta.ETCDLock_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.ETCDLock_TaskId}) if err != nil { return err } + task.ETCDLockLi = lockLi + triggerLi, err := triggerDao.Select([]grmi.Predicate{meta.Trigger_TaskId.NewPredicate(grmi.Equal, task.TaskId)}, []grmi.Field{meta.Trigger_TaskId}) + if err != nil { + return err + } + task.TriggerLi = triggerLi if len(triggerLi) == 0 { return grmi.NewBusinessError("不存在对应的触发器") } @@ -667,7 +686,25 @@ func (impl *TaskHeadServiceImplement) ExecuteTask(taskId int) error { if !exist { return grmi.NewBusinessError("不支持该种触发器") } - extractService := svr.NewExtractService() - go extractService.StandardExtract(task) + global.TaskExtractStatusMap.Lock.Lock() + _, exist = global.TaskExtractStatusMap.Map[taskId] + if !exist { + global.TaskExtractStatusMap.Map[taskId] = false + } + if global.TaskExtractStatusMap.Map[taskId] { + global.TaskExtractStatusMap.Lock.Unlock() + return grmi.NewBusinessError("任务正在进行") + } + global.TaskExtractStatusMap.Map[taskId] = true + global.TaskExtractStatusMap.Lock.Unlock() + + go func() { + extractService := NewExtractServiceImplement() + extractService.StandardExtract(task) + global.TaskExtractStatusMap.Lock.Lock() + global.TaskExtractStatusMap.Map[taskId] = false + global.TaskExtractStatusMap.Lock.Unlock() + }() + return nil -} \ No newline at end of file +}