Skip to content

Commit f40ab20

Browse files
elimelecfmbenhassine
authored andcommitted
Attempt to close all delegate stream readers even when some fail
Signed-off-by: Elimelec Burghelea <[email protected]>
1 parent 1eac9e9 commit f40ab20

File tree

4 files changed

+69
-15
lines changed

4 files changed

+69
-15
lines changed

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/TaskletStepExceptionTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2008-2022 the original author or authors.
2+
* Copyright 2008-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@
6363
* @author David Turanski
6464
* @author Mahmoud Ben Hassine
6565
* @author Parikshit Dutta
66+
* @author Elimelec Burghelea
6667
*/
6768
class TaskletStepExceptionTests {
6869

@@ -212,8 +213,8 @@ public void close() throws ItemStreamException {
212213

213214
taskletStep.execute(stepExecution);
214215
assertEquals(FAILED, stepExecution.getStatus());
215-
assertTrue(stepExecution.getFailureExceptions().contains(taskletException));
216-
assertTrue(stepExecution.getFailureExceptions().contains(exception));
216+
assertEquals(stepExecution.getFailureExceptions().get(0), taskletException);
217+
assertEquals(stepExecution.getFailureExceptions().get(1).getSuppressed()[0], exception);
217218
assertEquals(2, jobRepository.getUpdateCount());
218219
}
219220

spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2023 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -761,7 +761,7 @@ public void close() throws ItemStreamException {
761761
Throwable ex = stepExecution.getFailureExceptions().get(0);
762762

763763
// The original rollback was caused by this one:
764-
assertEquals("Bar", ex.getMessage());
764+
assertEquals("Bar", ex.getSuppressed()[0].getMessage());
765765
}
766766

767767
@Test
@@ -791,7 +791,7 @@ public void close() throws ItemStreamException {
791791
assertEquals("", msg);
792792
Throwable ex = stepExecution.getFailureExceptions().get(0);
793793
// The original rollback was caused by this one:
794-
assertEquals("Bar", ex.getMessage());
794+
assertEquals("Bar", ex.getSuppressed()[0].getMessage());
795795
}
796796

797797
/**

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemStream.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2022 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,7 +28,7 @@
2828
*
2929
* @author Dave Syer
3030
* @author Mahmoud Ben Hassine
31-
*
31+
* @author Elimelec Burghelea
3232
*/
3333
public class CompositeItemStream implements ItemStream {
3434

@@ -102,13 +102,26 @@ public void update(ExecutionContext executionContext) {
102102
/**
103103
* Broadcast the call to close.
104104
* @throws ItemStreamException thrown if one of the {@link ItemStream}s in the list
105-
* fails to close. This is a sequential operation so all itemStreams in the list after
106-
* the one that failed to close will remain open.
105+
* fails to close.
107106
*/
108107
@Override
109108
public void close() throws ItemStreamException {
109+
List<Exception> exceptions = new ArrayList<>();
110+
110111
for (ItemStream itemStream : streams) {
111-
itemStream.close();
112+
try {
113+
itemStream.close();
114+
}
115+
catch (Exception e) {
116+
exceptions.add(e);
117+
}
118+
}
119+
120+
if (!exceptions.isEmpty()) {
121+
String message = String.format("Failed to close %d delegate(s) due to exceptions", exceptions.size());
122+
ItemStreamException holder = new ItemStreamException(message);
123+
exceptions.forEach(holder::addSuppressed);
124+
throw holder;
112125
}
113126
}
114127

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemStreamTests.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2022 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,19 +15,25 @@
1515
*/
1616
package org.springframework.batch.item.support;
1717

18+
import static org.junit.jupiter.api.Assertions.assertEquals;
19+
import static org.mockito.Mockito.doThrow;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
22+
1823
import java.util.ArrayList;
1924
import java.util.List;
2025

26+
import org.junit.jupiter.api.Assertions;
2127
import org.junit.jupiter.api.Test;
28+
import org.mockito.Mockito;
2229
import org.springframework.batch.item.ExecutionContext;
2330
import org.springframework.batch.item.ItemStream;
31+
import org.springframework.batch.item.ItemStreamException;
2432
import org.springframework.batch.item.ItemStreamSupport;
2533

26-
import static org.junit.jupiter.api.Assertions.assertEquals;
27-
2834
/**
2935
* @author Dave Syer
30-
*
36+
* @author Elimelec Burghelea
3137
*/
3238
class CompositeItemStreamTests {
3339

@@ -90,6 +96,40 @@ public void close() {
9096
assertEquals(1, list.size());
9197
}
9298

99+
@Test
100+
void testClose2Delegates() {
101+
ItemStream reader1 = Mockito.mock(ItemStream.class);
102+
ItemStream reader2 = Mockito.mock(ItemStream.class);
103+
manager.register(reader1);
104+
manager.register(reader2);
105+
106+
manager.close();
107+
108+
verify(reader1, times(1)).close();
109+
verify(reader2, times(1)).close();
110+
}
111+
112+
@Test
113+
void testClose2DelegatesThatThrowsException() {
114+
ItemStream reader1 = Mockito.mock(ItemStream.class);
115+
ItemStream reader2 = Mockito.mock(ItemStream.class);
116+
manager.register(reader1);
117+
manager.register(reader2);
118+
119+
doThrow(new ItemStreamException("A failure")).when(reader1).close();
120+
121+
try {
122+
manager.close();
123+
Assertions.fail("Expected an ItemStreamException");
124+
}
125+
catch (ItemStreamException ignored) {
126+
127+
}
128+
129+
verify(reader1, times(1)).close();
130+
verify(reader2, times(1)).close();
131+
}
132+
93133
@Test
94134
void testCloseDoesNotUnregister() {
95135
manager.setStreams(new ItemStream[] { new ItemStreamSupport() {

0 commit comments

Comments
 (0)