Browse Source

调整释放etcd连接的代码执行位置

pull/1/head
zhangxin 3 years ago
parent
commit
8d24acaf47
2 changed files with 2 additions and 2 deletions
  1. +1
    -1
      services/etl/implments/Extract.service.impl.go
  2. +1
    -1
      services/etl/implments/Load.service.impl.go

+ 1
- 1
services/etl/implments/Extract.service.impl.go View File

@ -63,12 +63,12 @@ func NewExtractServiceImplement() *ExtractServiceImplement {
func (impl *ExtractServiceImplement) StandardExtract(task *model.TaskHead) {
log, _ := logger.NewLogger(task.TaskId, model.STAGE_EXTRACT_DATA)
etcdTaskDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr)
defer etcdTaskDao.RevokeLease()
err := etcdTaskDao.RegisterTask(task.TaskId, model.STAGE_EXTRACT_DATA)
if err != nil {
log.Error("StandardExtract register task failed, error:"+err.Error(), "task id:"+strconv.Itoa(task.TaskId))
return
}
defer etcdTaskDao.RevokeLease()
// 获取数据源驱动引擎
uid, err := uuid.NewV1()
if err != nil {


+ 1
- 1
services/etl/implments/Load.service.impl.go View File

@ -62,12 +62,12 @@ func NewLoadServiceImplement() *LoadServiceImplement {
func (impl *LoadServiceImplement) Load(task *model.TaskHead) {
log, _ := logger.NewLogger(task.TaskId, model.STAGE_LOAD_DATA)
etcdTaskDao := dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr)
defer etcdTaskDao.RevokeLease()
err := etcdTaskDao.RegisterTask(task.TaskId, model.STAGE_LOAD_DATA)
if err != nil {
log.Error("Load register task failed, error:"+err.Error(), "task id:"+strconv.Itoa(task.TaskId))
return
}
defer etcdTaskDao.RevokeLease()
// 获取目标表和源表的驱动引擎
targetDBID := task.TargetDB
global.ETLDriver.Lock.RLock()


Loading…
Cancel
Save