FaultTolerantChunkProcessor does not collect metrics like SimpleChunkProcessor #3664

Closed
@fredgcosta

Description

@benas @mminella
When we enabled faultTolerant() on the StepBuilder, we noticed that the spring_batch_chunk_write_seconds_count metric stopped showing up in Grafana.
Looking at the source code, we saw that FaultTolerantChunkProcessor indeed does not collect these metrics.

SimpleChunkProcessor.java

	protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
		Timer.Sample sample = BatchMetrics.createTimerSample();
		String status = BatchMetrics.STATUS_SUCCESS;
		try {
			doWrite(outputs.getItems());
		}
		catch (Exception e) {
			/*
			 * For a simple chunk processor (no fault tolerance) we are done
			 * here, so prevent any more processing of these inputs.
			 */
			inputs.clear();
			status = BatchMetrics.STATUS_FAILURE;
			throw e;
		}
		finally {
			stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
		}
		contribution.incrementWriteCount(outputs.size());
	}

FaultTolerantChunkProcessor.java

	protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
			throws Exception {
		@SuppressWarnings("unchecked")
		final UserData<O> data = (UserData<O>) inputs.getUserData();
		final AtomicReference<RetryContext> contextHolder = new AtomicReference<>();

		RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
			@Override
			public Object doWithRetry(RetryContext context) throws Exception {
				contextHolder.set(context);

				if (!data.scanning()) {
					chunkMonitor.setChunkSize(inputs.size());
					try {
						doWrite(outputs.getItems());
					}
					catch (Exception e) {
						if (rollbackClassifier.classify(e)) {
							throw e;
						}
						/*
						 * If the exception is marked as no-rollback, we need to
						 * override that, otherwise there's no way to write the
						 * rest of the chunk or to honour the skip listener
						 * contract.
						 */
						throw new ForceRollbackForWriteSkipException(
								"Force rollback on skippable exception so that skipped item can be located.", e);
					}
					contribution.incrementWriteCount(outputs.size());
				}
				else {
					scan(contribution, inputs, outputs, chunkMonitor, false);
				}
				return null;

			}
		};

		if (!buffering) {

			RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {

				@Override
				public Object recover(RetryContext context) throws Exception {

					Throwable e = context.getLastThrowable();
					if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
						throw new RetryException("Invalid retry state during write caused by "
								+ "exception that does not classify for rollback: ", e);
					}

					Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
					for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {

						inputIterator.next();
						outputIterator.next();

						checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
						if (!rollbackClassifier.classify(e)) {
							throw new RetryException(
									"Invalid retry state during recovery caused by exception that does not classify for rollback: ",
									e);
						}

					}

					return null;

				}

			};

			batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
					BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

		}
		else {

			RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

				@Override
				public Object recover(RetryContext context) throws Exception {
					/*
					 * If the last exception was not skippable we don't need to
					 * do any scanning. We can just bomb out with a retry
					 * exhausted.
					 */
					if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
						throw new ExhaustedRetryException(
								"Retry exhausted after last attempt in recovery path, but exception is not skippable.",
								context.getLastThrowable());
					}

					inputs.setBusy(true);
					data.scanning(true);
					scan(contribution, inputs, outputs, chunkMonitor, true);
					return null;
				}

			};

			if (logger.isDebugEnabled()) {
				logger.debug("Attempting to write: " + inputs);
			}
			try {
				batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
						rollbackClassifier));
			}
			catch (Exception e) {
				RetryContext context = contextHolder.get();
				if (!batchRetryTemplate.canRetry(context)) {
					/*
					 * BATCH-1761: we need advance warning of the scan about to
					 * start in the next transaction, so we can change the
					 * processing behaviour.
					 */
					data.scanning(true);
				}
				throw e;
			}

		}

		callSkipListeners(inputs, outputs);

	}
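For comparison, the fix would amount to wrapping the fault-tolerant write in the same timer pattern that SimpleChunkProcessor.write() already uses: create a sample, record SUCCESS or FAILURE, and always stop the timer in a finally block. Below is a hedged, dependency-free sketch of that pattern in plain Java; the class name TimedWriteSketch, the recordedStatuses list, and the doWrite signature are hypothetical stand-ins, and the real BatchMetrics/Micrometer calls are shown only as comments, not as the actual Spring Batch change.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the SimpleChunkProcessor timer pattern that
// FaultTolerantChunkProcessor.write() could adopt. Plain System.nanoTime()
// stands in for Micrometer's Timer.Sample.
public class TimedWriteSketch {

    static final String STATUS_SUCCESS = "SUCCESS";
    static final String STATUS_FAILURE = "FAILURE";

    // Stand-in for the meter registry: records one status per write attempt.
    static final List<String> recordedStatuses = new ArrayList<>();

    // Stand-in for doWrite(outputs.getItems()); 'fail' simulates a writer error.
    static void doWrite(List<String> items, boolean fail) {
        if (fail) {
            throw new IllegalStateException("write failed");
        }
    }

    static String timedWrite(List<String> items, boolean fail) {
        long start = System.nanoTime(); // Timer.Sample sample = BatchMetrics.createTimerSample();
        String status = STATUS_SUCCESS;
        try {
            doWrite(items, fail);
        }
        catch (Exception e) {
            status = STATUS_FAILURE;
            throw e;
        }
        finally {
            // stopTimer(sample, stepExecution, "chunk.write", status, "Chunk writing");
            long durationNanos = System.nanoTime() - start;
            recordedStatuses.add(status + ":" + (durationNanos >= 0));
        }
        return status;
    }

    public static void main(String[] args) {
        timedWrite(List.of("a", "b"), false); // success path records SUCCESS
        try {
            timedWrite(List.of("c"), true);   // failure path rethrows...
        }
        catch (IllegalStateException expected) {
            // ...but the finally block still records FAILURE with its duration
        }
        System.out.println(recordedStatuses);
    }
}
```

The key property, visible in SimpleChunkProcessor above, is that the finally block guarantees the timer is stopped on both the success and the failure path, so spring_batch_chunk_write_seconds is recorded even when the write throws.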

Metadata

Assignees

No one assigned

    Labels

    has: backports
    in: core
    type: bug

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests