SJA APS后端代码
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

156 lines
3.2 KiB

  1. // Copyright (c) Shenyang Leading Edge Intelligent Technology Co., Ltd. All rights reserved.
  2. package etcd
  3. import (
  4. "context"
  5. "fmt"
  6. clientv3 "go.etcd.io/etcd/client/v3"
  7. "time"
  8. )
  9. /******************************************************************************
  10. *
  11. * @Function Name :
  12. *-----------------------------------------------------------------------------
  13. *
  14. * @Description :
  15. *
  16. * @Function Parameters:
  17. *
  18. * @Return Value :
  19. *
  20. * @Author : Lou Wenzhi
  21. *
  22. * @Date : 2021/3/25 9:02
  23. *
  24. ******************************************************************************/
  25. // 分布式锁(TXN事务)
  26. type SnrJobLock struct {
  27. // etcd客户端
  28. Client *clientv3.Client
  29. Kv clientv3.KV
  30. Lease clientv3.Lease
  31. JobName string // 任务名
  32. CancelFunc context.CancelFunc // 用于终止自动续租
  33. LeaseId clientv3.LeaseID // 租约ID
  34. IsLocked bool // 是否上锁成功
  35. }
  36. var(
  37. Snr_jobLock *SnrJobLock
  38. )
  39. //初始化任务锁
  40. func InitSnrJobLock() (err error) {
  41. var (
  42. config clientv3.Config
  43. client *clientv3.Client
  44. kv clientv3.KV
  45. lease clientv3.Lease
  46. )
  47. //创建config
  48. config = clientv3.Config{
  49. Endpoints:[]string{"127.0.0.1:2379"},
  50. DialTimeout: 5000 * time.Millisecond,
  51. }
  52. //创建client
  53. if client, err = clientv3.New(config); err != nil {
  54. fmt.Println(err)
  55. }
  56. //创建kv, lease
  57. kv = clientv3.NewKV(client)
  58. lease = clientv3.NewLease(client)
  59. //赋值单例
  60. Snr_jobLock = &SnrJobLock{
  61. Client:client,
  62. Kv:kv,
  63. Lease:lease,
  64. }
  65. return
  66. }
  67. //尝试上锁
  68. func (jobLock *SnrJobLock) TryLock(jobName string) (err error){
  69. var(
  70. ctx context.Context
  71. cancelFunc context.CancelFunc
  72. leaseGrantRes *clientv3.LeaseGrantResponse
  73. leaseId clientv3.LeaseID
  74. KeepResChan <- chan *clientv3.LeaseKeepAliveResponse
  75. txn clientv3.Txn
  76. lockKey string
  77. txnRes *clientv3.TxnResponse
  78. )
  79. //创建上下文
  80. ctx, cancelFunc = context.WithCancel(context.TODO())
  81. //创建续租
  82. leaseGrantRes, err = jobLock.Lease.Grant(ctx, 5)
  83. //续租id
  84. leaseId = leaseGrantRes.ID
  85. //保持续租
  86. if KeepResChan, err = jobLock.Lease.KeepAlive(ctx, leaseId); err != nil{
  87. fmt.Println(err)
  88. goto FAIL
  89. }
  90. //每隔1秒续约续租
  91. go func() {
  92. var keepRes *clientv3.LeaseKeepAliveResponse
  93. for{
  94. select {
  95. case keepRes = <- KeepResChan:
  96. //如果续约失败
  97. if keepRes == nil{
  98. goto END
  99. }
  100. }
  101. time.Sleep(1*time.Second)
  102. }
  103. END:
  104. }()
  105. //创建事务txn
  106. txn = jobLock.Kv.Txn(context.TODO())
  107. //锁路径
  108. lockKey = jobName
  109. //事务枪锁
  110. txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
  111. Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
  112. Else(clientv3.OpGet(lockKey))
  113. //提交事务
  114. if txnRes, err = txn.Commit(); err != nil{
  115. fmt.Println("提交创建锁失败", err)
  116. goto FAIL
  117. }
  118. //如果抢锁失败
  119. if !txnRes.Succeeded{
  120. err = fmt.Errorf("锁被占用")
  121. goto FAIL
  122. }
  123. jobLock.CancelFunc = cancelFunc
  124. jobLock.LeaseId = leaseId
  125. jobLock.IsLocked = true
  126. return
  127. FAIL:
  128. //释放上下文,取消续约
  129. cancelFunc()
  130. jobLock.Lease.Revoke(ctx, leaseId) //释放租约
  131. return
  132. }
  133. //释放锁
  134. func (jobLock *SnrJobLock)UnLock() {
  135. if jobLock.IsLocked{
  136. jobLock.CancelFunc() //取消自动续租协程
  137. jobLock.Lease.Revoke(context.TODO(), jobLock.LeaseId) // 释放租约
  138. }
  139. }