Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,25 +1275,30 @@ func (r *Router) buildClients(ctx context.Context) error {
}

if storageProviderID != "" {
Comment thread
SkArchon marked this conversation as resolved.
Outdated
// An explicit storage provider is configured — read the manifest once at startup.
// An explicit storage provider is configured — fetch the manifest at startup and poll for updates.
objectPrefix := r.persistedOperationsConfig.Storage.ObjectPrefix
objectPath := manifestFileName
if objectPrefix != "" {
objectPath = path.Join(objectPrefix, manifestFileName)
}

manifest, err := pClient.ReadManifest(ctx, objectPath)
if err != nil {
return fmt.Errorf("failed to fetch PQL manifest from storage provider %q: %w",
storageProviderID, err)
}
storageFetcher := pqlmanifest.NewStorageFetcher(pClient.ReadManifest, objectPath, r.logger)

pqlStore = pqlmanifest.NewStore(r.logger)
pqlStore.Load(manifest)
r.logger.Info("Loaded PQL manifest from storage provider",
Comment thread
StarpTech marked this conversation as resolved.
zap.String("provider_id", storageProviderID),
zap.Int("operations", pqlStore.OperationCount()),
poller := pqlmanifest.NewPoller(
storageFetcher,
pqlStore,
r.persistedOperationsConfig.Manifest.PollInterval,
r.persistedOperationsConfig.Manifest.PollJitter,
r.logger,
)

if err := poller.FetchInitial(ctx); err != nil {
return fmt.Errorf("failed to fetch initial PQL manifest from storage provider %q: %w",
storageProviderID, err)
}

r.pqlPoller = poller
Comment thread
StarpTech marked this conversation as resolved.
Outdated
} else {
// No storage provider configured — fetch manifest from Cosmo CDN and poll for updates.
if r.graphApiToken == "" {
Expand Down
10 changes: 8 additions & 2 deletions router/internal/persistedoperation/pqlmanifest/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@ import (
"go.uber.org/zap"
)

// ManifestFetcher abstracts how the manifest is retrieved so the Poller
// can work with any storage backend (CDN, S3, etc.).
type ManifestFetcher interface {
Fetch(ctx context.Context, currentRevision string) (*Manifest, bool, error)
}

type Poller struct {
fetcher *Fetcher
fetcher ManifestFetcher
pollInterval time.Duration
pollJitter time.Duration
logger *zap.Logger
store *Store
}

func NewPoller(fetcher *Fetcher, store *Store, pollInterval, pollJitter time.Duration, logger *zap.Logger) *Poller {
func NewPoller(fetcher ManifestFetcher, store *Store, pollInterval, pollJitter time.Duration, logger *zap.Logger) *Poller {
if pollJitter <= 0 {
pollJitter = 5 * time.Second
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package pqlmanifest

import (
"context"

"go.uber.org/zap"
)

// StorageFetcher adapts a ReadManifest-style function (e.g. from an S3 or CDN
// storage client) to the ManifestFetcher interface used by the Poller.
// Unlike the CDN Fetcher which uses HTTP ETags for conditional requests,
// StorageFetcher always downloads the manifest and compares the revision field
// to determine whether it changed.
type StorageFetcher struct {
readManifest func(ctx context.Context, objectPath string) (*Manifest, error)
objectPath string
logger *zap.Logger
}

func NewStorageFetcher(
readManifest func(ctx context.Context, objectPath string) (*Manifest, error),
objectPath string,
logger *zap.Logger,
) *StorageFetcher {
if logger == nil {
logger = zap.NewNop()
}
return &StorageFetcher{
readManifest: readManifest,
objectPath: objectPath,
logger: logger.With(zap.String("component", "pql_storage_fetcher")),
}
}

func (f *StorageFetcher) Fetch(ctx context.Context, currentRevision string) (*Manifest, bool, error) {
manifest, err := f.readManifest(ctx, f.objectPath)
if err != nil {
return nil, false, err
}

if manifest.Revision == currentRevision {
return nil, false, nil
}

return manifest, true, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package pqlmanifest

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestStorageFetcher(t *testing.T) {
t.Parallel()

t.Run("first fetch with empty revision returns manifest", func(t *testing.T) {
t.Parallel()

manifest := &Manifest{
Version: 1,
Revision: "rev-1",
Operations: map[string]string{"h1": "query { a }"},
}
fetcher := NewStorageFetcher(
func(_ context.Context, _ string) (*Manifest, error) {
return manifest, nil
},
"ops/manifest.json",
zap.NewNop(),
)

result, changed, err := fetcher.Fetch(context.Background(), "")
require.NoError(t, err)
require.True(t, changed)
require.Equal(t, manifest, result)
})

t.Run("same revision returns no change", func(t *testing.T) {
t.Parallel()

fetcher := NewStorageFetcher(
func(_ context.Context, _ string) (*Manifest, error) {
return &Manifest{
Version: 1,
Revision: "rev-1",
Operations: map[string]string{"h1": "query { a }"},
}, nil
},
"ops/manifest.json",
zap.NewNop(),
)

result, changed, err := fetcher.Fetch(context.Background(), "rev-1")
require.NoError(t, err)
require.False(t, changed)
require.Nil(t, result)
})

t.Run("different revision returns updated manifest", func(t *testing.T) {
t.Parallel()

manifest := &Manifest{
Version: 1,
Revision: "rev-2",
Operations: map[string]string{"h1": "query { a }", "h2": "query { b }"},
}
fetcher := NewStorageFetcher(
func(_ context.Context, _ string) (*Manifest, error) {
return manifest, nil
},
"ops/manifest.json",
zap.NewNop(),
)

result, changed, err := fetcher.Fetch(context.Background(), "rev-1")
require.NoError(t, err)
require.True(t, changed)
require.Equal(t, manifest, result)
})

t.Run("read error is propagated", func(t *testing.T) {
t.Parallel()

fetcher := NewStorageFetcher(
func(_ context.Context, _ string) (*Manifest, error) {
return nil, fmt.Errorf("s3 connection refused")
},
"ops/manifest.json",
zap.NewNop(),
)

result, changed, err := fetcher.Fetch(context.Background(), "rev-1")
require.Error(t, err)
require.Contains(t, err.Error(), "s3 connection refused")
require.False(t, changed)
require.Nil(t, result)
})

t.Run("passes correct object path", func(t *testing.T) {
t.Parallel()

var receivedPath string
fetcher := NewStorageFetcher(
func(_ context.Context, objectPath string) (*Manifest, error) {
receivedPath = objectPath
return &Manifest{
Version: 1,
Revision: "rev-1",
Operations: map[string]string{},
}, nil
},
"my-prefix/operations/manifest.json",
zap.NewNop(),
)

_, _, err := fetcher.Fetch(context.Background(), "")
require.NoError(t, err)
require.Equal(t, "my-prefix/operations/manifest.json", receivedPath)
})
}
Loading