diff --git a/global/global.go b/global/global.go index 8405622..1cef702 100644 --- a/global/global.go +++ b/global/global.go @@ -12,6 +12,7 @@ var MongoDsn string type TaskRecord struct { TaskMap map[int]time.Time + TaskTimeMap map[int]string Lock sync.RWMutex } @@ -48,4 +49,5 @@ func init() { TaskLoadStatusMap.Map = make(map[int]bool) ETLDriver.Map = make(map[int]*xorm.Engine) ETLTaskRecord.TaskMap = make(map[int]time.Time) + ETLTaskRecord.TaskTimeMap = make(map[int]string) } diff --git a/models/etl/etl.go b/models/etl/etl.go index 5192c30..d163ad9 100644 --- a/models/etl/etl.go +++ b/models/etl/etl.go @@ -12,6 +12,7 @@ const ( // 触发器类型 TRIGGER_TYPE_TIME = "TIME" TRIGGER_TYPE_INTERFACE = "INTERFACE" + TRIGGER_TYPE_SET = "SET" // 数据状态字段的状态 STATUS_UNSEND = 0 diff --git a/scheduler/extract.go b/scheduler/extract.go index 19c8013..779e0e7 100644 --- a/scheduler/extract.go +++ b/scheduler/extract.go @@ -26,6 +26,8 @@ func ETLExtract() { for _, trigger := range task.TriggerLi { if trigger.TriggerType == model.TRIGGER_TYPE_TIME { go ExtractTrigger(trigger, task) + } else if trigger.TriggerType == model.TRIGGER_TYPE_SET { + go ExtractTriggerBySet(trigger, task) } } } diff --git a/scheduler/trigger.go b/scheduler/trigger.go index 4756948..b8c357a 100644 --- a/scheduler/trigger.go +++ b/scheduler/trigger.go @@ -59,4 +59,44 @@ func LoadTrigger(task *model.TaskHead) { }() loadService := svr.NewLoadService() loadService.Load(task) +} + + +func ExtractTriggerBySet(triggerConfig model.Trigger, task model.TaskHead) { + hour := triggerConfig.Hour + minute := triggerConfig.Minute + now := time.Now() + nowStr := now.Format("2006-01-02 15:04") + if hour != now.Hour() || minute != now.Minute() { + return + } + global.TaskExtractStatusMap.Lock.Lock() + _, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId] + if !exist { + global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false + } + if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] { + global.TaskExtractStatusMap.Lock.Unlock() + return + } + global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true + global.TaskExtractStatusMap.Lock.Unlock() + defer func() { + global.TaskExtractStatusMap.Lock.Lock() + global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false + global.TaskExtractStatusMap.Lock.Unlock() + }() + if global.ETLTaskRecord.TaskTimeMap == nil { + + } + lastTime, exist := global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] + if exist { + if nowStr == lastTime { + + return + } + } + extractService := svr.NewExtractService() + extractService.StandardExtract(&task) + global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] = nowStr } \ No newline at end of file