SJA工艺
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

582 lines
16 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package task
  2. import (
  3. "LAPP_SJA_ME/utils"
  4. "LAPP_SJA_ME/web/middleware/glog"
  5. "LAPP_SJA_ME/web/models"
  6. "context"
  7. "encoding/json"
  8. uuid "github.com/iris-contrib/go.uuid"
  9. "strings"
  10. "time"
  11. )
  12. func CreateCacheTask() {
  13. //第一步:查询所有得服务
  14. me := new(models.Etltab)
  15. me.Finr = 100
  16. data, err := me.SelectAll()
  17. if err != nil {
  18. return
  19. }
  20. //第二步:给每个服务开启一个协程,
  21. for _, v := range data {
  22. taskChan <- v
  23. }
  24. //删除三天前的缓存数据
  25. go DelCacheData()
  26. //导入接口表
  27. go ToLeadDataBase()
  28. //创建继承Baxkground的子节点Context
  29. ctx, cancel := context.WithCancel(context.Background())
  30. defer cancel()
  31. CreateCache(ctx)
  32. }
  33. var taskChan = make(chan models.Etltab, 100) // task dispatch channel (buffer of 100): filled by CreateCacheTask, drained by CreateCache
  34. //导入缓存数据库
  35. func CreateCache(ctx context.Context) {
  36. for {
  37. select {
  38. case <-ctx.Done():
  39. return
  40. case task, ok := <-taskChan:
  41. if !ok {
  42. glog.InfoExtln("调度生成终止")
  43. return //停机退出
  44. }
  45. //第三步:基于协程,开启多个死循环,(为了负载均衡);并发执行
  46. go dealTask(task)
  47. }
  48. }
  49. }
  50. //任务处理
  51. func dealTask(task models.Etltab) {
  52. for {
  53. //第一步:每次查询出数据
  54. data, err := SearchData(task)
  55. //错误或者数据为空,跳出循环
  56. if err != nil || (len(data) == 0) {
  57. time.Sleep(10 * time.Second)
  58. continue
  59. }
  60. //查询出主键,删除数据的时候备用
  61. //TablePk, err := SearchTablePk(task.Fromtable)
  62. //第二步:导入mongodb
  63. for _, v := range data {
  64. value, err := json.Marshal(v)
  65. if err != nil {
  66. glog.InfoExtln("buffer 数据导入", "err", err)
  67. uuid, err := uuid.NewV1()
  68. if err != nil {
  69. continue
  70. }
  71. buffer := new(models.Buffer)
  72. buffer.Totable = task.Totable
  73. buffer.Eid = task.Eid
  74. buffer.Finr = task.Finr
  75. buffer.Status = "error"
  76. buffer.Orderid = uuid.String()
  77. buffer.Flag = 0
  78. buffer.Funcspec = task.Funcspec
  79. buffer.Data = "json转义错误"
  80. buffer.Todb = task.Todb
  81. buffer.Dbtype = task.Todbtype
  82. buffer.Todrivername = task.Todrivername
  83. buffer.Message = error.Error(err)
  84. buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
  85. buffer.InsertRecord()
  86. continue
  87. }
  88. uuid, err := uuid.NewV1()
  89. if err != nil {
  90. continue
  91. }
  92. buffer := new(models.Buffer)
  93. buffer.Totable = task.Totable
  94. buffer.Eid = task.Eid
  95. buffer.Finr = task.Finr
  96. buffer.Orderid = uuid.String()
  97. buffer.Status = "ok"
  98. buffer.Flag = 0
  99. buffer.Funcspec = task.Funcspec
  100. buffer.Data = string(value)
  101. buffer.Todb = task.Todb
  102. buffer.Dbtype = task.Todbtype
  103. buffer.Todrivername = task.Todrivername
  104. buffer.Message = ""
  105. buffer.TimeStamp = utils.TimeFormat(time.Now(), "yyyyMMddHHmmss")
  106. buffer.InsertRecord()
  107. }
  108. time.Sleep(10 * time.Second) //等待一秒,再执行下一循环
  109. }
  110. }
  111. /******************************************************************************
  112. *
  113. * @Function Name :
  114. *-----------------------------------------------------------------------------
  115. *
  116. * @Description : 逻辑:查询信息分为多表查询和单表查询 ChooseType = 1 为多表
  117. *
  118. * @Function Parameters: task models.Etltab
  119. *
  120. * @Return Value :
  121. *
  122. * @Author : Lou Wenzhi
  123. *
  124. * @Date : 2021/3/10 14:53
  125. *
  126. ******************************************************************************/
  127. func SearchData(task models.Etltab) ([]map[string]interface{}, error) {
  128. //选择对应的数据库
  129. e := models.SearchDb(task.Fromdb, task.Fromdrivername, task.Fromdbtype)
  130. if task.ChooseType == "1" {
  131. //以sql原生查询数据
  132. sql := task.Sqlshow
  133. switch task.SearchType {
  134. case 1:
  135. if task.Todrivername == "mssql" {
  136. data := make([]map[string]interface{}, 0)
  137. sql = sql + " order by " + task.SearchTime
  138. err := e.SQL(sql).Find(&data)
  139. if err != nil {
  140. return nil, err
  141. }
  142. /*****判断数据长度,为零返回******/
  143. if len(data) == 0 {
  144. return nil, nil
  145. }
  146. //更新,锁定状态
  147. msgids := make([]string, 0)
  148. for _, v := range data {
  149. msgid := utils.ValueToString(v[task.SearchFiled], "")
  150. msgids = append(msgids, msgid)
  151. }
  152. _, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
  153. if err != nil {
  154. glog.InfoExtln("ETL导出错误", "err", err)
  155. return nil, err
  156. }
  157. return data, nil
  158. } else if task.Todrivername == "mysql" {
  159. data := make([]map[string]interface{}, 0)
  160. sql = sql + " order by " + task.SearchTime
  161. err := e.SQL(sql).Find(&data)
  162. if err != nil {
  163. return nil, err
  164. }
  165. /*****判断数据长度,为零返回******/
  166. if len(data) == 0 {
  167. return nil, nil
  168. }
  169. //更新,锁定状态
  170. msgids := make([]string, 0)
  171. for _, v := range data {
  172. msgid := utils.ValueToString(v[task.SearchFiled], "")
  173. msgids = append(msgids, msgid)
  174. }
  175. _, err = e.Table(task.Fromtable).Cols("releaseflag").In(task.SearchFiled, msgids).Update(&map[string]interface{}{"releaseflag": 1})
  176. if err != nil {
  177. glog.InfoExtln("ETL导出错误", "err", err)
  178. return nil, err
  179. }
  180. return data, nil
  181. } else {
  182. //暂无
  183. }
  184. }
  185. } else {
  186. switch task.SearchType {
  187. case 1: //flag查询
  188. data := make([]map[string]interface{}, 0)
  189. err := e.Table(task.Fromtable).Where("retstartstatus != 1 ").Limit(1000).Desc(task.SearchTime).Find(&data)
  190. if err != nil {
  191. glog.InfoExtln("ETL导出错误", "err", err)
  192. return nil, err
  193. }
  194. if len(data) == 0 {
  195. return nil, nil
  196. }
  197. //更新,锁定状态
  198. msgids := make([]string, 0)
  199. for _, v := range data {
  200. msgid := utils.ValueToString(v[task.SearchFiled], "")
  201. msgids = append(msgids, msgid)
  202. }
  203. _, err = e.Table(task.Fromtable).Cols("retstartstatus").In("msgid", msgids).Update(&map[string]interface{}{"retstartstatus": 1})
  204. if err != nil {
  205. glog.InfoExtln("ETL导出错误", "err", err)
  206. return nil, err
  207. }
  208. return data, nil
  209. default:
  210. }
  211. }
  212. return nil, nil
  213. }
  214. //查询字段属性
  215. func SearchTable(dbname string, drivername string, tablename string, dbtype string) ([]models.EtlTabData, error) {
  216. datalist := make([]models.EtlTabData, 0)
  217. e := models.SearchDb(dbname, drivername, dbtype)
  218. //获取当前导入表的主键
  219. err := e.SQL("SELECT COLUMN_NAME as tabmapcol,DATA_TYPE as coltype FROM INFORMATION_SCHEMA.columns WHERE TABLE_NAME=?", tablename).Find(&datalist)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return datalist, nil
  224. }
  225. type TablePk struct {
  226. TabName string
  227. Colname string
  228. }
  229. //查询字段属性
  230. func SearchTablePk(dbname string, drivername string, tablename string, dbtype string) ([]models.TablePk, error) {
  231. datalist := make([]models.TablePk, 0)
  232. e := models.SearchDb(dbname, drivername, dbtype)
  233. //获取当前导入表的主键
  234. err := e.SQL("SELECT TABLE_NAME as tabname,COLUMN_NAME as colname FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=?", tablename).Find(&datalist)
  235. if err != nil {
  236. return nil, err
  237. }
  238. return datalist, nil
  239. }
  240. //导入从库
  241. func ToLeadDataBase() {
  242. for {
  243. //第一步:开启一个死循环,每次读取flag = 0,500条数据
  244. me := new(models.Buffer)
  245. data := me.FindData()
  246. if len(data) == 0 {
  247. time.Sleep(1 * time.Second)
  248. continue
  249. }
  250. glog.InfoExtln("UpdatePlnWorkorder", "len(data):", len(data))
  251. //第二步:导入从库,
  252. for _, v := range data {
  253. //判断执行次数
  254. v.UpdateDataTimes()
  255. //判断是否是特殊执行方法
  256. if !utils.ValueIsEmpty(v.Funcspec) {
  257. //解析数据
  258. tems := make(map[string]interface{})
  259. //json解析
  260. err := json.Unmarshal([]byte(v.Data), &tems)
  261. if err != nil {
  262. glog.InfoExtln("etl导入从库err", "Unmarshal err is :", err)
  263. continue
  264. }
  265. switch v.Funcspec {
  266. case "UpdatePlnWorkorder":
  267. finr := v.Finr
  268. workordernr := utils.ValueToString(tems["ordernr"], "")
  269. glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
  270. status := utils.ValueToInt(tems["status"], 0)
  271. tem := utils.ValueToString(tems["optime"], "")
  272. temtime, err := time.Parse("2006-01-02T15:04:05Z07:00", tem)
  273. if err != nil {
  274. glog.InfoExtln("UpdatePlnWorkorder", "temtime err is :", err)
  275. continue
  276. }
  277. optime := temtime.Format("20060102150405")
  278. glog.InfoExtln("UpdatePlnWorkorder", "workordernr task.go 295:", workordernr)
  279. err = UpdatePlnWorkorder(finr, workordernr, status, optime)
  280. if err != nil {
  281. glog.InfoExtln("UpdatePlnWorkorder", "UpdatePlnWorkorder err is :", err)
  282. continue
  283. }
  284. //更新mongdb数据flag
  285. //err = v.UpdateData()
  286. //if err != nil {
  287. // glog.Info("更新flag错误日志:%v", err)
  288. // continue
  289. //}
  290. err = v.DeleteoneRecord()
  291. if err != nil {
  292. glog.InfoExtln("etl导入从库", "err is :", err)
  293. continue
  294. }
  295. case "UpdatePlnBatchorder":
  296. finr := v.Finr
  297. batchordernr := utils.ValueToString(tems["ordernr"], "")
  298. status := utils.ValueToInt(tems["status"], 0)
  299. actqty := utils.ValueToInt(tems["actqty"], 0)
  300. tem := utils.ValueToString(tems["optime"], "")
  301. temtime, _ := time.Parse("2006-01-02T15:04:05Z07:00", tem)
  302. optime := temtime.Format("20060102150405")
  303. err = UpdatePlnBatchorder(finr, batchordernr, status, actqty, optime)
  304. if err != nil {
  305. glog.InfoExtln("UpdatePlnBatchorder", "UpdatePlnBatchorder err is :", err)
  306. continue
  307. }
  308. //更新mongdb数据flag
  309. //err = v.UpdateData()
  310. //if err != nil {
  311. // glog.Info("更新flag错误日志:%v", err)
  312. // continue
  313. //}
  314. err = v.DeleteoneRecord()
  315. if err != nil {
  316. glog.InfoExtln("etl导入从库", "err is :", err)
  317. continue
  318. }
  319. }
  320. } else {
  321. //1.查询导入规则
  322. etl := new(models.Etltablst)
  323. etl.Finr = utils.ValueToInt(v.Finr, 0)
  324. etl.Eid = utils.ValueToInt(v.Eid, 0)
  325. etltabs, err := etl.SelectLst()
  326. if err != nil {
  327. glog.InfoExtln("etl导入从库", "err1 is :", err)
  328. continue
  329. }
  330. etlMap := make(map[string]models.Etltablst)
  331. for _, etlv := range etltabs {
  332. key := etlv.Field
  333. etlMap[key] = etlv
  334. }
  335. //查询字段属性
  336. datalist, err := SearchTable(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
  337. if err != nil {
  338. glog.InfoExtln("etl导入从库", "err2 is :", err)
  339. continue
  340. }
  341. tablefiled := make(map[string]string)
  342. for _, vv := range datalist {
  343. key := vv.Tabmapcol
  344. tablefiled[key] = vv.Coltype
  345. }
  346. //解析数据
  347. tems := make(map[string]interface{})
  348. //表名
  349. tablename := v.Totable
  350. //拼接
  351. keystr := ""
  352. valstr := ""
  353. wherestr := ""
  354. where := ""
  355. //json解析
  356. err = json.Unmarshal([]byte(v.Data), &tems)
  357. if err != nil {
  358. glog.InfoExtln("etl导入从库", "Unmarshal err is :", err)
  359. continue
  360. }
  361. //3.对解析出来的数据进行处理
  362. tem := make(map[string]interface{})
  363. for kk, vv := range tems {
  364. key := kk
  365. val, ok := etlMap[key]
  366. if ok {
  367. mapKey := val.ToField
  368. mapVal := vv
  369. if val.Funcspec == "TimeParse" {
  370. vvstr := utils.ValueToString(vv, "")
  371. Valtime, err := utils.TimeParse(strings.TrimSpace(vvstr))
  372. mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
  373. if err != nil {
  374. continue
  375. }
  376. }
  377. if val.Funcspec == "DateParse" {
  378. vvstr := utils.ValueToString(vv, "")
  379. Valtime, err := utils.TimeParseyyyyMMdd(strings.TrimSpace(vvstr))
  380. mapVal = utils.TimeFormat(Valtime, "yyyy-MM-dd HH:mm:ss")
  381. if err != nil {
  382. continue
  383. }
  384. }
  385. if val.Funcspec == "ValueToInt" {
  386. mapVal = utils.ValueToInt(vv, 0)
  387. }
  388. if !utils.ValueIsEmpty(val.Defaultvalue) {
  389. tem[mapKey] = val.Defaultvalue
  390. } else {
  391. tem[mapKey] = mapVal
  392. }
  393. }
  394. }
  395. //获取当前导入表的主键
  396. TablePk, err := SearchTablePk(v.Todb, v.Todrivername, v.Totable, v.Dbtype)
  397. if err != nil {
  398. glog.InfoExtln("etl导入从库", "err is :", err)
  399. continue
  400. }
  401. //查询主键
  402. temPk := make(map[string]interface{})
  403. for _, v := range TablePk {
  404. key := v.Colname
  405. temPk[key] = v
  406. value, ok := tablefiled[key]
  407. if ok {
  408. if value == "int" || value == "smallint" || value == "bigint" {
  409. intval := utils.ValueToInt(tem[key], 0)
  410. where += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
  411. } else if value == "decimal" {
  412. floatval := utils.ValueToFloat(tem[key], 0.0)
  413. where += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
  414. } else {
  415. val := utils.ValueToString(tem[key], "")
  416. where += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
  417. }
  418. } else {
  419. continue
  420. }
  421. }
  422. //去掉最后边的逗号
  423. where = strings.Trim(where, " AND ")
  424. e := models.SearchDb(v.Todb, v.Todrivername, v.Dbtype)
  425. //查询数据
  426. dataone := make(map[string]interface{})
  427. ok, err := e.Table(tablename).Where(where).Get(&dataone)
  428. if err != nil {
  429. glog.InfoExtln("etl导入从库", "err is :", err)
  430. continue
  431. }
  432. if !ok {
  433. //记录字段,插入
  434. for key, val := range tablefiled {
  435. keystr += key + ","
  436. value, ok := tem[key]
  437. if ok {
  438. if val == "int" || val == "smallint" || value == "bigint" {
  439. intval := utils.ValueToInt(value, 0)
  440. valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
  441. } else if val == "decimal" {
  442. floatval := utils.ValueToFloat(value, 0.0)
  443. valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
  444. } else if val == "datetime" {
  445. val := utils.ValueToString(value, "")
  446. valstr += "'" + strings.TrimSpace(val) + "'" + ","
  447. } else {
  448. val := utils.ValueToString(value, "")
  449. valstr += "'" + strings.TrimSpace(val) + "'" + ","
  450. }
  451. } else {
  452. val := utils.ValueToString(value, "")
  453. valstr += "'" + strings.TrimSpace(val) + "'" + ","
  454. }
  455. }
  456. //去掉最后边的逗号
  457. keystr = strings.Trim(keystr, ",")
  458. valstr = strings.Trim(valstr, ",")
  459. sql := "INSERT INTO " + tablename + " (" + keystr + ") values (" + valstr + ")"
  460. glog.InfoExtln("etl导入打印", "sql is :", sql)
  461. if v.Status != "ok" {
  462. continue
  463. }
  464. to := new(models.Etltab)
  465. to.Todb = v.Todb
  466. to.Todrivername = v.Todrivername
  467. err = to.ToLeadSlave(sql)
  468. if err != nil {
  469. glog.InfoExtln("etl导入从库", "UpdatePlnBatchorder err is :", err)
  470. continue
  471. }
  472. //更新mongdb数据flag
  473. //err = v.UpdateData()
  474. //if err != nil {
  475. // glog.Info("更新flag错误日志:%v", err)
  476. //}
  477. err = v.DeleteoneRecord()
  478. if err != nil {
  479. glog.InfoExtln("etl导入从库", "err is :", err)
  480. continue
  481. }
  482. } else {
  483. //更新内容
  484. //第二步构建sql语句
  485. for key, val := range tablefiled {
  486. value, ok := tem[key]
  487. _, res := temPk[key]
  488. if ok {
  489. keystr += key + ","
  490. if val == "int" || val == "smallint" || value == "bigint" {
  491. intval := utils.ValueToInt(value, 0)
  492. valstr += "'" + utils.ValueToString(intval, "") + "'" + ","
  493. if res {
  494. wherestr += key + "= '" + utils.ValueToString(intval, "") + "'" + " AND "
  495. }
  496. } else if val == "decimal" {
  497. floatval := utils.ValueToFloat(value, 0.0)
  498. valstr += "'" + utils.ValueToString(floatval, "") + "'" + ","
  499. if res {
  500. wherestr += key + "= '" + utils.ValueToString(floatval, "") + "'" + " AND "
  501. }
  502. } else {
  503. val := utils.ValueToString(value, "")
  504. valstr += "'" + strings.TrimSpace(val) + "'" + ","
  505. if res {
  506. wherestr += key + "= '" + strings.TrimSpace(val) + "'" + " AND "
  507. }
  508. }
  509. } else {
  510. continue
  511. }
  512. }
  513. //去掉最后边的逗号
  514. wherestr = strings.Trim(wherestr, " AND ")
  515. valstr = strings.Trim(valstr, ",")
  516. //2.开始导入
  517. sql := "UPDATE " + tablename + " SET " + valstr + " WHERE " + wherestr
  518. //更新
  519. if v.Status != "ok" {
  520. continue
  521. }
  522. to := new(models.Etltab)
  523. err = to.ToLeadSlave(sql)
  524. if err != nil {
  525. glog.InfoExtln("etl导入从库", "err is :", err)
  526. continue
  527. }
  528. //更新mongdb数据flag
  529. //err = v.UpdateData()
  530. //if err != nil {
  531. // glog.Info("更新flag错误日志:%v", err)
  532. // continue
  533. //}
  534. //删除mongdb
  535. err = v.DeleteoneRecord()
  536. if err != nil {
  537. glog.InfoExtln("etl导入从库", "err is :", err)
  538. continue
  539. }
  540. }
  541. }
  542. }
  543. time.Sleep(1 * time.Second)
  544. }
  545. }