广汽安道拓Acura项目MES后台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
4.0 KiB

  1. package utils
  2. import (
  3. "log"
  4. "reflect"
  5. "sync"
  6. "time"
  7. )
  8. var EmailSendStruct = make(map[string]*emailSendStruct)
  9. var EmailSendFoudnMap = make(map[string]interface{})
  10. type emailSendStruct struct {
  11. channels chan ChannelsInfo
  12. channelCount int
  13. successMap map[string]int //完成状态 如果map中包含消息下角标表明消息已完成发送
  14. funcName string
  15. successMapMutex sync.Mutex
  16. manageCount int //需要开启消费者的数量
  17. manageStartCount int //已经开启消费者的数量
  18. manageMaxCount int //最大开启数量
  19. manageAverage int //开启,关闭一条信道基数
  20. serviceName string //使用业务层
  21. }
  22. func NewEmailSendStruct(maxCount int, serviceName string, manageAverage int, funcName string) *emailSendStruct {
  23. model := &emailSendStruct{}
  24. model.channels = make(chan ChannelsInfo)
  25. model.channelCount = 0
  26. model.manageCount = 1
  27. model.manageStartCount = 1
  28. model.manageMaxCount = maxCount
  29. model.successMap = make(map[string]int)
  30. model.successMapMutex = sync.Mutex{}
  31. model.serviceName = serviceName
  32. model.manageAverage = manageAverage
  33. model.funcName = funcName
  34. //开启主信道消费
  35. go model.startSend()
  36. //开启动态信道消费
  37. go model.dynamic()
  38. return model
  39. }
  40. /***
  41. 返回消息数量
  42. */
  43. func (_this *emailSendStruct) GetChannelCount() int {
  44. return _this.channelCount
  45. }
  46. /***
  47. 第一次写入信道
  48. */
  49. func (_this *emailSendStruct) SeyChannels(info ChannelsInfo) {
  50. _this.channels <- info
  51. _this.channelCount++
  52. if _this.channelCount > (_this.manageStartCount+1)*_this.manageAverage && _this.manageCount < _this.manageMaxCount {
  53. //需要开启一条新通道
  54. _this.manageCount++
  55. }
  56. if _this.channelCount < (_this.manageStartCount-1)*_this.manageAverage {
  57. _this.manageCount--
  58. }
  59. }
  60. /***
  61. 将数据重新放入信道
  62. */
  63. func (_this *emailSendStruct) seyChannels(info ChannelsInfo) {
  64. _this.channels <- info
  65. }
  66. //完成状态 如果map中包含消息下角标表明消息已完成发送
  67. func (_this *emailSendStruct) SetSuccessMap(key string) {
  68. _this.successMapMutex.Lock()
  69. defer _this.successMapMutex.Unlock()
  70. _this.successMap[key] = 1
  71. }
  72. //如果读取到完成信息直接销毁掉map
  73. func (_this *emailSendStruct) getSuccessMap(key string) bool {
  74. _this.successMapMutex.Lock()
  75. defer _this.successMapMutex.Unlock()
  76. _, ok := _this.successMap[key]
  77. if ok {
  78. delete(_this.successMap, key)
  79. _this.channelCount--
  80. }
  81. return ok
  82. }
  83. //开始处理消息
  84. func (_this *emailSendStruct) startSend() {
  85. for {
  86. info := <-_this.channels
  87. _this.manageFunction(info)
  88. }
  89. }
  90. //动态信道
  91. func (_this *emailSendStruct) dynamic() {
  92. for {
  93. if _this.manageCount > _this.manageStartCount {
  94. //需要开启一条信道处理信息
  95. _this.manageStartCount++
  96. go func(_i int) {
  97. log.Println("动态队列开启:", _i)
  98. for {
  99. if _this.manageCount < _this.manageStartCount && _this.manageCount < _i {
  100. log.Println("动态队列关闭", _i)
  101. break
  102. }
  103. info := <-_this.channels
  104. _this.manageFunction(info)
  105. }
  106. _this.manageStartCount--
  107. }(_this.manageStartCount)
  108. }
  109. time.Sleep(2 * time.Second)
  110. }
  111. }
  112. /**
  113. 调用处理方法
  114. */
  115. func (_this *emailSendStruct) manageFunction(info ChannelsInfo) {
  116. if _this.getSuccessMap(info.Key) {
  117. //信息处理完成下一条消息
  118. return
  119. }
  120. refUser := ReflectApi(_this.serviceName, EmailSendFoudnMap)
  121. setNameMethod := refUser.MethodByName(_this.funcName)
  122. args := []reflect.Value{
  123. reflect.ValueOf(info),
  124. } //构造一个类型为reflect.Value的切片
  125. isErr, reflectValue := ReflectApiCall(&setNameMethod, args)
  126. if isErr || (!isErr && reflectValue == nil) {
  127. panic("服务注册异常")
  128. }
  129. var ok bool
  130. var isRepetition bool
  131. for key, value := range reflectValue {
  132. switch key {
  133. case 0:
  134. isRepetition, ok = value.Interface().(bool)
  135. break
  136. case 1:
  137. info, ok = value.Interface().(ChannelsInfo)
  138. break
  139. }
  140. if !ok {
  141. panic("服务返回参数注册异常")
  142. }
  143. }
  144. if isRepetition {
  145. //等待5S后放回信道中
  146. _this.seyChannels(info)
  147. }
  148. return
  149. }
  150. type ChannelsInfo struct {
  151. Key string
  152. Info interface{}
  153. }