Skip to content

Commit dc41757

Browse files
wklausfmbenhassine
authored andcommitted
BATCH-2480 Fix regression due to the fix of BATCH-2442
1 parent dabf699 commit dc41757

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

spring-batch-core-tests/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2010-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package org.springframework.batch.core.test.step;
217

318
import java.util.ArrayList;
@@ -17,6 +32,7 @@
1732
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
1833
import org.springframework.batch.core.repository.JobRepository;
1934
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
35+
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
2036
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
2137
import org.springframework.batch.core.step.skip.SkipPolicy;
2238
import org.springframework.batch.item.ItemProcessor;
@@ -195,6 +211,53 @@ public void write(List<? extends Integer> items) throws Exception {
195211
assertEquals(1, stepExecution.getProcessSkipCount());
196212
}
197213

214+
@Test(timeout = 3000)
215+
public void testExceptionInProcessAndWriteDuringChunkScan() throws Exception {
216+
// Given
217+
ListItemReader<Integer> itemReader = new ListItemReader<>(Arrays.asList(1, 2, 3));
218+
219+
ItemProcessor<Integer, Integer> itemProcessor = new ItemProcessor<Integer, Integer>() {
220+
@Override
221+
public Integer process(Integer item) throws Exception {
222+
if (item.equals(2)) {
223+
throw new Exception("Error during process item " + item);
224+
}
225+
return item;
226+
}
227+
};
228+
229+
ItemWriter<Integer> itemWriter = new ItemWriter<Integer>() {
230+
@Override
231+
public void write(List<? extends Integer> items) throws Exception {
232+
if (items.contains(3)) {
233+
throw new Exception("Error during write");
234+
}
235+
}
236+
};
237+
238+
Step step = new StepBuilderFactory(jobRepository, transactionManager).get("step")
239+
.<Integer, Integer>chunk(5)
240+
.reader(itemReader)
241+
.processor(itemProcessor)
242+
.writer(itemWriter)
243+
.faultTolerant()
244+
.skipPolicy(new AlwaysSkipItemSkipPolicy())
245+
.build();
246+
247+
// When
248+
StepExecution stepExecution = execute(step);
249+
250+
// Then
251+
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
252+
assertEquals(ExitStatus.COMPLETED, stepExecution.getExitStatus());
253+
assertEquals(3, stepExecution.getReadCount());
254+
assertEquals(1, stepExecution.getWriteCount());
255+
assertEquals(1, stepExecution.getWriteSkipCount());
256+
assertEquals(1, stepExecution.getProcessSkipCount());
257+
assertEquals(3, stepExecution.getRollbackCount());
258+
assertEquals(2, stepExecution.getCommitCount());
259+
}
260+
198261
private List<Integer> createItems() {
199262
List<Integer> items = new ArrayList<>(TOTAL_ITEMS);
200263
for (int i = 1; i <= TOTAL_ITEMS; i++) {

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2018 the original author or authors.
2+
* Copyright 2006-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -580,8 +580,7 @@ private void scan(final StepContribution contribution, final Chunk<I> inputs, fi
580580
Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
581581
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
582582

583-
//BATCH-2442 : do not scan skipped items
584-
if (!inputs.getSkips().isEmpty()) {
583+
if (!inputs.getSkips().isEmpty() && inputs.getItems().size() != outputs.getItems().size()) {
585584
if (outputIterator.hasNext()) {
586585
outputIterator.remove();
587586
return;

0 commit comments

Comments
 (0)