package etcd

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"

	"leit.com/leit_seat_aps/common"
	conf "leit.com/leit_seat_aps/config"
)

/******************************************************************************
 *
 * @Struct Name : ETCDServiceImplement
 *-----------------------------------------------------------------------------
 *
 * @Description : Data-access object that registers services and tasks in etcd
 *                under a lease and keeps that lease alive.
 *
 * @Author      : zhangxin
 *
 * @Date        : 2021-06-17
 *
 ******************************************************************************/
type ETCDServiceImplement struct {
	// Client is the etcd connection used to build the KV API for Put calls.
	Client *clientv3.Client
	// Lease is the lease API created from Client by the constructor.
	Lease clientv3.Lease
	// LeaseResp is the response of the most recent successful Grant made by
	// SetLease; it is nil until SetLease has succeeded once.
	LeaseResp *clientv3.LeaseGrantResponse
	// CancelFunc cancels CTX.
	CancelFunc func()
	// KeepAliveChan is the keep-alive response channel of the most recent
	// successful SetLease call.
	KeepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	// Addr is stored by the constructor.
	// NOTE(review): no method in this file reads Addr; registration keys are
	// built from conf.AppConfig.LocalAddr instead. Confirm which is intended.
	Addr string
	// CTX is the cancellable context passed to every etcd call made here.
	CTX context.Context
}

/******************************************************************************
 *
 * @Function Name       : NewETCDServiceImplement
 *-----------------------------------------------------------------------------
 *
 * @Description         : Creates an ETCDServiceImplement instance with its own
 *                        lease API and a cancellable context.
 *
 * @Function Parameters : client - etcd client connection
 *                        addr   - address stored in the Addr field
 *
 * @Return Value        : ETCDServiceImplement instance
 *
 * @Author              : zhangxin
 *
 * @Date                : 2021-06-16
 *
 ******************************************************************************/
func NewETCDServiceImplement(client *clientv3.Client, addr string) *ETCDServiceImplement {
	lease := clientv3.NewLease(client)
	ctx, cancelFunc := context.WithCancel(context.TODO())

	return &ETCDServiceImplement{
		Client:     client,
		Lease:      lease,
		Addr:       addr,
		CTX:        ctx,
		CancelFunc: cancelFunc,
	}
}

/******************************************************************************
 *
 * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.RegisterService
 *
 * @Description : Grants a lease, starts draining its keep-alive responses and
 *                writes common.SERVICE_STATUS_RUNNING under the key
 *                /services/<conf.AppConfig.LocalAddr>/<name>, bound to that
 *                lease.
 *
 * @Return Value : the error from SetLease or from the Put call, nil on success
 *
 ******************************************************************************/
func (impl *ETCDServiceImplement) RegisterService(name string) (err error) {
	serviceKey := fmt.Sprintf("/services/%s/%s", conf.AppConfig.LocalAddr, name)
	return impl.register(serviceKey)
}

/******************************************************************************
 *
 * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.RegisterTask
 *
 * @Description : Grants a lease, starts draining its keep-alive responses and
 *                writes common.SERVICE_STATUS_RUNNING under the key
 *                /task/<conf.AppConfig.LocalAddr>/<taskId>/<stage>, bound to
 *                that lease.
 *
 * @Return Value : the error from SetLease or from the Put call, nil on success
 *
 ******************************************************************************/
func (impl *ETCDServiceImplement) RegisterTask(taskId int, stage string) (err error) {
	serviceKey := fmt.Sprintf("/task/%s/%d/%s", conf.AppConfig.LocalAddr, taskId, stage)
	return impl.register(serviceKey)
}

// register holds the logic shared by RegisterService and RegisterTask: it
// obtains a lease, starts a goroutine that drains the keep-alive channel, and
// puts the running status under key with the lease attached.
func (impl *ETCDServiceImplement) register(key string) error {
	if err := impl.SetLease(); err != nil {
		return err
	}

	// The channel is read from the struct here, in the calling goroutine, so
	// the background goroutine never touches the KeepAliveChan field and a
	// later SetLease call cannot swap the channel underneath it.
	go impl.drainKeepAlive(impl.KeepAliveChan)

	kv := clientv3.NewKV(impl.Client)
	_, err := kv.Put(impl.CTX, key, common.SERVICE_STATUS_RUNNING, clientv3.WithLease(impl.LeaseResp.ID))
	return err
}

/******************************************************************************
 *
 * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.SetLease
 *
 * @Description : Grants a new lease whose TTL is conf.AppConfig.ServiceLease
 *                and starts keep-alive for it. On success LeaseResp and
 *                KeepAliveChan are replaced with the new lease's values.
 *
 * @Return Value : the error from Grant or KeepAlive, nil on success
 *
 ******************************************************************************/
func (impl *ETCDServiceImplement) SetLease() error {
	// Grant the lease with the configured TTL.
	leaseResp, err := impl.Lease.Grant(impl.CTX, conf.AppConfig.ServiceLease)
	if err != nil {
		return err
	}

	// Start automatic renewal of the lease.
	leaseRespChan, err := impl.Lease.KeepAlive(impl.CTX, leaseResp.ID)
	if err != nil {
		// This cancels the shared context, so every later etcd call made
		// through this instance will fail with a cancelled context.
		impl.CancelFunc()
		return err
	}

	impl.LeaseResp = leaseResp
	impl.KeepAliveChan = leaseRespChan
	return nil
}

/******************************************************************************
 *
 * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.ListenLeaseRespChan
 *
 * @Description : Blocks while consuming keep-alive responses from the channel
 *                that KeepAliveChan holds at the moment of the call. Returns
 *                when CTX is done or when a nil response is received.
 *
 ******************************************************************************/
func (impl *ETCDServiceImplement) ListenLeaseRespChan() {
	impl.drainKeepAlive(impl.KeepAliveChan)
}

// drainKeepAlive consumes responses from ch until impl.CTX is done or a nil
// response arrives. A receive from a closed channel yields nil, so a closed
// keep-alive channel also ends the loop.
func (impl *ETCDServiceImplement) drainKeepAlive(ch <-chan *clientv3.LeaseKeepAliveResponse) {
	for {
		select {
		case <-impl.CTX.Done():
			fmt.Println("context 结束运行")
			return
		case leaseKeepResp := <-ch:
			if leaseKeepResp == nil {
				return
			}
		}
	}
}

/******************************************************************************
 *
 * @Reference LAPP_ETL/dao/etl/ETCDServiceImplement.RevokeLease
 *
 * @Description : Revokes the current lease (if one was granted), closes the
 *                lease API and finally cancels CTX. When no lease has been
 *                granted only the context is cancelled.
 *
 * @Return Value : the Revoke error if revoking failed, otherwise the Close
 *                 error, otherwise nil
 *
 ******************************************************************************/
func (impl *ETCDServiceImplement) RevokeLease() error {
	// Cancel last so the Revoke call can still use impl.CTX.
	defer impl.CancelFunc()

	if impl.LeaseResp == nil {
		return nil
	}

	_, revokeErr := impl.Lease.Revoke(impl.CTX, impl.LeaseResp.ID)
	// Close is attempted even when Revoke failed so the lease API is always
	// released; its result must not hide the revoke failure.
	closeErr := impl.Lease.Close()
	if revokeErr != nil {
		return revokeErr
	}
	return closeErr
}