From 934826b1abc98b970676725f587a89367947b280 Mon Sep 17 00:00:00 2001 From: Stephen Edwards Date: Wed, 16 Apr 2025 15:03:44 -0400 Subject: [PATCH] Update CSR to yield() and update tests w/ multiple dispatchers --- .../com/squareup/workflow1/RenderWorkflow.kt | 18 +- .../workflow1/RenderWorkflowInTest.kt | 343 +++++++++++------- .../workflow1/WorkflowsLifecycleTests.kt | 68 ++-- 3 files changed, 261 insertions(+), 168 deletions(-) diff --git a/workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt b/workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt index 898875591..5a53eae15 100644 --- a/workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt +++ b/workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.yield /** * Launches the [workflow] in a new coroutine in [scope] and returns a [StateFlow] of its @@ -170,7 +171,7 @@ public fun renderWorkflowIn( } scope.launch { - while (isActive) { + outer@ while (isActive) { // It might look weird to start by processing an action before getting the rendering below, // but remember the first render pass already occurred above, before this coroutine was even // launched. @@ -178,7 +179,7 @@ public fun renderWorkflowIn( if (shouldShortCircuitForUnchangedState(actionResult)) { sendOutput(actionResult, onOutput) - continue + continue@outer } // After resuming from runner.processAction() our coroutine could now be cancelled, check so @@ -189,15 +190,22 @@ public fun renderWorkflowIn( var nextRenderAndSnapshot: RenderingAndSnapshot = runner.nextRendering() if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) { - while (isActive && actionResult is ActionApplied<*> && actionResult.output == null) { + conflate@ while (isActive && actionResult is ActionApplied<*> && actionResult.output == null) { + // We start by yielding, because if we are on an Unconfined dispatcher, we want to give + // other signals (like Workers listening to the same result) a chance to get dispatched + // and queue their actions. + yield() // We may have more actions we can process, this rendering could be stale. actionResult = runner.processAction(waitForAnAction = false) // If no actions processed, then no new rendering needed. Pass on to UI. - if (actionResult == ActionsExhausted) break + if (actionResult == ActionsExhausted) break@conflate // Skip rendering if we had unchanged state, keep draining actions. - if (shouldShortCircuitForUnchangedState(actionResult)) continue + if (shouldShortCircuitForUnchangedState(actionResult)) { + sendOutput(actionResult, onOutput) + continue@outer + } // Make sure the runtime has not been cancelled from runner.processAction() if (!isActive) return@launch diff --git a/workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt b/workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt index 4a1df64f6..363f9487b 100644 --- a/workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt +++ b/workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt @@ -20,11 +20,12 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.TestDispatcher import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.advanceUntilIdle -import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.yield import okio.ByteString import kotlin.test.Test import kotlin.test.assertEquals @@ -61,12 +62,21 @@ class RenderWorkflowInTest { testTracer ) - private val runtimeOptions: Sequence> = cartesianProduct( - runtimes.asSequence(), - tracerOptions.asSequence() + private val myStandardTestDispatcher = StandardTestDispatcher() + private val dispatcherOptions = setOf( + UnconfinedTestDispatcher(), + myStandardTestDispatcher ) - private val runtimeTestRunner = ParameterizedTestRunner>() + private val runtimeOptions: Sequence> = + cartesianProduct( + runtimes.asSequence(), + tracerOptions.asSequence(), + dispatcherOptions.asSequence() + ) + + private val runtimeTestRunner = + ParameterizedTestRunner>() private fun setup() { traces.clear() @@ -76,8 +86,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val props = MutableStateFlow("foo") val workflow = Workflow.stateless { "props: $it" } // Don't allow the workflow runtime to actually start. @@ -98,12 +108,12 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val props = MutableStateFlow("foo") val workflow = Workflow.stateless { "props: $it" } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) testScope.cancel() val renderings = renderWorkflowIn( workflow = workflow, @@ -122,8 +132,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var sideEffectWasRan = false val workflow = Workflow.stateless { runningSideEffect("test") { @@ -131,7 +141,7 @@ class RenderWorkflowInTest { } } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) testScope.cancel() renderWorkflowIn( workflow, @@ -140,7 +150,7 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} - advanceUntilIdle() + advanceIfStandard(dispatcher) assertFalse(sideEffectWasRan) } @@ -152,8 +162,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var sideEffectWasRan = false val childWorkflow = Workflow.stateless { runningSideEffect("test") { @@ -164,7 +174,7 @@ class RenderWorkflowInTest { renderChild(childWorkflow) } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) testScope.cancel() renderWorkflowIn( workflow = workflow, @@ -173,7 +183,7 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} - advanceUntilIdle() + advanceIfStandard(dispatcher) assertFalse(sideEffectWasRan) } @@ -184,8 +194,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val props = MutableStateFlow("foo") val workflow = Workflow.stateless { "props: $it" } val renderings = renderWorkflowIn( @@ -195,30 +205,34 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} + advanceIfStandard(dispatcher) assertEquals("props: foo", renderings.value.rendering) props.value = "bar" + advanceIfStandard(dispatcher) assertEquals("props: bar", renderings.value.rendering) } } } - private val runtimeMatrix: Sequence> = cartesianProduct( - runtimes.asSequence(), - runtimes.asSequence() - ) + private val runtimeMatrix: Sequence> = + cartesianProduct( + runtimes.asSequence(), + runtimes.asSequence(), + dispatcherOptions.asSequence(), + ) private val runtimeMatrixTestRunner = - ParameterizedTestRunner>() + ParameterizedTestRunner>() @Test fun saves_to_and_restores_from_snapshot() { runtimeMatrixTestRunner.runParametrizedTest( paramSource = runtimeMatrix, before = ::setup, - ) { (runtimeConfig1, runtimeConfig2) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig1, runtimeConfig2, dispatcher) -> + runTest(dispatcher) { val workflow = Workflow.stateful Unit>>( initialState = { _, snapshot -> snapshot?.bytes?.parse { it.readUtf8WithLength() } ?: "initial state" @@ -241,12 +255,14 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig1, workflowTracer = null, ) {} + advanceIfStandard(dispatcher) // Interact with the workflow to change the state. renderings.value.rendering.let { (state, updateState) -> runtimeMatrixTestRunner.assertEquals("initial state", state) updateState("updated state") } + advanceIfStandard(dispatcher) val snapshot = renderings.value.let { (rendering, snapshot) -> val (state, updateState) = rendering @@ -254,9 +270,10 @@ class RenderWorkflowInTest { updateState("ignored rendering") return@let snapshot } + advanceIfStandard(dispatcher) // Create a new scope to launch a second runtime to restore. - val restoreScope = TestScope(testScheduler) + val restoreScope = TestScope(dispatcher) val restoredRenderings = renderWorkflowIn( workflow = workflow, @@ -266,6 +283,7 @@ class RenderWorkflowInTest { workflowTracer = null, runtimeConfig = runtimeConfig2 ) {} + advanceIfStandard(dispatcher) runtimeMatrixTestRunner.assertEquals( "updated state", restoredRenderings.value.rendering.first @@ -279,8 +297,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { lateinit var sink: Sink var snapped = false @@ -305,11 +323,13 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} + advanceIfStandard(dispatcher) val emitted = mutableListOf>() val collectionJob = launch { renderings.collect { emitted += it } } + advanceIfStandard(dispatcher) if (runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES)) { // we have to change state then or it won't render. @@ -317,6 +337,7 @@ class RenderWorkflowInTest { } else { sink.send("unchanging state") } + advanceIfStandard(dispatcher) if (runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES)) { // we have to change state then or it won't render. @@ -324,7 +345,7 @@ class RenderWorkflowInTest { } else { sink.send("unchanging state") } - advanceUntilIdle() + advanceIfStandard(dispatcher) collectionJob.cancel() @@ -345,8 +366,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val trigger = Channel() val workflow = Workflow.stateless { runningWorker( @@ -364,24 +385,32 @@ class RenderWorkflowInTest { ) { receivedOutputs += it } + advanceIfStandard(dispatcher) assertTrue(receivedOutputs.isEmpty()) assertTrue(trigger.trySend("foo").isSuccess) + advanceIfStandard(dispatcher) assertEquals(listOf("foo"), receivedOutputs) assertTrue(trigger.trySend("bar").isSuccess) + advanceIfStandard(dispatcher) assertEquals(listOf("foo", "bar"), receivedOutputs) } } } + private fun advanceIfStandard(dispatcher: TestDispatcher) { + if (dispatcher == myStandardTestDispatcher) { + dispatcher.scheduler.advanceUntilIdle() + dispatcher.scheduler.runCurrent() + } + } + /** * This is a bit of a tricky test. Everything comes down to how your coroutines are dispatched. * This test confirms that we are setting the value on the StateFlow of the updated rendering * before onOutput is called. * - * It uses an [UnconfinedTestDispatcher] for the runtime as would be typical. - * * If we were collecting the renderings, that would happen after [onOutput] as it would have * to wait to be dispatched after onOutput was complete. * @@ -392,8 +421,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val trigger = Channel() val workflow = Workflow.stateful( initialState = "initial", @@ -425,9 +454,12 @@ class RenderWorkflowInTest { // called assertEquals(it, renderings.value.rendering) } + advanceIfStandard(dispatcher) + assertTrue(receivedOutputs.isEmpty()) assertTrue(trigger.trySend("foo").isSuccess) + advanceIfStandard(dispatcher) assertEquals(listOf("foo"), receivedOutputs) } } @@ -447,10 +479,10 @@ class RenderWorkflowInTest { */ @Test fun onOutput_called_after_rendering_emitted_and_collected() { runtimeTestRunner.runParametrizedTest( - paramSource = runtimeOptions, + paramSource = runtimeOptions.filter { it.third != myStandardTestDispatcher }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val trigger = Channel() val workflow = Workflow.stateful( initialState = "initial", @@ -557,8 +589,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val workflow = Workflow.stateless { props -> props } var onOutputCalls = 0 val props = MutableStateFlow(0) @@ -569,14 +601,17 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) { onOutputCalls++ } + advanceIfStandard(dispatcher) assertEquals(0, renderings.value.rendering) assertEquals(0, onOutputCalls) props.value = 1 + advanceIfStandard(dispatcher) assertEquals(1, renderings.value.rendering) assertEquals(0, onOutputCalls) props.value = 2 + advanceIfStandard(dispatcher) assertEquals(2, renderings.value.rendering) assertEquals(0, onOutputCalls) } @@ -592,8 +627,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val workflow = Workflow.stateless { throw ExpectedException() } @@ -616,8 +651,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var sideEffectWasRan = false val workflow = Workflow.stateless { runningSideEffect("test") { @@ -645,8 +680,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var sideEffectWasRan = false var cancellationException: Throwable? = null val childWorkflow = Workflow.stateless { @@ -671,11 +706,15 @@ class RenderWorkflowInTest { workflowTracer = workflowTracer, ) {} } - assertTrue(sideEffectWasRan) - assertNotNull(cancellationException) - val realCause = generateSequence(cancellationException) { it.cause } - .firstOrNull { it !is CancellationException } - assertTrue(realCause is ExpectedException) + advanceIfStandard(dispatcher) + if (dispatcher != myStandardTestDispatcher) { + // Side effect will never actually be started unless the dispatcher is eager. + assertTrue(sideEffectWasRan) + assertNotNull(cancellationException) + val realCause = generateSequence(cancellationException) { it.cause } + .firstOrNull { it !is CancellationException } + assertTrue(realCause is ExpectedException) + } } } } @@ -685,8 +724,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var sideEffectWasRan = false val childWorkflow = Workflow.stateless { runningSideEffect("test") { @@ -716,8 +755,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val trigger = CompletableDeferred() // Throws an exception when trigger is completed. val workflow = Workflow.stateful( @@ -729,7 +768,7 @@ class RenderWorkflowInTest { } } ) - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) renderWorkflowIn( workflow = workflow, scope = testScope, @@ -741,7 +780,7 @@ class RenderWorkflowInTest { assertTrue(testScope.isActive) trigger.complete(Unit) - advanceUntilIdle() + advanceIfStandard(dispatcher) assertFalse(testScope.isActive) } @@ -752,8 +791,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val trigger = CompletableDeferred() // Throws an exception when trigger is completed. val workflow = Workflow.stateless { @@ -763,7 +802,7 @@ class RenderWorkflowInTest { } } } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) renderWorkflowIn( workflow = workflow, scope = testScope, @@ -775,7 +814,7 @@ class RenderWorkflowInTest { assertTrue(testScope.isActive) trigger.complete(Unit) - advanceUntilIdle() + advanceIfStandard(dispatcher) assertFalse(testScope.isActive) } @@ -786,8 +825,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var cancellationException: Throwable? = null val workflow = Workflow.stateless { runningSideEffect(key = "test1") { @@ -796,7 +835,7 @@ class RenderWorkflowInTest { } } } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) renderWorkflowIn( workflow = workflow, scope = testScope, @@ -806,11 +845,11 @@ class RenderWorkflowInTest { ) {} assertNull(cancellationException) assertTrue(testScope.isActive) - advanceUntilIdle() + advanceIfStandard(dispatcher) testScope.cancel() - advanceUntilIdle() + advanceIfStandard(dispatcher) assertTrue(cancellationException is CancellationException) assertNull(cancellationException!!.cause) @@ -822,9 +861,9 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { - val testScope = TestScope(testScheduler) + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { + val testScope = TestScope(dispatcher) val trigger = CompletableDeferred() var renderCount = 0 val workflow = Workflow.stateless { @@ -842,14 +881,14 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} - advanceUntilIdle() + advanceIfStandard(dispatcher) assertTrue(testScope.isActive) assertTrue(renderCount == 1) trigger.complete(Unit) - advanceUntilIdle() + advanceIfStandard(dispatcher) assertFalse(testScope.isActive) assertEquals( @@ -865,8 +904,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { var cancellationException: Throwable? = null val workflow = Workflow.stateless { runningSideEffect(key = "failing") { @@ -875,7 +914,7 @@ class RenderWorkflowInTest { } } } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) renderWorkflowIn( workflow = workflow, scope = testScope, @@ -883,12 +922,12 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} - advanceUntilIdle() + advanceIfStandard(dispatcher) assertNull(cancellationException) assertTrue(testScope.isActive) testScope.cancel(CancellationException("fail!", ExpectedException())) - advanceUntilIdle() + advanceIfStandard(dispatcher) assertTrue(cancellationException is CancellationException) assertTrue(cancellationException!!.cause is ExpectedException) } @@ -899,10 +938,10 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val workflow = Workflow.stateless {} - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) val renderings = renderWorkflowIn( workflow = workflow, scope = testScope, @@ -913,11 +952,11 @@ class RenderWorkflowInTest { // Collect in separate scope so we actually test that the parent scope is failed when it's // different from the collecting scope. - val collectScope = TestScope(testScheduler) + val collectScope = TestScope(dispatcher) collectScope.launch { renderings.collect { throw ExpectedException() } } - advanceUntilIdle() + advanceIfStandard(dispatcher) assertTrue(testScope.isActive) assertFalse(collectScope.isActive) } @@ -928,14 +967,14 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val trigger = CompletableDeferred() // Emits a Unit when trigger is completed. val workflow = Workflow.stateless { runningWorker(Worker.from { trigger.await() }) { action("") { setOutput(Unit) } } } - val testScope = TestScope(testScheduler) + val testScope = TestScope(dispatcher) renderWorkflowIn( workflow = workflow, scope = testScope, @@ -945,11 +984,11 @@ class RenderWorkflowInTest { ) { throw ExpectedException() } - advanceUntilIdle() + advanceIfStandard(dispatcher) assertTrue(testScope.isActive) trigger.complete(Unit) - advanceUntilIdle() + advanceIfStandard(dispatcher) assertFalse(testScope.isActive) } } @@ -960,8 +999,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { val workflow = Workflow.stateful( snapshot = { Snapshot.of { @@ -1006,8 +1045,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { @Suppress("EqualsOrHashCode", "unused") class FailRendering(val value: Int) { override fun equals(other: Any?): Boolean { @@ -1044,7 +1083,7 @@ class RenderWorkflowInTest { // Trigger another render pass. props.value += 1 - advanceUntilIdle() + advanceIfStandard(dispatcher) mutex.unlock() } mutex.lock() @@ -1057,8 +1096,8 @@ class RenderWorkflowInTest { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { @Suppress("EqualsOrHashCode") data class FailRendering(val value: Int) { override fun hashCode(): Int { @@ -1094,7 +1133,7 @@ class RenderWorkflowInTest { // Trigger another render pass. props.value += 1 - advanceUntilIdle() + advanceIfStandard(dispatcher) mutex.unlock() } mutex.lock() @@ -1108,8 +1147,8 @@ class RenderWorkflowInTest { it.first.contains(RENDER_ONLY_WHEN_STATE_CHANGES) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES)) lateinit var sink: Sink @@ -1135,7 +1174,7 @@ class RenderWorkflowInTest { } sink.send("unchanging state") - advanceUntilIdle() + advanceIfStandard(dispatcher) collectionJob.cancel() assertEquals(1, emitted.size) @@ -1149,8 +1188,8 @@ class RenderWorkflowInTest { it.first.contains(RENDER_ONLY_WHEN_STATE_CHANGES) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES)) lateinit var sink: Sink @@ -1175,9 +1214,9 @@ class RenderWorkflowInTest { renderings.collect { emitted += it } } - advanceUntilIdle() + advanceIfStandard(dispatcher) sink.send("changing state") - advanceUntilIdle() + advanceIfStandard(dispatcher) assertEquals(2, emitted.size) collectionJob.cancel() @@ -1192,8 +1231,8 @@ class RenderWorkflowInTest { it.first.contains(PARTIAL_TREE_RENDERING) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(PARTIAL_TREE_RENDERING)) val trigger = MutableSharedFlow() @@ -1246,8 +1285,9 @@ class RenderWorkflowInTest { workflowTracer = workflowTracer, ) {} + advanceIfStandard(dispatcher) trigger.emit("state 1") // same value as the child starts with. - advanceUntilIdle() + advanceIfStandard(dispatcher) assertEquals(2, parentRenderCount) assertEquals(1, childRenderCount) @@ -1261,8 +1301,8 @@ class RenderWorkflowInTest { it.first.contains(PARTIAL_TREE_RENDERING) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(PARTIAL_TREE_RENDERING)) val trigger = MutableSharedFlow() @@ -1314,9 +1354,10 @@ class RenderWorkflowInTest { runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, ) {} + advanceIfStandard(dispatcher) trigger.emit("state 1") // different value than the child starts with. - advanceUntilIdle() + advanceIfStandard(dispatcher) assertEquals(3, parentRenderCount) // Parent needs to be rendered 3x, but child only 2x as the 3rd time its the same. @@ -1334,8 +1375,8 @@ class RenderWorkflowInTest { ) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(UnconfinedTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) check(runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES)) @@ -1343,6 +1384,7 @@ class RenderWorkflowInTest { var childHandlerActionExecuted = 0 var workerActionExecuted = 0 val trigger = MutableSharedFlow() + val outputSet = mutableListOf() val childWorkflow = Workflow.stateful( initialState = "unchanging state", @@ -1373,6 +1415,7 @@ class RenderWorkflowInTest { action("") { workerActionExecuted++ state = it + setOutput(it) } } renderState.also { @@ -1387,21 +1430,31 @@ class RenderWorkflowInTest { props = props, runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, - ) {} + ) { + outputSet.add(it) + } + advanceIfStandard(dispatcher) launch { trigger.emit("changed state") } - advanceUntilIdle() + advanceIfStandard(dispatcher) // 2 renderings (initial and then the update.) Not *3* renderings. assertEquals(2, renderCount) assertEquals(1, childHandlerActionExecuted) assertEquals(1, workerActionExecuted) + assertEquals(1, outputSet.size) + assertEquals("changed state", outputSet[0]) } } } + /** + * This is the same test as [for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output] + * except that in that version the handler for the child output also sets output - which is + * one reason we do not end up conflating. + */ @Test fun for_conflate_we_conflate_stacked_actions_into_one_rendering() { runtimeTestRunner.runParametrizedTest( @@ -1410,8 +1463,8 @@ class RenderWorkflowInTest { it.first.contains(CONFLATE_STALE_RENDERINGS) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(StandardTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) var childHandlerActionExecuted = false @@ -1447,6 +1500,7 @@ class RenderWorkflowInTest { action("") { // Update the rendering in order to show conflation. state = "$it+update" + setOutput(state) } } renderState @@ -1459,20 +1513,25 @@ class RenderWorkflowInTest { props = props, runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, - ) {} - - launch { - trigger.emit("changed state") + ) { + // Yield in output so that we ensure that we let the collector of the renderings + // collect each of them before processing the next action. + yield() } - val collectionJob = launch(UnconfinedTestDispatcher(testScheduler)) { + advanceIfStandard(dispatcher) + + val collectionJob = launch { // Collect this unconfined so we can get all the renderings faster than actions can // be processed. renderings.collect { emitted += it.rendering } } - advanceUntilIdle() - runCurrent() + advanceIfStandard(dispatcher) + launch { + trigger.emit("changed state") + } + advanceIfStandard(dispatcher) collectionJob.cancel() @@ -1492,8 +1551,8 @@ class RenderWorkflowInTest { it.first.contains(CONFLATE_STALE_RENDERINGS) }, before = ::setup, - ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) -> - runTest(StandardTestDispatcher()) { + ) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?, dispatcher: TestDispatcher) -> + runTest(dispatcher) { check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) var childHandlerActionExecuted = false @@ -1543,20 +1602,25 @@ class RenderWorkflowInTest { props = props, runtimeConfig = runtimeConfig, workflowTracer = workflowTracer, - ) {} - - launch { - trigger.emit("changed state") + ) { + // Yield in output so that we ensure that we let the collector of the renderings + // collect each of them before processing the next action. + yield() } - val collectionJob = launch(UnconfinedTestDispatcher(testScheduler)) { + + val collectionJob = launch { // Collect this unconfined so we can get all the renderings faster than actions can // be processed. renderings.collect { emitted += it.rendering } } - advanceUntilIdle() - runCurrent() + advanceIfStandard(dispatcher) + + launch { + trigger.emit("changed state") + } + advanceIfStandard(dispatcher) collectionJob.cancel() @@ -1577,6 +1641,23 @@ class RenderWorkflowInTest { return set1.flatMap { set1Item -> set2.map { set2Item -> set1Item to set2Item } } } + private fun cartesianProduct( + set1: Sequence, + set2: Sequence, + set3: Sequence + ): Sequence> { + return set1.flatMap { set1Item -> set2.map { set2Item -> set1Item to set2Item } } + .flatMap { (set1Item, set2Item) -> + set3.map { set3Item -> + Triple( + set1Item, + set2Item, + set3Item + ) + } + } + } + companion object { internal val EXPECTED_TRACE: String = """ StartingCreateWorkerWorkflow diff --git a/workflow-testing/src/test/java/com/squareup/workflow1/WorkflowsLifecycleTests.kt b/workflow-testing/src/test/java/com/squareup/workflow1/WorkflowsLifecycleTests.kt index 51a1b648b..5f3b6841b 100644 --- a/workflow-testing/src/test/java/com/squareup/workflow1/WorkflowsLifecycleTests.kt +++ b/workflow-testing/src/test/java/com/squareup/workflow1/WorkflowsLifecycleTests.kt @@ -4,9 +4,10 @@ import com.squareup.workflow1.RuntimeConfigOptions.CONFLATE_STALE_RENDERINGS import com.squareup.workflow1.RuntimeConfigOptions.PARTIAL_TREE_RENDERING import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES import com.squareup.workflow1.testing.headlessIntegrationTest +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.awaitCancellation -import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlin.test.Ignore import kotlin.test.Test @@ -122,20 +123,45 @@ class WorkflowsLifecycleTests { } } + @Test fun childSessionWorkflowStartedWhenExpected() { + runtimeTestRunner.runParametrizedTest( + paramSource = runtimeOptions, + after = ::cleanup, + ) { runtimeConfig: RuntimeConfig -> + + workflowWithChildSession.headlessIntegrationTest( + runtimeConfig = runtimeConfig + ) { + // One time starts but does not stop the child session workflow. + repeat(1) { + val (current, setState) = awaitNextRendering() + setState.invoke(current + 1) + } + + assertEquals(1, started, "Child Session Workflow not started 1 time.") + } + } + } + /** * @see [1093](https://github.com/square/workflow-kotlin/issues/1093) * - * This test ensconces the currently failing behavior of side effects. We are not currently - * fixing this but rather working around it with [SessionWorkflow]. + * This test fails. It is kept and Ignored as a way to ensconce the currently failing behavior + * of side effects with immediate start & stops. We are not currently fixing this but rather + * working around it with [SessionWorkflow]. + * + * Compare with [childSessionWorkflowStartAndStoppedWhenHandledSynchronously] */ - @Ignore - @Test fun sideEffectsStartAndStoppedWhenHandledSynchronously() { + @Ignore("https://github.com/square/workflow-kotlin/issues/1093") + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun sideEffectsStartAndStoppedWhenHandledSynchronously() { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, after = ::cleanup, ) { runtimeConfig: RuntimeConfig -> - val dispatcher = StandardTestDispatcher() + val dispatcher = UnconfinedTestDispatcher() workflowWithSideEffects.headlessIntegrationTest( coroutineContext = dispatcher, runtimeConfig = runtimeConfig @@ -146,9 +172,7 @@ class WorkflowsLifecycleTests { // on two consecutive render passes. setState.invoke(1) setState.invoke(2) - dispatcher.scheduler.runCurrent() awaitNextRendering() - dispatcher.scheduler.runCurrent() if (!runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) { // 2 rendering or 1 depending on runtime config. awaitNextRendering() @@ -160,26 +184,6 @@ class WorkflowsLifecycleTests { } } - @Test fun childSessionWorkflowStartedWhenExpected() { - runtimeTestRunner.runParametrizedTest( - paramSource = runtimeOptions, - after = ::cleanup, - ) { runtimeConfig: RuntimeConfig -> - - workflowWithChildSession.headlessIntegrationTest( - runtimeConfig = runtimeConfig - ) { - // One time starts but does not stop the child session workflow. - repeat(1) { - val (current, setState) = awaitNextRendering() - setState.invoke(current + 1) - } - - assertEquals(1, started, "Child Session Workflow not started 1 time.") - } - } - } - @Test fun childSessionWorkflowStoppedWhenExpected() { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, @@ -205,13 +209,15 @@ class WorkflowsLifecycleTests { * * This tests show the working behavior when using a [SessionWorkflow] to track the lifetime. */ - @Test fun childSessionWorkflowStartAndStoppedWhenHandledSynchronously() { + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun childSessionWorkflowStartAndStoppedWhenHandledSynchronously() { runtimeTestRunner.runParametrizedTest( paramSource = runtimeOptions, after = ::cleanup, ) { runtimeConfig: RuntimeConfig -> - val dispatcher = StandardTestDispatcher() + val dispatcher = UnconfinedTestDispatcher() workflowWithChildSession.headlessIntegrationTest( coroutineContext = dispatcher, runtimeConfig = runtimeConfig @@ -222,9 +228,7 @@ class WorkflowsLifecycleTests { // on two consecutive render passes, synchronously. setState.invoke(1) setState.invoke(2) - dispatcher.scheduler.runCurrent() awaitNextRendering() - dispatcher.scheduler.runCurrent() if (!runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) { // 2 rendering or 1 depending on runtime config. awaitNextRendering()