Skip to content

Commit 5fdfa8e

Browse files
authored
Merge 51ee3d3 into 243b640
2 parents 243b640 + 51ee3d3 commit 5fdfa8e

File tree

14 files changed

+205
-27
lines changed

14 files changed

+205
-27
lines changed

ydb/core/memory_controller/memory_controller.cpp

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
#include <ydb/core/tablet_flat/shared_sausagecache.h>
1515
#include <ydb/core/tablet/resource_broker.h>
1616
#include <ydb/core/tx/columnshard/common/limits.h>
17+
#include <ydb/core/tx/columnshard/blob_cache.h>
18+
#include <ydb/core/tx/limiter/grouped_memory/usage/events.h>
19+
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
1720
#include <ydb/library/actors/core/actor_bootstrapped.h>
1821
#include <ydb/library/actors/core/log.h>
1922
#include <ydb/library/actors/core/process_stats.h>
@@ -285,6 +288,8 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
285288
Counters->GetCounter("Stats/ConsumersLimit")->Set(consumersLimitBytes);
286289

287290
ProcessResourceBrokerConfig(ctx, memoryStats, hardLimitBytes, activitiesLimitBytes);
291+
ProcessGroupedMemoryLimiterConfig(ctx, memoryStats, hardLimitBytes);
292+
ProcessCacheConfig(ctx, memoryStats, hardLimitBytes);
288293

289294
Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), memoryStatsUpdate);
290295

@@ -371,6 +376,40 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
371376
}
372377
}
373378

379+
void ProcessGroupedMemoryLimiterConfig(const TActorContext& /*ctx*/, NKikimrMemory::TMemoryStats& /*memoryStats*/, ui64 hardLimitBytes) {
380+
ui64 columnTablesScanLimitBytes = GetColumnTablesReadExecutionLimitBytes(Config, hardLimitBytes);
381+
ui64 columnTablesCompactionLimitBytes = GetColumnTablesCompactionLimitBytes(Config, hardLimitBytes) *
382+
NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterCompactionLimitCoefficient;
383+
384+
ApplyGroupedMemoryLimiterConfig(columnTablesScanLimitBytes, columnTablesCompactionLimitBytes);
385+
}
386+
387+
void ApplyGroupedMemoryLimiterConfig(const ui64 scanHardLimitBytes, const ui64 compactionHardLimitBytes) {
388+
namespace NGroupedMemoryManager = ::NKikimr::NOlap::NGroupedMemoryManager;
389+
using UpdateMemoryLimitsEv = NGroupedMemoryManager::NEvents::TEvExternal::TEvUpdateMemoryLimits;
390+
391+
Send(NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(SelfId().NodeId()),
392+
new UpdateMemoryLimitsEv(scanHardLimitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient,
393+
scanHardLimitBytes));
394+
395+
Send(NGroupedMemoryManager::TCompMemoryLimiterOperator::MakeServiceId(SelfId().NodeId()),
396+
new UpdateMemoryLimitsEv(compactionHardLimitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient,
397+
compactionHardLimitBytes));
398+
}
399+
400+
void ProcessCacheConfig(const TActorContext& /*ctx*/, NKikimrMemory::TMemoryStats& /*memoryStats*/, ui64 hardLimitBytes) {
401+
ui64 columnTablesBlobCacheLimitBytes = GetColumnTablesCacheLimitBytes(Config, hardLimitBytes);
402+
403+
ApplyCacheConfig(columnTablesBlobCacheLimitBytes);
404+
}
405+
406+
void ApplyCacheConfig(const ui64 cacheLimitBytes) {
407+
namespace NBlobCache = ::NKikimr::NBlobCache;
408+
using UpdateMaxCacheDataSizeEv = NBlobCache::TEvBlobCache::TEvUpdateMaxCacheDataSize;
409+
410+
Send(NBlobCache::MakeBlobCacheServiceId(), new UpdateMaxCacheDataSizeEv(cacheLimitBytes * NKikimr::NOlap::TGlobalLimits::BlobCacheCoefficient));
411+
}
412+
374413
void ProcessResourceBrokerConfig(const TActorContext& ctx, NKikimrMemory::TMemoryStats& memoryStats, ui64 hardLimitBytes,
375414
ui64 activitiesLimitBytes) {
376415
ui64 queryExecutionConsumption = TAlignedPagePool::GetGlobalPagePoolSize();
@@ -406,13 +445,13 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
406445
// Compaction uses 4 queues, but there is only one memory limit setting in the configuration,
407446
// so the coefficients are used to split allocated memory between the queues.
408447
AddLimitToQueueConfig(record, NLocalDb::KqpResourceManagerQueue, config.QueryExecutionLimitBytes);
409-
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionIndexationQueue,
448+
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionIndexationQueue,
410449
config.ColumnTablesCompactionLimitBytes * NKikimr::NOlap::TGlobalLimits::CompactionIndexationQueueLimitCoefficient);
411-
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionTtlQueue,
450+
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionTtlQueue,
412451
config.ColumnTablesCompactionLimitBytes * NKikimr::NOlap::TGlobalLimits::CompactionTtlQueueLimitCoefficient);
413-
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionGeneralQueue,
452+
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionGeneralQueue,
414453
config.ColumnTablesCompactionLimitBytes * NKikimr::NOlap::TGlobalLimits::CompactionGeneralQueueLimitCoefficient);
415-
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionNormalizerQueue,
454+
AddLimitToQueueConfig(record, NLocalDb::ColumnShardCompactionNormalizerQueue,
416455
config.ColumnTablesCompactionLimitBytes * NKikimr::NOlap::TGlobalLimits::CompactionNormalizerQueueLimitCoefficient);
417456

418457
Send(MakeResourceBrokerID(), configure.Release());

ydb/core/memory_controller/memory_controller_ut.cpp

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <ydb/core/tablet/resource_broker.h>
44
#include <ydb/core/tablet_flat/shared_sausagecache.h>
55
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
6+
#include <ydb/core/tx/columnshard/common/limits.h>
7+
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
68
#include <ydb/library/actors/testlib/test_runtime.h>
79

810
namespace NKikimr::NMemory {
@@ -502,11 +504,13 @@ Y_UNIT_TEST(ResourceBroker_ConfigCS) {
502504
TServerSettings serverSettings(pm.GetPort(2134));
503505
serverSettings.SetDomainName("Root").SetUseRealThreads(false);
504506

507+
const ui64 compactionMemoryLimitPercent = 36;
505508
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
506-
memoryControllerConfig->SetColumnTablesCompactionLimitPercent(32);
509+
memoryControllerConfig->SetColumnTablesCompactionLimitPercent(compactionMemoryLimitPercent);
507510

511+
ui64 currentHardMemoryLimit = 1000_MB;
508512
auto resourceBrokerConfig = serverSettings.AppConfig->MutableResourceBrokerConfig();
509-
resourceBrokerConfig->MutableResourceLimit()->SetMemory(1000_MB);
513+
resourceBrokerConfig->MutableResourceLimit()->SetMemory(currentHardMemoryLimit);
510514

511515
auto addQueueWithMemoryLimit = [&](const TString& name, const ui64 memoryLimit) {
512516
auto queue = resourceBrokerConfig->AddQueues();
@@ -520,7 +524,7 @@ Y_UNIT_TEST(ResourceBroker_ConfigCS) {
520524
addQueueWithMemoryLimit(NLocalDb::ColumnShardCompactionNormalizerQueue, 1_MB);
521525

522526
auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
523-
server->ProcessMemoryInfo->CGroupLimit = 1000_MB;
527+
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
524528
auto& runtime = *server->GetRuntime();
525529
TAutoPtr<IEventHandle> handle;
526530
auto sender = runtime.AllocateEdgeActor();
@@ -529,25 +533,81 @@ Y_UNIT_TEST(ResourceBroker_ConfigCS) {
529533

530534
runtime.SimulateSleep(TDuration::Seconds(2));
531535

532-
auto checkMemoryLimit = [&](const TString& queueName, const ui64 expectedLimit) {
536+
auto checkMemoryLimit = [&](const TString& queueName, const double coeff) {
533537
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(queueName)));
534538
auto config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
535-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), expectedLimit);
539+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(),
540+
static_cast<ui64>(currentHardMemoryLimit * coeff * compactionMemoryLimitPercent / 100));
536541
};
537542

538-
checkMemoryLimit(NLocalDb::ColumnShardCompactionIndexationQueue, 40_MB);
539-
checkMemoryLimit(NLocalDb::ColumnShardCompactionTtlQueue, 40_MB);
540-
checkMemoryLimit(NLocalDb::ColumnShardCompactionGeneralQueue, 120_MB);
541-
checkMemoryLimit(NLocalDb::ColumnShardCompactionNormalizerQueue, 120_MB);
543+
using OlapLimits = NKikimr::NOlap::TGlobalLimits;
544+
checkMemoryLimit(NLocalDb::ColumnShardCompactionIndexationQueue, OlapLimits::CompactionIndexationQueueLimitCoefficient);
545+
checkMemoryLimit(NLocalDb::ColumnShardCompactionTtlQueue, OlapLimits::CompactionTtlQueueLimitCoefficient);
546+
checkMemoryLimit(NLocalDb::ColumnShardCompactionGeneralQueue, OlapLimits::CompactionGeneralQueueLimitCoefficient);
547+
checkMemoryLimit(NLocalDb::ColumnShardCompactionNormalizerQueue, OlapLimits::CompactionNormalizerQueueLimitCoefficient);
542548

543549
// Check memory change
544-
server->ProcessMemoryInfo->CGroupLimit = 50_MB;
550+
currentHardMemoryLimit = 100_MB;
551+
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
552+
runtime.SimulateSleep(TDuration::Seconds(2));
553+
554+
checkMemoryLimit(NLocalDb::ColumnShardCompactionIndexationQueue, OlapLimits::CompactionIndexationQueueLimitCoefficient);
555+
checkMemoryLimit(NLocalDb::ColumnShardCompactionTtlQueue, OlapLimits::CompactionTtlQueueLimitCoefficient);
556+
checkMemoryLimit(NLocalDb::ColumnShardCompactionGeneralQueue, OlapLimits::CompactionGeneralQueueLimitCoefficient);
557+
checkMemoryLimit(NLocalDb::ColumnShardCompactionNormalizerQueue, OlapLimits::CompactionNormalizerQueueLimitCoefficient);
558+
}
559+
560+
Y_UNIT_TEST(GroupedMemoryLimiter_ConfigCS) {
561+
using namespace NResourceBroker;
562+
563+
TPortManager pm;
564+
TServerSettings serverSettings(pm.GetPort(2134));
565+
serverSettings.SetDomainName("Root").SetUseRealThreads(false);
566+
567+
const ui64 compactionMemoryLimitPercent = 36;
568+
const ui64 readExecutionMemoryLimitPercent = 20;
569+
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
570+
memoryControllerConfig->SetColumnTablesCompactionLimitPercent(compactionMemoryLimitPercent);
571+
memoryControllerConfig->SetColumnTablesReadExecutionLimitPercent(readExecutionMemoryLimitPercent);
572+
573+
ui64 currentHardMemoryLimit = 1000_MB;
574+
auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
575+
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
576+
auto& runtime = *server->GetRuntime();
577+
TAutoPtr<IEventHandle> handle;
578+
auto sender = runtime.AllocateEdgeActor();
579+
580+
auto scanLimits = NKikimr::NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::GetDefaultStageFeatures();
581+
auto compactionLimits = NKikimr::NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::GetDefaultStageFeatures();
582+
583+
InitRoot(server, sender);
584+
585+
auto checkMemoryLimits = [&]() {
586+
using OlapLimits = NKikimr::NOlap::TGlobalLimits;
587+
UNIT_ASSERT_VALUES_EQUAL(static_cast<ui64>(currentHardMemoryLimit * OlapLimits::GroupedMemoryLimiterSoftLimitCoefficient * readExecutionMemoryLimitPercent / 100),
588+
scanLimits->GetLimit());
589+
UNIT_ASSERT_VALUES_EQUAL(currentHardMemoryLimit * readExecutionMemoryLimitPercent / 100, scanLimits->GetHardLimit());
590+
591+
UNIT_ASSERT_VALUES_EQUAL(static_cast<ui64>(currentHardMemoryLimit * OlapLimits::GroupedMemoryLimiterCompactionLimitCoefficient * OlapLimits::GroupedMemoryLimiterSoftLimitCoefficient * compactionMemoryLimitPercent / 100),
592+
compactionLimits->GetLimit());
593+
UNIT_ASSERT_VALUES_EQUAL(static_cast<ui64>(currentHardMemoryLimit * OlapLimits::GroupedMemoryLimiterCompactionLimitCoefficient * compactionMemoryLimitPercent / 100),
594+
compactionLimits->GetHardLimit());
595+
};
596+
545597
runtime.SimulateSleep(TDuration::Seconds(2));
598+
checkMemoryLimits();
546599

547-
checkMemoryLimit(NLocalDb::ColumnShardCompactionIndexationQueue, 2_MB);
548-
checkMemoryLimit(NLocalDb::ColumnShardCompactionTtlQueue, 2_MB);
549-
checkMemoryLimit(NLocalDb::ColumnShardCompactionGeneralQueue, 6_MB);
550-
checkMemoryLimit(NLocalDb::ColumnShardCompactionNormalizerQueue, 6_MB);
600+
// Check memory decrease
601+
currentHardMemoryLimit = 500_MB;
602+
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
603+
runtime.SimulateSleep(TDuration::Seconds(2));
604+
checkMemoryLimits();
605+
606+
// Check memory increase
607+
currentHardMemoryLimit = 2000_MB;
608+
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
609+
runtime.SimulateSleep(TDuration::Seconds(2));
610+
checkMemoryLimits();
551611
}
552612
}
553613

ydb/core/protos/memory_controller_config.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ message TMemoryControllerConfig {
2828

2929
optional float ColumnTablesReadExecutionLimitPercent = 200 [default = 20];
3030
optional uint64 ColumnTablesReadExecutionLimitBytes = 201;
31-
optional float ColumnTablesCompactionLimitPercent = 202 [default = 16];
31+
optional float ColumnTablesCompactionLimitPercent = 202 [default = 36];
3232
optional uint64 ColumnTablesCompactionLimitBytes = 203;
3333
optional float ColumnTablesCacheLimitPercent = 204 [default = 4];
3434
optional uint64 ColumnTablesCacheLimitBytes = 205;

ydb/core/testlib/test_client.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,6 +1171,11 @@ namespace Tests {
11711171
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
11721172
Runtime->RegisterService(NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
11731173
}
1174+
{
1175+
auto* actor = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::CreateService(NOlap::NGroupedMemoryManager::TConfig(), new ::NMonitoring::TDynamicCounters());
1176+
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
1177+
Runtime->RegisterService(NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
1178+
}
11741179
{
11751180
auto* actor = NPrioritiesQueue::TCompServiceOperator::CreateService(NPrioritiesQueue::TConfig(), new ::NMonitoring::TDynamicCounters());
11761181
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);

ydb/core/tx/columnshard/blob_cache.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ class TBlobCache: public TActorBootstrapped<TBlobCache> {
191191
HFunc(TEvBlobCache::TEvReadBlobRangeBatch, Handle);
192192
HFunc(TEvBlobCache::TEvCacheBlobRange, Handle);
193193
HFunc(TEvBlobCache::TEvForgetBlob, Handle);
194+
HFunc(TEvBlobCache::TEvUpdateMaxCacheDataSize, Handle);
194195
HFunc(TEvBlobStorage::TEvGetResult, Handle);
195196
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
196197
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
@@ -333,6 +334,17 @@ class TBlobCache: public TActorBootstrapped<TBlobCache> {
333334
CachedRanges.erase(begin, end);
334335
}
335336

337+
void Handle(TEvBlobCache::TEvUpdateMaxCacheDataSize::TPtr& ev, const TActorContext&) {
338+
const i64 newMaxCacheDataSize = ev->Get()->MaxCacheDataSize;
339+
if (newMaxCacheDataSize == (i64)MaxCacheDataSize) {
340+
return;
341+
}
342+
343+
LOG_S_INFO("Update max cache data size: " << newMaxCacheDataSize);
344+
345+
MaxCacheDataSize = newMaxCacheDataSize;
346+
}
347+
336348
void SendBatchReadRequestToDS(const std::vector<TBlobRange>& blobRanges, const ui64 cookie,
337349
ui32 dsGroup, TReadItem::EReadVariant readVariant, const TActorContext& ctx)
338350
{

ydb/core/tx/columnshard/blob_cache.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct TEvBlobCache {
3939
EvReadBlobRangeResult,
4040
EvCacheBlobRange,
4141
EvForgetBlob,
42+
EvUpdateMemoryLimit,
4243

4344
EvEnd
4445
};
@@ -108,6 +109,14 @@ struct TEvBlobCache {
108109
: BlobId(blobId)
109110
{}
110111
};
112+
113+
struct TEvUpdateMaxCacheDataSize: public NActors::TEventLocal<TEvUpdateMaxCacheDataSize, EvUpdateMemoryLimit> {
114+
i64 MaxCacheDataSize = 0;
115+
116+
explicit TEvUpdateMaxCacheDataSize(const i64 maxCacheDataSize)
117+
: MaxCacheDataSize(maxCacheDataSize) {
118+
}
119+
};
111120
};
112121

113122
inline

ydb/core/tx/columnshard/common/limits.h

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,27 @@ class TGlobalLimits {
1313

1414
static constexpr inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
1515

16-
static constexpr double CompactionIndexationQueueLimitCoefficient = 0.125;
17-
static constexpr double CompactionTtlQueueLimitCoefficient = 0.125;
18-
static constexpr double CompactionGeneralQueueLimitCoefficient = 0.375;
19-
static constexpr double CompactionNormalizerQueueLimitCoefficient = 0.375;
16+
static constexpr double CompactionIndexationQueueLimitCoefficient = 0.055;
17+
static constexpr double CompactionTtlQueueLimitCoefficient = 0.055;
18+
static constexpr double CompactionGeneralQueueLimitCoefficient = 0.165;
19+
static constexpr double CompactionNormalizerQueueLimitCoefficient = 0.165;
20+
static constexpr double GroupedMemoryLimiterCompactionLimitCoefficient = 0.56;
2021

2122
static_assert((CompactionIndexationQueueLimitCoefficient + CompactionTtlQueueLimitCoefficient +
22-
CompactionGeneralQueueLimitCoefficient + CompactionNormalizerQueueLimitCoefficient - 1.0 <
23-
std::numeric_limits<double>::epsilon()) &&
23+
CompactionGeneralQueueLimitCoefficient + CompactionNormalizerQueueLimitCoefficient +
24+
GroupedMemoryLimiterCompactionLimitCoefficient - 1.0 < std::numeric_limits<double>::epsilon()) &&
2425
(1.0 - (CompactionIndexationQueueLimitCoefficient + CompactionTtlQueueLimitCoefficient +
25-
CompactionGeneralQueueLimitCoefficient + CompactionNormalizerQueueLimitCoefficient) <
26-
std::numeric_limits<double>::epsilon()),
26+
CompactionGeneralQueueLimitCoefficient + CompactionNormalizerQueueLimitCoefficient +
27+
GroupedMemoryLimiterCompactionLimitCoefficient) < std::numeric_limits<double>::epsilon()),
2728
"Compaction coefficients sum must be equal to 1.0");
29+
30+
static constexpr double GroupedMemoryLimiterSoftLimitCoefficient = 0.3;
31+
32+
static constexpr double BlobCacheCoefficient = 0.5;
33+
static constexpr double DataAccessorCoefficient = 0.5;
34+
35+
static_assert((BlobCacheCoefficient + DataAccessorCoefficient - 1.0 < std::numeric_limits<double>::epsilon()) &&
36+
(1.0 - (BlobCacheCoefficient + DataAccessorCoefficient) < std::numeric_limits<double>::epsilon()),
37+
"Cache coefficients sum must be equal to 1.0");
2838
};
2939
}

ydb/core/tx/limiter/grouped_memory/service/actor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace NKikimr::NOlap::NGroupedMemoryManager {
44

55
void TMemoryLimiterActor::Bootstrap() {
6+
Cerr << "!! Started " << Name << Endl;
67
Manager = std::make_shared<TManager>(SelfId(), Config, Name, Signals, DefaultStage);
78
Become(&TThis::StateWait);
89
}
@@ -47,4 +48,8 @@ void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartProcessScope::TPt
4748
Manager->RegisterProcessScope(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId());
4849
}
4950

51+
void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvUpdateMemoryLimits::TPtr& ev) {
52+
Manager->UpdateMemoryLimits(ev->Get()->GetSoftMemoryLimit(), ev->Get()->GetHardMemoryLimit());
53+
}
54+
5055
} // namespace NKikimr::NOlap::NGroupedMemoryManager

ydb/core/tx/limiter/grouped_memory/service/actor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class TMemoryLimiterActor: public NActors::TActorBootstrapped<TMemoryLimiterActo
3636
void Handle(NEvents::TEvExternal::TEvFinishProcess::TPtr& ev);
3737
void Handle(NEvents::TEvExternal::TEvStartProcessScope::TPtr& ev);
3838
void Handle(NEvents::TEvExternal::TEvFinishProcessScope::TPtr& ev);
39+
void Handle(NEvents::TEvExternal::TEvUpdateMemoryLimits::TPtr& ev);
3940

4041
void Bootstrap();
4142

@@ -50,6 +51,7 @@ class TMemoryLimiterActor: public NActors::TActorBootstrapped<TMemoryLimiterActo
5051
hFunc(NEvents::TEvExternal::TEvFinishProcess, Handle);
5152
hFunc(NEvents::TEvExternal::TEvStartProcessScope, Handle);
5253
hFunc(NEvents::TEvExternal::TEvFinishProcessScope, Handle);
54+
hFunc(NEvents::TEvExternal::TEvUpdateMemoryLimits, Handle);
5355
default:
5456
AFL_VERIFY(false)("ev_type", ev->GetTypeName());
5557
}

ydb/core/tx/limiter/grouped_memory/service/manager.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,16 @@ void TManager::UnregisterProcessScope(const ui64 externalProcessId, const ui64 e
119119
RefreshSignals();
120120
}
121121

122+
void TManager::UpdateMemoryLimits(const ui64 limit, const ui64 hardLimit) {
123+
if (!DefaultStage) {
124+
return;
125+
}
126+
127+
bool isLimitIncreased = false;
128+
DefaultStage->UpdateMemoryLimits(limit, hardLimit, isLimitIncreased);
129+
if (isLimitIncreased) {
130+
TryAllocateWaiting();
131+
}
132+
}
133+
122134
} // namespace NKikimr::NOlap::NGroupedMemoryManager

0 commit comments

Comments
 (0)