diff --git a/cmd/catalogd/main.go b/cmd/catalogd/main.go index ec7c4946f..9fd55aa9e 100644 --- a/cmd/catalogd/main.go +++ b/cmd/catalogd/main.go @@ -22,6 +22,7 @@ import ( "errors" "flag" "fmt" + "net/http" "net/url" "os" "path/filepath" @@ -45,21 +46,21 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" ocv1 "github.com/operator-framework/operator-controller/api/v1" - corecontrollers "github.com/operator-framework/operator-controller/internal/catalogd/controllers/core" + "github.com/operator-framework/operator-controller/internal/catalogd/controllers" "github.com/operator-framework/operator-controller/internal/catalogd/features" "github.com/operator-framework/operator-controller/internal/catalogd/garbagecollection" - catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics" - "github.com/operator-framework/operator-controller/internal/catalogd/serverutil" + "github.com/operator-framework/operator-controller/internal/catalogd/handler" + v1 "github.com/operator-framework/operator-controller/internal/catalogd/handler/api/v1" "github.com/operator-framework/operator-controller/internal/catalogd/storage" "github.com/operator-framework/operator-controller/internal/catalogd/webhook" sharedcontrollers "github.com/operator-framework/operator-controller/internal/shared/controllers" fsutil "github.com/operator-framework/operator-controller/internal/shared/util/fs" + http2 
"github.com/operator-framework/operator-controller/internal/shared/util/http" imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image" "github.com/operator-framework/operator-controller/internal/shared/util/pullsecretcache" sautil "github.com/operator-framework/operator-controller/internal/shared/util/sa" @@ -326,47 +327,60 @@ func run(ctx context.Context) error { }, } - var localStorage storage.Instance - metrics.Registry.MustRegister(catalogdmetrics.RequestDurationMetric) - storeDir := filepath.Join(cfg.cacheDir, storageDir) if err := os.MkdirAll(storeDir, 0700); err != nil { setupLog.Error(err, "unable to create storage directory for catalogs") return err } - baseStorageURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", cfg.externalAddr)) + const catalogsSubPath = "catalogs" + baseCatalogsURL, err := url.Parse(fmt.Sprintf("%s/%s", cfg.externalAddr, catalogsSubPath)) if err != nil { setupLog.Error(err, "unable to create base storage URL") return err } - localStorage = &storage.LocalDirV1{ - RootDir: storeDir, - RootURL: baseStorageURL, - EnableMetasHandler: features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler), - } + storageInstances := configureStorage(storeDir) + catalogdHandler := handler.NewStandardHandler( + newAPIV1Handler(catalogsSubPath, storageInstances), + ) // Config for the catalogd web server - catalogServerConfig := serverutil.CatalogServerConfig{ - ExternalAddr: cfg.externalAddr, - CatalogAddr: cfg.catalogServerAddr, - CertFile: cfg.certFile, - KeyFile: cfg.keyFile, - LocalStorage: localStorage, + catalogServerConfig := http2.ServerConfig{ + Name: "catalogs", + OnlyServeWhenLeader: true, + ListenAddr: cfg.catalogServerAddr, + Server: &http.Server{ + Handler: catalogdHandler, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Minute, + }, + } + if cfg.certFile != "" && cfg.keyFile != "" { + catalogServerConfig.TLSConfig = 
&tls.Config{ + GetCertificate: cw.GetCertificate, + MinVersion: tls.VersionTLS12, + } } - err = serverutil.AddCatalogServerToManager(mgr, catalogServerConfig, cw) + catalogServer, err := http2.NewManagerServer(catalogServerConfig) if err != nil { setupLog.Error(err, "unable to configure catalog server") return err } + if err := mgr.Add(catalogServer); err != nil { + setupLog.Error(err, "unable to add catalog server to manager") + return err + } - if err = (&corecontrollers.ClusterCatalogReconciler{ + if err = (&controllers.ClusterCatalogReconciler{ Client: mgr.GetClient(), ImageCache: imageCache, ImagePuller: imagePuller, - Storage: localStorage, + Storage: storageInstances, + GetBaseURL: func(catalogName string) string { + return fmt.Sprintf("%s/%s", baseCatalogsURL, catalogName) + }, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog") return err @@ -436,3 +450,25 @@ func podNamespace() string { } return string(namespace) } + +func configureStorage(storeDir string) *storage.Instances { + metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler) + graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler) + + return storage.NewInstances( + storage.WithFiles(true, storeDir), + storage.WithIndices(metasEnabled || graphqlEnabled, storeDir), + storage.WithGraphQLSchemas(graphqlEnabled), + ) +} + +func newAPIV1Handler(baseURLPath string, si *storage.Instances) *v1.APIV1Handler { + metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler) + graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler) + + return v1.NewAPIV1Handler(baseURLPath, si, + v1.WithAllHandler(true), + v1.WithMetasHandler(metasEnabled), + v1.WithGraphQLHandler(graphqlEnabled), + ) +} diff --git a/config/components/base/experimental/kustomization.yaml b/config/components/base/experimental/kustomization.yaml index b9ccb1d42..31e113a83 
100644 --- a/config/components/base/experimental/kustomization.yaml +++ b/config/components/base/experimental/kustomization.yaml @@ -12,6 +12,7 @@ components: - ../../features/single-own-namespace - ../../features/preflight-permissions - ../../features/apiv1-metas-handler +- ../../features/apiv1-graphql-handler - ../../features/helm-chart # This one is downstream only, so we shant use it # - ../../features/webhook-provider-openshift-serviceca diff --git a/config/components/features/apiv1-graphql-handler/kustomization.yaml b/config/components/features/apiv1-graphql-handler/kustomization.yaml new file mode 100644 index 000000000..0253e2624 --- /dev/null +++ b/config/components/features/apiv1-graphql-handler/kustomization.yaml @@ -0,0 +1,9 @@ +# kustomization file for catalogd APIv1 GraphQL handler +# DO NOT ADD A NAMESPACE HERE +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +patches: + - target: + kind: Deployment + name: catalogd-controller-manager + path: patches/enable-featuregate.yaml diff --git a/config/components/features/apiv1-graphql-handler/patches/enable-featuregate.yaml b/config/components/features/apiv1-graphql-handler/patches/enable-featuregate.yaml new file mode 100644 index 000000000..f0a6d75a6 --- /dev/null +++ b/config/components/features/apiv1-graphql-handler/patches/enable-featuregate.yaml @@ -0,0 +1,4 @@ +# enable APIv1 GraphQL handler feature gate +- op: add + path: /spec/template/spec/containers/0/args/- + value: "--feature-gates=APIV1GraphQLHandler=true" diff --git a/config/overlays/tilt-local-dev/patches/catalogd.yaml b/config/overlays/tilt-local-dev/patches/catalogd.yaml index b273a0c9b..4df906921 100644 --- a/config/overlays/tilt-local-dev/patches/catalogd.yaml +++ b/config/overlays/tilt-local-dev/patches/catalogd.yaml @@ -7,4 +7,4 @@ value: null - op: remove # remove --leader-elect so container doesn't restart during breakpoints - path: /spec/template/spec/containers/0/args/2 + path: /spec/template/spec/containers/0/args/0 diff 
--git a/go.mod b/go.mod index 0c327499f..94a02e236 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,9 @@ require ( github.com/google/go-containerregistry v0.20.6 github.com/google/renameio/v2 v2.0.0 github.com/gorilla/handlers v1.5.2 + github.com/graphql-go/graphql v0.8.1 + github.com/graphql-go/handler v0.2.4 + github.com/itchyny/gojq v0.12.17 github.com/klauspost/compress v1.18.0 github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.1 @@ -28,6 +31,7 @@ require ( golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b golang.org/x/mod v0.26.0 golang.org/x/sync v0.16.0 + golang.org/x/text v0.27.0 golang.org/x/tools v0.35.0 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.18.4 @@ -133,6 +137,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/huandu/xstrings v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jmoiron/sqlx v1.4.0 // indirect github.com/joelanford/ignore v0.1.1 // indirect @@ -221,7 +226,6 @@ require ( golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/term v0.33.0 // indirect - golang.org/x/text v0.27.0 // indirect golang.org/x/time v0.12.0 // indirect golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index f20214b30..5c2ade775 100644 --- a/go.sum +++ b/go.sum @@ -246,6 +246,10 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T 
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/gosuri/uitable v0.0.4 h1:IG2xLKRvErL3uhY6e1BylFzG+aJiwQviDDTfOKeKTpY= github.com/gosuri/uitable v0.0.4/go.mod h1:tKR86bXuXPZazfOTG1FIzvjIdXzd0mo4Vtn16vt0PJo= +github.com/graphql-go/graphql v0.8.1 h1:p7/Ou/WpmulocJeEx7wjQy611rtXGQaAcXGqanuMMgc= +github.com/graphql-go/graphql v0.8.1/go.mod h1:nKiHzRM0qopJEwCITUuIsxk9PlVlwIiiI8pnJEhordQ= +github.com/graphql-go/handler v0.2.4 h1:gz9q11TUHPNUpqzV8LMa+rkqM5NUuH/nkE3oF2LS3rI= +github.com/graphql-go/handler v0.2.4/go.mod h1:gsQlb4gDvURR0bgN8vWQEh+s5vJALM2lYL3n3cf6OxQ= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20210315223345-82c243799c99 h1:JYghRBlGCZyCF2wNUJ8W0cwaQdtpcssJ4CgC406g+WU= @@ -269,6 +273,10 @@ github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg= +github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY= +github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= 
+github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jmhodges/clock v1.2.0 h1:eq4kys+NI0PLngzaHEe7AmPT90XMGIEySD1JfV1PDIs= diff --git a/internal/catalogd/controllers/core/clustercatalog_controller.go b/internal/catalogd/controllers/clustercatalog_controller.go similarity index 95% rename from internal/catalogd/controllers/core/clustercatalog_controller.go rename to internal/catalogd/controllers/clustercatalog_controller.go index 32ed52e0a..bf3669425 100644 --- a/internal/catalogd/controllers/core/clustercatalog_controller.go +++ b/internal/catalogd/controllers/clustercatalog_controller.go @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package core +package controllers import ( "context" // #nosec "errors" "fmt" + "io/fs" + "iter" "slices" "sync" "time" @@ -38,6 +40,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/operator-framework/operator-registry/alpha/declcfg" + ocv1 "github.com/operator-framework/operator-controller/api/v1" "github.com/operator-framework/operator-controller/internal/catalogd/storage" imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image" @@ -57,7 +61,8 @@ type ClusterCatalogReconciler struct { ImageCache imageutil.Cache ImagePuller imageutil.Puller - Storage storage.Instance + Storage storage.Instance + GetBaseURL func(catalogName string) string finalizers crfinalizer.Finalizers @@ -224,7 +229,7 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1. case !hasStoredCatalog: l.Info("unpack required: no cached catalog metadata found for this catalog") needsUnpack = true - case !r.Storage.ContentExists(catalog.Name): + case !r.Storage.Exists(catalog.Name): l.Info("unpack required: no stored content found for this catalog") needsUnpack = true case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus): @@ -265,12 +270,12 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1. // TODO: We should check to see if the unpacked result has the same content // as the already unpacked content. If it does, we should skip this rest // of the unpacking steps. 
- if err := r.Storage.Store(ctx, catalog.Name, fsys); err != nil { + if err := r.Storage.Store(ctx, catalog.Name, walkMetasFSIterator(ctx, fsys)); err != nil { storageErr := fmt.Errorf("error storing fbc: %v", err) updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr) return ctrl.Result{}, storageErr } - baseURL := r.Storage.BaseURL(catalog.Name) + baseURL := r.GetBaseURL(catalog.Name) updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil) updateStatusServing(&catalog.Status, canonicalRef, unpackTime, baseURL, catalog.GetGeneration()) @@ -296,8 +301,8 @@ func (r *ClusterCatalogReconciler) getCurrentState(catalog *ocv1.ClusterCatalog) // Set expected status based on what we see in the stored catalog clearUnknownConditions(expectedStatus) - if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) { - updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration) + if hasStoredCatalog && r.Storage.Exists(catalog.Name) { + updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.GetBaseURL(catalog.Name), storedCatalog.observedGeneration) updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil) } @@ -458,7 +463,7 @@ func (r *ClusterCatalogReconciler) deleteStoredCatalog(catalogName string) { } func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catalog *ocv1.ClusterCatalog) error { - if err := r.Storage.Delete(catalog.Name); err != nil { + if err := r.Storage.Delete(ctx, catalog.Name); err != nil { updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err) return err } @@ -470,3 +475,12 @@ func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catal r.deleteStoredCatalog(catalog.Name) return nil } + +func walkMetasFSIterator(ctx context.Context, fsys fs.FS) iter.Seq2[*declcfg.Meta, error] { + return func(yield func(*declcfg.Meta, error) bool) 
{ + _ = declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error { + yield(meta, err) + return nil + }, declcfg.WithConcurrency(1)) + } +} diff --git a/internal/catalogd/controllers/core/clustercatalog_controller_test.go b/internal/catalogd/controllers/clustercatalog_controller_test.go similarity index 98% rename from internal/catalogd/controllers/core/clustercatalog_controller_test.go rename to internal/catalogd/controllers/clustercatalog_controller_test.go index 95a18733a..f44d9480e 100644 --- a/internal/catalogd/controllers/core/clustercatalog_controller_test.go +++ b/internal/catalogd/controllers/clustercatalog_controller_test.go @@ -1,11 +1,10 @@ -package core +package controllers import ( "context" "errors" "fmt" - "io/fs" - "net/http" + "iter" "testing" "testing/fstest" "time" @@ -21,6 +20,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/operator-framework/operator-registry/alpha/declcfg" + ocv1 "github.com/operator-framework/operator-controller/api/v1" "github.com/operator-framework/operator-controller/internal/catalogd/storage" imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image" @@ -32,29 +33,21 @@ type MockStore struct { shouldError bool } -func (m MockStore) Store(_ context.Context, _ string, _ fs.FS) error { +func (m MockStore) Store(_ context.Context, _ string, _ iter.Seq2[*declcfg.Meta, error]) error { if m.shouldError { return errors.New("mockstore store error") } return nil } -func (m MockStore) Delete(_ string) error { +func (m MockStore) Delete(_ context.Context, _ string) error { if m.shouldError { return errors.New("mockstore delete error") } return nil } -func (m MockStore) BaseURL(_ string) string { - return "URL" -} - -func (m MockStore) StorageServerHandler() http.Handler { - panic("not needed") -} - -func (m MockStore) ContentExists(_ 
string) bool { +func (m MockStore) Exists(_ string) bool { return true } @@ -807,6 +800,7 @@ func TestCatalogdControllerReconcile(t *testing.T) { ImagePuller: tt.puller, ImageCache: tt.cache, Storage: tt.store, + GetBaseURL: func(catalogName string) string { return "URL" }, storedCatalogs: map[string]storedCatalogData{}, } if reconciler.ImageCache == nil { @@ -915,7 +909,8 @@ func TestPollingRequeue(t *testing.T) { ImageFS: &fstest.MapFS{}, Ref: ref, }, - Storage: &MockStore{}, + Storage: &MockStore{}, + GetBaseURL: func(catalogName string) string { return "URL" }, storedCatalogs: map[string]storedCatalogData{ tc.catalog.Name: { ref: ref, @@ -1140,6 +1135,7 @@ func TestPollingReconcilerUnpack(t *testing.T) { Client: nil, ImagePuller: &imageutil.MockPuller{Error: errors.New("mockpuller error")}, Storage: &MockStore{}, + GetBaseURL: func(catalogName string) string { return "URL" }, storedCatalogs: scd, } require.NoError(t, reconciler.setupFinalizers()) diff --git a/internal/catalogd/features/features.go b/internal/catalogd/features/features.go index 298cbc859..c24a13332 100644 --- a/internal/catalogd/features/features.go +++ b/internal/catalogd/features/features.go @@ -9,11 +9,13 @@ import ( ) const ( - APIV1MetasHandler = featuregate.Feature("APIV1MetasHandler") + APIV1MetasHandler = featuregate.Feature("APIV1MetasHandler") + APIV1GraphQLHandler = featuregate.Feature("APIV1GraphQLHandler") ) var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - APIV1MetasHandler: {Default: false, PreRelease: featuregate.Alpha}, + APIV1MetasHandler: {Default: false, PreRelease: featuregate.Alpha}, + APIV1GraphQLHandler: {Default: false, PreRelease: featuregate.Alpha}, } var CatalogdFeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate() diff --git a/internal/catalogd/handler/api/v1/all.go b/internal/catalogd/handler/api/v1/all.go new file mode 100644 index 000000000..d1b4dcffa --- /dev/null +++ b/internal/catalogd/handler/api/v1/all.go @@ 
-0,0 +1,36 @@ +package v1 + +import ( + "net/http" + + "k8s.io/klog/v2" + + "github.com/operator-framework/operator-controller/internal/catalogd/handler/internal/handlerutil" + "github.com/operator-framework/operator-controller/internal/catalogd/storage" +) + +func apiV1AllHandler(files storage.Files) http.Handler { + allHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + catalog := r.PathValue("catalog") + logger := klog.FromContext(r.Context()).WithValues("catalog", catalog) + + catalogFile, err := files.Get(catalog) + if err != nil { + logger.Error(err, "error getting catalog file") + http.Error(w, "Not found", http.StatusNotFound) + return + } + defer catalogFile.Close() + + catalogStat, err := catalogFile.Stat() + if err != nil { + logger.Error(err, "error stat-ing catalog file") + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Add("Content-Type", "application/jsonl") + http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile) + }) + return handlerutil.AllowedMethodsHandler(allHandler, http.MethodGet, http.MethodHead) +} diff --git a/internal/catalogd/handler/api/v1/graphql.go b/internal/catalogd/handler/api/v1/graphql.go new file mode 100644 index 000000000..3bdf9a09c --- /dev/null +++ b/internal/catalogd/handler/api/v1/graphql.go @@ -0,0 +1,46 @@ +package v1 + +import ( + "net/http" + + "github.com/graphql-go/handler" + "k8s.io/klog/v2" + + "github.com/operator-framework/operator-controller/internal/catalogd/storage" +) + +func apiV1GraphQLHandler(files storage.Files, indices storage.Indices, schemas storage.GraphQLSchemas) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + catalog := r.PathValue("catalog") + logger := klog.FromContext(r.Context()).WithValues("catalog", catalog) + + catalogFile, err := files.Get(catalog) + if err != nil { + logger.Error(err, 
"error getting catalog file") + http.Error(w, "Not found", http.StatusNotFound) + return + } + defer catalogFile.Close() + + catalogIndex, err := indices.Get(catalog) + if err != nil { + logger.Error(err, "error getting catalog index") + http.Error(w, "Not found", http.StatusNotFound) + return + } + + catalogSchema, err := schemas.Get(catalog) + if err != nil { + logger.Error(err, "error getting catalog graphql schema") + http.Error(w, "Not found", http.StatusNotFound) + return + } + + r = r.WithContext(storage.ContextWithCatalogData(r.Context(), catalogFile, catalogIndex)) + h := handler.New(&handler.Config{ + Schema: catalogSchema, + GraphiQL: true, + }) + h.ServeHTTP(w, r) + }) +} diff --git a/internal/catalogd/handler/api/v1/handler.go b/internal/catalogd/handler/api/v1/handler.go new file mode 100644 index 000000000..93f573e72 --- /dev/null +++ b/internal/catalogd/handler/api/v1/handler.go @@ -0,0 +1,64 @@ +package v1 + +import ( + "fmt" + "net/http" + + "github.com/operator-framework/operator-controller/internal/catalogd/storage" +) + +type APIV1Handler struct { + basePath string + mux *http.ServeMux + data *storage.Instances +} + +type APIV1HandlerOption func(*APIV1Handler) + +func WithAllHandler(enabled bool) APIV1HandlerOption { + return func(h *APIV1Handler) { + if enabled { + h.addHandler("all", apiV1AllHandler(h.data.Files())) + } + } +} + +func WithMetasHandler(enabled bool) APIV1HandlerOption { + return func(h *APIV1Handler) { + if enabled { + h.addHandler("metas", apiV1MetasHandler(h.data.Files(), h.data.Indices())) + } + } +} + +func WithGraphQLHandler(enabled bool) APIV1HandlerOption { + return func(h *APIV1Handler) { + if enabled { + h.addHandler("graphql", apiV1GraphQLHandler(h.data.Files(), h.data.Indices(), h.data.GraphQLSchemas())) + } + } +} + +func NewAPIV1Handler(basePath string, data *storage.Instances, opts ...APIV1HandlerOption) *APIV1Handler { + h := &APIV1Handler{ + basePath: basePath, + mux: http.NewServeMux(), + 
data: data, + } + for _, opt := range opts { + opt(h) + } + return h +} + +func (h *APIV1Handler) SubPath() string { + return fmt.Sprintf("/%s/{catalog}/api/v1/", h.basePath) +} + +func (h *APIV1Handler) addHandler(handlerSubPath string, handler http.Handler) { + h.mux.Handle(fmt.Sprintf("%s%s", h.SubPath(), handlerSubPath), handler) +} + +func (h *APIV1Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.mux.ServeHTTP(w, r) +} diff --git a/internal/catalogd/handler/api/v1/metas.go b/internal/catalogd/handler/api/v1/metas.go new file mode 100644 index 000000000..a6a3733d7 --- /dev/null +++ b/internal/catalogd/handler/api/v1/metas.go @@ -0,0 +1,82 @@ +package v1 + +import ( + "io" + "net/http" + + "k8s.io/klog/v2" + + "github.com/operator-framework/operator-controller/internal/catalogd/handler/internal/handlerutil" + "github.com/operator-framework/operator-controller/internal/catalogd/storage" +) + +func apiV1MetasHandler(files storage.Files, indices storage.Indices) http.Handler { + metasHander := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check for unexpected query parameters + expectedParams := map[string]bool{ + "schema": true, + "package": true, + "name": true, + } + + for param := range r.URL.Query() { + if !expectedParams[param] { + http.Error(w, "Invalid query parameters", http.StatusBadRequest) + return + } + } + + catalog := r.PathValue("catalog") + logger := klog.FromContext(r.Context()).WithValues("catalog", catalog) + + catalogFile, err := files.Get(catalog) + if err != nil { + logger.Error(err, "error getting catalog file") + http.Error(w, "Not found", http.StatusNotFound) + return + } + defer catalogFile.Close() + + catalogStat, err := catalogFile.Stat() + if err != nil { + logger.Error(err, "error stat-ing catalog file") + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + catalogIndex, err := indices.Get(catalog) + if err != nil { + 
logger.Error(err, "error getting catalog index") + http.Error(w, "Not found", http.StatusNotFound) + return + } + + w.Header().Set("Last-Modified", catalogStat.ModTime().UTC().Format(http.TimeFormat)) + done, _ := handlerutil.CheckPreconditions(w, r, catalogStat.ModTime()) + if done { + return + } + + schema := r.URL.Query().Get("schema") + pkg := r.URL.Query().Get("package") + name := r.URL.Query().Get("name") + + if schema == "" && pkg == "" && name == "" { + // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) + serveJSONLines(w, r, catalogFile) + return + } + indexReader := catalogIndex.Get(catalogFile, schema, pkg, name) + serveJSONLines(w, r, indexReader) + }) + return handlerutil.AllowedMethodsHandler(metasHander, http.MethodGet, http.MethodHead) +} + +func serveJSONLines(w http.ResponseWriter, r *http.Request, rs io.Reader) { + w.Header().Add("Content-Type", "application/jsonl") + // Copy the content of the reader to the response writer + // only if it's a Get request + if r.Method == http.MethodHead { + return + } + _, _ = io.Copy(w, rs) +} diff --git a/internal/catalogd/handler/handler.go b/internal/catalogd/handler/handler.go new file mode 100644 index 000000000..20d384693 --- /dev/null +++ b/internal/catalogd/handler/handler.go @@ -0,0 +1,40 @@ +package handler + +import ( + "net/http" + + "github.com/operator-framework/operator-controller/internal/catalogd/handler/middleware" +) + +type SubPathHandler interface { + http.Handler + SubPath() string +} + +func NewSubPathHandler(subPath string, h http.Handler) SubPathHandler { + return &simpleSubPathHandler{ + handler: h, + subPath: subPath, + } +} + +type simpleSubPathHandler struct { + handler http.Handler + subPath string +} + +func (h *simpleSubPathHandler) SubPath() string { + return h.subPath +} + +func (h *simpleSubPathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.handler.ServeHTTP(w, r) +} + +func 
NewStandardHandler(handlers ...SubPathHandler) http.Handler { + mux := http.NewServeMux() + for _, h := range handlers { + mux.Handle(h.SubPath(), h) + } + return middleware.Standard(mux) +} diff --git a/internal/catalogd/serverutil/serverutil_test.go b/internal/catalogd/handler/handler_test.go similarity index 64% rename from internal/catalogd/serverutil/serverutil_test.go rename to internal/catalogd/handler/handler_test.go index 183bf97f1..1fa9263bb 100644 --- a/internal/catalogd/serverutil/serverutil_test.go +++ b/internal/catalogd/handler/handler_test.go @@ -1,20 +1,19 @@ -package serverutil +package handler_test import ( "compress/gzip" - "context" "io" - "io/fs" "net/http" "net/http/httptest" "strings" "testing" - "github.com/go-logr/logr" "github.com/stretchr/testify/require" + + "github.com/operator-framework/operator-controller/internal/catalogd/handler" ) -func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { +func TestStandardHandler_Gzip(t *testing.T) { var generatedJSON = func(size int) string { return "{\"data\":\"" + strings.Repeat("test data ", size) + "\"}" } @@ -50,15 +49,11 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Create a mock storage instance that returns our test content - mockStorage := &mockStorageInstance{ - content: tt.responseContent, - } + baseHandler := handler.NewSubPathHandler("/test", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(tt.responseContent)) + })) - cfg := CatalogServerConfig{ - LocalStorage: mockStorage, - } - handler := storageServerHandlerWrapped(logr.Logger{}, cfg) + standardHandler := handler.NewStandardHandler(baseHandler) // Create test request req := httptest.NewRequest("GET", "/test", nil) @@ -70,7 +65,7 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { rec := httptest.NewRecorder() // Handle the request - 
handler.ServeHTTP(rec, req) + standardHandler.ServeHTTP(rec, req) // Check status code require.Equal(t, tt.expectedStatus, rec.Code) @@ -97,32 +92,3 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { }) } } - -// mockStorageInstance implements storage.Instance interface for testing -type mockStorageInstance struct { - content string -} - -func (m *mockStorageInstance) StorageServerHandler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := w.Write([]byte(m.content)) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - }) -} - -func (m *mockStorageInstance) Store(ctx context.Context, catalogName string, fs fs.FS) error { - return nil -} - -func (m *mockStorageInstance) Delete(catalogName string) error { - return nil -} - -func (m *mockStorageInstance) ContentExists(catalog string) bool { - return true -} -func (m *mockStorageInstance) BaseURL(catalog string) string { - return "" -} diff --git a/internal/catalogd/handler/internal/handlerutil/handlerutil.go b/internal/catalogd/handler/internal/handlerutil/handlerutil.go new file mode 100644 index 000000000..4fc73119b --- /dev/null +++ b/internal/catalogd/handler/internal/handlerutil/handlerutil.go @@ -0,0 +1,18 @@ +package handlerutil + +import ( + "net/http" + + "k8s.io/apimachinery/pkg/util/sets" +) + +func AllowedMethodsHandler(next http.Handler, allowedMethods ...string) http.Handler { + allowedMethodSet := sets.New[string](allowedMethods...) 
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !allowedMethodSet.Has(r.Method) { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + next.ServeHTTP(w, r) + }) +} diff --git a/internal/catalogd/storage/http_preconditions_check.go b/internal/catalogd/handler/internal/handlerutil/http_preconditions_check.go similarity index 58% rename from internal/catalogd/storage/http_preconditions_check.go rename to internal/catalogd/handler/internal/handlerutil/http_preconditions_check.go index 7fb5239b5..92dd70725 100644 --- a/internal/catalogd/storage/http_preconditions_check.go +++ b/internal/catalogd/handler/internal/handlerutil/http_preconditions_check.go @@ -5,7 +5,7 @@ // Source: Originally from Go's net/http/fs.go // https://cs.opensource.google/go/go/+/master:src/net/http/fs.go -package storage +package handlerutil import ( "net/http" @@ -14,6 +14,48 @@ import ( "time" ) +// scanETag determines if a syntactically valid ETag is present at s. If so, +// the ETag and remaining text after consuming ETag is returned. Otherwise, +// it returns "", "". +func scanETag(s string) (string, string) { + s = textproto.TrimString(s) + start := 0 + if strings.HasPrefix(s, "W/") { + start = 2 + } + if len(s[start:]) < 2 || s[start] != '"' { + return "", "" + } + // ETag is either W/"text" or "text". + // See RFC 7232 2.3. + for i := start + 1; i < len(s); i++ { + c := s[i] + switch { + // Character values allowed in ETags. + case c == 0x21 || c >= 0x23 && c <= 0x7E || c >= 0x80: + case c == '"': + return s[:i+1], s[i+1:] + default: + return "", "" + } + } + return "", "" +} + +// etagStrongMatch reports whether a and b match using strong ETag comparison. +// Assumes a and b are valid ETags. +func etagStrongMatch(a, b string) bool { + return a == b && a != "" && a[0] == '"' +} + +// etagWeakMatch reports whether a and b match using weak ETag comparison. +// Assumes a and b are valid ETags. 
+func etagWeakMatch(a, b string) bool { + return strings.TrimPrefix(a, "W/") == strings.TrimPrefix(b, "W/") +} + +// condResult is the result of an HTTP request precondition check. +// See https://tools.ietf.org/html/rfc7232 section 3. type condResult int const ( @@ -22,53 +64,34 @@ const ( condFalse ) -// checkPreconditions evaluates request preconditions and reports whether a precondition -// resulted in sending StatusNotModified or StatusPreconditionFailed. -func checkPreconditions(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { - // This function carefully follows RFC 7232 section 6. - ch := checkIfMatch(r) - if ch == condNone { - ch = checkIfUnmodifiedSince(r, modtime) - } - if ch == condFalse { - w.WriteHeader(http.StatusPreconditionFailed) - return true +func checkIfMatch(w http.ResponseWriter, r *http.Request) condResult { + im := r.Header.Get("If-Match") + if im == "" { + return condNone } - switch checkIfNoneMatch(r) { - case condFalse: - if r.Method == "GET" || r.Method == "HEAD" { - writeNotModified(w) - return true - } else { - w.WriteHeader(http.StatusPreconditionFailed) - return true + for { + im = textproto.TrimString(im) + if len(im) == 0 { + break } - case condNone: - if checkIfModifiedSince(r, w, modtime) == condFalse { - writeNotModified(w) - return true + if im[0] == ',' { + im = im[1:] + continue + } + if im[0] == '*' { + return condTrue + } + etag, remain := scanETag(im) + if etag == "" { + break + } + if etagStrongMatch(etag, getHTTPHeader(w.Header(), "Etag")) { + return condTrue } + im = remain } - return false -} -func checkIfModifiedSince(r *http.Request, w http.ResponseWriter, modtime time.Time) condResult { - ims := r.Header.Get("If-Modified-Since") - if ims == "" || isZeroTime(modtime) { - return condTrue - } - t, err := parseTime(ims) - if err != nil { - httpError(w, err) - return condNone - } - // The Last-Modified header truncates sub-second precision so - // the modtime needs to be truncated too. 
- modtime = modtime.Truncate(time.Second) - if modtime.Compare(t) <= 0 { - return condFalse - } - return condTrue + return condFalse } func checkIfUnmodifiedSince(r *http.Request, modtime time.Time) condResult { @@ -76,7 +99,7 @@ func checkIfUnmodifiedSince(r *http.Request, modtime time.Time) condResult { if ius == "" || isZeroTime(modtime) { return condNone } - t, err := parseTime(ius) + t, err := http.ParseTime(ius) if err != nil { return condNone } @@ -90,46 +113,8 @@ func checkIfUnmodifiedSince(r *http.Request, modtime time.Time) condResult { return condFalse } -// timeFormat is the time format to use when generating times in HTTP -// headers. It is like [time.RFC1123] but hard-codes GMT as the time -// zone. The time being formatted must be in UTC for Format to -// generate the correct format. -// -// For parsing this time format, see [ParseTime]. -const timeFormat = "Mon, 02 Jan 2006 15:04:05 GMT" - -var ( - unixEpochTime = time.Unix(0, 0) - timeFormats = []string{ - timeFormat, - time.RFC850, - time.ANSIC, - } -) - -// isZeroTime reports whether t is obviously unspecified (either zero or Unix()=0). -func isZeroTime(t time.Time) bool { - return t.IsZero() || t.Equal(unixEpochTime) -} - -func writeNotModified(w http.ResponseWriter) { - // RFC 7232 section 4.1: - // a sender SHOULD NOT generate representation metadata other than the - // above listed fields unless said metadata exists for the purpose of - // guiding cache updates (e.g., Last-Modified might be useful if the - // response does not have an ETag field). 
- h := w.Header() - delete(h, "Content-Type") - delete(h, "Content-Length") - delete(h, "Content-Encoding") - if h.Get("Etag") != "" { - delete(h, "Last-Modified") - } - w.WriteHeader(http.StatusNotModified) -} - -func checkIfNoneMatch(r *http.Request) condResult { - inm := r.Header.Get("If-None-Match") +func checkIfNoneMatch(w http.ResponseWriter, r *http.Request) condResult { + inm := getHTTPHeader(r.Header, "If-None-Match") if inm == "" { return condNone } @@ -150,77 +135,128 @@ func checkIfNoneMatch(r *http.Request) condResult { if etag == "" { break } + if etagWeakMatch(etag, getHTTPHeader(w.Header(), "Etag")) { + return condFalse + } buf = remain } return condTrue } -// parseTime parses a time header (such as the Date: header), -// trying each of the three formats allowed by HTTP/1.1: -// [TimeFormat], [time.RFC850], and [time.ANSIC]. -// nolint:nonamedreturns -func parseTime(text string) (t time.Time, err error) { - for _, layout := range timeFormats { - t, err = time.Parse(layout, text) - if err == nil { - return - } +func checkIfModifiedSince(r *http.Request, modtime time.Time) condResult { + if r.Method != "GET" && r.Method != "HEAD" { + return condNone + } + ims := r.Header.Get("If-Modified-Since") + if ims == "" || isZeroTime(modtime) { + return condNone + } + t, err := http.ParseTime(ims) + if err != nil { + return condNone } - return + // The Last-Modified header truncates sub-second precision so + // the modtime needs to be truncated too. 
+ modtime = modtime.Truncate(time.Second) + if ret := modtime.Compare(t); ret <= 0 { + return condFalse + } + return condTrue } -func checkIfMatch(r *http.Request) condResult { - im := r.Header.Get("If-Match") - if im == "" { +func checkIfRange(w http.ResponseWriter, r *http.Request, modtime time.Time) condResult { + if r.Method != "GET" && r.Method != "HEAD" { return condNone } - for { - im = textproto.TrimString(im) - if len(im) == 0 { - break - } - if im[0] == ',' { - im = im[1:] - continue - } - if im[0] == '*' { + ir := getHTTPHeader(r.Header, "If-Range") + if ir == "" { + return condNone + } + etag, _ := scanETag(ir) + if etag != "" { + if etagStrongMatch(etag, w.Header().Get("Etag")) { return condTrue + } else { + return condFalse } - etag, remain := scanETag(im) - if etag == "" { - break - } - im = remain } - + // The If-Range value is typically the ETag value, but it may also be + // the modtime date. See golang.org/issue/8367. + if modtime.IsZero() { + return condFalse + } + t, err := http.ParseTime(ir) + if err != nil { + return condFalse + } + if t.Unix() == modtime.Unix() { + return condTrue + } return condFalse } -// scanETag determines if a syntactically valid ETag is present at s. If so, -// the ETag and remaining text after consuming ETag is returned. Otherwise, -// it returns "", "". -// nolint:nonamedreturns -func scanETag(s string) (etag string, remain string) { - s = textproto.TrimString(s) - start := 0 - if strings.HasPrefix(s, "W/") { - start = 2 +var unixEpochTime = time.Unix(0, 0) + +// isZeroTime reports whether t is obviously unspecified (either zero or Unix()=0). 
+func isZeroTime(t time.Time) bool { + return t.IsZero() || t.Equal(unixEpochTime) +} + +func writeNotModified(w http.ResponseWriter) { + // RFC 7232 section 4.1: + // a sender SHOULD NOT generate representation metadata other than the + // above listed fields unless said metadata exists for the purpose of + // guiding cache updates (e.g., Last-Modified might be useful if the + // response does not have an ETag field). + h := w.Header() + delete(h, "Content-Type") + delete(h, "Content-Length") + delete(h, "Content-Encoding") + if h.Get("Etag") != "" { + delete(h, "Last-Modified") } - if len(s[start:]) < 2 || s[start] != '"' { - return "", "" + w.WriteHeader(http.StatusNotModified) +} + +// CheckPreconditions evaluates request preconditions and reports whether a precondition +// resulted in sending StatusNotModified or StatusPreconditionFailed. +func CheckPreconditions(w http.ResponseWriter, r *http.Request, modtime time.Time) (bool, string) { + // This function carefully follows RFC 7232 section 6. + ch := checkIfMatch(w, r) + if ch == condNone { + ch = checkIfUnmodifiedSince(r, modtime) } - // ETag is either W/"text" or "text". - // See RFC 7232 2.3. - for i := start + 1; i < len(s); i++ { - c := s[i] - switch { - // Character values allowed in ETags. 
- case c == 0x21 || c >= 0x23 && c <= 0x7E || c >= 0x80: - case c == '"': - return s[:i+1], s[i+1:] - default: - return "", "" + if ch == condFalse { + w.WriteHeader(http.StatusPreconditionFailed) + return true, "" + } + switch checkIfNoneMatch(w, r) { + case condFalse: + if r.Method == "GET" || r.Method == "HEAD" { + writeNotModified(w) + return true, "" + } else { + w.WriteHeader(http.StatusPreconditionFailed) + return true, "" + } + case condNone: + if checkIfModifiedSince(r, modtime) == condFalse { + writeNotModified(w) + return true, "" } } - return "", "" + + rangeHeader := getHTTPHeader(r.Header, "Range") + if rangeHeader != "" && checkIfRange(w, r, modtime) == condFalse { + rangeHeader = "" + } + return false, rangeHeader +} + +// getHTTPHeader is like http.Header.Get, but key must already be in CanonicalHeaderKey form. +func getHTTPHeader(h http.Header, key string) string { + if v := h[key]; len(v) > 0 { + return v[0] + } + return "" } diff --git a/internal/catalogd/handler/middleware/gzip.go b/internal/catalogd/handler/middleware/gzip.go new file mode 100644 index 000000000..11cebfeb4 --- /dev/null +++ b/internal/catalogd/handler/middleware/gzip.go @@ -0,0 +1,11 @@ +package middleware + +import ( + "net/http" + + "github.com/klauspost/compress/gzhttp" +) + +func GzipHandler(handler http.Handler) http.Handler { + return gzhttp.GzipHandler(handler) +} diff --git a/internal/catalogd/handler/middleware/logr.go b/internal/catalogd/handler/middleware/logr.go new file mode 100644 index 000000000..016f518a9 --- /dev/null +++ b/internal/catalogd/handler/middleware/logr.go @@ -0,0 +1,39 @@ +package middleware + +import ( + "io" + "net" + "net/http" + + "github.com/gorilla/handlers" + "k8s.io/klog/v2" +) + +func LoggingHandler(handler http.Handler) http.Handler { + return handlers.CustomLoggingHandler(nil, handler, func(_ io.Writer, params handlers.LogFormatterParams) { + // extract parameters used in apache common log format, 
but then log using `logr` to remain consistent + // with other loggers used in this codebase. + username := "-" + if params.URL.User != nil { + if name := params.URL.User.Username(); name != "" { + username = name + } + } + + host, _, err := net.SplitHostPort(params.Request.RemoteAddr) + if err != nil { + host = params.Request.RemoteAddr + } + + uri := params.Request.RequestURI + if params.Request.ProtoMajor == 2 && params.Request.Method == http.MethodConnect { + uri = params.Request.Host + } + if uri == "" { + uri = params.URL.RequestURI() + } + + l := klog.FromContext(params.Request.Context()).WithName("catalogd-http-server") + l.Info("handled request", "host", host, "username", username, "method", params.Request.Method, "uri", uri, "protocol", params.Request.Proto, "status", params.StatusCode, "size", params.Size) + }) +} diff --git a/internal/catalogd/metrics/metrics.go b/internal/catalogd/handler/middleware/metrics.go similarity index 83% rename from internal/catalogd/metrics/metrics.go rename to internal/catalogd/handler/middleware/metrics.go index c30aed584..9f3b098fb 100644 --- a/internal/catalogd/metrics/metrics.go +++ b/internal/catalogd/handler/middleware/metrics.go @@ -1,10 +1,11 @@ -package metrics +package middleware import ( "net/http" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "sigs.k8s.io/controller-runtime/pkg/metrics" ) const ( @@ -20,7 +21,7 @@ const ( // Query C: sum(catalogd_http_request_duration_seconds_count) // Expression for Apdex Score: ($A + (($B - $A) / 2)) / $C var ( - RequestDurationMetric = prometheus.NewHistogramVec( + requestDurationMetric = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: RequestDurationMetricName, Help: "Histogram of request duration in seconds", @@ -35,6 +36,10 @@ var ( ) ) -func AddMetricsToHandler(handler http.Handler) http.Handler { - return 
promhttp.InstrumentHandlerDuration(RequestDurationMetric, handler) +func init() { + metrics.Registry.MustRegister(requestDurationMetric) +} + +func MetricsHandler(handler http.Handler) http.Handler { + return promhttp.InstrumentHandlerDuration(requestDurationMetric, handler) } diff --git a/internal/catalogd/handler/middleware/middleware.go b/internal/catalogd/handler/middleware/middleware.go new file mode 100644 index 000000000..7595afa58 --- /dev/null +++ b/internal/catalogd/handler/middleware/middleware.go @@ -0,0 +1,24 @@ +package middleware + +import ( + "net/http" +) + +type Middleware func(http.Handler) http.Handler + +func Standard(handler http.Handler) http.Handler { + return Chain( + LoggingHandler, + MetricsHandler, + GzipHandler, + )(handler) +} + +func Chain(middlewares ...Middleware) Middleware { + return func(handler http.Handler) http.Handler { + for _, middleware := range middlewares { + handler = middleware(handler) + } + return handler + } +} diff --git a/internal/catalogd/serverutil/serverutil.go b/internal/catalogd/serverutil/serverutil.go deleted file mode 100644 index 143d4c876..000000000 --- a/internal/catalogd/serverutil/serverutil.go +++ /dev/null @@ -1,104 +0,0 @@ -package serverutil - -import ( - "crypto/tls" - "fmt" - "io" - "net" - "net/http" - "time" - - "github.com/go-logr/logr" - "github.com/gorilla/handlers" - "github.com/klauspost/compress/gzhttp" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/certwatcher" - "sigs.k8s.io/controller-runtime/pkg/manager" - - catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics" - "github.com/operator-framework/operator-controller/internal/catalogd/storage" -) - -type CatalogServerConfig struct { - ExternalAddr string - CatalogAddr string - CertFile string - KeyFile string - LocalStorage storage.Instance -} - -func 
AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFileWatcher *certwatcher.CertWatcher) error { - listener, err := net.Listen("tcp", cfg.CatalogAddr) - if err != nil { - return fmt.Errorf("error creating catalog server listener: %w", err) - } - - if cfg.CertFile != "" && cfg.KeyFile != "" { - // Use the passed certificate watcher instead of creating a new one - config := &tls.Config{ - GetCertificate: tlsFileWatcher.GetCertificate, - MinVersion: tls.VersionTLS12, - } - listener = tls.NewListener(listener, config) - } - - shutdownTimeout := 30 * time.Second - catalogServer := manager.Server{ - Name: "catalogs", - OnlyServeWhenLeader: true, - Server: &http.Server{ - Addr: cfg.CatalogAddr, - Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg), - ReadTimeout: 5 * time.Second, - // TODO: Revert this to 10 seconds if/when the API - // evolves to have significantly smaller responses - WriteTimeout: 5 * time.Minute, - }, - ShutdownTimeout: &shutdownTimeout, - Listener: listener, - } - - err = mgr.Add(&catalogServer) - if err != nil { - return fmt.Errorf("error adding catalog server to manager: %w", err) - } - - return nil -} - -func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { - return handlers.CustomLoggingHandler(nil, handler, func(_ io.Writer, params handlers.LogFormatterParams) { - // extract parameters used in apache common log format, but then log using `logr` to remain consistent - // with other loggers used in this codebase. 
- username := "-" - if params.URL.User != nil { - if name := params.URL.User.Username(); name != "" { - username = name - } - } - - host, _, err := net.SplitHostPort(params.Request.RemoteAddr) - if err != nil { - host = params.Request.RemoteAddr - } - - uri := params.Request.RequestURI - if params.Request.ProtoMajor == 2 && params.Request.Method == http.MethodConnect { - uri = params.Request.Host - } - if uri == "" { - uri = params.URL.RequestURI() - } - - l.Info("handled request", "host", host, "username", username, "method", params.Request.Method, "uri", uri, "protocol", params.Request.Proto, "status", params.StatusCode, "size", params.Size) - }) -} - -func storageServerHandlerWrapped(l logr.Logger, cfg CatalogServerConfig) http.Handler { - handler := cfg.LocalStorage.StorageServerHandler() - handler = gzhttp.GzipHandler(handler) - handler = catalogdmetrics.AddMetricsToHandler(handler) - - handler = logrLoggingHandler(l, handler) - return handler -} diff --git a/internal/catalogd/storage/files.go b/internal/catalogd/storage/files.go new file mode 100644 index 000000000..86a4ad64b --- /dev/null +++ b/internal/catalogd/storage/files.go @@ -0,0 +1,117 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "iter" + "os" + "sync" + + "github.com/operator-framework/operator-registry/alpha/declcfg" +) + +var _ Instance = (*files)(nil) + +type files struct { + rootDir string + files map[string]string + mu sync.RWMutex +} + +type Files interface { + Get(catalogName string) (*os.File, error) +} + +func newFiles(rootDir string) *files { + return &files{ + rootDir: rootDir, + files: make(map[string]string), + } +} + +func (f *files) Store(ctx context.Context, catalog string, seq iter.Seq2[*declcfg.Meta, error]) error { + catalogFile, err := os.CreateTemp(f.rootDir, fmt.Sprintf("catalog-all-%s-*.jsonl", catalog)) + if err != nil { + return err + } + catalogFilePath := catalogFile.Name() + defer func() { + _ = catalogFile.Close() + }() + + if err 
:= func() error { + for meta, iterErr := range seq { + if iterErr != nil { + return iterErr + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + _, err := catalogFile.Write(meta.Blob) + if err != nil { + return err + } + } + + f.mu.Lock() + defer f.mu.Unlock() + + // If there is an existing file, delete it + existing, preExists := f.files[catalog] + if preExists { + if err := os.Remove(existing); err != nil { + return err + } + } + f.files[catalog] = catalogFilePath + return nil + }(); err != nil { + return errors.Join(err, os.Remove(catalogFilePath)) + } + return nil +} + +func (f *files) Delete(_ context.Context, catalog string) error { + f.mu.Lock() + defer f.mu.Unlock() + + catalogFilePath, ok := f.files[catalog] + if !ok { + return nil + } + if err := os.Remove(catalogFilePath); err != nil { + return err + } + delete(f.files, catalog) + return nil +} + +func (f *files) Exists(catalog string) bool { + f.mu.RLock() + defer f.mu.RUnlock() + + catalogFilePath, isCatalogRegistered := f.files[catalog] + if !isCatalogRegistered { + return false + } + catalogFileStat, err := os.Stat(catalogFilePath) + if err != nil { + return false + } + return !catalogFileStat.IsDir() +} + +func (f *files) Get(catalog string) (*os.File, error) { + f.mu.RLock() + defer f.mu.RUnlock() + filePath, ok := f.files[catalog] + if !ok { + return nil, os.ErrNotExist + } + return os.Open(filePath) +} diff --git a/internal/catalogd/storage/graphql.go b/internal/catalogd/storage/graphql.go new file mode 100644 index 000000000..1d8de788e --- /dev/null +++ b/internal/catalogd/storage/graphql.go @@ -0,0 +1,74 @@ +package storage + +import ( + "context" + "iter" + "os" + "sync" + + "github.com/graphql-go/graphql" + + "github.com/operator-framework/operator-registry/alpha/declcfg" + + "github.com/operator-framework/operator-controller/internal/catalogd/storage/index" + catalogdgraphql 
"github.com/operator-framework/operator-controller/internal/catalogd/storage/internal/graphql" +) + +var _ Instance = (*graphQLSchemas)(nil) + +type graphQLSchemas struct { + schemas map[string]*graphql.Schema + mu sync.RWMutex +} + +type GraphQLSchemas interface { + Get(catalogName string) (*graphql.Schema, error) +} + +func newGraphQLSchemas() *graphQLSchemas { + return &graphQLSchemas{ + schemas: make(map[string]*graphql.Schema), + } +} + +func (i *graphQLSchemas) Store(ctx context.Context, catalog string, seq iter.Seq2[*declcfg.Meta, error]) error { + schema, err := catalogdgraphql.GenerateSchema(ctx, seq) + if err != nil { + return err + } + + i.mu.Lock() + defer i.mu.Unlock() + i.schemas[catalog] = schema + return nil +} + +func (i *graphQLSchemas) Delete(_ context.Context, catalog string) error { + i.mu.Lock() + defer i.mu.Unlock() + + delete(i.schemas, catalog) + return nil +} + +func (i *graphQLSchemas) Exists(catalog string) bool { + i.mu.RLock() + defer i.mu.RUnlock() + + _, schemaExists := i.schemas[catalog] + return schemaExists +} + +func (i *graphQLSchemas) Get(catalog string) (*graphql.Schema, error) { + i.mu.RLock() + defer i.mu.RUnlock() + schema, ok := i.schemas[catalog] + if !ok { + return nil, os.ErrNotExist + } + return schema, nil +} + +func ContextWithCatalogData(ctx context.Context, catalogFile *os.File, catalogIndex *index.Index) context.Context { + return catalogdgraphql.ContextWithCatalogData(ctx, catalogFile, catalogIndex) +} diff --git a/internal/catalogd/storage/index.go b/internal/catalogd/storage/index/index.go similarity index 66% rename from internal/catalogd/storage/index.go rename to internal/catalogd/storage/index/index.go index 510e23ff0..bfc42cd89 100644 --- a/internal/catalogd/storage/index.go +++ b/internal/catalogd/storage/index/index.go @@ -1,10 +1,13 @@ -package storage +package index import ( "cmp" + "context" "encoding/json" "fmt" "io" + "iter" + "os" "slices" "k8s.io/apimachinery/pkg/util/sets" @@ 
-12,24 +15,88 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" ) -// index is an index of sections of an FBC file used to lookup FBC blobs that -// match any combination of their schema, package, and name fields. - -// This index strikes a balance between space and performance. It indexes each field -// separately, and performs logical set intersections at lookup time in order to implement -// a multi-parameter query. -// -// Note: it is permissible to change the indexing algorithm later if it is necessary to -// tune the space / performance tradeoff. However care should be taken to ensure -// that the actual content returned by the index remains identical, as users of the index -// may be sensitive to differences introduced by index algorithm changes (e.g. if the -// order of the returned sections changes). +type Index struct { + idx *index +} + +func (i *Index) Get(r io.ReaderAt, schema, packageName, name string) io.Reader { + return i.idx.get(r, schema, packageName, name) +} + +func New(ctx context.Context, metas iter.Seq2[*declcfg.Meta, error]) (*Index, error) { + idx, err := newIndex(ctx, metas) + if err != nil { + return nil, err + } + return &Index{idx: idx}, nil +} + +func ReadFile(indexFilePath string) (*Index, error) { + indexFile, err := os.Open(indexFilePath) + if err != nil { + return nil, err + } + + dec := json.NewDecoder(indexFile) + var idx index + if err := dec.Decode(&idx); err != nil { + return nil, err + } + return &Index{idx: &idx}, nil +} + +var _ io.WriterTo = (*Index)(nil) + +func (i *Index) WriteTo(w io.Writer) (int64, error) { + data, err := json.Marshal(i.idx) + if err != nil { + return -1, err + } + written, err := w.Write(data) + return int64(written), err +} + type index struct { BySchema map[string][]section `json:"by_schema"` ByPackage map[string][]section `json:"by_package"` ByName map[string][]section `json:"by_name"` } +func newIndex(ctx context.Context, metas 
iter.Seq2[*declcfg.Meta, error]) (*index, error) { + idx := &index{ + BySchema: make(map[string][]section), + ByPackage: make(map[string][]section), + ByName: make(map[string][]section), + } + offset := int64(0) + for meta, iterErr := range metas { + if iterErr != nil { + return nil, iterErr + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + start := offset + length := int64(len(meta.Blob)) + offset += length + + s := section{offset: start, length: length} + if meta.Schema != "" { + idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) + } + if meta.Package != "" { + idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) + } + if meta.Name != "" { + idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + } + } + return idx, nil +} + // A section is the byte offset and length of an FBC blob within the file. type section struct { offset int64 @@ -50,7 +117,7 @@ func (s *section) UnmarshalJSON(b []byte) error { return nil } -func (i index) Get(r io.ReaderAt, schema, packageName, name string) io.Reader { +func (i *index) get(r io.ReaderAt, schema, packageName, name string) io.Reader { sectionSet := i.getSectionSet(schema, packageName, name) sections := sectionSet.UnsortedList() @@ -91,29 +158,3 @@ func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section return sectionSet } - -func newIndex(metasChan <-chan *declcfg.Meta) *index { - idx := &index{ - BySchema: make(map[string][]section), - ByPackage: make(map[string][]section), - ByName: make(map[string][]section), - } - offset := int64(0) - for meta := range metasChan { - start := offset - length := int64(len(meta.Blob)) - offset += length - - s := section{offset: start, length: length} - if meta.Schema != "" { - idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) - } - if meta.Package != "" { - idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) - } - if meta.Name != "" { - idx.ByName[meta.Name] = 
append(idx.ByName[meta.Name], s) - } - } - return idx -} diff --git a/internal/catalogd/storage/index_test.go b/internal/catalogd/storage/index/index_test.go similarity index 94% rename from internal/catalogd/storage/index_test.go rename to internal/catalogd/storage/index/index_test.go index 66dee0c13..ff61d1ec7 100644 --- a/internal/catalogd/storage/index_test.go +++ b/internal/catalogd/storage/index/index_test.go @@ -1,9 +1,10 @@ -package storage +package index import ( "bytes" "encoding/json" "io" + "iter" "testing" "github.com/stretchr/testify/require" @@ -11,6 +12,16 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" ) +func metasSeq(metas []*declcfg.Meta) iter.Seq2[*declcfg.Meta, error] { + return func(yield func(*declcfg.Meta, error) bool) { + for _, meta := range metas { + if !yield(meta, nil) { + return + } + } + } +} + func TestIndexCreation(t *testing.T) { // Create test Meta objects metas := []*declcfg.Meta{ @@ -28,15 +39,9 @@ func TestIndexCreation(t *testing.T) { }, } - // Create channel and feed Metas - metasChan := make(chan *declcfg.Meta, len(metas)) - for _, meta := range metas { - metasChan <- meta - } - close(metasChan) - // Create index - idx := newIndex(metasChan) + idx, err := newIndex(t.Context(), metasSeq(metas)) + require.NoError(t, err) // Verify schema index require.Len(t, idx.BySchema, 2, "Expected 2 schema entries, got %d", len(idx.BySchema)) @@ -158,14 +163,8 @@ func TestIndexGet(t *testing.T) { }, } - // Create and populate the index - metasChan := make(chan *declcfg.Meta, len(metas)) - for _, meta := range metas { - metasChan <- meta - } - close(metasChan) - - idx := newIndex(metasChan) + idx, err := newIndex(t.Context(), metasSeq(metas)) + require.NoError(t, err) // Create a reader from the metas var combinedBlob bytes.Buffer @@ -253,7 +252,7 @@ func TestIndexGet(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - reader := 
idx.Get(fullData, tt.schema, tt.packageName, tt.blobName) + reader := idx.get(fullData, tt.schema, tt.packageName, tt.blobName) content, err := io.ReadAll(reader) require.NoError(t, err, "Failed to read content: %v", err) diff --git a/internal/catalogd/storage/indices.go b/internal/catalogd/storage/indices.go new file mode 100644 index 000000000..89b89112f --- /dev/null +++ b/internal/catalogd/storage/indices.go @@ -0,0 +1,124 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "iter" + "os" + "sync" + + "golang.org/x/sync/singleflight" + + "github.com/operator-framework/operator-registry/alpha/declcfg" + + "github.com/operator-framework/operator-controller/internal/catalogd/storage/index" +) + +var _ Instance = (*indices)(nil) + +type indices struct { + rootDir string + indices map[string]string + mu sync.RWMutex + + sf singleflight.Group +} + +type Indices interface { + Get(catalogName string) (*index.Index, error) +} + +func newIndices(rootDir string) *indices { + return &indices{ + rootDir: rootDir, + indices: make(map[string]string), + } +} + +func (i *indices) Store(ctx context.Context, catalog string, seq iter.Seq2[*declcfg.Meta, error]) error { + indexFile, err := os.CreateTemp(i.rootDir, fmt.Sprintf("catalog-index-%s-*.json", catalog)) + if err != nil { + return err + } + indexFilePath := indexFile.Name() + defer func() { + _ = indexFile.Close() + }() + + if err := func() error { + idx, err := index.New(ctx, seq) + if err != nil { + return err + } + if _, err := idx.WriteTo(indexFile); err != nil { + return err + } + + i.mu.Lock() + defer i.mu.Unlock() + + // If there is an existing file, delete it + existing, preExists := i.indices[catalog] + if preExists { + if err := os.Remove(existing); err != nil { + return err + } + } + i.indices[catalog] = indexFilePath + return nil + }(); err != nil { + return errors.Join(err, os.Remove(indexFilePath)) + } + return nil +} + +func (i *indices) Delete(_ 
context.Context, catalog string) error { + i.mu.Lock() + defer i.mu.Unlock() + + indexFilePath, ok := i.indices[catalog] + if !ok { + return nil + } + if err := os.Remove(indexFilePath); err != nil { + return err + } + delete(i.indices, catalog) + return nil +} + +func (i *indices) Exists(catalog string) bool { + i.mu.RLock() + defer i.mu.RUnlock() + + indexFilePath, isCatalogRegistered := i.indices[catalog] + if !isCatalogRegistered { + return false + } + indexFileStat, err := os.Stat(indexFilePath) + if err != nil { + return false + } + return !indexFileStat.IsDir() +} + +func (i *indices) Get(catalog string) (*index.Index, error) { + i.mu.RLock() + defer i.mu.RUnlock() + indexFilePath, ok := i.indices[catalog] + if !ok { + return nil, os.ErrNotExist + } + return i.readIndexFromFile(indexFilePath) +} + +func (i *indices) readIndexFromFile(indexFilePath string) (*index.Index, error) { + idx, err, _ := i.sf.Do(indexFilePath, func() (interface{}, error) { + return index.ReadFile(indexFilePath) + }) + if err != nil { + return nil, err + } + return idx.(*index.Index), nil +} diff --git a/internal/catalogd/storage/internal/graphql/context.go b/internal/catalogd/storage/internal/graphql/context.go new file mode 100644 index 000000000..35040e299 --- /dev/null +++ b/internal/catalogd/storage/internal/graphql/context.go @@ -0,0 +1,96 @@ +package graphql + +import ( + "context" + "os" + "sync" + + "github.com/itchyny/gojq" + + "github.com/operator-framework/operator-controller/internal/catalogd/storage/index" +) + +type contextKey string + +const ( + contextKeyCatalogFile contextKey = "catalogFile" + contextKeyCatalogIndex contextKey = "catalogIndex" + contextKeyJQCode contextKey = "jqCode" +) + +func ContextWithCatalogData(ctx context.Context, catalogFile *os.File, catalogIndex *index.Index) context.Context { + ctx = context.WithValue(ctx, contextKeyCatalogFile, catalogFile) + ctx = context.WithValue(ctx, contextKeyCatalogIndex, 
catalogIndex) + ctx = context.WithValue(ctx, contextKeyJQCode, newJQCodeMap()) + return ctx +} + +func fileFromContext(ctx context.Context) (*os.File, error) { + v := ctx.Value(contextKeyCatalogFile) + if v == nil { + return nil, os.ErrNotExist + } + return v.(*os.File), nil +} + +func indexFromContext(ctx context.Context) (*index.Index, error) { + v := ctx.Value(contextKeyCatalogIndex) + if v == nil { + return nil, os.ErrNotExist + } + return v.(*index.Index), nil +} + +func jqCodeFromContextOrCompileAndSet(ctx context.Context, query string) (*gojq.Code, error) { + v := ctx.Value(contextKeyJQCode) + if v == nil { + return nil, os.ErrNotExist + } + return v.(*jqCodeMap).getOrCompileAndSet(query) +} + +type jqCodeMap struct { + m map[string]*gojq.Code + mu sync.RWMutex +} + +func newJQCodeMap() *jqCodeMap { + return &jqCodeMap{ + m: make(map[string]*gojq.Code), + } +} + +func (m *jqCodeMap) getOrCompileAndSet(query string) (*gojq.Code, error) { + // Get a read lock to see if we already have the code compiled + m.mu.RLock() + jqCode, ok := m.m[query] + m.mu.RUnlock() + // If so, just return it + if ok { + return jqCode, nil + } + + // If not, get a write lock so that we can compile the query + m.mu.Lock() + defer m.mu.Unlock() + + // Check again to see if it was added between the time we let go + // of the read lock and grabbed the write lock. If it was, just + // return it. + if jqCode, ok = m.m[query]; ok { + return jqCode, nil + } + + // Otherwise, now we really do need to compile it, and store it + // in the map. 
+ parsed, err := gojq.Parse(query) + if err != nil { + return nil, err + } + jqCode, err = gojq.Compile(parsed) + if err != nil { + return nil, err + } + m.m[query] = jqCode + return jqCode, nil +} diff --git a/internal/catalogd/storage/internal/graphql/json.go b/internal/catalogd/storage/internal/graphql/json.go new file mode 100644 index 000000000..0b17bcec2 --- /dev/null +++ b/internal/catalogd/storage/internal/graphql/json.go @@ -0,0 +1,87 @@ +package graphql + +import ( + "fmt" + + "github.com/graphql-go/graphql" + "github.com/graphql-go/graphql/language/ast" + "github.com/itchyny/gojq" +) + +// applyJQQuery applies a jq query to JSON data and returns the result +func applyJQQuery(data interface{}, jqCode *gojq.Code) (interface{}, error) { + iter := jqCode.Run(data) + + // Collect all results + var results []interface{} + for { + v, ok := iter.Next() + if !ok { + break + } + if err, ok := v.(error); ok { + return nil, fmt.Errorf("error executing jq query: %w", err) + } + results = append(results, v) + } + + // Return single result if only one, otherwise return array + if len(results) == 1 { + return results[0], nil + } + return results, nil +} + +// jsonScalar is a custom GraphQL scalar type that can handle arbitrary JSON objects +var jsonScalar = graphql.NewScalar(graphql.ScalarConfig{ + Name: "JSON", + Description: "The `JSON` scalar type represents JSON values as specified by " + + "[ECMA-404](http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-404.pdf).", + Serialize: func(value interface{}) interface{} { + return value + }, + ParseValue: func(value interface{}) interface{} { + return value + }, + ParseLiteral: func(valueAST ast.Value) interface{} { + switch valueAST := valueAST.(type) { + case *ast.StringValue: + return valueAST.Value + case *ast.IntValue: + return valueAST.Value + case *ast.FloatValue: + return valueAST.Value + case *ast.BooleanValue: + return valueAST.Value + 
default: + return nil + } + }, +}) + +func newJSONScalarField(fieldName string) *graphql.Field { + return &graphql.Field{ + Type: jsonScalar, + Args: graphql.FieldConfigArgument{ + "jq": &graphql.ArgumentConfig{ + Type: graphql.String, + Description: "jq query to apply to the JSON data", + }, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + data := p.Source.(map[string]interface{})[fieldName] + + jqQueryString, ok := p.Args["jq"].(string) + if !ok || jqQueryString == "" { + return data, nil + } + + jqCode, err := jqCodeFromContextOrCompileAndSet(p.Context, jqQueryString) + if err != nil { + return nil, err + } + + return applyJQQuery(data, jqCode) + }, + } +} diff --git a/internal/catalogd/storage/internal/graphql/name.go b/internal/catalogd/storage/internal/graphql/name.go new file mode 100644 index 000000000..3dd723587 --- /dev/null +++ b/internal/catalogd/storage/internal/graphql/name.go @@ -0,0 +1,63 @@ +package graphql + +import ( + "fmt" + "regexp" + "strings" + + "golang.org/x/text/cases" + "golang.org/x/text/language" +) + +type namer struct { + caser cases.Caser +} + +func newNamer() *namer { + return &namer{caser: cases.Title(language.English)} +} + +func sanitizeName(name string) string { + if name == "" { + panic("name is empty") + } + // Replace any invalid characters with underscores + // First, handle the first character (must be letter or underscore) + firstCharRegex := regexp.MustCompile(`^[^_a-zA-Z]`) + name = firstCharRegex.ReplaceAllString(name, "_") + + // Then handle subsequent characters (can be letters, numbers, or underscores) + restRegex := regexp.MustCompile(`[^_a-zA-Z0-9]`) + return restRegex.ReplaceAllString(name, "_") +} + +func (n *namer) TypeNameForField(prefix, name string) string { + return fmt.Sprintf("%s%s", prefix, sanitizeName(n.caser.String(name))) +} + +func (n *namer) TypeNameForSchema(schema string) string { + if schema == "" { + panic("schema is empty") + } + var sb strings.Builder + parts := 
strings.Split(schema, ".") + for _, part := range parts { + sb.WriteString(sanitizeName(n.caser.String(part))) + } + + return sb.String() +} + +func (n *namer) FieldNameForSchema(schema string) string { + if schema == "" { + panic("schema is empty") + } + var sb strings.Builder + parts := strings.Split(schema, ".") + sb.WriteString(sanitizeName(parts[0])) + for _, part := range parts[1:] { + sb.WriteString(sanitizeName(n.caser.String(part))) + } + + return sb.String() +} diff --git a/internal/catalogd/storage/internal/graphql/schema.go b/internal/catalogd/storage/internal/graphql/schema.go new file mode 100644 index 000000000..af2733ca5 --- /dev/null +++ b/internal/catalogd/storage/internal/graphql/schema.go @@ -0,0 +1,454 @@ +package graphql + +import ( + "context" + "encoding/json" + "fmt" + "io" + "iter" + "regexp" + + "github.com/graphql-go/graphql" + + "github.com/operator-framework/operator-registry/alpha/declcfg" +) + +// GenerateSchema processes FBC meta objects and returns a GraphQL schema that represents them. 
+func GenerateSchema(ctx context.Context, metas iter.Seq2[*declcfg.Meta, error]) (*graphql.Schema, error) { + schemaFieldData := make(map[string]map[string]interface{}) + for meta, iterErr := range metas { + if iterErr != nil { + return nil, fmt.Errorf("error walking FBC data: %v", iterErr) + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + // Parse the blob content + var blobContent map[string]interface{} + if err := json.Unmarshal(meta.Blob, &blobContent); err != nil { + return nil, err + } + + // Initialize schema field data if not exists + if schemaFieldData[meta.Schema] == nil { + schemaFieldData[meta.Schema] = map[string]interface{}{} + } + + // Process each field in the blob - only merge data, no GraphQL object creation + for fieldName, fieldValue := range blobContent { + // Deep merge field values to capture all possible nested fields + if existingValue, exists := schemaFieldData[meta.Schema][fieldName]; exists { + mergedValue, err := deepMerge(existingValue, fieldValue, fmt.Sprintf("%s.%s", meta.Schema, fieldName)) + if err != nil { + return nil, fmt.Errorf("failed to merge field '%s' in schema '%s': %w", + fieldName, meta.Schema, err) + } + schemaFieldData[meta.Schema][fieldName] = mergedValue + } else { + schemaFieldData[meta.Schema][fieldName] = fieldValue + } + } + } + return generateSchema(schemaFieldData) +} + +func deepMergeMap(existingMap, newMap map[string]interface{}, fieldPath string) (map[string]interface{}, error) { + for key, newValue := range newMap { + if existingValue, exists := existingMap[key]; exists { + // Recursively merge the field + mergedValue, err := deepMerge(existingValue, newValue, fieldPath+"."+key) + if err != nil { + return nil, err + } + existingMap[key] = mergedValue + } else { + // Field doesn't exist in existing, add it + existingMap[key] = newValue + } + } + return existingMap, nil +} + +func deepMergeArray(existingArray, newArray []interface{}, fieldPath string) ([]interface{}, error) { + 
mergeInto := func(element interface{}, itemsSlices ...[]interface{}) ([]interface{}, error) { + var err error + elementType := getValueTypeName(element) + + for _, items := range itemsSlices { + for _, item := range items { + itemType := getValueTypeName(item) + if itemType != elementType { + return nil, fmt.Errorf("cannot merge arrays at %s: different element types (%s vs %s)", + fieldPath, elementType, itemType) + } + + element, err = deepMerge(element, item, fmt.Sprintf("%s[%s]", fieldPath, elementType)) + if err != nil { + return nil, err + } + } + } + return []interface{}{element}, nil + } + + if isPropertiesPattern(existingArray) && isPropertiesPattern(newArray) { + // Property objects always have the same structure: {type: string, value: anything} + // No need to merge - just return the canonical structure + return []interface{}{ + map[string]interface{}{ + "type": "", + "value": nil, + }, + }, nil + } + + switch { + case len(existingArray) == 0 && len(newArray) == 0: + return []interface{}{}, nil + case len(existingArray) == 0: + return mergeInto(newArray[0], newArray[1:]) + case len(newArray) == 0: + return mergeInto(existingArray[0], existingArray[1:]) + default: + return mergeInto(existingArray[0], existingArray[1:], newArray) + } +} + +// deepMerge recursively merges two interface{} values, prioritizing comprehensive field coverage +// Returns an error if there are type mismatches between existing and new values +func deepMerge(existing, new interface{}, fieldPath string) (interface{}, error) { + // If existing is nil, return new + if existing == nil { + return new, nil + } + + // If new is nil, return existing + if new == nil { + return existing, nil + } + + // Check for type mismatches + existingType := getValueTypeName(existing) + newType := getValueTypeName(new) + + if existingType != newType { + return nil, fmt.Errorf("type mismatch for field '%s': existing type '%s' conflicts with new type '%s'", + fieldPath, existingType, newType) + } + + // If 
both are maps, merge them. + existingMap, existingIsMap := existing.(map[string]interface{}) + newMap, newIsMap := new.(map[string]interface{}) + if existingIsMap && newIsMap { + return deepMergeMap(existingMap, newMap, fieldPath) + } + + // If both are arrays, merge them: + // 1. Validate that all elements have the same type + // 2. Merge all elements into a single representative element + // 3. Return the merged element as a single-element array + existingArray, existingIsArray := existing.([]interface{}) + newArray, newIsArray := new.([]interface{}) + if existingIsArray && newIsArray { + return deepMergeArray(existingArray, newArray, fieldPath) + } + + // For primitives of the same type, prefer the existing value + // (we already have a sample, both are valid examples) + return existing, nil +} + +// GenerateFBCSchema generates a complete GraphQL schema from accumulated data - Phase 2: Schema Generation +func generateSchema(schemaFieldData map[string]map[string]interface{}) (*graphql.Schema, error) { + if len(schemaFieldData) == 0 { + return nil, fmt.Errorf("no schema data available - no Meta objects have been processed") + } + + // Build GraphQL types for each schema + graphqlNamer := newNamer() + rootFields := make(graphql.Fields, len(schemaFieldData)) + for schema, protoObj := range schemaFieldData { + // Create GraphQL type for this schema using accumulated field data + gqlType := generateTypeForSchema(schema, protoObj, graphqlNamer) + + // Create field name from schema (e.g., "olm.package" -> "olmPackage") + fieldName := graphqlNamer.FieldNameForSchema(schema) + + // Add field to root query + rootFields[fieldName] = &graphql.Field{ + Type: graphql.NewList(gqlType), + Args: graphql.FieldConfigArgument{ + "name": &graphql.ArgumentConfig{ + Type: graphql.String, + Description: "Filter by name field", + }, + "package": &graphql.ArgumentConfig{ + Type: graphql.String, + Description: "Filter by package field", + }, + }, + Resolve: func(p graphql.ResolveParams) 
(interface{}, error) { + file, err := fileFromContext(p.Context) + if err != nil { + return nil, err + } + + idx, err := indexFromContext(p.Context) + if err != nil { + return nil, err + } + + // Get filter arguments + nameFilter, _ := p.Args["name"].(string) + packageFilter, _ := p.Args["package"].(string) + + // Parse the data using streaming JSON decoder + reader := idx.Get(file, schema, packageFilter, nameFilter) + decoder := json.NewDecoder(reader) + result := make([]map[string]interface{}, 0) + + for { + var parsedContent map[string]interface{} + if err := decoder.Decode(&parsedContent); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("failed to parse JSON: %w", err) + } + result = append(result, parsedContent) + } + + return result, nil + }, + } + } + + // Create root query type + rootQuery := graphql.NewObject(graphql.ObjectConfig{ + Name: "Query", + Fields: rootFields, + }) + + // Create and return the schema + schema, err := graphql.NewSchema(graphql.SchemaConfig{ + Query: rootQuery, + }) + if err != nil { + return nil, fmt.Errorf("failed to create GraphQL schema: %w", err) + } + + return &schema, nil +} + +// generateTypeForSchema creates a GraphQL type for a specific schema using stored field information +func generateTypeForSchema(schema string, schemaFields map[string]interface{}, namer *namer) *graphql.Object { + typeName := namer.TypeNameForSchema(schema) + + // Create GraphQL fields based on discovered fields + gqlFields := make(graphql.Fields, len(schemaFields)) + for fieldName, fieldValue := range schemaFields { + // Determine GraphQL type based on the field value with context + gqlType := inferGraphQLTypeFromValueWithContext(typeName, fieldName, fieldValue, namer) + + var field *graphql.Field + if fieldName == "properties" && isPropertiesPattern(fieldValue) { + field = &graphql.Field{ + Type: gqlType, + Args: graphql.FieldConfigArgument{ + "type": &graphql.ArgumentConfig{ + Type: graphql.String, + Description: "Filter 
properties by type field", + }, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return filterProperties(p.Source.(map[string]interface{})[fieldName].([]interface{}), p.Args), nil + }, + } + } else { + field = &graphql.Field{ + Type: gqlType, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return p.Source.(map[string]interface{})[fieldName], nil + }, + } + } + + gqlFields[fieldName] = field + } + + // Create the object type + objectType := graphql.NewObject(graphql.ObjectConfig{ + Name: typeName, + Fields: gqlFields, + }) + + return objectType +} + +// isPropertiesPattern checks if a value matches the OLM properties pattern. +func isPropertiesPattern(value interface{}) bool { + arr, ok := value.([]interface{}) + if !ok || len(arr) == 0 { + return false + } + + // Check if all elements have the expected structure for properties + for _, item := range arr { + if itemMap, ok := item.(map[string]interface{}); !ok || !isPropertyObject(itemMap) { + return false + } + } + return true +} + +// isPropertyObject checks if this looks like a property object (has type(String) and value(Any) fields) +func isPropertyObject(obj map[string]interface{}) bool { + typeVal, hasType := obj["type"] + _, typeIsString := typeVal.(string) + _, hasValue := obj["value"] + return hasType && typeIsString && hasValue && len(obj) == 2 +} + +// createPropertiesType creates a properties type using JSON scalar for arbitrary data +func createPropertiesType(parentName, name string, namer *namer) *graphql.Object { + // Generate a type name for properties + typeName := namer.TypeNameForField(parentName, name) + + objectType := graphql.NewObject(graphql.ObjectConfig{ + Name: typeName, + Fields: graphql.Fields{ + "type": &graphql.Field{ + Type: graphql.String, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + return p.Source.(map[string]interface{})["type"], nil + }, + }, + "value": newJSONScalarField("value"), + }, + }) + + return objectType +} + 
+// filterProperties applies filtering to properties arrays based on arguments +func filterProperties(properties []interface{}, args map[string]interface{}) []interface{} { + // Get the type filter argument + typeFilter, _ := args["type"].(string) + + // If no type filter, return all properties + if typeFilter == "" { + return properties + } + + // Filter properties by type + var filtered []interface{} + for _, prop := range properties { + if propObj, ok := prop.(map[string]interface{}); ok { + if propType, ok := propObj["type"].(string); ok && propType == typeFilter { + filtered = append(filtered, prop) + } + } + } + + return filtered +} + +// isValidGraphQLFieldName checks if a string is a valid GraphQL field name +// GraphQL field names must match /^[_a-zA-Z][_a-zA-Z0-9]*$/ +func isValidGraphQLFieldName(name string) bool { + return regexp.MustCompile(`^[_a-zA-Z][_a-zA-Z0-9]*$`).MatchString(name) +} + +// hasNoValidGraphQLFields checks if a map contains no valid GraphQL field names +func hasAllValidGraphQLFields(mapValue map[string]interface{}) bool { + for key := range mapValue { + if !isValidGraphQLFieldName(key) { + return false + } + } + return true +} + +// inferGraphQLTypeFromValueWithContext infers a GraphQL type from a value with context +func inferGraphQLTypeFromValueWithContext(parentName, name string, value interface{}, namer *namer) graphql.Type { + switch v := value.(type) { + case string: + return graphql.String + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + return graphql.Int + case float32, float64: + return graphql.Float + case bool: + return graphql.Boolean + case []interface{}: + // Check if this matches the OLM properties pattern + if isPropertiesPattern(value) { + return graphql.NewList(createPropertiesType(parentName, name, namer)) + } + + // For regular arrays, create a list type using the first element + if len(v) > 0 { + elementType := inferGraphQLTypeFromValueWithContext(namer.TypeNameForField(parentName, 
name), "Item", v[0], namer) + return graphql.NewList(elementType) + } + return graphql.NewList(jsonScalar) + case map[string]interface{}: + // Check if this map is empty or has no valid GraphQL fields + if len(v) == 0 || !hasAllValidGraphQLFields(v) { + return jsonScalar + } + return generateObjectTypeFromValue(namer.TypeNameForField(parentName, name), v, namer) + case nil: + return graphql.String + default: + return graphql.String // Default fallback + } +} + +// generateObjectTypeFromValue creates a GraphQL object type from a value using JSON scalar approach +func generateObjectTypeFromValue(name string, value map[string]interface{}, namer *namer) *graphql.Object { + // Create fields for this object + fields := make(graphql.Fields, len(value)) + + for fieldName, fieldValue := range value { + // Use JSON scalar for all property values to handle mixed primitive/object types + fields[fieldName] = &graphql.Field{ + Name: fieldName, + Type: inferGraphQLTypeFromValueWithContext(name, fieldName, fieldValue, namer), + } + } + + // Create the object type + objectType := graphql.NewObject(graphql.ObjectConfig{ + Name: name, + Fields: fields, + }) + + return objectType +} + +// getValueTypeName returns a type name for a value for consistency checking +func getValueTypeName(value interface{}) string { + switch value.(type) { + case string: + return "string" + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + return "number" + case float32, float64: + return "number" + case bool: + return "boolean" + case []interface{}: + return "array" + case map[string]interface{}: + return "object" + case nil: + return "null" + default: + panic(fmt.Sprintf("unknown value type: %T", value)) + } +} diff --git a/internal/catalogd/storage/localdir.go b/internal/catalogd/storage/localdir.go deleted file mode 100644 index 44ef65c58..000000000 --- a/internal/catalogd/storage/localdir.go +++ /dev/null @@ -1,330 +0,0 @@ -package storage - -import ( - "context" - 
"encoding/json" - "errors" - "fmt" - "io" - "io/fs" - "net/http" - "net/url" - "os" - "path/filepath" - "sync" - - "golang.org/x/sync/errgroup" - "golang.org/x/sync/singleflight" - "k8s.io/apimachinery/pkg/util/sets" - - "github.com/operator-framework/operator-registry/alpha/declcfg" -) - -// LocalDirV1 is a storage Instance. When Storing a new FBC contained in -// fs.FS, the content is first written to a temporary file, after which -// it is copied to its final destination in RootDir/.jsonl. This is -// done so that clients accessing the content stored in RootDir/.json1 -// have an atomic view of the content for a catalog. -type LocalDirV1 struct { - RootDir string - RootURL *url.URL - EnableMetasHandler bool - - m sync.RWMutex - // this singleflight Group is used in `getIndex()`` to handle concurrent HTTP requests - // optimally. With the use of this slightflight group, the index is loaded from disk - // once per concurrent group of HTTP requests being handled by the metas handler. - // The single flight instance gives us a way to load the index from disk exactly once - // per concurrent group of callers, and then let every concurrent caller have access to - // the loaded index. This avoids lots of unnecessary open/decode/close cycles when concurrent - // requests are being handled, which improves overall performance and decreases response latency. 
- sf singleflight.Group -} - -var ( - _ Instance = (*LocalDirV1)(nil) - errInvalidParams = errors.New("invalid parameters") -) - -func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { - s.m.Lock() - defer s.m.Unlock() - - if err := os.MkdirAll(s.RootDir, 0700); err != nil { - return err - } - tmpCatalogDir, err := os.MkdirTemp(s.RootDir, fmt.Sprintf(".%s-*", catalog)) - if err != nil { - return err - } - defer os.RemoveAll(tmpCatalogDir) - - storeMetaFuncs := []storeMetasFunc{storeCatalogData} - if s.EnableMetasHandler { - storeMetaFuncs = append(storeMetaFuncs, storeIndexData) - } - - eg, egCtx := errgroup.WithContext(ctx) - metaChans := []chan *declcfg.Meta{} - - for range storeMetaFuncs { - metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) - } - for i, f := range storeMetaFuncs { - eg.Go(func() error { - return f(tmpCatalogDir, metaChans[i]) - }) - } - err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - for _, ch := range metaChans { - select { - case ch <- meta: - case <-egCtx.Done(): - return egCtx.Err() - } - } - return nil - }, declcfg.WithConcurrency(1)) - for _, ch := range metaChans { - close(ch) - } - if err != nil { - return fmt.Errorf("error walking FBC root: %w", err) - } - - if err := eg.Wait(); err != nil { - return err - } - - catalogDir := s.catalogDir(catalog) - return errors.Join( - os.RemoveAll(catalogDir), - os.Rename(tmpCatalogDir, catalogDir), - ) -} - -func (s *LocalDirV1) Delete(catalog string) error { - s.m.Lock() - defer s.m.Unlock() - - return os.RemoveAll(s.catalogDir(catalog)) -} - -func (s *LocalDirV1) ContentExists(catalog string) bool { - s.m.RLock() - defer s.m.RUnlock() - - catalogFileStat, err := os.Stat(catalogFilePath(s.catalogDir(catalog))) - if err != nil { - return false - } - if !catalogFileStat.Mode().IsRegular() { - // path is not valid content - return false - } - - if s.EnableMetasHandler { - 
indexFileStat, err := os.Stat(catalogIndexFilePath(s.catalogDir(catalog))) - if err != nil { - return false - } - if !indexFileStat.Mode().IsRegular() { - return false - } - } - return true -} - -func (s *LocalDirV1) catalogDir(catalog string) string { - return filepath.Join(s.RootDir, catalog) -} - -func catalogFilePath(catalogDir string) string { - return filepath.Join(catalogDir, "catalog.jsonl") -} - -func catalogIndexFilePath(catalogDir string) string { - return filepath.Join(catalogDir, "index.json") -} - -type storeMetasFunc func(catalogDir string, metaChan <-chan *declcfg.Meta) error - -func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error { - f, err := os.Create(catalogFilePath(catalogDir)) - if err != nil { - return err - } - defer f.Close() - - for m := range metas { - if _, err := f.Write(m.Blob); err != nil { - return err - } - } - return nil -} - -func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error { - idx := newIndex(metas) - - f, err := os.Create(catalogIndexFilePath(catalogDir)) - if err != nil { - return err - } - defer f.Close() - - enc := json.NewEncoder(f) - enc.SetEscapeHTML(false) - return enc.Encode(idx) -} - -func (s *LocalDirV1) BaseURL(catalog string) string { - return s.RootURL.JoinPath(catalog).String() -} - -func (s *LocalDirV1) StorageServerHandler() http.Handler { - mux := http.NewServeMux() - - mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path, s.handleV1All) - if s.EnableMetasHandler { - mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "metas").Path, s.handleV1Metas) - } - allowedMethodsHandler := func(next http.Handler, allowedMethods ...string) http.Handler { - allowedMethodSet := sets.New[string](allowedMethods...) 
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !allowedMethodSet.Has(r.Method) { - http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) - return - } - next.ServeHTTP(w, r) - }) - } - return allowedMethodsHandler(mux, http.MethodGet, http.MethodHead) -} - -func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { - s.m.RLock() - defer s.m.RUnlock() - - catalog := r.PathValue("catalog") - catalogFile, catalogStat, err := s.catalogData(catalog) - if err != nil { - httpError(w, err) - return - } - w.Header().Add("Content-Type", "application/jsonl") - http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile) -} - -func (s *LocalDirV1) handleV1Metas(w http.ResponseWriter, r *http.Request) { - s.m.RLock() - defer s.m.RUnlock() - - // Check for unexpected query parameters - expectedParams := map[string]bool{ - "schema": true, - "package": true, - "name": true, - } - - for param := range r.URL.Query() { - if !expectedParams[param] { - httpError(w, errInvalidParams) - return - } - } - - catalog := r.PathValue("catalog") - catalogFile, catalogStat, err := s.catalogData(catalog) - if err != nil { - httpError(w, err) - return - } - defer catalogFile.Close() - - w.Header().Set("Last-Modified", catalogStat.ModTime().UTC().Format(timeFormat)) - done := checkPreconditions(w, r, catalogStat.ModTime()) - if done { - return - } - - schema := r.URL.Query().Get("schema") - pkg := r.URL.Query().Get("package") - name := r.URL.Query().Get("name") - - if schema == "" && pkg == "" && name == "" { - // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) - serveJSONLines(w, r, catalogFile) - return - } - idx, err := s.getIndex(catalog) - if err != nil { - httpError(w, err) - return - } - indexReader := idx.Get(catalogFile, schema, pkg, name) - serveJSONLines(w, r, indexReader) -} - -func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { - 
catalogFile, err := os.Open(catalogFilePath(s.catalogDir(catalog))) - if err != nil { - return nil, nil, err - } - catalogFileStat, err := catalogFile.Stat() - if err != nil { - return nil, nil, err - } - return catalogFile, catalogFileStat, nil -} - -func httpError(w http.ResponseWriter, err error) { - var code int - switch { - case errors.Is(err, fs.ErrNotExist): - code = http.StatusNotFound - case errors.Is(err, fs.ErrPermission): - code = http.StatusForbidden - case errors.Is(err, errInvalidParams): - code = http.StatusBadRequest - default: - code = http.StatusInternalServerError - } - http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code) -} - -func serveJSONLines(w http.ResponseWriter, r *http.Request, rs io.Reader) { - w.Header().Add("Content-Type", "application/jsonl") - // Copy the content of the reader to the response writer - // only if it's a Get request - if r.Method == http.MethodHead { - return - } - _, err := io.Copy(w, rs) - if err != nil { - httpError(w, err) - return - } -} - -func (s *LocalDirV1) getIndex(catalog string) (*index, error) { - idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { - indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) - if err != nil { - return nil, err - } - defer indexFile.Close() - var idx index - if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { - return nil, err - } - return &idx, nil - }) - if err != nil { - return nil, err - } - return idx.(*index), nil -} diff --git a/internal/catalogd/storage/localdir_test.go b/internal/catalogd/storage/localdir_test.go deleted file mode 100644 index 72aafba1c..000000000 --- a/internal/catalogd/storage/localdir_test.go +++ /dev/null @@ -1,636 +0,0 @@ -package storage - -import ( - "bytes" - "context" - "errors" - "fmt" - "io" - "io/fs" - "net/http" - "net/http/httptest" - "net/url" - "os" - "strings" - "sync" - "testing" - "testing/fstest" - - "github.com/stretchr/testify/assert" - 
"github.com/stretchr/testify/require" - - "github.com/operator-framework/operator-registry/alpha/declcfg" -) - -const urlPrefix = "/catalogs/" - -func TestLocalDirStoraget(t *testing.T) { - tests := []struct { - name string - setup func(*testing.T) (*LocalDirV1, fs.FS) - test func(*testing.T, *LocalDirV1, fs.FS) - cleanup func(*testing.T, *LocalDirV1) - }{ - { - name: "store and retrieve catalog content", - setup: func(t *testing.T) (*LocalDirV1, fs.FS) { - s := &LocalDirV1{ - RootDir: t.TempDir(), - RootURL: &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix}, - } - return s, createTestFS(t) - }, - test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { - const catalog = "test-catalog" - - // Initially content should not exist - if s.ContentExists(catalog) { - t.Fatal("content should not exist before store") - } - - // Store the content - if err := s.Store(context.Background(), catalog, fsys); err != nil { - t.Fatal(err) - } - - // Verify content exists after store - if !s.ContentExists(catalog) { - t.Fatal("content should exist after store") - } - - // Delete the content - if err := s.Delete(catalog); err != nil { - t.Fatal(err) - } - - // Verify content no longer exists - if s.ContentExists(catalog) { - t.Fatal("content should not exist after delete") - } - }, - }, - { - name: "storing with metas handler enabled should create indices", - setup: func(t *testing.T) (*LocalDirV1, fs.FS) { - s := &LocalDirV1{ - RootDir: t.TempDir(), - EnableMetasHandler: true, - } - return s, createTestFS(t) - }, - test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { - err := s.Store(context.Background(), "test-catalog", fsys) - if err != nil { - t.Fatal(err) - } - - if !s.ContentExists("test-catalog") { - t.Error("content should exist after store") - } - - // Verify index file was created - indexPath := catalogIndexFilePath(s.catalogDir("test-catalog")) - if _, err := os.Stat(indexPath); err != nil { - t.Errorf("index file should 
exist: %v", err) - } - }, - }, - { - name: "concurrent reads during write should not cause data race", - setup: func(t *testing.T) (*LocalDirV1, fs.FS) { - dir := t.TempDir() - s := &LocalDirV1{RootDir: dir} - return s, createTestFS(t) - }, - test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { - const catalog = "test-catalog" - var wg sync.WaitGroup - - // Start multiple concurrent readers - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < 100; j++ { - s.ContentExists(catalog) - } - }() - } - - // Write while readers are active - err := s.Store(context.Background(), catalog, fsys) - if err != nil { - t.Fatal(err) - } - - wg.Wait() - }, - }, - { - name: "delete nonexistent catalog", - setup: func(t *testing.T) (*LocalDirV1, fs.FS) { - return &LocalDirV1{RootDir: t.TempDir()}, nil - }, - test: func(t *testing.T, s *LocalDirV1, _ fs.FS) { - err := s.Delete("nonexistent") - if err != nil { - t.Errorf("expected no error deleting nonexistent catalog, got: %v", err) - } - }, - }, - { - name: "store with invalid permissions", - setup: func(t *testing.T) (*LocalDirV1, fs.FS) { - dir := t.TempDir() - // Set directory permissions to deny access - if err := os.Chmod(dir, 0000); err != nil { - t.Fatal(err) - } - return &LocalDirV1{RootDir: dir}, createTestFS(t) - }, - test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { - err := s.Store(context.Background(), "test-catalog", fsys) - if !errors.Is(err, fs.ErrPermission) { - t.Errorf("expected permission error, got: %v", err) - } - }, - cleanup: func(t *testing.T, s *LocalDirV1) { - // Restore permissions so cleanup can succeed - require.NoError(t, os.Chmod(s.RootDir, 0700)) - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s, fsys := tt.setup(t) - tt.test(t, s, fsys) - if tt.cleanup != nil { - tt.cleanup(t, s) - } - }) - } -} - -func TestLocalDirServerHandler(t *testing.T) { - store := &LocalDirV1{RootDir: t.TempDir(), RootURL: &url.URL{Path: urlPrefix}} 
- if store.Store(context.Background(), "test-catalog", createTestFS(t)) != nil { - t.Fatal("failed to store test catalog and start server") - } - - testServer := httptest.NewServer(store.StorageServerHandler()) - defer testServer.Close() - - for _, tc := range []struct { - name string - URLPath string - expectedStatusCode int - expectedContent string - }{ - { - name: "Server returns 404 when root URL is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "", - }, - { - name: "Server returns 404 when path '/' is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/", - }, - { - name: "Server returns 404 when path '/catalogs/' is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/catalogs/", - }, - { - name: "Server returns 404 when path '/catalogs//' is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/catalogs/test-catalog/", - }, - { - name: "Server returns 404 when path '/catalogs//api/' is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/catalogs/test-catalog/api/", - }, - { - name: "Serer return 404 when path '/catalogs//api/v1' is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/catalogs/test-catalog/api/v1c", - }, - { - name: "Server return 404 when path '/catalogs//non-existent.txt' is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/catalogs/test-catalog/non-existent.txt", - }, - { - name: "Server returns 404 when path '/catalogs/.jsonl' is queried even if the file exists, since we don't serve the filesystem, and serve an API instead", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 page not found", - URLPath: "/catalogs/test-catalog.jsonl", - }, - { - 
name: "Server returns 404 when non-existent catalog is queried", - expectedStatusCode: http.StatusNotFound, - expectedContent: "404 Not Found", - URLPath: "/catalogs/non-existent-catalog/api/v1/all", - }, - { - name: "Server returns 200 with json-lines payload when path '/catalogs//api/v1/all' is queried, when catalog exists", - expectedStatusCode: http.StatusOK, - expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}` + "\n" + - `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}` + "\n" + - `{"entries":[{"name":"bundle.v0.0.1"}],"name":"preview_test","package":"webhook_operator_test","schema":"olm.channel"}`, - URLPath: "/catalogs/test-catalog/api/v1/all", - }, - } { - t.Run(tc.name, func(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServer.URL, tc.URLPath), nil) - require.NoError(t, err) - req.Header.Set("Accept-Encoding", "gzip") - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - - require.Equal(t, tc.expectedStatusCode, resp.StatusCode) - if resp.StatusCode == http.StatusOK { - assert.Equal(t, "application/jsonl", resp.Header.Get("Content-Type")) - } - - actualContent, err := io.ReadAll(resp.Body) - require.NoError(t, err) - - require.Equal(t, strings.TrimSpace(tc.expectedContent), strings.TrimSpace(string(actualContent))) - require.NoError(t, resp.Body.Close()) - }) - } -} - -// Tests to verify the behavior of the metas endpoint, as described in -// https://docs.google.com/document/d/1s6_9IFEKGQLNh3ueH7SF4Yrx4PW9NSiNFqFIJx0pU-8/ -func TestMetasEndpoint(t *testing.T) { - store := &LocalDirV1{ - RootDir: t.TempDir(), - RootURL: &url.URL{Path: urlPrefix}, - EnableMetasHandler: true, - 
} - if store.Store(context.Background(), "test-catalog", createTestFS(t)) != nil { - t.Fatal("failed to store test catalog") - } - testServer := httptest.NewServer(store.StorageServerHandler()) - - testCases := []struct { - name string - initRequest func(req *http.Request) error - queryParams string - expectedStatusCode int - expectedContent string - }{ - { - name: "valid query with package schema", - queryParams: "?schema=olm.package", - expectedStatusCode: http.StatusOK, - expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, - }, - { - name: "valid query with schema and name combination", - queryParams: "?schema=olm.package&name=webhook_operator_test", - expectedStatusCode: http.StatusOK, - expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, - }, - { - name: "valid query with channel schema and package name combination", - queryParams: "?schema=olm.channel&package=webhook_operator_test", - expectedStatusCode: http.StatusOK, - expectedContent: `{"entries":[{"name":"bundle.v0.0.1"}],"name":"preview_test","package":"webhook_operator_test","schema":"olm.channel"}`, - }, - { - name: "query with all meta fields", - queryParams: "?schema=olm.bundle&package=webhook_operator_test&name=bundle.v0.0.1", - expectedStatusCode: http.StatusOK, - expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, - }, - { - name: "valid query for package schema for a package that does not exist", - queryParams: "?schema=olm.package&name=not-present", - expectedStatusCode: http.StatusOK, - expectedContent: "", - }, - { - name: "valid query with package and name", - queryParams: 
"?package=webhook_operator_test&name=bundle.v0.0.1", - expectedStatusCode: http.StatusOK, - expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, - }, - { - name: "query with non-existent schema", - queryParams: "?schema=non_existent_schema", - expectedStatusCode: http.StatusOK, - expectedContent: "", - }, - { - name: "valid query with packageName that returns multiple blobs in json-lines format", - queryParams: "?package=webhook_operator_test", - expectedStatusCode: http.StatusOK, - expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"} -{"entries":[{"name":"bundle.v0.0.1"}],"name":"preview_test","package":"webhook_operator_test","schema":"olm.channel"}`, - }, - { - name: "cached response with If-Modified-Since", - queryParams: "?schema=olm.package", - initRequest: func(req *http.Request) error { - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - resp.Body.Close() - req.Header.Set("If-Modified-Since", resp.Header.Get("Last-Modified")) - return nil - }, - expectedStatusCode: http.StatusNotModified, - expectedContent: "", - }, - { - name: "request with unknown parameters", - queryParams: "?non-existent=foo", - expectedStatusCode: http.StatusBadRequest, - expectedContent: "400 Bad Request", - }, - { - name: "request with duplicate parameters", - queryParams: "?schema=olm.bundle&&schema=olm.bundle", - expectedStatusCode: http.StatusOK, - expectedContent: 
`{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - reqGet, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/catalogs/test-catalog/api/v1/metas%s", testServer.URL, tc.queryParams), nil) - require.NoError(t, err) - - if tc.initRequest != nil { - require.NoError(t, tc.initRequest(reqGet)) - } - resp, err := http.DefaultClient.Do(reqGet) - require.NoError(t, err) - defer resp.Body.Close() - - require.Equal(t, tc.expectedStatusCode, resp.StatusCode) - if resp.StatusCode == http.StatusOK { - assert.Equal(t, "application/jsonl", resp.Header.Get("Content-Type")) - } - - actualContent, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) - - // Also do a HEAD request - reqHead, err := http.NewRequest(http.MethodHead, fmt.Sprintf("%s/catalogs/test-catalog/api/v1/metas%s", testServer.URL, tc.queryParams), nil) - require.NoError(t, err) - if tc.initRequest != nil { - require.NoError(t, tc.initRequest(reqHead)) - } - resp, err = http.DefaultClient.Do(reqHead) - require.NoError(t, err) - require.Equal(t, tc.expectedStatusCode, resp.StatusCode) - actualContent, err = io.ReadAll(resp.Body) - require.NoError(t, err) - require.Empty(t, string(actualContent)) // HEAD should not return a body - resp.Body.Close() - - // And make sure any other method is not allowed - for _, method := range []string{http.MethodPost, http.MethodPut, http.MethodDelete} { - reqPost, err := http.NewRequest(method, fmt.Sprintf("%s/catalogs/test-catalog/api/v1/metas%s", testServer.URL, tc.queryParams), nil) - require.NoError(t, err) - resp, err = 
http.DefaultClient.Do(reqPost) - require.NoError(t, err) - require.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode) - resp.Body.Close() - } - }) - } -} - -func TestServerLoadHandling(t *testing.T) { - store := &LocalDirV1{ - RootDir: t.TempDir(), - RootURL: &url.URL{Path: urlPrefix}, - EnableMetasHandler: true, - } - - // Create large test data - largeFS := fstest.MapFS{} - for i := 0; i < 1000; i++ { - largeFS[fmt.Sprintf("meta_%d.json", i)] = &fstest.MapFile{ - Data: []byte(fmt.Sprintf(`{"schema":"olm.bundle","package":"test-op-%d","name":"test-op.v%d.0"}`, i, i)), - } - } - - if err := store.Store(context.Background(), "test-catalog", largeFS); err != nil { - t.Fatal("failed to store test catalog") - } - - testServer := httptest.NewServer(store.StorageServerHandler()) - defer testServer.Close() - - tests := []struct { - name string - concurrent int - requests func(baseURL string) []*http.Request - validateFunc func(t *testing.T, responses []*http.Response, errs []error) - }{ - { - name: "concurrent identical queries", - concurrent: 100, - requests: func(baseURL string) []*http.Request { - var reqs []*http.Request - for i := 0; i < 100; i++ { - req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/catalogs/test-catalog/api/v1/metas?schema=olm.bundle", baseURL), - nil) - req.Header.Set("Accept", "application/jsonl") - reqs = append(reqs, req) - } - return reqs - }, - validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { - for _, err := range errs { - require.NoError(t, err) - } - for _, resp := range responses { - require.Equal(t, http.StatusOK, resp.StatusCode) - require.Equal(t, "application/jsonl", resp.Header.Get("Content-Type")) - resp.Body.Close() - } - }, - }, - { - name: "concurrent different queries", - concurrent: 50, - requests: func(baseURL string) []*http.Request { - var reqs []*http.Request - for i := 0; i < 50; i++ { - req, _ := http.NewRequest(http.MethodGet, - 
fmt.Sprintf("%s/catalogs/test-catalog/api/v1/metas?package=test-op-%d", baseURL, i), - nil) - req.Header.Set("Accept", "application/jsonl") - reqs = append(reqs, req) - } - return reqs - }, - validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { - for _, err := range errs { - require.NoError(t, err) - } - for _, resp := range responses { - require.Equal(t, http.StatusOK, resp.StatusCode) - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.Contains(t, string(body), "test-op-") - resp.Body.Close() - } - }, - }, - { - name: "mixed all and metas endpoints", - concurrent: 40, - requests: func(baseURL string) []*http.Request { - var reqs []*http.Request - for i := 0; i < 20; i++ { - allReq, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/catalogs/test-catalog/api/v1/all", baseURL), - nil) - metasReq, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/catalogs/test-catalog/api/v1/metas?schema=olm.bundle", baseURL), - nil) - allReq.Header.Set("Accept", "application/jsonl") - metasReq.Header.Set("Accept", "application/jsonl") - reqs = append(reqs, allReq, metasReq) - } - return reqs - }, - validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { - for _, err := range errs { - require.NoError(t, err) - } - for _, resp := range responses { - require.Equal(t, http.StatusOK, resp.StatusCode) - resp.Body.Close() - } - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var ( - wg sync.WaitGroup - responses = make([]*http.Response, tt.concurrent) - errs = make([]error, tt.concurrent) - ) - - requests := tt.requests(testServer.URL) - for i := 0; i < tt.concurrent; i++ { - wg.Add(1) - go func(idx int) { - defer wg.Done() - // nolint:bodyclose - // the response body is closed in the validateFunc - resp, err := http.DefaultClient.Do(requests[idx]) - responses[idx] = resp - errs[idx] = err - }(i) - } - - wg.Wait() - tt.validateFunc(t, responses, errs) - }) - } -} - -func 
createTestFS(t *testing.T) fs.FS { - t.Helper() - testBundleTemplate := `--- -image: %s -name: %s -schema: olm.bundle -package: %s -relatedImages: - - name: %s - image: %s -properties: - - type: olm.bundle.object - value: - data: %s - - type: some.other - value: - data: arbitrary-info -` - - testPackageTemplate := `--- -defaultChannel: %s -name: %s -schema: olm.package -` - - testChannelTemplate := `--- -schema: olm.channel -package: %s -name: %s -entries: - - name: %s -` - testBundleName := "bundle.v0.0.1" - testBundleImage := "quaydock.io/namespace/bundle:0.0.3" - testBundleRelatedImageName := "test" - testBundleRelatedImageImage := "testimage:latest" - testBundleObjectData := "dW5pbXBvcnRhbnQK" - testPackageDefaultChannel := "preview_test" - testPackageName := "webhook_operator_test" - testChannelName := "preview_test" - - testPackage := fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) - testBundle := fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) - testChannel := fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) - return &fstest.MapFS{ - "test-catalog.yaml": {Data: []byte( - generateJSONLinesOrFail(t, []byte(testBundle)) + - generateJSONLinesOrFail(t, []byte(testPackage)) + - generateJSONLinesOrFail(t, []byte(testChannel))), - Mode: os.ModePerm}, - } -} - -// generateJSONLinesOrFail takes a byte slice of concatenated JSON objects and returns a JSONlines-formatted string -// or raises a test failure in case of encountering any internal errors -func generateJSONLinesOrFail(t *testing.T, in []byte) string { - var out strings.Builder - reader := bytes.NewReader(in) - - err := declcfg.WalkMetasReader(reader, func(meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - - if meta != nil && meta.Blob != nil { - if meta.Blob[len(meta.Blob)-1] != '\n' { - return fmt.Errorf("blob 
does not end with newline") - } - } - - _, err = out.Write(meta.Blob) - if err != nil { - return err - } - return nil - }) - require.NoError(t, err) - - return out.String() -} diff --git a/internal/catalogd/storage/storage.go b/internal/catalogd/storage/storage.go index af78a669f..0c2b0c412 100644 --- a/internal/catalogd/storage/storage.go +++ b/internal/catalogd/storage/storage.go @@ -2,19 +2,237 @@ package storage import ( "context" - "io/fs" - "net/http" + "errors" + "iter" + "maps" + "slices" + "sync" + + "github.com/operator-framework/operator-registry/alpha/declcfg" ) // Instance is a storage instance that stores FBC content of catalogs // added to a cluster. It can be used to Store or Delete FBC in the -// host's filesystem. It also a manager runnable object, that starts -// a server to serve the content stored. +// host's filesystem, and check if data exists for a catalog. type Instance interface { - Store(ctx context.Context, catalog string, fsys fs.FS) error - Delete(catalog string) error - ContentExists(catalog string) bool + Store(ctx context.Context, catalog string, seq iter.Seq2[*declcfg.Meta, error]) error + Delete(ctx context.Context, catalog string) error + Exists(catalog string) bool +} + +var _ Instance = (*Instances)(nil) + +type Instances struct { + files *files + indices *indices + graphQLSchemas *graphQLSchemas +} + +type InstancesOption func(i *Instances) + +func WithFiles(enabled bool, rootDir string) InstancesOption { + return func(i *Instances) { + if enabled { + i.files = newFiles(rootDir) + } + } +} + +func WithIndices(enabled bool, rootDir string) InstancesOption { + return func(i *Instances) { + if enabled { + i.indices = newIndices(rootDir) + } + } +} + +func WithGraphQLSchemas(enabled bool) InstancesOption { + return func(i *Instances) { + if enabled { + i.graphQLSchemas = newGraphQLSchemas() + } + } +} + +func NewInstances(opts ...InstancesOption) *Instances { + i := &Instances{} + for _, opt := range opts { + 
opt(i) + } + return i +} + +func (i *Instances) Files() Files { + if i.files == nil { + panic("files data was not initialized") + } + return i.files +} + +func (i *Instances) Indices() Indices { + if i.indices == nil { + panic("indices data was not initialized") + } + return i.indices +} + +func (i *Instances) GraphQLSchemas() GraphQLSchemas { + if i.graphQLSchemas == nil { + panic("graphQLSchemas data was not initialized") + } + return i.graphQLSchemas +} + +func (i *Instances) Store(ctx context.Context, catalog string, seq iter.Seq2[*declcfg.Meta, error]) error { + activeInstances := i.activeInstances() + numActiveInstances := len(activeInstances) + + // Copy the sequence once per active instance. + copiedSeqs, cancel := copySequence(seq, numActiveInstances) + defer cancel() + + var wg sync.WaitGroup + wg.Add(numActiveInstances) + + errs := make([]error, 0, numActiveInstances) + var errMu sync.Mutex + for idx := range numActiveInstances { + // We need to run the instance store functions concurrently because + // the iterators in copiedSeqs need to be consumed concurrently. + go func() { + defer wg.Done() + if err := activeInstances[idx].Store(ctx, catalog, copiedSeqs[idx]); err != nil { + errMu.Lock() + errs = append(errs, err) + errMu.Unlock() + } + }() + } + wg.Wait() + return errors.Join(errs...) +} + +func (i *Instances) Delete(ctx context.Context, catalog string) error { + activeInstances := i.activeInstances() + errs := make([]error, 0, len(activeInstances)) + for _, instance := range activeInstances { + errs = append(errs, instance.Delete(ctx, catalog)) + } + return errors.Join(errs...)
+} + +func (i *Instances) Exists(catalog string) bool { + activeInstances := i.activeInstances() + for _, instance := range activeInstances { + if !instance.Exists(catalog) { + return false + } + } + return true +} + +func (i *Instances) activeInstances() []Instance { + instances := []Instance{} + if i.files != nil { + instances = append(instances, i.files) + } + if i.indices != nil { + instances = append(instances, i.indices) + } + if i.graphQLSchemas != nil { + instances = append(instances, i.graphQLSchemas) + } + return instances +} + +// copySequence copies values from the input iterator to n output iterators. Note that this function +// consumes the input iterator, so callers should not use the input iterator after copying it. +// +// Note: Iterators produced by this function must be consumed concurrently. However, consumers can +// independently decide to stop iterating without affecting other consumers. +func copySequence[V any, E any](in iter.Seq2[V, E], n int) ([]iter.Seq2[V, E], context.CancelFunc) { + if n <= 0 { + return []iter.Seq2[V, E]{}, func() {} + } + if n == 1 { + return []iter.Seq2[V, E]{in}, func() {} + } + + type dataVal struct { + v V + e E + } + type outputPipe struct { + dataCh chan dataVal + doneCh chan struct{} + } + + activePipes := make(map[int]*outputPipe, n) + outSeqs := make([]iter.Seq2[V, E], n) + for i := range n { + pipe := &outputPipe{ + // Buffered data channel of size 1 lets us fan out each input value to all consumers concurrently. + dataCh: make(chan dataVal, 1), + doneCh: make(chan struct{}), + } + activePipes[i] = pipe + + outSeqs[i] = func(yield func(V, E) bool) { + // This is the only place that we close the done channel. + // The fan-out goroutine will be responsible for always closing dataCh, + // which will ensure that _this_ function returns and doneCh is closed. + defer func() { close(pipe.doneCh) }() + + // There are two ways for this loop to end: + // 1. 
The input iterator is exhausted, and the fan-out goroutine closes dataCh + // to signal that there is no more data on the iterator. + // 2. The consumer of the iterator decides to stop iterating early by returning false + // when we call `yield`, so we return. + for dv := range pipe.dataCh { + if !yield(dv.v, dv.e) { + return + } + } + } + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for v, e := range in { + // Iterate the pipes in index order for determinism. + for _, i := range slices.Sorted(maps.Keys(activePipes)) { + pipe := activePipes[i] + select { + case pipe.dataCh <- dataVal{v: v, e: e}: + + // If a pipe's doneCh is closed before we've exhausted the input, that means + // the consumer has stopped consuming from its iterator. + // + // If the context is cancelled, that means we need to clean up to avoid + // leaking channels and goroutines. + // + // In either case, we close the pipe's dataCh and remove it from our set of active pipes. + case <-pipe.doneCh: + close(pipe.dataCh) + delete(activePipes, i) + case <-ctx.Done(): + close(pipe.dataCh) + delete(activePipes, i) + } + } + + // If all pipes have stopped iterating early, we can also stop iterating through the input + // iterator's values. + if len(activePipes) == 0 { + break + } + } - BaseURL(catalog string) string - StorageServerHandler() http.Handler + // If there are still activePipes, that means we have exhausted the input iterator, so + // we close the active pipes' dataCh to signal that there are no more values to consume. 
+ for _, pipe := range activePipes { + close(pipe.dataCh) + } + }() + return outSeqs, cancel } diff --git a/internal/shared/util/http/serverutil.go b/internal/shared/util/http/serverutil.go new file mode 100644 index 000000000..5749402ea --- /dev/null +++ b/internal/shared/util/http/serverutil.go @@ -0,0 +1,41 @@ +package http + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "time" + + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type ServerConfig struct { + Name string + OnlyServeWhenLeader bool + ListenAddr string + TLSConfig *tls.Config + Server *http.Server + ShutdownTimeout *time.Duration +} + +func NewManagerServer(cfg ServerConfig) (*manager.Server, error) { + listener, err := net.Listen("tcp", cfg.ListenAddr) + if err != nil { + return nil, fmt.Errorf("error creating %s server listener: %w", cfg.Name, err) + } + + if cfg.TLSConfig != nil { + listener = tls.NewListener(listener, cfg.TLSConfig) + } + + srv := manager.Server{ + Name: cfg.Name, + OnlyServeWhenLeader: cfg.OnlyServeWhenLeader, + Server: cfg.Server, + ShutdownTimeout: cfg.ShutdownTimeout, + Listener: listener, + } + + return &srv, nil +} diff --git a/manifests/experimental-e2e.yaml b/manifests/experimental-e2e.yaml index d3adf46e5..f3992599d 100644 --- a/manifests/experimental-e2e.yaml +++ b/manifests/experimental-e2e.yaml @@ -1648,6 +1648,7 @@ spec: - --metrics-bind-address=:7443 - --external-address=catalogd-service.$(POD_NAMESPACE).svc - --feature-gates=APIV1MetasHandler=true + - --feature-gates=APIV1GraphQLHandler=true - --tls-cert=/var/certs/tls.crt - --tls-key=/var/certs/tls.key - --pull-cas-dir=/var/ca-certs diff --git a/manifests/experimental.yaml b/manifests/experimental.yaml index 7b0d2b9a3..2b27328b9 100644 --- a/manifests/experimental.yaml +++ b/manifests/experimental.yaml @@ -1621,6 +1621,7 @@ spec: - --metrics-bind-address=:7443 - --external-address=catalogd-service.$(POD_NAMESPACE).svc - --feature-gates=APIV1MetasHandler=true + - --feature-gates=APIV1GraphQLHandler=true + -
--tls-cert=/var/certs/tls.crt - --tls-key=/var/certs/tls.key - --pull-cas-dir=/var/ca-certs