feat(backend): Replace MLMD with KFP Server APIs #12430
Upgrade test failures are expected until we add migration logic, which will follow this PR. Note that UI changes are also not included here; those will follow in a separate PR as well. |
|
First off, this is amazing! Not sure where you find the time 😂 A couple of questions, because this overlaps with an area of interest. My understanding is that this PR is reporting / updating the status of tasks (components) directly from the launcher such as here. So to check my understanding, this means that we are moving completely away from the persistence agent, correct? I have been running into issues with the persistence agent at scale and with short-lived workflows, so I am excited about new approaches. Secondly, I see the added RPCs to update task state. Are these the counterpart to the ones used by the V1 persistence agent to populate |
|
Insanely impressive, @HumairAK! I look forward to going through it in-depth. Please let us know if there are any specific areas you want us to sequence first / prioritize with our reviews.
^ This will be critical for existing workloads. |
|
For your first point, the PA is still required to report the overall status of the run. It monitors the Argo Workflow resource, and we still need this to report failures that occur outside driver/launcher execution (e.g. pod scheduling failures), so a run still requires external monitoring. I will also be moving the status-update propagation logic to the API server in this PR, after some offline discussions with Matt/Nelesh. For your second point, the tasks table in v1 is being removed. Today it is only used for caching and is not utilized by any other APIs. It is a bit abused and is part of an incomplete implementation of a different approach intended by previous maintainers. As such, this change will be part of the next KFP major version bump (3.0). All the data required for KFP runs in the tasks table is persisted in MLMD, and we can use this for migration (namely just cache fingerprints).
@droctothorpe as per our discussion today, I would suggest you review the higher level changes first, e.g. Proto files, Gorm Models, Authorization and related changes - consideration for things like migration etc. |
| rpc UpdateTasksBulk(UpdateTasksBulkRequest) returns (UpdateTasksBulkResponse) { | ||
| option (google.api.http) = { | ||
| post: "/apis/v2beta1/tasks:batchUpdate" | ||
| body: "*" | ||
| }; | ||
| option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { | ||
| operation_id: "batch_update_tasks" | ||
| summary: "Updates multiple tasks in bulk." | ||
| tags: "RunService" | ||
| }; | ||
| } |
There was a problem hiding this comment.
Get rid of bulk operations, make individual calls to update status from the launcher/driver, and move status/artifact propagation into the API server. There is a concern around race conditions, so we will need to update tasks in this order:
For an update task request:
- Update Task
- Fetch run.
- Propagate statuses up the dag
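To make the ordering above concrete, here is a minimal in-memory sketch of the three steps. Everything in it is an assumption for illustration: the `Task` struct, the `aggregate` rule (any failed child fails the DAG, all succeeded children complete it), and the map standing in for the database are hypothetical and are not the PR's actual models or propagation rules.

```go
package main

import "fmt"

// State is a simplified stand-in for a task's runtime state.
type State string

const (
	Running   State = "RUNNING"
	Succeeded State = "SUCCEEDED"
	Failed    State = "FAILED"
)

// Task is a hypothetical in-memory task record; ParentID is empty for the root DAG.
type Task struct {
	ID       string
	ParentID string
	State    State
}

// aggregate derives a DAG's state from its children: any failure fails the DAG,
// all successes complete it, and anything else leaves it running.
func aggregate(children []*Task) State {
	allDone := len(children) > 0
	for _, c := range children {
		if c.State == Failed {
			return Failed
		}
		if c.State != Succeeded {
			allDone = false
		}
	}
	if allDone {
		return Succeeded
	}
	return Running
}

// updateAndPropagate applies the steps in order: update the task, look up the
// run's tasks, then walk up the parent chain recomputing each DAG's state.
func updateAndPropagate(run map[string]*Task, taskID string, newState State) error {
	task, ok := run[taskID]
	if !ok {
		return fmt.Errorf("task %q not found", taskID)
	}
	task.State = newState // step 1: update the task

	for parentID := task.ParentID; parentID != ""; {
		parent, ok := run[parentID] // step 2: read from the fetched run
		if !ok {
			return fmt.Errorf("parent %q not found", parentID)
		}
		var children []*Task
		for _, t := range run {
			if t.ParentID == parentID {
				children = append(children, t)
			}
		}
		parent.State = aggregate(children) // step 3: propagate upward
		parentID = parent.ParentID
	}
	return nil
}

func main() {
	run := map[string]*Task{
		"root": {ID: "root", State: Running},
		"a":    {ID: "a", ParentID: "root", State: Succeeded},
		"b":    {ID: "b", ParentID: "root", State: Running},
	}
	if err := updateAndPropagate(run, "b", Succeeded); err != nil {
		panic(err)
	}
	fmt.Println(run["root"].State)
}
```

In a real server the three steps would presumably run inside one transaction (or under a per-run lock) so that two concurrent task updates cannot interleave between the update and the propagation.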
There was a problem hiding this comment.
So is UpdateTasksBulk to be removed?
There was a problem hiding this comment.
We discussed this offline and this can be done in a follow up PR.
There was a problem hiding this comment.
Instead of aggregating to default roles, create a new SA for the driver/launcher to use when making calls to the API server. Have sync.py ensure that this SA and the required RBAC are created in Kubeflow profiles.
|
Thanks for the response @HumairAK!
Interesting, good to know!
The other place I have seen it used previously was in the
That part went over my head lol. So I am mostly concerned with the ability to get run / component information (status / runtime) primarily through the SDK. At the moment this depends on the PA (only partially for V2) and is why I am asking about these components. As mentioned, I have noticed some instability when handling many workflows. Since you expect the PA to exist in V3 too, want to make sure we are able to scale that properly. Since I do not know the timeline for V3, maybe it is worthwhile implementing something in V2 to help us with this in the meantime. Potentially building in some metrics and suggested scaling behavior of the PA deployment or similar. Any suggestions where I should continue discussion on this? Any existing similar issues / threads you are familiar with? |
It's used to populate the run details field for the runs object, but it's mostly just a copy of the status field (node statuses) of the run's associated Argo workflow. We will likely drop this field in the next major version upgrade.
Our current intent is to get rid of the PA, as we see it as unnecessary overhead for merely reporting run status. Either we consolidate this logic into the KFP server or we move it to a separate dedicated controller built on controller-runtime; either way, we'll certainly keep scalability in mind. |
|
GPT5.1 Codex review:
### Review Findings
1. **Artifact-task records always marked as plain outputs**
`CreateArtifact` and `CreateArtifactsBulk` ignore the `request.type` field and hardcode every `ArtifactTask` as `IOType_OUTPUT`, even when the caller explicitly sets `IOType_ITERATOR_OUTPUT` for loop iterations or other specialized output modes. This drops iterator semantics, so parent DAGs can no longer distinguish per-iteration outputs and downstream resolvers will treat every propagated artifact as a flat output.
backend/src/apiserver/server/artifact_server.go, lines 87-95:
```go
artifactTask := &apiv2beta1.ArtifactTask{
	ArtifactId: artifact.UUID,
	TaskId:     task.UUID,
	RunId:      request.GetRunId(),
	Type:       apiv2beta1.IOType_OUTPUT,
	Producer:   producer,
	Key:        request.GetProducerKey(),
}
```
The same hardcoding occurs in the bulk path (
```go
resourceAttributes := &authorizationv1.ResourceAttributes{
	Namespace: namespace,
	Verb:      common.RbacResourceVerbGet,
	Group:     common.RbacPipelinesGroup,
	Version:   common.RbacPipelinesVersion,
	Resource:  common.RbacResourceTypeRuns,
}
err := s.resourceManager.IsAuthorized(ctx, resourceAttributes)
```
Please change
```yaml
- apiGroups:
  - pipelines.kubeflow.org
  resources:
  - runs
  verbs:
  - get
  - list
  - readArtifact
```
Please extend the aggregated roles (both “view” and “edit” flavors) with
Open Questions / Follow-ups
Suggested Next Steps
Suggested Next Steps
|
|
Claude 4.5 review:
PR #12430: MLMD Removal - Action Items
PR: #12430
🚨 Critical Issues (Must Fix Before Merge)
1. Terminal State Enforcement Missing
Priority: 🔴 CRITICAL
Problem
The design requires preventing task updates when the parent run is in a terminal state (SUCCEEDED, FAILED, or CANCELED). This check is not implemented.
Impact
Launchers could update tasks after a run completes, leading to inconsistent state.
Required Fix
File:
Location:
Add this code before the authorization check:
func (s *RunServer) UpdateTask(ctx context.Context, request *apiv2beta1.UpdateTaskRequest) (*apiv2beta1.PipelineTaskDetail, error) {
taskID := request.GetTaskId()
// Get existing task
existingTask, err := s.resourceManager.GetTask(taskID)
if err != nil {
return nil, util.Wrap(err, "Failed to get existing task for authorization")
}
// ✅ ADD THIS: Check if run is in terminal state
run, err := s.resourceManager.GetRun(existingTask.RunUUID)
if err != nil {
return nil, util.Wrap(err, "Failed to get run to check terminal state")
}
terminalStates := []model.RuntimeState{
model.RuntimeStateSucceeded,
model.RuntimeStateFailed,
model.RuntimeStateCanceled,
}
for _, terminalState := range terminalStates {
if run.State == terminalState {
return nil, util.NewInvalidInputError(
"Cannot update task %s: parent run %s is in terminal state %s",
taskID, existingTask.RunUUID, terminalState,
)
}
}
// Continue with existing authorization and update logic...
}
Also apply to:
Required Test
File:
func TestUpdateTask_TerminalState_Rejected(t *testing.T) {
// Setup
clientManager, resourceManager := setupTestEnv()
runSrv := NewRunServer(resourceManager, nil)
// Create run and task
run := createTestRun(t, resourceManager, "test-run")
task := createTestTask(t, runSrv, run.UUID, "test-task")
// Mark run as SUCCEEDED (terminal state)
resourceManager.UpdateRun(run.UUID, &model.Run{State: model.RuntimeStateSucceeded})
// Attempt to update task - should fail
_, err := runSrv.UpdateTask(context.Background(), &apiv2beta1.UpdateTaskRequest{
TaskId: task.GetTaskId(),
Task: &apiv2beta1.PipelineTaskDetail{
TaskId: task.GetTaskId(),
State: apiv2beta1.PipelineTaskDetail_FAILED,
},
})
// Assert: Update should be rejected
assert.Error(t, err)
assert.Contains(t, err.Error(), "terminal state")
}
|
| Issue | Priority | Effort | Files to Modify | Tests Required |
|---|---|---|---|---|
| 1. Terminal State | 🔴 Critical | 4h | run_server.go | run_server_tasks_test.go |
| 2. Cache Fingerprint | 🟡 Medium | 2h | launcher_v2.go | launcher_v2_test.go |
| 3. Exit Handler | 🟡 Medium | 3h | dag.go | dag_test.go |
| 4. Documentation | 🟢 Low | 1h | design-details.md | N/A |
| Total | | 10h | 4 files | 3 test files |
✅ Merge Recommendations
For mlmd-removal Branch
Status:
Requirements:
- ✅ Must fix: Issue 1 (Terminal State Enforcement)
Timeline: 1 day
For master Branch
Status: 🚫 Not Ready
Requirements:
- ✅ Must fix: Issue 1 (Terminal State Enforcement)
- ⚠️ Should fix: Issue 2 (Cache Fingerprint)
- ⚠️ Should fix: Issue 3 (Exit Handler)
- 📝 Should update: Issue 4 (Documentation)
Timeline: 2-3 days
🎯 Next Steps
- Immediate (Before merging to mlmd-removal):
  - Implement terminal state enforcement
  - Add terminal state tests
  - Test manually with concurrent runs
- Before merging to master:
  - Clear cache fingerprint on failure
  - Add exit handler detection
  - Update design documentation
  - Run full integration test suite
  - Verify all new tests pass
- Post-merge (Follow-up PRs as planned):
  - Migration implementation
  - Frontend changes
📞 Contact
For questions or clarifications about these action items, refer to the detailed review in BACKEND_VERIFICATION_CHECKLIST.md.
Reviewer: AI Assistant
Date: 2025-11-20
| // Create an artifact entry in the database. | ||
| CreateArtifact(artifact *model.Artifact) (*model.Artifact, error) | ||
|
|
||
| // Fetches an artifact with a given id. | ||
| GetArtifact(id string) (*model.Artifact, error) | ||
|
|
||
| // Fetches artifacts for given filtering and listing options. | ||
| ListArtifacts(filterContext *model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) |
There was a problem hiding this comment.
nit: It is a convention to begin a comment with the name of the exported element
| // Create an artifact entry in the database. | |
| CreateArtifact(artifact *model.Artifact) (*model.Artifact, error) | |
| // Fetches an artifact with a given id. | |
| GetArtifact(id string) (*model.Artifact, error) | |
| // Fetches artifacts for given filtering and listing options. | |
| ListArtifacts(filterContext *model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) | |
| // CreateArtifact creates an artifact entry in the database. | |
| CreateArtifact(artifact *model.Artifact) (*model.Artifact, error) | |
| // GetArtifact fetches an artifact with a given id. | |
| GetArtifact(id string) (*model.Artifact, error) | |
| // ListArtifacts fetches artifacts for given filtering and listing options. | |
| ListArtifacts(filterContext *model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) |
| ListArtifacts(filterContext *model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) | ||
| } | ||
|
|
||
| type ArtifactStore struct { |
There was a problem hiding this comment.
ArtifactStore is exported but consumers only use ArtifactStoreInterface. Exporting both makes the interface (that is good for encapsulation) pointless. External code can bypass it and depend on the concrete type directly.
Since this is new code, I suggest making the struct unexported (artifactStore) and returning the interface from the constructor. Then you can also rename the interface from ArtifactStoreInterface to ArtifactStore.
Same applies to ArtifactTaskStore.
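A minimal sketch of the suggested shape, assuming a pared-down `Artifact` type and an in-memory map in place of the real database-backed store. The method set and field names here are illustrative only and are not copied from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Artifact is a pared-down stand-in for model.Artifact.
type Artifact struct {
	UUID string
	Name string
}

// ArtifactStore is the only exported name; callers depend on this interface.
type ArtifactStore interface {
	CreateArtifact(a *Artifact) (*Artifact, error)
	GetArtifact(id string) (*Artifact, error)
}

// artifactStore is unexported, so code outside the package cannot
// reference the concrete type and bypass the interface.
type artifactStore struct {
	byID map[string]*Artifact
}

// NewArtifactStore returns the interface rather than the concrete struct.
func NewArtifactStore() ArtifactStore {
	return &artifactStore{byID: make(map[string]*Artifact)}
}

// CreateArtifact stores the artifact under its UUID.
func (s *artifactStore) CreateArtifact(a *Artifact) (*Artifact, error) {
	if a == nil || a.UUID == "" {
		return nil, errors.New("artifact must have a UUID")
	}
	s.byID[a.UUID] = a
	return a, nil
}

// GetArtifact fetches an artifact with a given id.
func (s *artifactStore) GetArtifact(id string) (*Artifact, error) {
	a, ok := s.byID[id]
	if !ok {
		return nil, fmt.Errorf("artifact %q not found", id)
	}
	return a, nil
}

func main() {
	store := NewArtifactStore()
	if _, err := store.CreateArtifact(&Artifact{UUID: "a1", Name: "model"}); err != nil {
		panic(err)
	}
	got, err := store.GetArtifact("a1")
	fmt.Println(got.Name, err)
}
```

With this layout, tests can still substitute a fake by implementing `ArtifactStore`, and the concrete type can change without touching any caller.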
| &numberValue, | ||
| ) | ||
| if err != nil { | ||
| return artifacts, err |
There was a problem hiding this comment.
| return artifacts, err | |
| return nil, err |
| if metadataBytes != nil { | ||
| err = metadata.Scan(metadataBytes) | ||
| if err != nil { | ||
| return artifacts, util.NewInternalServerError(err, "Failed to parse artifact metadata") |
There was a problem hiding this comment.
| return artifacts, util.NewInternalServerError(err, "Failed to parse artifact metadata") | |
| return nil, util.NewInternalServerError(err, "Failed to parse artifact metadata") |
| // Fetches an artifact with a given id. | ||
| GetArtifact(id string) (*model.Artifact, error) | ||
|
|
||
| // Fetches artifacts for given filtering and listing options. |
There was a problem hiding this comment.
The 4 return values ([]*model.Artifact, int, string, error) are not documented. A reader has to trace through the implementation to know that int is the total count and string is the next page token.
|
|
||
| rows, err := tx.Query(rowsSQL, rowsArgs...) | ||
| if err != nil { | ||
| tx.Rollback() |
There was a problem hiding this comment.
tx.Rollback() error is discarded here (and on lines 228, 233, 240, 244, 249). If the rollback itself fails, there's no indication.
Consider using a single defer after Begin instead of scattered rollback calls:
defer func() {
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
glog.Errorf("Failed to rollback: %v", rbErr)
}
}
}()
| sizeRow, err := tx.Query(sizeSQL, sizeArgs...) | ||
| if err != nil { | ||
| tx.Rollback() | ||
| return errorF(err) | ||
| } | ||
| if err := sizeRow.Err(); err != nil { | ||
| tx.Rollback() | ||
| return errorF(err) | ||
| } | ||
| totalSize, err := list.ScanRowToTotalSize(sizeRow) | ||
| if err != nil { | ||
| tx.Rollback() | ||
| return errorF(err) | ||
| } | ||
| defer sizeRow.Close() |
There was a problem hiding this comment.
Same here about closing sizeRow and discarding rollback errors.
|
|
||
| err = tx.Commit() | ||
| if err != nil { | ||
| glog.Errorf("Failed to commit transaction to list artifacts") |
There was a problem hiding this comment.
Same as earlier regarding logging and returning error.
| } | ||
|
|
||
| npt, err := opts.NextPageToken(artifacts[opts.PageSize]) | ||
| return artifacts[:opts.PageSize], totalSize, npt, err |
There was a problem hiding this comment.
| return artifacts[:opts.PageSize], totalSize, npt, err | |
| if err != nil { | |
| return nil, 0, "", err | |
| } | |
| return artifacts[:opts.PageSize], totalSize, npt, nil |
|
|
||
| artifacts, err := s.scanRows(r) | ||
| if err != nil || len(artifacts) > 1 { | ||
| return nil, util.NewInternalServerError(err, "Failed to get artifact: %v", err.Error()) |
There was a problem hiding this comment.
err can be nil here. If scanRows succeeds but returns more than 1 artifact, err is nil and err.Error() on line 289 panics. These two conditions should be separate checks.
| return "artifacts" | ||
| } | ||
|
|
||
| func (a Artifact) GetSortByFieldPrefix(s string) string { |
There was a problem hiding this comment.
nit: Unused parameter
| func (a Artifact) GetSortByFieldPrefix(s string) string { | |
| func (a Artifact) GetSortByFieldPrefix(string) string { |
| type Artifact struct { | ||
| UUID string `gorm:"column:UUID; not null; primaryKey; type:varchar(191);"` | ||
| Namespace string `gorm:"column:Namespace; not null; type:varchar(63); index:idx_type_namespace,priority:1;"` | ||
| Type ArtifactType `gorm:"column:Type; default:null; index:idx_type_namespace,priority:2;"` |
There was a problem hiding this comment.
This is a nullable column. It should be a pointer.
| Namespace string `gorm:"column:Namespace; not null; type:varchar(63); index:idx_type_namespace,priority:1;"` | ||
| Type ArtifactType `gorm:"column:Type; default:null; index:idx_type_namespace,priority:2;"` | ||
| URI *string `gorm:"column:URI; type:text;"` | ||
| Name string `gorm:"column:Name; type:varchar(128); default:null;"` |
| Type ArtifactType `gorm:"column:Type; default:null; index:idx_type_namespace,priority:2;"` | ||
| URI *string `gorm:"column:URI; type:text;"` | ||
| Name string `gorm:"column:Name; type:varchar(128); default:null;"` | ||
| Description string `gorm:"column:Description; type:text; default:null;"` |
| return "", false | ||
| } | ||
|
|
||
| func (a Artifact) GetFieldValue(name string) interface{} { |
There was a problem hiding this comment.
| func (a Artifact) GetFieldValue(name string) interface{} { | |
| func (a Artifact) GetFieldValue(name string) any { |
| if err != nil || len(artifactTasks) > 1 { | ||
| return nil, util.NewInternalServerError(err, "Failed to get artifact-task: %v", err.Error()) |
There was a problem hiding this comment.
if err != nil || len(artifactTasks) > 1 then util.NewInternalServerError(err, ...); when err is nil but len > 1, err.Error() panics. Use separate checks.
| taskIDs = append(taskIDs, filterContext.ID) | ||
| case model.RunResourceType: | ||
| runIDs = append(runIDs, filterContext.ID) | ||
| } |
There was a problem hiding this comment.
Any other filterContext.Type is skipped with no error or log. Fail fast in default branch.
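One way the fail-fast default could look, sketched with hypothetical `ResourceType` constants and a `splitIDs` helper rather than the PR's actual types:

```go
package main

import "fmt"

// ResourceType and FilterContext are simplified stand-ins for the model types.
type ResourceType string

const (
	TaskResourceType ResourceType = "Task"
	RunResourceType  ResourceType = "Run"
)

type FilterContext struct {
	Type ResourceType
	ID   string
}

// splitIDs sorts filter contexts into task and run IDs and rejects
// any type it does not know how to handle instead of skipping it.
func splitIDs(contexts []*FilterContext) (taskIDs, runIDs []string, err error) {
	for _, fc := range contexts {
		switch fc.Type {
		case TaskResourceType:
			taskIDs = append(taskIDs, fc.ID)
		case RunResourceType:
			runIDs = append(runIDs, fc.ID)
		default:
			return nil, nil, fmt.Errorf("unsupported filter context type %q", fc.Type)
		}
	}
	return taskIDs, runIDs, nil
}

func main() {
	_, _, err := splitIDs([]*FilterContext{{Type: "Experiment", ID: "e1"}})
	fmt.Println(err)
}
```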
| if err != nil { | ||
| return nil, util.NewInternalServerError(err, "Failed to start transaction for creating artifact-tasks") | ||
| } | ||
| defer tx.Rollback() |
| &key, | ||
| ) | ||
| if err != nil { | ||
| return artifactTasks, err |
There was a problem hiding this comment.
| return artifactTasks, err | |
| return nil, err |
| glog.Errorf("Failed to commit transaction to list artifact-tasks") | ||
| return errorF(err) |
There was a problem hiding this comment.
| glog.Errorf("Failed to commit transaction to list artifact-tasks") | |
| return errorF(err) | |
| return errorF(err) |
| @@ -79,6 +91,267 @@ func NewTaskStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterf | |||
| } | |||
| } | |||
|
|
|||
| // scanTaskRow scans a single row into a model.Task. It expects the column order to match taskColumns. | |||
| func scanTaskRow(rowscanner interface{ Scan(dest ...any) error }) (*model.Task, error) { | |||
There was a problem hiding this comment.
All calls pass *sql.Rows as argument. Can we change the parameter type?
| func scanTaskRow(rowscanner interface{ Scan(dest ...any) error }) (*model.Task, error) { | |
| func scanTaskRow(rowscanner *sql.Rows) (*model.Task, error) { |
| fmt.Printf("scan error is %v", err) | ||
| return tasks, err |
There was a problem hiding this comment.
| fmt.Printf("scan error is %v", err) | |
| return tasks, err | |
| return nil, err |
| if err != nil { | ||
| return util.NewInternalServerError(err, "Failed to check existing tasks") | ||
| if err == sql.ErrNoRows { |
There was a problem hiding this comment.
Comparison with errors using equality operators fails on wrapped errors.
| if err == sql.ErrNoRows { | |
| if errors.Is(err, sql.ErrNoRows) { |
| if err != nil { | ||
| return nil, util.NewInternalServerError(err, "Failed to start transaction for task update") | ||
| } | ||
| defer tx.Rollback() // Will be no-op if Commit() succeeds |
| // to include the value hash to avoid collisions. | ||
| valueHash, err := hashProtoValue(p.GetValue()) | ||
| if err != nil { | ||
| glog.Errorf("Failed to hash parameter value: %v", err) |
There was a problem hiding this comment.
It shouldn't continue in case of error.
| // Creates a new metric entry. | ||
| CreateMetric(metric *model.RunMetric) (err error) | ||
| // CreateV1Metric Creates a new metric entry. | ||
| // Deprecated: use CreateMetric instead. |
| if run == nil || run.UUID == "" { | ||
| continue | ||
| } |
There was a problem hiding this comment.
Are there any valid cases that this would happen? Otherwise it should return error or be removed to avoid masking bugs.
| func (r *ResourceManager) ListArtifacts(filterContexts []*model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) { | ||
| // Use the first filter context for now (artifacts are typically filtered by namespace) | ||
| var filterContext *model.FilterContext | ||
| if len(filterContexts) > 0 { | ||
| filterContext = filterContexts[0] | ||
| } | ||
|
|
There was a problem hiding this comment.
If only one FilterContext is used, why accept a slice?
| func (r *ResourceManager) ListArtifacts(filterContexts []*model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) { | |
| // Use the first filter context for now (artifacts are typically filtered by namespace) | |
| var filterContext *model.FilterContext | |
| if len(filterContexts) > 0 { | |
| filterContext = filterContexts[0] | |
| } | |
| func (r *ResourceManager) ListArtifacts(filterContext *model.FilterContext, opts *list.Options) ([]*model.Artifact, int, string, error) { |
| // Fetches a run with a given id. | ||
| // GetRun fetches a run with full task hydration (backward compatible). |
There was a problem hiding this comment.
The comment got duplicated.
| func (r *ResourceManager) ReportMetric(metric *model.RunMetric) error { | ||
| err := r.runStore.CreateMetric(metric) | ||
| // ReportMetric Read metrics as ordinary artifacts instead. | ||
| // Creates a run metric entry. Deprecated. |
There was a problem hiding this comment.
| // Creates a run metric entry. Deprecated. | |
| // Creates a run metric entry. | |
| // Deprecated. |
| // Set the validated namespace | ||
| modelArtifact.Namespace = namespace | ||
|
|
||
| artifact, err := s.resourceManager.CreateArtifact(modelArtifact) |
There was a problem hiding this comment.
A mid-loop failure leaves partial state. Consider adding a CreateArtifacts batch method at the store layer (like CreateArtifactTasks) and calling it here.
| return s.listRunsWithHydration(ctx, pageToken, pageSize, sortBy, opts, namespace, experimentId, true) | ||
| } | ||
|
|
||
| func (s *BaseRunServer) listRunsWithHydration(ctx context.Context, pageToken string, pageSize int, sortBy string, opts *list.Options, namespace string, experimentID string, hydrateTasks bool) ([]*model.Run, int, string, error) { |
There was a problem hiding this comment.
pageToken, pageSize and sortBy are unused and can be removed.
| // Reports run metrics. | ||
| // Supports v1beta1 API. | ||
| // ReportRunMetricsV1 reports run metrics. | ||
| // Supports v1beta1 API. Deprecated. |
There was a problem hiding this comment.
| // Supports v1beta1 API. Deprecated. | |
| // Supports v1beta1 API. | |
| // Deprecated. |
|
I just hope that after this is merged, we still have one last release that ships v1 alongside a usable v2 (with this PR). Many companies need a chance to migrate production pipelines from v1 to v2. For that, we first need a release with a reasonably secure, reliable, and scalable v2 (with this PR) that still includes v1. Then, a few months later, we can cut another release that removes v1. |
| if opts.Namespace == "" { | ||
| return fmt.Errorf("namespace is required") | ||
| } | ||
| if opts.Task.GetTaskInfo().GetName() != "" { |
There was a problem hiding this comment.
** 🤖 AI Review **
validateRootDAG() now dereferences opts.Task here, but the compiled root-driver shape still omits task entirely. That makes normal root DAG runs panic before the root task is created. Could this treat opts.Task == nil as the expected root case and add a regression test using a real compiled root-driver invocation?
| @@ -1,14 +0,0 @@ | |||
| apiVersion: kustomize.config.k8s.io/v1beta1 | |||
There was a problem hiding this comment.
** 🤖 AI Review **
Deleting this base leaves a few install/tooling entry points still referencing base/metadata/** and base/pipeline/metadata-writer/** (env/dev-kind, platform-agnostic-postgresql, platform-agnostic-multi-user-legacy, and hack/{release,format}.sh). PR head still has broken kustomize/release paths until those references are updated or removed in the same rollout.
| } | ||
|
|
||
| // Fetch task and artifact for validation and authorization | ||
| task, err := s.resourceManager.GetTask(at.GetTaskId()) |
There was a problem hiding this comment.
** 🤖 AI Review **
We fetch the task/artifact here for auth, but we never validate that artifact_task.run_id == task.RunUUID. The same ownership gap exists in CreateArtifact() / CreateArtifactsBulk(), which authorize on the submitted artifact namespace before linking to a task/run. Could the server derive run/namespace from the fetched task and reject mismatches across all artifact-create paths?
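A rough sketch of what deriving the run and namespace from the fetched task could look like. The `Task` and `ArtifactTaskRequest` structs and the `resolveRun` helper are hypothetical, and treating an empty `RunID` as "derive it from the task" is an assumption rather than the PR's contract.

```go
package main

import "fmt"

// Task is a pared-down stand-in for the stored task record.
type Task struct {
	UUID      string
	RunUUID   string
	Namespace string
}

// ArtifactTaskRequest is a hypothetical view of the caller-supplied fields.
type ArtifactTaskRequest struct {
	TaskID string
	RunID  string
}

// resolveRun trusts the fetched task as the source of truth: it rejects a
// request whose run ID disagrees with the task and returns the task's own
// run ID and namespace for the subsequent authorization check.
func resolveRun(task *Task, req *ArtifactTaskRequest) (runID, namespace string, err error) {
	if task == nil || req == nil {
		return "", "", fmt.Errorf("task and request are required")
	}
	if req.TaskID != task.UUID {
		return "", "", fmt.Errorf("request task %q does not match fetched task %q", req.TaskID, task.UUID)
	}
	if req.RunID != "" && req.RunID != task.RunUUID {
		return "", "", fmt.Errorf("run_id %q does not match task run %q", req.RunID, task.RunUUID)
	}
	return task.RunUUID, task.Namespace, nil
}

func main() {
	task := &Task{UUID: "t1", RunUUID: "r1", Namespace: "team-a"}
	_, _, err := resolveRun(task, &ArtifactTaskRequest{TaskID: "t1", RunID: "r2"})
	fmt.Println(err)
}
```

Authorizing against the returned namespace, rather than one supplied in the request body, is what would close the gap described above.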
| cm.cacheClient = cacheClient | ||
|
|
||
| // Initialize connection to new KFP v2beta1 API server | ||
| apiCfg := apiclient.FromEnv() |
There was a problem hiding this comment.
** 🤖 AI Review **
This now ignores the existing --ml_pipeline_server_address / --ml_pipeline_server_port plumbing and only honors KFP_API_ADDRESS / KFP_API_PORT. The compiler, driver, and launcher still thread the flag-based endpoint through, so non-default installs will silently dial the hard-coded default here. Could the new client path reuse those existing options or at least honor both config sources?
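A sketch of one way to honor both config sources. The `resolveAddress` helper, the precedence order (flag, then environment, then default), and the placeholder default host are all assumptions; only the `KFP_API_ADDRESS` variable name and the `--ml_pipeline_server_address` flag come from the comment above.

```go
package main

import (
	"fmt"
	"os"
)

// resolveAddress picks the endpoint from the flag value first, then the
// environment, then a fallback. lookupEnv has the same signature as
// os.LookupEnv so tests can substitute a fake environment.
func resolveAddress(flagValue string, lookupEnv func(string) (string, bool), envKey, fallback string) string {
	if flagValue != "" {
		return flagValue
	}
	if v, ok := lookupEnv(envKey); ok && v != "" {
		return v
	}
	return fallback
}

func main() {
	flagAddr := "" // stands in for the value of --ml_pipeline_server_address
	addr := resolveAddress(flagAddr, os.LookupEnv, "KFP_API_ADDRESS", "example-default-host")
	fmt.Println(addr)
}
```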
| if err != nil { | ||
| return execution, fmt.Errorf("failed to create artifact tasks: %w", err) | ||
| } | ||
| execution.TaskID = createdTask.TaskId |
There was a problem hiding this comment.
** 🤖 AI Review **
Could this also set taskToCreate.TaskId = createdTask.TaskId? The deferred error/status paths later operate on taskToCreate, and the update branch above sends UpdateTaskRequest{Task: taskToCreate} without a separate path ID. Today that means cleanup/status propagation can end up targeting no task even though CreateTask() succeeded.
| Payload LargeText `gorm:"column:Payload; default:null;"` | ||
| UUID string `gorm:"column:UUID; not null; primaryKey; type:varchar(191);"` | ||
| Namespace string `gorm:"column:Namespace; not null; type:varchar(63);"` | ||
| RunUUID string `gorm:"column:RunUUID; type:varchar(191); not null; index:idx_parent_run,priority:1;"` |
There was a problem hiding this comment.
** 🤖 AI Review **
model.Task now renamed this field to RunUUID, but validation.LengthSpecs still references the old Go field name RunID. Since ValidateModel() looks fields up by struct name via reflection, task validation now trips an internal error instead of enforcing the length check. Could the validation spec be updated to RunUUID with a targeted test?
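To illustrate the failure mode, here is a simplified reflection-based length check. The `validateLengths` function and the spec maps are hypothetical stand-ins and do not reproduce the real `ValidateModel()` or `validation.LengthSpecs`; the sketch only shows why a spec keyed by a stale Go field name turns into an internal error.

```go
package main

import (
	"fmt"
	"reflect"
)

// Task mirrors the renamed field: RunUUID, no longer RunID.
type Task struct {
	UUID    string
	RunUUID string
}

// validateLengths looks up each spec entry by Go struct field name.
// A name that no longer exists yields an invalid reflect.Value, which is
// reported as an internal error instead of a length violation.
func validateLengths(model any, specs map[string]int) error {
	v := reflect.Indirect(reflect.ValueOf(model))
	for name, max := range specs {
		f := v.FieldByName(name)
		if !f.IsValid() {
			return fmt.Errorf("internal error: field %q not found on %s", name, v.Type())
		}
		if f.Kind() == reflect.String && f.Len() > max {
			return fmt.Errorf("%s exceeds %d characters", name, max)
		}
	}
	return nil
}

func main() {
	task := &Task{UUID: "t1", RunUUID: "r1"}
	fmt.Println(validateLengths(task, map[string]int{"RunID": 191}))   // stale name
	fmt.Println(validateLengths(task, map[string]int{"RunUUID": 191})) // updated name
}
```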
Remove the MLMD dependency by storing artifacts and task state directly in KFP's API server and database, and switch the runtime path to use the new v2beta1 artifact and task APIs instead of the metadata service.
This change:
- adds artifact/task models, storage, API surface, and generated clients
- updates the driver, launcher, importer, auth, and workspace artifact flows to use the KFP API end to end
- removes MLMD deployment/manifests and refreshes the frontend, CI, tests, and generated outputs needed to keep the rebased branch passing
Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
Port the merged task API contract fixes from the rebased runtime branch back onto mlmd-removal-11, restore the missing runtime pod-name helper, and refresh the run proto goldens so the backend unit suite passes on the squashed integration branch. Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
Populate the top-level run_id on runtime task API requests, update the v2 integration cache test to the merged PipelineTask HTTP model names, and stop the manifest smoke test from probing metadata overlays that this branch removes. Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
Stop pre-populating artifact IDs on create requests, map bulk-created IDs back onto queued artifact-task links before creating those relationships, and update the manifest release helper to stop rewriting metadata image tags that this branch removes. Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
Filter duplicate artifact-task relationships before the bulk create call so the runtime does not trip the artifact_tasks unique constraint, and add a unit test covering the dedupe path. Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
Rely on the artifact APIs to create the initial output artifact-task link when a new artifact is created, and align the local mock behavior with that server contract so the runtime path and tests stay consistent. Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
Description of your changes:
This PR removes MLMD as per the KEP here
Resolves: #11760
Overview
Core Change: Replaced MLMD (ML Metadata) service with direct database storage via KFP API server.
This is a major architectural shift that eliminates the external ML Metadata service dependency and consolidates all artifact and task metadata operations directly into the KFP API server with MySQL/database backend.
Components Removed
MLMD Service Infrastructure
backend/metadata_writer/)
backend/src/v2/metadata/)
Deployment Changes
Components Added
New API Layer
Artifact Service API (backend/api/v2beta1/artifact.proto)
CRUD Operations:
- CreateArtifact - Create single artifact
- GetArtifact - Retrieve artifact by ID
- ListArtifacts - Query artifacts with filtering
- BatchCreateArtifacts - Bulk artifact creation
Artifact Task Operations:
- CreateArtifactTask - Track artifact usage in tasks
- ListArtifactTasks - Query artifact-task relationships
- BatchCreateArtifactTasks - Bulk task-artifact linking
Generated Clients:
Extended Run Service API (backend/api/v2beta1/run.proto)
New Task Endpoints:
- CreateTask - Create pipeline task execution record
- GetTask - Retrieve task details
- ListTasks - Query tasks with filtering
- UpdateTask - Update task status/metadata
- BatchUpdateTasks - Efficient bulk task updates
ViewMode Feature:
- BASIC - Minimal response (IDs, status, timestamps)
- RUNTIME_ONLY - Include runtime details without full spec
- FULL - Complete task/run details with spec
Storage Layer
Artifact Storage (backend/src/apiserver/storage/artifact_store.go)
Artifact Task Store (backend/src/apiserver/storage/artifact_task_store.go)
Enhanced Task Store (backend/src/apiserver/storage/task_store.go)
API Server Implementation
Artifact Server (backend/src/apiserver/server/artifact_server.go)
Extended Run Server (backend/src/apiserver/server/run_server.go)
Client Infrastructure
KFP API Client (backend/src/v2/apiclient/)
Driver/Launcher Refactoring
Parameter/Artifact Resolution (backend/src/v2/driver/resolver/)
resolve.go (~1,100 lines removed)
- parameters.go - Parameter resolution (~560 lines)
- artifacts.go - Artifact resolution (~314 lines)
- resolve.go - Orchestration (~90 lines)
Driver Changes (backend/src/v2/driver/)
Launcher Changes (backend/src/v2/cmd/launcher-v2/)
Batch Updater (backend/src/v2/component/batch_updater.go)
Testing Infrastructure
Test Data Pipelines (backend/src/v2/driver/test_data/)
- cache_test.yaml - Cache hit/miss scenarios
- componentInput.yaml - Input parameter testing
- k8s_parameters.yaml - Kubernetes-specific features
- oneof_simple.yaml - Conditional execution
- nested_naming_conflicts.yaml - Name resolution edge cases
Test Coverage
Utility Additions
Scope Path (backend/src/common/util/scope_path.go)
Proto Helpers (backend/src/common/util/proto_helpers.go)
YAML Parser (backend/src/common/util/yaml_parser.go)
Key Behavioral Changes
Artifact Tracking
Task State Management
Performance Optimizations
API Response Size
ListRuns with VIEW_MODE=DEFAULT: ~80% smaller payloads
Migration Considerations
Database Schema
artifacts, artifact_tasks
tasks table with new columns
Backwards Compatibility
Deployment
Testing Strategy
Unit Tests
Integration Tests
Golden File Updates
Files Changed Summary
Breakdown
Risks & Considerations
Testing
Performance
Operational
Recommended Follow-up
Conclusion
This is an architectural improvement that:
Checklist: