package utils

import (
	"errors"
	"log"
	"reflect"
	"strings"
	"sync"
	"time"

	"leit.com/LAPP_CHEERSSON_BACKEND/utils/mysmtp"
)

// EmailSendStruct is a package-level registry of send queues keyed by name.
// Nothing in this file populates it.
var EmailSendStruct = make(map[string]*emailSendStruct)

// EmailSendFoudnMap is handed to ReflectApi together with a queue's
// serviceName when a message is processed. Nothing in this file populates it.
var EmailSendFoudnMap = make(map[string]interface{})

// emailSendStruct is a message queue built on one unbuffered channel. One
// permanent consumer is started by NewEmailSendStruct; additional consumers
// are started and retired by dynamic based on the pending-message count.
type emailSendStruct struct {
	channels        chan ChannelsInfo
	channelCount    int            // messages accepted by SeyChannels and not yet confirmed through successMap
	successMap      map[string]int // completion marks: a key present here means that message is finished
	funcName        string         // method name looked up via MethodByName for every message
	successMapMutex sync.Mutex     // guards successMap

	manageCount      int    // number of consumers that should be running
	manageStartCount int    // number of consumers currently started
	manageMaxCount   int    // upper bound for manageCount
	manageAverage    int    // pending messages per consumer used as the scale up/down step
	serviceName      string // name passed to ReflectApi to resolve the handler

	// stateMutex guards channelCount, manageCount and manageStartCount, which
	// are touched by producers, consumers and the scaling goroutine.
	stateMutex sync.Mutex
}

// NewEmailSendStruct builds a queue and starts its background goroutines: the
// permanent consumer (startSend) and the scaler (dynamic).
//
// maxCount caps the number of consumers, serviceName and funcName select the
// handler invoked for each message, and manageAverage is the pending-message
// step that triggers starting or retiring one consumer.
func NewEmailSendStruct(maxCount int, serviceName string, manageAverage int, funcName string) *emailSendStruct {
	model := &emailSendStruct{
		channels:         make(chan ChannelsInfo),
		manageCount:      1,
		manageStartCount: 1,
		manageMaxCount:   maxCount,
		successMap:       make(map[string]int),
		serviceName:      serviceName,
		manageAverage:    manageAverage,
		funcName:         funcName,
	}
	// Permanent consumer.
	go model.startSend()
	// Scaler that starts extra consumers on demand.
	go model.dynamic()
	return model
}

// GetChannelCount returns the current pending-message count.
func (e *emailSendStruct) GetChannelCount() int {
	e.stateMutex.Lock()
	defer e.stateMutex.Unlock()
	return e.channelCount
}

// SeyChannels enqueues a message for the first time. It blocks until a
// consumer receives the message, then updates the pending count and the
// desired number of consumers.
func (e *emailSendStruct) SeyChannels(info ChannelsInfo) {
	e.channels <- info

	e.stateMutex.Lock()
	defer e.stateMutex.Unlock()
	e.channelCount++
	if e.channelCount > (e.manageStartCount+1)*e.manageAverage && e.manageCount < e.manageMaxCount {
		// Backlog is large enough to justify one more consumer.
		e.manageCount++
	}
	// Never ask for fewer than the one permanent consumer; without this floor
	// the target could go negative and delay later scale-ups.
	if e.channelCount < (e.manageStartCount-1)*e.manageAverage && e.manageCount > 1 {
		e.manageCount--
	}
}

// seyChannels puts a message back on the channel without touching the
// counters. It blocks until a consumer receives the message.
func (e *emailSendStruct) seyChannels(info ChannelsInfo) {
	e.channels <- info
}

// SetSuccessMap marks the message identified by key as finished.
func (e *emailSendStruct) SetSuccessMap(key string) {
	e.successMapMutex.Lock()
	defer e.successMapMutex.Unlock()
	e.successMap[key] = 1
}

// getSuccessMap reports whether key has been marked finished. When it has, the
// mark is removed and the pending count is decremented.
func (e *emailSendStruct) getSuccessMap(key string) bool {
	e.successMapMutex.Lock()
	_, ok := e.successMap[key]
	if ok {
		delete(e.successMap, key)
	}
	e.successMapMutex.Unlock()

	if ok {
		// Counter lives under stateMutex; taken after releasing
		// successMapMutex so the two locks are never nested.
		e.stateMutex.Lock()
		e.channelCount--
		e.stateMutex.Unlock()
	}
	return ok
}

// startSend is the permanent consumer loop; it never returns.
func (e *emailSendStruct) startSend() {
	for info := range e.channels {
		e.manageFunction(info)
	}
}

// dynamic is the scaler loop. Every two seconds it starts at most one extra
// consumer when more consumers are wanted than are running. It never returns.
func (e *emailSendStruct) dynamic() {
	for {
		e.stateMutex.Lock()
		workerID := 0
		if e.manageCount > e.manageStartCount {
			e.manageStartCount++
			workerID = e.manageStartCount
		}
		e.stateMutex.Unlock()

		// IDs start at 2 (the permanent consumer is 1), so 0 means "none".
		if workerID != 0 {
			go e.runDynamicWorker(workerID)
		}
		time.Sleep(2 * time.Second)
	}
}

// runDynamicWorker consumes messages until retireIfSurplus tells it to stop.
// The retirement check only happens between messages, so an idle worker
// blocked on receive stays alive until it has handled one more message.
func (e *emailSendStruct) runDynamicWorker(id int) {
	log.Println("动态队列开启:", id)
	for {
		if e.retireIfSurplus(id) {
			log.Println("动态队列关闭", id)
			return
		}
		info := <-e.channels
		e.manageFunction(info)
	}
}

// retireIfSurplus atomically decides whether worker id should stop and, if so,
// releases its slot in manageStartCount.
func (e *emailSendStruct) retireIfSurplus(id int) bool {
	e.stateMutex.Lock()
	defer e.stateMutex.Unlock()
	if e.manageCount < e.manageStartCount && e.manageCount < id {
		e.manageStartCount--
		return true
	}
	return false
}

// manageFunction processes one message. Messages already marked finished are
// dropped. Otherwise the handler method funcName is resolved through
// ReflectApi and called with info. Its first result must be a bool (requeue
// flag) and its second, if present, a ChannelsInfo that replaces info.
//
// It panics when the handler call fails or a result has an unexpected type.
func (e *emailSendStruct) manageFunction(info ChannelsInfo) {
	if e.getSuccessMap(info.Key) {
		// Already completed; move on to the next message.
		return
	}

	refUser := ReflectApi(e.serviceName, EmailSendFoudnMap)
	setNameMethod := refUser.MethodByName(e.funcName)
	args := []reflect.Value{reflect.ValueOf(info)}
	isErr, reflectValue := ReflectApiCall(&setNameMethod, args)
	if isErr || reflectValue == nil {
		panic("服务注册异常")
	}

	var ok bool
	var isRepetition bool
	for key, value := range reflectValue {
		switch key {
		case 0:
			isRepetition, ok = value.Interface().(bool)
		case 1:
			info, ok = value.Interface().(ChannelsInfo)
		}
		if !ok {
			panic("服务返回参数注册异常")
		}
	}

	if isRepetition {
		// Put the message back after 5s. This must not happen on the consumer
		// goroutine itself: the channel is unbuffered, so a lone consumer
		// sending to it would wait forever for a reader.
		time.AfterFunc(5*time.Second, func() {
			e.seyChannels(info)
		})
	}
}

// ChannelsInfo is one queued message: Key identifies it in the completion map
// and Info carries the payload handed to the handler.
type ChannelsInfo struct {
	Key  string
	Info interface{}
}

// SendMail sends a single message through mysmtp.
//
//	username  sender mailbox, also used as the envelope sender
//	password  authorization code for the mailbox
//	host      server address with port, e.g. smtp.qq.com:587 or smtp.qq.com:25
//	to        recipient addresses, separated by ';'
//	name      sender display name
//	subject   message subject
//	body      message content
//	mailType  "html" for an HTML body; anything else is sent as text/plain
//
// It returns an error when to contains no address or when mysmtp.SendMail
// fails.
func SendMail(username, password, host, to, name, subject, body, mailType string) error {
	// Trim each entry and drop blanks so "a@x; b@y;" does not yield an empty
	// or space-prefixed envelope recipient.
	var recipients []string
	for _, addr := range strings.Split(to, ";") {
		if addr = strings.TrimSpace(addr); addr != "" {
			recipients = append(recipients, addr)
		}
	}
	if len(recipients) == 0 {
		return errors.New("send mail: no recipient address given")
	}

	// The auth helper receives the host name without the port.
	hp := strings.Split(host, ":")
	auth := mysmtp.LoginAuth(username, password, hp[0])

	subtype := "plain"
	if mailType == "html" {
		subtype = mailType
	}
	contentType := "Content-Type: text/" + subtype + "; charset=UTF-8"

	// Header address lists are comma-separated, unlike the ';' accepted in to.
	msg := []byte("To: " + strings.Join(recipients, ", ") +
		"\r\nFrom: " + name + "<" + username + ">" +
		"\r\nSubject: " + subject +
		"\r\n" + contentType +
		"\r\n\r\n" + body)

	return mysmtp.SendMail(host, auth, username, recipients, msg)
}