-
Notifications
You must be signed in to change notification settings - Fork 2k
feat: convert scheduler and proactive triggers to docket, remove LoopService #19756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
CodSpeed Performance ReportMerging #19756 will not alter performanceComparing Summary
|
This converts the Scheduler and RecentDeploymentsScheduler LoopService classes to schedule_deployments and schedule_recent_deployments perpetual functions using the docket perpetual_service decorator pattern. Changes: - Convert Scheduler class to schedule_deployments perpetual function - Convert RecentDeploymentsScheduler class to schedule_recent_deployments perpetual function - Extract helper methods as standalone functions: - _get_select_deployments_to_schedule_query - _get_select_recent_deployments_to_schedule_query - _collect_flow_runs - Remove scheduler module from base.py service registry - Update test_service_subsets.py to remove Scheduler/RecentDeploymentsScheduler - Add scheduler registration test to test_perpetual_services.py - Update test_scheduler.py to use functions directly instead of class instances 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Updates the CLI to run both LoopService-based services AND docket-based perpetual services when using `prefect server services start`. Previously, the HA setup (`prefect server start --no-services` + multiple `prefect server services start` replicas) only ran LoopService subclasses. With the conversion of scheduler and other services to docket perpetual functions, this left perpetual services (like schedule_deployments) unexecuted. Now `prefect server services start` starts a docket Worker alongside the LoopServices, enabling proper HA operation with the new docket-based services. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…ove LoopService - Convert ProactiveTriggers LoopService to evaluate_proactive_triggers_periodic perpetual function - Remove LoopService class and run_multiple_services from base.py entirely - Update test_service_subsets.py to remove ProactiveTriggers from Service class tests - Delete test_loop_service.py (no longer needed) - Update CLI and perpetual_services.py docstrings to remove LoopService mentions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
59e5ab6 to
b42ee0d
Compare
…service The @perpetual_service decorator for evaluate_proactive_triggers_periodic only runs when its module is imported. Without this import, the proactive triggers service was never registered with the perpetual services registry, causing the proactive automation integration test to time out. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
desertaxle
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LGTM, bu the ammount of test changes make it hard to spot if there are any regressions. We'll want to keep an eye on this in the test bed to see if there are any issues.
| # Get the deployment schedules to check their IDs/slugs | ||
| schedules = await models.deployments.read_deployment_schedules( | ||
| session=session, | ||
| deployment_id=deployment.id, | ||
| ) | ||
| schedule_with_slug = next(s for s in schedules if s.slug == "my-schedule") | ||
| schedule_without_slug = next(s for s in schedules if not s.slug) | ||
|
|
||
| # Verify the schedule with slug has the expected properties | ||
| assert schedule_with_slug.slug == "my-schedule" | ||
| assert schedule_with_slug.parameters == {"name": "whoami"} | ||
| assert schedule_with_slug.active is True | ||
|
|
||
| # Check that idempotency keys use slugs when available and IDs when not | ||
| expected_dates = await schedule.get_dates(service.min_runs) | ||
| for date in expected_dates: | ||
| # Find runs for this date | ||
| date_runs = [r for r in runs if r.state.state_details.scheduled_time == date] | ||
| assert len(date_runs) == 2 # Should have two runs per date | ||
|
|
||
| # One run should use the slug in its idempotency key | ||
| assert any( | ||
| f"scheduled {deployment.id} {schedule_with_slug.id} {date}" | ||
| == r.idempotency_key | ||
| for r in date_runs | ||
| ) | ||
| # One run should use the ID in its idempotency key | ||
| assert any( | ||
| f"scheduled {deployment.id} {schedule_without_slug.id} {date}" | ||
| == r.idempotency_key | ||
| for r in date_runs | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this part of the test case removed?
| @pytest.mark.parametrize( | ||
| "interval,n", | ||
| [ | ||
| # schedule until we at least exceed an hour | ||
| (datetime.timedelta(minutes=1), 61), | ||
| # schedule at least 3 runs | ||
| (datetime.timedelta(hours=1), 3), | ||
| # schedule until we at least exceed an hour | ||
| (datetime.timedelta(minutes=5), 13), | ||
| # schedule until at most 100 days | ||
| (datetime.timedelta(days=60), 2), | ||
| ], | ||
| ) | ||
| async def test_create_schedule_respects_max_future_time( | ||
| self, | ||
| flow: schemas.core.Flow, | ||
| session: AsyncSession, | ||
| interval: datetime.timedelta, | ||
| n: int, | ||
| ): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we keep this test? I see there's one with the same name, but it isn't parameterized like this one.
- Restore idempotency key assertions in test_create_parametrized_schedules_with_slugs - Re-add parameterized test_create_schedule_respects_max_future_time to TestScheduleRulesWaterfall - Test now uses one schedule with slug, one without (as originally intended) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
Summary
Final PR in the background services docket migration. Converts the last two LoopService classes and removes LoopService entirely.
Scheduler conversion
SchedulerLoopService class toschedule_deploymentsperpetual functionRecentDeploymentsSchedulerLoopService class toschedule_recent_deploymentsperpetual functionProactiveTriggers conversion
ProactiveTriggersLoopService class toevaluate_proactive_triggers_periodicperpetual functionLoopService removal
LoopServiceclass andrun_multiple_servicesfrombase.pytest_loop_service.py_run_all_services()to useget_current_settings().server.docket_urlTest Plan
🤖 Generated with Claude Code