SJA APS后端代码
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
4.5 KiB

3 years ago
  1. package etcd
  2. import (
  3. clientv3 "go.etcd.io/etcd/client/v3"
  4. "leit.com/leit_seat_aps/common"
  5. conf "leit.com/leit_seat_aps/config"
  6. "context"
  7. "fmt"
  8. )
  9. /******************************************************************************
  10. *
  11. * @Struct Name : ETCDServiceImplement
  12. *-----------------------------------------------------------------------------
  13. *
  14. * @Description : 的数据访问对象实现
  15. *
  16. * @Author : zhangxin
  17. *
  18. * @Date : 2021-06-17
  19. *
  20. ******************************************************************************/
  21. type ETCDServiceImplement struct {
  22. Client *clientv3.Client
  23. Lease clientv3.Lease
  24. LeaseResp *clientv3.LeaseGrantResponse
  25. CancelFunc func()
  26. KeepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
  27. Addr string
  28. CTX context.Context
  29. }
  30. /******************************************************************************
  31. *
  32. * @Function Name : NewETCDLockClientDAOImplement
  33. *-----------------------------------------------------------------------------
  34. *
  35. * @Description : 创建一个NewETCDLockClientDAOImplement实例
  36. *
  37. * @Function Parameters : etcd客户端连接
  38. *
  39. * @Return Value : ETCDLockClientImplement实例
  40. *
  41. * @Author : zhangxin
  42. *
  43. * @Date : 2021-06-16
  44. *
  45. ******************************************************************************/
  46. func NewETCDServiceImplement(client *clientv3.Client, addr string) *ETCDServiceImplement {
  47. lease := clientv3.NewLease(client)
  48. ctx, cancelFunc := context.WithCancel(context.TODO())
  49. return &ETCDServiceImplement{
  50. Client: client,
  51. Lease: lease,
  52. Addr: addr,
  53. CTX: ctx,
  54. CancelFunc: cancelFunc,
  55. }
  56. }
  57. /******************************************************************************
  58. *
  59. * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.RegisterService
  60. *
  61. ******************************************************************************/
  62. func (impl *ETCDServiceImplement) RegisterService(name string) (err error) {
  63. err = impl.SetLease()
  64. if err != nil {
  65. return err
  66. }
  67. go impl.ListenLeaseRespChan()
  68. kv := clientv3.NewKV(impl.Client)
  69. serviceKey := fmt.Sprintf("/services/%s/%s", conf.AppConfig.LocalAddr, name)
  70. _, err = kv.Put(impl.CTX, serviceKey, common.SERVICE_STATUS_RUNNING, clientv3.WithLease(impl.LeaseResp.ID))
  71. return err
  72. }
  73. /******************************************************************************
  74. *
  75. * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.RegisterTask
  76. *
  77. ******************************************************************************/
  78. func (impl *ETCDServiceImplement) RegisterTask(taskId int, stage string) (err error) {
  79. err = impl.SetLease()
  80. if err != nil {
  81. return err
  82. }
  83. go impl.ListenLeaseRespChan()
  84. kv := clientv3.NewKV(impl.Client)
  85. serviceKey := fmt.Sprintf("/task/%s/%d/%s", conf.AppConfig.LocalAddr, taskId, stage)
  86. _, err = kv.Put(impl.CTX, serviceKey, common.SERVICE_STATUS_RUNNING, clientv3.WithLease(impl.LeaseResp.ID))
  87. return err
  88. }
  89. /******************************************************************************
  90. *
  91. * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.SetLease
  92. *
  93. ******************************************************************************/
  94. func (impl *ETCDServiceImplement) SetLease() error {
  95. //设置租约时间
  96. leaseResp, err := impl.Lease.Grant(impl.CTX, conf.AppConfig.ServiceLease)
  97. if err != nil {
  98. return err
  99. }
  100. //设置续租
  101. leaseRespChan, err := impl.Lease.KeepAlive(impl.CTX, leaseResp.ID)
  102. if err != nil {
  103. impl.CancelFunc()
  104. return err
  105. }
  106. impl.LeaseResp = leaseResp
  107. impl.KeepAliveChan = leaseRespChan
  108. return nil
  109. }
  110. /******************************************************************************
  111. *
  112. * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.ListenLeaseRespChan
  113. *
  114. ******************************************************************************/
  115. func (impl *ETCDServiceImplement) ListenLeaseRespChan() {
  116. for {
  117. select {
  118. case <-impl.CTX.Done():
  119. fmt.Println("context 结束运行")
  120. return
  121. case leaseKeepResp := <-impl.KeepAliveChan:
  122. if leaseKeepResp == nil {
  123. return
  124. }
  125. }
  126. }
  127. }
  128. /******************************************************************************
  129. *
  130. * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.RevokeLease
  131. *
  132. ******************************************************************************/
  133. func (impl *ETCDServiceImplement) RevokeLease() error {
  134. var err error
  135. if impl.LeaseResp != nil {
  136. _, err = impl.Lease.Revoke(impl.CTX, impl.LeaseResp.ID)
  137. err = impl.Lease.Close()
  138. }
  139. impl.CancelFunc()
  140. return err
  141. }