苏州瑞玛APS项目web后台
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

195 lines
4.9 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package utils
  2. import (
  3. "leit.com/LAPP_CHEERSSON_BACKEND/utils/mysmtp"
  4. "log"
  5. "reflect"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  10. var EmailSendStruct = make(map[string]*emailSendStruct)
  11. var EmailSendFoudnMap = make(map[string]interface{})
  12. type emailSendStruct struct {
  13. channels chan ChannelsInfo
  14. channelCount int
  15. successMap map[string]int //完成状态 如果map中包含消息下角标表明消息已完成发送
  16. funcName string
  17. successMapMutex sync.Mutex
  18. manageCount int //需要开启消费者的数量
  19. manageStartCount int //已经开启消费者的数量
  20. manageMaxCount int //最大开启数量
  21. manageAverage int //开启,关闭一条信道基数
  22. serviceName string //使用业务层
  23. }
  24. func NewEmailSendStruct(maxCount int, serviceName string, manageAverage int, funcName string) *emailSendStruct {
  25. model := &emailSendStruct{}
  26. model.channels = make(chan ChannelsInfo)
  27. model.channelCount = 0
  28. model.manageCount = 1
  29. model.manageStartCount = 1
  30. model.manageMaxCount = maxCount
  31. model.successMap = make(map[string]int)
  32. model.successMapMutex = sync.Mutex{}
  33. model.serviceName = serviceName
  34. model.manageAverage = manageAverage
  35. model.funcName = funcName
  36. //开启主信道消费
  37. go model.startSend()
  38. //开启动态信道消费
  39. go model.dynamic()
  40. return model
  41. }
  42. /***
  43. 返回消息数量
  44. */
  45. func (_this *emailSendStruct) GetChannelCount() int {
  46. return _this.channelCount
  47. }
  48. /***
  49. 第一次写入信道
  50. */
  51. func (_this *emailSendStruct) SeyChannels(info ChannelsInfo) {
  52. _this.channels <- info
  53. _this.channelCount++
  54. if _this.channelCount > (_this.manageStartCount+1)*_this.manageAverage && _this.manageCount < _this.manageMaxCount {
  55. //需要开启一条新通道
  56. _this.manageCount++
  57. }
  58. if _this.channelCount < (_this.manageStartCount-1)*_this.manageAverage {
  59. _this.manageCount--
  60. }
  61. }
  62. /***
  63. 将数据重新放入信道
  64. */
  65. func (_this *emailSendStruct) seyChannels(info ChannelsInfo) {
  66. _this.channels <- info
  67. }
  68. //完成状态 如果map中包含消息下角标表明消息已完成发送
  69. func (_this *emailSendStruct) SetSuccessMap(key string) {
  70. _this.successMapMutex.Lock()
  71. defer _this.successMapMutex.Unlock()
  72. _this.successMap[key] = 1
  73. }
  74. //如果读取到完成信息直接销毁掉map
  75. func (_this *emailSendStruct) getSuccessMap(key string) bool {
  76. _this.successMapMutex.Lock()
  77. defer _this.successMapMutex.Unlock()
  78. _, ok := _this.successMap[key]
  79. if ok {
  80. delete(_this.successMap, key)
  81. _this.channelCount--
  82. }
  83. return ok
  84. }
  85. //开始处理消息
  86. func (_this *emailSendStruct) startSend() {
  87. for {
  88. info := <-_this.channels
  89. _this.manageFunction(info)
  90. }
  91. }
  92. //动态信道
  93. func (_this *emailSendStruct) dynamic() {
  94. for {
  95. if _this.manageCount > _this.manageStartCount {
  96. //需要开启一条信道处理信息
  97. _this.manageStartCount++
  98. go func(_i int) {
  99. log.Println("动态队列开启:", _i)
  100. for {
  101. if _this.manageCount < _this.manageStartCount && _this.manageCount < _i {
  102. log.Println("动态队列关闭", _i)
  103. break
  104. }
  105. info := <-_this.channels
  106. _this.manageFunction(info)
  107. }
  108. _this.manageStartCount--
  109. }(_this.manageStartCount)
  110. }
  111. time.Sleep(2 * time.Second)
  112. }
  113. }
  114. /**
  115. 调用处理方法
  116. */
  117. func (_this *emailSendStruct) manageFunction(info ChannelsInfo) {
  118. if _this.getSuccessMap(info.Key) {
  119. //信息处理完成下一条消息
  120. return
  121. }
  122. refUser := ReflectApi(_this.serviceName, EmailSendFoudnMap)
  123. setNameMethod := refUser.MethodByName(_this.funcName)
  124. args := []reflect.Value{
  125. reflect.ValueOf(info),
  126. } //构造一个类型为reflect.Value的切片
  127. isErr, reflectValue := ReflectApiCall(&setNameMethod, args)
  128. if isErr || (!isErr && reflectValue == nil) {
  129. panic("服务注册异常")
  130. }
  131. var ok bool
  132. var isRepetition bool
  133. for key, value := range reflectValue {
  134. switch key {
  135. case 0:
  136. isRepetition, ok = value.Interface().(bool)
  137. break
  138. case 1:
  139. info, ok = value.Interface().(ChannelsInfo)
  140. break
  141. }
  142. if !ok {
  143. panic("服务返回参数注册异常")
  144. }
  145. }
  146. if isRepetition {
  147. //等待5S后放回信道中
  148. _this.seyChannels(info)
  149. }
  150. return
  151. }
  152. type ChannelsInfo struct {
  153. Key string
  154. Info interface{}
  155. }
  156. /*!
  157. username 发送者邮件
  158. password 授权码
  159. host 主机地址 smtp.qq.com:587 smtp.qq.com:25
  160. to 接收邮箱 多个接收邮箱使用 ; 隔开
  161. name 发送人名称
  162. subject 发送主题
  163. body 发送内容
  164. mailType 发送邮件内容类型
  165. */
  166. func SendMail(username, password, host, to, name, subject, body, mailType string) error {
  167. hp := strings.Split(host, ":")
  168. auth := mysmtp.LoginAuth(username, password, hp[0])
  169. var contentType string
  170. if mailType == "html" {
  171. contentType = "Content-Type: text/" + mailType + "; charset=UTF-8"
  172. } else {
  173. contentType = "Content-Type: text/plain" + "; charset=UTF-8"
  174. }
  175. msg := []byte("To: " + to + "\r\nFrom: " + name + "<" + username + ">\r\nSubject: " + subject + "\r\n" + contentType + "\r\n\r\n" + body)
  176. sendTo := strings.Split(to, ";")
  177. err := mysmtp.SendMail(host, auth, username, sendTo, msg)
  178. return err
  179. }