From e44bae268f2a7287b0542d98a028b21296017d0e Mon Sep 17 00:00:00 2001 From: zhangxin Date: Wed, 18 Aug 2021 14:15:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=AF=8F=E5=A4=A9=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E5=90=AF=E5=8A=A8=E7=9A=84=E8=A7=A6=E5=8F=91=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- global/global.go | 2 ++ models/etl/etl.go | 1 + scheduler/extract.go | 2 ++ scheduler/trigger.go | 40 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 45 insertions(+) diff --git a/global/global.go b/global/global.go index 8405622..1cef702 100644 --- a/global/global.go +++ b/global/global.go @@ -12,6 +12,7 @@ var MongoDsn string type TaskRecord struct { TaskMap map[int]time.Time + TaskTimeMap map[int]string Lock sync.RWMutex } @@ -48,4 +49,5 @@ func init() { TaskLoadStatusMap.Map = make(map[int]bool) ETLDriver.Map = make(map[int]*xorm.Engine) ETLTaskRecord.TaskMap = make(map[int]time.Time) + ETLTaskRecord.TaskTimeMap = make(map[int]string) } diff --git a/models/etl/etl.go b/models/etl/etl.go index 5192c30..d163ad9 100644 --- a/models/etl/etl.go +++ b/models/etl/etl.go @@ -12,6 +12,7 @@ const ( // 触发器类型 TRIGGER_TYPE_TIME = "TIME" TRIGGER_TYPE_INTERFACE = "INTERFACE" + TRIGGER_TYPE_SET = "SET" // 数据状态字段的状态 STATUS_UNSEND = 0 diff --git a/scheduler/extract.go b/scheduler/extract.go index 19c8013..779e0e7 100644 --- a/scheduler/extract.go +++ b/scheduler/extract.go @@ -26,6 +26,8 @@ func ETLExtract() { for _, trigger := range task.TriggerLi { if trigger.TriggerType == model.TRIGGER_TYPE_TIME { go ExtractTrigger(trigger, task) + } else if trigger.TriggerType == model.TRIGGER_TYPE_SET { + go ExtractTriggerBySet(trigger, task) } } } diff --git a/scheduler/trigger.go b/scheduler/trigger.go index 4756948..b8c357a 100644 --- a/scheduler/trigger.go +++ b/scheduler/trigger.go @@ -59,4 +59,44 @@ func LoadTrigger(task *model.TaskHead) { }() loadService := svr.NewLoadService() loadService.Load(task) +} + + +func ExtractTriggerBySet(triggerConfig model.Trigger, task model.TaskHead) { + hour := triggerConfig.Hour + minute := triggerConfig.Minute + now := time.Now() + nowStr := now.Format("2006-01-02 15:04") + if hour != now.Hour() || minute != now.Minute() { + return + } + global.TaskExtractStatusMap.Lock.Lock() + _, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId] + if !exist { + global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false + } + if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] { + global.TaskExtractStatusMap.Lock.Unlock() + return + } + global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true + global.TaskExtractStatusMap.Lock.Unlock() + defer func() { + global.TaskExtractStatusMap.Lock.Lock() + global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false + global.TaskExtractStatusMap.Lock.Unlock() + }() + if global.ETLTaskRecord.TaskTimeMap == nil { + + } + lastTime, exist := global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] + if exist { + if nowStr == lastTime { + + return + } + } + extractService := svr.NewExtractService() + extractService.StandardExtract(&task) + global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] = nowStr } \ No newline at end of file -- 2.30.1.windows.1