Skip to content

Commit 83d337b

Browse files
fabianfettbimawa
authored andcommitted
Release stream callback, once the stream has finished (grpc#1363)
1 parent dac3499 commit 83d337b

File tree

3 files changed

+106
-6
lines changed

3 files changed

+106
-6
lines changed

Sources/GRPC/ClientCalls/ResponseContainers.swift

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ internal class StreamingResponseParts<Response> {
9898
private let eventLoop: EventLoop
9999

100100
/// A callback for response messages.
101-
private let responseCallback: (Response) -> Void
101+
private var responseCallback: Optional<(Response) -> Void>
102102

103103
/// Lazy promises for the status, initial-, and trailing-metadata.
104104
private var initialMetadataPromise: LazyEventLoopPromise<HPACKHeaders>
@@ -139,16 +139,26 @@ internal class StreamingResponseParts<Response> {
139139
self.initialMetadataPromise.succeed(metadata)
140140

141141
case let .message(response):
142-
self.responseCallback(response)
142+
self.responseCallback?(response)
143143

144144
case let .end(status, trailers):
145+
// Once the stream has finished, we must release the callback, to make sure don't
146+
// break potential retain cycles (the callback may reference other object's that in
147+
// turn reference `StreamingResponseParts`).
148+
self.responseCallback = nil
145149
self.initialMetadataPromise.fail(status)
146150
self.trailingMetadataPromise.succeed(trailers)
147151
self.statusPromise.succeed(status)
148152
}
149153
}
150154

151155
internal func handleError(_ error: Error) {
156+
self.eventLoop.assertInEventLoop()
157+
158+
// Once the stream has finished, we must release the callback, to make sure don't
159+
// break potential retain cycles (the callback may reference other object's that in
160+
// turn reference `StreamingResponseParts`).
161+
self.responseCallback = nil
152162
let withoutContext = error.removingContext()
153163
let status = withoutContext.makeGRPCStatus()
154164
self.initialMetadataPromise.fail(withoutContext)

Tests/GRPCTests/FakeChannelTests.swift

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ class FakeChannelTests: GRPCTestCase {
8181
}
8282

8383
func testBidirectional() {
84+
final class ResponseCollector {
85+
private(set) var responses = [Response]()
86+
func collect(_ response: Response) { self.responses.append(response) }
87+
}
8488
var requests: [Request] = []
8589
let response = self.makeStreamingResponse { part in
8690
switch part {
@@ -91,10 +95,12 @@ class FakeChannelTests: GRPCTestCase {
9195
}
9296
}
9397

94-
var responses: [Response] = []
95-
let call = self.makeBidirectionalStreamingCall {
96-
responses.append($0)
98+
var collector = ResponseCollector()
99+
XCTAssertTrue(isKnownUniquelyReferenced(&collector))
100+
let call = self.makeBidirectionalStreamingCall { [collector] in
101+
collector.collect($0)
97102
}
103+
XCTAssertFalse(isKnownUniquelyReferenced(&collector))
98104

99105
XCTAssertNoThrow(try call.sendMessage(.with { $0.text = "1" }).wait())
100106
XCTAssertNoThrow(try call.sendMessage(.with { $0.text = "2" }).wait())
@@ -106,9 +112,12 @@ class FakeChannelTests: GRPCTestCase {
106112
XCTAssertNoThrow(try response.sendMessage(.with { $0.text = "4" }))
107113
XCTAssertNoThrow(try response.sendMessage(.with { $0.text = "5" }))
108114
XCTAssertNoThrow(try response.sendMessage(.with { $0.text = "6" }))
115+
XCTAssertEqual(collector.responses.count, 3)
116+
XCTAssertFalse(isKnownUniquelyReferenced(&collector))
109117
XCTAssertNoThrow(try response.sendEnd())
118+
XCTAssertTrue(isKnownUniquelyReferenced(&collector))
110119

111-
XCTAssertEqual(responses, (4 ... 6).map { number in .with { $0.text = "\(number)" } })
120+
XCTAssertEqual(collector.responses, (4 ... 6).map { number in .with { $0.text = "\(number)" } })
112121
XCTAssertTrue(try call.status.map { $0.isOk }.wait())
113122
}
114123

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2022, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import EchoImplementation
17+
import EchoModel
18+
import GRPC
19+
import NIOConcurrencyHelpers
20+
import NIOCore
21+
import NIOPosix
22+
import XCTest
23+
24+
final class StreamResponseHandlerRetainCycleTests: GRPCTestCase {
25+
var group: EventLoopGroup!
26+
var server: Server!
27+
var client: ClientConnection!
28+
29+
var echo: Echo_EchoClient!
30+
31+
override func setUp() {
32+
super.setUp()
33+
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
34+
35+
self.server = try! Server.insecure(group: self.group)
36+
.withServiceProviders([EchoProvider()])
37+
.withLogger(self.serverLogger)
38+
.bind(host: "localhost", port: 0)
39+
.wait()
40+
41+
self.client = ClientConnection.insecure(group: self.group)
42+
.withBackgroundActivityLogger(self.clientLogger)
43+
.connect(host: "localhost", port: self.server.channel.localAddress!.port!)
44+
45+
self.echo = Echo_EchoClient(
46+
channel: self.client,
47+
defaultCallOptions: CallOptions(logger: self.clientLogger)
48+
)
49+
}
50+
51+
override func tearDown() {
52+
XCTAssertNoThrow(try self.client.close().wait())
53+
XCTAssertNoThrow(try self.server.close().wait())
54+
XCTAssertNoThrow(try self.group.syncShutdownGracefully())
55+
super.tearDown()
56+
}
57+
58+
func testHandlerClosureIsReleasedOnceStreamEnds() {
59+
final class Counter {
60+
private let atomic = NIOAtomic.makeAtomic(value: 0)
61+
func increment() { self.atomic.add(1) }
62+
var value: Int {
63+
self.atomic.load()
64+
}
65+
}
66+
67+
var counter = Counter()
68+
XCTAssertTrue(isKnownUniquelyReferenced(&counter))
69+
let get = self.echo.update { [capturedCounter = counter] _ in
70+
capturedCounter.increment()
71+
}
72+
XCTAssertFalse(isKnownUniquelyReferenced(&counter))
73+
74+
get.sendMessage(.init(text: "hello world"), promise: nil)
75+
XCTAssertFalse(isKnownUniquelyReferenced(&counter))
76+
XCTAssertNoThrow(try get.sendEnd().wait())
77+
XCTAssertNoThrow(try get.status.wait())
78+
XCTAssertEqual(counter.value, 1)
79+
XCTAssertTrue(isKnownUniquelyReferenced(&counter))
80+
}
81+
}

0 commit comments

Comments
 (0)