Skip to content

Commit 9d13274

Browse files
authored
WaitInput/OutputTimes for Reads from CS (#18727)
1 parent a064c2d commit 9d13274

File tree

10 files changed

+74
-20
lines changed

10 files changed

+74
-20
lines changed

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void TKqpComputeActor::DoBootstrap() {
119119

120120
if (ScanData) {
121121
ScanData->TaskId = GetTask().GetId();
122-
ScanData->TableReader = CreateKqpTableReader(*ScanData);
122+
ScanData->TableReader = CreateKqpTableReader(*ScanData, *ComputeCtx.StartTs, *ComputeCtx.InputConsumed);
123123

124124
TMaybe<NKikimrSysView::ESysViewType> sysViewType;
125125
if (Meta->GetTable().HasSysViewType()) {

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
111111
externalStat.SetExternalBytes(stat.ExternalBytes);
112112
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
113113
externalStat.SetLastMessageMs(stat.LastMessageMs);
114+
externalStat.SetWaitOutputTimeUs(stat.WaitOutputTimeUs);
114115
}
115116

116117
taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
@@ -183,9 +184,9 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvSendData::TPtr& ev) {
183184

184185
auto guard = TaskRunner->BindAllocator();
185186
if (!!msg.GetArrowBatch()) {
186-
ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.GetArrowBatch(), std::move(msg.MutableDataIndexes()), BlockTrackingMode), msg.GetTabletId(), TaskRunner->GetHolderFactory());
187+
ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.GetArrowBatch(), std::move(msg.MutableDataIndexes()), BlockTrackingMode), msg.GetTabletId(), TaskRunner->GetHolderFactory(), msg.GetWaitOutputTimeUs());
187188
} else if (!msg.GetRows().empty()) {
188-
ScanData->AddData(std::move(msg.MutableRows()), msg.GetTabletId(), TaskRunner->GetHolderFactory());
189+
ScanData->AddData(std::move(msg.MutableRows()), msg.GetTabletId(), TaskRunner->GetHolderFactory(), msg.GetWaitOutputTimeUs());
189190
}
190191
if (IsQuotingEnabled()) {
191192
AcquireRateQuota();
@@ -278,7 +279,7 @@ void TKqpScanComputeActor::DoBootstrap() {
278279
ScanData = &ComputeCtx.GetTableScan(0);
279280

280281
ScanData->TaskId = GetTask().GetId();
281-
ScanData->TableReader = CreateKqpTableReader(*ScanData);
282+
ScanData->TableReader = CreateKqpTableReader(*ScanData, *ComputeCtx.StartTs, *ComputeCtx.InputConsumed);
282283
Become(&TKqpScanComputeActor::StateFunc);
283284

284285
TBase::DoBoostrap();

ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ class TShardScannerInfo {
3333
bool NeedAck = true;
3434
bool Finished = false;
3535
bool AllowPings = false;
36+
TDuration WaitOutputTime;
37+
TInstant StartWaitOutputTime;
38+
ui64 PendingMessageCount = 0;
3639

3740
void DoAck() {
3841
if (Finished) {
@@ -129,11 +132,29 @@ class TShardScannerInfo {
129132
DoAck();
130133
}
131134
}
135+
136+
// Registers one more pending message for this scanner.
// When the counter was zero before the increment (i.e. this is the first
// pending message), the current time is stored in StartWaitOutputTime so
// that DecPending() can measure the elapsed interval.
void IncPending() {
137+
if (PendingMessageCount++ == 0) {
138+
StartWaitOutputTime = TInstant::Now();
139+
}
140+
}
141+
142+
// Unregisters one pending message and accounts the time spent waiting.
// Requires PendingMessageCount > 0 (enforced via AFL_ENSURE).
// The interval [StartWaitOutputTime, now] is added to WaitOutputTime; then,
// if messages are still pending after the decrement, a new interval starts
// at `now`, otherwise StartWaitOutputTime is reset to TInstant::Zero().
void DecPending() {
143+
AFL_ENSURE(PendingMessageCount > 0);
144+
auto now = TInstant::Now();
145+
WaitOutputTime += (now - StartWaitOutputTime);
146+
// Pre-decrement: the ternary tests the count *after* this message is removed.
StartWaitOutputTime = --PendingMessageCount ? now : TInstant::Zero();
147+
}
148+
149+
// Returns the accumulated WaitOutputTime in microseconds.
// Only intervals already closed by DecPending() are included; a currently
// open interval (StartWaitOutputTime set, not yet decremented) is not added here.
ui64 GetPendingTimeUs() const {
150+
return WaitOutputTime.MicroSeconds();
151+
}
132152
};
133153

134154
class TComputeTaskData {
135-
private:
155+
public:
136156
std::shared_ptr<TShardScannerInfo> Info;
157+
private:
137158
std::unique_ptr<TEvScanExchange::TEvSendData> Event;
138159
bool Finished = false;
139160
const std::optional<ui32> ComputeShardId;
@@ -144,6 +165,7 @@ class TComputeTaskData {
144165
}
145166

146167
// Hands the stored event over to the caller, leaving Event moved-from.
// Before releasing it, the event is stamped with the scanner's accumulated
// pending time (Info->GetPendingTimeUs()).
// NOTE(review): Event and Info are dereferenced without a null check —
// presumably both are always set when this is called; confirm against callers.
std::unique_ptr<TEvScanExchange::TEvSendData> ExtractEvent() {
168+
Event->SetWaitOutputTimeUs(Info->GetPendingTimeUs());
147169
return std::move(Event);
148170
}
149171

@@ -251,6 +273,7 @@ class TInFlightComputes {
251273
}
252274
it->second->OnAckReceived(freeSpace);
253275
if (it->second->IsFree() && UndefinedShardTaskData.size()) {
276+
UndefinedShardTaskData.front()->Info->DecPending();
254277
it->second->AddDataToSend(std::move(UndefinedShardTaskData.front()));
255278
UndefinedShardTaskData.pop_front();
256279
}
@@ -266,6 +289,7 @@ class TInFlightComputes {
266289
return;
267290
}
268291
}
292+
sendTask->Info->IncPending();
269293
UndefinedShardTaskData.emplace_back(std::move(sendTask));
270294
} else {
271295
AFL_ENSURE(*computeShardId < ComputeActors.size())("compute_shard_id", *computeShardId);

ydb/core/kqp/compute_actor/kqp_scan_events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ struct TEvScanExchange {
4444
YDB_READONLY(ui64, TabletId, 0);
4545
YDB_ACCESSOR_DEF(std::vector<ui32>, DataIndexes);
4646
YDB_READONLY_DEF(TLocksInfo, LocksInfo);
47+
YDB_ACCESSOR_DEF(ui64, WaitOutputTimeUs);
4748
public:
4849
ui32 GetRowsCount() const {
4950
return ArrowBatch ? ArrowBatch->num_rows() : Rows.size();

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
578578
index, key, partitionStat.GetFirstMessageMs(), false, EPartitionedAggKind::PartitionedAggMin);
579579
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
580580
index, key, partitionStat.GetLastMessageMs(), false, EPartitionedAggKind::PartitionedAggMax);
581+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.WaitOutputTimeUs,
582+
index, key, partitionStat.GetWaitOutputTimeUs(), false, EPartitionedAggKind::PartitionedAggMax);
581583
}
582584
}
583585
}

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ struct TExternalStats : public TTimeMultiSeriesStats {
7676
TPartitionedStats ExternalBytes;
7777
TPartitionedStats FirstMessageMs;
7878
TPartitionedStats LastMessageMs;
79+
TPartitionedStats WaitOutputTimeUs;
7980

8081
void Resize(ui32 taskCount);
8182
void SetHistorySampleCount(ui32 historySampleCount);

ydb/core/kqp/runtime/kqp_scan_data.cpp

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -588,15 +588,16 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(c
588588
return TBytesStatistics();
589589
}
590590

591-
void TKqpScanComputeContext::TScanData::UpdateStats(size_t rows, size_t bytes, TMaybe<ui64> shardId) {
591+
void TKqpScanComputeContext::TScanData::UpdateStats(size_t rows, size_t bytes, TMaybe<ui64> shardId, ui64 waitOutputTime) {
592592
if (BasicStats) {
593593
ui64 nowMs = Now().MilliSeconds();
594594
if (shardId) {
595-
const auto& [it, inserted] = BasicStats->ExternalStats.emplace(*shardId, TExternalStats(rows, bytes, nowMs, nowMs));
595+
const auto& [it, inserted] = BasicStats->ExternalStats.emplace(*shardId, TExternalStats(rows, bytes, nowMs, nowMs, waitOutputTime));
596596
if (!inserted) {
597597
it->second.ExternalRows += rows;
598598
it->second.ExternalBytes += bytes;
599599
it->second.LastMessageMs = nowMs;
600+
it->second.WaitOutputTimeUs = waitOutputTime;
600601
}
601602
}
602603
BasicStats->Rows += rows;
@@ -608,12 +609,12 @@ void TKqpScanComputeContext::TScanData::UpdateStats(size_t rows, size_t bytes, T
608609
}
609610
}
610611

611-
ui64 TKqpScanComputeContext::TScanData::AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory) {
612+
ui64 TKqpScanComputeContext::TScanData::AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory, ui64 waitOutputTime) {
612613
if (Finished || batch.empty()) {
613614
return 0;
614615
}
615616
TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory);
616-
UpdateStats(batch.size(), stats.DataBytes, shardId);
617+
UpdateStats(batch.size(), stats.DataBytes, shardId, waitOutputTime);
617618
return stats.AllocatedBytes;
618619
}
619620

@@ -713,15 +714,15 @@ TBytesStatistics TKqpScanComputeContext::TScanData::TBlockBatchReader::AddData(c
713714
}
714715

715716
ui64 TKqpScanComputeContext::TScanData::AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId,
716-
const THolderFactory& holderFactory)
717+
const THolderFactory& holderFactory, ui64 waitOutputTime)
717718
{
718719
// RecordBatch hasn't empty method so check the number of rows
719720
if (Finished || batch.GetRecordsCount() == 0) {
720721
return 0;
721722
}
722723

723724
TBytesStatistics stats = BatchReader->AddData(batch, shardId, holderFactory);
724-
UpdateStats(batch.GetRecordsCount(), stats.DataBytes, shardId);
725+
UpdateStats(batch.GetRecordsCount(), stats.DataBytes, shardId, waitOutputTime);
725726
return stats.AllocatedBytes;
726727
}
727728

@@ -759,8 +760,10 @@ TIntrusivePtr<IKqpTableReader> TKqpScanComputeContext::ReadTable(ui32) const {
759760

760761
class TKqpTableReader : public IKqpTableReader {
761762
public:
762-
TKqpTableReader(TKqpScanComputeContext::TScanData& scanData)
763+
TKqpTableReader(TKqpScanComputeContext::TScanData& scanData, TInstant& startTs, bool& inputConsumed)
763764
: ScanData(scanData)
765+
, StartTs(startTs)
766+
, InputConsumed(inputConsumed)
764767
{}
765768

766769
NUdf::EFetchStatus Next(NUdf::TUnboxedValue& /*result*/) override {
@@ -779,21 +782,31 @@ class TKqpTableReader : public IKqpTableReader {
779782
// Fetches the next row from ScanData into `result`.
// Returns:
//   Finish - ScanData is empty and finished;
//   Yield  - ScanData is empty but not finished yet;
//   One    - values were filled into `result`.
// Side effects on the referenced members: StartTs is set to Now() the first
// time it is found unset (on both the Finish and the One path), and
// InputConsumed is set to true whenever a row is produced.
EFetchResult Next(NUdf::TUnboxedValue* const* result) override {
780783
if (ScanData.IsEmpty()) {
781784
if (ScanData.IsFinished()) {
785+
// Stamp the start time even when the scan finishes without yielding a row.
if (Y_UNLIKELY(!StartTs)) {
786+
StartTs = Now();
787+
}
782788
return EFetchResult::Finish;
783789
}
784790
return EFetchResult::Yield;
785791
}
786792

787793
ScanData.FillDataValues(result);
794+
795+
// First produced row: record the start time if nobody has set it yet.
if (Y_UNLIKELY(!StartTs)) {
796+
StartTs = Now();
797+
}
798+
InputConsumed = true;
788799
return EFetchResult::One;
789800
}
790801

791802
private:
792803
TKqpScanComputeContext::TScanData& ScanData;
804+
TInstant& StartTs;
805+
bool& InputConsumed;
793806
};
794807

795-
TIntrusivePtr<IKqpTableReader> CreateKqpTableReader(TKqpScanComputeContext::TScanData& scanData) {
796-
return MakeIntrusive<TKqpTableReader>(scanData);
808+
TIntrusivePtr<IKqpTableReader> CreateKqpTableReader(TKqpScanComputeContext::TScanData& scanData, TInstant& startTs, bool& inputConsumed) {
809+
return MakeIntrusive<TKqpTableReader>(scanData, startTs, inputConsumed);
797810
}
798811

799812
} // namespace NMiniKQL

ydb/core/kqp/runtime/kqp_scan_data.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ class TKqpScanComputeContext : public TKqpComputeContextBase {
167167
return BatchReader->GetColumns();
168168
}
169169

170-
void UpdateStats(size_t rows, size_t bytes, TMaybe<ui64> shardId);
171-
ui64 AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
172-
ui64 AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory);
170+
void UpdateStats(size_t rows, size_t bytes, TMaybe<ui64> shardId, ui64 waitOutputTime);
171+
ui64 AddData(const TVector<TOwnedCellVec>& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory, ui64 waitOutputTime = 0);
172+
ui64 AddData(const TBatchDataAccessor& batch, TMaybe<ui64> shardId, const THolderFactory& holderFactory, ui64 waitOutputTime = 0);
173173

174174
bool IsEmpty() const {
175175
return BatchReader->IsEmpty();
@@ -203,10 +203,15 @@ class TKqpScanComputeContext : public TKqpComputeContextBase {
203203
ui64 ExternalBytes = 0;
204204
ui64 FirstMessageMs = 0;
205205
ui64 LastMessageMs = 0;
206+
ui64 WaitOutputTimeUs = 0;
206207

207208
TExternalStats() = default;
208-
TExternalStats(ui64 externalRows, ui64 externalBytes, ui64 firstMessageMs, ui64 lastMessageMs)
209-
: ExternalRows(externalRows), ExternalBytes(externalBytes), FirstMessageMs(firstMessageMs), LastMessageMs(lastMessageMs)
209+
TExternalStats(ui64 externalRows, ui64 externalBytes, ui64 firstMessageMs, ui64 lastMessageMs, ui64 waitOutputTime)
210+
: ExternalRows(externalRows)
211+
, ExternalBytes(externalBytes)
212+
, FirstMessageMs(firstMessageMs)
213+
, LastMessageMs(lastMessageMs)
214+
, WaitOutputTimeUs(waitOutputTime)
210215
{}
211216
};
212217

@@ -412,7 +417,7 @@ class TKqpScanComputeContext : public TKqpComputeContextBase {
412417
TMap<ui32, TScanData> Scans;
413418
};
414419

415-
TIntrusivePtr<IKqpTableReader> CreateKqpTableReader(TKqpScanComputeContext::TScanData& scanData);
420+
TIntrusivePtr<IKqpTableReader> CreateKqpTableReader(TKqpScanComputeContext::TScanData& scanData, TInstant& startTs, bool& inputConsumed);
416421

417422
} // namespace NMiniKQL
418423
} // namespace NKikimr

ydb/library/yql/dq/runtime/dq_compute.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ namespace NYql::NDq {
77
// Base class for compute contexts; non-copyable, with a virtual destructor.
// Carries two raw pointers that default to nullptr.
// NOTE(review): in this commit TDqTaskRunner assigns them inside
// `if (Context.ComputeCtx)`, and the KQP compute actors dereference them
// unchecked (`*ComputeCtx.StartTs`, `*ComputeCtx.InputConsumed`) when creating
// the table reader — presumably the task runner is always constructed first;
// confirm the ordering.
class TDqComputeContextBase : private TNonCopyable {
88
public:
99
virtual ~TDqComputeContextBase() = default;
10+
11+
// Pointer to a flag; nullptr until assigned externally.
bool* InputConsumed = nullptr;
12+
// Pointer to a start timestamp; nullptr until assigned externally.
TInstant* StartTs = nullptr;
1013
};
1114

1215
NKikimr::NMiniKQL::TComputationNodeFactory GetDqBaseComputeFactory(const TDqComputeContextBase* computeCtx);

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ class TDqTaskRunner : public IDqTaskRunner {
249249
{
250250
Stats = std::make_unique<TDqTaskRunnerStats>();
251251
Stats->CreateTs = TInstant::Now();
252+
if (Context.ComputeCtx) {
253+
Context.ComputeCtx->StartTs = &Stats->StartTs;
254+
Context.ComputeCtx->InputConsumed = &InputConsumed;
255+
}
252256
if (Y_UNLIKELY(CollectFull())) {
253257
Stats->ComputeCpuTimeByRun = NMonitoring::ExponentialHistogram(6, 10, 10);
254258
}

0 commit comments

Comments
 (0)