-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathwatch.rs
More file actions
111 lines (95 loc) · 3.8 KB
/
watch.rs
File metadata and controls
111 lines (95 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
//! Watch example
use etcd_client::*;
use std::{collections::HashMap, time::Duration};
#[tokio::main]
async fn main() -> Result<(), Error> {
let mut client = Client::connect(["localhost:2379"], None).await?;
client.put("foo", "bar", None).await?;
println!("put kv: {{foo: bar}}");
client.put("foo1", "bar1", None).await?;
println!("put kv: {{foo1: bar1}}");
// Record watch IDs and their starting revisions
let mut watches: HashMap<i64 /* watch ID */, i64 /* revision */> = HashMap::new();
let opts = WatchOptions::new().with_watch_id(1);
let mut watch_stream = client.watch("foo", Some(opts)).await?;
let resp = watch_stream
.message()
.await?
.ok_or(Error::WatchError("No initial watch response".into()))?;
let resp = match (resp.created(), resp.canceled()) {
(true, false) => Ok(resp),
(true, true) => Err(Error::WatchError(resp.cancel_reason().into())),
_ => Err(Error::WatchError("Unexpected watch response".into())),
}?;
println!("create watcher, watch ID: {}", resp.watch_id());
println!();
let rev = resp
.header()
.map(|h| h.revision())
.ok_or_else(|| Error::WatchError("No header revision in initial watch response".into()))?;
watches.insert(resp.watch_id(), rev);
client.put("foo", "bar2", None).await?;
watch_stream.request_progress().await?;
client.delete("foo", None).await?;
// Reuse the same watch stream to watch another key
let opts = WatchOptions::new().with_watch_id(2);
watch_stream.watch("foo1", Some(opts)).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
client.put("foo1", "bar2", None).await?;
client.delete("foo1", None).await?;
// NOTE: The responses to the following watches may receive out of order.
while let Some(resp) = watch_stream.message().await? {
let watch_id = resp.watch_id();
println!("[{}] receive watch response", watch_id);
println!("compact revision: {}", resp.compact_revision());
match (resp.created(), resp.canceled()) {
(true, false) if watch_id == 2 => {
let rev = resp.header().map(|h| h.revision()).ok_or_else(|| {
Error::WatchError("No header revision in watch creation response".into())
})?;
watches.insert(resp.watch_id(), rev);
}
(true, true) if watch_id == 2 => {
println!(
"watch ID {} creation canceled: {}",
resp.watch_id(),
resp.cancel_reason()
);
}
(false, true) => {
watches.remove(&resp.watch_id());
println!(
"watch ID {} canceled: {}, reason: {}",
resp.watch_id(),
resp.canceled(),
resp.cancel_reason()
);
}
(created, canceled) => {
println!(
"watch ID {} created: {}, canceled: {}, reason: {}",
resp.watch_id(),
created,
canceled,
resp.cancel_reason()
);
}
}
for event in resp.events() {
println!("event type: {:?}", event.event_type());
if let Some(kv) = event.kv() {
println!("kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?);
}
if EventType::Delete == event.event_type() {
println!("canceling watch ID {}", resp.watch_id());
watch_stream.cancel(resp.watch_id()).await?;
}
}
if watches.is_empty() {
println!("all watches canceled, exiting...");
break;
}
println!();
}
Ok(())
}