@@ -211,20 +211,18 @@ class _MemoryFile extends _MemoryFileSystemEntity implements File {
211211/// Implementation of an [io.IOSink] that's backed by a [_FileNode] .
212212class _FileSink implements io.IOSink {
213213 final _FileNode _node;
214- final Completer <Null > _doneCompleter = new Completer <Null >();
214+ final Completer <Null > _completer = new Completer <Null >();
215+
216+ Completer <Null > _streamCompleter;
215217 Encoding _encoding;
216- bool _isClosed = false ;
217218
218219 _FileSink (this ._node, this ._encoding) {
219220 _checkNotNull (_encoding);
220221 }
221222
222- dynamic _checkNotNull (dynamic value) {
223- if (value == null ) {
224- throw new ArgumentError .notNull ();
225- }
226- return value;
227- }
223+ bool get isClosed => _completer.isCompleted;
224+
225+ bool get isStreaming => ! (_streamCompleter? .isCompleted ?? true );
228226
229227 @override
230228 Encoding get encoding => _encoding;
@@ -234,7 +232,8 @@ class _FileSink implements io.IOSink {
234232
235233 @override
236234 void add (List <int > data) {
237- if (! _isClosed) {
235+ _checkNotStreaming ();
236+ if (! isClosed) {
238237 _node.content.addAll (data);
239238 }
240239 }
@@ -255,7 +254,7 @@ class _FileSink implements io.IOSink {
255254 }
256255
257256 @override
258- void writeln ([Object obj = "" ]) {
257+ void writeln ([Object obj = '' ]) {
259258 write (obj);
260259 write ('\n ' );
261260 }
@@ -264,23 +263,57 @@ class _FileSink implements io.IOSink {
264263 void writeCharCode (int charCode) => write (new String .fromCharCode (charCode));
265264
266265 @override
267- void addError (error, [StackTrace stackTrace]) =>
268- throw new UnsupportedError ('addError' );
266+ void addError (error, [StackTrace stackTrace]) {
267+ _checkNotStreaming ();
268+ _completer.completeError (error, stackTrace);
269+ }
269270
270271 @override
271- Future addStream (Stream <List <int >> stream) =>
272- stream.forEach ((List <int > data) => add (data));
272+ Future addStream (Stream <List <int >> stream) {
273+ _checkNotStreaming ();
274+ _streamCompleter = new Completer <Null >();
275+ var finish = () {
276+ _streamCompleter.complete ();
277+ _streamCompleter = null ;
278+ };
279+ stream.listen (
280+ (List <int > data) => _node.content.addAll (data),
281+ cancelOnError: true ,
282+ onError: (error, StackTrace stackTrace) {
283+ _completer.completeError (error, stackTrace);
284+ finish ();
285+ },
286+ onDone: finish,
287+ );
288+ return _streamCompleter.future;
289+ }
273290
274291 @override
275- Future flush () => new Future .value ();
292+ Future flush () {
293+ _checkNotStreaming ();
294+ return new Future .value ();
295+ }
276296
277297 @override
278298 Future close () {
279- _isClosed = true ;
280- _doneCompleter .complete ();
281- return _doneCompleter .future;
299+ _checkNotStreaming () ;
300+ _completer .complete ();
301+ return _completer .future;
282302 }
283303
284304 @override
285- Future get done => _doneCompleter.future;
305+ Future get done => _completer.future;
306+
307+ dynamic _checkNotNull (dynamic value) {
308+ if (value == null ) {
309+ throw new ArgumentError .notNull ();
310+ }
311+ return value;
312+ }
313+
314+ void _checkNotStreaming () {
315+ if (isStreaming) {
316+ throw new StateError ('StreamSink is bound to a stream' );
317+ }
318+ }
286319}
0 commit comments