Skip to content

Commit 0e909a0

Browse files
[gui] read everything from grpc response streams
we have bidirectional streaming rpcs, but we usually send only one response back on that stream for most operations. therefore, the gui would only read data from the stream until it got the first reply. but this introduced errors in which the stream was disposed too early. now we read everything from the stream, so that issue shouldn't happen.
1 parent 4afcbbb commit 0e909a0

File tree

2 files changed

+26
-15
lines changed

2 files changed

+26
-15
lines changed

src/client/gui/lib/grpc_client.dart

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import 'dart:async';
12
import 'dart:io';
23

3-
import 'package:async/async.dart';
44
import 'package:fpdart/fpdart.dart';
55
import 'package:grpc/grpc.dart';
66
import 'package:protobuf/protobuf.dart' hide RpcClient;
@@ -100,7 +100,7 @@ class GrpcClient {
100100
.start(Stream.value(request))
101101
.doOnData(checkForUpdate)
102102
.doOnEach(logGrpc(request))
103-
.firstOrNull;
103+
.lastOrNull;
104104
}
105105

106106
Future<StopReply?> stop(Iterable<String> names) {
@@ -111,7 +111,7 @@ class GrpcClient {
111111
return _client
112112
.stop(Stream.value(request))
113113
.doOnEach(logGrpc(request))
114-
.firstOrNull;
114+
.lastOrNull;
115115
}
116116

117117
Future<SuspendReply?> suspend(Iterable<String> names) {
@@ -122,7 +122,7 @@ class GrpcClient {
122122
return _client
123123
.suspend(Stream.value(request))
124124
.doOnEach(logGrpc(request))
125-
.firstOrNull;
125+
.lastOrNull;
126126
}
127127

128128
Future<RestartReply?> restart(Iterable<String> names) {
@@ -134,7 +134,7 @@ class GrpcClient {
134134
.restart(Stream.value(request))
135135
.doOnData(checkForUpdate)
136136
.doOnEach(logGrpc(request))
137-
.firstOrNull;
137+
.lastOrNull;
138138
}
139139

140140
Future<DeleteReply?> delete(Iterable<String> names) {
@@ -147,7 +147,7 @@ class GrpcClient {
147147
return _client
148148
.delet(Stream.value(request))
149149
.doOnEach(logGrpc(request))
150-
.firstOrNull;
150+
.lastOrNull;
151151
}
152152

153153
Future<RecoverReply?> recover(Iterable<String> names) {
@@ -158,7 +158,7 @@ class GrpcClient {
158158
return _client
159159
.recover(Stream.value(request))
160160
.doOnEach(logGrpc(request))
161-
.firstOrNull;
161+
.lastOrNull;
162162
}
163163

164164
Future<DeleteReply?> purge(Iterable<String> names) {
@@ -172,7 +172,7 @@ class GrpcClient {
172172
return _client
173173
.delet(Stream.value(request))
174174
.doOnEach(logGrpc(request))
175-
.firstOrNull;
175+
.lastOrNull;
176176
}
177177

178178
Future<List<VmInfo>> info([Iterable<String> names = const []]) {
@@ -193,7 +193,7 @@ class GrpcClient {
193193
return _client
194194
.mount(Stream.value(request))
195195
.doOnEach(logGrpc(request))
196-
.firstOrNull;
196+
.lastOrNull;
197197
}
198198

199199
Future<void> umount(String name, [String? path]) {
@@ -204,7 +204,7 @@ class GrpcClient {
204204
return _client
205205
.umount(Stream.value(request))
206206
.doOnEach(logGrpc(request))
207-
.firstOrNull;
207+
.lastOrNull;
208208
}
209209

210210
Future<FindReply> find({bool images = true, bool blueprints = true}) {
@@ -254,7 +254,7 @@ class GrpcClient {
254254
return _client
255255
.set(Stream.value(request))
256256
.doOnEach(logGrpc(request))
257-
.firstOrNull;
257+
.lastOrNull;
258258
}
259259

260260
Future<SSHInfo?> sshInfo(String name) {
@@ -263,7 +263,7 @@ class GrpcClient {
263263
return _client
264264
.ssh_info(Stream.value(request))
265265
.doOnEach(logGrpc(request))
266-
.first
266+
.last
267267
.then((reply) => reply.sshInfo[name]);
268268
}
269269

@@ -299,3 +299,17 @@ class CustomChannelCredentials extends ChannelCredentials {
299299
return ctx;
300300
}
301301
}
302+
303+
extension<T> on Stream<T> {
304+
Future<T?> get lastOrNull {
305+
final completer = Completer<T?>.sync();
306+
T? result;
307+
listen(
308+
(event) => result = event,
309+
onError: completer.completeError,
310+
onDone: () => completer.complete(result),
311+
cancelOnError: true,
312+
);
313+
return completer.future;
314+
}
315+
}

src/client/gui/lib/providers.dart

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ final daemonAvailableProvider = Provider((ref) {
6868
if (message.contains('failed to obtain exit status for remote process')) {
6969
return true;
7070
}
71-
if (message.contains('Connection is being forcefully terminated')) {
72-
return true;
73-
}
7471
}
7572
return false;
7673
});

0 commit comments

Comments
 (0)