|
|
- package utils
-
- import (
- "log"
- "reflect"
- "sync"
- "time"
- )
-
- var EmailSendStruct = make(map[string]*emailSendStruct)
- var EmailSendFoudnMap = make(map[string]interface{})
-
- type emailSendStruct struct {
- channels chan ChannelsInfo
- channelCount int
- successMap map[string]int //完成状态 如果map中包含消息下角标表明消息已完成发送
- funcName string
- successMapMutex sync.Mutex
- manageCount int //需要开启消费者的数量
- manageStartCount int //已经开启消费者的数量
- manageMaxCount int //最大开启数量
- manageAverage int //开启,关闭一条信道基数
- serviceName string //使用业务层
- }
-
- func NewEmailSendStruct(maxCount int, serviceName string, manageAverage int, funcName string) *emailSendStruct {
- model := &emailSendStruct{}
- model.channels = make(chan ChannelsInfo)
- model.channelCount = 0
- model.manageCount = 1
- model.manageStartCount = 1
- model.manageMaxCount = maxCount
- model.successMap = make(map[string]int)
- model.successMapMutex = sync.Mutex{}
- model.serviceName = serviceName
- model.manageAverage = manageAverage
- model.funcName = funcName
- //开启主信道消费
- go model.startSend()
- //开启动态信道消费
- go model.dynamic()
- return model
- }
-
- /***
- 返回消息数量
- */
- func (_this *emailSendStruct) GetChannelCount() int {
- return _this.channelCount
- }
-
- /***
- 第一次写入信道
- */
- func (_this *emailSendStruct) SeyChannels(info ChannelsInfo) {
- _this.channels <- info
- _this.channelCount++
- if _this.channelCount > (_this.manageStartCount+1)*_this.manageAverage && _this.manageCount < _this.manageMaxCount {
- //需要开启一条新通道
- _this.manageCount++
- }
- if _this.channelCount < (_this.manageStartCount-1)*_this.manageAverage {
- _this.manageCount--
- }
- }
-
- /***
- 将数据重新放入信道
- */
- func (_this *emailSendStruct) seyChannels(info ChannelsInfo) {
- _this.channels <- info
- }
-
- //完成状态 如果map中包含消息下角标表明消息已完成发送
- func (_this *emailSendStruct) SetSuccessMap(key string) {
- _this.successMapMutex.Lock()
- defer _this.successMapMutex.Unlock()
- _this.successMap[key] = 1
- }
-
- //如果读取到完成信息直接销毁掉map
- func (_this *emailSendStruct) getSuccessMap(key string) bool {
- _this.successMapMutex.Lock()
- defer _this.successMapMutex.Unlock()
- _, ok := _this.successMap[key]
- if ok {
- delete(_this.successMap, key)
- _this.channelCount--
- }
- return ok
- }
-
- //开始处理消息
- func (_this *emailSendStruct) startSend() {
- for {
- info := <-_this.channels
- _this.manageFunction(info)
- }
-
- }
-
- //动态信道
- func (_this *emailSendStruct) dynamic() {
- for {
- if _this.manageCount > _this.manageStartCount {
- //需要开启一条信道处理信息
- _this.manageStartCount++
- go func(_i int) {
- log.Println("动态队列开启:", _i)
- for {
- if _this.manageCount < _this.manageStartCount && _this.manageCount < _i {
- log.Println("动态队列关闭", _i)
- break
- }
- info := <-_this.channels
- _this.manageFunction(info)
-
- }
- _this.manageStartCount--
- }(_this.manageStartCount)
- }
- time.Sleep(2 * time.Second)
- }
- }
-
- /**
- 调用处理方法
- */
- func (_this *emailSendStruct) manageFunction(info ChannelsInfo) {
- if _this.getSuccessMap(info.Key) {
- //信息处理完成下一条消息
- return
- }
- refUser := ReflectApi(_this.serviceName, EmailSendFoudnMap)
- setNameMethod := refUser.MethodByName(_this.funcName)
- args := []reflect.Value{
- reflect.ValueOf(info),
- } //构造一个类型为reflect.Value的切片
- isErr, reflectValue := ReflectApiCall(&setNameMethod, args)
- if isErr || (!isErr && reflectValue == nil) {
- panic("服务注册异常")
- }
- var ok bool
- var isRepetition bool
- for key, value := range reflectValue {
- switch key {
- case 0:
- isRepetition, ok = value.Interface().(bool)
- break
- case 1:
- info, ok = value.Interface().(ChannelsInfo)
- break
- }
- if !ok {
- panic("服务返回参数注册异常")
- }
- }
- if isRepetition {
- //等待5S后放回信道中
- _this.seyChannels(info)
- }
- return
- }
-
// ChannelsInfo is one message travelling through the send queue.
type ChannelsInfo struct {
	// Key identifies the message; it is the key looked up in the queue's
	// success map to decide whether the message has already been sent.
	Key string

	// Info is the payload. The queue passes it through untouched.
	Info interface{}
}
|