package Engine import ( "LAPP_SJA_ME/conf" "fmt" "log" "os" "testing" "time" ) func TestEngine(t *testing.T){ conf, err := conf.ReadYamlConfig("D:\\Go\\gopath\\src\\LAPP_PLATFORM\\conf\\config.yaml") if err != nil { t.Errorf("failed to read yaml config due to: %v", err) return } //set logfile Stdout logFile, logErr := os.OpenFile(conf.Log, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) if logErr != nil { fmt.Println("Fail to the log file:", conf.Log) os.Exit(1) } defer logFile.Close() log.SetOutput(logFile) log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) fmt.Printf("conf is :%v",conf) RunEngine(conf) time.Sleep(60*time.Second) } // 测试不同服务之间的GoRouting通信 /** |-- 管道 --| --> 1号打印机 --| Fetcher--|--管道--|Scheduler --->| |--管道--|--> Saver |-- 管道 --| --> 2号打印机 --| 引擎管理对象: 1. Fetcher 2. Fetcher -> Scheduler 管道 3. Scheduler 4. Printer列表 (每个Printer包括接收数据的管道和缓存数据的队列) 5. 打印机 -> Saver的管道 6. Saver(带有缓存数据队列) **/ func TestEngineRoutine(t *testing.T) { fschan := make(chan int) // 数据获取服务到调度器的管道 sp1chan := make(chan int) // 调度器到1号打印机的管道 sp2chan := make(chan int) // 调度器到2号打印机的管道 pschan := make(chan int) // 打印机到保存服务的管道 var t1, t2 time.Time var sp1q []int var sp2q [] int t1 = time.Now() fmt.Println("start time = ",t1) // 模拟数据获取服务 go func(){ for i := 0; i < 40000; i++ { fschan <- i } }() // 启动调度器服务 go func(){ for { i := <-fschan if i%2 == 0 { sp1chan <- i } else { sp2chan <- i } } }() // 启动打印机1服务 go func(){ for { i := <-sp1chan //fmt.Println("打印机1接收打印任务:", i) sp1q = append(sp1q, i) } }() // 启动打印机2服务 go func(){ for { i := <-sp2chan //fmt.Println("打印机2接收打印任务:", i) sp2q = append(sp2q, i) } }() go func(){ for { if len(sp1q) > 0 { i := sp1q[0] sp1q = sp1q[1:] pschan <- i fmt.Println("打印机1:任务:",i,"完成") } } }() go func(){ for { if len(sp2q) > 0 { i := sp2q[0] sp2q = sp2q[1:] pschan <- i fmt.Println("打印机2:任务:",i,"完成") } } }() // 启动保存服务 go func(){ for { i := <-pschan fmt.Println("任务:", i, " 被保存") if i >= 39998 { t2 = time.Now() fmt.Println("end time = ",t2) } } }() time.Sleep(10*time.Second) sub := t2.Sub(t1) fmt.Println("Duration = ",sub.Seconds()) } func TestEngineRoutineWithQueue(t *testing.T) { fschan := make(chan int) // 数据获取服务到调度器的管道 sp1chan := make(chan int) // 调度器到1号打印机的管道 sp2chan := make(chan int) // 调度器到2号打印机的管道 pschan := make(chan int) // 打印机到保存服务的管道 var t1, t2 time.Time t1 = time.Now() fmt.Println("start time = ",t1) // 模拟数据获取服务 go func(){ for i := 0; i < 40000; i++ { fschan <- i } }() // 启动调度器服务 go func(){ for { i := <-fschan if i%2 == 0 { sp1chan <- i } else { sp2chan <- i } } }() // 启动打印机1服务 go func(){ var sp1q []int var t int for { if len(sp1q) > 0 { t = sp1q[0] }else{ continue } select{ case i := <-sp1chan: sp1q = append(sp1q, i) fmt.Println("打印机1接收打印任务:", i) case pschan <- t: sp1q = sp1q[1:] fmt.Println("打印机1发送保存任务:", t) } } }() // 启动打印机2服务 go func(){ var sp2q []int var t int for { if len(sp2q) > 0 { t = sp2q[0] }else{ continue } select{ case i := <-sp2chan: sp2q = append(sp2q, i) fmt.Println("打印机2接收打印任务:", i) case pschan <- t: sp2q = sp2q[1:] fmt.Println("打印机2发送保存任务:", t) } } }() // 启动保存服务 go func(){ for { i := <-pschan fmt.Println("任务:", i, " 被保存") if i >= 39998 { t2 = time.Now() fmt.Println("end time = ",t2) } } }() time.Sleep(10*time.Second) sub := t2.Sub(t1) fmt.Println("Duration = ",sub.Seconds()) }