From 5d0a37213da1ec3fdaa8055b1316bbbb22c943c5 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Wed, 1 Apr 2026 21:36:12 +0200 Subject: [PATCH 1/3] feat(pqlmanifest): implement StorageFetcher for manifest retrieval and update polling --- router/core/router.go | 25 ++-- .../persistedoperation/pqlmanifest/poller.go | 10 +- .../pqlmanifest/storage_fetcher.go | 46 +++++++ .../pqlmanifest/storage_fetcher_test.go | 119 ++++++++++++++++++ 4 files changed, 188 insertions(+), 12 deletions(-) create mode 100644 router/internal/persistedoperation/pqlmanifest/storage_fetcher.go create mode 100644 router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go diff --git a/router/core/router.go b/router/core/router.go index f25778ec7a..41e4503fda 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1275,25 +1275,30 @@ func (r *Router) buildClients(ctx context.Context) error { } if storageProviderID != "" { - // An explicit storage provider is configured — read the manifest once at startup. + // An explicit storage provider is configured — fetch the manifest at startup and poll for updates. objectPrefix := r.persistedOperationsConfig.Storage.ObjectPrefix objectPath := manifestFileName if objectPrefix != "" { objectPath = path.Join(objectPrefix, manifestFileName) } - manifest, err := pClient.ReadManifest(ctx, objectPath) - if err != nil { - return fmt.Errorf("failed to fetch PQL manifest from storage provider %q: %w", - storageProviderID, err) - } + storageFetcher := pqlmanifest.NewStorageFetcher(pClient.ReadManifest, objectPath, r.logger) pqlStore = pqlmanifest.NewStore(r.logger) - pqlStore.Load(manifest) - r.logger.Info("Loaded PQL manifest from storage provider", - zap.String("provider_id", storageProviderID), - zap.Int("operations", pqlStore.OperationCount()), + poller := pqlmanifest.NewPoller( + storageFetcher, + pqlStore, + r.persistedOperationsConfig.Manifest.PollInterval, + r.persistedOperationsConfig.Manifest.PollJitter, + r.logger, ) + + if err := poller.FetchInitial(ctx); err != nil { + return fmt.Errorf("failed to fetch initial PQL manifest from storage provider %q: %w", + storageProviderID, err) + } + + r.pqlPoller = poller } else { // No storage provider configured — fetch manifest from Cosmo CDN and poll for updates. if r.graphApiToken == "" { diff --git a/router/internal/persistedoperation/pqlmanifest/poller.go b/router/internal/persistedoperation/pqlmanifest/poller.go index 6b0cd5a939..a7e6d3ef73 100644 --- a/router/internal/persistedoperation/pqlmanifest/poller.go +++ b/router/internal/persistedoperation/pqlmanifest/poller.go @@ -8,15 +8,21 @@ import ( "go.uber.org/zap" ) +// ManifestFetcher abstracts how the manifest is retrieved so the Poller +// can work with any storage backend (CDN, S3, etc.). +type ManifestFetcher interface { + Fetch(ctx context.Context, currentRevision string) (*Manifest, bool, error) +} + type Poller struct { - fetcher *Fetcher + fetcher ManifestFetcher pollInterval time.Duration pollJitter time.Duration logger *zap.Logger store *Store } -func NewPoller(fetcher *Fetcher, store *Store, pollInterval, pollJitter time.Duration, logger *zap.Logger) *Poller { +func NewPoller(fetcher ManifestFetcher, store *Store, pollInterval, pollJitter time.Duration, logger *zap.Logger) *Poller { if pollJitter <= 0 { pollJitter = 5 * time.Second } diff --git a/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go b/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go new file mode 100644 index 0000000000..84afccb2a9 --- /dev/null +++ b/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go @@ -0,0 +1,46 @@ +package pqlmanifest + +import ( + "context" + + "go.uber.org/zap" +) + +// StorageFetcher adapts a ReadManifest-style function (e.g. from an S3 or CDN +// storage client) to the ManifestFetcher interface used by the Poller. +// Unlike the CDN Fetcher which uses HTTP ETags for conditional requests, +// StorageFetcher always downloads the manifest and compares the revision field +// to determine whether it changed. +type StorageFetcher struct { + readManifest func(ctx context.Context, objectPath string) (*Manifest, error) + objectPath string + logger *zap.Logger +} + +func NewStorageFetcher( + readManifest func(ctx context.Context, objectPath string) (*Manifest, error), + objectPath string, + logger *zap.Logger, +) *StorageFetcher { + if logger == nil { + logger = zap.NewNop() + } + return &StorageFetcher{ + readManifest: readManifest, + objectPath: objectPath, + logger: logger.With(zap.String("component", "pql_storage_fetcher")), + } +} + +func (f *StorageFetcher) Fetch(ctx context.Context, currentRevision string) (*Manifest, bool, error) { + manifest, err := f.readManifest(ctx, f.objectPath) + if err != nil { + return nil, false, err + } + + if manifest.Revision == currentRevision { + return nil, false, nil + } + + return manifest, true, nil +} diff --git a/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go b/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go new file mode 100644 index 0000000000..0d08cd5260 --- /dev/null +++ b/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go @@ -0,0 +1,119 @@ +package pqlmanifest + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestStorageFetcher(t *testing.T) { + t.Parallel() + + t.Run("first fetch with empty revision returns manifest", func(t *testing.T) { + t.Parallel() + + manifest := &Manifest{ + Version: 1, + Revision: "rev-1", + Operations: map[string]string{"h1": "query { a }"}, + } + fetcher := NewStorageFetcher( + func(_ context.Context, _ string) (*Manifest, error) { + return manifest, nil + }, + "ops/manifest.json", + zap.NewNop(), + ) + + result, changed, err := fetcher.Fetch(context.Background(), "") + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, manifest, result) + }) + + t.Run("same revision returns no change", func(t *testing.T) { + t.Parallel() + + fetcher := NewStorageFetcher( + func(_ context.Context, _ string) (*Manifest, error) { + return &Manifest{ + Version: 1, + Revision: "rev-1", + Operations: map[string]string{"h1": "query { a }"}, + }, nil + }, + "ops/manifest.json", + zap.NewNop(), + ) + + result, changed, err := fetcher.Fetch(context.Background(), "rev-1") + require.NoError(t, err) + require.False(t, changed) + require.Nil(t, result) + }) + + t.Run("different revision returns updated manifest", func(t *testing.T) { + t.Parallel() + + manifest := &Manifest{ + Version: 1, + Revision: "rev-2", + Operations: map[string]string{"h1": "query { a }", "h2": "query { b }"}, + } + fetcher := NewStorageFetcher( + func(_ context.Context, _ string) (*Manifest, error) { + return manifest, nil + }, + "ops/manifest.json", + zap.NewNop(), + ) + + result, changed, err := fetcher.Fetch(context.Background(), "rev-1") + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, manifest, result) + }) + + t.Run("read error is propagated", func(t *testing.T) { + t.Parallel() + + fetcher := NewStorageFetcher( + func(_ context.Context, _ string) (*Manifest, error) { + return nil, fmt.Errorf("s3 connection refused") + }, + "ops/manifest.json", + zap.NewNop(), + ) + + result, changed, err := fetcher.Fetch(context.Background(), "rev-1") + require.Error(t, err) + require.Contains(t, err.Error(), "s3 connection refused") + require.False(t, changed) + require.Nil(t, result) + }) + + t.Run("passes correct object path", func(t *testing.T) { + t.Parallel() + + var receivedPath string + fetcher := NewStorageFetcher( + func(_ context.Context, objectPath string) (*Manifest, error) { + receivedPath = objectPath + return &Manifest{ + Version: 1, + Revision: "rev-1", + Operations: map[string]string{}, + }, nil + }, + "my-prefix/operations/manifest.json", + zap.NewNop(), + ) + + _, _, err := fetcher.Fetch(context.Background(), "") + require.NoError(t, err) + require.Equal(t, "my-prefix/operations/manifest.json", receivedPath) + }) +} From 590de81fd326e843ff886d29be803a71485b60ec Mon Sep 17 00:00:00 2001 From: StarpTech Date: Thu, 2 Apr 2026 01:57:10 +0200 Subject: [PATCH 2/3] feat(pqlmanifest): enhance S3 manifest fetching with conditional requests and registry support --- router-tests/operations/pql_manifest_test.go | 2 +- router/core/init_config_poller.go | 13 +- router/core/provider_registry.go | 86 ++++ router/core/provider_registry_test.go | 107 +++++ router/core/router.go | 416 +++++++++--------- router/core/router_config.go | 1 + .../PO_MANIFEST_S3_GUIDE.md | 82 ++++ router/internal/persistedoperation/client.go | 1 - .../operationstorage/cdn/client.go | 6 +- .../operationstorage/fs/client.go | 5 - .../operationstorage/s3/client.go | 58 ++- .../operationstorage/s3/client_test.go | 90 ++++ .../persistedoperation/pqlmanifest/poller.go | 8 +- .../pqlmanifest/storage_fetcher.go | 32 +- .../pqlmanifest/storage_fetcher_test.go | 189 +++++++- 15 files changed, 860 insertions(+), 236 deletions(-) create mode 100644 router/core/provider_registry.go create mode 100644 router/core/provider_registry_test.go create mode 100644 router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md create mode 100644 router/internal/persistedoperation/operationstorage/s3/client_test.go diff --git a/router-tests/operations/pql_manifest_test.go b/router-tests/operations/pql_manifest_test.go index 2b54f39626..fc0c22c85f 100644 --- a/router-tests/operations/pql_manifest_test.go +++ b/router-tests/operations/pql_manifest_test.go @@ -87,7 +87,7 @@ func TestPQLManifest(t *testing.T) { require.Equal(t, expectedEmployeesBody, res.Body) // Verify startup log - logEntries := xEnv.Observer().FilterMessageSnippet("Loaded initial PQL manifest").All() + logEntries := xEnv.Observer().FilterMessageSnippet("Loaded PQL manifest").All() require.Len(t, logEntries, 1) }) }) diff --git a/router/core/init_config_poller.go b/router/core/init_config_poller.go index 37b6cc0672..b8358ccae0 100644 --- a/router/core/init_config_poller.go +++ b/router/core/init_config_poller.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" - "github.com/wundergraph/cosmo/router/pkg/config" "github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller" "github.com/wundergraph/cosmo/router/pkg/execution_config" "github.com/wundergraph/cosmo/router/pkg/routerconfig" @@ -13,9 +12,9 @@ import ( "go.uber.org/zap" ) -func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvider, s3Providers map[string]config.S3StorageProvider, providerID string, isFallbackClient bool) (client *routerconfig.Client, err error) { +func getConfigClient(r *Router, registry *ProviderRegistry, providerID string, isFallbackClient bool) (client *routerconfig.Client, err error) { // CDN Providers - if provider, ok := cdnProviders[providerID]; ok { + if provider, ok := registry.CDN(providerID); ok { if r.graphApiToken == "" { return nil, errors.New( "graph token is required to fetch execution config from CDN. " + @@ -50,7 +49,7 @@ func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvide } // S3 Providers - if provider, ok := s3Providers[providerID]; ok { + if provider, ok := registry.S3(providerID); ok { clientOptions := &configs3Provider.ClientOptions{ AccessKeyID: provider.AccessKey, SecretAccessKey: provider.SecretKey, @@ -121,12 +120,12 @@ func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvide } // InitializeConfigPoller creates a poller to fetch execution config. It is only initialized when a config poller is configured and the router is not started with a static config -func InitializeConfigPoller(r *Router, cdnProviders map[string]config.CDNStorageProvider, s3Providers map[string]config.S3StorageProvider) (*configpoller.ConfigPoller, error) { +func InitializeConfigPoller(r *Router, registry *ProviderRegistry) (*configpoller.ConfigPoller, error) { if r.staticExecutionConfig != nil || r.routerConfigPollerConfig == nil || r.configPoller != nil { return nil, nil } - primaryClient, err := getConfigClient(r, cdnProviders, s3Providers, r.routerConfigPollerConfig.Storage.ProviderID, false) + primaryClient, err := getConfigClient(r, registry, r.routerConfigPollerConfig.Storage.ProviderID, false) if err != nil { return nil, err } @@ -141,7 +140,7 @@ func InitializeConfigPoller(r *Router, cdnProviders map[string]config.CDNStorage return nil, errors.New("cannot use the same storage as both primary and fallback provider for execution config") } - fallbackClient, err = getConfigClient(r, cdnProviders, s3Providers, r.routerConfigPollerConfig.FallbackStorage.ProviderID, true) + fallbackClient, err = getConfigClient(r, registry, r.routerConfigPollerConfig.FallbackStorage.ProviderID, true) if err != nil { return nil, err } diff --git a/router/core/provider_registry.go b/router/core/provider_registry.go new file mode 100644 index 0000000000..39ff6576e7 --- /dev/null +++ b/router/core/provider_registry.go @@ -0,0 +1,86 @@ +package core + +import ( + "fmt" + + "github.com/wundergraph/cosmo/router/pkg/config" +) + +// ProviderRegistry indexes storage provider configurations by ID, providing +// typed lookups with clear error messages. It is built once during router +// initialization and shared across all subsystems that need to resolve a +// provider by its configured ID. +type ProviderRegistry struct { + s3 map[string]config.S3StorageProvider + cdn map[string]config.CDNStorageProvider + redis map[string]config.RedisStorageProvider + fileSystem map[string]config.FileSystemStorageProvider +} + +// NewProviderRegistry builds lookup maps for every provider type and returns +// an error if any type contains duplicate IDs. +func NewProviderRegistry(providers config.StorageProviders) (*ProviderRegistry, error) { + r := &ProviderRegistry{ + s3: make(map[string]config.S3StorageProvider, len(providers.S3)), + cdn: make(map[string]config.CDNStorageProvider, len(providers.CDN)), + redis: make(map[string]config.RedisStorageProvider, len(providers.Redis)), + fileSystem: make(map[string]config.FileSystemStorageProvider, len(providers.FileSystem)), + } + + for _, p := range providers.S3 { + if _, ok := r.s3[p.ID]; ok { + return nil, fmt.Errorf("duplicate s3 storage provider with id '%s'", p.ID) + } + r.s3[p.ID] = p + } + for _, p := range providers.CDN { + if _, ok := r.cdn[p.ID]; ok { + return nil, fmt.Errorf("duplicate cdn storage provider with id '%s'", p.ID) + } + r.cdn[p.ID] = p + } + for _, p := range providers.Redis { + if _, ok := r.redis[p.ID]; ok { + return nil, fmt.Errorf("duplicate Redis storage provider with id '%s'", p.ID) + } + r.redis[p.ID] = p + } + for _, p := range providers.FileSystem { + if _, ok := r.fileSystem[p.ID]; ok { + return nil, fmt.Errorf("duplicate file system storage provider with id '%s'", p.ID) + } + r.fileSystem[p.ID] = p + } + + return r, nil +} + +// S3 looks up an S3 provider by ID. +func (r *ProviderRegistry) S3(id string) (config.S3StorageProvider, bool) { + p, ok := r.s3[id] + return p, ok +} + +// CDN looks up a CDN provider by ID. +func (r *ProviderRegistry) CDN(id string) (config.CDNStorageProvider, bool) { + p, ok := r.cdn[id] + return p, ok +} + +// Redis looks up a Redis provider by ID. +func (r *ProviderRegistry) Redis(id string) (config.RedisStorageProvider, bool) { + p, ok := r.redis[id] + return p, ok +} + +// FileSystem looks up a filesystem provider by ID. +func (r *ProviderRegistry) FileSystem(id string) (config.FileSystemStorageProvider, bool) { + p, ok := r.fileSystem[id] + return p, ok +} + +// IsFileSystem returns true if the given ID matches a filesystem provider. +func (r *ProviderRegistry) IsFileSystem(id string) bool { + _, ok := r.fileSystem[id] + return ok +} diff --git a/router/core/provider_registry_test.go b/router/core/provider_registry_test.go new file mode 100644 index 0000000000..31d8b81d63 --- /dev/null +++ b/router/core/provider_registry_test.go @@ -0,0 +1,107 @@ +package core + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router/pkg/config" +) + +func TestProviderRegistry(t *testing.T) { + t.Parallel() + + t.Run("successful lookups", func(t *testing.T) { + t.Parallel() + + reg, err := NewProviderRegistry(config.StorageProviders{ + S3: []config.S3StorageProvider{{ID: "my-s3", Bucket: "b"}}, + CDN: []config.CDNStorageProvider{{ID: "my-cdn", URL: "https://cdn"}}, + Redis: []config.RedisStorageProvider{{ID: "my-redis"}}, + FileSystem: []config.FileSystemStorageProvider{{ID: "my-fs", Path: "/tmp"}}, + }) + require.NoError(t, err) + + s3, ok := reg.S3("my-s3") + require.True(t, ok) + require.Equal(t, "b", s3.Bucket) + + cdn, ok := reg.CDN("my-cdn") + require.True(t, ok) + require.Equal(t, "https://cdn", cdn.URL) + + redis, ok := reg.Redis("my-redis") + require.True(t, ok) + require.Equal(t, "my-redis", redis.ID) + + fs, ok := reg.FileSystem("my-fs") + require.True(t, ok) + require.Equal(t, "/tmp", fs.Path) + }) + + t.Run("unknown ID returns false", func(t *testing.T) { + t.Parallel() + + reg, err := NewProviderRegistry(config.StorageProviders{}) + require.NoError(t, err) + + _, ok := reg.S3("nope") + require.False(t, ok) + + _, ok = reg.CDN("nope") + require.False(t, ok) + + _, ok = reg.Redis("nope") + require.False(t, ok) + + _, ok = reg.FileSystem("nope") + require.False(t, ok) + }) + + t.Run("duplicate S3 ID", func(t *testing.T) { + t.Parallel() + + _, err := NewProviderRegistry(config.StorageProviders{ + S3: []config.S3StorageProvider{{ID: "dup"}, {ID: "dup"}}, + }) + require.ErrorContains(t, err, "duplicate s3 storage provider with id 'dup'") + }) + + t.Run("duplicate CDN ID", func(t *testing.T) { + t.Parallel() + + _, err := NewProviderRegistry(config.StorageProviders{ + CDN: []config.CDNStorageProvider{{ID: "dup"}, {ID: "dup"}}, + }) + require.ErrorContains(t, err, "duplicate cdn storage provider with id 'dup'") + }) + + t.Run("duplicate Redis ID", func(t *testing.T) { + t.Parallel() + + _, err := NewProviderRegistry(config.StorageProviders{ + Redis: []config.RedisStorageProvider{{ID: "dup"}, {ID: "dup"}}, + }) + require.ErrorContains(t, err, "duplicate Redis storage provider with id 'dup'") + }) + + t.Run("duplicate FileSystem ID", func(t *testing.T) { + t.Parallel() + + _, err := NewProviderRegistry(config.StorageProviders{ + FileSystem: []config.FileSystemStorageProvider{{ID: "dup"}, {ID: "dup"}}, + }) + require.ErrorContains(t, err, "duplicate file system storage provider with id 'dup'") + }) + + t.Run("IsFileSystem", func(t *testing.T) { + t.Parallel() + + reg, err := NewProviderRegistry(config.StorageProviders{ + FileSystem: []config.FileSystemStorageProvider{{ID: "fs1"}}, + }) + require.NoError(t, err) + + require.True(t, reg.IsFileSystem("fs1")) + require.False(t, reg.IsFileSystem("nope")) + }) +} diff --git a/router/core/router.go b/router/core/router.go index 41e4503fda..4faa10fc88 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -814,6 +814,14 @@ func (r *Router) bootstrap(ctx context.Context) error { return fmt.Errorf("router is already bootstrapped") } + // Build the provider registry early so that MCP, ConnectRPC, and buildClients + // can all use it for storage provider lookups. + registry, err := NewProviderRegistry(r.storageProviders) + if err != nil { + return err + } + r.providerRegistry = registry + cosmoCloudTracingEnabled := r.traceConfig.Enabled && rtrace.DefaultExporter(r.traceConfig) != nil artInProductionEnabled := r.engineExecutionConfiguration.EnableRequestTracing && !r.developmentMode needsRegistration := cosmoCloudTracingEnabled || artInProductionEnabled @@ -938,23 +946,14 @@ func (r *Router) bootstrap(ctx context.Context) error { r.logger.Debug("Resolving storage provider for MCP operations", zap.String("provider_id", r.mcp.Storage.ProviderID)) - // Find the provider in storage_providers - // Check for file_system providers - for _, provider := range r.storageProviders.FileSystem { - if provider.ID == r.mcp.Storage.ProviderID { - r.logger.Debug("Found file_system storage provider for MCP", - zap.String("id", provider.ID), - zap.String("path", provider.Path)) - - // Use the resolved file system path - operationsDir = provider.Path - break - } - } - - if operationsDir == "" { + provider, ok := r.providerRegistry.FileSystem(r.mcp.Storage.ProviderID) + if !ok { return fmt.Errorf("storage provider with id '%s' for mcp server not found", r.mcp.Storage.ProviderID) } + r.logger.Debug("Found file_system storage provider for MCP", + zap.String("id", provider.ID), + zap.String("path", provider.Path)) + operationsDir = provider.Path } logFields := []zap.Field{ @@ -1011,19 +1010,14 @@ func (r *Router) bootstrap(ctx context.Context) error { zap.String("graphql_endpoint", r.connectRPC.GraphQLEndpoint)) // Resolve the services provider to get the services directory - var servicesDir string - for _, provider := range r.storageProviders.FileSystem { - if provider.ID == r.connectRPC.Storage.ProviderID { - servicesDir = provider.Path - r.logger.Debug("Resolved services provider", - zap.String("provider_id", provider.ID), - zap.String("path", provider.Path)) - break - } - } - if servicesDir == "" { + servicesProvider, ok := r.providerRegistry.FileSystem(r.connectRPC.Storage.ProviderID) + if !ok { return fmt.Errorf("services storage provider with id '%s' for connect_rpc not found", r.connectRPC.Storage.ProviderID) } + servicesDir := servicesProvider.Path + r.logger.Debug("Resolved services provider", + zap.String("provider_id", servicesProvider.ID), + zap.String("path", servicesDir)) // Discover services using convention-based approach discoveredServices, err := connectrpc.DiscoverServices(connectrpc.ServiceDiscoveryConfig{ @@ -1129,120 +1123,148 @@ func (r *Router) bootstrap(ctx context.Context) error { // buildClients initializes the storage clients for persisted operations and router config. func (r *Router) buildClients(ctx context.Context) error { - s3Providers := map[string]config.S3StorageProvider{} - cdnProviders := map[string]config.CDNStorageProvider{} - redisProviders := map[string]config.RedisStorageProvider{} - fileSystemProviders := map[string]config.FileSystemStorageProvider{} + registry := r.providerRegistry - for _, provider := range r.storageProviders.S3 { - if _, ok := s3Providers[provider.ID]; ok { - return fmt.Errorf("duplicate s3 storage provider with id '%s'", provider.ID) - } - s3Providers[provider.ID] = provider + pClient, manifestReader, err := r.buildPersistedOpsClient(registry) + if err != nil { + return err } - for _, provider := range r.storageProviders.CDN { - if _, ok := cdnProviders[provider.ID]; ok { - return fmt.Errorf("duplicate cdn storage provider with id '%s'", provider.ID) - } - cdnProviders[provider.ID] = provider + apqClient, err := r.buildAPQClient(registry) + if err != nil { + return err } - for _, provider := range r.storageProviders.Redis { - if _, ok := redisProviders[provider.ID]; ok { - return fmt.Errorf("duplicate Redis storage provider with id '%s'", provider.ID) - } - redisProviders[provider.ID] = provider + pqlStore, err := r.buildManifestStore(ctx, registry, manifestReader) + if err != nil { + return err + } + + if pqlStore != nil { + // Manifest is authoritative — individual operation fetches are not needed. + pClient = nil } - for _, provider := range r.storageProviders.FileSystem { - if _, ok := fileSystemProviders[provider.ID]; ok { - return fmt.Errorf("duplicate file system storage provider with id '%s'", provider.ID) + if pClient != nil || apqClient != nil || pqlStore != nil { + // For backwards compatibility with cdn config field + cacheSize := r.persistedOperationsConfig.Cache.Size.Uint64() + if cacheSize <= 0 { + cacheSize = r.cdnConfig.CacheSize.Uint64() + } + + c, err := persistedoperation.NewClient(&persistedoperation.Options{ + CacheSize: cacheSize, + Logger: r.logger, + ProviderClient: pClient, + ApqClient: apqClient, + PQLStore: pqlStore, + }) + if err != nil { + return err } - fileSystemProviders[provider.ID] = provider + + r.persistedOperationClient = c } - // Create the storage client for persisted operations based on the configured provider. - // The same client is reused for manifest fetching when the manifest feature is enabled, - // since both features are exclusive (manifest replaces individual operation fetches). - var pClient persistedoperation.StorageClient + return r.buildConfigPoller(registry) +} - if !r.persistedOperationsConfig.Disabled { - if provider, ok := cdnProviders[r.persistedOperationsConfig.Storage.ProviderID]; ok { - if r.graphApiToken == "" { - return errors.New("graph token is required to fetch persisted operations from CDN") - } +// buildPersistedOpsClient creates the storage client for persisted operations. +// It also returns a manifestReader function when the underlying storage supports +// manifest fetching (S3 or CDN), which is passed to buildManifestStore. +func (r *Router) buildPersistedOpsClient(registry *ProviderRegistry) (persistedoperation.StorageClient, pqlmanifest.ManifestReaderFunc, error) { + if r.persistedOperationsConfig.Disabled { + return nil, nil, nil + } - c, err := cdn.NewClient(provider.URL, r.graphApiToken, cdn.Options{ - Logger: r.logger, - }) - if err != nil { - return fmt.Errorf("failed to create CDN client: %w", err) - } - pClient = c + providerID := r.persistedOperationsConfig.Storage.ProviderID - r.logger.Info("Use CDN as storage provider for persisted operations", - zap.String("provider_id", provider.ID), - ) - } else if provider, ok := s3Providers[r.persistedOperationsConfig.Storage.ProviderID]; ok { - - c, err := s3.NewClient(provider.Endpoint, &s3.Options{ - AccessKeyID: provider.AccessKey, - SecretAccessKey: provider.SecretKey, - Region: provider.Region, - UseSSL: provider.Secure, - BucketName: provider.Bucket, - ObjectPathPrefix: r.persistedOperationsConfig.Storage.ObjectPrefix, - TraceProvider: r.tracerProvider, - }) - if err != nil { - return fmt.Errorf("failed to create S3 client: %w", err) - } - pClient = c + if provider, ok := registry.CDN(providerID); ok { + if r.graphApiToken == "" { + return nil, nil, errors.New("graph token is required to fetch persisted operations from CDN") + } - r.logger.Info("Use S3 as storage provider for persisted operations", - zap.String("provider_id", provider.ID), - ) - } else if provider, ok := fileSystemProviders[r.persistedOperationsConfig.Storage.ProviderID]; ok { - c, err := fs.NewClient(provider.Path, &fs.Options{ - ObjectPathPrefix: r.persistedOperationsConfig.Storage.ObjectPrefix, - }) - if err != nil { - return fmt.Errorf("failed to create filesystem client: %w", err) - } - pClient = c + c, err := cdn.NewClient(provider.URL, r.graphApiToken, cdn.Options{ + Logger: r.logger, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to create CDN client: %w", err) + } - r.logger.Info("Use file system as storage provider for persisted operations", - zap.String("provider_id", provider.ID), - ) - } else if r.graphApiToken != "" { - if r.persistedOperationsConfig.Storage.ProviderID != "" { - return fmt.Errorf("unknown storage provider id '%s' for persisted operations", r.persistedOperationsConfig.Storage.ProviderID) - } + r.logger.Info("Use CDN as storage provider for persisted operations", + zap.String("provider_id", provider.ID), + ) + return c, c.ReadManifest, nil + } + + if provider, ok := registry.S3(providerID); ok { + c, err := s3.NewClient(provider.Endpoint, &s3.Options{ + AccessKeyID: provider.AccessKey, + SecretAccessKey: provider.SecretKey, + Region: provider.Region, + UseSSL: provider.Secure, + BucketName: provider.Bucket, + ObjectPathPrefix: r.persistedOperationsConfig.Storage.ObjectPrefix, + TraceProvider: r.tracerProvider, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to create S3 client: %w", err) + } - c, err := cdn.NewClient(r.cdnConfig.URL, r.graphApiToken, cdn.Options{ - Logger: r.logger, - }) - if err != nil { - return fmt.Errorf("failed to create CDN client: %w", err) - } - pClient = c + r.logger.Info("Use S3 as storage provider for persisted operations", + zap.String("provider_id", provider.ID), + ) + return c, c.ReadManifest, nil + } - r.logger.Debug("Default to Cosmo CDN as persisted operations provider", - zap.String("url", r.cdnConfig.URL), - ) + if provider, ok := registry.FileSystem(providerID); ok { + c, err := fs.NewClient(provider.Path, &fs.Options{ + ObjectPathPrefix: r.persistedOperationsConfig.Storage.ObjectPrefix, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to create filesystem client: %w", err) } + + r.logger.Info("Use file system as storage provider for persisted operations", + zap.String("provider_id", provider.ID), + ) + // Filesystem does not support manifest fetching. + return c, nil, nil } + if r.graphApiToken != "" { + if providerID != "" { + return nil, nil, fmt.Errorf("unknown storage provider id '%s' for persisted operations", providerID) + } + + c, err := cdn.NewClient(r.cdnConfig.URL, r.graphApiToken, cdn.Options{ + Logger: r.logger, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to create CDN client: %w", err) + } + + r.logger.Debug("Default to Cosmo CDN as persisted operations provider", + zap.String("url", r.cdnConfig.URL), + ) + return c, c.ReadManifest, nil + } + + return nil, nil, nil +} + +// buildAPQClient creates the automatic persisted queries client and its +// optional Redis backing store. +func (r *Router) buildAPQClient(registry *ProviderRegistry) (apq.Client, error) { var kvClient apq.KVClient - if provider, ok := redisProviders[r.automaticPersistedQueriesConfig.Storage.ProviderID]; ok { + if provider, ok := registry.Redis(r.automaticPersistedQueriesConfig.Storage.ProviderID); ok { c, err := apq.NewRedisClient(&apq.RedisOptions{ Logger: r.logger, StorageConfig: &provider, Prefix: r.automaticPersistedQueriesConfig.Storage.ObjectPrefix, }) if err != nil { - return err + return nil, err } kvClient = c r.logger.Info("Use redis as storage provider for automatic persisted operations", @@ -1250,117 +1272,115 @@ func (r *Router) buildClients(ctx context.Context) error { ) } - var apqClient apq.Client - if r.automaticPersistedQueriesConfig.Enabled { - var err error - apqClient, err = apq.NewClient(&apq.Options{ - Logger: r.logger, - ApqConfig: &r.automaticPersistedQueriesConfig, - KVClient: kvClient, - }) - if err != nil { - return err - } + if !r.automaticPersistedQueriesConfig.Enabled { + return nil, nil } - var pqlStore *pqlmanifest.Store - - if r.persistedOperationsConfig.Manifest.Enabled && !r.persistedOperationsConfig.Disabled { - const manifestFileName = "manifest.json" - - storageProviderID := r.persistedOperationsConfig.Storage.ProviderID - - if _, ok := fileSystemProviders[storageProviderID]; ok { - return fmt.Errorf("filesystem storage provider %q is not supported for PQL manifest; use S3 or CDN instead", storageProviderID) - } - - if storageProviderID != "" { - // An explicit storage provider is configured — fetch the manifest at startup and poll for updates. - objectPrefix := r.persistedOperationsConfig.Storage.ObjectPrefix - objectPath := manifestFileName - if objectPrefix != "" { - objectPath = path.Join(objectPrefix, manifestFileName) - } + apqClient, err := apq.NewClient(&apq.Options{ + Logger: r.logger, + ApqConfig: &r.automaticPersistedQueriesConfig, + KVClient: kvClient, + }) + if err != nil { + return nil, err + } + return apqClient, nil +} - storageFetcher := pqlmanifest.NewStorageFetcher(pClient.ReadManifest, objectPath, r.logger) +// buildManifestStore sets up the PQL manifest store and its background poller. +// manifestReader is obtained from buildPersistedOpsClient and may be nil when the +// configured storage provider does not support manifest fetching (e.g. filesystem). +func (r *Router) buildManifestStore(ctx context.Context, registry *ProviderRegistry, manifestReader pqlmanifest.ManifestReaderFunc) (*pqlmanifest.Store, error) { + if !r.persistedOperationsConfig.Manifest.Enabled || r.persistedOperationsConfig.Disabled { + return nil, nil + } - pqlStore = pqlmanifest.NewStore(r.logger) - poller := pqlmanifest.NewPoller( - storageFetcher, - pqlStore, - r.persistedOperationsConfig.Manifest.PollInterval, - r.persistedOperationsConfig.Manifest.PollJitter, - r.logger, - ) + const manifestFileName = "manifest.json" - if err := poller.FetchInitial(ctx); err != nil { - return fmt.Errorf("failed to fetch initial PQL manifest from storage provider %q: %w", - storageProviderID, err) - } + storageProviderID := r.persistedOperationsConfig.Storage.ProviderID - r.pqlPoller = poller - } else { - // No storage provider configured — fetch manifest from Cosmo CDN and poll for updates. - if r.graphApiToken == "" { - return errors.New("graph token is required for PQL manifest") - } + if registry.IsFileSystem(storageProviderID) { + return nil, fmt.Errorf("filesystem storage provider %q is not supported for PQL manifest; use S3 or CDN instead", storageProviderID) + } - fetcher, err := pqlmanifest.NewFetcher(r.cdnConfig.URL, r.graphApiToken, r.logger) - if err != nil { - return fmt.Errorf("failed to create PQL manifest fetcher: %w", err) - } + if storageProviderID != "" { + // An explicit storage provider is configured — fetch the manifest at startup and poll for updates. + objectPrefix := r.persistedOperationsConfig.Storage.ObjectPrefix + objectPath := manifestFileName + if objectPrefix != "" { + objectPath = path.Join(objectPrefix, manifestFileName) + } - pqlStore = pqlmanifest.NewStore(r.logger) - poller := pqlmanifest.NewPoller( - fetcher, - pqlStore, - r.persistedOperationsConfig.Manifest.PollInterval, - r.persistedOperationsConfig.Manifest.PollJitter, - r.logger, - ) + storageFetcher := pqlmanifest.NewStorageFetcher(manifestReader, objectPath, r.logger) - if err := poller.FetchInitial(ctx); err != nil { - return fmt.Errorf("failed to fetch initial PQL manifest: %w", err) - } + pqlStore := pqlmanifest.NewStore(r.logger) + poller := pqlmanifest.NewPoller( + storageFetcher, + pqlStore, + r.persistedOperationsConfig.Manifest.PollInterval, + r.persistedOperationsConfig.Manifest.PollJitter, + r.logger, + ) - r.pqlPoller = poller + if err := poller.FetchInitial(ctx); err != nil { + return nil, fmt.Errorf("failed to fetch initial PQL manifest from storage provider %q: %w", + storageProviderID, err) } + r.logger.Info("Loaded PQL manifest from storage provider", + zap.String("provider_id", storageProviderID), + zap.String("object_path", objectPath), + zap.String("revision", pqlStore.Revision()), + zap.Int("operation_count", pqlStore.OperationCount()), + ) + + r.pqlPoller = poller r.pqlStore = pqlStore + return pqlStore, nil + } - // Manifest is authoritative — individual operation fetches are not needed. - pClient = nil + // No storage provider configured — fetch manifest from Cosmo CDN and poll for updates. + if r.graphApiToken == "" { + return nil, errors.New("graph token is required for PQL manifest") } - if pClient != nil || apqClient != nil || pqlStore != nil { - // For backwards compatibility with cdn config field - cacheSize := r.persistedOperationsConfig.Cache.Size.Uint64() - if cacheSize <= 0 { - cacheSize = r.cdnConfig.CacheSize.Uint64() - } + fetcher, err := pqlmanifest.NewFetcher(r.cdnConfig.URL, r.graphApiToken, r.logger) + if err != nil { + return nil, fmt.Errorf("failed to create PQL manifest fetcher: %w", err) + } - c, err := persistedoperation.NewClient(&persistedoperation.Options{ - CacheSize: cacheSize, - Logger: r.logger, - ProviderClient: pClient, - ApqClient: apqClient, - PQLStore: pqlStore, - }) - if err != nil { - return err - } + pqlStore := pqlmanifest.NewStore(r.logger) + poller := pqlmanifest.NewPoller( + fetcher, + pqlStore, + r.persistedOperationsConfig.Manifest.PollInterval, + r.persistedOperationsConfig.Manifest.PollJitter, + r.logger, + ) - r.persistedOperationClient = c + if err := poller.FetchInitial(ctx); err != nil { + return nil, fmt.Errorf("failed to fetch initial PQL manifest: %w", err) } - configPoller, err := InitializeConfigPoller(r, cdnProviders, s3Providers) + r.logger.Info("Loaded PQL manifest from Cosmo CDN", + zap.String("revision", pqlStore.Revision()), + zap.Int("operation_count", pqlStore.OperationCount()), + ) + + r.pqlPoller = poller + r.pqlStore = pqlStore + return pqlStore, nil +} + +// buildConfigPoller initializes the execution config poller. +func (r *Router) buildConfigPoller(registry *ProviderRegistry) error { + configPoller, err := InitializeConfigPoller(r, registry) if err != nil { return err } if configPoller != nil { r.configPoller = *configPoller } - return nil } diff --git a/router/core/router_config.go b/router/core/router_config.go index b37f7a779c..9f4b0bf84c 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -92,6 +92,7 @@ type Config struct { apolloCompatibilityFlags config.ApolloCompatibilityFlags apolloRouterCompatibilityFlags config.ApolloRouterCompatibilityFlags storageProviders config.StorageProviders + providerRegistry *ProviderRegistry demoMode bool eventsConfig config.EventsConfiguration prometheusServer *http.Server diff --git a/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md b/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md new file mode 100644 index 0000000000..8b54b772f3 --- /dev/null +++ b/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md @@ -0,0 +1,82 @@ +# PQL Manifest with Custom S3 Storage + +This guide explains how to configure the Cosmo Router to load the PQL manifest from a custom S3-compatible storage provider (e.g. MinIO, DigitalOcean Spaces, Cloudflare R2) instead of the Cosmo CDN. + +## Configuration + +```yaml +version: "1" + +persisted_operations: + storage: + # Path prefix inside the S3 bucket where the manifest.json file is stored. + # The router resolves the manifest at: /manifest.json + # Example: with the prefix below, the full object key is: + # operations/manifest.json + object_prefix: "operations" + # Must match the `id` of an entry in `storage_providers.s3`. + provider_id: s3 + + manifest: + enabled: true + # How often to poll S3 for manifest updates (default: 10s) + poll_interval: 10s + # Random jitter added to the poll interval (default: 5s) + poll_jitter: 5s + warmup: + # Pre-plan all manifest operations so the first request is served from cache + enabled: true + # Concurrent workers for warmup (default: 4) + workers: 4 + # Rate limit for warmup processing (default: 50) + items_per_second: 50 + # Max time for warmup to complete (default: 30s) + timeout: 30s + +storage_providers: + s3: + - id: s3 # Referenced by persisted_operations.storage.provider_id + bucket: "" # S3 bucket name + access_key: "" # AWS access key or equivalent + secret_key: "" # AWS secret key or equivalent + endpoint: "" # S3-compatible endpoint *without* protocol (e.g. "s3.amazonaws.com" or "minio.internal:9000") + region: "" # Optional. AWS region (e.g. "us-east-1") + secure: false # Optional. Set to true to use HTTPS for the endpoint +``` + +## Behavior + +- The manifest is loaded at startup and **polled periodically** for updates. The router uses `If-Modified-Since` conditional requests to avoid downloading an unchanged manifest, and compares the `revision` field to detect content changes. +- The manifest is **authoritative** -- when enabled, individual per-request operation fetches from S3 are disabled. If an operation hash is not in the manifest, the request is rejected immediately. +- When warmup is enabled, new or changed operations are planned in the background after each manifest update, so that the first request for each operation is served from the plan cache. +- **Compression is supported**: if the object path ends with `.gz` or `.zst`, the manifest is decompressed transparently (gzip or Zstandard). +- **Filesystem providers are not supported** for the manifest. Only S3 and CDN providers can be used. + +## Manifest Schema + +The manifest file (`manifest.json`) is a JSON document with the following structure: + +```json +{ + "version": 1, + "revision": "rev-2024-01-15-abc123", + "generatedAt": "2024-01-15T10:30:00Z", + "operations": { + "a1b2c3d4e5f6...": "query GetEmployees { employees { id name } }", + "f6e5d4c3b2a1...": "mutation CreateUser($input: CreateUserInput!) { createUser(input: $input) { id } }" + } +} +``` + +### Fields + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `version` | `int` | Yes | Must be `1`. The router rejects manifests with any other version. | +| `revision` | `string` | Yes | An opaque revision identifier. Used to detect changes during polling. In CDN mode it doubles as an ETag. Can be any non-empty string (e.g. a git SHA, timestamp, or UUID). **You must change this value whenever you update the operations**, otherwise the router will not pick up the changes. | +| `generatedAt` | `string` | No | ISO 8601 timestamp of when the manifest was generated. Informational only; not used by the router. | +| `operations` | `map` | Yes | A map of SHA256 hashes to GraphQL operation bodies. Keys are the SHA256 hash of the operation text. Values are the full GraphQL operation string. The field must be present (can be an empty `{}`). | + +### Operation lookup + +When a client sends a persisted query request with `extensions.persistedQuery.sha256Hash`, the router looks up the hash directly in the `operations` map. This is an O(1) in-memory lookup with no network overhead. diff --git a/router/internal/persistedoperation/client.go b/router/internal/persistedoperation/client.go index 6431f92a72..648373123c 100644 --- a/router/internal/persistedoperation/client.go +++ b/router/internal/persistedoperation/client.go @@ -27,7 +27,6 @@ func (e PersistentOperationNotFoundError) Error() string { type StorageClient interface { PersistedOperation(ctx context.Context, clientName string, sha256Hash string) ([]byte, error) - ReadManifest(ctx context.Context, objectPath string) (*pqlmanifest.Manifest, error) Close() } diff --git a/router/internal/persistedoperation/operationstorage/cdn/client.go b/router/internal/persistedoperation/operationstorage/cdn/client.go index 02a7d6906d..34f69df234 100644 --- a/router/internal/persistedoperation/operationstorage/cdn/client.go +++ b/router/internal/persistedoperation/operationstorage/cdn/client.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "net/url" + "time" "github.com/wundergraph/cosmo/router/internal/httpclient" "github.com/wundergraph/cosmo/router/internal/jwt" @@ -184,8 +185,9 @@ func gzipAwareReader(resp *http.Response) (io.Reader, func(), error) { } // ReadManifest fetches the PQL manifest from the CDN, delegating to the manifest Fetcher. -// The objectPath parameter is unused — the Fetcher constructs the path from JWT claims. -func (cdn *Client) ReadManifest(ctx context.Context, _ string) (*pqlmanifest.Manifest, error) { +// The objectPath and modifiedSince parameters are unused — the Fetcher constructs the +// path from JWT claims and uses ETags for conditional requests instead of timestamps. +func (cdn *Client) ReadManifest(ctx context.Context, _ string, _ time.Time) (*pqlmanifest.Manifest, error) { manifest, _, err := cdn.fetcher.Fetch(ctx, "") if err != nil { return nil, err diff --git a/router/internal/persistedoperation/operationstorage/fs/client.go b/router/internal/persistedoperation/operationstorage/fs/client.go index d808c11c33..6e7181d721 100644 --- a/router/internal/persistedoperation/operationstorage/fs/client.go +++ b/router/internal/persistedoperation/operationstorage/fs/client.go @@ -8,7 +8,6 @@ import ( "path/filepath" "github.com/wundergraph/cosmo/router/internal/persistedoperation" - "github.com/wundergraph/cosmo/router/internal/persistedoperation/pqlmanifest" ) type client struct { @@ -70,8 +69,4 @@ func (c client) persistedOperation(clientName string, sha256Hash string) ([]byte return []byte(po.Body), nil } -func (c client) ReadManifest(_ context.Context, _ string) (*pqlmanifest.Manifest, error) { - return nil, fmt.Errorf("filesystem storage provider does not support reading manifests; use S3 or CDN instead") -} - func (c client) Close() {} diff --git a/router/internal/persistedoperation/operationstorage/s3/client.go b/router/internal/persistedoperation/operationstorage/s3/client.go index ebdbccc6ee..88a3f3aaec 100644 --- a/router/internal/persistedoperation/operationstorage/s3/client.go +++ b/router/internal/persistedoperation/operationstorage/s3/client.go @@ -3,10 +3,16 @@ package s3 import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" + "path/filepath" + "strings" + "time" + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/zstd" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/wundergraph/cosmo/router/internal/persistedoperation" @@ -110,21 +116,65 @@ func (c Client) persistedOperation(ctx context.Context, clientName, sha256Hash s } // ReadManifest fetches and parses a PQL manifest from S3 at the given object path. -func (c Client) ReadManifest(ctx context.Context, objectPath string) (*pqlmanifest.Manifest, error) { - reader, err := c.client.GetObject(ctx, c.options.BucketName, objectPath, minio.GetObjectOptions{}) +// If the object path ends with .gz or .zst, the content is decompressed automatically. +// When modifiedSince is non-zero and the object has not been modified, returns (nil, nil). +func (c Client) ReadManifest(ctx context.Context, objectPath string, modifiedSince time.Time) (*pqlmanifest.Manifest, error) { + opts := minio.GetObjectOptions{} + if !modifiedSince.IsZero() { + if err := opts.SetModified(modifiedSince); err != nil { + return nil, fmt.Errorf("failed to set modified-since on manifest request: %w", err) + } + } + + minioReader, err := c.client.GetObject(ctx, c.options.BucketName, objectPath, opts) if err != nil { return nil, fmt.Errorf("failed to get manifest from S3: %w", err) } defer func() { - _ = reader.Close() + _ = minioReader.Close() }() - data, err := io.ReadAll(reader) + data, err := decompressAndRead(minioReader, objectPath) if err != nil { + // minio surfaces 304 Not Modified as an error on the first read, + // which may occur inside gzip/zstd header parsing or io.ReadAll. + var minioErr minio.ErrorResponse + if errors.As(err, &minioErr) && minioErr.StatusCode == http.StatusNotModified { + return nil, nil + } return nil, fmt.Errorf("failed to read manifest from S3: %w", err) } return pqlmanifest.ParseManifest(data) } +// decompressAndRead reads the full content from a reader, decompressing +// based on the file extension (.gz, .zst). Plain content is read as-is. +func decompressAndRead(r io.Reader, objectPath string) ([]byte, error) { + var reader io.Reader + + switch strings.ToLower(filepath.Ext(objectPath)) { + case ".gz": + gr, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + defer func() { + _ = gr.Close() + }() + reader = gr + case ".zst": + zr, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + defer zr.Close() + reader = zr + default: + reader = r + } + + return io.ReadAll(reader) +} + func (c Client) Close() {} diff --git a/router/internal/persistedoperation/operationstorage/s3/client_test.go b/router/internal/persistedoperation/operationstorage/s3/client_test.go new file mode 100644 index 0000000000..7bc2fba1b1 --- /dev/null +++ b/router/internal/persistedoperation/operationstorage/s3/client_test.go @@ -0,0 +1,90 @@ +package s3 + +import ( + "bytes" + "encoding/json" + "testing" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/require" +) + +func TestDecompressAndRead(t *testing.T) { + t.Parallel() + + manifest := map[string]interface{}{ + "version": 1, + "revision": "rev-1", + "operations": map[string]string{ + "abc123": "query { employees { id } }", + }, + } + plainJSON, err := json.Marshal(manifest) + require.NoError(t, err) + + t.Run("plain JSON", func(t *testing.T) { + t.Parallel() + + data, err := decompressAndRead(bytes.NewReader(plainJSON), "manifest.json") + require.NoError(t, err) + require.JSONEq(t, string(plainJSON), string(data)) + }) + + t.Run("gzip compressed", func(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write(plainJSON) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + data, err := decompressAndRead(bytes.NewReader(buf.Bytes()), "manifest.json.gz") + require.NoError(t, err) + require.JSONEq(t, string(plainJSON), string(data)) + }) + + t.Run("zstd compressed", func(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + zw, err := zstd.NewWriter(&buf) + require.NoError(t, err) + _, err = zw.Write(plainJSON) + require.NoError(t, err) + require.NoError(t, zw.Close()) + + data, err := decompressAndRead(bytes.NewReader(buf.Bytes()), "manifest.json.zst") + require.NoError(t, err) + require.JSONEq(t, string(plainJSON), string(data)) + }) + + t.Run("extension is case insensitive", func(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write(plainJSON) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + data, err := decompressAndRead(bytes.NewReader(buf.Bytes()), "manifest.json.GZ") + require.NoError(t, err) + require.JSONEq(t, string(plainJSON), string(data)) + }) + + t.Run("invalid gzip data returns error", func(t *testing.T) { + t.Parallel() + + _, err := decompressAndRead(bytes.NewReader([]byte("not gzip")), "manifest.json.gz") + require.Error(t, err) + }) + + t.Run("invalid zstd data returns error", func(t *testing.T) { + t.Parallel() + + _, err := decompressAndRead(bytes.NewReader([]byte("not zstd")), "manifest.json.zst") + require.Error(t, err) + }) +} diff --git a/router/internal/persistedoperation/pqlmanifest/poller.go b/router/internal/persistedoperation/pqlmanifest/poller.go index a7e6d3ef73..62b153b84e 100644 --- a/router/internal/persistedoperation/pqlmanifest/poller.go +++ b/router/internal/persistedoperation/pqlmanifest/poller.go @@ -50,10 +50,6 @@ func (p *Poller) FetchInitial(ctx context.Context) error { if changed && manifest != nil { p.store.Load(manifest) - p.logger.Info("Loaded initial PQL manifest", - zap.String("revision", manifest.Revision), - zap.Int("operation_count", len(manifest.Operations)), - ) } return nil @@ -87,6 +83,10 @@ func (p *Poller) Poll(ctx context.Context) { zap.String("previous_revision", currentRevision), zap.Int("operation_count", len(manifest.Operations)), ) + } else { + p.logger.Debug("PQL manifest unchanged, skipping update", + zap.String("previous_revision", currentRevision), + ) } } } diff --git a/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go b/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go index 84afccb2a9..7140e35012 100644 --- a/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go +++ b/router/internal/persistedoperation/pqlmanifest/storage_fetcher.go @@ -2,23 +2,30 @@ package pqlmanifest import ( "context" + "time" "go.uber.org/zap" ) -// StorageFetcher adapts a ReadManifest-style function (e.g. from an S3 or CDN +// ManifestReaderFunc reads and parses a PQL manifest from storage at the given path. +// When modifiedSince is non-zero, the implementation may skip the download if the +// object has not been modified (returning nil, nil). A zero modifiedSince means +// "fetch unconditionally". +type ManifestReaderFunc func(ctx context.Context, objectPath string, modifiedSince time.Time) (*Manifest, error) + +// StorageFetcher adapts a ManifestReaderFunc (e.g. from an S3 or CDN // storage client) to the ManifestFetcher interface used by the Poller. -// Unlike the CDN Fetcher which uses HTTP ETags for conditional requests, -// StorageFetcher always downloads the manifest and compares the revision field -// to determine whether it changed. +// It uses If-Modified-Since for conditional requests when the underlying +// storage supports it, and falls back to revision comparison otherwise. type StorageFetcher struct { - readManifest func(ctx context.Context, objectPath string) (*Manifest, error) - objectPath string - logger *zap.Logger + readManifest ManifestReaderFunc + objectPath string + logger *zap.Logger + lastFetchedAt time.Time } func NewStorageFetcher( - readManifest func(ctx context.Context, objectPath string) (*Manifest, error), + readManifest ManifestReaderFunc, objectPath string, logger *zap.Logger, ) *StorageFetcher { @@ -33,11 +40,18 @@ func NewStorageFetcher( } func (f *StorageFetcher) Fetch(ctx context.Context, currentRevision string) (*Manifest, bool, error) { - manifest, err := f.readManifest(ctx, f.objectPath) + manifest, err := f.readManifest(ctx, f.objectPath, f.lastFetchedAt) if err != nil { return nil, false, err } + // nil manifest with no error means "not modified" (e.g. S3 returned 304). + if manifest == nil { + return nil, false, nil + } + + f.lastFetchedAt = time.Now() + if manifest.Revision == currentRevision { return nil, false, nil } diff --git a/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go b/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go index 0d08cd5260..8f193b53c6 100644 --- a/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go +++ b/router/internal/persistedoperation/pqlmanifest/storage_fetcher_test.go @@ -3,7 +3,9 @@ package pqlmanifest import ( "context" "fmt" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -21,7 +23,7 @@ func TestStorageFetcher(t *testing.T) { Operations: map[string]string{"h1": "query { a }"}, } fetcher := NewStorageFetcher( - func(_ context.Context, _ string) (*Manifest, error) { + func(_ context.Context, _ string, _ time.Time) (*Manifest, error) { return manifest, nil }, "ops/manifest.json", @@ -38,7 +40,7 @@ func TestStorageFetcher(t *testing.T) { t.Parallel() fetcher := NewStorageFetcher( - func(_ context.Context, _ string) (*Manifest, error) { + func(_ context.Context, _ string, _ time.Time) (*Manifest, error) { return &Manifest{ Version: 1, Revision: "rev-1", @@ -64,7 +66,7 @@ func TestStorageFetcher(t *testing.T) { Operations: map[string]string{"h1": "query { a }", "h2": "query { b }"}, } fetcher := NewStorageFetcher( - func(_ context.Context, _ string) (*Manifest, error) { + func(_ context.Context, _ string, _ time.Time) (*Manifest, error) { return manifest, nil }, "ops/manifest.json", @@ -81,7 +83,7 @@ func TestStorageFetcher(t *testing.T) { t.Parallel() fetcher := NewStorageFetcher( - func(_ context.Context, _ string) (*Manifest, error) { + func(_ context.Context, _ string, _ time.Time) (*Manifest, error) { return nil, fmt.Errorf("s3 connection refused") }, "ops/manifest.json", @@ -100,7 +102,7 @@ func TestStorageFetcher(t *testing.T) { var receivedPath string fetcher := NewStorageFetcher( - func(_ context.Context, objectPath string) (*Manifest, error) { + func(_ context.Context, objectPath string, _ time.Time) (*Manifest, error) { receivedPath = objectPath return &Manifest{ Version: 1, @@ -116,4 +118,181 @@ func TestStorageFetcher(t *testing.T) { require.NoError(t, err) require.Equal(t, "my-prefix/operations/manifest.json", receivedPath) }) + + t.Run("nil manifest means not modified", func(t *testing.T) { + t.Parallel() + + fetcher := NewStorageFetcher( + func(_ context.Context, _ string, _ time.Time) (*Manifest, error) { + return nil, nil // S3 returned 304 + }, + "ops/manifest.json", + zap.NewNop(), + ) + + result, changed, err := fetcher.Fetch(context.Background(), "rev-1") + require.NoError(t, err) + require.False(t, changed) + require.Nil(t, result) + }) + + t.Run("first fetch passes zero modifiedSince", func(t *testing.T) { + t.Parallel() + + var receivedModifiedSince time.Time + fetcher := NewStorageFetcher( + func(_ context.Context, _ string, modifiedSince time.Time) (*Manifest, error) { + receivedModifiedSince = modifiedSince + return &Manifest{Version: 1, Revision: "rev-1", Operations: map[string]string{}}, nil + }, + "ops/manifest.json", + zap.NewNop(), + ) + + _, _, err := fetcher.Fetch(context.Background(), "") + require.NoError(t, err) + require.True(t, receivedModifiedSince.IsZero()) + }) + + t.Run("subsequent fetch passes non-zero modifiedSince", func(t *testing.T) { + t.Parallel() + + var callCount int + var receivedModifiedSince time.Time + fetcher := NewStorageFetcher( + func(_ context.Context, _ string, modifiedSince time.Time) (*Manifest, error) { + callCount++ + receivedModifiedSince = modifiedSince + return &Manifest{ + Version: 1, + Revision: fmt.Sprintf("rev-%d", callCount), + Operations: map[string]string{}, + }, nil + }, + "ops/manifest.json", + zap.NewNop(), + ) + + // First fetch — modifiedSince should be zero + _, _, err := fetcher.Fetch(context.Background(), "") + require.NoError(t, err) + require.True(t, receivedModifiedSince.IsZero()) + + // Second fetch — modifiedSince should be non-zero + _, _, err = fetcher.Fetch(context.Background(), "rev-1") + require.NoError(t, err) + require.False(t, receivedModifiedSince.IsZero()) + }) +} + +// TestStorageFetcherPollingLifecycle wires a StorageFetcher into a Poller with +// a mock S3 backend to exercise the full polling flow: initial fetch, 304 not-modified +// polls, manifest update detection, modifiedSince advancement, and store callbacks. +func TestStorageFetcherPollingLifecycle(t *testing.T) { + t.Parallel() + + manifestV1 := &Manifest{ + Version: 1, + Revision: "rev-1", + Operations: map[string]string{"h1": "query { employees { id } }"}, + } + manifestV2 := &Manifest{ + Version: 1, + Revision: "rev-2", + Operations: map[string]string{"h1": "query { employees { id } }", "h2": "query { products { id } }"}, + } + + // Mock S3 backend: serves currentManifest, returns nil (304) when modifiedSince is + // non-zero and the manifest hasn't changed. + var currentManifest atomic.Pointer[Manifest] + currentManifest.Store(manifestV1) + + var lastModified atomic.Value // stores time.Time of last manifest change + lastModified.Store(time.Now()) // object already exists on S3 before the test starts + + var fetchCount atomic.Int32 + var lastReceivedModifiedSince atomic.Value + lastReceivedModifiedSince.Store(time.Time{}) + + mockReader := func(_ context.Context, _ string, modifiedSince time.Time) (*Manifest, error) { + fetchCount.Add(1) + lastReceivedModifiedSince.Store(modifiedSince) + + m := currentManifest.Load() + modified := lastModified.Load().(time.Time) + + // Simulate S3 If-Modified-Since: if the caller has a timestamp and the + // manifest hasn't been updated since, return 304. + if !modifiedSince.IsZero() && !modified.IsZero() && !modified.After(modifiedSince) { + return nil, nil // 304 Not Modified + } + + return m, nil + } + + store := NewStore(zap.NewNop()) + storageFetcher := NewStorageFetcher(mockReader, "ops/manifest.json", zap.NewNop()) + poller := NewPoller(storageFetcher, store, 50*time.Millisecond, 1*time.Millisecond, zap.NewNop()) + + // Track store update callbacks. + var updateCallbackCount atomic.Int32 + store.SetOnUpdate(func() { + updateCallbackCount.Add(1) + }) + + // 1. Initial fetch loads manifest v1. + err := poller.FetchInitial(context.Background()) + require.NoError(t, err) + require.True(t, store.IsLoaded()) + require.Equal(t, "rev-1", store.Revision()) + require.Equal(t, 1, store.OperationCount()) + + // Wait for the update callback from initial load. + require.Eventually(t, func() bool { + return updateCallbackCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond) + + // 2. Start polling — manifest unchanged, polls should get 304s. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go poller.Poll(ctx) + + // Let a few poll cycles run. Store should stay at rev-1. + time.Sleep(200 * time.Millisecond) + require.Equal(t, "rev-1", store.Revision()) + require.Equal(t, 1, store.OperationCount()) + + // Verify modifiedSince is being passed (non-zero after initial fetch). + ms := lastReceivedModifiedSince.Load().(time.Time) + require.False(t, ms.IsZero(), "poll should pass non-zero modifiedSince after initial fetch") + + fetchCountBefore304 := fetchCount.Load() + require.Greater(t, fetchCountBefore304, int32(1), "poller should have made multiple fetch attempts") + + // No additional update callbacks beyond the initial one. + require.Equal(t, int32(1), updateCallbackCount.Load()) + + // 3. Simulate manifest update on S3. + currentManifest.Store(manifestV2) + lastModified.Store(time.Now()) + + // Poller should detect the change. + require.Eventually(t, func() bool { + return store.Revision() == "rev-2" + }, 2*time.Second, 10*time.Millisecond) + + require.Equal(t, 2, store.OperationCount()) + + // 4. Update callback should have fired again. + require.Eventually(t, func() bool { + return updateCallbackCount.Load() >= 2 + }, 2*time.Second, 10*time.Millisecond) + + // 5. After the update, subsequent polls should get 304s again. + revisionAfterUpdate := store.Revision() + time.Sleep(200 * time.Millisecond) + require.Equal(t, revisionAfterUpdate, store.Revision(), "revision should be stable after update") + require.Equal(t, int32(2), updateCallbackCount.Load(), "no extra callbacks after stabilization") + + cancel() } From bd0c42206ec39ac97cec84d9952d12fc760d9bc1 Mon Sep 17 00:00:00 2001 From: StarpTech Date: Thu, 2 Apr 2026 11:11:55 +0200 Subject: [PATCH 3/3] feat(pqlmanifest): add configurable manifest file name for S3 polling --- router/core/router.go | 2 +- .../persistedoperation/PO_MANIFEST_S3_GUIDE.md | 14 +++++++++----- router/pkg/config/config.go | 1 + router/pkg/config/config.schema.json | 5 +++++ router/pkg/config/fixtures/full.yaml | 1 + router/pkg/config/testdata/config_defaults.json | 1 + router/pkg/config/testdata/config_full.json | 1 + 7 files changed, 19 insertions(+), 6 deletions(-) diff --git a/router/core/router.go b/router/core/router.go index 7280c4ab56..bb8f8be8eb 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1299,7 +1299,7 @@ func (r *Router) buildManifestStore(ctx context.Context, registry *ProviderRegis return nil, nil } - const manifestFileName = "manifest.json" + manifestFileName := r.persistedOperationsConfig.Manifest.FileName storageProviderID := r.persistedOperationsConfig.Storage.ProviderID diff --git a/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md b/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md index 8b54b772f3..df8907b9cb 100644 --- a/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md +++ b/router/internal/persistedoperation/PO_MANIFEST_S3_GUIDE.md @@ -9,9 +9,9 @@ version: "1" persisted_operations: storage: - # Path prefix inside the S3 bucket where the manifest.json file is stored. - # The router resolves the manifest at: /manifest.json - # Example: with the prefix below, the full object key is: + # Path prefix inside the S3 bucket where the manifest file is stored. + # The router resolves the manifest at: / + # Example: with the prefix below and default file_name, the full object key is: # operations/manifest.json object_prefix: "operations" # Must match the `id` of an entry in `storage_providers.s3`. @@ -19,6 +19,10 @@ persisted_operations: manifest: enabled: true + # Name of the manifest file inside / (default: manifest.json). + # Use a .gz or .zst extension to enable transparent decompression, + # e.g. manifest.json.gz or manifest.json.zst. + file_name: manifest.json # How often to poll S3 for manifest updates (default: 10s) poll_interval: 10s # Random jitter added to the poll interval (default: 5s) @@ -47,9 +51,9 @@ storage_providers: ## Behavior - The manifest is loaded at startup and **polled periodically** for updates. The router uses `If-Modified-Since` conditional requests to avoid downloading an unchanged manifest, and compares the `revision` field to detect content changes. -- The manifest is **authoritative** -- when enabled, individual per-request operation fetches from S3 are disabled. If an operation hash is not in the manifest, the request is rejected immediately. +- The manifest is **authoritative** for hash-only lookups against S3/CDN storage -- when enabled, individual per-request operation fetches from S3 are disabled. If an operation hash is not in the manifest, the request is rejected immediately. Exceptions: if **APQ** (Automatic Persisted Queries) is enabled, unmatched hashes are delegated to the APQ layer instead of being rejected; if `log_unknown` is enabled and the request includes a full query body, the unknown operation is logged and execution continues (hash-only requests without a body are still rejected). - When warmup is enabled, new or changed operations are planned in the background after each manifest update, so that the first request for each operation is served from the plan cache. -- **Compression is supported**: if the object path ends with `.gz` or `.zst`, the manifest is decompressed transparently (gzip or Zstandard). +- **Compression is supported**: if `manifest.file_name` ends with `.gz` or `.zst` (e.g. `manifest.json.gz`), the router decompresses the content transparently (gzip or Zstandard). - **Filesystem providers are not supported** for the manifest. Only S3 and CDN providers can be used. ## Manifest Schema diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index d4f83e381f..c89c049402 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -989,6 +989,7 @@ type PQLManifestWarmupConfig struct { type PQLManifestConfig struct { Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"` + FileName string `yaml:"file_name" envDefault:"manifest.json" env:"FILE_NAME"` PollInterval time.Duration `yaml:"poll_interval" envDefault:"10s" env:"POLL_INTERVAL"` PollJitter time.Duration `yaml:"poll_jitter" envDefault:"5s" env:"POLL_JITTER"` Warmup PQLManifestWarmupConfig `yaml:"warmup" envPrefix:"WARMUP_"` diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 32429f30ea..c84f599c6d 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -203,6 +203,11 @@ "description": "Enable the PQL manifest feature.", "default": false }, + "file_name": { + "type": "string", + "description": "The manifest file name. Use a .gz or .zst extension to enable transparent decompression (e.g. manifest.json.gz or manifest.json.zst).", + "default": "manifest.json" + }, "poll_interval": { "type": "string", "format": "go-duration", diff --git a/router/pkg/config/fixtures/full.yaml b/router/pkg/config/fixtures/full.yaml index da15e859fe..f2275edac5 100644 --- a/router/pkg/config/fixtures/full.yaml +++ b/router/pkg/config/fixtures/full.yaml @@ -501,6 +501,7 @@ persisted_operations: object_prefix: '5ef73d80-cae4-4d0e-98a7-1e9fa922c1a4/92c25b45-a75b-4954-b8f6-6592a9b203eb/operations/foo' manifest: enabled: true + file_name: manifest.json poll_interval: 30s poll_jitter: 10s diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 6afe8c00b1..e4237fd1c5 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -546,6 +546,7 @@ }, "Manifest": { "Enabled": false, + "FileName": "manifest.json", "PollInterval": 10000000000, "PollJitter": 5000000000, "Warmup": { diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index e09957bc29..9ef3ce7892 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -988,6 +988,7 @@ }, "Manifest": { "Enabled": true, + "FileName": "manifest.json", "PollInterval": 30000000000, "PollJitter": 10000000000, "Warmup": {