SJA工艺
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

582 lines
16 KiB

package task
import (
"LAPP_SJA_ME/utils"
"LAPP_SJA_ME/web/middleware/glog"
"LAPP_SJA_ME/web/models"
"context"
"encoding/json"
uuid "github.com/iris-contrib/go.uuid"
"strings"
"time"
)
func CreateCacheTask() {
//第一步:查询所有得服务
me := new(models.Etltab)
me.Finr = 100
data, err := me.SelectAll()
if err != nil {
return
}
//第二步:给每个服务开启一个协程,
for _, v := range data {
taskChan <- v
}
//删除三天前的缓存数据
go DelCacheData()
//导入接口表
go ToLeadDataBase()
//创建继承Baxkground的子节点Context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
CreateCache(ctx)
}
var taskChan = make(chan models.Etltab, 100) //定义一个调度任务通道
//导入缓存数据库
func CreateCache(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case task, ok := <-taskChan:
if !ok {
glog.InfoExtln("调度生成终止")
return //停机退出
}
//第三步:基于协程,开启多个死循环,(为了负载均衡);并发执行
go dealTask(task)
}
}
}
//任务处理
func dealTask(task models.Etltab) {
for {
//第一步:每次查询出数据
data, err := SearchData(task)
//错误或者数据为空,跳出循环
if err != nil || (len(data) == 0) {
time.Sleep(10 * time.Second)
continue
}
//查询出主键,删除数据的时候备用
//TablePk, err := SearchTablePk(task.Fromtable)
//第二步:导入mongodb
for _, v := range data {
value, err := json.Marshal(v)
if err != nil {
glog.InfoExtln("buffer 数据导入", "err", err)
uuid, err := uuid.NewV1()
if err != nil {
continue
}
buffer := new(models.Buffer)
buffer.Totable = task.Totable
buffer.Eid = task.Eid
buffer.Finr = task.Finr
buffer.Status = "error"
buffer.Orderid = uuid.String()
buffer.Flag = 0
buffer.Funcspec = task.Funcspec
buffer.Data = "json转义错误"
buffer.Todb = task.Todb
buffer.Dbtype = task.Todbtype
buffer.Todrivername = task.Todrivername
buffer.Message = error.Error(err)
buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
buffer.InsertRecord()
continue
}
uuid, err := uuid.NewV1()
if err != nil {
continue
}
buffer := new(models.Buffer)
buffer.Totable = task.Totable
buffer.Eid = task.Eid
buffer.Finr = task.Finr
buffer.Orderid = uuid.String()
buffer.Status = "ok"
buffer.Flag = 0
buffer.Funcspec = task.Funcspec
buffer.Data = string(value)
buffer.Todb = task.Todb
buffer.Dbtype = task.Todbtype
buffer.Todrivername = task.Todrivername
buffer.Message = ""
buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
buffer.InsertRecord()
}
time.Sleep(10 * time.Second) //等待一秒,再执行下一循环
}
}
/******************************************************************************
*
* @Function Name :
*-----------------------------------------------------------------------------
*
* @Description : 逻辑:查询信息,分为多表查询和单表查询 ChooseType = 1 为多表
*
* @Function Parameters: task models.Etltab
*
* @Return Value :
*
* @Author : Lou Wenzhi
*
* @Date : 2021/3/10 14:53
*
******************************************************************************/
func SearchData(task models.Etltab) ([]map[string]interface{}, error) {
//选择对应的数据库
e := models.SearchDb(task.Fromdb, task.Fromdrivername, task.Fromdbtype)
if task.ChooseType == "1" {
//以sql原生查询数据
sql := task.Sqlshow
switch task.SearchType {
case 1:
if task.Todrivername == "mssql" {
data := make([]map[string]interface{}, 0)
sql = sql + " order by " + task.SearchTime
err := e.SQL(sql).Find(&data)
if err != nil {
return nil, err
}
/*****判断数据长度,为零返回******/
if len(data) == 0 {
return nil, nil
}
//更新,锁定状态
msgids := make([]string, 0)
for _, v := range data {
msgid := utils.ValueToString(v[task.SearchFiled], "")
msgids = append(msgids, msgid)
}
_, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
if err != nil {
glog.InfoExtln("ETL导出错误", "err", err)
return nil, err
}
return data, nil
} else if task.Todrivername == "mysql" {
data := make([]map[string]interface{}, 0)
sql = sql + " order by " + task.SearchTime
err := e.SQL(sql).Find(&data)
if err != nil {
return nil, err
}
/*****判断数据长度,为零返回******/
if len(data) == 0 {
return nil, nil
}
//更新,锁定状态
msgids := make([]string, 0)
for _, v := range data {
msgid := utils.ValueToString(v[task.SearchFiled], "")
msgids = append(msgids, msgid)
}
_, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
if err != nil {
glog.InfoExtln("ETL导出错误", "err", err)
return nil, err
}
return data, nil
} else {
//暂无
}
}
} else {
switch task.SearchType {
case 1: //flag查询
data := make([]map[string]interface{}, 0)
err := e.Table(task.Fromtable).Where("retstartstatus != 1 ").Limit(1000).Desc(task.SearchTime).Find(&data)
if err != nil {
glog.InfoExtln("ETL导出错误", "err", err)
return nil, err
}
if len(data) == 0 {
return nil, nil
}
//更新,锁定状态
msgids := make([]string, 0)
for _, v := range data {
msgid := utils.ValueToString(v[task.SearchFiled], "")
msgids = append(msgids, msgid)
}
_, err = e.Table(task.Fromtable).Cols("retstartstatus").In("msgid", msgids).Update(&map[string]interface{}{"retstartstatus": 1})
if err != nil {
glog.InfoExtln("ETL导出错误", "err", err)
return nil, err
}
return data, nil
default:
}
}
return nil, nil
}
//查询字段属性
func SearchTable(dbname string, drivername string, tablename string, dbtype string) ([]models.EtlTabData, error) {
datalist := make([]models.EtlTabData, 0)
e := models.SearchDb(dbname, drivername, dbtype)
//获取当前导入表的主键
err := e.SQL("SELECT COLUMN_NAME as tabmapcol,DATA_TYPE as coltype FROM INFORMATION_SCHEMA.columns WHERE TABLE_NAME=?", tablename).Find(&datalist)
if err != nil {
return nil, err
}
return datalist, nil
}
type TablePk struct {
TabName string
Colname string
}
//查询字段属性
func SearchTablePk(dbname string, drivername string, tablename string, dbtype string) ([]models.TablePk, error) {
datalist := make([]models.TablePk, 0)
e := models.SearchDb(dbname, drivername, dbtype)
//获取当前导入表的主键
err := e.SQL("SELECT TABLE_NAME as tabname,COLUMN_NAME as colname FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=?", tablename).Find(&datalist)
if err != nil {
return nil, err
}
return datalist, nil
}
//导入从库
func ToLeadDataBase() {
for {
//第一步:开启一个死循环,每次读取flag = 0,500条数据
me := new(models.Buffer)
data := me.FindData()
if len(data) == 0 {
time.Sleep(1 * time.Second)
continue
}
glog.InfoExtln("UpdatePlnWorkorder", "len(data):", len(data))
//第二步:导入从库,
for _, v := range data {
//判断执行次数
v.UpdateDataTimes()
//判断是否是特殊执行方法
if !utils.ValueIsEmpty(v.Funcspec) {
//解析数据
tems := make(map[string]interface{})
//json解析
err := json.Unmarshal([]byte(v.Data), &tems)
if err != nil {
glog.InfoExtln("etl导入从库err", "Unmarshal err is :", err)
continue
}
switch v.Funcspec {
case "UpdatePlnWorkorder":
finr := v.Finr
workordernr := utils.ValueToString(tems["ordernr"], "")
glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
status := utils.ValueToInt(tems["status"], 0)
tem := utils.ValueToString(tems["optime"], "")
temtime, err := time.Parse("2006-01-02T15:04:05Z07:00", tem)
if err != nil {
glog.InfoExtln("UpdatePlnWorkorder", "temtime err is :", err)
continue
}
optime := temtime.Format("20060102150405")
glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
err = UpdatePlnWorkorder(finr, workordernr, status, optime)
if err != nil {
glog.InfoExtln("UpdatePlnWorkorder", "UpdatePlnWorkorder err is :", err)
continue
}
//更新mongdb数据flag
//err = v.UpdateData()
//if err != nil {
// glog.Info("更新flag错误日志:%v", err)
// continue
//}
err = v.DeleteoneRecord()
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
case "UpdatePlnBatchorder":
finr := v.Finr
batchordernr := utils.ValueToString(tems["ordernr"], "")
status := utils.ValueToInt(tems["status"], 0)
actqty := utils.ValueToInt(tems["actqty"], 0)
tem := utils.ValueToString(tems["optime"], "")
temtime, _ := time.Parse("2006-01-02T15:04:05Z07:00", tem)
optime := temtime.Format("20060102150405")
err = UpdatePlnBatchorder(finr, batchordernr, status, actqty, optime)
if err != nil {
glog.InfoExtln("UpdatePlnBatchorder", "UpdatePlnBatchorder err is :", err)
continue
}
//更新mongdb数据flag
//err = v.UpdateData()
//if err != nil {
// glog.Info("更新flag错误日志:%v", err)
// continue
//}
err = v.DeleteoneRecord()
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
}
} else {
//1.查询导入规则
etl := new(models.Etltablst)
etl.Finr = utils.ValueToInt(v.Finr, 0)
etl.Eid = utils.ValueToInt(v.Eid, 0)
etltabs, err := etl.SelectLst()
if err != nil {
glog.InfoExtln("etl导入从库", "err1 is :", err)
continue
}
etlMap := make(map[string]models.Etltablst)
for _, etlv := range etltabs {
key := etlv.Field
etlMap[key] = etlv
}
//查询字段属性
datalist, err := SearchTable(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
if err != nil {
glog.InfoExtln("etl导入从库", "err2 is :", err)
continue
}
tablefiled := make(map[string]string)
for _, vv := range datalist {
key := vv.Tabmapcol
tablefiled[key] = vv.Coltype
}
//解析数据
tems := make(map[string]interface{})
//表名
tablename := v.Totable
//拼接
keystr := ""
valstr := ""
wherestr := ""
where := ""
//json解析
err = json.Unmarshal([]byte(v.Data), &tems)
if err != nil {
glog.InfoExtln("etl导入从库", "Unmarshal err is :", err)
continue
}
//3.对解析出来的数据进行处理
tem := make(map[string]interface{})
for kk, vv := range tems {
key := kk
val, ok := etlMap[key]
if ok {
mapKey := val.ToField
mapVal := vv
if val.Funcspec == "TimeParse" {
vvstr := utils.ValueToString(vv, "")
Valtime, err := utils.TimeParse(strings.TrimSpace(vvstr))
mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
if err != nil {
continue
}
}
if val.Funcspec == "DateParse" {
vvstr := utils.ValueToString(vv, "")
Valtime, err := utils.TimeParseyyyyMMdd(strings.TrimSpace(vvstr))
mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
if err != nil {
continue
}
}
if val.Funcspec == "ValueToInt" {
mapVal = utils.ValueToInt(vv, 0)
}
if !utils.ValueIsEmpty(val.Defaultvalue) {
tem[mapKey] = val.Defaultvalue
} else {
tem[mapKey] = mapVal
}
}
}
//获取当前导入表的主键
TablePk, err := SearchTablePk(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
//查询主键
temPk := make(map[string]interface{})
for _, v := range TablePk {
key := v.Colname
temPk[key] = v
value, ok := tablefiled[key]
if ok {
if value == "int" || value == "smallint" || value == "bigint" {
intval := utils.ValueToInt(tem[key], 0)
where += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
} else if value == "decimal" {
floatval := utils.ValueToFloat(tem[key], 0.0)
where += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
} else {
val := utils.ValueToString(tem[key], "")
where += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
}
} else {
continue
}
}
//去掉最后边的逗号
where = strings.Trim(where, " AND ")
e := models.SearchDb(v.Todb, v.Todrivername, v.Dbtype)
//查询数据
dataone := make(map[string]interface{})
ok, err := e.Table(tablename).Where(where).Get(&dataone)
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
if !ok {
//记录字段,插入
for key, val := range tablefiled {
keystr += key + ","
value, ok := tem[key]
if ok {
if val == "int" || val == "smallint" || value == "bigint" {
intval := utils.ValueToInt(value, 0)
valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
} else if val == "decimal" {
floatval := utils.ValueToFloat(value, 0.0)
valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
} else if val == "datetime" {
val := utils.ValueToString(value, "")
valstr += "'" + strings.TrimSpace(val) + "'" + ","
} else {
val := utils.ValueToString(value, "")
valstr += "'" + strings.TrimSpace(val) + "'" + ","
}
} else {
val := utils.ValueToString(value, "")
valstr += "'" + strings.TrimSpace(val) + "'" + ","
}
}
//去掉最后边的逗号
keystr = strings.Trim(keystr, ",")
valstr = strings.Trim(valstr, ",")
sql := "INSERT INTO " + tablename + " (" + keystr + ") values (" + valstr + ")"
glog.InfoExtln("etl导入打印", "sql is :", sql)
if v.Status != "ok" {
continue
}
to := new(models.Etltab)
to.Todb = v.Todb
to.Todrivername = v.Todrivername
err = to.ToLeadSlave(sql)
if err != nil {
glog.InfoExtln("etl导入从库", "UpdatePlnBatchorder err is :", err)
continue
}
//更新mongdb数据flag
//err = v.UpdateData()
//if err != nil {
// glog.Info("更新flag错误日志:%v", err)
//}
err = v.DeleteoneRecord()
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
} else {
//更新内容
//第二步构建sql语句
for key, val := range tablefiled {
value, ok := tem[key]
_, res := temPk[key]
if ok {
keystr += key + ","
if val == "int" || val == "smallint" || value == "bigint" {
intval := utils.ValueToInt(value, 0)
valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
if res {
wherestr += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
}
} else if val == "decimal" {
floatval := utils.ValueToFloat(value, 0.0)
valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
if res {
wherestr += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
}
} else {
val := utils.ValueToString(value, "")
valstr += "'" + strings.TrimSpace(val) + "'" + ","
if res {
wherestr += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
}
}
} else {
continue
}
}
//去掉最后边的逗号
wherestr = strings.Trim(wherestr, " AND ")
valstr = strings.Trim(valstr, ",")
//2.开始导入
sql := "UPDATE " + tablename + " SET " + valstr + " WHERE " + wherestr
//更新
if v.Status != "ok" {
continue
}
to := new(models.Etltab)
err = to.ToLeadSlave(sql)
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
//更新mongdb数据flag
//err = v.UpdateData()
//if err != nil {
// glog.Info("更新flag错误日志:%v", err)
// continue
//}
//删除mongdb
err = v.DeleteoneRecord()
if err != nil {
glog.InfoExtln("etl导入从库", "err is :", err)
continue
}
}
}
}
time.Sleep(1 * time.Second)
}
}