|
package implments
|
|
|
|
import (
|
|
"LAPP_ETL/global"
|
|
"LAPP_ETL/grmi"
|
|
"LAPP_ETL/infra/config"
|
|
model "LAPP_ETL/models/etl"
|
|
"context"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"time"
|
|
)
|
|
|
|
/******************************************************************************
|
|
*
|
|
* @Struct Name : RecordDaoImplement
|
|
*-----------------------------------------------------------------------------
|
|
*
|
|
* @Description : Record的数据访问对象实现
|
|
*
|
|
* @Author : zhangxin
|
|
*
|
|
* @Date : 2021-06-01
|
|
*
|
|
******************************************************************************/
|
|
type RecordDaoImplement struct {
|
|
client *mongo.Client
|
|
}
|
|
|
|
|
|
/******************************************************************************
|
|
*
|
|
* @Function Name : NewRecordDaoImplement
|
|
*-----------------------------------------------------------------------------
|
|
*
|
|
* @Description : 创建一个RecordDaoImplement实例
|
|
*
|
|
* @Function Parameters : mongo会话
|
|
*
|
|
|
|
* @Return Value : NewRecordDaoImplement实例
|
|
*
|
|
* @Author : zhangxin
|
|
*
|
|
* @Date : 2021-06-01
|
|
*
|
|
******************************************************************************/
|
|
func NewRecordDaoImplement(client *mongo.Client) *RecordDaoImplement {
|
|
return &RecordDaoImplement{client}
|
|
}
|
|
|
|
|
|
/******************************************************************************
|
|
*
|
|
* @Reference LAPP_ETL/dao/etl/RecordDaoImplement.InsertOne
|
|
*
|
|
******************************************************************************/
|
|
func (impl *RecordDaoImplement) InsertOne(record *model.Record, collectionName string) error {
|
|
_, err := impl.client.Database(config.AppConfig.Mongo.LogDB).Collection(collectionName).InsertOne(context.TODO(), record)
|
|
return err
|
|
}
|
|
|
|
|
|
/******************************************************************************
|
|
*
|
|
* @Reference LAPP_ETL/dao/etl/RecordDaoImplement.Select
|
|
*
|
|
******************************************************************************/
|
|
func (impl *RecordDaoImplement) Select(collectionName string, stage, status, start, end, batchId string, taskId int, pageIndex, pageNumber int) (grmi.PagingResult, error) {
|
|
var condition bson.D
|
|
if start != "" && end != "" {
|
|
startT, err := time.ParseInLocation(grmi.DatetimeOutFormat, start, global.TimezoneLocation)
|
|
if err != nil {
|
|
return grmi.EmptyPagingResult, err
|
|
}
|
|
endT, err := time.ParseInLocation(grmi.DatetimeOutFormat, end, global.TimezoneLocation)
|
|
if err != nil {
|
|
return grmi.EmptyPagingResult, err
|
|
}
|
|
condition = bson.D{{"time", bson.M{"$lte": endT, "$gte": startT}}}
|
|
} else {
|
|
condition = bson.D{}
|
|
}
|
|
if taskId != 0 {
|
|
condition = append(condition, bson.E{Key: "taskId", Value: taskId})
|
|
}
|
|
if status != "" {
|
|
condition = append(condition, bson.E{Key: "status", Value: status})
|
|
}
|
|
if stage != "" {
|
|
condition = append(condition, bson.E{Key: "stage", Value: stage})
|
|
}
|
|
if batchId != "" {
|
|
condition = append(condition, bson.E{Key: "batchId", Value: batchId})
|
|
}
|
|
var skip = int64((pageIndex - 1) * pageNumber)
|
|
var limit = int64(pageNumber)
|
|
count, err := impl.client.Database(config.AppConfig.Mongo.LogDB).Collection(collectionName).CountDocuments(context.TODO(), condition)
|
|
if err != nil {
|
|
return grmi.EmptyPagingResult, err
|
|
}
|
|
cursor, err := impl.client.Database(config.AppConfig.Mongo.LogDB).Collection(collectionName).Find(context.TODO(), condition, &options.FindOptions{
|
|
Skip: &skip,
|
|
Limit: &limit,
|
|
Sort: bson.D{{"time", -1}},
|
|
})
|
|
if err != nil {
|
|
return grmi.EmptyPagingResult, err
|
|
}
|
|
records := make([]model.Record, 0)
|
|
for cursor.Next(context.TODO()) {
|
|
var mr model.Record
|
|
err = cursor.Decode(&mr)
|
|
if err != nil {
|
|
return grmi.EmptyPagingResult, err
|
|
}
|
|
records = append(records, mr)
|
|
}
|
|
return grmi.PagingResult{Records: records, Count: count, PageNumber: int64(pageNumber), PageSize: int64(pageIndex)}, nil
|
|
}
|
|
|