Skip to content

Commit 7f57a89

Browse files
authored
Fix time based average stats (#442)
* keep track of current stats and zero them after updating averages * Try tests * typo * remove commented test stuff * Avoid dividing by zero * Fix test * refactor, get rid of iterator. do it manually * trigger build * Fix
1 parent 0898461 commit 7f57a89

File tree

4 files changed

+151
-116
lines changed

4 files changed

+151
-116
lines changed

src/stats.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ impl Collector {
113113
for stats in server_stats.values() {
114114
if !stats.check_address_stat_average_is_updated_status() {
115115
stats.address_stats().update_averages();
116+
stats.address_stats().reset_current_counts();
116117
stats.set_address_stat_average_is_updated_status(true);
117118
}
118119
}

src/stats/address.rs

Lines changed: 142 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,26 @@
11
use std::sync::atomic::*;
22
use std::sync::Arc;
33

4+
#[derive(Debug, Clone, Default)]
5+
struct AddressStatFields {
6+
xact_count: Arc<AtomicU64>,
7+
query_count: Arc<AtomicU64>,
8+
bytes_received: Arc<AtomicU64>,
9+
bytes_sent: Arc<AtomicU64>,
10+
xact_time: Arc<AtomicU64>,
11+
query_time: Arc<AtomicU64>,
12+
wait_time: Arc<AtomicU64>,
13+
errors: Arc<AtomicU64>,
14+
}
15+
416
/// Internal address stats
517
#[derive(Debug, Clone, Default)]
618
pub struct AddressStats {
7-
pub total_xact_count: Arc<AtomicU64>,
8-
pub total_query_count: Arc<AtomicU64>,
9-
pub total_received: Arc<AtomicU64>,
10-
pub total_sent: Arc<AtomicU64>,
11-
pub total_xact_time: Arc<AtomicU64>,
12-
pub total_query_time: Arc<AtomicU64>,
13-
pub total_wait_time: Arc<AtomicU64>,
14-
pub total_errors: Arc<AtomicU64>,
15-
16-
pub old_total_xact_count: Arc<AtomicU64>,
17-
pub old_total_query_count: Arc<AtomicU64>,
18-
pub old_total_received: Arc<AtomicU64>,
19-
pub old_total_sent: Arc<AtomicU64>,
20-
pub old_total_xact_time: Arc<AtomicU64>,
21-
pub old_total_query_time: Arc<AtomicU64>,
22-
pub old_total_wait_time: Arc<AtomicU64>,
23-
pub old_total_errors: Arc<AtomicU64>,
24-
25-
pub avg_query_count: Arc<AtomicU64>,
26-
pub avg_query_time: Arc<AtomicU64>,
27-
pub avg_recv: Arc<AtomicU64>,
28-
pub avg_sent: Arc<AtomicU64>,
29-
pub avg_errors: Arc<AtomicU64>,
30-
pub avg_xact_time: Arc<AtomicU64>,
31-
pub avg_xact_count: Arc<AtomicU64>,
32-
pub avg_wait_time: Arc<AtomicU64>,
19+
total: AddressStatFields,
20+
21+
current: AddressStatFields,
22+
23+
averages: AddressStatFields,
3324

3425
// Determines if the averages have been updated since the last time they were reported
3526
pub averages_updated: Arc<AtomicBool>,
@@ -43,133 +34,193 @@ impl IntoIterator for AddressStats {
4334
vec![
4435
(
4536
"total_xact_count".to_string(),
46-
self.total_xact_count.load(Ordering::Relaxed),
37+
self.total.xact_count.load(Ordering::Relaxed),
4738
),
4839
(
4940
"total_query_count".to_string(),
50-
self.total_query_count.load(Ordering::Relaxed),
41+
self.total.query_count.load(Ordering::Relaxed),
5142
),
5243
(
5344
"total_received".to_string(),
54-
self.total_received.load(Ordering::Relaxed),
45+
self.total.bytes_received.load(Ordering::Relaxed),
5546
),
5647
(
5748
"total_sent".to_string(),
58-
self.total_sent.load(Ordering::Relaxed),
49+
self.total.bytes_sent.load(Ordering::Relaxed),
5950
),
6051
(
6152
"total_xact_time".to_string(),
62-
self.total_xact_time.load(Ordering::Relaxed),
53+
self.total.xact_time.load(Ordering::Relaxed),
6354
),
6455
(
6556
"total_query_time".to_string(),
66-
self.total_query_time.load(Ordering::Relaxed),
57+
self.total.query_time.load(Ordering::Relaxed),
6758
),
6859
(
6960
"total_wait_time".to_string(),
70-
self.total_wait_time.load(Ordering::Relaxed),
61+
self.total.wait_time.load(Ordering::Relaxed),
7162
),
7263
(
7364
"total_errors".to_string(),
74-
self.total_errors.load(Ordering::Relaxed),
65+
self.total.errors.load(Ordering::Relaxed),
7566
),
7667
(
7768
"avg_xact_count".to_string(),
78-
self.avg_xact_count.load(Ordering::Relaxed),
69+
self.averages.xact_count.load(Ordering::Relaxed),
7970
),
8071
(
8172
"avg_query_count".to_string(),
82-
self.avg_query_count.load(Ordering::Relaxed),
73+
self.averages.query_count.load(Ordering::Relaxed),
8374
),
8475
(
8576
"avg_recv".to_string(),
86-
self.avg_recv.load(Ordering::Relaxed),
77+
self.averages.bytes_received.load(Ordering::Relaxed),
8778
),
8879
(
8980
"avg_sent".to_string(),
90-
self.avg_sent.load(Ordering::Relaxed),
81+
self.averages.bytes_sent.load(Ordering::Relaxed),
9182
),
9283
(
9384
"avg_errors".to_string(),
94-
self.avg_errors.load(Ordering::Relaxed),
85+
self.averages.errors.load(Ordering::Relaxed),
9586
),
9687
(
9788
"avg_xact_time".to_string(),
98-
self.avg_xact_time.load(Ordering::Relaxed),
89+
self.averages.xact_time.load(Ordering::Relaxed),
9990
),
10091
(
10192
"avg_query_time".to_string(),
102-
self.avg_query_time.load(Ordering::Relaxed),
93+
self.averages.query_time.load(Ordering::Relaxed),
10394
),
10495
(
10596
"avg_wait_time".to_string(),
106-
self.avg_wait_time.load(Ordering::Relaxed),
97+
self.averages.wait_time.load(Ordering::Relaxed),
10798
),
10899
]
109100
.into_iter()
110101
}
111102
}
112103

113104
impl AddressStats {
105+
pub fn xact_count_add(&self) {
106+
self.total.xact_count.fetch_add(1, Ordering::Relaxed);
107+
self.current.xact_count.fetch_add(1, Ordering::Relaxed);
108+
}
109+
110+
pub fn query_count_add(&self) {
111+
self.total.query_count.fetch_add(1, Ordering::Relaxed);
112+
self.current.query_count.fetch_add(1, Ordering::Relaxed);
113+
}
114+
115+
pub fn bytes_received_add(&self, bytes: u64) {
116+
self.total
117+
.bytes_received
118+
.fetch_add(bytes, Ordering::Relaxed);
119+
self.current
120+
.bytes_received
121+
.fetch_add(bytes, Ordering::Relaxed);
122+
}
123+
124+
pub fn bytes_sent_add(&self, bytes: u64) {
125+
self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
126+
self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
127+
}
128+
129+
pub fn xact_time_add(&self, time: u64) {
130+
self.total.xact_time.fetch_add(time, Ordering::Relaxed);
131+
self.current.xact_time.fetch_add(time, Ordering::Relaxed);
132+
}
133+
134+
pub fn query_time_add(&self, time: u64) {
135+
self.total.query_time.fetch_add(time, Ordering::Relaxed);
136+
self.current.query_time.fetch_add(time, Ordering::Relaxed);
137+
}
138+
139+
pub fn wait_time_add(&self, time: u64) {
140+
self.total.wait_time.fetch_add(time, Ordering::Relaxed);
141+
self.current.wait_time.fetch_add(time, Ordering::Relaxed);
142+
}
143+
114144
pub fn error(&self) {
115-
self.total_errors.fetch_add(1, Ordering::Relaxed);
145+
self.total.errors.fetch_add(1, Ordering::Relaxed);
146+
self.current.errors.fetch_add(1, Ordering::Relaxed);
116147
}
117148

118149
pub fn update_averages(&self) {
119-
let (totals, averages, old_totals) = self.fields_iterators();
120-
for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) {
121-
let total_value = total.load(Ordering::Relaxed);
122-
let old_total_value = old_total.load(Ordering::Relaxed);
123-
average.store(
124-
(total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000),
125-
Ordering::Relaxed,
126-
); // Avg / second
127-
old_total.store(total_value, Ordering::Relaxed);
150+
let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000;
151+
152+
// xact_count
153+
let current_xact_count = self.current.xact_count.load(Ordering::Relaxed);
154+
let current_xact_time = self.current.xact_time.load(Ordering::Relaxed);
155+
self.averages.xact_count.store(
156+
current_xact_count / stat_period_per_second,
157+
Ordering::Relaxed,
158+
);
159+
if current_xact_count == 0 {
160+
self.averages.xact_time.store(0, Ordering::Relaxed);
161+
} else {
162+
self.averages
163+
.xact_time
164+
.store(current_xact_time / current_xact_count, Ordering::Relaxed);
165+
}
166+
167+
// query_count
168+
let current_query_count = self.current.query_count.load(Ordering::Relaxed);
169+
let current_query_time = self.current.query_time.load(Ordering::Relaxed);
170+
self.averages.query_count.store(
171+
current_query_count / stat_period_per_second,
172+
Ordering::Relaxed,
173+
);
174+
if current_query_count == 0 {
175+
self.averages.query_time.store(0, Ordering::Relaxed);
176+
} else {
177+
self.averages
178+
.query_time
179+
.store(current_query_time / current_query_count, Ordering::Relaxed);
128180
}
181+
182+
// bytes_received
183+
let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed);
184+
self.averages.bytes_received.store(
185+
current_bytes_received / stat_period_per_second,
186+
Ordering::Relaxed,
187+
);
188+
189+
// bytes_sent
190+
let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed);
191+
self.averages.bytes_sent.store(
192+
current_bytes_sent / stat_period_per_second,
193+
Ordering::Relaxed,
194+
);
195+
196+
// wait_time
197+
let current_wait_time = self.current.wait_time.load(Ordering::Relaxed);
198+
self.averages.wait_time.store(
199+
current_wait_time / stat_period_per_second,
200+
Ordering::Relaxed,
201+
);
202+
203+
// errors
204+
let current_errors = self.current.errors.load(Ordering::Relaxed);
205+
self.averages
206+
.errors
207+
.store(current_errors / stat_period_per_second, Ordering::Relaxed);
208+
}
209+
210+
pub fn reset_current_counts(&self) {
211+
self.current.xact_count.store(0, Ordering::Relaxed);
212+
self.current.xact_time.store(0, Ordering::Relaxed);
213+
self.current.query_count.store(0, Ordering::Relaxed);
214+
self.current.query_time.store(0, Ordering::Relaxed);
215+
self.current.bytes_received.store(0, Ordering::Relaxed);
216+
self.current.bytes_sent.store(0, Ordering::Relaxed);
217+
self.current.wait_time.store(0, Ordering::Relaxed);
218+
self.current.errors.store(0, Ordering::Relaxed);
129219
}
130220

131221
pub fn populate_row(&self, row: &mut Vec<String>) {
132222
for (_key, value) in self.clone() {
133223
row.push(value.to_string());
134224
}
135225
}
136-
137-
fn fields_iterators(
138-
&self,
139-
) -> (
140-
Vec<Arc<AtomicU64>>,
141-
Vec<Arc<AtomicU64>>,
142-
Vec<Arc<AtomicU64>>,
143-
) {
144-
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
145-
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
146-
let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new();
147-
148-
totals.push(self.total_xact_count.clone());
149-
old_totals.push(self.old_total_xact_count.clone());
150-
averages.push(self.avg_xact_count.clone());
151-
totals.push(self.total_query_count.clone());
152-
old_totals.push(self.old_total_query_count.clone());
153-
averages.push(self.avg_query_count.clone());
154-
totals.push(self.total_received.clone());
155-
old_totals.push(self.old_total_received.clone());
156-
averages.push(self.avg_recv.clone());
157-
totals.push(self.total_sent.clone());
158-
old_totals.push(self.old_total_sent.clone());
159-
averages.push(self.avg_sent.clone());
160-
totals.push(self.total_xact_time.clone());
161-
old_totals.push(self.old_total_xact_time.clone());
162-
averages.push(self.avg_xact_time.clone());
163-
totals.push(self.total_query_time.clone());
164-
old_totals.push(self.old_total_query_time.clone());
165-
averages.push(self.avg_query_time.clone());
166-
totals.push(self.total_wait_time.clone());
167-
old_totals.push(self.old_total_wait_time.clone());
168-
averages.push(self.avg_wait_time.clone());
169-
totals.push(self.total_errors.clone());
170-
old_totals.push(self.old_total_errors.clone());
171-
averages.push(self.avg_errors.clone());
172-
173-
(totals, averages, old_totals)
174-
}
175226
}

src/stats/server.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,9 @@ impl ServerStats {
177177
}
178178

179179
pub fn checkout_time(&self, microseconds: u64, application_name: String) {
180-
// Update server stats and address aggergation stats
180+
// Update server stats and address aggregation stats
181181
self.set_application(application_name);
182-
self.address
183-
.stats
184-
.total_wait_time
185-
.fetch_add(microseconds, Ordering::Relaxed);
182+
self.address.stats.wait_time_add(microseconds);
186183
self.pool_stats
187184
.maxwait
188185
.fetch_max(microseconds, Ordering::Relaxed);
@@ -191,13 +188,8 @@ impl ServerStats {
191188
/// Report a query executed by a client against a server
192189
pub fn query(&self, milliseconds: u64, application_name: &str) {
193190
self.set_application(application_name.to_string());
194-
let address_stats = self.address_stats();
195-
address_stats
196-
.total_query_count
197-
.fetch_add(1, Ordering::Relaxed);
198-
address_stats
199-
.total_query_time
200-
.fetch_add(milliseconds, Ordering::Relaxed);
191+
self.address.stats.query_count_add();
192+
self.address.stats.query_time_add(milliseconds);
201193
}
202194

203195
/// Report a transaction executed by a client a server
@@ -208,29 +200,20 @@ impl ServerStats {
208200
self.set_application(application_name.to_string());
209201

210202
self.transaction_count.fetch_add(1, Ordering::Relaxed);
211-
self.address
212-
.stats
213-
.total_xact_count
214-
.fetch_add(1, Ordering::Relaxed);
203+
self.address.stats.xact_count_add();
215204
}
216205

217206
/// Report data sent to a server
218207
pub fn data_sent(&self, amount_bytes: usize) {
219208
self.bytes_sent
220209
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
221-
self.address
222-
.stats
223-
.total_sent
224-
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
210+
self.address.stats.bytes_sent_add(amount_bytes as u64);
225211
}
226212

227213
/// Report data received from a server
228214
pub fn data_received(&self, amount_bytes: usize) {
229215
self.bytes_received
230216
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
231-
self.address
232-
.stats
233-
.total_received
234-
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
217+
self.address.stats.bytes_received_add(amount_bytes as u64);
235218
}
236219
}

tests/ruby/admin_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
results = admin_conn.async_exec("SHOW STATS")[0]
2828
admin_conn.close
2929
expect(results["total_query_time"].to_i).to be_within(200).of(750)
30-
expect(results["avg_query_time"].to_i).to be_within(20).of(50)
30+
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
3131

3232
expect(results["total_wait_time"].to_i).to_not eq(0)
3333
expect(results["avg_wait_time"].to_i).to_not eq(0)

0 commit comments

Comments
 (0)