Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,25 +387,107 @@ interface MessageConsumer extends AutoCloseable {

MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);

/**
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}.
*
* @param subscription the subscription whose messages must be acknowledged

This comment was marked as spam.

This comment was marked as spam.

* @param ackId the ack id of the first message to acknowledge
* @param ackIds other ack ids of messages to acknowledge
* @throws PubSubException upon failure, or if the subscription was not found
*/
void ack(String subscription, String ackId, String... ackIds);

/**
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackId the ack id of the first message to acknowledge
* @param ackIds other ack ids of messages to acknowledge
*/
Future<Void> ackAsync(String subscription, String ackId, String... ackIds);

/**
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackIds the ack ids of messages to acknowledge
* @throws PubSubException upon failure, or if the subscription was not found
*/
void ack(String subscription, Iterable<String> ackIds);

/**
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackIds the ack ids of messages to acknowledge
*/
Future<Void> ackAsync(String subscription, Iterable<String> ackIds);

/**
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}. This method corresponds to calling
* {@link #modifyAckDeadline(String, int, TimeUnit, String, String...)} with a deadline of 0.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackId the ack id of the first message to "nack"
* @param ackIds other ack ids of messages to "nack"
* @throws PubSubException upon failure, or if the subscription was not found
*/
void nack(String subscription, String ackId, String... ackIds);

/**
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a
* deadline of 0. The method returns a {@code Future} object that can be used to wait for the
* "nack" operation to be completed.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackId the ack id of the first message to "nack"
* @param ackIds other ack ids of messages to "nack"
*/
Future<Void> nackAsync(String subscription, String ackId, String... ackIds);

/**
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}. This method corresponds to calling
* {@link #modifyAckDeadline(String, int, TimeUnit, Iterable)} with a deadline of 0.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackIds the ack ids of messages to "nack"
* @throws PubSubException upon failure, or if the subscription was not found
*/
void nack(String subscription, Iterable<String> ackIds);

/**
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a deadline of 0.
* The method returns a {@code Future} object that can be used to wait for the "nack" operation to
* be completed.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackIds the ack ids of messages to "nack"
*/
Future<Void> nackAsync(String subscription, Iterable<String> ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be &gt;= 0 and
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
Expand All @@ -425,8 +507,8 @@ void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
Expand All @@ -444,8 +526,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit
String ackId, String... ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be &gt;= 0 and
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
Expand All @@ -462,8 +544,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
Expand Down Expand Up @@ -466,40 +467,46 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback,

@Override
public void ack(String subscription, String ackId, String... ackIds) {
ack(subscription, Lists.asList(ackId, ackIds));
}

@Override
public Future<Void> ackAsync(String subscription, String ackId, String... ackIds) {
return null;
return ackAsync(subscription, Lists.asList(ackId, ackIds));
}

@Override
public void ack(String subscription, Iterable<String> ackIds) {

get(ackAsync(subscription, ackIds));
}

@Override
public Future<Void> ackAsync(String subscription, Iterable<String> ackIds) {
return null;
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.addAllAckIds(ackIds)
.build();
return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
}

@Override
public void nack(String subscription, String ackId, String... ackIds) {
nack(subscription, Lists.asList(ackId, ackIds));
}

@Override
public Future<Void> nackAsync(String subscription, String ackId, String... ackIds) {
return null;
return nackAsync(subscription, Lists.asList(ackId, ackIds));
}

@Override
public void nack(String subscription, Iterable<String> ackIds) {

get(nackAsync(subscription, ackIds));
}

@Override
public Future<Void> nackAsync(String subscription, Iterable<String> ackIds) {
return null;
return modifyAckDeadlineAsync(subscription, 0, TimeUnit.SECONDS, ackIds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
Expand Down Expand Up @@ -1189,6 +1190,178 @@ public void testListTopicSubscriptionsAsyncWithOptions()
Iterables.toArray(page.values(), SubscriptionId.class));
}

@Test
public void testAckOneMessage() {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.ack(SUBSCRIPTION, "ackId");
}

@Test
public void testAckOneMessageAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, "ackId");
assertNull(future.get());
}

@Test
public void testAckMoreMessages() {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.ack(SUBSCRIPTION, "ackId1", "ackId2");
}

@Test
public void testAckMoreMessagesAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, "ackId1", "ackId2");
assertNull(future.get());
}

@Test
public void testAckMessageList() {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.ack(SUBSCRIPTION, ackIds);
}

@Test
public void testAckMessageListAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, ackIds);
assertNull(future.get());
}

@Test
public void testNackOneMessage() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.nack(SUBSCRIPTION, "ackId");
}

@Test
public void testNackOneMessageAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, "ackId");
assertNull(future.get());
}

@Test
public void testNackMoreMessages() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.nack(SUBSCRIPTION, "ackId1", "ackId2");
}

@Test
public void testNackMoreMessagesAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, "ackId1", "ackId2");
assertNull(future.get());
}

@Test
public void testNackMessageList() {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.nack(SUBSCRIPTION, ackIds);
}

@Test
public void testNackMessageListAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, ackIds);
assertNull(future.get());
}

@Test
public void testModifyAckDeadlineOneMessage() {
pubsub = options.service();
Expand Down