Skip to content

Commit d180a31

Browse files
Merge pull request #703 from frankmcsherry/batch_propagation
Remove some quadratic behavior from `propagate_all`
2 parents c06a057 + 908d782 commit d180a31

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

timely/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ columnation = "0.1"
2727
getopts = { version = "0.2.21", optional = true }
2828
bincode = { version = "1.0" }
2929
byteorder = "1.5"
30+
itertools = "0.14.0"
3031
serde = { version = "1.0", features = ["derive"] }
3132
timely_bytes = { path = "../bytes", version = "0.22" }
3233
timely_logging = { path = "../logging", version = "0.22" }

timely/src/progress/reachability.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -591,10 +591,14 @@ impl<T:Timestamp> Tracker<T> {
591591
// By filtering the changes through `self.pointstamps` we react only to discrete
592592
// changes in the frontier, rather than changes in the pointstamp counts that
593593
// witness that frontier.
594-
for ((target, time), diff) in self.target_changes.drain() {
594+
use itertools::Itertools;
595+
let mut target_changes = self.target_changes.drain().peekable();
596+
while let Some(((target, _), _)) = target_changes.peek() {
595597

598+
let target = *target;
596599
let operator = &mut self.per_operator[target.node].targets[target.port];
597-
let changes = operator.pointstamps.update_iter(Some((time, diff)));
600+
let target_updates = target_changes.peeking_take_while(|((t, _),_)| t == &target).map(|((_,time),diff)| (time,diff));
601+
let changes = operator.pointstamps.update_iter(target_updates);
598602

599603
for (time, diff) in changes {
600604
self.total_counts += diff;
@@ -610,10 +614,13 @@ impl<T:Timestamp> Tracker<T> {
610614
}
611615
}
612616

613-
for ((source, time), diff) in self.source_changes.drain() {
617+
let mut source_changes = self.source_changes.drain().peekable();
618+
while let Some(((source, _), _)) = source_changes.peek() {
614619

620+
let source = *source;
615621
let operator = &mut self.per_operator[source.node].sources[source.port];
616-
let changes = operator.pointstamps.update_iter(Some((time, diff)));
622+
let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff));
623+
let changes = operator.pointstamps.update_iter(source_updates);
617624

618625
for (time, diff) in changes {
619626
self.total_counts += diff;
@@ -761,7 +768,7 @@ fn summarize_outputs<T: Timestamp>(
761768
}
762769
}
763770
}
764-
771+
765772
let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
766773
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
767774

0 commit comments

Comments
 (0)