Skip to content

[SPARK-52673][CONNECT][CLIENT] Add grpc RetryInfo handling to Spark Connect retry policies #51363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

khakhlyuk
Copy link
Contributor

@khakhlyuk khakhlyuk commented Jul 3, 2025

What changes were proposed in this pull request?

Spark Connect Client has a set of retry policies that specify which errors coming from the Server can be retried.
This change adds the capability for the Spark Connect Client to use server-provided retry information according to the grpc standards: https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto#L91
The server can include RetryInfo gRPC message containing retry_delay field in its error response. The Client will now use RetryInfo message to classify the error as retriable and will use retry_delay to calculate the next time to wait. This behavior is in line with the gRPC standard for client-server communication.
The change is needed for two reasons:

  1. If the Server is under heavy load or a task takes more time, it can tell the client to wait longer using the retry_delay field.
  2. If the Server needs to introduce a new retryable error, it can simply include RetryInfo in the error message. The error message will be retried automatically by the client. No changes to the client-side retry policies are needed to retry the new error.

Changes in detail

  • Adding new recognize_server_retry_delay and max_server_retry_delay options for RetryPolicy classes in Python and Scala clients.
  • All policies with recognize_server_retry_delay=True will take RetryInfo.retry_delay into account when calculating the next backoff.
  • retry_delay can override client's max_backoff
  • retry_delay is limited by max_server_retry_delay (10 minutes by default).
  • When the server stops sending high retry_delays, the client goes back to using its own backoff policy limited by max_backoff.
  • DefaultPolicy has recognize_server_retry_delay=True and will use retry_delay in the backoff calculation.
  • Additionally, DefaultPolicy will classify all errors with RetryInfo as retryable.
  • If an error message can be retried by several policies, only retry it with the first one (highest prio) and then stop. This change is needed because DefaultPolicy now retries all errors with RetryInfo. If we keep the existing behaviour, an error that is both has the RetryInfo and is matched by a different CustomPolicy, would be retried both by the DefaultPolicy and by the CustomPolicy. This can lead to excessively long retry periods and complicates the planning of total retry times.
  • Moving retry policy related tests from test_client.py to a new test_client_retries.py file. Same for scala.
  • Extending docstrings.

Why are the changes needed?

See above

Does this PR introduce any user-facing change?

  1. The clients retry all errors with RetryInfo grpc message using the DefaultPolicy.
  2. The error is only retried by the first policy that matches it.

How was this patch tested?

Old and new unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant