4444import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .BIGTABLE_USE_BATCH ;
4545import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .BIGTABLE_USE_CACHED_DATA_CHANNEL_POOL ;
4646import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .BIGTABLE_USE_PLAINTEXT_NEGOTIATION ;
47- import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .BIGTABLE_USE_TIMEOUTS_KEY ;
4847import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .CUSTOM_USER_AGENT_KEY ;
4948import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .ENABLE_GRPC_RETRIES_KEY ;
5049import static com .google .cloud .bigtable .hbase .BigtableOptionsFactory .ENABLE_GRPC_RETRY_DEADLINEEXCEEDED_KEY ;
9190import com .google .common .base .Strings ;
9291import com .google .common .collect .ImmutableMap ;
9392import com .google .common .collect .Lists ;
94- import io .grpc .CallOptions ;
95- import io .grpc .CallOptions .Key ;
96- import io .grpc .Channel ;
97- import io .grpc .ClientCall ;
98- import io .grpc .ClientInterceptor ;
99- import io .grpc .Deadline ;
10093import io .grpc .ManagedChannelBuilder ;
101- import io .grpc .MethodDescriptor ;
10294import java .io .ByteArrayInputStream ;
10395import java .io .FileInputStream ;
10496import java .io .IOException ;
111103import java .util .List ;
112104import java .util .Objects ;
113105import java .util .Set ;
114- import java .util .concurrent .TimeUnit ;
115106import org .apache .hadoop .conf .Configuration ;
116107import org .apache .hadoop .hbase .util .VersionInfo ;
117108import org .threeten .bp .Duration ;
118109
119110/** For internal use only - public for technical reasons. */
120111@ InternalApi ("For internal usage only" )
121112public class BigtableHBaseVeneerSettings extends BigtableHBaseSettings {
113+
122114 private static final String BIGTABLE_BATCH_DATA_HOST_DEFAULT = "batch-bigtable.googleapis.com" ;
123115
116+ private static final ClientOperationTimeouts DEFAULT_TIMEOUTS =
117+ new ClientOperationTimeouts (
118+ /* unaryTimeouts= */ new OperationTimeouts (
119+ Optional .absent (),
120+ Optional .of (Duration .ofSeconds (20 )),
121+ Optional .of (Duration .ofMinutes (5 ))),
122+ // Note: scanTimeouts are currently also used for bulk reads as well
123+ // TODO: use a separate settings for bulk reads
124+ /* scanTimeouts= */ new OperationTimeouts (
125+ Optional .of (Duration .ofMinutes (5 )),
126+ Optional .of (Duration .ofMinutes (10 )),
127+ Optional .absent ()),
128+ /* bulkMutateTimeouts = */ new OperationTimeouts (
129+ Optional .absent (),
130+ Optional .of (Duration .ofMinutes (1 )),
131+ Optional .of (Duration .ofMinutes (10 ))));
132+ private static final int MAX_CONSECUTIVE_SCAN_ATTEMPTS = 10 ;
133+
124134 private final Configuration configuration ;
125135 private final BigtableDataSettings dataSettings ;
126136 private final BigtableTableAdminSettings tableAdminSettings ;
@@ -278,16 +288,10 @@ private BigtableDataSettings buildBigtableDataSettings(ClientOperationTimeouts c
278288 // Configure metrics
279289 configureMetricsBridge (dataBuilder );
280290
281- // Configure RPCs - this happens in multiple parts:
282- // - retry settings are configured here only here
283- // - timeouts are split into multiple places:
284- // - timeouts for retries are configured here
285- // - if USE_TIMEOUTS is explicitly disabled, then an interceptor is added to force all
286- // deadlines to 6 minutes
287- // - DataClientVeneerApi can set a flag to affect behavior of the interceptor
288-
289- configureConnectionCallTimeouts (dataBuilder .stubSettings (), clientTimeouts );
290-
291+ // Configure RPCs - this happens in two parts:
292+ // - most of the timeouts are defined here
293+ // - attempt timeouts for readRows is set in DataClientVeneerApi to workaround lack of attempt
294+ // timeouts for streaming RPCs
291295 // Complex RPC method settings
292296 configureBulkMutationSettings (
293297 dataBuilder .stubSettings ().bulkMutateRowsSettings (),
@@ -542,31 +546,6 @@ private Credentials buildCredentialFromPrivateKey(
542546 }
543547 }
544548
545- private void configureConnectionCallTimeouts (
546- StubSettings .Builder <?, ?> stubSettings , ClientOperationTimeouts clientTimeouts ) {
547- // Only patch settings when timeouts are disabled
548- if (clientTimeouts .getUseTimeouts ()) {
549- return ;
550- }
551- InstantiatingGrpcChannelProvider .Builder channelProvider =
552- ((InstantiatingGrpcChannelProvider ) stubSettings .getTransportChannelProvider ()).toBuilder ();
553-
554- final ApiFunction <ManagedChannelBuilder , ManagedChannelBuilder > prevConfigurator =
555- channelProvider .getChannelConfigurator ();
556-
557- channelProvider .setChannelConfigurator (
558- new ApiFunction <ManagedChannelBuilder , ManagedChannelBuilder >() {
559- @ Override
560- public ManagedChannelBuilder apply (ManagedChannelBuilder managedChannelBuilder ) {
561- if (prevConfigurator != null ) {
562- managedChannelBuilder = prevConfigurator .apply (managedChannelBuilder );
563- }
564- return managedChannelBuilder .intercept (new NoTimeoutsInterceptor ());
565- }
566- });
567- stubSettings .setTransportChannelProvider (channelProvider .build ());
568- }
569-
570549 private void configureBulkMutationSettings (
571550 BigtableBatchingCallSettings .Builder builder , OperationTimeouts operationTimeouts ) {
572551 BatchingSettings .Builder batchingSettingsBuilder = builder .getBatchingSettings ().toBuilder ();
@@ -666,7 +645,7 @@ private void configureReadRowsSettings(
666645 // exist, instead we use READ_PARTIAL_ROW_TIMEOUT_MS as the intra-row timeout
667646 if (!configuration .getBoolean (ENABLE_GRPC_RETRIES_KEY , true )) {
668647 // user explicitly disabled retries, treat it as a non-idempotent method
669- readRowsSettings .setRetryableCodes (Collections .< StatusCode . Code > emptySet ());
648+ readRowsSettings .setRetryableCodes (Collections .emptySet ());
670649 } else {
671650 // apply user user retry settings
672651 readRowsSettings .setRetryableCodes (
@@ -688,10 +667,10 @@ private void configureReadRowsSettings(
688667 }
689668 }
690669
691- String maxAttemptsStr = configuration . get ( MAX_SCAN_TIMEOUT_RETRIES );
692- if (! Strings . isNullOrEmpty ( maxAttemptsStr )) {
693- readRowsSettings . retrySettings (). setMaxAttempts (Integer . parseInt ( maxAttemptsStr ));
694- }
670+ readRowsSettings
671+ . retrySettings ()
672+ . setMaxAttempts (
673+ configuration . getInt ( MAX_SCAN_TIMEOUT_RETRIES , MAX_CONSECUTIVE_SCAN_ATTEMPTS ));
695674 }
696675
697676 // Per response timeouts (note: gax maps rpcTimeouts to response timeouts for streaming rpcs)
@@ -759,74 +738,63 @@ private void configureRetryableCallSettings(
759738
760739 private void configureNonRetryableCallSettings (
761740 UnaryCallSettings .Builder <?, ?> unaryCallSettings , OperationTimeouts operationTimeouts ) {
762- unaryCallSettings .setRetryableCodes (Collections .< StatusCode . Code > emptySet ());
741+ unaryCallSettings .setRetryableCodes (Collections .emptySet ());
763742
764- // NOTE: attempt timeouts are not configured for non-retriable rpcs
765743 if (operationTimeouts .getOperationTimeout ().isPresent ()) {
766744 unaryCallSettings
767745 .retrySettings ()
768746 .setLogicalTimeout (operationTimeouts .getOperationTimeout ().get ());
769747 }
770748 }
771749
750+ /**
751+ * Build a source of truth for timeout related settings.
752+ *
753+ * <p>This will combine defaults with end user provided overrides to configure response, attempt
754+ * and operation timeouts. There 3 types of operations that are configured here:
755+ *
756+ * <ul>
757+ * <li>Unary - ReadRow, MutateRow, CheckAndMutate, ReadModifyWrite
758+ * <li>Scans - ReadRows (and BulkReadRow)
759+ * <li>Bulk Mutations
760+ * </ul>
761+ */
772762 private ClientOperationTimeouts buildCallSettings () {
773- boolean useTimeouts = configuration .getBoolean (BIGTABLE_USE_TIMEOUTS_KEY , true );
774-
775- Optional <Duration > DEFAULT_BATCH_BULK_MUTATE_OVERALL_TIMEOUT =
776- Optional .of (Duration .ofMinutes (20 ));
777- // Set 20 minutes default timeout for batch jobs.
778- // The default override the deprecated BIGTABLE_LONG_RPC_TIMEOUT_MS_KEY.
779- Optional <Duration > bulkMutateOverallTimeout =
780- configuration .getBoolean (BIGTABLE_USE_BATCH , false )
781- ? extractOverallTimeout (BIGTABLE_MUTATE_RPC_TIMEOUT_MS_KEY )
782- .or (DEFAULT_BATCH_BULK_MUTATE_OVERALL_TIMEOUT )
783- : extractOverallTimeout (
784- BIGTABLE_MUTATE_RPC_TIMEOUT_MS_KEY ,
785- BigtableOptionsFactory .BIGTABLE_LONG_RPC_TIMEOUT_MS_KEY );
763+ Optional <Duration > defaultBatchMutateOverallTimeout =
764+ DEFAULT_TIMEOUTS .bulkMutateTimeouts .operationTimeout ;
765+ if (configuration .getBoolean (BIGTABLE_USE_BATCH , false )) {
766+ defaultBatchMutateOverallTimeout = Optional .of (Duration .ofMinutes (20 ));
767+ }
786768 OperationTimeouts bulkMutateTimeouts =
787769 new OperationTimeouts (
788- Optional .<Duration >absent (),
789- extractUnaryAttemptTimeout (BIGTABLE_MUTATE_RPC_ATTEMPT_TIMEOUT_MS_KEY ),
790- bulkMutateOverallTimeout );
770+ DEFAULT_TIMEOUTS .bulkMutateTimeouts .responseTimeout ,
771+ extractDuration (BIGTABLE_MUTATE_RPC_ATTEMPT_TIMEOUT_MS_KEY )
772+ .or (DEFAULT_TIMEOUTS .bulkMutateTimeouts .attemptTimeout ),
773+ extractDuration (
774+ BIGTABLE_MUTATE_RPC_TIMEOUT_MS_KEY ,
775+ BigtableOptionsFactory .BIGTABLE_LONG_RPC_TIMEOUT_MS_KEY )
776+ .or (defaultBatchMutateOverallTimeout ));
791777
792778 OperationTimeouts scanTimeouts =
793779 new OperationTimeouts (
794- extractDuration (READ_PARTIAL_ROW_TIMEOUT_MS ),
795- extractScanAttemptTimeout (),
796- extractOverallTimeout (
797- BIGTABLE_READ_RPC_TIMEOUT_MS_KEY ,
798- BigtableOptionsFactory .BIGTABLE_LONG_RPC_TIMEOUT_MS_KEY ));
780+ extractDuration (READ_PARTIAL_ROW_TIMEOUT_MS )
781+ .or (DEFAULT_TIMEOUTS .scanTimeouts .responseTimeout ),
782+ extractDuration (BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY )
783+ .or (DEFAULT_TIMEOUTS .scanTimeouts .attemptTimeout ),
784+ extractDuration (
785+ BIGTABLE_READ_RPC_TIMEOUT_MS_KEY ,
786+ BigtableOptionsFactory .BIGTABLE_LONG_RPC_TIMEOUT_MS_KEY )
787+ .or (DEFAULT_TIMEOUTS .scanTimeouts .operationTimeout ));
799788
800789 OperationTimeouts unaryTimeouts =
801790 new OperationTimeouts (
802- Optional .<Duration >absent (),
803- extractDuration (BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS_KEY ),
804- extractDuration (BIGTABLE_RPC_TIMEOUT_MS_KEY , MAX_ELAPSED_BACKOFF_MILLIS_KEY ));
791+ DEFAULT_TIMEOUTS .unaryTimeouts .responseTimeout ,
792+ extractDuration (BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS_KEY )
793+ .or (DEFAULT_TIMEOUTS .unaryTimeouts .attemptTimeout ),
794+ extractDuration (BIGTABLE_RPC_TIMEOUT_MS_KEY , MAX_ELAPSED_BACKOFF_MILLIS_KEY )
795+ .or (DEFAULT_TIMEOUTS .unaryTimeouts .operationTimeout ));
805796
806- return new ClientOperationTimeouts (
807- useTimeouts , unaryTimeouts , scanTimeouts , bulkMutateTimeouts );
808- }
809-
810- private Optional <Duration > extractUnaryAttemptTimeout (String ... keys ) {
811- if (!configuration .getBoolean (BIGTABLE_USE_TIMEOUTS_KEY , false )) {
812- return Optional .of (Duration .ofMinutes (6 ));
813- }
814- return extractDuration (keys );
815- }
816-
817- private Optional <Duration > extractScanAttemptTimeout () {
818- if (!configuration .getBoolean (BIGTABLE_USE_TIMEOUTS_KEY , false )) {
819- return Optional .absent ();
820- }
821- return extractDuration (BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY );
822- }
823-
824- private Optional <Duration > extractOverallTimeout (String ... keys ) {
825- if (configuration .getBoolean (BIGTABLE_USE_TIMEOUTS_KEY , true )) {
826- return extractDuration (keys );
827- } else {
828- return extractDuration (MAX_ELAPSED_BACKOFF_MILLIS_KEY );
829- }
797+ return new ClientOperationTimeouts (unaryTimeouts , scanTimeouts , bulkMutateTimeouts );
830798 }
831799
832800 private Optional <Duration > extractDuration (String ... keys ) {
@@ -867,49 +835,24 @@ private Set<StatusCode.Code> extractRetryCodesFromConfig(Set<StatusCode.Code> de
867835 return codes ;
868836 }
869837
870- public static class NoTimeoutsInterceptor implements ClientInterceptor {
871- public static final CallOptions .Key <Boolean > SKIP_DEFAULT_ATTEMPT_TIMEOUT =
872- Key .createWithDefault ("SKIP_DEFAULT_ATTEMPT_TIMEOUT" , false );
873-
874- @ Override
875- public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
876- MethodDescriptor <ReqT , RespT > methodDescriptor , CallOptions callOptions , Channel channel ) {
877-
878- if (!callOptions .getOption (SKIP_DEFAULT_ATTEMPT_TIMEOUT )) {
879- callOptions = callOptions .withDeadline (Deadline .after (6 , TimeUnit .MINUTES ));
880- } else {
881- callOptions = callOptions .withDeadline (null );
882- }
883-
884- return channel .newCall (methodDescriptor , callOptions );
885- }
886- }
887-
888838 static class ClientOperationTimeouts {
889839 static final ClientOperationTimeouts EMPTY =
890840 new ClientOperationTimeouts (
891- true , OperationTimeouts .EMPTY , OperationTimeouts .EMPTY , OperationTimeouts .EMPTY );
841+ OperationTimeouts .EMPTY , OperationTimeouts .EMPTY , OperationTimeouts .EMPTY );
892842
893- private final boolean useTimeouts ;
894843 private final OperationTimeouts unaryTimeouts ;
895844 private final OperationTimeouts scanTimeouts ;
896845 private final OperationTimeouts bulkMutateTimeouts ;
897846
898847 public ClientOperationTimeouts (
899- boolean useTimeouts ,
900848 OperationTimeouts unaryTimeouts ,
901849 OperationTimeouts scanTimeouts ,
902850 OperationTimeouts bulkMutateTimeouts ) {
903- this .useTimeouts = useTimeouts ;
904851 this .unaryTimeouts = unaryTimeouts ;
905852 this .scanTimeouts = scanTimeouts ;
906853 this .bulkMutateTimeouts = bulkMutateTimeouts ;
907854 }
908855
909- public boolean getUseTimeouts () {
910- return useTimeouts ;
911- }
912-
913856 public OperationTimeouts getUnaryTimeouts () {
914857 return unaryTimeouts ;
915858 }
0 commit comments