Skip to content

Commit b240d73

Browse files
Merge pull request #3829 from canonical/fix-http2-connection-disconnected
Fix `Connection is being forcefully terminated` error in GUI
2 parents 1c0f995 + 7f7d7fd commit b240d73

File tree

2 files changed

+97
-127
lines changed

2 files changed

+97
-127
lines changed
Lines changed: 97 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import 'dart:async';
12
import 'dart:io';
23

3-
import 'package:async/async.dart';
44
import 'package:fpdart/fpdart.dart';
55
import 'package:grpc/grpc.dart';
66
import 'package:protobuf/protobuf.dart' hide RpcClient;
@@ -91,189 +91,148 @@ class GrpcClient {
9191
}
9292
}
9393

94+
Future<Rep?> doRpc<Req extends RpcMessage, Rep extends RpcMessage>(
95+
ResponseStream<Rep> Function(Stream<Req> request) action,
96+
Req request, {
97+
bool checkUpdates = false,
98+
bool log = true,
99+
}) {
100+
if (log) logger.i('Sent ${request.repr}');
101+
Stream<Rep> replyStream = action(Stream.value(request));
102+
if (checkUpdates) replyStream = replyStream.doOnData(checkForUpdate);
103+
if (log) replyStream = replyStream.doOnEach(logGrpc(request));
104+
return replyStream.lastOrNull;
105+
}
106+
94107
Future<StartReply?> start(Iterable<String> names) {
95-
final request = StartRequest(
96-
instanceNames: InstanceNames(instanceName: names),
108+
return doRpc(
109+
_client.start,
110+
StartRequest(instanceNames: InstanceNames(instanceName: names)),
111+
checkUpdates: true,
97112
);
98-
logger.i('Sent ${request.repr}');
99-
return _client
100-
.start(Stream.value(request))
101-
.doOnData(checkForUpdate)
102-
.doOnEach(logGrpc(request))
103-
.firstOrNull;
104113
}
105114

106115
Future<StopReply?> stop(Iterable<String> names) {
107-
final request = StopRequest(
108-
instanceNames: InstanceNames(instanceName: names),
116+
return doRpc(
117+
_client.stop,
118+
StopRequest(instanceNames: InstanceNames(instanceName: names)),
109119
);
110-
logger.i('Sent ${request.repr}');
111-
return _client
112-
.stop(Stream.value(request))
113-
.doOnEach(logGrpc(request))
114-
.firstOrNull;
115120
}
116121

117122
Future<SuspendReply?> suspend(Iterable<String> names) {
118-
final request = SuspendRequest(
119-
instanceNames: InstanceNames(instanceName: names),
123+
return doRpc(
124+
_client.suspend,
125+
SuspendRequest(instanceNames: InstanceNames(instanceName: names)),
120126
);
121-
logger.i('Sent ${request.repr}');
122-
return _client
123-
.suspend(Stream.value(request))
124-
.doOnEach(logGrpc(request))
125-
.firstOrNull;
126127
}
127128

128129
Future<RestartReply?> restart(Iterable<String> names) {
129-
final request = RestartRequest(
130-
instanceNames: InstanceNames(instanceName: names),
130+
return doRpc(
131+
_client.restart,
132+
RestartRequest(instanceNames: InstanceNames(instanceName: names)),
133+
checkUpdates: true,
131134
);
132-
logger.i('Sent ${request.repr}');
133-
return _client
134-
.restart(Stream.value(request))
135-
.doOnData(checkForUpdate)
136-
.doOnEach(logGrpc(request))
137-
.firstOrNull;
138135
}
139136

140137
Future<DeleteReply?> delete(Iterable<String> names) {
141-
final request = DeleteRequest(
142-
instanceSnapshotPairs: names.map(
143-
(name) => InstanceSnapshotPair(instanceName: name),
138+
return doRpc(
139+
_client.delet,
140+
DeleteRequest(
141+
instanceSnapshotPairs: names.map(
142+
(name) => InstanceSnapshotPair(instanceName: name),
143+
),
144144
),
145145
);
146-
logger.i('Sent ${request.repr}');
147-
return _client
148-
.delet(Stream.value(request))
149-
.doOnEach(logGrpc(request))
150-
.firstOrNull;
151146
}
152147

153148
Future<RecoverReply?> recover(Iterable<String> names) {
154-
final request = RecoverRequest(
155-
instanceNames: InstanceNames(instanceName: names),
149+
return doRpc(
150+
_client.recover,
151+
RecoverRequest(instanceNames: InstanceNames(instanceName: names)),
156152
);
157-
logger.i('Sent ${request.repr}');
158-
return _client
159-
.recover(Stream.value(request))
160-
.doOnEach(logGrpc(request))
161-
.firstOrNull;
162153
}
163154

164155
Future<DeleteReply?> purge(Iterable<String> names) {
165-
final request = DeleteRequest(
166-
instanceSnapshotPairs: names.map(
167-
(name) => InstanceSnapshotPair(instanceName: name),
156+
return doRpc(
157+
_client.delet,
158+
DeleteRequest(
159+
purge: true,
160+
instanceSnapshotPairs: names.map(
161+
(name) => InstanceSnapshotPair(instanceName: name),
162+
),
168163
),
169-
purge: true,
170164
);
171-
logger.i('Sent ${request.repr}');
172-
return _client
173-
.delet(Stream.value(request))
174-
.doOnEach(logGrpc(request))
175-
.firstOrNull;
176165
}
177166

178167
Future<List<VmInfo>> info([Iterable<String> names = const []]) {
179-
final request = InfoRequest(
180-
instanceSnapshotPairs: names.map(
181-
(name) => InstanceSnapshotPair(instanceName: name),
168+
return doRpc(
169+
_client.info,
170+
checkUpdates: true,
171+
log: false,
172+
InfoRequest(
173+
instanceSnapshotPairs: names.map(
174+
(name) => InstanceSnapshotPair(instanceName: name),
175+
),
182176
),
183-
);
184-
return _client
185-
.info(Stream.value(request))
186-
.doOnData(checkForUpdate)
187-
.last
188-
.then((r) => r.details.toList());
177+
).then((r) => r!.details.toList());
189178
}
190179

191180
Future<MountReply?> mount(MountRequest request) {
192-
logger.i('Sent ${request.repr}');
193-
return _client
194-
.mount(Stream.value(request))
195-
.doOnEach(logGrpc(request))
196-
.firstOrNull;
181+
return doRpc(_client.mount, request);
197182
}
198183

199184
Future<void> umount(String name, [String? path]) {
200-
final request = UmountRequest(
201-
targetPaths: [TargetPathInfo(instanceName: name, targetPath: path)],
185+
return doRpc(
186+
_client.umount,
187+
UmountRequest(
188+
targetPaths: [TargetPathInfo(instanceName: name, targetPath: path)],
189+
),
202190
);
203-
logger.i('Sent ${request.repr}');
204-
return _client
205-
.umount(Stream.value(request))
206-
.doOnEach(logGrpc(request))
207-
.firstOrNull;
208191
}
209192

210193
Future<FindReply> find({bool images = true, bool blueprints = true}) {
211-
final request = FindRequest(
212-
showImages: images,
213-
showBlueprints: blueprints,
214-
);
215-
logger.i('Sent ${request.repr}');
216-
return _client.find(Stream.value(request)).doOnEach(logGrpc(request)).last;
194+
return doRpc(
195+
_client.find,
196+
FindRequest(
197+
showImages: images,
198+
showBlueprints: blueprints,
199+
),
200+
).then((r) => r!);
217201
}
218202

219203
Future<List<NetInterface>> networks() {
220-
final request = NetworksRequest();
221-
logger.i('Sent ${request.repr}');
222-
return _client
223-
.networks(Stream.value(request))
224-
.doOnData(checkForUpdate)
225-
.doOnEach(logGrpc(request))
226-
.last
227-
.then((r) => r.interfaces);
204+
return doRpc(
205+
_client.networks,
206+
NetworksRequest(),
207+
checkUpdates: true,
208+
).then((r) => r!.interfaces);
228209
}
229210

230211
Future<String> version() {
231-
final request = VersionRequest();
232-
logger.i('Sent ${request.repr}');
233-
return _client
234-
.version(Stream.value(request))
235-
.doOnData(checkForUpdate)
236-
.doOnEach(logGrpc(request))
237-
.last
238-
.then((reply) => reply.version);
212+
return doRpc(
213+
_client.version,
214+
VersionRequest(),
215+
checkUpdates: true,
216+
).then((r) => r!.version);
239217
}
240218

241219
Future<String> get(String key) {
242-
final request = GetRequest(key: key);
243-
logger.i('Sent ${request.repr}');
244-
return _client
245-
.get(Stream.value(request))
246-
.doOnEach(logGrpc(request))
247-
.last
248-
.then((reply) => reply.value);
220+
return doRpc(_client.get, GetRequest(key: key)).then((r) => r!.value);
249221
}
250222

251223
Future<void> set(String key, String value) {
252-
final request = SetRequest(key: key, val: value);
253-
logger.i('Sent ${request.repr}');
254-
return _client
255-
.set(Stream.value(request))
256-
.doOnEach(logGrpc(request))
257-
.firstOrNull;
224+
return doRpc(_client.set, SetRequest(key: key, val: value));
258225
}
259226

260227
Future<SSHInfo?> sshInfo(String name) {
261-
final request = SSHInfoRequest(instanceName: [name]);
262-
logger.i('Sent ${request.repr}');
263-
return _client
264-
.ssh_info(Stream.value(request))
265-
.doOnEach(logGrpc(request))
266-
.first
267-
.then((reply) => reply.sshInfo[name]);
228+
return doRpc(
229+
_client.ssh_info,
230+
SSHInfoRequest(instanceName: [name]),
231+
).then((r) => r!.sshInfo[name]);
268232
}
269233

270234
Future<DaemonInfoReply> daemonInfo() {
271-
final request = DaemonInfoRequest();
272-
logger.i('Sent ${request.repr}');
273-
return _client
274-
.daemon_info(Stream.value(request))
275-
.doOnEach(logGrpc(request))
276-
.last;
235+
return doRpc(_client.daemon_info, DaemonInfoRequest()).then((r) => r!);
277236
}
278237
}
279238

@@ -299,3 +258,17 @@ class CustomChannelCredentials extends ChannelCredentials {
299258
return ctx;
300259
}
301260
}
261+
262+
extension<T> on Stream<T> {
263+
Future<T?> get lastOrNull {
264+
final completer = Completer<T?>.sync();
265+
T? result;
266+
listen(
267+
(event) => result = event,
268+
onError: completer.completeError,
269+
onDone: () => completer.complete(result),
270+
cancelOnError: true,
271+
);
272+
return completer.future;
273+
}
274+
}

src/client/gui/lib/providers.dart

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ final daemonAvailableProvider = Provider((ref) {
6868
if (message.contains('failed to obtain exit status for remote process')) {
6969
return true;
7070
}
71-
if (message.contains('Connection is being forcefully terminated')) {
72-
return true;
73-
}
7471
}
7572
return false;
7673
});

0 commit comments

Comments
 (0)