|
|
- package task
-
- import (
- "LAPP_SJA_ME/utils"
- "LAPP_SJA_ME/web/middleware/glog"
- "LAPP_SJA_ME/web/models"
- "context"
- "encoding/json"
- uuid "github.com/iris-contrib/go.uuid"
- "strings"
- "time"
- )
-
- func CreateCacheTask() {
-
- //第一步:查询所有得服务
- me := new(models.Etltab)
- me.Finr = 100
- data, err := me.SelectAll()
- if err != nil {
- return
- }
- //第二步:给每个服务开启一个协程,
- for _, v := range data {
- taskChan <- v
- }
-
- //删除三天前的缓存数据
- go DelCacheData()
-
- //导入接口表
- go ToLeadDataBase()
-
- //创建继承Baxkground的子节点Context
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- CreateCache(ctx)
- }
-
- var taskChan = make(chan models.Etltab, 100) //定义一个调度任务通道
-
- //导入缓存数据库
- func CreateCache(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- case task, ok := <-taskChan:
-
- if !ok {
- glog.InfoExtln("调度生成终止")
- return //停机退出
- }
-
- //第三步:基于协程,开启多个死循环,(为了负载均衡);并发执行
- go dealTask(task)
- }
- }
- }
-
- //任务处理
- func dealTask(task models.Etltab) {
- for {
- //第一步:每次查询出数据
-
- data, err := SearchData(task)
- //错误或者数据为空,跳出循环
- if err != nil || (len(data) == 0) {
- time.Sleep(10 * time.Second)
- continue
- }
-
- //查询出主键,删除数据的时候备用
- //TablePk, err := SearchTablePk(task.Fromtable)
- //第二步:导入mongodb
- for _, v := range data {
-
- value, err := json.Marshal(v)
- if err != nil {
- glog.InfoExtln("buffer 数据导入", "err", err)
- uuid, err := uuid.NewV1()
- if err != nil {
- continue
- }
- buffer := new(models.Buffer)
- buffer.Totable = task.Totable
- buffer.Eid = task.Eid
- buffer.Finr = task.Finr
- buffer.Status = "error"
- buffer.Orderid = uuid.String()
- buffer.Flag = 0
- buffer.Funcspec = task.Funcspec
- buffer.Data = "json转义错误"
- buffer.Todb = task.Todb
- buffer.Dbtype = task.Todbtype
- buffer.Todrivername = task.Todrivername
- buffer.Message = error.Error(err)
- buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
- buffer.InsertRecord()
- continue
- }
- uuid, err := uuid.NewV1()
- if err != nil {
- continue
- }
- buffer := new(models.Buffer)
- buffer.Totable = task.Totable
- buffer.Eid = task.Eid
- buffer.Finr = task.Finr
- buffer.Orderid = uuid.String()
- buffer.Status = "ok"
- buffer.Flag = 0
- buffer.Funcspec = task.Funcspec
- buffer.Data = string(value)
- buffer.Todb = task.Todb
- buffer.Dbtype = task.Todbtype
- buffer.Todrivername = task.Todrivername
- buffer.Message = ""
- buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
- buffer.InsertRecord()
- }
-
- time.Sleep(10 * time.Second) //等待一秒,再执行下一循环
- }
- }
-
- /******************************************************************************
- *
- * @Function Name :
- *-----------------------------------------------------------------------------
- *
- * @Description : 逻辑:查询信息,分为多表查询和单表查询 ChooseType = 1 为多表
- *
- * @Function Parameters: task models.Etltab
- *
- * @Return Value :
- *
- * @Author : Lou Wenzhi
- *
- * @Date : 2021/3/10 14:53
- *
- ******************************************************************************/
- func SearchData(task models.Etltab) ([]map[string]interface{}, error) {
- //选择对应的数据库
- e := models.SearchDb(task.Fromdb, task.Fromdrivername, task.Fromdbtype)
- if task.ChooseType == "1" {
- //以sql原生查询数据
- sql := task.Sqlshow
- switch task.SearchType {
- case 1:
- if task.Todrivername == "mssql" {
- data := make([]map[string]interface{}, 0)
- sql = sql + " order by " + task.SearchTime
- err := e.SQL(sql).Find(&data)
- if err != nil {
- return nil, err
- }
- /*****判断数据长度,为零返回******/
- if len(data) == 0 {
- return nil, nil
- }
- //更新,锁定状态
- msgids := make([]string, 0)
- for _, v := range data {
- msgid := utils.ValueToString(v[task.SearchFiled], "")
- msgids = append(msgids, msgid)
- }
- _, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
- if err != nil {
- glog.InfoExtln("ETL导出错误", "err", err)
- return nil, err
- }
- return data, nil
- } else if task.Todrivername == "mysql" {
- data := make([]map[string]interface{}, 0)
- sql = sql + " order by " + task.SearchTime
- err := e.SQL(sql).Find(&data)
- if err != nil {
- return nil, err
- }
- /*****判断数据长度,为零返回******/
- if len(data) == 0 {
- return nil, nil
- }
- //更新,锁定状态
- msgids := make([]string, 0)
- for _, v := range data {
- msgid := utils.ValueToString(v[task.SearchFiled], "")
- msgids = append(msgids, msgid)
- }
- _, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
- if err != nil {
- glog.InfoExtln("ETL导出错误", "err", err)
- return nil, err
- }
- return data, nil
- } else {
- //暂无
- }
- }
-
- } else {
- switch task.SearchType {
- case 1: //flag查询
- data := make([]map[string]interface{}, 0)
- err := e.Table(task.Fromtable).Where("retstartstatus != 1 ").Limit(1000).Desc(task.SearchTime).Find(&data)
- if err != nil {
- glog.InfoExtln("ETL导出错误", "err", err)
- return nil, err
- }
- if len(data) == 0 {
- return nil, nil
- }
- //更新,锁定状态
- msgids := make([]string, 0)
- for _, v := range data {
- msgid := utils.ValueToString(v[task.SearchFiled], "")
- msgids = append(msgids, msgid)
- }
- _, err = e.Table(task.Fromtable).Cols("retstartstatus").In("msgid", msgids).Update(&map[string]interface{}{"retstartstatus": 1})
- if err != nil {
- glog.InfoExtln("ETL导出错误", "err", err)
- return nil, err
- }
- return data, nil
- default:
- }
- }
-
- return nil, nil
- }
-
- //查询字段属性
- func SearchTable(dbname string, drivername string, tablename string, dbtype string) ([]models.EtlTabData, error) {
- datalist := make([]models.EtlTabData, 0)
- e := models.SearchDb(dbname, drivername, dbtype)
- //获取当前导入表的主键
- err := e.SQL("SELECT COLUMN_NAME as tabmapcol,DATA_TYPE as coltype FROM INFORMATION_SCHEMA.columns WHERE TABLE_NAME=?", tablename).Find(&datalist)
- if err != nil {
- return nil, err
- }
- return datalist, nil
- }
-
- type TablePk struct {
- TabName string
- Colname string
- }
-
- //查询字段属性
- func SearchTablePk(dbname string, drivername string, tablename string, dbtype string) ([]models.TablePk, error) {
- datalist := make([]models.TablePk, 0)
- e := models.SearchDb(dbname, drivername, dbtype)
- //获取当前导入表的主键
- err := e.SQL("SELECT TABLE_NAME as tabname,COLUMN_NAME as colname FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=?", tablename).Find(&datalist)
- if err != nil {
- return nil, err
- }
- return datalist, nil
- }
-
- //导入从库
- func ToLeadDataBase() {
- for {
- //第一步:开启一个死循环,每次读取flag = 0,500条数据
- me := new(models.Buffer)
- data := me.FindData()
- if len(data) == 0 {
- time.Sleep(1 * time.Second)
- continue
- }
- glog.InfoExtln("UpdatePlnWorkorder", "len(data):", len(data))
- //第二步:导入从库,
- for _, v := range data {
- //判断执行次数
- v.UpdateDataTimes()
- //判断是否是特殊执行方法
- if !utils.ValueIsEmpty(v.Funcspec) {
- //解析数据
- tems := make(map[string]interface{})
- //json解析
- err := json.Unmarshal([]byte(v.Data), &tems)
- if err != nil {
- glog.InfoExtln("etl导入从库err", "Unmarshal err is :", err)
- continue
- }
- switch v.Funcspec {
- case "UpdatePlnWorkorder":
- finr := v.Finr
- workordernr := utils.ValueToString(tems["ordernr"], "")
- glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
- status := utils.ValueToInt(tems["status"], 0)
- tem := utils.ValueToString(tems["optime"], "")
- temtime, err := time.Parse("2006-01-02T15:04:05Z07:00", tem)
- if err != nil {
- glog.InfoExtln("UpdatePlnWorkorder", "temtime err is :", err)
- continue
- }
- optime := temtime.Format("20060102150405")
- glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
- err = UpdatePlnWorkorder(finr, workordernr, status, optime)
- if err != nil {
- glog.InfoExtln("UpdatePlnWorkorder", "UpdatePlnWorkorder err is :", err)
- continue
- }
- //更新mongdb数据flag
- //err = v.UpdateData()
- //if err != nil {
- // glog.Info("更新flag错误日志:%v", err)
- // continue
- //}
- err = v.DeleteoneRecord()
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
- case "UpdatePlnBatchorder":
- finr := v.Finr
- batchordernr := utils.ValueToString(tems["ordernr"], "")
- status := utils.ValueToInt(tems["status"], 0)
- actqty := utils.ValueToInt(tems["actqty"], 0)
- tem := utils.ValueToString(tems["optime"], "")
- temtime, _ := time.Parse("2006-01-02T15:04:05Z07:00", tem)
- optime := temtime.Format("20060102150405")
- err = UpdatePlnBatchorder(finr, batchordernr, status, actqty, optime)
- if err != nil {
- glog.InfoExtln("UpdatePlnBatchorder", "UpdatePlnBatchorder err is :", err)
- continue
- }
- //更新mongdb数据flag
- //err = v.UpdateData()
- //if err != nil {
- // glog.Info("更新flag错误日志:%v", err)
- // continue
- //}
-
- err = v.DeleteoneRecord()
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
- }
- } else {
- //1.查询导入规则
- etl := new(models.Etltablst)
- etl.Finr = utils.ValueToInt(v.Finr, 0)
- etl.Eid = utils.ValueToInt(v.Eid, 0)
- etltabs, err := etl.SelectLst()
- if err != nil {
- glog.InfoExtln("etl导入从库", "err1 is :", err)
- continue
- }
- etlMap := make(map[string]models.Etltablst)
- for _, etlv := range etltabs {
- key := etlv.Field
- etlMap[key] = etlv
- }
- //查询字段属性
- datalist, err := SearchTable(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
- if err != nil {
- glog.InfoExtln("etl导入从库", "err2 is :", err)
- continue
- }
- tablefiled := make(map[string]string)
- for _, vv := range datalist {
- key := vv.Tabmapcol
- tablefiled[key] = vv.Coltype
- }
- //解析数据
- tems := make(map[string]interface{})
- //表名
- tablename := v.Totable
- //拼接
- keystr := ""
- valstr := ""
- wherestr := ""
- where := ""
- //json解析
- err = json.Unmarshal([]byte(v.Data), &tems)
- if err != nil {
- glog.InfoExtln("etl导入从库", "Unmarshal err is :", err)
- continue
- }
- //3.对解析出来的数据进行处理
- tem := make(map[string]interface{})
- for kk, vv := range tems {
- key := kk
- val, ok := etlMap[key]
- if ok {
- mapKey := val.ToField
- mapVal := vv
- if val.Funcspec == "TimeParse" {
- vvstr := utils.ValueToString(vv, "")
- Valtime, err := utils.TimeParse(strings.TrimSpace(vvstr))
- mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
- if err != nil {
- continue
- }
- }
- if val.Funcspec == "DateParse" {
- vvstr := utils.ValueToString(vv, "")
- Valtime, err := utils.TimeParseyyyyMMdd(strings.TrimSpace(vvstr))
- mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
- if err != nil {
- continue
- }
- }
- if val.Funcspec == "ValueToInt" {
- mapVal = utils.ValueToInt(vv, 0)
- }
- if !utils.ValueIsEmpty(val.Defaultvalue) {
- tem[mapKey] = val.Defaultvalue
- } else {
- tem[mapKey] = mapVal
- }
- }
- }
- //获取当前导入表的主键
- TablePk, err := SearchTablePk(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
- //查询主键
- temPk := make(map[string]interface{})
- for _, v := range TablePk {
- key := v.Colname
- temPk[key] = v
-
- value, ok := tablefiled[key]
- if ok {
- if value == "int" || value == "smallint" || value == "bigint" {
- intval := utils.ValueToInt(tem[key], 0)
- where += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
- } else if value == "decimal" {
- floatval := utils.ValueToFloat(tem[key], 0.0)
- where += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
- } else {
- val := utils.ValueToString(tem[key], "")
- where += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
- }
- } else {
- continue
- }
-
- }
- //去掉最后边的逗号
- where = strings.Trim(where, " AND ")
- e := models.SearchDb(v.Todb, v.Todrivername, v.Dbtype)
- //查询数据
- dataone := make(map[string]interface{})
- ok, err := e.Table(tablename).Where(where).Get(&dataone)
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
- if !ok {
- //记录字段,插入
- for key, val := range tablefiled {
- keystr += key + ","
- value, ok := tem[key]
- if ok {
- if val == "int" || val == "smallint" || value == "bigint" {
- intval := utils.ValueToInt(value, 0)
- valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
- } else if val == "decimal" {
- floatval := utils.ValueToFloat(value, 0.0)
- valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
- } else if val == "datetime" {
- val := utils.ValueToString(value, "")
- valstr += "'" + strings.TrimSpace(val) + "'" + ","
- } else {
- val := utils.ValueToString(value, "")
- valstr += "'" + strings.TrimSpace(val) + "'" + ","
- }
- } else {
- val := utils.ValueToString(value, "")
- valstr += "'" + strings.TrimSpace(val) + "'" + ","
- }
-
- }
-
- //去掉最后边的逗号
- keystr = strings.Trim(keystr, ",")
- valstr = strings.Trim(valstr, ",")
- sql := "INSERT INTO " + tablename + " (" + keystr + ") values (" + valstr + ")"
- glog.InfoExtln("etl导入打印", "sql is :", sql)
- if v.Status != "ok" {
- continue
- }
- to := new(models.Etltab)
- to.Todb = v.Todb
- to.Todrivername = v.Todrivername
- err = to.ToLeadSlave(sql)
- if err != nil {
- glog.InfoExtln("etl导入从库", "UpdatePlnBatchorder err is :", err)
- continue
- }
- //更新mongdb数据flag
- //err = v.UpdateData()
- //if err != nil {
- // glog.Info("更新flag错误日志:%v", err)
- //}
- err = v.DeleteoneRecord()
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
- } else {
- //更新内容
- //第二步构建sql语句
- for key, val := range tablefiled {
-
- value, ok := tem[key]
-
- _, res := temPk[key]
- if ok {
- keystr += key + ","
- if val == "int" || val == "smallint" || value == "bigint" {
- intval := utils.ValueToInt(value, 0)
- valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
-
- if res {
- wherestr += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
- }
-
- } else if val == "decimal" {
- floatval := utils.ValueToFloat(value, 0.0)
- valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
-
- if res {
- wherestr += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
- }
- } else {
- val := utils.ValueToString(value, "")
- valstr += "'" + strings.TrimSpace(val) + "'" + ","
- if res {
- wherestr += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
- }
- }
- } else {
- continue
- }
-
- }
- //去掉最后边的逗号
- wherestr = strings.Trim(wherestr, " AND ")
- valstr = strings.Trim(valstr, ",")
- //2.开始导入
- sql := "UPDATE " + tablename + " SET " + valstr + " WHERE " + wherestr
- //更新
- if v.Status != "ok" {
- continue
- }
- to := new(models.Etltab)
- err = to.ToLeadSlave(sql)
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
- //更新mongdb数据flag
- //err = v.UpdateData()
- //if err != nil {
- // glog.Info("更新flag错误日志:%v", err)
- // continue
- //}
- //删除mongdb
- err = v.DeleteoneRecord()
- if err != nil {
- glog.InfoExtln("etl导入从库", "err is :", err)
- continue
- }
-
- }
-
- }
-
- }
- time.Sleep(1 * time.Second)
- }
-
- }
|