package utils
|
|
|
|
import (
|
|
"log"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var EmailSendStruct = make(map[string]*emailSendStruct)
|
|
var EmailSendFoudnMap = make(map[string]interface{})
|
|
|
|
type emailSendStruct struct {
|
|
channels chan ChannelsInfo
|
|
channelCount int
|
|
successMap map[string]int //完成状态 如果map中包含消息下角标表明消息已完成发送
|
|
funcName string
|
|
successMapMutex sync.Mutex
|
|
manageCount int //需要开启消费者的数量
|
|
manageStartCount int //已经开启消费者的数量
|
|
manageMaxCount int //最大开启数量
|
|
manageAverage int //开启,关闭一条信道基数
|
|
serviceName string //使用业务层
|
|
}
|
|
|
|
func NewEmailSendStruct(maxCount int, serviceName string, manageAverage int, funcName string) *emailSendStruct {
|
|
model := &emailSendStruct{}
|
|
model.channels = make(chan ChannelsInfo)
|
|
model.channelCount = 0
|
|
model.manageCount = 1
|
|
model.manageStartCount = 1
|
|
model.manageMaxCount = maxCount
|
|
model.successMap = make(map[string]int)
|
|
model.successMapMutex = sync.Mutex{}
|
|
model.serviceName = serviceName
|
|
model.manageAverage = manageAverage
|
|
model.funcName = funcName
|
|
//开启主信道消费
|
|
go model.startSend()
|
|
//开启动态信道消费
|
|
go model.dynamic()
|
|
return model
|
|
}
|
|
|
|
/***
|
|
返回消息数量
|
|
*/
|
|
func (_this *emailSendStruct) GetChannelCount() int {
|
|
return _this.channelCount
|
|
}
|
|
|
|
/***
|
|
第一次写入信道
|
|
*/
|
|
func (_this *emailSendStruct) SeyChannels(info ChannelsInfo) {
|
|
_this.channels <- info
|
|
_this.channelCount++
|
|
if _this.channelCount > (_this.manageStartCount+1)*_this.manageAverage && _this.manageCount < _this.manageMaxCount {
|
|
//需要开启一条新通道
|
|
_this.manageCount++
|
|
}
|
|
if _this.channelCount < (_this.manageStartCount-1)*_this.manageAverage {
|
|
_this.manageCount--
|
|
}
|
|
}
|
|
|
|
/***
|
|
将数据重新放入信道
|
|
*/
|
|
func (_this *emailSendStruct) seyChannels(info ChannelsInfo) {
|
|
_this.channels <- info
|
|
}
|
|
|
|
//完成状态 如果map中包含消息下角标表明消息已完成发送
|
|
func (_this *emailSendStruct) SetSuccessMap(key string) {
|
|
_this.successMapMutex.Lock()
|
|
defer _this.successMapMutex.Unlock()
|
|
_this.successMap[key] = 1
|
|
}
|
|
|
|
//如果读取到完成信息直接销毁掉map
|
|
func (_this *emailSendStruct) getSuccessMap(key string) bool {
|
|
_this.successMapMutex.Lock()
|
|
defer _this.successMapMutex.Unlock()
|
|
_, ok := _this.successMap[key]
|
|
if ok {
|
|
delete(_this.successMap, key)
|
|
_this.channelCount--
|
|
}
|
|
return ok
|
|
}
|
|
|
|
//开始处理消息
|
|
func (_this *emailSendStruct) startSend() {
|
|
for {
|
|
info := <-_this.channels
|
|
_this.manageFunction(info)
|
|
}
|
|
|
|
}
|
|
|
|
//动态信道
|
|
func (_this *emailSendStruct) dynamic() {
|
|
for {
|
|
if _this.manageCount > _this.manageStartCount {
|
|
//需要开启一条信道处理信息
|
|
_this.manageStartCount++
|
|
go func(_i int) {
|
|
log.Println("动态队列开启:", _i)
|
|
for {
|
|
if _this.manageCount < _this.manageStartCount && _this.manageCount < _i {
|
|
log.Println("动态队列关闭", _i)
|
|
break
|
|
}
|
|
info := <-_this.channels
|
|
_this.manageFunction(info)
|
|
|
|
}
|
|
_this.manageStartCount--
|
|
}(_this.manageStartCount)
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
}
|
|
|
|
/**
|
|
调用处理方法
|
|
*/
|
|
func (_this *emailSendStruct) manageFunction(info ChannelsInfo) {
|
|
if _this.getSuccessMap(info.Key) {
|
|
//信息处理完成下一条消息
|
|
return
|
|
}
|
|
refUser := ReflectApi(_this.serviceName, EmailSendFoudnMap)
|
|
setNameMethod := refUser.MethodByName(_this.funcName)
|
|
args := []reflect.Value{
|
|
reflect.ValueOf(info),
|
|
} //构造一个类型为reflect.Value的切片
|
|
isErr, reflectValue := ReflectApiCall(&setNameMethod, args)
|
|
if isErr || (!isErr && reflectValue == nil) {
|
|
panic("服务注册异常")
|
|
}
|
|
var ok bool
|
|
var isRepetition bool
|
|
for key, value := range reflectValue {
|
|
switch key {
|
|
case 0:
|
|
isRepetition, ok = value.Interface().(bool)
|
|
break
|
|
case 1:
|
|
info, ok = value.Interface().(ChannelsInfo)
|
|
break
|
|
}
|
|
if !ok {
|
|
panic("服务返回参数注册异常")
|
|
}
|
|
}
|
|
if isRepetition {
|
|
//等待5S后放回信道中
|
|
_this.seyChannels(info)
|
|
}
|
|
return
|
|
}
|
|
|
|
// ChannelsInfo is one message travelling through the send queue.
type ChannelsInfo struct {
	// Key identifies the message; it is the key used with SetSuccessMap to
	// mark the message as finished.
	Key string

	// Info is the payload. This file passes it through untouched to the
	// business-layer method.
	Info interface{}
}
|