diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/ItemProcessListenerSupport.java b/spring-batch-core/src/main/java/org/springframework/batch/core/ItemProcessListenerSupport.java new file mode 100644 index 0000000000..d847a9feca --- /dev/null +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/ItemProcessListenerSupport.java @@ -0,0 +1,30 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core; + +import org.springframework.batch.item.ItemProcessor; + +/** + * Interface to mark {@link ItemProcessor} which support injection of a {@link ItemProcessListener}. + * + * @author Dominik Bartholdi + * + */ +public interface ItemProcessListenerSupport { + + public void setItemProcessListener(ItemProcessListener itemProcessListener); + +} diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java index 34214d35c3..8063f3594c 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java @@ -18,6 +18,8 @@ import java.util.List; +import org.springframework.batch.core.ItemProcessListenerSupport; +import org.springframework.batch.core.ItemProcessListener; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepListener; import org.springframework.batch.core.listener.MulticasterBatchListener; @@ -99,6 +101,9 @@ public void setListeners(List listeners) { */ public void registerListener(StepListener listener) { this.listener.register(listener); + if((itemProcessor instanceof ItemProcessListenerSupport) && (listener instanceof ItemProcessListener)){ + ((ItemProcessListenerSupport)itemProcessor).setItemProcessListener((ItemProcessListener)listener); + } } /** diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/async/AsyncItemProcessor.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/async/AsyncItemProcessor.java index a47dbcb4e4..35fa1516e1 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/async/AsyncItemProcessor.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/async/AsyncItemProcessor.java @@ -19,6 +19,8 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import org.springframework.batch.core.ItemProcessListenerSupport; +import org.springframework.batch.core.ItemProcessListener; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.StepContext; import org.springframework.batch.core.scope.context.StepSynchronizationManager; @@ -48,11 +50,13 @@ * @param the output object type (will be wrapped in a Future) * @see AsyncItemWriter */ -public class AsyncItemProcessor implements ItemProcessor>, InitializingBean { +public class AsyncItemProcessor implements ItemProcessor>, InitializingBean, ItemProcessListenerSupport { private ItemProcessor delegate; private TaskExecutor taskExecutor = new SyncTaskExecutor(); + + private ItemProcessListener itemProcessListener; /** * Check mandatory properties (the {@link #setDelegate(ItemProcessor)}). @@ -72,7 +76,7 @@ public void afterPropertiesSet() throws Exception { public void setDelegate(ItemProcessor delegate) { this.delegate = delegate; } - + /** * The {@link TaskExecutor} to use to allow the item processing to proceed * in the background. Defaults to a {@link SyncTaskExecutor} so no threads @@ -83,7 +87,16 @@ public void setDelegate(ItemProcessor delegate) { public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } - + + /** + * The {@link ItemProcessListener} to be called in case of an exception. + * + * @param itemProcessListener a {@link ItemProcessListener} to be notified about exceptions. + */ + public void setItemProcessListener(ItemProcessListener itemProcessListener) { + this.itemProcessListener = itemProcessListener; + } + /** * Transform the input by delegating to the provided item processor. The * return value is wrapped in a {@link Future} so that clients can unpack it @@ -101,6 +114,12 @@ public O call() throws Exception { try { return delegate.process(item); } + catch (Exception e) { + if (itemProcessListener != null) { + itemProcessListener.onProcessError(item, e); + } + throw e; + } finally { if (stepExecution != null) { StepSynchronizationManager.close();