diff --git a/main.go b/main.go index b7c213668..743ae0c66 100644 --- a/main.go +++ b/main.go @@ -252,6 +252,7 @@ func main() { log.Error("failed to initialize service", "error", err) exitFunc(1) } + defer svc.Close() server := &http.Server{ Handler: svc.Router, diff --git a/pkg/routing/router.go b/pkg/routing/router.go index f5249237f..39863d86d 100644 --- a/pkg/routing/router.go +++ b/pkg/routing/router.go @@ -13,6 +13,25 @@ import ( "github.com/docker/model-runner/pkg/responses" ) +// RouterResult is the output of NewRouter, bundling the mux with any +// resources that require cleanup. +type RouterResult struct { + Mux *NormalizedServeMux + // closers holds cleanup functions that must be called when the + // router is no longer needed (e.g. to stop the responses Store + // background goroutine). + closers []func() +} + +// Close releases resources held by handlers registered on this router. +// It is idempotent and safe to call multiple times. +func (rr *RouterResult) Close() { + for _, fn := range rr.closers { + fn() + } + rr.closers = nil +} + // RouterConfig holds the dependencies needed to build the standard // model-runner HTTP route structure. type RouterConfig struct { @@ -41,8 +60,12 @@ type RouterConfig struct { // route structure: models endpoints, scheduler/inference endpoints, // path aliases (/v1/, /rerank, /score), Ollama compatibility, and // Anthropic compatibility. -func NewRouter(cfg RouterConfig) *NormalizedServeMux { +// +// The returned RouterResult must be closed when the router is no longer +// needed to stop background goroutines (e.g. the responses Store cleanup). +func NewRouter(cfg RouterConfig) *RouterResult { router := NewNormalizedServeMux() + result := &RouterResult{Mux: router} // Models endpoints – optionally wrapped by middleware. var modelEndpoint http.Handler = cfg.ModelHandler @@ -78,7 +101,8 @@ func NewRouter(cfg RouterConfig) *NormalizedServeMux { router.Handle("/v1"+responses.APIPrefix, responsesHandler) router.Handle(inference.InferencePrefix+responses.APIPrefix+"/", responsesHandler) router.Handle(inference.InferencePrefix+responses.APIPrefix, responsesHandler) + result.closers = append(result.closers, responsesHandler.Close) } - return router + return result } diff --git a/pkg/routing/router_test.go b/pkg/routing/router_test.go new file mode 100644 index 000000000..c3d8ceb2e --- /dev/null +++ b/pkg/routing/router_test.go @@ -0,0 +1,47 @@ +package routing + +import ( + "log/slog" + "testing" +) + +// TestNewRouter_WithResponsesAPI_Close verifies that creating a router with +// IncludeResponsesAPI enabled and then calling Close does not leak +// goroutines. The goleak detector in TestMain will catch any leak. +func TestNewRouter_WithResponsesAPI_Close(t *testing.T) { + log := slog.New(slog.DiscardHandler) + + result := NewRouter(RouterConfig{ + Log: log, + IncludeResponsesAPI: true, + // Scheduler, ModelHandler, etc. are nil — the responses handler + // only needs them when actually serving requests, not for route + // registration and cleanup. + }) + + // Verify the mux was created. + if result.Mux == nil { + t.Fatal("expected non-nil Mux") + } + + // Close must stop the responses Store cleanup goroutine. + result.Close() +} + +// TestNewRouter_WithoutResponsesAPI_Close verifies that Close is safe +// to call even when the Responses API is not enabled (no closers). +func TestNewRouter_WithoutResponsesAPI_Close(t *testing.T) { + log := slog.New(slog.DiscardHandler) + + result := NewRouter(RouterConfig{ + Log: log, + IncludeResponsesAPI: false, + }) + + if result.Mux == nil { + t.Fatal("expected non-nil Mux") + } + + // Should be a no-op, must not panic. + result.Close() +} diff --git a/pkg/routing/service.go b/pkg/routing/service.go index 207e2e995..408cf510e 100644 --- a/pkg/routing/service.go +++ b/pkg/routing/service.go @@ -74,6 +74,19 @@ type Service struct { SchedulerHTTP *scheduling.HTTPHandler Router *NormalizedServeMux Backends map[string]inference.Backend + // routerResult holds the RouterResult so we can close it on shutdown. + routerResult *RouterResult +} + +// Close releases resources held by the service (e.g. the responses +// Store cleanup goroutine). It is safe to call on a nil receiver and +// idempotent — subsequent calls are no-ops. +func (s *Service) Close() { + if s == nil || s.routerResult == nil { + return + } + s.routerResult.Close() + s.routerResult = nil } // NewService wires up the full inference service stack from the given @@ -109,7 +122,7 @@ func NewService(cfg ServiceConfig) (*Service, error) { Backends: backends, } - svc.Router = NewRouter(RouterConfig{ + routerResult := NewRouter(RouterConfig{ Log: cfg.Log, Scheduler: scheduler, SchedulerHTTP: schedulerHTTP, @@ -119,6 +132,8 @@ func NewService(cfg ServiceConfig) (*Service, error) { ModelHandlerMiddleware: cfg.ModelHandlerMiddleware, IncludeResponsesAPI: cfg.IncludeResponsesAPI, }) + svc.Router = routerResult.Mux + svc.routerResult = routerResult if cfg.ExtraRoutes != nil { cfg.ExtraRoutes(svc.Router, svc) diff --git a/pkg/routing/testmain_test.go b/pkg/routing/testmain_test.go new file mode 100644 index 000000000..02e0fe4cb --- /dev/null +++ b/pkg/routing/testmain_test.go @@ -0,0 +1,12 @@ +package routing + +import ( + "testing" + + "go.uber.org/goleak" +) + +// TestMain runs goleak after the test suite to detect goroutine leaks. +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +}