package db import ( "context" "github.com/go-xorm/xorm" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" "sync" ) import "fmt" var ( Eloquent *xorm.EngineGroup elock sync.Mutex ) //初始化数据库 func InitEtlDb() error { elock.Lock() defer elock.Unlock() //数据库信息 dataDb := findData() //从库集合 var err error var master *xorm.Engine slaves := []*xorm.Engine{} for _, v := range dataDb { if v.Sourcetype == "Master" { switch v.DriverName { case "mssql": driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable", v.Host, v.DbName, v.User, v.Pwd, v.Port) master, err = xorm.NewEngine("mssql", driveSource) if err != nil { fmt.Printf("err1 is %v", err) return err } case "mysql": driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", v.User, v.Pwd, v.Host, v.Port, v.DbName) master, err = xorm.NewEngine("mysql", driveSource) if err != nil { fmt.Printf("err2 is %v", err) return err } } } else { switch v.DriverName { case "mssql": driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable", v.Host, v.DbName, v.User, v.Pwd, v.Port) slave, err := xorm.NewEngine("mssql", driveSource) if err != nil { fmt.Printf("err1 is %v", err) return err } slaves = append(slaves, slave) case "mysql": driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", v.User, v.Pwd, v.Host, v.Port, v.DbName) slave, err := xorm.NewEngine("mysql", driveSource) if err != nil { fmt.Printf("err2 is %v", err) return err } slaves = append(slaves, slave) } } } Eloquent, err = xorm.NewEngineGroup(master, slaves) if err != nil { return err } //Eloquent.ShowSQL(true) return nil } func findData() []Database { limit := int64(500) skip := int64(0) //1.初始化链接 client := MgoDb() //2.选择数据库,数据表 collect := client.Database("logDb").Collection("database") //var skip int64 = 0//从那个开始 //var limit int64 = 2//炼制几个输出字段 cursor, err := collect.Find(context.TODO(), bson.D{ }, &options.FindOptions{ Skip: &skip, Limit: &limit, Sort: bson.D{{"id", 1}}, }) if err != nil { return nil } defer cursor.Close(context.TODO()) //创建需要反序列化成什么样子的结构体对象 records := make([]Database, 0) for cursor.Next(context.TODO()) { var record Database //反序列化 err = cursor.Decode(&record) if err != nil { fmt.Println(err) return nil } //追加 records = append(records, record) } return records }