diff --git a/spring-batch-docs/asciidoc/readersAndWriters.adoc b/spring-batch-docs/asciidoc/readersAndWriters.adoc index c32bba1572..953c6cbb07 100644 --- a/spring-batch-docs/asciidoc/readersAndWriters.adoc +++ b/spring-batch-docs/asciidoc/readersAndWriters.adoc @@ -3000,6 +3000,7 @@ Spring Batch includes the following decorators: * <> * <> +* <> * <> * <> * <> @@ -3023,6 +3024,13 @@ NOTE: SingleItemPeekableItemReader's peek method is not thread-safe, because it be possible to honor the peek in multiple threads. Only one of the threads that peeked would get that item in the next call to read. +[[synchronizedItemStreamWriter]] +===== `SynchronizedItemStreamWriter` +When using an `ItemWriter` that is not thread safe, Spring Batch offers the +`SynchronizedItemStreamWriter` decorator, which can be used to make the `ItemWriter` +thread safe. Spring Batch provides a `SynchronizedItemStreamWriterBuilder` to construct +an instance of the `SynchronizedItemStreamWriter`. + [[multiResourceItemWriter]] ===== `MultiResourceItemWriter` The `MultiResourceItemWriter` wraps a `ResourceAwareItemWriterItemStream` and creates a new diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/SynchronizedItemStreamWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/SynchronizedItemStreamWriter.java new file mode 100644 index 0000000000..3bd40a1879 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/SynchronizedItemStreamWriter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.support; + +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; + +import java.util.List; + +/** + * An {@link ItemStreamWriter} decorator with a synchronized + * {@link SynchronizedItemStreamWriter#write write()} method + * + * @author Dimitrios Liapis + * + * @param type of object being written + */ +public class SynchronizedItemStreamWriter implements ItemStreamWriter, InitializingBean { + + private ItemStreamWriter delegate; + + public void setDelegate(ItemStreamWriter delegate) { + this.delegate = delegate; + } + + /** + * This delegates to the write method of the delegate + */ + @Override + public synchronized void write(List items) throws Exception { + this.delegate.write(items); + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + this.delegate.open(executionContext); + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + this.delegate.update(executionContext); + } + + @Override + public void close() throws ItemStreamException { + this.delegate.close(); + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(this.delegate, "A delegate item writer is required"); + } +} diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamWriterBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamWriterBuilder.java new file mode 100644 index 0000000000..c2d5fe8197 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamWriterBuilder.java @@ -0,0 +1,44 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.support.builder; + +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.batch.item.support.SynchronizedItemStreamWriter; +import org.springframework.util.Assert; + +/** + * Creates a fully qualified {@link SynchronizedItemStreamWriter}. + * + * @author Dimitrios Liapis + */ +public class SynchronizedItemStreamWriterBuilder { + + private ItemStreamWriter delegate; + + public SynchronizedItemStreamWriterBuilder delegate(ItemStreamWriter delegate) { + this.delegate = delegate; + + return this; + } + + public SynchronizedItemStreamWriter build() { + Assert.notNull(this.delegate, "A delegate is required"); + + SynchronizedItemStreamWriter writer = new SynchronizedItemStreamWriter<>(); + writer.setDelegate(this.delegate); + return writer; + } +} diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/AbstractSynchronizedItemStreamWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/AbstractSynchronizedItemStreamWriterTests.java new file mode 100644 index 0000000000..c1574964a5 --- /dev/null +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/AbstractSynchronizedItemStreamWriterTests.java @@ -0,0 +1,84 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.support; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamWriter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +/** + * Common parent class for {@link SynchronizedItemStreamWriterTests} and + * {@link org.springframework.batch.item.support.builder.SynchronizedItemStreamWriterBuilderTests} + * + * @author Dimitrios Liapis + * + */ +public abstract class AbstractSynchronizedItemStreamWriterTests { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Mock + protected ItemStreamWriter delegate; + + private SynchronizedItemStreamWriter synchronizedItemStreamWriter; + private final List testList = Collections.unmodifiableList(new ArrayList<>()); + private final ExecutionContext testExecutionContext = new ExecutionContext(); + + abstract protected SynchronizedItemStreamWriter createNewSynchronizedItemStreamWriter(); + + @Before + public void init() { + initMocks(this); + synchronizedItemStreamWriter = createNewSynchronizedItemStreamWriter(); + } + + @Test + public void testDelegateWriteIsCalled() throws Exception { + synchronizedItemStreamWriter.write(testList); + verify(delegate).write(testList); + } + + @Test + public void testDelegateOpenIsCalled() { + synchronizedItemStreamWriter.open(testExecutionContext); + verify(delegate).open(testExecutionContext); + } + + @Test + public void testDelegateUpdateIsCalled() { + synchronizedItemStreamWriter.update(testExecutionContext); + verify(delegate).update(testExecutionContext); + } + + @Test + public void testDelegateCloseIsClosed() { + synchronizedItemStreamWriter.close(); + verify(delegate).close(); + } + +} diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/SynchronizedItemStreamWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/SynchronizedItemStreamWriterTests.java new file mode 100644 index 0000000000..cdece0c6a7 --- /dev/null +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/SynchronizedItemStreamWriterTests.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.support; + +import org.junit.Test; +import org.springframework.beans.factory.InitializingBean; + +/** + * + * @author Dimitrios Liapis + * + */ +public class SynchronizedItemStreamWriterTests extends AbstractSynchronizedItemStreamWriterTests { + + + @Override + protected SynchronizedItemStreamWriter createNewSynchronizedItemStreamWriter() { + SynchronizedItemStreamWriter synchronizedItemStreamWriter = new SynchronizedItemStreamWriter<>(); + synchronizedItemStreamWriter.setDelegate(delegate); + return synchronizedItemStreamWriter; + } + + @Test + public void testDelegateIsNotNullWhenPropertiesSet() throws Exception { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("A delegate item writer is required"); + ((InitializingBean) new SynchronizedItemStreamWriter<>()).afterPropertiesSet(); + } +} diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamWriterBuilderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamWriterBuilderTests.java new file mode 100644 index 0000000000..b3d98eae12 --- /dev/null +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamWriterBuilderTests.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.support.builder; + +import org.junit.Test; +import org.springframework.batch.item.support.AbstractSynchronizedItemStreamWriterTests; +import org.springframework.batch.item.support.SynchronizedItemStreamWriter; + +/** + * + * @author Dimitrios Liapis + * + */ +public class SynchronizedItemStreamWriterBuilderTests extends AbstractSynchronizedItemStreamWriterTests { + + + @Override + protected SynchronizedItemStreamWriter createNewSynchronizedItemStreamWriter() { + return new SynchronizedItemStreamWriterBuilder<>() + .delegate(delegate) + .build(); + } + + @Test + public void testBuilderDelegateIsNotNull() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("A delegate is required"); + new SynchronizedItemStreamWriterBuilder<>().build(); + } +}