over-golang/06-微服务/05-etcd-5-go与etcd监听.md
2021-07-02 18:11:59 +08:00

2.1 KiB
Raw Permalink Blame History

1.3 示例2 watch机制

package main

import (
	"context"
	"fmt"
	"github.com/etcd-io/etcd/clientv3"
	"time"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

// connect creates an etcd v3 client for the three local endpoints using a
// 5-second dial timeout. On failure it prints the error and returns a nil
// client together with that error.
func connect() (client *clientv3.Client, err error) {
	cfg := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
	}

	if client, err = clientv3.New(cfg); err != nil {
		fmt.Println("connect err:", err)
		return nil, err
	}

	return client, nil
}

// main connects to etcd, starts a goroutine that puts and then deletes the
// key /cron/jobs/job7 once per second, prints the key's current value if it
// exists, and then watches the key starting at the revision after that read,
// printing every PUT and DELETE event delivered on the watch channel.
func main() {
	const jobKey = "/cron/jobs/job7"

	// Connect. The error is checked before Close is deferred: connect returns
	// a nil client on failure, so deferring first would call Close on nil.
	cli, err := connect()
	if err != nil {
		return
	}
	defer cli.Close()

	// KV handle used for reads and writes.
	kv := clientv3.NewKV(cli)

	// Simulate changes to the key. The goroutine has no stop condition; it
	// ends when main returns and the process exits.
	go func() {
		for {
			if _, err := kv.Put(context.TODO(), jobKey, "job7"); err != nil {
				fmt.Println("put err:", err)
			}
			if _, err := kv.Delete(context.TODO(), jobKey); err != nil {
				fmt.Println("delete err:", err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// Read the current value.
	getR, err := kv.Get(context.TODO(), jobKey)
	if err != nil {
		fmt.Println("get err:", err)
		return
	}
	if len(getR.Kvs) != 0 { // the key exists
		fmt.Println("当前值:", string(getR.Kvs[0].Value))
	}

	// Watch subsequent changes: start one past the revision reported by the
	// Get response so that only later changes are delivered.
	wathStartRevision := getR.Header.Revision + 1
	watcher := clientv3.NewWatcher(cli)
	fmt.Println("从该版本向后监听:", wathStartRevision)
	watchChan := watcher.Watch(context.TODO(), jobKey, clientv3.WithRev(wathStartRevision))

	// Each change to the key arrives on watchChan as a batch of events.
	for watchResult := range watchChan {
		// Report a failed or cancelled watch instead of exiting silently.
		if err := watchResult.Err(); err != nil {
			fmt.Println("watch err:", err)
			return
		}
		for _, event := range watchResult.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了Revision:", event.Kv.ModRevision)
			}
		}
	}
}

如果要取消监听，同样是通过取消 context 来实现：

// Create a cancellable context for the watch; cancelling it is how the watch
// is stopped.
ctx, cancelFunc := context.WithCancel(context.TODO())
// Cancel the context after 5 seconds.
time.AfterFunc(5*time.Second, cancelFunc)
watchChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(wathStartRevision))