Add scheduler batch size option #1704

Open
wants to merge 1 commit into
base: master
from

Conversation

yang-xiaodong
Member

Summary

  • add SchedulerBatchSize option to limit scheduler fetch size
  • support the new limit in PostgreSql, MySql, SqlServer, MongoDB, and InMemory storages
  • document SchedulerBatchSize in both English and Chinese configuration docs

Testing

  • dotnet test --no-restore --verbosity minimal (fails: command not found)

https://chatgpt.com/codex/tasks/task_e_68500d907df88320972e9759b7612750

Copilot AI left a comment

Pull Request Overview

This PR adds a new SchedulerBatchSize option that limits the number of messages fetched by the scheduler across various storage implementations. Key changes include:

  • Introducing a new SchedulerBatchSize property with a default value of 1000 in the CapOptions class.
  • Updating SQL queries for SQL Server, PostgreSql, and MySql to use the new batch size limit.
  • Adapting the MongoDB and InMemory storage implementations to respect the SchedulerBatchSize value.
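
To make the first bullet concrete, here is a minimal sketch of what the option could look like. `CapOptionsSketch` is a hypothetical stand-in written for this illustration, not the real `CapOptions` class; only the property name and the default of 1000 come from the overview above.

```csharp
// Hypothetical stand-in for CapOptions; it models only the property
// discussed in this PR and nothing else from the real class.
public sealed class CapOptionsSketch
{
    // Upper bound on the number of messages the scheduler fetches in one pass.
    // The default of 1000 is the value stated in the PR overview.
    public int SchedulerBatchSize { get; set; } = 1000;
}
```

Under this assumption a caller would lower the limit with something like `new CapOptionsSketch { SchedulerBatchSize = 200 }`. How the real option is wired into the CAP configuration is not shown in this conversation.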

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

Show a summary per file
| File | Description |
| --- | --- |
| src/DotNetCore.CAP/CAP.Options.cs | Added the SchedulerBatchSize property with documentation. |
| src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs | Updated the SQL Server query to use SELECT TOP (@BatchSize) and added the parameter. |
| src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs | Updated the PostgreSQL query to include LIMIT @BatchSize and added the parameter. |
| src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs | Updated the MySQL query to include LIMIT @BatchSize and added the parameter. |
| src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs | Modified the MongoDB query to limit results based on SchedulerBatchSize. |
| src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs | Incorporated the batch size option into the LINQ query with .Take(). |
| docs/content/user-guide/zh/cap/configuration.md | Documented the SchedulerBatchSize configuration in Chinese. |
| docs/content/user-guide/en/cap/configuration.md | Documented the SchedulerBatchSize configuration in English. |

Comments suppressed due to low confidence (1)

src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs:246

  • Adding an ORDER BY clause to this query would ensure that the limited set of messages is selected in a deterministic order.
… LIMIT @BatchSize {lockSql};

Comment on lines 240 to 242
  UNION ALL
- SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
+ SELECT TOP (@BatchSize) Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
  WHERE Version = @Version AND StatusName = '{StatusName.Queued}' AND ExpiresAt < @OneMinutesAgo;";

Copilot AI Jun 16, 2025

Consider adding an ORDER BY clause to ensure deterministic message selection when using TOP (@BatchSize). This will help guarantee consistent behavior across executions.

Copilot uses AI. Check for mistakes.

@@ -237,13 +237,14 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
  {
  var sql =
  $"SELECT \"Id\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\" FROM {_pubName} WHERE \"Version\"=@Version " +
- $"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED;";
+ $"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED LIMIT @BatchSize;";

Copilot AI Jun 16, 2025

Consider appending an ORDER BY clause to the query to enforce a consistent record order when applying LIMIT @BatchSize.

Suggested change
- $"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED LIMIT @BatchSize;";
+ $"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED ORDER BY \"Id\" LIMIT @BatchSize;";
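
One caveat on clause placement: PostgreSQL's documented SELECT syntax lists ORDER BY ahead of LIMIT and the row-locking clause, so an ORDER BY written after FOR UPDATE SKIP LOCKED is expected to be rejected by the parser. The sketch below shows the same query with the clauses in the documented order. `PostgreSqlQuerySketch` and its parameters are hypothetical names for this illustration, and sorting by "Id" simply mirrors the suggestion rather than a confirmed project choice.

```csharp
// Hypothetical helper, not part of the PR: builds the delayed-message query
// with ORDER BY and LIMIT placed before the row-locking clause.
public static class PostgreSqlQuerySketch
{
    public static string Build(string pubName, string delayedStatus, string queuedStatus) =>
        $"SELECT \"Id\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\" FROM {pubName} WHERE \"Version\"=@Version " +
        $"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{delayedStatus}') " +
        $"OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{queuedStatus}')) " +
        // Sort first, then cap the row count, then lock the selected rows.
        "ORDER BY \"Id\" LIMIT @BatchSize FOR UPDATE SKIP LOCKED;";
}
```

With SKIP LOCKED, rows held by another transaction are still skipped, so the ordering only fixes which unlocked rows are preferred within a single pass.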

@@ -322,7 +322,9 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
  await collection.UpdateManyAsync(session, filter, update, cancellationToken: linkedTs.Token)
  .ConfigureAwait(false);

- var queryResult = await collection.Find(session, filter).ToListAsync(linkedTs.Token)
+ var queryResult = await collection.Find(session, filter)

Copilot AI Jun 16, 2025

For the MongoDB query, consider adding a Sort() clause before applying the Limit to ensure predictable results.

Suggested change
- var queryResult = await collection.Find(session, filter)
+ var queryResult = await collection.Find(session, filter)
+ .Sort(Builders<BsonDocument>.Sort.Ascending("Added")) // Sort by the 'Added' field

@@ -215,6 +215,7 @@ public Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessag
var result = PublishedMessages.Values.Where(x =>
(x.StatusName == StatusName.Delayed && x.ExpiresAt < DateTime.Now.AddMinutes(2))
|| (x.StatusName == StatusName.Queued && x.ExpiresAt < DateTime.Now.AddMinutes(-1)))

Copilot AI Jun 16, 2025

If message order is important in the in-memory implementation, consider sorting the messages before taking the batch to ensure consistency.

Suggested change
- || (x.StatusName == StatusName.Queued && x.ExpiresAt < DateTime.Now.AddMinutes(-1)))
+ || (x.StatusName == StatusName.Queued && x.ExpiresAt < DateTime.Now.AddMinutes(-1)))
+ .OrderBy(x => x.ExpiresAt)
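
As a rough illustration of how the sort and the batch limit could fit together in the in-memory path, here is a self-contained sketch. `MessageSketch`, `InMemorySchedulerSketch`, and the string status names are hypothetical stand-ins for the real message type and `StatusName` values, and the `ThenBy` tie-breaker on `Id` is an extra assumption that is not part of the suggestion.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the stored message; only the fields the filter needs.
public sealed record MessageSketch(long Id, string StatusName, DateTime ExpiresAt);

public static class InMemorySchedulerSketch
{
    // Returns at most batchSize due messages, oldest expiry first.
    public static List<MessageSketch> NextBatch(
        IEnumerable<MessageSketch> messages, DateTime now, int batchSize)
    {
        return messages
            .Where(x =>
                (x.StatusName == "Delayed" && x.ExpiresAt < now.AddMinutes(2))
                || (x.StatusName == "Queued" && x.ExpiresAt < now.AddMinutes(-1)))
            .OrderBy(x => x.ExpiresAt)  // sort before limiting so the batch is predictable
            .ThenBy(x => x.Id)          // assumed tie-breaker for equal expiry times
            .Take(batchSize)            // batchSize plays the role of SchedulerBatchSize
            .ToList();
    }
}
```

Passing `now` in as a parameter, instead of calling `DateTime.Now` twice inside the predicate, keeps both comparisons on the same instant and makes the method easy to exercise in a test.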
