Skip to content

Commit edb6d24

Browse files
authored
Return a future from Disconnector.disconnect(). (flutter#4)
1 parent ae46607 commit edb6d24

File tree

4 files changed

+76
-19
lines changed

4 files changed

+76
-19
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 1.6.0
2+
3+
* `Disconnector.disconnect()` now returns a future that completes when all the
4+
inner `StreamSink.close()` futures have completed.
5+
16
## 1.5.0
27

38
* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee

lib/src/disconnector.dart

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import 'dart:async';
66

7+
import 'package:async/async.dart';
8+
79
import '../stream_channel.dart';
810

911
/// Allows the caller to force a channel to disconnect.
@@ -17,8 +19,7 @@ import '../stream_channel.dart';
1719
/// be disconnected immediately.
1820
class Disconnector<T> implements StreamChannelTransformer<T, T> {
1921
/// Whether [disconnect] has been called.
20-
bool get isDisconnected => _isDisconnected;
21-
var _isDisconnected = false;
22+
bool get isDisconnected => _disconnectMemo.hasRun;
2223

2324
/// The sinks for transformed channels.
2425
///
@@ -28,20 +29,25 @@ class Disconnector<T> implements StreamChannelTransformer<T, T> {
2829
final _sinks = <_DisconnectorSink<T>>[];
2930

3031
/// Disconnects all channels that have been transformed.
31-
void disconnect() {
32-
_isDisconnected = true;
33-
for (var sink in _sinks) {
34-
sink._disconnect();
35-
}
32+
///
33+
/// Returns a future that completes when all inner sinks' [StreamSink.close]
34+
/// futures have completed. Note that a [StreamController]'s sink won't close
35+
/// until the corresponding stream has a listener.
36+
Future disconnect() => _disconnectMemo.runOnce(() {
37+
var futures = _sinks.map((sink) => sink._disconnect()).toList();
3638
_sinks.clear();
37-
}
39+
return Future.wait(futures, eagerError: true);
40+
});
41+
final _disconnectMemo = new AsyncMemoizer();
3842

3943
StreamChannel<T> bind(StreamChannel<T> channel) {
4044
return channel.changeSink((innerSink) {
4145
var sink = new _DisconnectorSink<T>(innerSink);
4246

43-
if (_isDisconnected) {
44-
sink._disconnect();
47+
if (isDisconnected) {
48+
// Ignore errors here, because otherwise there would be no way for the
49+
// user to handle them gracefully.
50+
sink._disconnect().catchError((_) {});
4551
} else {
4652
_sinks.add(sink);
4753
}
@@ -126,14 +132,18 @@ class _DisconnectorSink<T> implements StreamSink<T> {
126132

127133
/// Disconnects this sink.
128134
///
129-
/// This closes the underlying sink and stops forwarding events.
130-
void _disconnect() {
135+
/// This closes the underlying sink and stops forwarding events. It returns
136+
/// the [StreamSink.close] future for the underlying sink.
137+
Future _disconnect() {
131138
_isDisconnected = true;
132-
_inner.close();
139+
var future = _inner.close();
140+
141+
if (_inAddStream) {
142+
_addStreamCompleter.complete(_addStreamSubscription.cancel());
143+
_addStreamCompleter = null;
144+
_addStreamSubscription = null;
145+
}
133146

134-
if (!_inAddStream) return;
135-
_addStreamCompleter.complete(_addStreamSubscription.cancel());
136-
_addStreamCompleter = null;
137-
_addStreamSubscription = null;
147+
return future;
138148
}
139149
}

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: stream_channel
2-
version: 1.5.1-dev
2+
version: 1.6.0
33
description: An abstraction for two-way communication channels.
44
author: Dart Team <[email protected]>
55
homepage: https://github.com/dart-lang/stream_channel

test/disconnector_test.dart

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import 'dart:async';
66

7+
import 'package:async/async.dart';
78
import 'package:stream_channel/stream_channel.dart';
89
import 'package:test/test.dart';
910

@@ -78,8 +79,35 @@ void main() {
7879
expect(canceled, isTrue);
7980
});
8081

82+
test("disconnect() returns the close future from the inner sink", () async {
83+
var streamController = new StreamController();
84+
var sinkController = new StreamController();
85+
var disconnector = new Disconnector();
86+
var sink = new _CloseCompleterSink(sinkController.sink);
87+
var channel = new StreamChannel.withGuarantees(
88+
streamController.stream, sink)
89+
.transform(disconnector);
90+
91+
var disconnectFutureFired = false;
92+
expect(disconnector.disconnect().then((_) {
93+
disconnectFutureFired = true;
94+
}), completes);
95+
96+
// Give the future time to fire early if it's going to.
97+
await pumpEventQueue();
98+
expect(disconnectFutureFired, isFalse);
99+
100+
// When the inner sink's close future completes, so should the
101+
// disconnector's.
102+
sink.completer.complete();
103+
await pumpEventQueue();
104+
expect(disconnectFutureFired, isTrue);
105+
});
106+
81107
group("after disconnection", () {
82-
setUp(() => disconnector.disconnect());
108+
setUp(() {
109+
disconnector.disconnect();
110+
});
83111

84112
test("closes the inner sink and ignores events to the outer sink", () {
85113
channel.sink.add(1);
@@ -108,3 +136,17 @@ void main() {
108136
});
109137
});
110138
}
139+
140+
/// A [StreamSink] wrapper that adds the ability to manually complete the Future
141+
/// returned by [close] using [completer].
142+
class _CloseCompleterSink extends DelegatingStreamSink {
143+
/// The completer for the future returned by [close].
144+
final completer = new Completer();
145+
146+
_CloseCompleterSink(StreamSink inner) : super(inner);
147+
148+
Future close() {
149+
super.close();
150+
return completer.future;
151+
}
152+
}

0 commit comments

Comments
 (0)