Skip to content

Commit f4d3f87

Browse files
committed
Add support for ES indexing when running batch reprocessing of scores
1 parent b99cde7 commit f4d3f87

File tree

1 file changed

+22
-1
lines changed

1 file changed

+22
-1
lines changed

osu.Server.Queues.ScoreStatisticsProcessor/Commands/Performance/Scores/UpdateAllScoresCommand.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
namespace osu.Server.Queues.ScoreStatisticsProcessor.Commands.Performance.Scores
2020
{
21-
[Command(Name = "all", Description = "Computes pp of all scores from all users. Note that this doesn't update the ES index.")]
21+
[Command(Name = "all", Description = "Computes pp of all scores from all users.")]
2222
public class UpdateAllScoresCommand : PerformanceCommand
2323
{
2424
[Option(Description = "The size of each batch, which is then distributed to threads.")]
@@ -39,6 +39,12 @@ public class UpdateAllScoresCommand : PerformanceCommand
3939
[Option(Description = "Optional where clause", Template = "--where")]
4040
public string Where { get; set; } = "1 = 1";
4141

42+
/// <summary>
43+
/// Whether to push changed scores to the ES indexing queue.
44+
/// </summary>
45+
[Option(CommandOptionType.SingleOrNoValue, Template = "--run-indexing")]
46+
public bool RunIndexing { get; set; }
47+
4248
/// <summary>
4349
/// Whether to adjust processing rate based on slave latency. Defaults to <c>false</c>.
4450
/// </summary>
@@ -51,6 +57,10 @@ public class UpdateAllScoresCommand : PerformanceCommand
5157
/// </summary>
5258
private readonly ConcurrentQueue<MySqlConnection> connections = new ConcurrentQueue<MySqlConnection>();
5359

60+
private readonly ElasticQueuePusher elasticQueueProcessor = new ElasticQueuePusher();
61+
62+
private readonly ConcurrentBag<ElasticQueuePusher.ElasticScoreItem> elasticItems = new ConcurrentBag<ElasticQueuePusher.ElasticScoreItem>();
63+
5464
protected override async Task<int> ExecuteAsync(CancellationToken cancellationToken)
5565
{
5666
// TODO: ruleset parameter is in base class but unused.
@@ -111,6 +121,8 @@ protected override async Task<int> ExecuteAsync(CancellationToken cancellationTo
111121
if (scores.Count == 0)
112122
break;
113123

124+
elasticItems.Clear();
125+
114126
await Task.WhenAll(Partitioner.Create(scores).GetPartitions(Threads).Select(async partition =>
115127
{
116128
connections.TryDequeue(out var connection);
@@ -128,7 +140,10 @@ await Task.WhenAll(Partitioner.Create(scores).GetPartitions(Threads).Select(asyn
128140
bool changed = await ScoreProcessor.ProcessScoreAsync(partition.Current, connection, transaction);
129141

130142
if (changed)
143+
{
131144
Interlocked.Increment(ref changedPp);
145+
elasticItems.Add(new ElasticQueuePusher.ElasticScoreItem { ScoreId = (long?)partition.Current.id });
146+
}
132147
}
133148
catch (Exception e)
134149
{
@@ -145,6 +160,12 @@ await Task.WhenAll(Partitioner.Create(scores).GetPartitions(Threads).Select(asyn
145160
if (cancellationToken.IsCancellationRequested)
146161
return -1;
147162

163+
if (RunIndexing && elasticItems.Count > 0)
164+
{
165+
elasticQueueProcessor.PushToQueue(elasticItems.ToList());
166+
Console.WriteLine($"Queued {elasticItems.Count} items for indexing");
167+
}
168+
148169
Interlocked.Add(ref processedCount, (ulong)scores.Count);
149170

150171
currentScoreId = scores.Last().id + 1;

0 commit comments

Comments
 (0)