LAPP标准接口程序
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

102 lines
2.9 KiB

package scheduler
import (
"LAPP_ETL/global"
model "LAPP_ETL/models/etl"
svr "LAPP_ETL/services/etl"
"time"
)
func ExtractTrigger(triggerConfig model.Trigger, task model.TaskHead) {
global.TaskExtractStatusMap.Lock.Lock()
_, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId]
if !exist {
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
}
if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] {
global.TaskExtractStatusMap.Lock.Unlock()
return
}
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true
global.TaskExtractStatusMap.Lock.Unlock()
defer func() {
global.TaskExtractStatusMap.Lock.Lock()
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
global.TaskExtractStatusMap.Lock.Unlock()
}()
lastTime, exist := global.ETLTaskRecord.TaskMap[triggerConfig.TaskId]
if exist {
hour := triggerConfig.Hour
minute := triggerConfig.Minute
second := triggerConfig.Second
seconds := hour*3600 + minute*60 + second
now := time.Now()
if now.Sub(lastTime).Seconds() < float64(seconds) {
return
}
}
extractService := svr.NewExtractService()
extractService.StandardExtract(&task)
global.ETLTaskRecord.TaskMap[triggerConfig.TaskId] = time.Now()
}
func LoadTrigger(task *model.TaskHead) {
global.TaskLoadStatusMap.Lock.Lock()
_, exist := global.TaskLoadStatusMap.Map[task.TaskId]
if !exist {
global.TaskLoadStatusMap.Map[task.TaskId] = false
}
if global.TaskLoadStatusMap.Map[task.TaskId] {
global.TaskLoadStatusMap.Lock.Unlock()
return
}
global.TaskLoadStatusMap.Map[task.TaskId] = true
global.TaskLoadStatusMap.Lock.Unlock()
defer func() {
global.TaskExtractStatusMap.Lock.Lock()
global.TaskLoadStatusMap.Map[task.TaskId] = false
global.TaskExtractStatusMap.Lock.Unlock()
}()
loadService := svr.NewLoadService()
loadService.Load(task)
}
func ExtractTriggerBySet(triggerConfig model.Trigger, task model.TaskHead) {
hour := triggerConfig.Hour
minute := triggerConfig.Minute
now := time.Now()
nowStr := now.Format("2006-01-02 15:04")
if hour != now.Hour() || minute != now.Minute() {
return
}
global.TaskExtractStatusMap.Lock.Lock()
_, exist := global.TaskExtractStatusMap.Map[triggerConfig.TaskId]
if !exist {
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
}
if global.TaskExtractStatusMap.Map[triggerConfig.TaskId] {
global.TaskExtractStatusMap.Lock.Unlock()
return
}
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = true
global.TaskExtractStatusMap.Lock.Unlock()
defer func() {
global.TaskExtractStatusMap.Lock.Lock()
global.TaskExtractStatusMap.Map[triggerConfig.TaskId] = false
global.TaskExtractStatusMap.Lock.Unlock()
}()
if global.ETLTaskRecord.TaskTimeMap == nil {
}
lastTime, exist := global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId]
if exist {
if nowStr == lastTime {
return
}
}
extractService := svr.NewExtractService()
extractService.StandardExtract(&task)
global.ETLTaskRecord.TaskTimeMap[triggerConfig.TaskId] = nowStr
}