Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions Sources/GraphQL/Subscription/EventStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,42 @@ public class ConcurrentEventStream<Element>: EventStream<Element> {
extension AsyncThrowingStream {
func mapStream<To>(_ closure: @escaping (Element) throws -> To) -> AsyncThrowingStream<To, Error> {
return AsyncThrowingStream<To, Error> { continuation in
Task {
for try await event in self {
let newEvent = try closure(event)
continuation.yield(newEvent)
let task = Task {
do {
for try await event in self {
let newEvent = try closure(event)
continuation.yield(newEvent)
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
}
}
}

func filterStream(_ isIncluded: @escaping (Element) throws -> Bool) -> AsyncThrowingStream<Element, Error> {
return AsyncThrowingStream<Element, Error> { continuation in
Task {
for try await event in self {
if try isIncluded(event) {
continuation.yield(event)
let task = Task {
do {
for try await event in self {
if try isIncluded(event) {
continuation.yield(event)
}
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}

continuation.onTermination = { @Sendable _ in
task.cancel()
}
}
}
}
Expand Down
20 changes: 17 additions & 3 deletions Tests/GraphQLTests/SubscriptionTests/SubscriptionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,15 @@ class SubscriptionTests : XCTestCase {

var results = [GraphQLResult]()
var expectation = XCTestExpectation()
_ = stream.map { event in

// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
let keepForNow = stream.map { event in
event.map { result in
results.append(result)
expectation.fulfill()
}
}

var expected = [GraphQLResult]()

db.trigger(email: Email(
Expand Down Expand Up @@ -675,6 +678,9 @@ class SubscriptionTests : XCTestCase {
)
wait(for: [expectation], timeout: timeoutDuration)
XCTAssertEqual(results, expected)

// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
_ = keepForNow
}

/// 'should not trigger when subscription is already done'
Expand All @@ -701,7 +707,8 @@ class SubscriptionTests : XCTestCase {

var results = [GraphQLResult]()
var expectation = XCTestExpectation()
_ = stream.map { event in
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
let keepForNow = stream.map { event in
event.map { result in
results.append(result)
expectation.fulfill()
Expand Down Expand Up @@ -747,6 +754,9 @@ class SubscriptionTests : XCTestCase {
// Ensure that the current result was the one before the db was stopped
wait(for: [expectation], timeout: timeoutDuration)
XCTAssertEqual(results, expected)

// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
_ = keepForNow
}

/// 'should not trigger when subscription is thrown'
Expand Down Expand Up @@ -861,7 +871,8 @@ class SubscriptionTests : XCTestCase {

var results = [GraphQLResult]()
var expectation = XCTestExpectation()
_ = stream.map { event in
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
let keepForNow = stream.map { event in
event.map { result in
results.append(result)
expectation.fulfill()
Expand Down Expand Up @@ -925,6 +936,9 @@ class SubscriptionTests : XCTestCase {
)
wait(for: [expectation], timeout: timeoutDuration)
XCTAssertEqual(results, expected)

// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
_ = keepForNow
}

/// 'should pass through error thrown in source event stream'
Expand Down