Browse Source

Merge pull request '添加每天定期启动的trigger' (#14) from feature_trigger into develop

Reviewed-on: #14
feature_trigger
zhangxin 3 years ago
parent
commit
54a0be3796
4 changed files with 45 additions and 0 deletions
  1. +2
    -0
      global/global.go
  2. +1
    -0
      models/etl/etl.go
  3. +2
    -0
      scheduler/extract.go
  4. +40
    -0
      scheduler/trigger.go

+ 2
- 0
global/global.go View File

@ -12,6 +12,7 @@ var MongoDsn string
type TaskRecord struct {
TaskMap map[int]time.Time
TaskTimeMap map[int]string
Lock sync.RWMutex
}
@ -48,4 +49,5 @@ func init() {
TaskLoadStatusMap.Map = make(map[int]bool)
ETLDriver.Map = make(map[int]*xorm.Engine)
ETLTaskRecord.TaskMap = make(map[int]time.Time)
ETLTaskRecord.TaskTimeMap = make(map[int]string)
}

+ 1
- 0
models/etl/etl.go View File

@ -12,6 +12,7 @@ const (
// 触发器类型
TRIGGER_TYPE_TIME = "TIME"
TRIGGER_TYPE_INTERFACE = "INTERFACE"
TRIGGER_TYPE_SET = "SET"
// 数据状态字段的状态
STATUS_UNSEND = 0


+ 2
- 0
scheduler/extract.go View File

@ -26,6 +26,8 @@ func ETLExtract() {
for _, trigger := range task.TriggerLi {
if trigger.TriggerType == model.TRIGGER_TYPE_TIME {
go ExtractTrigger(trigger, task)
} else if trigger.TriggerType == model.TRIGGER_TYPE_SET {
go ExtractTriggerBySet(trigger, task)
}
}
}


+ 40
- 0
scheduler/trigger.go View File

@ -59,4 +59,44 @@ func LoadTrigger(task *model.TaskHead) {
}()
loadService := svr.NewLoadService()
loadService.Load(task)
}
func ExtractTriggerBySet(triggerConfig model.Trigger, task model.TaskHead) {
hour := triggerConfig.Hour
minute := triggerConfig.Minute
now := time.Now()
nowStr := now.Format("2006-01-02 15:04")
if hour != now.Hour() || minute != now.Minute() {
return
}
global.TaskExtractStatusMap.Lock.Lock()
_, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId]
if !exist {
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
}
if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] {
global.TaskExtractStatusMap.Lock.Unlock()
return
}
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true
global.TaskExtractStatusMap.Lock.Unlock()
defer func() {
global.TaskExtractStatusMap.Lock.Lock()
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
global.TaskExtractStatusMap.Lock.Unlock()
}()
if global.ETLTaskRecord.TaskTimeMap == nil {
}
lastTime, exist := global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId]
if exist {
if nowStr == lastTime {
return
}
}
extractService := svr.NewExtractService()
extractService.StandardExtract(&task)
global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] = nowStr
}

Loading…
Cancel
Save