diff --git a/Sources/Operators/CollectUntilTrigger.swift b/Sources/Operators/CollectUntilTrigger.swift new file mode 100644 index 0000000..d359349 --- /dev/null +++ b/Sources/Operators/CollectUntilTrigger.swift @@ -0,0 +1,138 @@ +// +// CollectUntilTrigger.swift +// CombineExt +// +// Created by ferologics on 09/06/2021. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publisher { + func collect( + until trigger: Trigger + ) -> Publishers.CollectUntilTrigger where + Trigger.Output == Void, + Trigger.Failure == Never + { + Publishers.CollectUntilTrigger( + upstream: self, + trigger: trigger + ) + } +} + +// MARK: - Publisher + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publishers { + struct CollectUntilTrigger< + Upstream: Publisher, + Trigger: Publisher + >: Publisher where + Trigger.Output == Void, + Trigger.Failure == Never + { + public typealias Output = [Upstream.Output] + public typealias Failure = Upstream.Failure + + private let upstream: Upstream + private let trigger: Trigger + + init(upstream: Upstream, trigger: Trigger) { + self.upstream = upstream + self.trigger = trigger + } + + public func receive(subscriber: S) + where Failure == S.Failure, Output == S.Input + { + subscriber.receive( + subscription: Subscription( + upstream: upstream, + downstream: subscriber, + trigger: trigger + ) + ) + } + } +} + +// MARK: - Subscription + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.CollectUntilTrigger { + final class Subscription< + Downstream: Subscriber + >: Combine.Subscription where + Downstream.Input == [Upstream.Output], + Downstream.Failure == Upstream.Failure + { + private var sink: Sink? + private var cancellable: Cancellable? + + init( + upstream: Upstream, + downstream: Downstream, + trigger: Trigger + ) { + self.sink = Sink( + upstream: upstream, + downstream: downstream + ) + + cancellable = trigger.sink { [self] in + _ = sink?.buffer.buffer(value: sink?.elements ?? []) + _ = sink?.buffer.demand(.max(1)) + sink?.flush() + } + } + + func request(_ demand: Subscribers.Demand) { + sink?.demand(demand) + } + + func cancel() { + sink = nil + cancellable?.cancel() + cancellable = nil + } + + var description: String { + return "CollectUntilTrigger.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>" + } + } +} + +// MARK: - Sink + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.CollectUntilTrigger { + final class Sink< + Downstream: Subscriber + >: CombineExt.Sink where + Downstream.Input == [Upstream.Output], + Downstream.Failure == Upstream.Failure + { + private let lock = NSRecursiveLock() + var elements: [Upstream.Output] = [] + + override func receive(_ input: Upstream.Output) -> Subscribers.Demand { + lock.lock() + defer { lock.unlock() } + elements.append(input) + return .none + } + + func flush() { + lock.lock() + defer { lock.unlock() } + elements = [] + } + } +} + +#endif + diff --git a/Sources/Operators/Dematerialize.swift b/Sources/Operators/Dematerialize.swift index 663f776..eabee4f 100644 --- a/Sources/Operators/Dematerialize.swift +++ b/Sources/Operators/Dematerialize.swift @@ -40,7 +40,7 @@ public extension Publishers { } } -// MARK: - Subscrription +// MARK: - Subscription @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) private extension Publishers.Dematerialize { class Subscription: Combine.Subscription diff --git a/Tests/CollectUntilTriggerTests.swift b/Tests/CollectUntilTriggerTests.swift new file mode 100644 index 0000000..fdcd921 --- /dev/null +++ b/Tests/CollectUntilTriggerTests.swift @@ -0,0 +1,42 @@ +// +// CollectUntilTriggerTests.swift +// CombineExtTests +// +// Created by ferologics on 09/06/2021. +// Copyright © 2020 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import XCTest +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +class CollectUntilTriggerTests: XCTestCase { + var subscription: AnyCancellable! + + func test() { + // Given + let elements = [1,2,3,4,5] + var receivedElements = [Int]() + let elementsPublisher = PassthroughSubject() + let trigger = PassthroughSubject() + + // When + subscription = elementsPublisher + .collect(until: trigger) + .sink { receivedElements = $0 } + + for x in elements { + elementsPublisher.send(x) + } + + // Then + XCTAssertTrue(receivedElements.isEmpty) + trigger.send(()) + XCTAssertEqual(elements.count, receivedElements.count) + for (a, b) in zip(elements, receivedElements) { + XCTAssertEqual(a, b) + } + } +} +#endif