ai/live: Support for multiple instances per orchestrator in discovery #3719

Merged
j0sh merged 6 commits into master from ja/multiple-serviceuri
Sep 17, 2025

Conversation


@j0sh j0sh commented Aug 26, 2025

This will allow orchestrators to advertise nodes in a different location
and have the gateway pick up on those during the discovery phase. There
are quite a few advantages to this:

  • Built-in scalability for orchestrator operators. They can spin up
    new instances anywhere and have those processing jobs in moments.
    This also works for scaling down instances.

  • Better latency to orchestrators from gateways compared to alternative
    scaling strategies, eg remote workers. Gateways directly connect to
    each instance, rather than traffic being proxied through a central node.

  • Better overall resiliency when gateways are making direct connections
    rather than traffic being funneled through a central orchestrator
    instance. This avoids having a single instance as a point of failure and
    minimizes the blast radius when one does go down.

  • Less burden for Inc and anyone else who needs to manually maintain a
    list of orchestrator nodes. We just need one entry per on-chain orch and
    don't have to keep up with the instance provisioning activities of
    individual orchestrators.

  • The community can use this as a basis for future pool implementations
    that preserve the latency requirements for realtime AI while minimizing
    the overall infrastructure footprint of the pool coordinator.

Implementation Changes and Notes

  • Adds a repeated string nodes field to the OrchestratorInfo RPC
    message with the URL for each node that is being advertised.

    We can technically make the existing Transcoder field repeatable
    without breaking wire-format compatibility but this would still require
    sprawling code-level changes, so use a new field instead to minimize
    the diff and rollout risk.
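
As a sketch, the new field might look like the following in the proto definition. The field number and the surrounding message contents here are illustrative, not the actual ones used in go-livepeer:

```protobuf
message OrchestratorInfo {
  // Existing service URI for this orchestrator instance.
  // An empty value indicates the instance does no work itself
  // (the -serviceAddr none case described below).
  string transcoder = 1;

  // ... existing fields elided ...

  // URLs of additional instances this orchestrator advertises.
  // Field number is illustrative only.
  repeated string nodes = 15;
}
```

Since proto3 treats an absent `repeated` field as an empty list, older gateways simply ignore the field and older orchestrators advertise nothing, which keeps both directions backwards compatible.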

  • Add a nodes CLI argument to go-livepeer for orchestrators to specify
    the hostname:port of the extra nodes they want to advertise,
    comma-delimited for multiple nodes. While we could maybe extend the
    existing -serviceAddr flag to achieve a similar goal, that makes it a
    little less intuitive to separate the current node from the ones it is
    advertising.

  • Add an extraNodes CLI argument to go-livepeer for gateways to limit
    the number of additional nodes they will visit per orchestrator.
    Default of 0 to maintain backwards compatibility with existing setups
    that might not necessarily want to adopt this scheme right away, eg
    transcoder gateways.

  • During discovery, gateways will do a recursive GetOrchestratorInfo
    call to each advertised node, up to the gateway's extraNodes limit.
    Recursive calls must happen within existing discovery time-outs; no
    additional time is given for extra nodes.

  • This extraNodes limit is enforced per "top-level" orchestrator,
    eg those that are discovered on-chain, via the orchestrator webhook, or
    manually specified at the gateway by the -orchAddrs flag. The
    Ethereum address is not used to limit extra nodes.

  • Recursive calls are limited to a depth of one, for now. Nodes
    cannot advertise another node that advertises yet another node.
    Top-level orchestrators only.

  • The -serviceAddr flag now supports a special "none" value to indicate
    the node will not be performing any work and is only for advertising
    other nodes. This may make operations a little easier - only one node
    needs to be configured for the advertising - and can tolerate brief
    periods of downtime without interrupting in-progress jobs. In the RPC
    wire format, this is indicated by an empty Transcoder value. Gateways
    now check for this empty Transcoder value, and will not consider empty
    top-level nodes in selection, but will still check any advertised nodes.
    Orchestrators cannot set -serviceAddr none without also setting
    additional nodes. This is strictly an optional QoL knob for operators.

  • OrchestratorPool is already very configurable, and even more so with
    these changes, so there is a new OrchestratorPoolConfig struct to make
    later additions easier, rather than having to add yet another
    parameter to a constructor (and to update that constructor everywhere).
    To minimize changes to existing code, the existing constructors are
    re-written in terms of this config struct. However, this particular
    corner of the codebase is quite creaky and showing its age; it's due for
    a refactor. Hopefully we can do this refactor gradually as we keep
    adding new features for AI selection.

  • Added a number of unit tests exercising these code paths. Some of
    these tests are also showing their age and are quite brittle (mocks!)
    but coverage is decent overall and we should be comfortable rolling this
    out.

Roll-Out Plan

We can roll this out in stages.

  • Staging with no other config changes. This should be a no-op, but
    e2e and load tests on staging will help us confirm after a couple days.

  • Configure realtime AI gateways on staging with a sensible -extraNodes
    value; say 5.

  • Announce this change to (staging) orchestrators and explain how they can
    utilize the -nodes flag. This explanation should probably be in the
    form of permanent documentation somewhere.

  • Optionally rig up a couple of Inc orchs to use this as well.

  • Deploy to prod

  • Configure prod realtime AI gateways with -extraNodes and announce.

  • Do not update transcoder gateways.

@github-actions github-actions bot added the go Pull requests that update Go code label Aug 26, 2025
@j0sh j0sh force-pushed the ja/multiple-serviceuri branch from e1268c9 to 3300b01 Compare September 10, 2025 06:57
@j0sh j0sh force-pushed the ja/multiple-serviceuri branch from 3300b01 to 8378dcd Compare September 10, 2025 07:00
@j0sh j0sh marked this pull request as ready for review September 10, 2025 07:00
codecov bot commented Sep 10, 2025

Codecov Report

❌ Patch coverage is 47.73869% with 104 lines in your changes missing coverage. Please review.
✅ Project coverage is 31.95143%. Comparing base (25a8dbb) to head (e26bc5c).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
cmd/livepeer/starter/starter.go 0.00000% 48 Missing ⚠️
net/lp_rpc_grpc.pb.go 6.25000% 30 Missing ⚠️
discovery/discovery.go 86.00000% 10 Missing and 4 partials ⚠️
core/broadcaster.go 0.00000% 5 Missing ⚠️
core/orchestrator.go 0.00000% 5 Missing ⚠️
cmd/livepeer/starter/flags.go 0.00000% 2 Missing ⚠️
Additional details and impacted files


@@                 Coverage Diff                 @@
##              master       #3719         +/-   ##
===================================================
+ Coverage   31.65079%   31.95143%   +0.30064%     
===================================================
  Files            158         158                 
  Lines          47765       47519        -246     
===================================================
+ Hits           15118       15183         +65     
+ Misses         31745       31434        -311     
  Partials         902         902                 
Files with missing lines Coverage Δ
common/types.go 0.00000% <ø> (ø)
core/livepeernode.go 60.22099% <ø> (ø)
discovery/wh_discovery.go 60.57692% <100.00000%> (ø)
net/lp_rpc.pb.go 19.39163% <ø> (+3.93384%) ⬆️
server/rpc.go 68.19277% <100.00000%> (+2.25603%) ⬆️
cmd/livepeer/starter/flags.go 0.00000% <0.00000%> (ø)
core/broadcaster.go 0.00000% <0.00000%> (ø)
core/orchestrator.go 71.77700% <0.00000%> (-0.30244%) ⬇️
discovery/discovery.go 71.32075% <86.00000%> (+2.32075%) ⬆️
net/lp_rpc_grpc.pb.go 9.45946% <6.25000%> (-0.47843%) ⬇️
... and 1 more

... and 4 files with indirect coverage changes


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data



Contributor

@leszko leszko left a comment


I like the idea. I think it's simple enough and solves the main headache of Orchestrators. Great work Josh!

Added some inline comments, but other than that, looks good.

// Broadcaster's Selection Algorithm
cfg.OrchAddr = fs.String("orchAddr", *cfg.OrchAddr, "Comma-separated list of orchestrators to connect to")
cfg.OrchWebhookURL = fs.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.ExtraNodes = fs.Int("extraNodes", *cfg.ExtraNodes, "Number of extra nodes an orchestrator can advertise within the GetOrchestratorInfo response")
Contributor

Maybe maxNodes or maxExtraNodes (but then I'd rename nodes to extraNodes).

Collaborator Author

I really struggled with naming this.

My first pass at this PR named it maxNodes too but that got a little confusing in practice since this does not include the initial set of orchestrators in the pool ... these are literally extra nodes in addition to the initial set of orchestrators in the pool.

We might be able to sidestep this if maxNodes had a default of 1 but that means changing more of the current behavior (which does not depend on this value) and I tried to avoid doing that.

Contributor

ok

func getServiceURI(n *core.LivepeerNode, serviceAddr string) (*url.URL, error) {
// Passed in via CLI
if serviceAddr != "" {
if serviceAddr == "none" {
Contributor

Where is it actually set to none? Doesn't empty string mean that the node is not used for work?

Collaborator Author

The user sets it in the -serviceAddr CLI flag. An empty string for that flag right now means go-livepeer tries to infer the address (see the rest of this function) and we don't want to change that for anyone who is depending on the behavior.

discoveryTimeout time.Duration
node core.LivepeerNode
extraNodes int
getOrchInfo func(context.Context, common.Broadcaster, *url.URL, server.GetOrchestratorInfoParams) (*net.OrchestratorInfo, error)
Contributor

Why do we need this getOrchInfo function in the orchestratorPool? Is it only to help with unit testing?

Collaborator Author

Yes, unit testing. Not having this caused some issues with race conditions while I was working on these.

It's much better this way than having to set (and reset) a global function pointer for each test, and it saves us a bunch of per-test boilerplate.

Eventually we should move the rest of the tests over to use this method but I didn't want to add even more unrelated changes to this PR.

for nbResp < numAvailableOrchs && len(ods) < numOrchestrators && !timedOut {
// nbResp < maxOrchNodes : responses expected, whether successful or not
// len(ods) < numOrchestrator: successful responses needed
for nbResp < maxOrchNodes && len(ods) < numOrchestrators && !timedOut {
Contributor

I think you need to change len(ods) < numOrchestrators to len(ods) < maxOrchNodes. Otherwise, the discovery may not wait for all the responses.

Collaborator Author

I don't think we want to wait for all responses. Not everyone will be running extra instances so this would just make discovery time out, every time. If we don't care about that, we could just simplify this to for !timedOut {

As the comment says:

numOrchestrators: The number of responses we need
maxOrchNodes: The maximum number of responses we should wait for, if we haven't yet received enough responses. It's unlikely we will ever hit this in practice, FWIW.

for nbResp < numAvailableOrchs && len(ods) < numOrchestrators && !timedOut {
// nbResp < maxOrchNodes : responses expected, whether successful or not
// len(ods) < numOrchestrator: successful responses needed
for nbResp < maxOrchNodes && len(ods) < numOrchestrators && !timedOut {
Contributor

Also, you may want to change line 308. Otherwise we may see in the logs something like:

Done fetching orch info numOrch=5 responses=30/5 timedOut=true

Collaborator Author

Improved logging in 4e6e552

// haven't seen this one yet so lets continue
u, err := url.Parse(inst)
if err != nil {
continue
Contributor

I'd add some debug logging here. We would like to know if the Orchestrators return malformed URLs.

Collaborator Author

Added logging in 4e6e552 - I am a little nervous about printing the malformed URL due to injection.

Comment on lines +1722 to +1723
maxNodesCases := []int{-5, -1, 0, 1, 2, 5, 10}
numOrchestratorsCases := []int{-5, -1, 0, 1, 2, 5, 10}
Contributor

This smells like this test could be rewritten to use the standard tests := [] struct and iterate over different test scenarios. Do you think the test readability could benefit from changing the test structure?

You could actually have a few good test scenarios instead of this loop with so many generated tests:


	for _, maxNodes := range maxNodesCases {
		for _, numOrchs := range numOrchestratorsCases {
       }
    }

Collaborator Author

This already uses sub-tests. How would you simplify it while still covering all the different combinations? I found bugs in a number of these so it seems helpful to just have that exhaustive checking.

These tests are really quick to run anyway, I wouldn't worry about the number of generated cases. On my machine it takes around 70ms - a little more than 1ms per case.

josh@josh-gpu:~/go-livepeer/discovery$ go test -v -run ExtraNodes .
=== RUN   TestGetOrchestrators_Nodes_ExtraNodes
...
--- PASS: TestGetOrchestrators_Nodes_ExtraNodes (0.07s)

}
}

func TestGetOrchestrators_Nodes_AlternatingTimeouts(t *testing.T) {
Contributor

I'd consider removing this test. I get what you try to test, but tests based on a number of time.Sleep() calls with millisecond durations tend to be flaky. Plus, I think this test is very complex.

Collaborator Author

I'd consider keeping it, because it legitimately exercises new code paths: specifically, that we still respect the discovery timeout even when additional instances are slow to respond.

The sleep is not great but it's not likely to cause problems in this case, because we just want to make sure we pause for longer than the discovery timeout - which we have to set anyway. Many other tests in this file depend on hitting that discovery timeout, not just these.

Golang 1.25 has some helpers around testing time; they don't work with network stuff but I will try to see if that can be kludged into grpc here.

Collaborator Author

Added the golang 1.25 synctest changes as a separate PR here: #3739


pool, err := NewOrchestratorPoolWithConfig(OrchestratorPoolConfig{
URIs: uris,
DiscoveryTimeout: 50 * time.Millisecond,
Contributor

Would it make sense to increase the timeout here? Won't 50 * time.Millisecond become flaky at some point in our CI?

Collaborator Author

Maybe, if it does then we can increase it. I think this specific case will time out during the test anyway so we don't want to make it too high, since it'll just make the test unnecessarily long.

On my machine it still passes with a 1ms timeout x 1000 iterations, so hopefully 50ms would be plenty for CI. If not, I suspect we'll see a bunch of other tests in this file blow up too, even outside of this PR.

josh@josh-gpu:~/go-livepeer/discovery$ git diff
diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go
index 2299c66c..e9aec056 100644
--- a/discovery/discovery_test.go
+++ b/discovery/discovery_test.go
@@ -1894,7 +1894,7 @@ func TestGetOrchestrators_Nodes_RecursiveDiscovery(t *testing.T) {
 
        pool, err := NewOrchestratorPoolWithConfig(OrchestratorPoolConfig{
                URIs:             uris,
-               DiscoveryTimeout: 50 * time.Millisecond,
+               DiscoveryTimeout: 1 * time.Millisecond,
                ExtraNodes:       10, // ensure limits don't truncate first-level discovery
        })
        require := require.New(t)
josh@josh-gpu:~/go-livepeer/discovery$ go test -run RecursiveDisco -count 1000
PASS
ok      github.com/livepeer/go-livepeer/discovery       1.314s

Collaborator Author

The synctest PR should mitigate any issues with flakiness: #3739

return nil
}

func TestGetOrchestrator_NoLiveVideoCapacity_WithAndWithoutServiceURI(t *testing.T) {
Contributor

Is this change related to your PR?

Collaborator Author
@j0sh j0sh Sep 12, 2025

Yes see the changes here

Mainly - when serviceAddr is none then there is zero capacity, and live-video-to-video would always return OrchCapped which is obviously not correct. So this test ensures that fixed behavior stays in place.

@j0sh
Collaborator Author

j0sh commented Sep 16, 2025

Forgot an important bit - sorting the discovered orchestrators by latency in 5f47105

This is needed because the list of orchestrators is built in-order, but nodes advertised through recursive discovery may have lower latencies than the parent, and we want to prioritize latency.

Suspended orchestrators will still be placed last.

@j0sh j0sh enabled auto-merge (squash) September 17, 2025 15:11
@j0sh j0sh merged commit d095d96 into master Sep 17, 2025
24 of 26 checks passed
@j0sh j0sh deleted the ja/multiple-serviceuri branch September 17, 2025 15:30