feat(backend): postgres integration #12379
Currently, both MySQL and PGX setups use the DB superuser for all KFP operations, which is why client_manager.go contains a “create database if not exist” step here. From a security standpoint, would it be preferable to:
If the team agrees, I can propose a follow-up PR to refactor accordingly.
I'm fine with this; I don't think it's great that KFP tries to create a database (or a bucket, frankly). FYI @mprahl / @droctothorpe
Thanks, @HumairAK, totally agree on the security point.
Yes, that is fine.
Question about the PostgreSQL test workflow organization
Current situation
The V2 integration tests for PostgreSQL logically belong in a "PostgreSQL counterpart" to legacy-v2-api-integration-tests.yml.
Question: What's the recommended workflow organization for PostgreSQL tests?
Should I:
Would love guidance on the long-term vision for test workflow organization, especially from @nsingla |
    if err != nil {
        return nil, fmt.Errorf("Failed to get execution cache: %q, err: %v", executionCacheKey, err)
    var executionCaches []model.ExecutionCache
    result := s.db.Where("ExecutionCacheKey = ?", executionCacheKey).Find(&executionCaches)
**🤖 AI Review**
These raw predicates still use the mixed-case column name without dialect quoting. On PostgreSQL, the model tags create quoted columns like "ExecutionCacheKey", but Where("ExecutionCacheKey = ?", ...) is sent verbatim and gets folded to executioncachekey. That breaks both cache lookup and duplicate-check paths on the new pgx backend while the sqlite tests still pass. Could we switch these lookups to a dialect-quoted predicate (or another GORM form that preserves the tagged column name) and add a PostgreSQL-backed store test for this path?
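The quoting concern can be illustrated with a minimal sketch. PostgreSQL folds unquoted identifiers to lowercase, so a mixed-case column only matches when the predicate quotes it. The helpers `quoteIdent` and `equalsPredicate` are hypothetical names for illustration and are not the PR's dialect API.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent is a hypothetical helper (not the PR's dialect API). It wraps an
// identifier in the dialect's quote character and doubles any embedded quote
// character so the name is preserved verbatim.
func quoteIdent(dialect, name string) string {
	q := `"` // assumed default: PostgreSQL (pgx) and SQLite use double quotes
	if dialect == "mysql" {
		q = "`"
	}
	return q + strings.ReplaceAll(name, q, q+q) + q
}

// equalsPredicate builds "<quoted column> = ?" for use in a WHERE clause.
func equalsPredicate(dialect, column string) string {
	return quoteIdent(dialect, column) + " = ?"
}

func main() {
	fmt.Println(equalsPredicate("pgx", "ExecutionCacheKey"))
	fmt.Println(equalsPredicate("mysql", "ExecutionCacheKey"))
}
```

A predicate built this way could be passed to `Where(...)` in place of the raw string, assuming the store has access to the active dialect name.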
        return err
    }
    if err := validateIdentifierName(t.SortBySQLColumn, "sort SQL column"); err != nil {
    if err := validateMetricName(t.SortByMetricName); err != nil {
**🤖 AI Review**
This now rejects metric names in page tokens unless they match ^[A-Za-z][A-Za-z0-9_-]{0,127}$, but model.Run.GetField() still accepts any metric:<name> suffix on the initial request. That means a sort like metric:val/loss can succeed on page 1 and then fail on page 2 when the token is decoded with Invalid metric name, even though the metric name only flows as a bind parameter. Could we either enforce the same restriction up front in NewOptions or stop identifier-validating SortByMetricName, and add a round-trip token test for a non-identifier-safe metric name?
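A small check makes the page-1 versus page-2 mismatch concrete. The pattern is the one quoted in the comment above; `validOnPageTwo` is a hypothetical name standing in for the token-side validation.

```go
package main

import (
	"fmt"
	"regexp"
)

// metricNamePattern is the pattern quoted in the review comment.
var metricNamePattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9_-]{0,127}$`)

// validOnPageTwo (hypothetical name) reports whether a metric name would
// pass the page-token validation that runs when page 2 is requested.
func validOnPageTwo(name string) bool {
	return metricNamePattern.MatchString(name)
}

func main() {
	for _, name := range []string{"accuracy", "log-loss", "val/loss", "foo.bar"} {
		fmt.Printf("%-10s %v\n", name, validOnPageTwo(name))
	}
}
```

Names containing `/` or `.` fail this check, which is the case a round-trip token test would need to cover.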
    @@ -170,16 +186,19 @@ jobs:
        matrix:
          k8s_version: ["v1.34.0"]
          cache_enabled: ["true", "false"]
**🤖 AI Review**
The standalone matrix excludes db_type=pgx with cache_enabled=false, but the new multi-user matrix still schedules that combination. In deploy-kfp.sh, the multi-user DB_TYPE=pgx branch ignores CACHE_DISABLED because there is no corresponding PostgreSQL cache-disabled overlay, so these jobs can go green without ever exercising a cache-disabled PostgreSQL deployment. Could we exclude db_type=pgx && cache_enabled=false here as well, or make the deploy script fail that combination instead of warning and continuing?
    )
    return qb.
        Select(q("jobs")+`.*`, refsExpr+" AS "+q("refs")).
        FromSelect(filteredSelectBuilder, q("jobs"))
**🤖 AI Review**
RunStore.addMetricsResourceReferencesAndTasks() had to normalize nested builders back to ? placeholders before the outer ToSql() call so pgx only renumbers once. JobStore.addResourceReferences() now wraps a filtered inner select with the pgx builder too, but it does not apply the same protection. On filtered page-2 ListJobs queries, that leaves us open to colliding $1/$2 placeholders or misbound args once both the inner select and the outer pagination predicates carry parameters. Could we use the same nested-builder pattern here and add a pgx pagination test for ListJobs?
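The numbering concern can be shown with plain strings. This sketch does not use squirrel; `toDollar` and `wrapWithPagination` are hypothetical helpers, and the table and column names are only illustrative. It compares renumbering each layer separately with renumbering once at the outermost level.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toDollar rewrites each "?" to "$1", "$2", ... in order of appearance.
// Simplified for illustration: it does not skip "?" inside string literals.
func toDollar(query string) string {
	var b strings.Builder
	n := 0
	for _, r := range query {
		if r == '?' {
			n++
			b.WriteString("$" + strconv.Itoa(n))
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

// wrapWithPagination embeds an inner select and adds one outer bind parameter.
func wrapWithPagination(inner string) string {
	return `SELECT * FROM (` + inner + `) AS "jobs" WHERE "CreatedAtInSec" < ? LIMIT 10`
}

func main() {
	inner := `SELECT * FROM "jobs" WHERE "Namespace" = ?`

	// Each layer renumbered on its own: both layers start at $1.
	fmt.Println(toDollar(wrapWithPagination(toDollar(inner))))

	// Inner layer kept as "?" and renumbered once at the end: $1, then $2.
	fmt.Println(toDollar(wrapWithPagination(inner)))
}
```

The first line printed reuses `$1` for two different arguments; the second numbers them in order, which is the outcome the nested-builder pattern is meant to guarantee.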
    if s, ok := v.(string); ok {
        col := QualifyIdentifier(quote, k)
        andExprs = append(andExprs, squirrel.Expr(
            fmt.Sprintf("LOWER(%s) = LOWER(?)", col), s,
**🤖 AI Review**
This changes every string EQUALS / NOT_EQUALS / IN comparison to a case-insensitive match, and matchesFilter() now mirrors that on the Kubernetes pipeline-store path. That is a behavior change for exact-match filters on identifier-like fields as well as display names, so callers that previously relied on exact casing will now get broader matches. If that contract change is intentional, could we add API/integration coverage that proves it? Otherwise I think the LOWER(...) / EqualFold path should be limited to fields where case-insensitive matching is explicitly intended.
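One way to limit the case-insensitive path, sketched under assumptions: an explicit allow-list decides which fields use `strings.EqualFold`, and everything else keeps exact matching. The allow-list, the field names, and `matchesEquals` are hypothetical and not taken from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// caseInsensitiveFields is a hypothetical allow-list; the field name is
// illustrative only.
var caseInsensitiveFields = map[string]bool{"display_name": true}

// matchesEquals compares exactly unless the field has opted in to
// case-insensitive matching.
func matchesEquals(field, stored, want string) bool {
	if caseInsensitiveFields[field] {
		return strings.EqualFold(stored, want)
	}
	return stored == want
}

func main() {
	fmt.Println(matchesEquals("display_name", "My Pipeline", "my pipeline"))
	fmt.Println(matchesEquals("name", "My-Pipeline", "my-pipeline"))
}
```

The SQL side would need the same allow-list when deciding whether to emit `LOWER(...)`, so both backends agree on the result.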
    - from:
        - ipBlock:
            cidr: 0.0.0.0/0
      ports:
        - port: 3000
          protocol: TCP
        - port: 3306
          protocol: TCP
        - port: 5432
          protocol: TCP
        - port: 8080
          protocol: TCP
        - port: 9000
          protocol: TCP
        - port: 8888
The PostgreSQL dev-kind NetworkPolicy allows ingress from 0.0.0.0/0 to several service ports (including DB port 5432). If this overlay is applied to a non-local cluster, it can unintentionally open database access more broadly than intended. Consider restricting the ipBlock to the expected host/bridge CIDR(s) for Kind, or clearly documenting that this policy is strictly for local dev-kind usage.
    func NewDefaultExperimentStore(db *sql.DB, d dialect.DBDialect) *DefaultExperimentStore {
        s := &DefaultExperimentStore{db: db, dbDialect: d}
        s.initializeDefaultExperimentTable()
        return s
NewDefaultExperimentStore() calls initializeDefaultExperimentTable() but ignores the returned error. If initialization fails (e.g., missing table/permissions), the API server can continue running with a partially initialized default_experiments table, leading to harder-to-debug failures later. Consider returning an error from the constructor (or logging/failing fast) instead of discarding it.
    func NewDBStatusStore(db *sql.DB, d dialect.DBDialect) *DBStatusStore {
        s := &DBStatusStore{db: db, dbDialect: d}
        // Initialize database status table
        s.InitializeDBStatusTable()
        return s
NewDBStatusStore() calls InitializeDBStatusTable() but ignores the returned error. If the initialization transaction fails, callers still get a store instance that will later error on reads/writes in less obvious places. Consider returning an error from NewDBStatusStore (or failing fast) so initialization problems surface immediately.
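A minimal sketch of the fail-fast constructor shape suggested in the two comments above. The types here are hypothetical stand-ins, not the real `DBStatusStore`, and the `fail` flag only simulates a failed initialization transaction.

```go
package main

import (
	"errors"
	"fmt"
)

// statusStore is a hypothetical stand-in for DBStatusStore.
type statusStore struct {
	initialized bool
}

// initTable stands in for InitializeDBStatusTable; fail simulates an
// initialization error such as missing permissions.
func (s *statusStore) initTable(fail bool) error {
	if fail {
		return errors.New("create table: permission denied")
	}
	s.initialized = true
	return nil
}

// newStatusStore returns an error instead of a partially initialized store.
func newStatusStore(fail bool) (*statusStore, error) {
	s := &statusStore{}
	if err := s.initTable(fail); err != nil {
		return nil, fmt.Errorf("initialize db status table: %w", err)
	}
	return s, nil
}

func main() {
	if _, err := newStatusStore(true); err != nil {
		fmt.Println("startup aborted:", err)
	}
}
```

Callers then decide at startup whether to exit, rather than discovering the problem on a later read or write.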
    - from:
        - ipBlock:
            cidr: 0.0.0.0/0
      ports:
        - port: 3000
          protocol: TCP
        - port: 3306
          protocol: TCP
        - port: 5432
          protocol: TCP
        - port: 8080
          protocol: TCP
        - port: 9000
          protocol: TCP
The dev-kind NetworkPolicy allows ingress from 0.0.0.0/0 to several service ports (including DB ports 3306/5432). Even if intended for local Kind usage, this makes it easy to apply an overly permissive policy to a real cluster by accident. Consider scoping the ipBlock to the Kind/Docker bridge CIDR(s) you need for host access, or adding a prominent comment/name indicating it's for local Kind only and should not be used outside that context.
    mysqlDBGroupConcatMaxLenDefault = "4194304"

    pgxDBDriverDefault = "pgx"
    pgxDBHostDefault   = "postgres"
**🤖 AI Review**
The new pgx defaults here still do not line up with the repo's bundled PostgreSQL install: the manifests expose postgres-service and credentials user / password, while the cache server defaults remain host postgres, user root, and empty password. Running the cache server with --db_driver=pgx and defaults will fail unless every connection flag is overridden. Could we either align the pgx defaults with the shipped manifests or require explicit PG connection flags when db_driver=pgx?
    }
    quotedCols := make([]string, len(columns))
    for i, c := range columns {
        quotedCols[i] = q(c)
**🤖 AI Review**
This helper quotes every column verbatim, so callers that pass []string{"*"} end up with SELECT "*" FROM ... / SELECT `*` FROM ... instead of SELECT * FROM .... The new tests currently assert that behavior, but it will fail for any row-query caller using wildcard columns. Could we special-case * (and ideally table.*) before quoting?
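The wildcard special case could look roughly like this. `quoteColumn` and `doubleQuote` are hypothetical names; the real helper takes its quoting function from the dialect.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteColumn quotes a column reference but leaves "*" and "table.*"
// wildcards unquoted so they keep their SQL meaning.
func quoteColumn(q func(string) string, col string) string {
	if col == "*" {
		return col
	}
	if strings.HasSuffix(col, ".*") {
		return q(strings.TrimSuffix(col, ".*")) + ".*"
	}
	return q(col)
}

// doubleQuote is a simplified PostgreSQL-style quoting function used only
// for this illustration.
func doubleQuote(name string) string { return `"` + name + `"` }

func main() {
	for _, c := range []string{"*", "jobs.*", "UUID"} {
		fmt.Println(quoteColumn(doubleQuote, c))
	}
}
```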
        args[i] = s
    }
    andExprs = append(andExprs, squirrel.Expr(
        fmt.Sprintf("LOWER(%s) IN (%s)", col, strings.Join(placeholders, ", ")),
**🤖 AI Review**
This string-specific IN path can emit invalid SQL for an empty string_values filter. When ss is empty, placeholders is empty and the generated fragment becomes LOWER(col) IN (), which is a syntax error on MySQL, PostgreSQL, and SQLite. The old generic squirrel Eq path handled empty slices safely. Could we reject empty IN lists during validation or special-case len(ss) == 0 here?
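A sketch of the `len(ss) == 0` special case: an empty list becomes a predicate that is always false instead of `IN ()`. The function name `lowerInExpr` and the `LOWER(?)` placeholder form are assumptions; the PR's actual placeholder text is not shown in the snippet above.

```go
package main

import (
	"fmt"
	"strings"
)

// lowerInExpr builds a case-insensitive IN predicate and its bind arguments.
// An empty value list yields an always-false predicate, since "IN ()" is a
// syntax error.
func lowerInExpr(col string, values []string) (string, []interface{}) {
	if len(values) == 0 {
		return "1 = 0", nil
	}
	placeholders := make([]string, len(values))
	args := make([]interface{}, len(values))
	for i, v := range values {
		placeholders[i] = "LOWER(?)" // assumed placeholder form
		args[i] = v
	}
	return fmt.Sprintf("LOWER(%s) IN (%s)", col, strings.Join(placeholders, ", ")), args
}

func main() {
	expr, args := lowerInExpr(`"Name"`, nil)
	fmt.Println(expr, len(args))
	expr, args = lowerInExpr(`"Name"`, []string{"A", "b"})
	fmt.Println(expr, len(args))
}
```

Rejecting empty lists during validation is the other option raised in the comment; this sketch only covers the query-building side.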
            assert.Contains(t, err.Error(), "not found")
        }
    }
    // Wait for runs to be deleted.
**🤖 AI Review**
This wait loop assumes no new runs are being created while cleanup is draining. In the v2 integration suites, though, recurring runs can still exist when callers invoke DeleteAllRuns(), so a leftover schedule can keep recreating runs and make this loop time out intermittently. Could the cleanup contract delete recurring runs first (or have callers do so consistently) before waiting for the run list to reach zero?
    // MySQL: _ sorts before - (ascending), so zip comes first at index 0
    // PostgreSQL: - sorts before _ (en_US.utf8), so yaml comes first at index 0
    if *runPostgreSQLTests {
        assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[0].Name)
        assert.Equal(t, "arguments_parameters.zip", listFirstPagePipelines[1].Name)

    // MySQL: descending order puts yaml at index 0, zip at index 1
    // PostgreSQL: descending order puts zip at index 0, yaml at index 1
    if *runPostgreSQLTests {
        assert.Equal(t, "arguments_parameters.zip", listSecondPagePipelines[0].Name)
        assert.Equal(t, "arguments-parameters.yaml", listSecondPagePipelines[1].Name)
    } else {
        sqlBuilder = opts.AddPaginationToSelect(sqlBuilder, q, s.dbDialect.StringCollation())
    }
    if s.dbDialect.Name() == "pgx" {
        sqlBuilder = sqlBuilder.PlaceholderFormat(sq.Dollar)
**🤖 AI Review**
This applies sq.Dollar after composing nested builders that were already created from the dialect-aware query builder, so the inner FromSelect(...) path can emit $1/$2/... and then the outer pagination/filter layer can start numbering at $1 again. I think that still leaves pgx open to placeholder collisions on filtered page-2 ListJobs queries. Could this follow the same pattern as run_store and keep every sub-builder in sq.Question format until the final outermost ToSql() call?
        "Enabled = ?, UpdatedAtInSec = ?",
        "jobs.UUID = resource_references.ResourceUUID",
        "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?")
    updateJobsSQL, updateJobsArgs, err := qb.
**🤖 AI Review**
This archive path still disables jobs only through the deprecated resource_references lookup, but CreateJob() now persists jobs.ExperimentUUID directly as well. A v2-style job row that has ExperimentUUID set but no resource-reference rows will stay enabled after its experiment is archived and can keep scheduling runs. Could we also disable jobs by direct ExperimentUUID = expId here and add a regression test for that shape?
    // the user-supplied metric name (e.g. "log-loss") was stored directly in
    // SortByFieldName. Such values fail SQL identifier validation, so detect
    // and migrate them to the new layout before validating.
    if t.SortByMetricName == "" && t.SortByFieldName != "" && !identifierPattern.MatchString(t.SortByFieldName) {
**🤖 AI Review**
This legacy-token migration only fires when the old SortByFieldName fails the identifier regex, so tokens for metric sorts like metric:accuracy or metric:f1_score still deserialize in the old shape after upgrade. On page 2 those tokens will treat the raw metric name as the SQL sort column instead of migrating to sort_metric_value, which breaks rolling-upgrade pagination. Could we also migrate identifier-safe legacy metric names here (for example by recognizing known model columns vs. legacy metric names) and add a regression test for metric:accuracy?
| // frameworks commonly use names like "log-loss" or "val-accuracy". | ||
| // Metric names are never used as SQL identifiers — they are passed as bind | ||
| // parameters — so allowing "-" here is safe. | ||
| var metricNamePattern = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_\-]{0,127}$`) |
**🤖 AI Review**
This new metric-name regex is stricter than the historical API behavior: names like val/loss or foo.bar used to work, and in the new query they still only flow as bind values rather than SQL identifiers. Tightening the regex here therefore creates a compatibility break without buying much safety. Could we relax this validation to match previously accepted metric names and add a round-trip page-token test for one with / or . in it?
    if s, ok := v.(string); ok {
        col := QualifyIdentifier(quote, k)
        andExprs = append(andExprs, squirrel.Expr(
            fmt.Sprintf("LOWER(%s) = LOWER(?)", col), s,
**🤖 AI Review**
This changes SQL-backed string EQUALS / NOT_EQUALS / IN / IS_SUBSTRING filtering to be case-insensitive, but the Kubernetes-backed path still goes through matchesFilter() below, which compares strings case-sensitively. The same API filter can now return different results depending on whether the data comes from SQL or the Kubernetes store, and it also broadens exact-match semantics for all SQL-backed resources. Is that scope change intentional? If not, I think this wants either backend parity or a narrower opt-in on the specific fields that need case-insensitive matching.
    // TODO(kaikaila):1.Move DB creation out of the client manager and into the deployment/init phase (i.e. add a manifests/kustomize/third-party/postgresql/base/pg-init-configmap.yaml)
    // 2.Introduce a dedicated restricted user for KFP components, limited to the mlpipeline database
    // Refer to manifests/kustomize/third-party/postgresql/base/pg-secret.yaml
    drvDialect := sqldrv.NewDBDialect(driverName)
**🤖 AI Review**
The new pgx path still creates the database with CREATE DATABASE %s using the raw configured name below. PostgreSQL requires identifier quoting for valid names that contain uppercase characters, hyphens, or reserved words, so non-simple DBName values will still fail at startup. Could we validate or quote the database name with the dialect helper before executing that statement?
        branches: [master]
      pull_request:
        paths:
          - ".github/workflows/legacy-v2-api-integration-tests-postgres.yml"
**🤖 AI Review**
This workflow actually depends on more than the paths listed here: it consumes image-builds.yml, ./.github/actions/create-cluster, ./.github/actions/deploy, and the Postgres env overlays under manifests/kustomize/env/platform-agnostic-postgresql/**. As written, a PR can break the real PostgreSQL deployment path without ever triggering this dedicated Postgres lane. Could we add the action/workflow/env-overlay paths it actually relies on?
    matrix:
      pipeline_store: ["database"]
      k8s_version: ["v1.35.0"]
      pod_to_pod_tls_enabled: [true, false]
**🤖 AI Review**
This matrix looks like a 2x2 coverage expansion, but the exclusions below remove every pod_to_pod_tls_enabled: true row and every cache_enabled: false row, so it collapses to a single happy-path combination. That makes the workflow name/reporting overstate the actual PostgreSQL coverage. Could we express this as the single supported combination for now (with a TODO for the missing overlays) instead of generating and excluding the full cross-product?
    - 'test/**'
    - '!**/*.md'
    - '!**/OWNERS'
    - ".github/workflows/api-integration-tests-v1.yml"
**🤖 AI Review**
This path filter points at .github/workflows/api-integration-tests-v1.yml, but the actual file name is integration-tests-v1.yml. Changes to this workflow file itself therefore will not trigger the job, which makes the new MySQL/pgx matrix easy to break without CI signal. Could we point this at the real file name?
    assert.Equal(t, 5, totalSize)
    assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[1].Name)
    assert.Equal(t, "arguments_parameters.zip", listFirstPagePipelines[0].Name)
    // MySQL: _ sorts before - (ascending), so zip comes first at index 0
**🤖 AI Review**
This now encodes a specific punctuation-order assumption (- vs _) into the sort assertions. That is fragile across database collations: it may pass or fail because of the cluster locale rather than because the query logic is correct, and the v2 integration suite still hardcodes one order without a PostgreSQL branch at all. Could we switch these sort sentinels to alphanumeric-only fixture names (or explicitly assert/pin the expected collation) so the DB-specific lanes validate query behavior rather than punctuation ordering?
    To use PostgreSQL instead of MySQL:

    ```bash
    make DATABASE=postgres -C backend dev-kind-cluster
**🤖 AI Review**
The new Postgres local-dev path points people at make DATABASE=postgres -C backend dev-kind-cluster, but backend/Makefile still disables WEBHOOK_PROXY_ENABLED for Postgres. The later README text says the webhook proxy is used by default with dev-kind-cluster, which is true for MySQL but not for this new Postgres variant. Could we call out that limitation explicitly here (or add webhook-proxy support for the Postgres dev-kind manifests)?
    fi

    # Manifests will be deployed according to the flag provided
    # Manifest selection: each branch picks ONE pre-built kustomize overlay directory.
**🤖 AI Review**
Separate from the overlay-selection changes below, this script's multi-user prerequisite installs above still pull Istio/Profile Controller from upstream kubeflow/manifests at ref=master. That makes CI and local reproduction non-deterministic, because the environment under test can drift independently of this repo and of the branch being validated. Could we pin those remote refs to a known commit/branch (or vendor them locally) so the PostgreSQL lanes are reproducible?
    @@ -220,15 +231,14 @@ jobs:
        python_version: ${{ env.PYTHON_VERSION }}
        report_name: "K8Native_k8sVersion=${{ matrix.k8s_version }}_cacheEnabled=${{ matrix.cache_enabled }}_argoVersion=${{ matrix.argo_version }}_uploadPipelinesWithKubernetesClient=${{ matrix.uploadPipelinesWithKubernetesClient }}"
**🤖 AI Review**
This report name now advertises uploadPipelinesWithKubernetesClient coverage, but the k8s-native job still never passes that axis (or pod_to_pod_tls_enabled) into the deploy/test actions, and the included TLS row above even flips pipeline_store back to database. So the workflow can report Kubernetes-native / TLS / upload-client combinations that it never actually exercises. Could we either wire those matrix axes through end-to-end or trim the matrix/reporting to only the combinations this job really runs?
Signed-off-by: kaikaila <lyk2772@126.com>
Signed-off-by: kaikaila <lyk2772@126.com>
fix(backend): adjust SQL placeholder handling for PostgreSQL in GetJob query
Signed-off-by: kaikaila <lyk2772@126.com>
fix(workflow): correct path for integration tests in pull request triggers
Signed-off-by: kaikaila <lyk2772@126.com>
feat(backend): disable jobs with ExperimentUUID when archiving experiments
Signed-off-by: kaikaila <lyk2772@126.com>
docs: update README to note webhook proxy status for PostgreSQL integration
Signed-off-by: kaikaila <lyk2772@126.com>
fix(deploy-kfp): update kubeflow manifests references to specific commit
Signed-off-by: kaikaila <lyk2772@126.com>
… usage queries Signed-off-by: kaikaila <lyk2772@126.com>
…ests Signed-off-by: kaikaila <lyk2772@126.com>
Summary
This PR adds full PostgreSQL (pgx driver) support to the Kubeflow Pipelines backend, enabling users to choose between MySQL and PostgreSQL as the metadata database. The implementation introduces a dialect abstraction layer and includes a major query optimization that benefits both database backends.
Fixes #7512
Fixes #9813
Key achievements:
the root causes behind [frontend] Latency when listing runs #10778, [frontend] Cannot list runs and/or artifacts (upstream request timeout) #10230, [backend] performance issues with list runs API #9780, [frontend] Listing all runs take a lot of time #9701
What Changed
1. Storage Layer Refactoring - Dialect Abstraction
DBDialect interface encapsulating database-specific identifier quoting, placeholders, and aggregation.
backend/src/apiserver/storage/list_filters.go).
2. ListRuns Query Performance Optimization
3. Deployment & CI Configurations
platform-agnostic-postgresql.
make DATABASE=postgres dev-kind-cluster).
db_type: ["mysql", "pgx"].
Testing
integration-tests-v1 and V2 api tests to utilize both databases (with cache matrices). Unit coverage expanded.
Migration Guide
-k manifests/kustomize/env/platform-agnostic-postgresql.
Follow-up Issues, PRs and pending discussions