package task
|
|
|
|
import (
|
|
"LAPP_SJA_ME/utils"
|
|
"LAPP_SJA_ME/web/middleware/glog"
|
|
"LAPP_SJA_ME/web/models"
|
|
"context"
|
|
"encoding/json"
|
|
uuid "github.com/iris-contrib/go.uuid"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
func CreateCacheTask() {
|
|
|
|
//第一步:查询所有得服务
|
|
me := new(models.Etltab)
|
|
me.Finr = 100
|
|
data, err := me.SelectAll()
|
|
if err != nil {
|
|
return
|
|
}
|
|
//第二步:给每个服务开启一个协程,
|
|
for _, v := range data {
|
|
taskChan <- v
|
|
}
|
|
|
|
//删除三天前的缓存数据
|
|
go DelCacheData()
|
|
|
|
//导入接口表
|
|
go ToLeadDataBase()
|
|
|
|
//创建继承Baxkground的子节点Context
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
CreateCache(ctx)
|
|
}
|
|
|
|
var taskChan = make(chan models.Etltab, 100) //定义一个调度任务通道
|
|
|
|
//导入缓存数据库
|
|
func CreateCache(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case task, ok := <-taskChan:
|
|
|
|
if !ok {
|
|
glog.InfoExtln("调度生成终止")
|
|
return //停机退出
|
|
}
|
|
|
|
//第三步:基于协程,开启多个死循环,(为了负载均衡);并发执行
|
|
go dealTask(task)
|
|
}
|
|
}
|
|
}
|
|
|
|
//任务处理
|
|
func dealTask(task models.Etltab) {
|
|
for {
|
|
//第一步:每次查询出数据
|
|
|
|
data, err := SearchData(task)
|
|
//错误或者数据为空,跳出循环
|
|
if err != nil || (len(data) == 0) {
|
|
time.Sleep(10 * time.Second)
|
|
continue
|
|
}
|
|
|
|
//查询出主键,删除数据的时候备用
|
|
//TablePk, err := SearchTablePk(task.Fromtable)
|
|
//第二步:导入mongodb
|
|
for _, v := range data {
|
|
|
|
value, err := json.Marshal(v)
|
|
if err != nil {
|
|
glog.InfoExtln("buffer 数据导入", "err", err)
|
|
uuid, err := uuid.NewV1()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
buffer := new(models.Buffer)
|
|
buffer.Totable = task.Totable
|
|
buffer.Eid = task.Eid
|
|
buffer.Finr = task.Finr
|
|
buffer.Status = "error"
|
|
buffer.Orderid = uuid.String()
|
|
buffer.Flag = 0
|
|
buffer.Funcspec = task.Funcspec
|
|
buffer.Data = "json转义错误"
|
|
buffer.Todb = task.Todb
|
|
buffer.Dbtype = task.Todbtype
|
|
buffer.Todrivername = task.Todrivername
|
|
buffer.Message = error.Error(err)
|
|
buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
|
|
buffer.InsertRecord()
|
|
continue
|
|
}
|
|
uuid, err := uuid.NewV1()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
buffer := new(models.Buffer)
|
|
buffer.Totable = task.Totable
|
|
buffer.Eid = task.Eid
|
|
buffer.Finr = task.Finr
|
|
buffer.Orderid = uuid.String()
|
|
buffer.Status = "ok"
|
|
buffer.Flag = 0
|
|
buffer.Funcspec = task.Funcspec
|
|
buffer.Data = string(value)
|
|
buffer.Todb = task.Todb
|
|
buffer.Dbtype = task.Todbtype
|
|
buffer.Todrivername = task.Todrivername
|
|
buffer.Message = ""
|
|
buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
|
|
buffer.InsertRecord()
|
|
}
|
|
|
|
time.Sleep(10 * time.Second) //等待一秒,再执行下一循环
|
|
}
|
|
}
|
|
|
|
/******************************************************************************
|
|
*
|
|
* @Function Name :
|
|
*-----------------------------------------------------------------------------
|
|
*
|
|
* @Description : 逻辑:查询信息,分为多表查询和单表查询 ChooseType = 1 为多表
|
|
*
|
|
* @Function Parameters: task models.Etltab
|
|
*
|
|
* @Return Value :
|
|
*
|
|
* @Author : Lou Wenzhi
|
|
*
|
|
* @Date : 2021/3/10 14:53
|
|
*
|
|
******************************************************************************/
|
|
func SearchData(task models.Etltab) ([]map[string]interface{}, error) {
|
|
//选择对应的数据库
|
|
e := models.SearchDb(task.Fromdb, task.Fromdrivername, task.Fromdbtype)
|
|
if task.ChooseType == "1" {
|
|
//以sql原生查询数据
|
|
sql := task.Sqlshow
|
|
switch task.SearchType {
|
|
case 1:
|
|
if task.Todrivername == "mssql" {
|
|
data := make([]map[string]interface{}, 0)
|
|
sql = sql + " order by " + task.SearchTime
|
|
err := e.SQL(sql).Find(&data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
/*****判断数据长度,为零返回******/
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
//更新,锁定状态
|
|
msgids := make([]string, 0)
|
|
for _, v := range data {
|
|
msgid := utils.ValueToString(v[task.SearchFiled], "")
|
|
msgids = append(msgids, msgid)
|
|
}
|
|
_, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
|
|
if err != nil {
|
|
glog.InfoExtln("ETL导出错误", "err", err)
|
|
return nil, err
|
|
}
|
|
return data, nil
|
|
} else if task.Todrivername == "mysql" {
|
|
data := make([]map[string]interface{}, 0)
|
|
sql = sql + " order by " + task.SearchTime
|
|
err := e.SQL(sql).Find(&data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
/*****判断数据长度,为零返回******/
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
//更新,锁定状态
|
|
msgids := make([]string, 0)
|
|
for _, v := range data {
|
|
msgid := utils.ValueToString(v[task.SearchFiled], "")
|
|
msgids = append(msgids, msgid)
|
|
}
|
|
_, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
|
|
if err != nil {
|
|
glog.InfoExtln("ETL导出错误", "err", err)
|
|
return nil, err
|
|
}
|
|
return data, nil
|
|
} else {
|
|
//暂无
|
|
}
|
|
}
|
|
|
|
} else {
|
|
switch task.SearchType {
|
|
case 1: //flag查询
|
|
data := make([]map[string]interface{}, 0)
|
|
err := e.Table(task.Fromtable).Where("retstartstatus != 1 ").Limit(1000).Desc(task.SearchTime).Find(&data)
|
|
if err != nil {
|
|
glog.InfoExtln("ETL导出错误", "err", err)
|
|
return nil, err
|
|
}
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
//更新,锁定状态
|
|
msgids := make([]string, 0)
|
|
for _, v := range data {
|
|
msgid := utils.ValueToString(v[task.SearchFiled], "")
|
|
msgids = append(msgids, msgid)
|
|
}
|
|
_, err = e.Table(task.Fromtable).Cols("retstartstatus").In("msgid", msgids).Update(&map[string]interface{}{"retstartstatus": 1})
|
|
if err != nil {
|
|
glog.InfoExtln("ETL导出错误", "err", err)
|
|
return nil, err
|
|
}
|
|
return data, nil
|
|
default:
|
|
}
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
//查询字段属性
|
|
func SearchTable(dbname string, drivername string, tablename string, dbtype string) ([]models.EtlTabData, error) {
|
|
datalist := make([]models.EtlTabData, 0)
|
|
e := models.SearchDb(dbname, drivername, dbtype)
|
|
//获取当前导入表的主键
|
|
err := e.SQL("SELECT COLUMN_NAME as tabmapcol,DATA_TYPE as coltype FROM INFORMATION_SCHEMA.columns WHERE TABLE_NAME=?", tablename).Find(&datalist)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return datalist, nil
|
|
}
|
|
|
|
type TablePk struct {
|
|
TabName string
|
|
Colname string
|
|
}
|
|
|
|
//查询字段属性
|
|
func SearchTablePk(dbname string, drivername string, tablename string, dbtype string) ([]models.TablePk, error) {
|
|
datalist := make([]models.TablePk, 0)
|
|
e := models.SearchDb(dbname, drivername, dbtype)
|
|
//获取当前导入表的主键
|
|
err := e.SQL("SELECT TABLE_NAME as tabname,COLUMN_NAME as colname FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=?", tablename).Find(&datalist)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return datalist, nil
|
|
}
|
|
|
|
//导入从库
|
|
func ToLeadDataBase() {
|
|
for {
|
|
//第一步:开启一个死循环,每次读取flag = 0,500条数据
|
|
me := new(models.Buffer)
|
|
data := me.FindData()
|
|
if len(data) == 0 {
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
}
|
|
glog.InfoExtln("UpdatePlnWorkorder", "len(data):", len(data))
|
|
//第二步:导入从库,
|
|
for _, v := range data {
|
|
//判断执行次数
|
|
v.UpdateDataTimes()
|
|
//判断是否是特殊执行方法
|
|
if !utils.ValueIsEmpty(v.Funcspec) {
|
|
//解析数据
|
|
tems := make(map[string]interface{})
|
|
//json解析
|
|
err := json.Unmarshal([]byte(v.Data), &tems)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库err", "Unmarshal err is :", err)
|
|
continue
|
|
}
|
|
switch v.Funcspec {
|
|
case "UpdatePlnWorkorder":
|
|
finr := v.Finr
|
|
workordernr := utils.ValueToString(tems["ordernr"], "")
|
|
glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
|
|
status := utils.ValueToInt(tems["status"], 0)
|
|
tem := utils.ValueToString(tems["optime"], "")
|
|
temtime, err := time.Parse("2006-01-02T15:04:05Z07:00", tem)
|
|
if err != nil {
|
|
glog.InfoExtln("UpdatePlnWorkorder", "temtime err is :", err)
|
|
continue
|
|
}
|
|
optime := temtime.Format("20060102150405")
|
|
glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
|
|
err = UpdatePlnWorkorder(finr, workordernr, status, optime)
|
|
if err != nil {
|
|
glog.InfoExtln("UpdatePlnWorkorder", "UpdatePlnWorkorder err is :", err)
|
|
continue
|
|
}
|
|
//更新mongdb数据flag
|
|
//err = v.UpdateData()
|
|
//if err != nil {
|
|
// glog.Info("更新flag错误日志:%v", err)
|
|
// continue
|
|
//}
|
|
err = v.DeleteoneRecord()
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
case "UpdatePlnBatchorder":
|
|
finr := v.Finr
|
|
batchordernr := utils.ValueToString(tems["ordernr"], "")
|
|
status := utils.ValueToInt(tems["status"], 0)
|
|
actqty := utils.ValueToInt(tems["actqty"], 0)
|
|
tem := utils.ValueToString(tems["optime"], "")
|
|
temtime, _ := time.Parse("2006-01-02T15:04:05Z07:00", tem)
|
|
optime := temtime.Format("20060102150405")
|
|
err = UpdatePlnBatchorder(finr, batchordernr, status, actqty, optime)
|
|
if err != nil {
|
|
glog.InfoExtln("UpdatePlnBatchorder", "UpdatePlnBatchorder err is :", err)
|
|
continue
|
|
}
|
|
//更新mongdb数据flag
|
|
//err = v.UpdateData()
|
|
//if err != nil {
|
|
// glog.Info("更新flag错误日志:%v", err)
|
|
// continue
|
|
//}
|
|
|
|
err = v.DeleteoneRecord()
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
//1.查询导入规则
|
|
etl := new(models.Etltablst)
|
|
etl.Finr = utils.ValueToInt(v.Finr, 0)
|
|
etl.Eid = utils.ValueToInt(v.Eid, 0)
|
|
etltabs, err := etl.SelectLst()
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err1 is :", err)
|
|
continue
|
|
}
|
|
etlMap := make(map[string]models.Etltablst)
|
|
for _, etlv := range etltabs {
|
|
key := etlv.Field
|
|
etlMap[key] = etlv
|
|
}
|
|
//查询字段属性
|
|
datalist, err := SearchTable(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err2 is :", err)
|
|
continue
|
|
}
|
|
tablefiled := make(map[string]string)
|
|
for _, vv := range datalist {
|
|
key := vv.Tabmapcol
|
|
tablefiled[key] = vv.Coltype
|
|
}
|
|
//解析数据
|
|
tems := make(map[string]interface{})
|
|
//表名
|
|
tablename := v.Totable
|
|
//拼接
|
|
keystr := ""
|
|
valstr := ""
|
|
wherestr := ""
|
|
where := ""
|
|
//json解析
|
|
err = json.Unmarshal([]byte(v.Data), &tems)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "Unmarshal err is :", err)
|
|
continue
|
|
}
|
|
//3.对解析出来的数据进行处理
|
|
tem := make(map[string]interface{})
|
|
for kk, vv := range tems {
|
|
key := kk
|
|
val, ok := etlMap[key]
|
|
if ok {
|
|
mapKey := val.ToField
|
|
mapVal := vv
|
|
if val.Funcspec == "TimeParse" {
|
|
vvstr := utils.ValueToString(vv, "")
|
|
Valtime, err := utils.TimeParse(strings.TrimSpace(vvstr))
|
|
mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
|
|
if err != nil {
|
|
continue
|
|
}
|
|
}
|
|
if val.Funcspec == "DateParse" {
|
|
vvstr := utils.ValueToString(vv, "")
|
|
Valtime, err := utils.TimeParseyyyyMMdd(strings.TrimSpace(vvstr))
|
|
mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
|
|
if err != nil {
|
|
continue
|
|
}
|
|
}
|
|
if val.Funcspec == "ValueToInt" {
|
|
mapVal = utils.ValueToInt(vv, 0)
|
|
}
|
|
if !utils.ValueIsEmpty(val.Defaultvalue) {
|
|
tem[mapKey] = val.Defaultvalue
|
|
} else {
|
|
tem[mapKey] = mapVal
|
|
}
|
|
}
|
|
}
|
|
//获取当前导入表的主键
|
|
TablePk, err := SearchTablePk(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
//查询主键
|
|
temPk := make(map[string]interface{})
|
|
for _, v := range TablePk {
|
|
key := v.Colname
|
|
temPk[key] = v
|
|
|
|
value, ok := tablefiled[key]
|
|
if ok {
|
|
if value == "int" || value == "smallint" || value == "bigint" {
|
|
intval := utils.ValueToInt(tem[key], 0)
|
|
where += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
|
|
} else if value == "decimal" {
|
|
floatval := utils.ValueToFloat(tem[key], 0.0)
|
|
where += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
|
|
} else {
|
|
val := utils.ValueToString(tem[key], "")
|
|
where += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
|
|
}
|
|
} else {
|
|
continue
|
|
}
|
|
|
|
}
|
|
//去掉最后边的逗号
|
|
where = strings.Trim(where, " AND ")
|
|
e := models.SearchDb(v.Todb, v.Todrivername, v.Dbtype)
|
|
//查询数据
|
|
dataone := make(map[string]interface{})
|
|
ok, err := e.Table(tablename).Where(where).Get(&dataone)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
if !ok {
|
|
//记录字段,插入
|
|
for key, val := range tablefiled {
|
|
keystr += key + ","
|
|
value, ok := tem[key]
|
|
if ok {
|
|
if val == "int" || val == "smallint" || value == "bigint" {
|
|
intval := utils.ValueToInt(value, 0)
|
|
valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
|
|
} else if val == "decimal" {
|
|
floatval := utils.ValueToFloat(value, 0.0)
|
|
valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
|
|
} else if val == "datetime" {
|
|
val := utils.ValueToString(value, "")
|
|
valstr += "'" + strings.TrimSpace(val) + "'" + ","
|
|
} else {
|
|
val := utils.ValueToString(value, "")
|
|
valstr += "'" + strings.TrimSpace(val) + "'" + ","
|
|
}
|
|
} else {
|
|
val := utils.ValueToString(value, "")
|
|
valstr += "'" + strings.TrimSpace(val) + "'" + ","
|
|
}
|
|
|
|
}
|
|
|
|
//去掉最后边的逗号
|
|
keystr = strings.Trim(keystr, ",")
|
|
valstr = strings.Trim(valstr, ",")
|
|
sql := "INSERT INTO " + tablename + " (" + keystr + ") values (" + valstr + ")"
|
|
glog.InfoExtln("etl导入打印", "sql is :", sql)
|
|
if v.Status != "ok" {
|
|
continue
|
|
}
|
|
to := new(models.Etltab)
|
|
to.Todb = v.Todb
|
|
to.Todrivername = v.Todrivername
|
|
err = to.ToLeadSlave(sql)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "UpdatePlnBatchorder err is :", err)
|
|
continue
|
|
}
|
|
//更新mongdb数据flag
|
|
//err = v.UpdateData()
|
|
//if err != nil {
|
|
// glog.Info("更新flag错误日志:%v", err)
|
|
//}
|
|
err = v.DeleteoneRecord()
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
} else {
|
|
//更新内容
|
|
//第二步构建sql语句
|
|
for key, val := range tablefiled {
|
|
|
|
value, ok := tem[key]
|
|
|
|
_, res := temPk[key]
|
|
if ok {
|
|
keystr += key + ","
|
|
if val == "int" || val == "smallint" || value == "bigint" {
|
|
intval := utils.ValueToInt(value, 0)
|
|
valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
|
|
|
|
if res {
|
|
wherestr += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
|
|
}
|
|
|
|
} else if val == "decimal" {
|
|
floatval := utils.ValueToFloat(value, 0.0)
|
|
valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
|
|
|
|
if res {
|
|
wherestr += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
|
|
}
|
|
} else {
|
|
val := utils.ValueToString(value, "")
|
|
valstr += "'" + strings.TrimSpace(val) + "'" + ","
|
|
if res {
|
|
wherestr += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
|
|
}
|
|
}
|
|
} else {
|
|
continue
|
|
}
|
|
|
|
}
|
|
//去掉最后边的逗号
|
|
wherestr = strings.Trim(wherestr, " AND ")
|
|
valstr = strings.Trim(valstr, ",")
|
|
//2.开始导入
|
|
sql := "UPDATE " + tablename + " SET " + valstr + " WHERE " + wherestr
|
|
//更新
|
|
if v.Status != "ok" {
|
|
continue
|
|
}
|
|
to := new(models.Etltab)
|
|
err = to.ToLeadSlave(sql)
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
//更新mongdb数据flag
|
|
//err = v.UpdateData()
|
|
//if err != nil {
|
|
// glog.Info("更新flag错误日志:%v", err)
|
|
// continue
|
|
//}
|
|
//删除mongdb
|
|
err = v.DeleteoneRecord()
|
|
if err != nil {
|
|
glog.InfoExtln("etl导入从库", "err is :", err)
|
|
continue
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
}
|