over-golang/06-微服务/05-etcd-4-go与etcd租约.md
2021-07-02 18:11:59 +08:00

3.5 KiB
Raw Permalink Blame History

一 租约机制(自动过期)

package main

import (
	"context"
	"fmt"
	"github.com/etcd-io/etcd/clientv3"
	"time"
)

func connect() (client *clientv3.Client, err error){

	client, err = clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
	})

	if err != nil {
		fmt.Println("connect err:", err)
		return nil, err
	}

	return client, err
}

func main() {

	// 连接
	cli, err := connect()
	defer cli.Close()
	if err != nil {
		return
	}

	// 获取etcd读写对象
	kv := clientv3.NewKV(cli)

	// 申请一个10秒租约
	lease := clientv3.NewLease(cli)
	leaseR, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println("lease err:", err)
		return
	}

	// 使用该租约put一个kv
	putR, err := kv.Put(context.TODO(), "/cron/lock/job1", "10001", clientv3.WithLease(leaseR.ID))
	if err != nil {
		fmt.Println("put err:", err)
		return
	}
	fmt.Println("写入成功:", putR.Header.Revision)

	// 定时查看key是否过期
	for {
		getR, err := kv.Get(context.TODO(), "/cron/lock/job1")
		if err != nil {
			fmt.Println("get err:", err)
			return
		}
		if getR.Count == 0 {
			fmt.Println("key过期")
			break
		} else {
			fmt.Println("还未过期")
			time.Sleep(2 * time.Second)
		}
	}
}

1.3 租约续租

我们希望能够续约,并能根据需要删除:

package main

import (
	"context"
	"fmt"
	"github.com/etcd-io/etcd/clientv3"
	"time"
)

func connect() (client *clientv3.Client, err error){

	client, err = clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
	})

	if err != nil {
		fmt.Println("connect err:", err)
		return nil, err
	}

	return client, err
}

func main() {

	// 连接
	cli, err := connect()
	defer cli.Close()
	if err != nil {
		return
	}

	// 获取etcd读写对象
	kv := clientv3.NewKV(cli)

	// 申请一个10秒租约
	lease := clientv3.NewLease(cli)
	leaseR, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println("lease err:", err)
		return
	}
	
	// 自动续租 返回值是个只读的chan因为写入只能是etcd实现
	keepChan, err := lease.KeepAlive(context.TODO(), leaseR.ID )
	if err != nil {
		fmt.Println("keep err:", err)
		return
	}
	// 启动一个协程去消费chan的应答
	go func(){
		for {
			select {
			case keepR := <- keepChan:
				if keepChan == nil {		// 此时系统异常或者主动取消context
					fmt.Println("租约失效")
					goto END
				} else {		// 每秒续租一次
					fmt.Println("收到自动续租应答:", keepR.ID)
				}
			}
		}
		END:
	}()

	// 使用该租约put一个kv
	putR, err := kv.Put(context.TODO(), "/cron/lock/job1", "10001", clientv3.WithLease(leaseR.ID))
	if err != nil {
		fmt.Println("put err:", err)
		return
	}
	fmt.Println("写入成功:", putR.Header.Revision)

	// 定时查看key是否过期
	for {
		getR, err := kv.Get(context.TODO(), "/cron/lock/job1")
		if err != nil {
			fmt.Println("get err:", err)
			return
		}
		if getR.Count == 0 {
			fmt.Println("key过期")
			break
		} else {
			fmt.Println("还未过期")
			time.Sleep(2 * time.Second)
		}
	}
}

如果我们要主动让context取消则会让租约失效现在定义一个5秒后取消的context

	// 续租了5秒然后手动停止续租即总共有15秒生命
	ctx, _ := context.WithTimeout(context.TODO(), 5 * time.Second)
	
	// 自动续租 返回值是个只读的chan因为写入只能是etcd实现
	keepChan, err := lease.KeepAlive(ctx, leaseR.ID )