diff --git a/build.gradle b/build.gradle index 7f41d37257b9..6b19cc01c9a6 100644 --- a/build.gradle +++ b/build.gradle @@ -68,6 +68,7 @@ configure(allprojects) { project -> dependency "io.reactivex:rxjava-reactive-streams:1.2.1" dependency "io.reactivex.rxjava2:rxjava:2.2.21" dependency "io.reactivex.rxjava3:rxjava:3.1.0" + dependency "io.smallrye.reactive:mutiny:1.0.0" dependency "io.projectreactor.tools:blockhound:1.0.4.RELEASE" dependency "com.caucho:hessian:4.0.63" diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle index a4166705d32d..822b6ed58acd 100644 --- a/spring-core/spring-core.gradle +++ b/spring-core/spring-core.gradle @@ -49,6 +49,7 @@ dependencies { optional("io.reactivex:rxjava-reactive-streams") optional("io.reactivex.rxjava2:rxjava") optional("io.reactivex.rxjava3:rxjava") + optional("io.smallrye.reactive:mutiny") optional("io.netty:netty-buffer") testCompile("io.projectreactor:reactor-test") testCompile("com.google.code.findbugs:jsr305") diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index a5a8b9295092..4cbb90c0c606 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -24,6 +24,8 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import kotlinx.coroutines.CompletableDeferredKt; import kotlinx.coroutines.Deferred; import org.reactivestreams.Publisher; @@ -72,6 +74,8 @@ public class ReactiveAdapterRegistry { private static final boolean kotlinCoroutinesPresent; + private static final boolean mutinyPresent; + static { ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader(); reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader); @@ -81,6 +85,7 @@ public class ReactiveAdapterRegistry { rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader); flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader); kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader); + mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Uni", classLoader); } private final List adapters = new ArrayList<>(); @@ -121,6 +126,11 @@ public ReactiveAdapterRegistry() { if (reactorPresent && kotlinCoroutinesPresent) { new CoroutinesRegistrar().registerAdapters(this); } + + // SmallRye Mutiny + if (mutinyPresent) { + new MutinyRegistrar().registerAdapters(this); + } } @@ -417,6 +427,23 @@ void registerAdapters(ReactiveAdapterRegistry registry) { } } + private static class MutinyRegistrar { + + void registerAdapters(ReactiveAdapterRegistry registry) { + registry.registerReactiveType( + ReactiveTypeDescriptor.singleOptionalValue(Uni.class, ()-> Uni.createFrom().nothing()), + uni ->((Uni)uni).convert().toPublisher(), + publisher -> Uni.createFrom().publisher(publisher) + ); + + registry.registerReactiveType( + ReactiveTypeDescriptor.multiValue(Multi.class, ()-> Multi.createFrom().empty()), + multi -> (Multi) multi, + publisher-> Multi.createFrom().publisher(publisher) + ); + } + } + /** * {@code BlockHoundIntegration} for spring-core classes. diff --git a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java index 4dc08a704f29..535891e6ff3e 100644 --- a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java +++ b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import kotlinx.coroutines.Deferred; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -357,6 +359,52 @@ void deferred() { } } + // SmallRye Mutiny + @Nested + class Mutiny { + + @Test + void defaultAdapterRegistrations() { + assertThat(getAdapter(io.smallrye.mutiny.Uni.class)).isNotNull(); + assertThat(getAdapter(io.smallrye.mutiny.Multi.class)).isNotNull(); + } + + @Test + void toUni() { + Publisher source = Mono.just(1); + Object target = getAdapter(Uni.class).fromPublisher(source); + assertThat(target).isInstanceOf(Uni.class); + assertThat(((Uni) target).await().atMost(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + } + + @Test + void fromUni() { + Uni source = Uni.createFrom().item(1); + Object target = getAdapter(Uni.class).toPublisher(source); + assertThat(target).isInstanceOf(Mono.class); + assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + } + + @Test + void toMulti() { + List sequence = Arrays.asList(1, 2, 3); + Publisher source = Flux.fromIterable(sequence); + Object target = getAdapter(Multi.class).fromPublisher(source); + assertThat(target).isInstanceOf(Multi.class); + assertThat(((Multi) target).collect().asList().await().atMost(Duration.ofMillis(1000))).isEqualTo(sequence); + } + + @Test + void fromMulti() { + List sequence = Arrays.asList(1, 2, 3); + Multi source = Multi.createFrom().iterable(sequence); + Object target = getAdapter(Multi.class).toPublisher(source); + assertThat(target).isInstanceOf(Flux.class); + assertThat(((Flux) target).blockLast(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(3)); + } + + } + private ReactiveAdapter getAdapter(Class reactiveType) { ReactiveAdapter adapter = this.registry.getAdapter(reactiveType); assertThat(adapter).isNotNull();