Browse Source

Merge pull request '添加对使用etcd的配置参数' (#11) from feature_etcd into develop

Reviewed-on: http://101.201.121.115:3000/leo/LAPP_ETL/pulls/11
feature_trigger
weichenglei 3 years ago
parent
commit
14689daf48
5 changed files with 22 additions and 12 deletions
  1. +1
    -0
      conf/app_config.yaml
  2. +1
    -0
      infra/config/config.go
  3. +8
    -6
      main.go
  4. +6
    -3
      scheduler/extract.go
  5. +6
    -3
      scheduler/load.go

+ 1
- 0
conf/app_config.yaml View File

@ -26,3 +26,4 @@ app:
localaddr: 127.0.0.1 localaddr: 127.0.0.1
mod: release mod: release
name: ETL name: ETL
useetcd: false

+ 1
- 0
infra/config/config.go View File

@ -38,6 +38,7 @@ type App struct {
LocalAddr string `yaml:"localaddr"` LocalAddr string `yaml:"localaddr"`
Mod string `yaml:"mod"` Mod string `yaml:"mod"`
Name string `yaml:"name"` Name string `yaml:"name"`
UseETCD bool `yaml:"useetcd"`
} }
type ETCD struct { type ETCD struct {


+ 8
- 6
main.go View File

@ -130,19 +130,21 @@ func appMain() {
if err != nil { if err != nil {
log.Fatal("init db engine failed, error:", err) log.Fatal("init db engine failed, error:", err)
} }
common.CheckServiceDao = dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr)
defer common.CheckServiceDao.RevokeLease()
// start scheduler // start scheduler
err = scheduler.Start() err = scheduler.Start()
if err != nil { if err != nil {
log.Fatal("start scheduler failed error:", err) log.Fatal("start scheduler failed error:", err)
} }
// register app service to etcd // register app service to etcd
err = RegisterAppService()
if err != nil {
log.Fatal("register service to etcd failed, error:", err)
if config.AppConfig.UseETCD {
common.CheckServiceDao = dal.NewETCDServiceDAO(db.AppEtcdClient, config.AppConfig.App.LocalAddr)
defer common.CheckServiceDao.RevokeLease()
err = RegisterAppService()
if err != nil {
log.Fatal("register service to etcd failed, error:", err)
}
} }
// new iris application // new iris application
app := iris.New() app := iris.New()
routes.Hub(app) routes.Hub(app)


+ 6
- 3
scheduler/extract.go View File

@ -1,6 +1,7 @@
package scheduler package scheduler
import ( import (
"LAPP_ETL/infra/config"
"LAPP_ETL/infra/logger" "LAPP_ETL/infra/logger"
model "LAPP_ETL/models/etl" model "LAPP_ETL/models/etl"
svr "LAPP_ETL/services/etl" svr "LAPP_ETL/services/etl"
@ -8,9 +9,11 @@ import (
) )
func ETLExtract() { func ETLExtract() {
if !CheckService() {
fmt.Println("service is stop")
return
if config.AppConfig.UseETCD {
if !CheckService() {
fmt.Println("service is stop")
return
}
} }
log, _ := logger.NewLogger(-1, "Scheduler") log, _ := logger.NewLogger(-1, "Scheduler")
taskService := svr.NewTaskHeadService() taskService := svr.NewTaskHeadService()


+ 6
- 3
scheduler/load.go View File

@ -1,6 +1,7 @@
package scheduler package scheduler
import ( import (
"LAPP_ETL/infra/config"
"LAPP_ETL/infra/logger" "LAPP_ETL/infra/logger"
model "LAPP_ETL/models/etl" model "LAPP_ETL/models/etl"
svr "LAPP_ETL/services/etl" svr "LAPP_ETL/services/etl"
@ -8,9 +9,11 @@ import (
) )
func ETLLoad() { func ETLLoad() {
if !CheckService() {
fmt.Println("service is stop")
return
if config.AppConfig.UseETCD {
if !CheckService() {
fmt.Println("service is stop")
return
}
} }
log, _ := logger.NewLogger(-1, "Scheduler") log, _ := logger.NewLogger(-1, "Scheduler")
taskService := svr.NewTaskHeadService() taskService := svr.NewTaskHeadService()


Loading…
Cancel
Save