3.5 KiB
3.5 KiB
一 租约机制(自动过期)
package main
import (
"context"
"fmt"
"github.com/etcd-io/etcd/clientv3"
"time"
)
func connect() (client *clientv3.Client, err error){
client, err = clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println("connect err:", err)
return nil, err
}
return client, err
}
func main() {
// 连接
cli, err := connect()
defer cli.Close()
if err != nil {
return
}
// 获取etcd读写对象
kv := clientv3.NewKV(cli)
// 申请一个10秒租约
lease := clientv3.NewLease(cli)
leaseR, err := lease.Grant(context.TODO(), 10)
if err != nil {
fmt.Println("lease err:", err)
return
}
// 使用该租约put一个kv
putR, err := kv.Put(context.TODO(), "/cron/lock/job1", "10001", clientv3.WithLease(leaseR.ID))
if err != nil {
fmt.Println("put err:", err)
return
}
fmt.Println("写入成功:", putR.Header.Revision)
// 定时查看key是否过期
for {
getR, err := kv.Get(context.TODO(), "/cron/lock/job1")
if err != nil {
fmt.Println("get err:", err)
return
}
if getR.Count == 0 {
fmt.Println("key过期")
break
} else {
fmt.Println("还未过期")
time.Sleep(2 * time.Second)
}
}
}
1.3 租约续租
我们希望能够续约,并能根据需要删除:
package main
import (
"context"
"fmt"
"github.com/etcd-io/etcd/clientv3"
"time"
)
func connect() (client *clientv3.Client, err error){
client, err = clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println("connect err:", err)
return nil, err
}
return client, err
}
func main() {
// 连接
cli, err := connect()
defer cli.Close()
if err != nil {
return
}
// 获取etcd读写对象
kv := clientv3.NewKV(cli)
// 申请一个10秒租约
lease := clientv3.NewLease(cli)
leaseR, err := lease.Grant(context.TODO(), 10)
if err != nil {
fmt.Println("lease err:", err)
return
}
// 自动续租 返回值是个只读的chan,因为写入只能是etcd实现
keepChan, err := lease.KeepAlive(context.TODO(), leaseR.ID )
if err != nil {
fmt.Println("keep err:", err)
return
}
// 启动一个协程去消费chan的应答
go func(){
for {
select {
case keepR := <- keepChan:
if keepChan == nil { // 此时系统异常或者主动取消context
fmt.Println("租约失效")
goto END
} else { // 每秒续租一次
fmt.Println("收到自动续租应答:", keepR.ID)
}
}
}
END:
}()
// 使用该租约put一个kv
putR, err := kv.Put(context.TODO(), "/cron/lock/job1", "10001", clientv3.WithLease(leaseR.ID))
if err != nil {
fmt.Println("put err:", err)
return
}
fmt.Println("写入成功:", putR.Header.Revision)
// 定时查看key是否过期
for {
getR, err := kv.Get(context.TODO(), "/cron/lock/job1")
if err != nil {
fmt.Println("get err:", err)
return
}
if getR.Count == 0 {
fmt.Println("key过期")
break
} else {
fmt.Println("还未过期")
time.Sleep(2 * time.Second)
}
}
}
如果我们要主动让context取消,则会让租约失效,现在定义一个5秒后取消的context:
// 续租了5秒,然后手动停止续租,即总共有15秒生命
ctx, _ := context.WithTimeout(context.TODO(), 5 * time.Second)
// 自动续租 返回值是个只读的chan,因为写入只能是etcd实现
keepChan, err := lease.KeepAlive(ctx, leaseR.ID )