广汽安道拓Acura项目MES后台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

168 lines
4.0 KiB

package utils
import (
"log"
"reflect"
"sync"
"time"
)
var EmailSendStruct = make(map[string]*emailSendStruct)
var EmailSendFoudnMap = make(map[string]interface{})
type emailSendStruct struct {
channels chan ChannelsInfo
channelCount int
successMap map[string]int //完成状态 如果map中包含消息下角标表明消息已完成发送
funcName string
successMapMutex sync.Mutex
manageCount int //需要开启消费者的数量
manageStartCount int //已经开启消费者的数量
manageMaxCount int //最大开启数量
manageAverage int //开启,关闭一条信道基数
serviceName string //使用业务层
}
func NewEmailSendStruct(maxCount int, serviceName string, manageAverage int, funcName string) *emailSendStruct {
model := &emailSendStruct{}
model.channels = make(chan ChannelsInfo)
model.channelCount = 0
model.manageCount = 1
model.manageStartCount = 1
model.manageMaxCount = maxCount
model.successMap = make(map[string]int)
model.successMapMutex = sync.Mutex{}
model.serviceName = serviceName
model.manageAverage = manageAverage
model.funcName = funcName
//开启主信道消费
go model.startSend()
//开启动态信道消费
go model.dynamic()
return model
}
/***
返回消息数量
*/
func (_this *emailSendStruct) GetChannelCount() int {
return _this.channelCount
}
/***
第一次写入信道
*/
func (_this *emailSendStruct) SeyChannels(info ChannelsInfo) {
_this.channels <- info
_this.channelCount++
if _this.channelCount > (_this.manageStartCount+1)*_this.manageAverage && _this.manageCount < _this.manageMaxCount {
//需要开启一条新通道
_this.manageCount++
}
if _this.channelCount < (_this.manageStartCount-1)*_this.manageAverage {
_this.manageCount--
}
}
/***
将数据重新放入信道
*/
func (_this *emailSendStruct) seyChannels(info ChannelsInfo) {
_this.channels <- info
}
//完成状态 如果map中包含消息下角标表明消息已完成发送
func (_this *emailSendStruct) SetSuccessMap(key string) {
_this.successMapMutex.Lock()
defer _this.successMapMutex.Unlock()
_this.successMap[key] = 1
}
//如果读取到完成信息直接销毁掉map
func (_this *emailSendStruct) getSuccessMap(key string) bool {
_this.successMapMutex.Lock()
defer _this.successMapMutex.Unlock()
_, ok := _this.successMap[key]
if ok {
delete(_this.successMap, key)
_this.channelCount--
}
return ok
}
//开始处理消息
func (_this *emailSendStruct) startSend() {
for {
info := <-_this.channels
_this.manageFunction(info)
}
}
//动态信道
func (_this *emailSendStruct) dynamic() {
for {
if _this.manageCount > _this.manageStartCount {
//需要开启一条信道处理信息
_this.manageStartCount++
go func(_i int) {
log.Println("动态队列开启:", _i)
for {
if _this.manageCount < _this.manageStartCount && _this.manageCount < _i {
log.Println("动态队列关闭", _i)
break
}
info := <-_this.channels
_this.manageFunction(info)
}
_this.manageStartCount--
}(_this.manageStartCount)
}
time.Sleep(2 * time.Second)
}
}
/**
调用处理方法
*/
func (_this *emailSendStruct) manageFunction(info ChannelsInfo) {
if _this.getSuccessMap(info.Key) {
//信息处理完成下一条消息
return
}
refUser := ReflectApi(_this.serviceName, EmailSendFoudnMap)
setNameMethod := refUser.MethodByName(_this.funcName)
args := []reflect.Value{
reflect.ValueOf(info),
} //构造一个类型为reflect.Value的切片
isErr, reflectValue := ReflectApiCall(&setNameMethod, args)
if isErr || (!isErr && reflectValue == nil) {
panic("服务注册异常")
}
var ok bool
var isRepetition bool
for key, value := range reflectValue {
switch key {
case 0:
isRepetition, ok = value.Interface().(bool)
break
case 1:
info, ok = value.Interface().(ChannelsInfo)
break
}
if !ok {
panic("服务返回参数注册异常")
}
}
if isRepetition {
//等待5S后放回信道中
_this.seyChannels(info)
}
return
}
type ChannelsInfo struct {
Key string
Info interface{}
}