Skip to content

Commit 590de81

Browse files
committed
feat(pqlmanifest): enhance S3 manifest fetching with conditional requests and registry support
1 parent c1bd70a commit 590de81

File tree

15 files changed

+860
-236
lines changed

15 files changed

+860
-236
lines changed

router-tests/operations/pql_manifest_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestPQLManifest(t *testing.T) {
8787
require.Equal(t, expectedEmployeesBody, res.Body)
8888

8989
// Verify startup log
90-
logEntries := xEnv.Observer().FilterMessageSnippet("Loaded initial PQL manifest").All()
90+
logEntries := xEnv.Observer().FilterMessageSnippet("Loaded PQL manifest").All()
9191
require.Len(t, logEntries, 1)
9292
})
9393
})

router/core/init_config_poller.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"errors"
55
"fmt"
66

7-
"github.com/wundergraph/cosmo/router/pkg/config"
87
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
98
"github.com/wundergraph/cosmo/router/pkg/execution_config"
109
"github.com/wundergraph/cosmo/router/pkg/routerconfig"
@@ -13,9 +12,9 @@ import (
1312
"go.uber.org/zap"
1413
)
1514

16-
func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvider, s3Providers map[string]config.S3StorageProvider, providerID string, isFallbackClient bool) (client *routerconfig.Client, err error) {
15+
func getConfigClient(r *Router, registry *ProviderRegistry, providerID string, isFallbackClient bool) (client *routerconfig.Client, err error) {
1716
// CDN Providers
18-
if provider, ok := cdnProviders[providerID]; ok {
17+
if provider, ok := registry.CDN(providerID); ok {
1918
if r.graphApiToken == "" {
2019
return nil, errors.New(
2120
"graph token is required to fetch execution config from CDN. " +
@@ -50,7 +49,7 @@ func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvide
5049
}
5150

5251
// S3 Providers
53-
if provider, ok := s3Providers[providerID]; ok {
52+
if provider, ok := registry.S3(providerID); ok {
5453
clientOptions := &configs3Provider.ClientOptions{
5554
AccessKeyID: provider.AccessKey,
5655
SecretAccessKey: provider.SecretKey,
@@ -121,12 +120,12 @@ func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvide
121120
}
122121

123122
// InitializeConfigPoller creates a poller to fetch execution config. It is only initialized when a config poller is configured and the router is not started with a static config
124-
func InitializeConfigPoller(r *Router, cdnProviders map[string]config.CDNStorageProvider, s3Providers map[string]config.S3StorageProvider) (*configpoller.ConfigPoller, error) {
123+
func InitializeConfigPoller(r *Router, registry *ProviderRegistry) (*configpoller.ConfigPoller, error) {
125124
if r.staticExecutionConfig != nil || r.routerConfigPollerConfig == nil || r.configPoller != nil {
126125
return nil, nil
127126
}
128127

129-
primaryClient, err := getConfigClient(r, cdnProviders, s3Providers, r.routerConfigPollerConfig.Storage.ProviderID, false)
128+
primaryClient, err := getConfigClient(r, registry, r.routerConfigPollerConfig.Storage.ProviderID, false)
130129
if err != nil {
131130
return nil, err
132131
}
@@ -141,7 +140,7 @@ func InitializeConfigPoller(r *Router, cdnProviders map[string]config.CDNStorage
141140
return nil, errors.New("cannot use the same storage as both primary and fallback provider for execution config")
142141
}
143142

144-
fallbackClient, err = getConfigClient(r, cdnProviders, s3Providers, r.routerConfigPollerConfig.FallbackStorage.ProviderID, true)
143+
fallbackClient, err = getConfigClient(r, registry, r.routerConfigPollerConfig.FallbackStorage.ProviderID, true)
145144
if err != nil {
146145
return nil, err
147146
}

router/core/provider_registry.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package core
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/wundergraph/cosmo/router/pkg/config"
7+
)
8+
9+
// ProviderRegistry indexes storage provider configurations by ID, providing
10+
// typed lookups with clear error messages. It is built once during router
11+
// initialization and shared across all subsystems that need to resolve a
12+
// provider by its configured ID.
13+
type ProviderRegistry struct {
14+
s3 map[string]config.S3StorageProvider
15+
cdn map[string]config.CDNStorageProvider
16+
redis map[string]config.RedisStorageProvider
17+
fileSystem map[string]config.FileSystemStorageProvider
18+
}
19+
20+
// NewProviderRegistry builds lookup maps for every provider type and returns
21+
// an error if any type contains duplicate IDs.
22+
func NewProviderRegistry(providers config.StorageProviders) (*ProviderRegistry, error) {
23+
r := &ProviderRegistry{
24+
s3: make(map[string]config.S3StorageProvider, len(providers.S3)),
25+
cdn: make(map[string]config.CDNStorageProvider, len(providers.CDN)),
26+
redis: make(map[string]config.RedisStorageProvider, len(providers.Redis)),
27+
fileSystem: make(map[string]config.FileSystemStorageProvider, len(providers.FileSystem)),
28+
}
29+
30+
for _, p := range providers.S3 {
31+
if _, ok := r.s3[p.ID]; ok {
32+
return nil, fmt.Errorf("duplicate s3 storage provider with id '%s'", p.ID)
33+
}
34+
r.s3[p.ID] = p
35+
}
36+
for _, p := range providers.CDN {
37+
if _, ok := r.cdn[p.ID]; ok {
38+
return nil, fmt.Errorf("duplicate cdn storage provider with id '%s'", p.ID)
39+
}
40+
r.cdn[p.ID] = p
41+
}
42+
for _, p := range providers.Redis {
43+
if _, ok := r.redis[p.ID]; ok {
44+
return nil, fmt.Errorf("duplicate Redis storage provider with id '%s'", p.ID)
45+
}
46+
r.redis[p.ID] = p
47+
}
48+
for _, p := range providers.FileSystem {
49+
if _, ok := r.fileSystem[p.ID]; ok {
50+
return nil, fmt.Errorf("duplicate file system storage provider with id '%s'", p.ID)
51+
}
52+
r.fileSystem[p.ID] = p
53+
}
54+
55+
return r, nil
56+
}
57+
58+
// S3 looks up an S3 provider by ID.
59+
func (r *ProviderRegistry) S3(id string) (config.S3StorageProvider, bool) {
60+
p, ok := r.s3[id]
61+
return p, ok
62+
}
63+
64+
// CDN looks up a CDN provider by ID.
65+
func (r *ProviderRegistry) CDN(id string) (config.CDNStorageProvider, bool) {
66+
p, ok := r.cdn[id]
67+
return p, ok
68+
}
69+
70+
// Redis looks up a Redis provider by ID.
71+
func (r *ProviderRegistry) Redis(id string) (config.RedisStorageProvider, bool) {
72+
p, ok := r.redis[id]
73+
return p, ok
74+
}
75+
76+
// FileSystem looks up a filesystem provider by ID.
77+
func (r *ProviderRegistry) FileSystem(id string) (config.FileSystemStorageProvider, bool) {
78+
p, ok := r.fileSystem[id]
79+
return p, ok
80+
}
81+
82+
// IsFileSystem returns true if the given ID matches a filesystem provider.
83+
func (r *ProviderRegistry) IsFileSystem(id string) bool {
84+
_, ok := r.fileSystem[id]
85+
return ok
86+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package core
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"github.com/wundergraph/cosmo/router/pkg/config"
8+
)
9+
10+
func TestProviderRegistry(t *testing.T) {
11+
t.Parallel()
12+
13+
t.Run("successful lookups", func(t *testing.T) {
14+
t.Parallel()
15+
16+
reg, err := NewProviderRegistry(config.StorageProviders{
17+
S3: []config.S3StorageProvider{{ID: "my-s3", Bucket: "b"}},
18+
CDN: []config.CDNStorageProvider{{ID: "my-cdn", URL: "https://cdn"}},
19+
Redis: []config.RedisStorageProvider{{ID: "my-redis"}},
20+
FileSystem: []config.FileSystemStorageProvider{{ID: "my-fs", Path: "/tmp"}},
21+
})
22+
require.NoError(t, err)
23+
24+
s3, ok := reg.S3("my-s3")
25+
require.True(t, ok)
26+
require.Equal(t, "b", s3.Bucket)
27+
28+
cdn, ok := reg.CDN("my-cdn")
29+
require.True(t, ok)
30+
require.Equal(t, "https://cdn", cdn.URL)
31+
32+
redis, ok := reg.Redis("my-redis")
33+
require.True(t, ok)
34+
require.Equal(t, "my-redis", redis.ID)
35+
36+
fs, ok := reg.FileSystem("my-fs")
37+
require.True(t, ok)
38+
require.Equal(t, "/tmp", fs.Path)
39+
})
40+
41+
t.Run("unknown ID returns false", func(t *testing.T) {
42+
t.Parallel()
43+
44+
reg, err := NewProviderRegistry(config.StorageProviders{})
45+
require.NoError(t, err)
46+
47+
_, ok := reg.S3("nope")
48+
require.False(t, ok)
49+
50+
_, ok = reg.CDN("nope")
51+
require.False(t, ok)
52+
53+
_, ok = reg.Redis("nope")
54+
require.False(t, ok)
55+
56+
_, ok = reg.FileSystem("nope")
57+
require.False(t, ok)
58+
})
59+
60+
t.Run("duplicate S3 ID", func(t *testing.T) {
61+
t.Parallel()
62+
63+
_, err := NewProviderRegistry(config.StorageProviders{
64+
S3: []config.S3StorageProvider{{ID: "dup"}, {ID: "dup"}},
65+
})
66+
require.ErrorContains(t, err, "duplicate s3 storage provider with id 'dup'")
67+
})
68+
69+
t.Run("duplicate CDN ID", func(t *testing.T) {
70+
t.Parallel()
71+
72+
_, err := NewProviderRegistry(config.StorageProviders{
73+
CDN: []config.CDNStorageProvider{{ID: "dup"}, {ID: "dup"}},
74+
})
75+
require.ErrorContains(t, err, "duplicate cdn storage provider with id 'dup'")
76+
})
77+
78+
t.Run("duplicate Redis ID", func(t *testing.T) {
79+
t.Parallel()
80+
81+
_, err := NewProviderRegistry(config.StorageProviders{
82+
Redis: []config.RedisStorageProvider{{ID: "dup"}, {ID: "dup"}},
83+
})
84+
require.ErrorContains(t, err, "duplicate Redis storage provider with id 'dup'")
85+
})
86+
87+
t.Run("duplicate FileSystem ID", func(t *testing.T) {
88+
t.Parallel()
89+
90+
_, err := NewProviderRegistry(config.StorageProviders{
91+
FileSystem: []config.FileSystemStorageProvider{{ID: "dup"}, {ID: "dup"}},
92+
})
93+
require.ErrorContains(t, err, "duplicate file system storage provider with id 'dup'")
94+
})
95+
96+
t.Run("IsFileSystem", func(t *testing.T) {
97+
t.Parallel()
98+
99+
reg, err := NewProviderRegistry(config.StorageProviders{
100+
FileSystem: []config.FileSystemStorageProvider{{ID: "fs1"}},
101+
})
102+
require.NoError(t, err)
103+
104+
require.True(t, reg.IsFileSystem("fs1"))
105+
require.False(t, reg.IsFileSystem("nope"))
106+
})
107+
}

0 commit comments

Comments
 (0)