高级排程
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

232 lines
5.7 KiB

3 years ago
  1. package db
  2. import (
  3. "LAPP_AS/conf"
  4. "LAPP_AS/utils"
  5. "context"
  6. "errors"
  7. "fmt"
  8. _ "github.com/denisenkom/go-mssqldb"
  9. _ "github.com/go-sql-driver/mysql"
  10. "github.com/go-xorm/xorm"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "go.mongodb.org/mongo-driver/mongo"
  13. "go.mongodb.org/mongo-driver/mongo/options"
  14. "log"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. Eloquent *xorm.EngineGroup
  20. elock sync.Mutex
  21. mgoDb *mongo.Client
  22. mlock sync.Mutex
  23. PlantNr int
  24. )
  25. //初始化数据库
  26. func InitDb() error {
  27. elock.Lock()
  28. defer elock.Unlock()
  29. //数据库信息
  30. dataDb := FindData()
  31. //从库集合
  32. var err error
  33. var master *xorm.Engine
  34. slaves := []*xorm.Engine{}
  35. for _, v := range dataDb {
  36. if v.Sourcetype == "Master" {
  37. switch v.DriverName {
  38. case "mssql":
  39. driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable",
  40. v.Host, v.DbName, v.User, v.Pwd, v.Port)
  41. master, err = xorm.NewEngine("mssql", driveSource)
  42. if err != nil {
  43. fmt.Printf("err1 is %v", err)
  44. return err
  45. }
  46. if err = master.Ping(); err != nil {
  47. return err
  48. }
  49. case "mysql":
  50. driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8",
  51. v.User, v.Pwd, v.Host, v.Port, v.DbName)
  52. master, err = xorm.NewEngine("mysql", driveSource)
  53. if err != nil {
  54. fmt.Printf("err2 is %v", err)
  55. return err
  56. }
  57. if err = master.Ping(); err != nil {
  58. return err
  59. }
  60. }
  61. master.SetTZLocation(utils.TimezoneLocation)
  62. master.SetTZDatabase(utils.TimezoneLocation)
  63. } else if v.Sourcetype == "Slave" {
  64. switch v.DriverName {
  65. case "mssql":
  66. driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable",
  67. v.Host, v.DbName, v.User, v.Pwd, v.Port)
  68. slave, err := xorm.NewEngine("mssql", driveSource)
  69. if err != nil {
  70. fmt.Printf("err1 is %v", err)
  71. return err
  72. }
  73. if err = slave.Ping(); err != nil {
  74. return err
  75. }
  76. slaves = append(slaves, slave)
  77. case "mysql":
  78. driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8",
  79. v.User, v.Pwd, v.Host, v.Port, v.DbName)
  80. slave, err := xorm.NewEngine("mysql", driveSource)
  81. if err != nil {
  82. fmt.Printf("err2 is %v", err)
  83. return err
  84. }
  85. if err = slave.Ping(); err != nil {
  86. return err
  87. }
  88. slaves = append(slaves, slave)
  89. }
  90. }
  91. }
  92. Eloquent, err = xorm.NewEngineGroup(master, slaves)
  93. if err != nil {
  94. return err
  95. }
  96. Eloquent.ShowSQL(true)
  97. return nil
  98. }
  99. //初始化数据库
  100. func InitTestDb() error {
  101. elock.Lock()
  102. defer elock.Unlock()
  103. //数据库信息
  104. dataDb := FindData()
  105. //从库集合
  106. var err error
  107. var testDB *xorm.Engine
  108. slaves := []*xorm.Engine{}
  109. for _, v := range dataDb {
  110. if v.Sourcetype == "Test" {
  111. switch v.DriverName {
  112. case "mssql":
  113. driveSource := fmt.Sprintf("server=%s;database=%s;user id=%s;password=%s;port=%d;encrypt=disable",
  114. v.Host, v.DbName, v.User, v.Pwd, v.Port)
  115. testDB, err = xorm.NewEngine("mssql", driveSource)
  116. if err != nil {
  117. fmt.Printf("err1 is %v", err)
  118. return err
  119. }
  120. if err = testDB.Ping(); err != nil {
  121. return err
  122. }
  123. case "mysql":
  124. driveSource := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8",
  125. v.User, v.Pwd, v.Host, v.Port, v.DbName)
  126. testDB, err = xorm.NewEngine("mysql", driveSource)
  127. if err != nil {
  128. fmt.Printf("err2 is %v", err)
  129. return err
  130. }
  131. if err = testDB.Ping(); err != nil {
  132. return err
  133. }
  134. }
  135. }
  136. }
  137. if testDB == nil {
  138. return errors.New("has no test db")
  139. }
  140. Eloquent, err = xorm.NewEngineGroup(testDB, slaves)
  141. if err != nil {
  142. return err
  143. }
  144. Eloquent.ShowSQL(true)
  145. return nil
  146. }
  147. //创建mongoDb数据库链接
  148. func MgoDb() *mongo.Client {
  149. //判断mgoDb是否存在
  150. if mgoDb != nil {
  151. return mgoDb
  152. }
  153. //创建之前会涉及并发,所以要加锁
  154. mlock.Lock()
  155. defer mlock.Unlock()
  156. //细节,防止多个请求锁住
  157. if mgoDb != nil {
  158. return mgoDb
  159. }
  160. //1.建立链接
  161. ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
  162. client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
  163. if err != nil {
  164. log.Fatal("dbhelper.MgoDb error= ", err)
  165. return nil
  166. } else {
  167. mgoDb = client
  168. return mgoDb
  169. }
  170. }
  171. //数据库
  172. type Database struct {
  173. ID string `bson:"id" json:"database-id"` //ID:uuid唯一字符串
  174. DbName string `bson:"dbname" json:"database-dbname"` //数据库名称
  175. DriverName string `bson:"drivername" json:"database-drivername"` //引擎类型
  176. Host string `bson:"host" json:"database-host"` //链接地址
  177. Port int `bson:"port" json:"database-port"` //端口号
  178. User string `bson:"user" json:"database-user"` //用户名
  179. Pwd string `bson:"pwd" json:"database-pwd"` //密码
  180. Sourcetype string `bson:"sourcetype" json:"database-sourcetype"` //主库和从库
  181. }
  182. func FindData() []*Database {
  183. limit := int64(500)
  184. skip := int64(0)
  185. //1.初始化链接
  186. client := MgoDb()
  187. //2.选择数据库,数据表
  188. collect := client.Database(conf.DbConfig.Mongdbname).Collection("database")
  189. //var skip int64 = 0//从那个开始
  190. //var limit int64 = 2//炼制几个输出字段
  191. cursor, err := collect.Find(context.TODO(), bson.D{
  192. }, &options.FindOptions{
  193. Skip: &skip,
  194. Limit: &limit,
  195. Sort: bson.D{{"id", -1}},
  196. })
  197. fmt.Println(cursor)
  198. if err != nil {
  199. return nil
  200. }
  201. defer cursor.Close(context.TODO())
  202. //创建需要反序列化成什么样子的结构体对象
  203. records := make([]*Database, 0)
  204. for cursor.Next(context.TODO()) {
  205. record := &Database{}
  206. //反序列化
  207. err = cursor.Decode(record)
  208. if err != nil {
  209. fmt.Println(err)
  210. return nil
  211. }
  212. //追加
  213. records = append(records, record)
  214. }
  215. return records
  216. }