Skip to content

Commit 47046e6

Browse files
committed
Implement ack and nack methods, add javadoc and tests (googleapis#1027)
1 parent 07a45e4 commit 47046e6

File tree

3 files changed

+276
-14
lines changed

3 files changed

+276
-14
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -387,25 +387,107 @@ interface MessageConsumer extends AutoCloseable {
387387

388388
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
389389

390+
/**
391+
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
392+
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
393+
* {@link #pullAsync(String, int)}.
394+
*
395+
* @param subscription the subscription whose messages must be acknowledged
396+
* @param ackId the ack id of the first message to acknowledge
397+
* @param ackIds other ack ids of messages to acknowledge
398+
* @throws PubSubException upon failure, or if the subscription was not found
399+
*/
390400
void ack(String subscription, String ackId, String... ackIds);
391401

402+
/**
403+
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
404+
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
405+
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
406+
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
407+
*
408+
* @param subscription the subscription whose messages must be acknowledged
409+
* @param ackId the ack id of the first message to acknowledge
410+
* @param ackIds other ack ids of messages to acknowledge
411+
*/
392412
Future<Void> ackAsync(String subscription, String ackId, String... ackIds);
393413

414+
/**
415+
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
416+
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
417+
* {@link #pullAsync(String, int)}.
418+
*
419+
* @param subscription the subscription whose messages must be acknowledged
420+
* @param ackIds the ack ids of messages to acknowledge
421+
* @throws PubSubException upon failure, or if the subscription was not found
422+
*/
394423
void ack(String subscription, Iterable<String> ackIds);
395424

425+
/**
426+
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
427+
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
428+
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
429+
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
430+
*
431+
* @param subscription the subscription whose messages must be acknowledged
432+
* @param ackIds the ack ids of messages to acknowledge
433+
*/
396434
Future<Void> ackAsync(String subscription, Iterable<String> ackIds);
397435

436+
/**
437+
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
438+
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
439+
* {@link #pullAsync(String, int)}. This method corresponds to calling
440+
* {@link #modifyAckDeadline(String, int, TimeUnit, String, String...)} with a deadline of 0.
441+
*
442+
* @param subscription the subscription whose messages must be "nacked"
443+
* @param ackId the ack id of the first message to "nack"
444+
* @param ackIds other ack ids of messages to "nack"
445+
* @throws PubSubException upon failure, or if the subscription was not found
446+
*/
398447
void nack(String subscription, String ackId, String... ackIds);
399448

449+
/**
450+
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
451+
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
452+
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
453+
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a
454+
* deadline of 0. The method returns a {@code Future} object that can be used to wait for the
455+
* "nack" operation to be completed.
456+
*
457+
* @param subscription the subscription whose messages must be "nacked"
458+
* @param ackId the ack id of the first message to "nack"
459+
* @param ackIds other ack ids of messages to "nack"
460+
*/
400461
Future<Void> nackAsync(String subscription, String ackId, String... ackIds);
401462

463+
/**
464+
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
465+
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
466+
* {@link #pullAsync(String, int)}. This method corresponds to calling
467+
* {@link #modifyAckDeadline(String, int, TimeUnit, Iterable)} with a deadline of 0.
468+
*
469+
* @param subscription the subscription whose messages must be "nacked"
470+
* @param ackIds the ack ids of messages to "nack"
471+
* @throws PubSubException upon failure, or if the subscription was not found
472+
*/
402473
void nack(String subscription, Iterable<String> ackIds);
403474

475+
/**
476+
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
477+
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
478+
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
479+
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a deadline of 0.
480+
* The method returns a {@code Future} object that can be used to wait for the "nack" operation to
481+
* be completed.
482+
*
483+
* @param subscription the subscription whose messages must be "nacked"
484+
* @param ackIds the ack ids of messages to "nack"
485+
*/
404486
Future<Void> nackAsync(String subscription, Iterable<String> ackIds);
405487

406488
/**
407-
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
408-
* the new deadline with respect to the time the modify request was received by the Pub/Sub
489+
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be &gt;= 0 and
490+
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
409491
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
410492
* the new ack deadline will expire 10 seconds after the modify request was received by the
411493
* service. Specifying 0 may be used to make the message available for another pull request
@@ -425,8 +507,8 @@ void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String
425507

426508
/**
427509
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
428-
* must be >= 0 and is the new deadline with respect to the time the modify request was received
429-
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
510+
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
511+
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
430512
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
431513
* was received by the service. Specifying 0 may be used to make the message available for another
432514
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
@@ -444,8 +526,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit
444526
String ackId, String... ackIds);
445527

446528
/**
447-
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
448-
* the new deadline with respect to the time the modify request was received by the Pub/Sub
529+
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be &gt;= 0 and
530+
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
449531
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
450532
* the new ack deadline will expire 10 seconds after the modify request was received by the
451533
* service. Specifying 0 may be used to make the message available for another pull request
@@ -462,8 +544,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit
462544

463545
/**
464546
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
465-
* must be >= 0 and is the new deadline with respect to the time the modify request was received
466-
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
547+
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
548+
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
467549
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
468550
* was received by the service. Specifying 0 may be used to make the message available for another
469551
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.common.collect.Maps;
3838
import com.google.common.util.concurrent.Uninterruptibles;
3939
import com.google.protobuf.Empty;
40+
import com.google.pubsub.v1.AcknowledgeRequest;
4041
import com.google.pubsub.v1.DeleteSubscriptionRequest;
4142
import com.google.pubsub.v1.DeleteTopicRequest;
4243
import com.google.pubsub.v1.GetSubscriptionRequest;
@@ -466,40 +467,46 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
466467

467468
@Override
468469
public void ack(String subscription, String ackId, String... ackIds) {
470+
ack(subscription, Lists.asList(ackId, ackIds));
469471
}
470472

471473
@Override
472474
public Future<Void> ackAsync(String subscription, String ackId, String... ackIds) {
473-
return null;
475+
return ackAsync(subscription, Lists.asList(ackId, ackIds));
474476
}
475477

476478
@Override
477479
public void ack(String subscription, Iterable<String> ackIds) {
478-
480+
get(ackAsync(subscription, ackIds));
479481
}
480482

481483
@Override
482484
public Future<Void> ackAsync(String subscription, Iterable<String> ackIds) {
483-
return null;
485+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
486+
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
487+
.addAllAckIds(ackIds)
488+
.build();
489+
return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
484490
}
485491

486492
@Override
487493
public void nack(String subscription, String ackId, String... ackIds) {
494+
nack(subscription, Lists.asList(ackId, ackIds));
488495
}
489496

490497
@Override
491498
public Future<Void> nackAsync(String subscription, String ackId, String... ackIds) {
492-
return null;
499+
return nackAsync(subscription, Lists.asList(ackId, ackIds));
493500
}
494501

495502
@Override
496503
public void nack(String subscription, Iterable<String> ackIds) {
497-
504+
get(nackAsync(subscription, ackIds));
498505
}
499506

500507
@Override
501508
public Future<Void> nackAsync(String subscription, Iterable<String> ackIds) {
502-
return null;
509+
return modifyAckDeadlineAsync(subscription, 0, TimeUnit.SECONDS, ackIds);
503510
}
504511

505512
@Override

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.common.collect.Lists;
3838
import com.google.common.util.concurrent.Futures;
3939
import com.google.protobuf.Empty;
40+
import com.google.pubsub.v1.AcknowledgeRequest;
4041
import com.google.pubsub.v1.DeleteSubscriptionRequest;
4142
import com.google.pubsub.v1.DeleteTopicRequest;
4243
import com.google.pubsub.v1.GetSubscriptionRequest;
@@ -1189,6 +1190,178 @@ public void testListTopicSubscriptionsAsyncWithOptions()
11891190
Iterables.toArray(page.values(), SubscriptionId.class));
11901191
}
11911192

1193+
@Test
1194+
public void testAckOneMessage() {
1195+
pubsub = options.service();
1196+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
1197+
.setSubscription(SUBSCRIPTION_NAME_PB)
1198+
.addAckIds("ackId")
1199+
.build();
1200+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1201+
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
1202+
EasyMock.replay(pubsubRpcMock);
1203+
pubsub.ack(SUBSCRIPTION, "ackId");
1204+
}
1205+
1206+
@Test
1207+
public void testAckOneMessageAsync() throws ExecutionException, InterruptedException {
1208+
pubsub = options.service();
1209+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
1210+
.setSubscription(SUBSCRIPTION_NAME_PB)
1211+
.addAckIds("ackId")
1212+
.build();
1213+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1214+
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
1215+
EasyMock.replay(pubsubRpcMock);
1216+
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, "ackId");
1217+
assertNull(future.get());
1218+
}
1219+
1220+
@Test
1221+
public void testAckMoreMessages() {
1222+
pubsub = options.service();
1223+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
1224+
.setSubscription(SUBSCRIPTION_NAME_PB)
1225+
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
1226+
.build();
1227+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1228+
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
1229+
EasyMock.replay(pubsubRpcMock);
1230+
pubsub.ack(SUBSCRIPTION, "ackId1", "ackId2");
1231+
}
1232+
1233+
@Test
1234+
public void testAckMoreMessagesAsync() throws ExecutionException, InterruptedException {
1235+
pubsub = options.service();
1236+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
1237+
.setSubscription(SUBSCRIPTION_NAME_PB)
1238+
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
1239+
.build();
1240+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1241+
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
1242+
EasyMock.replay(pubsubRpcMock);
1243+
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, "ackId1", "ackId2");
1244+
assertNull(future.get());
1245+
}
1246+
1247+
@Test
1248+
public void testAckMessageList() {
1249+
pubsub = options.service();
1250+
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
1251+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
1252+
.setSubscription(SUBSCRIPTION_NAME_PB)
1253+
.addAllAckIds(ackIds)
1254+
.build();
1255+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1256+
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
1257+
EasyMock.replay(pubsubRpcMock);
1258+
pubsub.ack(SUBSCRIPTION, ackIds);
1259+
}
1260+
1261+
@Test
1262+
public void testAckMessageListAsync() throws ExecutionException, InterruptedException {
1263+
pubsub = options.service();
1264+
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
1265+
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
1266+
.setSubscription(SUBSCRIPTION_NAME_PB)
1267+
.addAllAckIds(ackIds)
1268+
.build();
1269+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1270+
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
1271+
EasyMock.replay(pubsubRpcMock);
1272+
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, ackIds);
1273+
assertNull(future.get());
1274+
}
1275+
1276+
@Test
1277+
public void testNackOneMessage() {
1278+
pubsub = options.service();
1279+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1280+
.setAckDeadlineSeconds(0)
1281+
.setSubscription(SUBSCRIPTION_NAME_PB)
1282+
.addAckIds("ackId")
1283+
.build();
1284+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1285+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1286+
EasyMock.replay(pubsubRpcMock);
1287+
pubsub.nack(SUBSCRIPTION, "ackId");
1288+
}
1289+
1290+
@Test
1291+
public void testNackOneMessageAsync() throws ExecutionException, InterruptedException {
1292+
pubsub = options.service();
1293+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1294+
.setAckDeadlineSeconds(0)
1295+
.setSubscription(SUBSCRIPTION_NAME_PB)
1296+
.addAckIds("ackId")
1297+
.build();
1298+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1299+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1300+
EasyMock.replay(pubsubRpcMock);
1301+
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, "ackId");
1302+
assertNull(future.get());
1303+
}
1304+
1305+
@Test
1306+
public void testNackMoreMessages() {
1307+
pubsub = options.service();
1308+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1309+
.setAckDeadlineSeconds(0)
1310+
.setSubscription(SUBSCRIPTION_NAME_PB)
1311+
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
1312+
.build();
1313+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1314+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1315+
EasyMock.replay(pubsubRpcMock);
1316+
pubsub.nack(SUBSCRIPTION, "ackId1", "ackId2");
1317+
}
1318+
1319+
@Test
1320+
public void testNackMoreMessagesAsync() throws ExecutionException, InterruptedException {
1321+
pubsub = options.service();
1322+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1323+
.setAckDeadlineSeconds(0)
1324+
.setSubscription(SUBSCRIPTION_NAME_PB)
1325+
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
1326+
.build();
1327+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1328+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1329+
EasyMock.replay(pubsubRpcMock);
1330+
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, "ackId1", "ackId2");
1331+
assertNull(future.get());
1332+
}
1333+
1334+
@Test
1335+
public void testNackMessageList() {
1336+
pubsub = options.service();
1337+
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
1338+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1339+
.setAckDeadlineSeconds(0)
1340+
.setSubscription(SUBSCRIPTION_NAME_PB)
1341+
.addAllAckIds(ackIds)
1342+
.build();
1343+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1344+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1345+
EasyMock.replay(pubsubRpcMock);
1346+
pubsub.nack(SUBSCRIPTION, ackIds);
1347+
}
1348+
1349+
@Test
1350+
public void testNackMessageListAsync() throws ExecutionException, InterruptedException {
1351+
pubsub = options.service();
1352+
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
1353+
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
1354+
.setAckDeadlineSeconds(0)
1355+
.setSubscription(SUBSCRIPTION_NAME_PB)
1356+
.addAllAckIds(ackIds)
1357+
.build();
1358+
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
1359+
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
1360+
EasyMock.replay(pubsubRpcMock);
1361+
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, ackIds);
1362+
assertNull(future.get());
1363+
}
1364+
11921365
@Test
11931366
public void testModifyAckDeadlineOneMessage() {
11941367
pubsub = options.service();

0 commit comments

Comments
 (0)