Skip to content

RDMA memory allocation support for load actor #20658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ void TCompletionChunkReadPart::Release(TActorSystem *actorSystem) {
}

TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
ui64 chunkNonce, TRcBufAllocator alloc, NWilson::TSpan&& span)
ui64 chunkNonce, TRdmaAllocatorWithFallback* alloc, NWilson::TSpan&& span)
: TCompletionAction()
, PDisk(pDisk)
, Read(read)
Expand Down Expand Up @@ -336,7 +336,7 @@ TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRe
? read->Size
: read->Size + read->Offset % sectorSize;
size_t tailroom = AlignUp<size_t>(newSize, sectorSize) - newSize;
CommonBuffer = TBufferWithGaps(read->Offset, alloc(newSize, 0, tailroom));
CommonBuffer = TBufferWithGaps(read->Offset, alloc->AllocRcBuf(newSize, 0, tailroom));
}

TCompletionChunkRead::~TCompletionChunkRead() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class TCompletionChunkRead : public TCompletionAction {
const ui64 DoubleFreeCanary;
public:
TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
ui64 chunkNonce, TRcBufAllocator alloc, NWilson::TSpan&& span);
ui64 chunkNonce, TRdmaAllocatorWithFallback* alloc, NWilson::TSpan&& span);
void Exec(TActorSystem *actorSystem) override;
~TCompletionChunkRead();
void ReplyError(TActorSystem *actorSystem, TString reason);
Expand Down
54 changes: 45 additions & 9 deletions ydb/core/load_test/group_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <ydb/core/util/lz4_data_generator.h>
#include <ydb/core/jaeger_tracing/throttler.h>

#include <ydb/library/actors/interconnect/rdma/mem_pool.h>

#include <google/protobuf/text_format.h>

#include <library/cpp/monlib/service/pages/templates.h>
Expand All @@ -22,6 +24,34 @@ namespace NKikimr {

namespace {

// Produces a TRcBuf of `size` bytes whose contents are written by
// FastGenDataForLZ4 with seed 0.
//   size  - number of bytes to allocate and fill.
//   alloc - when non-null, storage is obtained via alloc->AllocRcBuf(size, 0, 0);
//           when null, storage comes from TRcBuf::Uninitialized(size).
TRcBuf GenDataAsRcBuf(size_t size, NActors::TRdmaAllocatorWithFallback* alloc) {
    // Local adapter: exposes mutable_data() and Uninitialized() under the
    // TRcBufWrap type that is handed to FastGenDataForLZ4 as its container.
    struct TRcBufWrap : public TRcBuf {
        using TRcBuf::TRcBuf;

        TRcBufWrap(const TRcBuf& source)
            : TRcBuf(source)
        {}

        TRcBufWrap(TRcBuf&& source)
            : TRcBuf(std::move(source))
        {}

        char* mutable_data() {
            return GetDataMut();
        }

        static TRcBufWrap Uninitialized(size_t bytes) {
            return TRcBuf::Uninitialized(bytes);
        }
    };

    // Pick the storage source first, then wrap it so the generator can write into it.
    TRcBuf storage = alloc
        ? alloc->AllocRcBuf(size, 0, 0)
        : TRcBuf::Uninitialized(size);
    TRcBufWrap filled = std::move(storage);

    FastGenDataForLZ4<TRcBufWrap>(size, 0, filled);

    return filled;
}

class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActor> {
class TWakeupQueue {
using TCallback = std::function<void(const TActorContext&)>;
Expand Down Expand Up @@ -372,6 +402,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
std::shared_ptr<TRequestDelayManager> DelayManager;
TInFlightTracker InFlightTracker;
const ui64 MaxTotalBytes;
const ui32 RdmaMode;
};

private:
Expand Down Expand Up @@ -590,8 +621,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
const ui32 size = 1;
const ui32 lastStep = Max<ui32>();
const TLogoBlobID id(TabletId, Generation, lastStep, Channel, size, 0);
const TSharedData buffer = Self.GenerateBuffer(id);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass);
TRcBuf buffer = Self.GenerateBuffer(id, ctx, WriteSettings.RdmaMode > 0);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), TInstant::Max(), PutHandleClass);

auto callback = [this] (IEventBase *event, const TActorContext& ctx) {
auto *res = dynamic_cast<TEvBlobStorage::TEvPutResult *>(event);
Expand Down Expand Up @@ -916,8 +947,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
putHandleClass = PutHandleClass;
}
const TLogoBlobID id(TabletId, Generation, WriteStep, Channel, size, Cookie);
const TSharedData buffer = Self.GenerateBuffer(id);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), putHandleClass);
TRcBuf buffer = Self.GenerateBuffer(id, ctx, WriteSettings.RdmaMode > 0);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), TInstant::Max(), putHandleClass);
const ui64 writeQueryId = ++WriteQueryId;

auto writeCallback = [this, writeQueryId](IEventBase *event, const TActorContext& ctx) {
Expand Down Expand Up @@ -1177,7 +1208,9 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo

ui32 DelayAfterInitialWrite = 0;

TSharedData BlobData;
ui32 MaxBlobSize;
TRcBuf BlobData;
ui32 TryRdmaMomory = false;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BS_LOAD_ACTOR;
Expand Down Expand Up @@ -1234,9 +1267,11 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
.DelayManager = std::move(writeDelayManager),
.InFlightTracker = TInFlightTracker(profile.GetMaxInFlightWriteRequests(), profile.GetMaxInFlightWriteBytes()),
.MaxTotalBytes = profile.GetMaxTotalBytesWritten(),
.RdmaMode = profile.GetRdmaMode(),
};

maxBlobSize = std::max(maxBlobSize, writeSettings.SizeGen->GetMax());
TryRdmaMomory |= (bool)profile.GetRdmaMode();

bool enableReads = profile.ReadIntervalsSize() || profile.HasReadHardRateDispatcher();
NKikimrBlobStorage::EGetHandleClass getHandleClass = NKikimrBlobStorage::EGetHandleClass::FastRead;
Expand Down Expand Up @@ -1327,7 +1362,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
WorkersInInitialState++;
}
}
BlobData = FastGenDataForLZ4<TSharedData>(maxBlobSize);
MaxBlobSize = maxBlobSize;
}

void StartWorkers(const TActorContext& ctx) {
Expand All @@ -1354,6 +1389,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
}

void Bootstrap(const TActorContext& ctx) {
BlobData = GenDataAsRcBuf(MaxBlobSize, TryRdmaMomory ? ctx.ActorSystem()->GetRcBufAllocator() : nullptr);
Become(&TLogWriterLoadTestActor::StateFunc);
EarlyStop = false;
for (auto& writer : TabletWriters) {
Expand Down Expand Up @@ -1522,11 +1558,11 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
Y_ABORT("TEvUndelivered# 0x%08" PRIx32 " ActorId# %s", ev->Get()->SourceType, ev->Sender.ToString().data());
}

TSharedData GenerateBuffer(const TLogoBlobID& id) const {
TRcBuf GenerateBuffer(const TLogoBlobID& id, const TActorContext& ctx, bool rdmaMem) const {
if (id.BlobSize() > BlobData.size())
return FastGenDataForLZ4<TSharedData>(id.BlobSize());
return GenDataAsRcBuf(id.BlobSize(), rdmaMem ? ctx.ActorSystem()->GetRcBufAllocator() : nullptr);
Y_ABORT_UNLESS(id.BlobSize() <= BlobData.size());
TSharedData data(BlobData);
TRcBuf data(BlobData);
data.TrimBack(id.BlobSize());
return data;
}
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/load_test/ut/group_test_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ Y_UNIT_TEST_SUITE(GroupWriteTest) {
UNIT_ASSERT(GetOutputValue(html, "TotalBytesRead").front() == 0U);
}

// Runs a single write-only load profile with RdmaMode: 1 for 30 seconds and
// checks the counters reported in the resulting HTML page.
Y_UNIT_TEST(SimpleRdma) {
TTetsEnv env;

// Text-format load profile; the group id is substituted from the test environment.
const TString conf(R"(DurationSeconds: 30
Tablets: {
Tablets: { TabletId: 1 Channel: 0 GroupId: )" + ToString(env.GroupInfo->GroupID) + R"( Generation: 1 }
WriteSizes: { Weight: 1.0 Min: 1000000 Max: 4000000 }
WriteIntervals: { Weight: 1.0 Uniform: { MinUs: 100000 MaxUs: 100000 } }
MaxInFlightWriteRequests: 10
FlushIntervals: { Weight: 1.0 Uniform: { MinUs: 1000000 MaxUs: 1000000 } }
PutHandleClass: TabletLog
RdmaMode: 1
})"
);

const auto html = env.RunSingleLoadTest(conf);
// NOTE(review): one write per 100000 us over 30 s is about 300 puts, and with
// Min: 1000000 bytes per write that is about 300000000 bytes; presumably this is
// where the two lower bounds below come from - confirm.
UNIT_ASSERT(GetOutputValue(html, "OkPutResults").front() >= 300U);
UNIT_ASSERT(GetOutputValue(html, "BadPutResults").front() == 0U);
UNIT_ASSERT(GetOutputValue(html, "TotalBytesWritten").front() >= 300000000U);
// The profile configures no read intervals, so no bytes are expected to be read.
UNIT_ASSERT(GetOutputValue(html, "TotalBytesRead").front() == 0U);
}

Y_UNIT_TEST(ByTableName) {
TTetsEnv env;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/load_test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ message TEvLoadTestRequest {

optional uint32 TracingThrottlerRate = 18 [default = 0];
optional uint32 TracingThrottlerBurst = 19 [default = 0];
optional uint32 RdmaMode = 20 [default = 0];
};
optional uint64 Tag = 1;
optional uint32 DurationSeconds = 2;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/util/actorsys_test/testactorsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/actors/core/scheduler_queue.h>
#include <ydb/library/actors/core/executor_thread.h>
#include <ydb/library/actors/interconnect/interconnect_common.h>
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/library/actors/util/should_continue.h>
#include <ydb/library/actors/core/monotonic_provider.h>
#include <ydb/core/base/appdata.h>
Expand Down Expand Up @@ -268,6 +269,14 @@ class TTestActorSystem {
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
IExecutorPool *pool = CreateTestExecutorPool(nodeId);
setup->Executors[0].Reset(pool);
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
// Test allocator backed by the dummy memory pool captured in `memPool`.
// A single region of size + headRoom + tailRoom bytes is allocated and then
// narrowed with TrimFront/TrimBack.
// NOTE(review): presumably TrimFront(n)/TrimBack(n) keep the last/first n bytes,
// leaving a `size`-byte window with the requested head and tail room around it - confirm.
// The per-allocation debug trace to Cerr was removed: it flushed stderr on every
// allocation and carried no information.
setup->RcBufAllocator = [memPool](ui32 size, ui32 headRoom, ui32 tailRoom) -> TRcBuf {
    TRcBuf buf = memPool->AllocRcBuf(size + headRoom + tailRoom);
    buf.TrimFront(size + tailRoom);
    buf.TrimBack(size);
    return buf;
};

// we create this actor for correct service lookup through ActorSystem
setup->LocalServices.emplace_back(LoggerSettings_->LoggerActorId, TActorSetupCmd(
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/util/lz4_data_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ inline ResultContainer GenDataForLZ4(const ui64 size, const ui64 seed = 0) {
return data;
}

template <class ResultContainer = TString>
inline ResultContainer FastGenDataForLZ4(size_t size, ui64 seed = 0) {
ResultContainer data = ResultContainer::Uninitialized(size);
template <class ResultContainer>
inline void FastGenDataForLZ4(size_t size, ui64 seed, ResultContainer& data) {
char *ptr = [&]() -> char * {
if constexpr(std::is_same<ResultContainer, TString>::value) {
return data.Detach();
Expand Down Expand Up @@ -111,7 +110,12 @@ inline ResultContainer FastGenDataForLZ4(size_t size, ui64 seed = 0) {
case 64: UNROLL(64); break;
default: Y_ABORT();
}
}

// Convenience overload: creates a container via ResultContainer::Uninitialized(size),
// fills it through the three-argument FastGenDataForLZ4 overload, and returns it.
//   size - number of bytes requested from ResultContainer::Uninitialized.
//   seed - forwarded unchanged to the filling overload (defaults to 0).
template <class ResultContainer = TString>
inline ResultContainer FastGenDataForLZ4(size_t size, ui64 seed = 0) {
ResultContainer data = ResultContainer::Uninitialized(size);
// The explicit template argument selects the in-place overload that writes into `data`.
FastGenDataForLZ4<ResultContainer>(size, seed, data);
return data;
}

Expand Down
12 changes: 12 additions & 0 deletions ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ namespace NActors {
return buf;
};

// Stores the allocation callback that AllocRcBuf() will invoke.
// `cb` is received by value, so it is moved into the member instead of being
// copied a second time; copying a std::function may allocate, which is
// undesirable inside a noexcept constructor.
TRdmaAllocatorWithFallback::TRdmaAllocatorWithFallback(TRcBufAllocator cb) noexcept
    : Allocator(std::move(cb))
{}

// Allocates a buffer with the requested payload size, head room and tail room.
// The stored callback is tried first; DefaultRcBufAllocator is used when no
// callback is installed or when the callback returns an empty buffer.
TRcBuf TRdmaAllocatorWithFallback::AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept {
    // Invoking an empty std::function throws std::bad_function_call, which would
    // terminate the process because this function is noexcept - fall back instead.
    if (!Allocator) {
        return DefaultRcBufAllocator(size, headRoom, tailRoom);
    }
    auto buf = Allocator(size, headRoom, tailRoom);
    if (!buf) {
        // The callback could not provide memory; use the default allocator.
        return DefaultRcBufAllocator(size, headRoom, tailRoom);
    }
    return buf;
}

TActorSystem::TActorSystem(THolder<TActorSystemSetup>& setup, void* appData,
TIntrusivePtr<NLog::TSettings> loggerSettings)
: NodeId(setup->NodeId)
Expand Down
17 changes: 14 additions & 3 deletions ydb/library/actors/core/actorsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include <util/datetime/base.h>
#include <util/system/mutex.h>

namespace NInterconnect::NRdma {
class IMemPool;
}

namespace NActors {
class IActor;
class TActorSystem;
Expand Down Expand Up @@ -85,6 +89,13 @@ namespace NActors {
};

using TRcBufAllocator = std::function<TRcBuf(size_t size, size_t headRoom, size_t tailRoom)>;
// Wraps a TRcBufAllocator callback and adds a fallback: AllocRcBuf() returns the
// callback's buffer, or a buffer from the default allocator when the callback
// yields an empty one.
class TRdmaAllocatorWithFallback {
public:
// Stores the allocation callback `cb`.
TRdmaAllocatorWithFallback(TRcBufAllocator cb) noexcept;
// Allocates a buffer of `size` bytes with the requested head room and tail room.
TRcBuf AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept;
private:
// Primary allocation callback tried before the fallback.
TRcBufAllocator Allocator;
};

struct TActorSystemSetup {
ui32 NodeId = 0;
Expand Down Expand Up @@ -156,7 +167,7 @@ namespace NActors {
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
mutable TTicketLock ScheduleLock;

const TRcBufAllocator RcBufAllocator;
mutable TRdmaAllocatorWithFallback RcBufAllocator;

friend class TExecutorThread;

Expand Down Expand Up @@ -314,8 +325,8 @@ namespace NActors {
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;

const TRcBufAllocator& GetRcBufAllocator() const {
return RcBufAllocator;
TRdmaAllocatorWithFallback* GetRcBufAllocator() const {
return &RcBufAllocator;
}
};
}
4 changes: 2 additions & 2 deletions ydb/library/actors/interconnect/load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ namespace NInterconnect {
THolder<TEvLoadMessage> ev;
if (Params.UseProtobufWithPayload && size) {
if (Params.RdmaMode) {
auto& allocator = ctx.ActorSystem()->GetRcBufAllocator();
TRcBuf buffer = allocator(size, 0, 0);
auto allocator = ctx.ActorSystem()->GetRcBufAllocator();
TRcBuf buffer = allocator->AllocRcBuf(size, 0, 0);
memset(buffer.GetDataMut(), '*', size);
ev.Reset(new TEvLoadMessage(Hops, id, TRope(std::move(buffer))));
} else {
Expand Down