// Package task contains the ETL scheduling loops: workers that read rows from
// the configured source databases into the buffer store, and a loop that
// replays buffered rows into the configured target databases.
package task

import (
	"context"
	"encoding/json"
	"strings"
	"time"

	uuid "github.com/iris-contrib/go.uuid"

	"LAPP_SJA_ME/utils"
	"LAPP_SJA_ME/web/middleware/glog"
	"LAPP_SJA_ME/web/models"
)

// CreateCacheTask loads the ETL task definitions for Finr 100, queues them on
// taskChan, starts the DelCacheData and ToLeadDataBase background loops and
// then blocks in CreateCache, which starts one dealTask worker per queued
// task.
func CreateCacheTask() {
	// Step 1: load every configured service/task.
	me := new(models.Etltab)
	me.Finr = 100
	data, err := me.SelectAll()
	if err != nil {
		glog.InfoExtln("CreateCacheTask", "SelectAll err is :", err)
		return
	}

	// Step 2: queue the tasks. The send happens in its own goroutine because
	// the only consumer (CreateCache) starts further down; sending inline
	// would block forever once more than cap(taskChan) tasks are configured.
	go func() {
		for _, v := range data {
			taskChan <- v
		}
	}()

	// Purge old cache data (three days, per the original comment).
	go DelCacheData()
	// Replay buffered rows into the target tables.
	go ToLeadDataBase()

	// Child context derived from Background; cancelled when CreateCache returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	CreateCache(ctx)
}

// taskChan is the scheduling queue that hands task definitions to CreateCache.
var taskChan = make(chan models.Etltab, 100)

// CreateCache receives tasks from taskChan and starts a dealTask goroutine for
// each one. It returns when ctx is cancelled or taskChan is closed.
func CreateCache(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case task, ok := <-taskChan:
			if !ok {
				glog.InfoExtln("调度生成终止")
				return // channel closed: shut down
			}
			// Step 3: one endless worker loop per task, running concurrently.
			go dealTask(task)
		}
	}
}

// dealTask is the endless worker loop for one task: it fetches pending source
// rows with SearchData, stores each row as a JSON buffer record, then sleeps
// ten seconds before polling again.
func dealTask(task models.Etltab) {
	for {
		data, err := SearchData(task)
		// On error or when nothing is pending, wait and poll again.
		if err != nil || len(data) == 0 {
			time.Sleep(10 * time.Second)
			continue
		}

		for _, v := range data {
			value, err := json.Marshal(v)
			if err != nil {
				glog.InfoExtln("buffer 数据导入", "err", err)
				// Record the failure with the marshal error text. The original
				// shadowed err with the (nil) uuid error here and panicked.
				insertTaskBuffer(task, "error", "json转义错误", err.Error())
				continue
			}
			insertTaskBuffer(task, "ok", string(value), "")
		}

		time.Sleep(10 * time.Second)
	}
}

// insertTaskBuffer stores one buffer record for task with the given status,
// payload and message. The record is silently skipped when no UUID can be
// generated, matching the original behaviour.
func insertTaskBuffer(task models.Etltab, status, data, message string) {
	id, err := uuid.NewV1()
	if err != nil {
		return
	}
	buffer := new(models.Buffer)
	buffer.Totable = task.Totable
	buffer.Eid = task.Eid
	buffer.Finr = task.Finr
	buffer.Orderid = id.String()
	buffer.Status = status
	buffer.Flag = 0
	buffer.Funcspec = task.Funcspec
	buffer.Data = data
	buffer.Todb = task.Todb
	buffer.Dbtype = task.Todbtype
	buffer.Todrivername = task.Todrivername
	buffer.Message = message
	buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
	buffer.InsertRecord()
}

// SearchData fetches the pending rows of task from its source database and
// marks them as exported so the next poll does not return them again.
//
// Only SearchType 1 is implemented. With ChooseType "1" the task's own SQL
// (Sqlshow) is run, ordered by SearchTime, and the rows are flagged through
// the "releaseflag" column; this path only runs when Todrivername is "mssql"
// or "mysql". Otherwise up to 1000 rows of Fromtable with retstartstatus != 1
// are read, newest first, and flagged through "retstartstatus".
//
// It returns (nil, nil) when there is nothing to do.
//
// Author: Lou Wenzhi, 2021/3/10 14:53.
func SearchData(task models.Etltab) ([]map[string]interface{}, error) {
	// Select the source database.
	e := models.SearchDb(task.Fromdb, task.Fromdrivername, task.Fromdbtype)

	// markExported sets flagCol to 1 on every fetched row, matching rows by
	// keyCol against the rows' SearchFiled values.
	markExported := func(flagCol, keyCol string, rows []map[string]interface{}) error {
		msgids := make([]string, 0, len(rows))
		for _, v := range rows {
			msgids = append(msgids, utils.ValueToString(v[task.SearchFiled], ""))
		}
		_, err := e.Table(task.Fromtable).Cols(flagCol).In(keyCol, msgids).Update(&map[string]interface{}{flagCol: 1})
		if err != nil {
			glog.InfoExtln("ETL导出错误", "err", err)
		}
		return err
	}

	if task.SearchType != 1 {
		return nil, nil
	}

	data := make([]map[string]interface{}, 0)
	var flagCol, keyCol string
	if task.ChooseType == "1" {
		// Raw-SQL (multi-table) query; other target drivers are not supported yet.
		if task.Todrivername != "mssql" && task.Todrivername != "mysql" {
			return nil, nil
		}
		sql := task.Sqlshow + " order by " + task.SearchTime
		if err := e.SQL(sql).Find(&data); err != nil {
			return nil, err
		}
		flagCol, keyCol = "releaseflag", task.SearchFiled
	} else {
		// Single-table query driven by the status flag.
		err := e.Table(task.Fromtable).Where("retstartstatus != 1 ").Limit(1000).Desc(task.SearchTime).Find(&data)
		if err != nil {
			glog.InfoExtln("ETL导出错误", "err", err)
			return nil, err
		}
		// NOTE(review): this path matches on the literal column "msgid" while
		// the ids are read from task.SearchFiled — confirm that is intended.
		flagCol, keyCol = "retstartstatus", "msgid"
	}

	if len(data) == 0 {
		return nil, nil
	}
	// Lock the rows that were just read.
	if err := markExported(flagCol, keyCol, data); err != nil {
		return nil, err
	}
	return data, nil
}

// SearchTable returns the column names and data types of tablename, read from
// INFORMATION_SCHEMA.columns of the database selected by dbname, drivername
// and dbtype.
func SearchTable(dbname string, drivername string, tablename string, dbtype string) ([]models.EtlTabData, error) {
	datalist := make([]models.EtlTabData, 0)
	e := models.SearchDb(dbname, drivername, dbtype)
	err := e.SQL("SELECT COLUMN_NAME as tabmapcol,DATA_TYPE as coltype FROM INFORMATION_SCHEMA.columns WHERE TABLE_NAME=?", tablename).Find(&datalist)
	if err != nil {
		return nil, err
	}
	return datalist, nil
}

// TablePk pairs a table name with one of its key columns. It is not used in
// this file; SearchTablePk returns models.TablePk.
type TablePk struct {
	TabName string
	Colname string
}

// SearchTablePk returns the key columns of tablename, read from
// INFORMATION_SCHEMA.KEY_COLUMN_USAGE of the database selected by dbname,
// drivername and dbtype.
func SearchTablePk(dbname string, drivername string, tablename string, dbtype string) ([]models.TablePk, error) {
	datalist := make([]models.TablePk, 0)
	e := models.SearchDb(dbname, drivername, dbtype)
	err := e.SQL("SELECT TABLE_NAME as tabname,COLUMN_NAME as colname FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=?", tablename).Find(&datalist)
	if err != nil {
		return nil, err
	}
	return datalist, nil
}

// etlOpTime parses the RFC 3339 "optime" field of a buffered record and
// returns it formatted as yyyyMMddHHmmss.
func etlOpTime(fields map[string]interface{}) (string, error) {
	t, err := time.Parse(time.RFC3339, utils.ValueToString(fields["optime"], ""))
	if err != nil {
		return "", err
	}
	return t.Format("20060102150405"), nil
}

// etlQuoteValue renders value as a single-quoted SQL literal according to the
// column's data type: integer and decimal columns are normalised through
// utils.ValueToInt / utils.ValueToFloat, everything else is trimmed text with
// embedded single quotes doubled so the value cannot terminate the literal.
//
// NOTE(review): the statements are still assembled as text because
// ToLeadSlave only accepts a SQL string; bound parameters would be safer.
func etlQuoteValue(colType string, value interface{}) string {
	switch colType {
	case "int", "smallint", "bigint":
		return "'" + utils.ValueToString(utils.ValueToInt(value, 0), "") + "'"
	case "decimal":
		return "'" + utils.ValueToString(utils.ValueToFloat(value, 0.0), "") + "'"
	default:
		text := strings.TrimSpace(utils.ValueToString(value, ""))
		return "'" + strings.Replace(text, "'", "''", -1) + "'"
	}
}

// etlMapFields converts a decoded source record into target-column values by
// applying the mapping rules keyed by source field name. Source fields without
// a rule are dropped, as are fields whose time/date conversion fails. A
// non-empty rule default replaces the converted value.
func etlMapFields(src map[string]interface{}, rules map[string]models.Etltablst) map[string]interface{} {
	row := make(map[string]interface{}, len(src))
	for name, raw := range src {
		rule, ok := rules[name]
		if !ok {
			continue
		}
		mapped := raw
		switch rule.Funcspec {
		case "TimeParse":
			t, err := utils.TimeParse(strings.TrimSpace(utils.ValueToString(raw, "")))
			if err != nil {
				continue
			}
			mapped = utils.TimeFormat(t, "yyyy-MM-dd HH:mm:ss")
		case "DateParse":
			t, err := utils.TimeParseyyyyMMdd(strings.TrimSpace(utils.ValueToString(raw, "")))
			if err != nil {
				continue
			}
			mapped = utils.TimeFormat(t, "yyyy-MM-dd HH:mm:ss")
		case "ValueToInt":
			mapped = utils.ValueToInt(raw, 0)
		}
		if !utils.ValueIsEmpty(rule.Defaultvalue) {
			row[rule.ToField] = rule.Defaultvalue
		} else {
			row[rule.ToField] = mapped
		}
	}
	return row
}

// etlKeyPredicate builds the "col= 'v' AND ..." predicate that identifies row
// by the key columns known to colTypes, and returns it together with the set
// of all key column names. The predicate is empty when no key column is known.
func etlKeyPredicate(pkCols []models.TablePk, colTypes map[string]string, row map[string]interface{}) (string, map[string]bool) {
	pkSet := make(map[string]bool, len(pkCols))
	conds := make([]string, 0, len(pkCols))
	for _, pk := range pkCols {
		pkSet[pk.Colname] = true
		colType, ok := colTypes[pk.Colname]
		if !ok {
			continue
		}
		conds = append(conds, pk.Colname+"= "+etlQuoteValue(colType, row[pk.Colname]))
	}
	// Join instead of strings.Trim(s, " AND "): Trim takes a character set and
	// would also strip leading 'A', 'N' or 'D' from the first column name.
	return strings.Join(conds, " AND "), pkSet
}

// etlBuildInsert builds an INSERT statement that lists every column of the
// target table; columns without a mapped value receive the literal of a nil
// value rendered as text.
func etlBuildInsert(table string, colTypes map[string]string, row map[string]interface{}) string {
	cols := make([]string, 0, len(colTypes))
	vals := make([]string, 0, len(colTypes))
	for col, colType := range colTypes {
		cols = append(cols, col)
		if value, ok := row[col]; ok {
			vals = append(vals, etlQuoteValue(colType, value))
		} else {
			vals = append(vals, etlQuoteValue("", nil))
		}
	}
	return "INSERT INTO " + table + " (" + strings.Join(cols, ",") + ") values (" + strings.Join(vals, ",") + ")"
}

// etlBuildUpdate builds an UPDATE statement that assigns every mapped non-key
// column and selects the row by the mapped key columns. It returns "" when
// there is nothing to assign or no key value to select by, so the caller never
// sends an invalid or unbounded UPDATE.
func etlBuildUpdate(table string, colTypes map[string]string, row map[string]interface{}, pkSet map[string]bool) string {
	sets := make([]string, 0, len(row))
	conds := make([]string, 0, len(pkSet))
	for col, colType := range colTypes {
		value, ok := row[col]
		if !ok {
			continue
		}
		literal := etlQuoteValue(colType, value)
		if pkSet[col] {
			// Key columns select the row; assigning them their own value
			// would be a no-op.
			conds = append(conds, col+"= "+literal)
			continue
		}
		sets = append(sets, col+"="+literal)
	}
	if len(sets) == 0 || len(conds) == 0 {
		return ""
	}
	return "UPDATE " + table + " SET " + strings.Join(sets, ",") + " WHERE " + strings.Join(conds, " AND ")
}

// ToLeadDataBase is the endless replay loop: it reads pending buffer records
// and applies each one to its target database, deleting the buffer record once
// it has been applied. Records with a Funcspec are routed to the matching
// special handler; all others are mapped through the Etltablst rules and
// inserted into, or updated in, the target table depending on whether a row
// with the same key already exists. Failed records stay in the buffer and are
// retried on a later pass.
func ToLeadDataBase() {
	for {
		// Step 1: fetch the next batch of pending buffer records.
		me := new(models.Buffer)
		data := me.FindData()
		if len(data) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		glog.InfoExtln("UpdatePlnWorkorder", "len(data):", len(data))

		// Step 2: apply every record to the target database.
		for _, v := range data {
			// Track how often this record has been attempted.
			v.UpdateDataTimes()

			// Records with a special handler.
			if !utils.ValueIsEmpty(v.Funcspec) {
				tems := make(map[string]interface{})
				if err := json.Unmarshal([]byte(v.Data), &tems); err != nil {
					glog.InfoExtln("etl导入从库err", "Unmarshal err is :", err)
					continue
				}
				switch v.Funcspec {
				case "UpdatePlnWorkorder":
					workordernr := utils.ValueToString(tems["ordernr"], "")
					glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
					status := utils.ValueToInt(tems["status"], 0)
					optime, err := etlOpTime(tems)
					if err != nil {
						glog.InfoExtln("UpdatePlnWorkorder", "temtime err is :", err)
						continue
					}
					glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
					if err = UpdatePlnWorkorder(v.Finr, workordernr, status, optime); err != nil {
						glog.InfoExtln("UpdatePlnWorkorder", "UpdatePlnWorkorder err is :", err)
						continue
					}
					if err = v.DeleteoneRecord(); err != nil {
						glog.InfoExtln("etl导入从库", "err is :", err)
						continue
					}
				case "UpdatePlnBatchorder":
					batchordernr := utils.ValueToString(tems["ordernr"], "")
					status := utils.ValueToInt(tems["status"], 0)
					actqty := utils.ValueToInt(tems["actqty"], 0)
					// Reject unparsable timestamps instead of writing the zero
					// time, consistent with the UpdatePlnWorkorder case.
					optime, err := etlOpTime(tems)
					if err != nil {
						glog.InfoExtln("UpdatePlnBatchorder", "temtime err is :", err)
						continue
					}
					if err = UpdatePlnBatchorder(v.Finr, batchordernr, status, actqty, optime); err != nil {
						glog.InfoExtln("UpdatePlnBatchorder", "UpdatePlnBatchorder err is :", err)
						continue
					}
					if err = v.DeleteoneRecord(); err != nil {
						glog.InfoExtln("etl导入从库", "err is :", err)
						continue
					}
				}
				continue
			}

			// 1. Load the field-mapping rules for this record.
			etl := new(models.Etltablst)
			etl.Finr = utils.ValueToInt(v.Finr, 0)
			etl.Eid = utils.ValueToInt(v.Eid, 0)
			etltabs, err := etl.SelectLst()
			if err != nil {
				glog.InfoExtln("etl导入从库", "err1 is :", err)
				continue
			}
			etlMap := make(map[string]models.Etltablst)
			for _, etlv := range etltabs {
				etlMap[etlv.Field] = etlv
			}

			// 2. Load the column types of the target table.
			datalist, err := SearchTable(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
			if err != nil {
				glog.InfoExtln("etl导入从库", "err2 is :", err)
				continue
			}
			colTypes := make(map[string]string, len(datalist))
			for _, col := range datalist {
				colTypes[col.Tabmapcol] = col.Coltype
			}

			// 3. Decode the buffered record and map it onto target columns.
			tablename := v.Totable
			tems := make(map[string]interface{})
			if err = json.Unmarshal([]byte(v.Data), &tems); err != nil {
				glog.InfoExtln("etl导入从库", "Unmarshal err is :", err)
				continue
			}
			row := etlMapFields(tems, etlMap)

			// 4. Look the row up by the target table's key columns.
			pkCols, err := SearchTablePk(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
			if err != nil {
				glog.InfoExtln("etl导入从库", "err is :", err)
				continue
			}
			where, pkSet := etlKeyPredicate(pkCols, colTypes, row)

			e := models.SearchDb(v.Todb, v.Todrivername, v.Dbtype)
			dataone := make(map[string]interface{})
			found, err := e.Table(tablename).Where(where).Get(&dataone)
			if err != nil {
				glog.InfoExtln("etl导入从库", "err is :", err)
				continue
			}

			// 5. Insert when the row is new, otherwise update it.
			var sql string
			if !found {
				sql = etlBuildInsert(tablename, colTypes, row)
				glog.InfoExtln("etl导入打印", "sql is :", sql)
			} else {
				sql = etlBuildUpdate(tablename, colTypes, row, pkSet)
			}

			// Only records buffered with status "ok" are replayed.
			if v.Status != "ok" {
				continue
			}
			if sql == "" {
				glog.InfoExtln("etl导入从库", "err is :", "update skipped: no key predicate or no columns to set")
				continue
			}

			to := new(models.Etltab)
			to.Todb = v.Todb
			to.Todrivername = v.Todrivername
			if err = to.ToLeadSlave(sql); err != nil {
				glog.InfoExtln("etl导入从库", "err is :", err)
				continue
			}

			// The row is applied: remove the record from the buffer.
			if err = v.DeleteoneRecord(); err != nil {
				glog.InfoExtln("etl导入从库", "err is :", err)
				continue
			}
		}

		time.Sleep(1 * time.Second)
	}
}