Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -43,7 +42,6 @@
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
Expand Down Expand Up @@ -151,7 +149,7 @@ protected final void updateNotPropagatedHeaders(String[] headers, boolean merge)

headerPatterns.addAll(Arrays.asList(headers));

this.notPropagatedHeaders = headerPatterns.toArray(new String[headerPatterns.size()]);
this.notPropagatedHeaders = headerPatterns.toArray(new String[0]);
}

boolean hasAsterisk = headerPatterns.contains("*");
Expand Down Expand Up @@ -388,17 +386,8 @@ else if (output instanceof AbstractIntegrationMessageBuilder) {
builder = this.getMessageBuilderFactory().withPayload(output);
}
if (!this.noHeadersPropagation && shouldCopyRequestHeaders()) {
if (this.selectiveHeaderPropagation) {
Map<String, Object> headersToCopy = new HashMap<>(requestHeaders);

headersToCopy.entrySet()
.removeIf(entry -> PatternMatchUtils.simpleMatch(this.notPropagatedHeaders, entry.getKey()));

builder.copyHeadersIfAbsent(headersToCopy);
}
else {
builder.copyHeadersIfAbsent(requestHeaders);
}
builder.filterAndCopyHeadersIfAbsent(requestHeaders,
this.selectiveHeaderPropagation ? this.notPropagatedHeaders : null);
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -29,6 +30,8 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;

/**
* @author Gary Russell
Expand All @@ -39,111 +42,54 @@
*/
public abstract class AbstractIntegrationMessageBuilder<T> {

public abstract T getPayload();

public abstract Map<String, Object> getHeaders();

/**
* Set the value for the given header name. If the provided value is <code>null</code>, the header will be removed.
*
* @param headerName The header name.
* @param headerValue The header value.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> setHeader(String headerName, @Nullable Object headerValue);

/**
* Set the value for the given header name only if the header name is not already associated with a value.
*
* @param headerName The header name.
* @param headerValue The header value.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);

/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests the array
* may contain simple matching patterns for header names. Supported pattern styles are:
* "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*
* @param headerPatterns The header patterns.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> removeHeaders(String... headerPatterns);

/**
* Remove the value for the given header name.
* @param headerName The header name.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> removeHeader(String headerName);

/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any existing values. Use {
* {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values. Note that the 'id' and 'timestamp' header values
* will never be overwritten.
*
* @param headersToCopy The headers to copy.
* @return this.
*
* @see MessageHeaders#ID
* @see MessageHeaders#TIMESTAMP
*/
public abstract AbstractIntegrationMessageBuilder<T> copyHeaders(@Nullable Map<String, ?> headersToCopy);

/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em> overwrite any existing values.
*
* @param headersToCopy The headers to copy.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy);

public AbstractIntegrationMessageBuilder<T> setExpirationDate(Long expirationDate) {
return this.setHeader(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, expirationDate);
return setHeader(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, expirationDate);
}

public AbstractIntegrationMessageBuilder<T> setExpirationDate(Date expirationDate) {
if (expirationDate != null) {
return this.setHeader(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, expirationDate.getTime());
return setHeader(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, expirationDate.getTime());
}
else {
return this.setHeader(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, null);
return setHeader(IntegrationMessageHeaderAccessor.EXPIRATION_DATE, null);
}
}

public AbstractIntegrationMessageBuilder<T> setCorrelationId(Object correlationId) {
return this.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, correlationId);
return setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, correlationId);
}

public AbstractIntegrationMessageBuilder<T> pushSequenceDetails(Object correlationId, int sequenceNumber,
int sequenceSize) {

Object incomingCorrelationId = this.getCorrelationId();
List<List<Object>> incomingSequenceDetails = this.getSequenceDetails();
List<List<Object>> incomingSequenceDetails = getSequenceDetails();
if (incomingCorrelationId != null) {
if (incomingSequenceDetails == null) {
incomingSequenceDetails = new ArrayList<List<Object>>();
incomingSequenceDetails = new ArrayList<>();
}
else {
incomingSequenceDetails = new ArrayList<List<Object>>(incomingSequenceDetails);
incomingSequenceDetails = new ArrayList<>(incomingSequenceDetails);
}
incomingSequenceDetails.add(Arrays.asList(incomingCorrelationId,
this.getSequenceNumber(), this.getSequenceSize()));
getSequenceNumber(), getSequenceSize()));
incomingSequenceDetails = Collections.unmodifiableList(incomingSequenceDetails);
}
if (incomingSequenceDetails != null) {
this.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS, incomingSequenceDetails);
setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS, incomingSequenceDetails);
}
return setCorrelationId(correlationId).setSequenceNumber(sequenceNumber).setSequenceSize(sequenceSize);
return setCorrelationId(correlationId)
.setSequenceNumber(sequenceNumber)
.setSequenceSize(sequenceSize);
}

public AbstractIntegrationMessageBuilder<T> popSequenceDetails() {
List<List<Object>> incomingSequenceDetails = this.getSequenceDetails();
List<List<Object>> incomingSequenceDetails = getSequenceDetails();
if (incomingSequenceDetails == null) {
return this;
}
else {
incomingSequenceDetails = new ArrayList<List<Object>>(incomingSequenceDetails);
incomingSequenceDetails = new ArrayList<>(incomingSequenceDetails);
}
List<Object> sequenceDetails = incomingSequenceDetails.remove(incomingSequenceDetails.size() - 1);
Assert.state(sequenceDetails.size() == 3, "Wrong sequence details (not created by MessageBuilder?): "
Expand All @@ -158,50 +104,127 @@ public AbstractIntegrationMessageBuilder<T> popSequenceDetails() {
setSequenceSize(sequenceSize);
}
if (!incomingSequenceDetails.isEmpty()) {
this.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS, incomingSequenceDetails);
setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS, incomingSequenceDetails);
}
else {
this.removeHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS);
removeHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS);
}
return this;
}

protected abstract List<List<Object>> getSequenceDetails();

protected abstract Object getCorrelationId();

protected abstract Object getSequenceNumber();

protected abstract Object getSequenceSize();

public AbstractIntegrationMessageBuilder<T> setReplyChannel(MessageChannel replyChannel) {
return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
return setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
}

public AbstractIntegrationMessageBuilder<T> setReplyChannelName(String replyChannelName) {
return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
return setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
}

public AbstractIntegrationMessageBuilder<T> setErrorChannel(MessageChannel errorChannel) {
return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
return setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
}

public AbstractIntegrationMessageBuilder<T> setErrorChannelName(String errorChannelName) {
return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
return setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
}

public AbstractIntegrationMessageBuilder<T> setSequenceNumber(Integer sequenceNumber) {
return this.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, sequenceNumber);
return setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, sequenceNumber);
}

public AbstractIntegrationMessageBuilder<T> setSequenceSize(Integer sequenceSize) {
return this.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, sequenceSize);
return setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, sequenceSize);
}

public AbstractIntegrationMessageBuilder<T> setPriority(Integer priority) {
return this.setHeader(IntegrationMessageHeaderAccessor.PRIORITY, priority);
return setHeader(IntegrationMessageHeaderAccessor.PRIORITY, priority);
}

/**
* Remove headers from the provided map matching to the provided pattens
* and only after that copy the result into the target message headers.
* @param headersToCopy a map of headers to copy.
* @param headerPatternsToFilter an arrays of header patterns to filter before copying.
* @return the current {@link AbstractIntegrationMessageBuilder}.
* @since 5.1
* @see #copyHeadersIfAbsent(Map)
*/
public AbstractIntegrationMessageBuilder<T> filterAndCopyHeadersIfAbsent(Map<String, ?> headersToCopy,
String... headerPatternsToFilter) {

Map<String, ?> headers = headersToCopy;

if (!ObjectUtils.isEmpty(headerPatternsToFilter)) {
headers = new HashMap<>(headersToCopy);
headers.entrySet()
.removeIf(entry -> PatternMatchUtils.simpleMatch(headerPatternsToFilter, entry.getKey()));
}

return copyHeadersIfAbsent(headers);
}

protected abstract List<List<Object>> getSequenceDetails();

protected abstract Object getCorrelationId();

protected abstract Object getSequenceNumber();

protected abstract Object getSequenceSize();

public abstract T getPayload();

public abstract Map<String, Object> getHeaders();

/**
* Set the value for the given header name. If the provided value is <code>null</code>, the header will be removed.
* @param headerName The header name.
* @param headerValue The header value.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> setHeader(String headerName, @Nullable Object headerValue);

/**
* Set the value for the given header name only if the header name is not already associated with a value.
* @param headerName The header name.
* @param headerValue The header value.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);

/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests the array
* may contain simple matching patterns for header names. Supported pattern styles are:
* "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
* @param headerPatterns The header patterns.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> removeHeaders(String... headerPatterns);

/**
* Remove the value for the given header name.
* @param headerName The header name.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> removeHeader(String headerName);

/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any existing values. Use {
* {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values. Note that the 'id' and 'timestamp' header values
* will never be overwritten.
* @param headersToCopy The headers to copy.
* @return this.
* @see MessageHeaders#ID
* @see MessageHeaders#TIMESTAMP
*/
public abstract AbstractIntegrationMessageBuilder<T> copyHeaders(@Nullable Map<String, ?> headersToCopy);

/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em> overwrite any existing values.
* @param headersToCopy The headers to copy.
* @return this.
*/
public abstract AbstractIntegrationMessageBuilder<T> copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy);

public abstract Message<T> build();

}
Loading