|
package scheduler
|
|
|
|
import (
|
|
"LAPP_ETL/global"
|
|
model "LAPP_ETL/models/etl"
|
|
svr "LAPP_ETL/services/etl"
|
|
"time"
|
|
)
|
|
|
|
func ExtractTrigger(triggerConfig model.Trigger, task model.TaskHead) {
|
|
global.TaskExtractStatusMap.Lock.Lock()
|
|
_, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId]
|
|
if !exist {
|
|
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
|
|
}
|
|
if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] {
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
return
|
|
}
|
|
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
defer func() {
|
|
global.TaskExtractStatusMap.Lock.Lock()
|
|
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
}()
|
|
lastTime, exist := global.ETLTaskRecord.TaskMap[triggerConfig.TaskId]
|
|
if exist {
|
|
hour := triggerConfig.Hour
|
|
minute := triggerConfig.Minute
|
|
second := triggerConfig.Second
|
|
seconds := hour*3600 + minute*60 + second
|
|
now := time.Now()
|
|
if now.Sub(lastTime).Seconds() < float64(seconds) {
|
|
return
|
|
}
|
|
}
|
|
extractService := svr.NewExtractService()
|
|
extractService.StandardExtract(&task)
|
|
global.ETLTaskRecord.TaskMap[triggerConfig.TaskId] = time.Now()
|
|
}
|
|
|
|
func LoadTrigger(task *model.TaskHead) {
|
|
global.TaskLoadStatusMap.Lock.Lock()
|
|
_, exist := global.TaskLoadStatusMap.Map[task.TaskId]
|
|
if !exist {
|
|
global.TaskLoadStatusMap.Map[task.TaskId] = false
|
|
}
|
|
if global.TaskLoadStatusMap.Map[task.TaskId] {
|
|
global.TaskLoadStatusMap.Lock.Unlock()
|
|
return
|
|
}
|
|
global.TaskLoadStatusMap.Map[task.TaskId] = true
|
|
global.TaskLoadStatusMap.Lock.Unlock()
|
|
defer func() {
|
|
global.TaskExtractStatusMap.Lock.Lock()
|
|
global.TaskLoadStatusMap.Map[task.TaskId] = false
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
}()
|
|
loadService := svr.NewLoadService()
|
|
loadService.Load(task)
|
|
}
|
|
|
|
|
|
func ExtractTriggerBySet(triggerConfig model.Trigger, task model.TaskHead) {
|
|
hour := triggerConfig.Hour
|
|
minute := triggerConfig.Minute
|
|
now := time.Now()
|
|
nowStr := now.Format("2006-01-02 15:04")
|
|
if hour != now.Hour() || minute != now.Minute() {
|
|
return
|
|
}
|
|
global.TaskExtractStatusMap.Lock.Lock()
|
|
_, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId]
|
|
if !exist {
|
|
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
|
|
}
|
|
if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] {
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
return
|
|
}
|
|
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
defer func() {
|
|
global.TaskExtractStatusMap.Lock.Lock()
|
|
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
|
|
global.TaskExtractStatusMap.Lock.Unlock()
|
|
}()
|
|
if global.ETLTaskRecord.TaskTimeMap == nil {
|
|
|
|
}
|
|
lastTime, exist := global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId]
|
|
if exist {
|
|
if nowStr == lastTime {
|
|
|
|
return
|
|
}
|
|
}
|
|
extractService := svr.NewExtractService()
|
|
extractService.StandardExtract(&task)
|
|
global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] = nowStr
|
|
}
|