diff --git a/services/etl/implments/Extract.service.impl.go b/services/etl/implments/Extract.service.impl.go index 5c1f74e..ead4cfa 100644 --- a/services/etl/implments/Extract.service.impl.go +++ b/services/etl/implments/Extract.service.impl.go @@ -63,12 +63,12 @@ func NewExtractServiceImplement() *ExtractServiceImplement { func (impl *ExtractServiceImplement) StandardExtract(task *model.TaskHead) { log, _ := logger.NewLogger(task.TaskId, model.STAGE_EXTRACT_DATA) etcdTaskDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr) + defer etcdTaskDao.RevokeLease() err := etcdTaskDao.RegisterTask(task.TaskId, model.STAGE_EXTRACT_DATA) if err != nil { log.Error("StandardExtract register task failed, error:"+err.Error(), "task id:"+strconv.Itoa(task.TaskId)) return } - defer etcdTaskDao.RevokeLease() // 获取数据源驱动引擎 uid, err := uuid.NewV1() if err != nil { diff --git a/services/etl/implments/Load.service.impl.go b/services/etl/implments/Load.service.impl.go index 915f2c5..a05a2fa 100644 --- a/services/etl/implments/Load.service.impl.go +++ b/services/etl/implments/Load.service.impl.go @@ -62,12 +62,12 @@ func NewLoadServiceImplement() *LoadServiceImplement { func (impl *LoadServiceImplement) Load(task *model.TaskHead) { log, _ := logger.NewLogger(task.TaskId, model.STAGE_LOAD_DATA) etcdTaskDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr) + defer etcdTaskDao.RevokeLease() err := etcdTaskDao.RegisterTask(task.TaskId, model.STAGE_LOAD_DATA) if err != nil { log.Error("Load register task failed, error:"+err.Error(), "task id:"+strconv.Itoa(task.TaskId)) return } - defer etcdTaskDao.RevokeLease() // 获取目标表和源表的驱动引擎 targetDBID := task.TargetDB global.ETLDriver.Lock.RLock()