package db
|
|
|
|
import (
|
|
"context"
|
|
"github.com/go-xorm/xorm"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"sync"
|
|
)
|
|
|
|
import "fmt"
|
|
|
|
var (
|
|
Eloquent *xorm.EngineGroup
|
|
elock sync.Mutex
|
|
)
|
|
|
|
//初始化数据库
|
|
func InitEtlDb() error {
|
|
elock.Lock()
|
|
defer elock.Unlock()
|
|
|
|
//数据库信息
|
|
dataDb := findData()
|
|
//从库集合
|
|
var err error
|
|
var master *xorm.Engine
|
|
slaves := []*xorm.Engine{}
|
|
for _, v := range dataDb {
|
|
if v.Sourcetype == "Master" {
|
|
switch v.DriverName {
|
|
case "mssql":
|
|
driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable",
|
|
v.Host, v.DbName, v.User, v.Pwd, v.Port)
|
|
|
|
master, err = xorm.NewEngine("mssql", driveSource)
|
|
if err != nil {
|
|
fmt.Printf("err1 is %v", err)
|
|
return err
|
|
}
|
|
case "mysql":
|
|
driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8",
|
|
v.User, v.Pwd, v.Host, v.Port, v.DbName)
|
|
master, err = xorm.NewEngine("mysql", driveSource)
|
|
if err != nil {
|
|
fmt.Printf("err2 is %v", err)
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
switch v.DriverName {
|
|
case "mssql":
|
|
driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable",
|
|
v.Host, v.DbName, v.User, v.Pwd, v.Port)
|
|
|
|
slave, err := xorm.NewEngine("mssql", driveSource)
|
|
if err != nil {
|
|
fmt.Printf("err1 is %v", err)
|
|
return err
|
|
}
|
|
slaves = append(slaves, slave)
|
|
case "mysql":
|
|
driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8",
|
|
v.User, v.Pwd, v.Host, v.Port, v.DbName)
|
|
slave, err := xorm.NewEngine("mysql", driveSource)
|
|
if err != nil {
|
|
fmt.Printf("err2 is %v", err)
|
|
return err
|
|
}
|
|
slaves = append(slaves, slave)
|
|
}
|
|
}
|
|
}
|
|
Eloquent, err = xorm.NewEngineGroup(master, slaves)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
//Eloquent.ShowSQL(true)
|
|
return nil
|
|
}
|
|
|
|
func findData() []Database {
|
|
limit := int64(500)
|
|
skip := int64(0)
|
|
//1.初始化链接
|
|
client := MgoDb()
|
|
//2.选择数据库,数据表
|
|
collect := client.Database("logDb").Collection("database")
|
|
|
|
//var skip int64 = 0//从那个开始
|
|
//var limit int64 = 2//炼制几个输出字段
|
|
cursor, err := collect.Find(context.TODO(), bson.D{
|
|
}, &options.FindOptions{
|
|
Skip: &skip,
|
|
Limit: &limit,
|
|
Sort: bson.D{{"id", 1}},
|
|
})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer cursor.Close(context.TODO())
|
|
//创建需要反序列化成什么样子的结构体对象
|
|
records := make([]Database, 0)
|
|
for cursor.Next(context.TODO()) {
|
|
var record Database
|
|
//反序列化
|
|
err = cursor.Decode(&record)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
return nil
|
|
}
|
|
//追加
|
|
records = append(records, record)
|
|
}
|
|
return records
|
|
}
|