Skip to content

Commit 117b7d5

Browse files
dimitrislifmbenhassine
authored andcommitted
Introduce SynchronizedItemStreamWriter
Issue #842
1 parent 2f8aff2 commit 117b7d5

File tree

6 files changed

+290
-0
lines changed

6 files changed

+290
-0
lines changed

spring-batch-docs/asciidoc/readersAndWriters.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3000,6 +3000,7 @@ Spring Batch includes the following decorators:
30003000

30013001
* <<synchronizedItemStreamReader>>
30023002
* <<singleItemPeekableItemReader>>
3003+
* <<synchronizedItemStreamWriter>>
30033004
* <<multiResourceItemWriter>>
30043005
* <<classifierCompositeItemWriter>>
30053006
* <<classifierCompositeItemProcessor>>
@@ -3023,6 +3024,13 @@ NOTE: SingleItemPeekableItemReader's peek method is not thread-safe, because it
30233024
be possible to honor the peek in multiple threads. Only one of the threads that peeked
30243025
would get that item in the next call to read.
30253026

3027+
[[synchronizedItemStreamWriter]]
3028+
===== `SynchronizedItemStreamWriter`
3029+
When using an `ItemWriter` that is not thread safe, Spring Batch offers the
3030+
`SynchronizedItemStreamWriter` decorator, which can be used to make the `ItemWriter`
3031+
thread safe. Spring Batch provides a `SynchronizedItemStreamWriterBuilder` to construct
3032+
an instance of the `SynchronizedItemStreamWriter`.
3033+
30263034
[[multiResourceItemWriter]]
30273035
===== `MultiResourceItemWriter`
30283036
The `MultiResourceItemWriter` wraps a `ResourceAwareItemWriterItemStream` and creates a new
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import org.springframework.batch.item.ExecutionContext;
19+
import org.springframework.batch.item.ItemStreamException;
20+
import org.springframework.batch.item.ItemStreamWriter;
21+
import org.springframework.beans.factory.InitializingBean;
22+
import org.springframework.util.Assert;
23+
24+
import java.util.List;
25+
26+
/**
27+
* An {@link ItemStreamWriter} decorator with a synchronized
28+
* {@link SynchronizedItemStreamWriter#write write()} method
29+
*
30+
* @author Dimitrios Liapis
31+
*
32+
* @param <T> type of object being written
33+
*/
34+
public class SynchronizedItemStreamWriter<T> implements ItemStreamWriter<T>, InitializingBean {
35+
36+
private ItemStreamWriter<T> delegate;
37+
38+
public void setDelegate(ItemStreamWriter<T> delegate) {
39+
this.delegate = delegate;
40+
}
41+
42+
/**
43+
* This delegates to the write method of the <code>delegate</code>
44+
*/
45+
@Override
46+
public synchronized void write(List<? extends T> items) throws Exception {
47+
this.delegate.write(items);
48+
}
49+
50+
@Override
51+
public void open(ExecutionContext executionContext) throws ItemStreamException {
52+
this.delegate.open(executionContext);
53+
}
54+
55+
@Override
56+
public void update(ExecutionContext executionContext) throws ItemStreamException {
57+
this.delegate.update(executionContext);
58+
}
59+
60+
@Override
61+
public void close() throws ItemStreamException {
62+
this.delegate.close();
63+
}
64+
65+
@Override
66+
public void afterPropertiesSet() throws Exception {
67+
Assert.notNull(this.delegate, "A delegate item writer is required");
68+
}
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support.builder;
17+
18+
import org.springframework.batch.item.ItemStreamWriter;
19+
import org.springframework.batch.item.support.SynchronizedItemStreamWriter;
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
* Creates a fully qualified {@link SynchronizedItemStreamWriter}.
24+
*
25+
* @author Dimitrios Liapis
26+
*/
27+
public class SynchronizedItemStreamWriterBuilder<T> {
28+
29+
private ItemStreamWriter<T> delegate;
30+
31+
public SynchronizedItemStreamWriterBuilder<T> delegate(ItemStreamWriter<T> delegate) {
32+
this.delegate = delegate;
33+
34+
return this;
35+
}
36+
37+
public SynchronizedItemStreamWriter<T> build() {
38+
Assert.notNull(this.delegate, "A delegate is required");
39+
40+
SynchronizedItemStreamWriter<T> writer = new SynchronizedItemStreamWriter<>();
41+
writer.setDelegate(this.delegate);
42+
return writer;
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import org.junit.Before;
19+
import org.junit.Rule;
20+
import org.junit.Test;
21+
import org.junit.rules.ExpectedException;
22+
import org.mockito.Mock;
23+
import org.springframework.batch.item.ExecutionContext;
24+
import org.springframework.batch.item.ItemStreamWriter;
25+
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
30+
import static org.mockito.Mockito.verify;
31+
import static org.mockito.MockitoAnnotations.initMocks;
32+
33+
/**
34+
* Common parent class for {@link SynchronizedItemStreamWriterTests} and
35+
* {@link org.springframework.batch.item.support.builder.SynchronizedItemStreamWriterBuilderTests}
36+
*
37+
* @author Dimitrios Liapis
38+
*
39+
*/
40+
public abstract class AbstractSynchronizedItemStreamWriterTests {
41+
42+
@Rule
43+
public ExpectedException expectedException = ExpectedException.none();
44+
45+
@Mock
46+
protected ItemStreamWriter<Object> delegate;
47+
48+
private SynchronizedItemStreamWriter<Object> synchronizedItemStreamWriter;
49+
private final List<Object> testList = Collections.unmodifiableList(new ArrayList<>());
50+
private final ExecutionContext testExecutionContext = new ExecutionContext();
51+
52+
abstract protected SynchronizedItemStreamWriter<Object> createNewSynchronizedItemStreamWriter();
53+
54+
@Before
55+
public void init() {
56+
initMocks(this);
57+
synchronizedItemStreamWriter = createNewSynchronizedItemStreamWriter();
58+
}
59+
60+
@Test
61+
public void testDelegateWriteIsCalled() throws Exception {
62+
synchronizedItemStreamWriter.write(testList);
63+
verify(delegate).write(testList);
64+
}
65+
66+
@Test
67+
public void testDelegateOpenIsCalled() {
68+
synchronizedItemStreamWriter.open(testExecutionContext);
69+
verify(delegate).open(testExecutionContext);
70+
}
71+
72+
@Test
73+
public void testDelegateUpdateIsCalled() {
74+
synchronizedItemStreamWriter.update(testExecutionContext);
75+
verify(delegate).update(testExecutionContext);
76+
}
77+
78+
@Test
79+
public void testDelegateCloseIsClosed() {
80+
synchronizedItemStreamWriter.close();
81+
verify(delegate).close();
82+
}
83+
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import org.junit.Test;
19+
import org.springframework.beans.factory.InitializingBean;
20+
21+
/**
22+
*
23+
* @author Dimitrios Liapis
24+
*
25+
*/
26+
public class SynchronizedItemStreamWriterTests extends AbstractSynchronizedItemStreamWriterTests {
27+
28+
29+
@Override
30+
protected SynchronizedItemStreamWriter<Object> createNewSynchronizedItemStreamWriter() {
31+
SynchronizedItemStreamWriter<Object> synchronizedItemStreamWriter = new SynchronizedItemStreamWriter<>();
32+
synchronizedItemStreamWriter.setDelegate(delegate);
33+
return synchronizedItemStreamWriter;
34+
}
35+
36+
@Test
37+
public void testDelegateIsNotNullWhenPropertiesSet() throws Exception {
38+
expectedException.expect(IllegalArgumentException.class);
39+
expectedException.expectMessage("A delegate item writer is required");
40+
((InitializingBean) new SynchronizedItemStreamWriter<>()).afterPropertiesSet();
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support.builder;
17+
18+
import org.junit.Test;
19+
import org.springframework.batch.item.support.AbstractSynchronizedItemStreamWriterTests;
20+
import org.springframework.batch.item.support.SynchronizedItemStreamWriter;
21+
22+
/**
23+
*
24+
* @author Dimitrios Liapis
25+
*
26+
*/
27+
public class SynchronizedItemStreamWriterBuilderTests extends AbstractSynchronizedItemStreamWriterTests {
28+
29+
30+
@Override
31+
protected SynchronizedItemStreamWriter<Object> createNewSynchronizedItemStreamWriter() {
32+
return new SynchronizedItemStreamWriterBuilder<>()
33+
.delegate(delegate)
34+
.build();
35+
}
36+
37+
@Test
38+
public void testBuilderDelegateIsNotNull() {
39+
expectedException.expect(IllegalArgumentException.class);
40+
expectedException.expectMessage("A delegate is required");
41+
new SynchronizedItemStreamWriterBuilder<>().build();
42+
}
43+
}

0 commit comments

Comments
 (0)