Skip to main content

etcd-5-go与etcd监听

1.3 示例2 watch机制

package main

import (
"context"
"fmt"
"github.com/etcd-io/etcd/clientv3"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
)

func connect() (client *clientv3.Client, err error){

client, err = clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
DialTimeout: 5 * time.Second,
})

if err != nil {
fmt.Println("connect err:", err)
return nil, err
}

return client, err
}

func main() {

// 连接
cli, err := connect()
defer cli.Close()
if err != nil {
return
}

// 获取etcd读写对象
kv := clientv3.NewKV(cli)

// 模拟变化
go func() {
for {
kv.Put(context.TODO(), "/cron/jobs/job7", "job7")
kv.Delete(context.TODO(), "/cron/jobs/job7")
time.Sleep(1 * time.Second)
}
}()

// 获取当前值
getR, err := kv.Get(context.TODO(), "/cron/jobs/job7")
if err != nil {
fmt.Println("get err:", err)
return
}
if len(getR.Kvs) != 0 { // key存在
fmt.Println("当前值:", string(getR.Kvs[0].Value))
}

// 监听后续变化: revision是当前etcd集群事务ID,该ID是单调递增
wathStartRevision := getR.Header.Revision + 1
watcher := clientv3.NewWatcher(cli) // 创建wathcer
fmt.Println("从该版本向后监听:", wathStartRevision)
watchChan := watcher.Watch(context.TODO(), "/cron/jobs/job7", clientv3.WithRev(wathStartRevision))

// 如果有变化,则会将变化丢到watchChan
for watchResult := range watchChan{
for _, event := range watchResult.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了Revision:", event.Kv.ModRevision )
}

}
}
}

如果要取消监听,同样是通过取消contex来实现:

ctx, cancelFunc := context.WithTimeout(context.TODO(), 5 * time.Second)
// 5秒后执行退出函数
time.AfterFunc(5 * time.Second, func(){
cancelFunc()
})
watchChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(wathStartRevision))