Skip to content

portion producing method improve #19200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions ydb/core/tx/columnshard/counters/portion_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@
namespace NKikimr::NColumnShard {

TPortionIndexStats::TPortionClass::TPortionClass(const NOlap::TPortionInfo& portion) {
if (portion.HasRemoveSnapshot()) {
Produced = NOlap::NPortion::EProduced::INACTIVE;
} else if (portion.GetTierNameDef(NOlap::NBlobOperations::TGlobal::DefaultStorageId) != NOlap::NBlobOperations::TGlobal::DefaultStorageId) {
Produced = NOlap::NPortion::EProduced::EVICTED;
} else {
Produced = portion.GetMeta().GetProduced();
}
Produced = portion.GetProduced();
}

void TPortionIndexStats::AddPortion(const NOlap::TPortionInfo& portion) {
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ void TCompactColumnEngineChanges::DoDebugString(TStringOutput& out) const {
void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
TBase::DoCompile(context);

const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass();
for (auto& portionInfo : AppendedPortions) {
auto& constructor = portionInfo.GetPortionConstructor().MutablePortionConstructor();
constructor.MutableMeta().UpdateRecordsMeta(producedClassResultCompaction);
constructor.MutableMeta().SetCompactionLevel(GranuleMeta->GetOptimizerPlanner().GetAppropriateLevel(
GetPortionsToMove().GetTargetCompactionLevel().value_or(0), portionInfo.GetPortionConstructor()));
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TCompactColumnEngineChanges: public TChangesWithAppend {
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
virtual TPortionMeta::EProduced GetResultProducedClass() const = 0;
virtual NPortion::EProduced GetResultProducedClass() const = 0;
virtual void OnAbortEmergency() override {
NeedGranuleStatusProvide = false;
}
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TGeneralCompactColumnEngineChanges
if (shardingActual) {
shardingActualVersion = shardingActual->GetSnapshotVersion();
}
auto result = merger.Execute(stats, CheckPoints, resultFiltered, GranuleMeta->GetPathId(), shardingActualVersion);
for (auto&& p : result) {
p.GetPortionConstructor().MutablePortionConstructor().MutableMeta().UpdateRecordsMeta(NPortion::EProduced::SPLIT_COMPACTED);
}
return result;
return merger.Execute(stats, CheckPoints, resultFiltered, GranuleMeta->GetPathId(), shardingActualVersion);
}

TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
Expand All @@ -42,12 +38,12 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
THashMap<ui32, TSimplePortionsGroupInfo> portionGroups;
for (auto&& i : SwitchedPortions) {
portionGroups[i->GetMeta().GetCompactionLevel()].AddPortion(i);
if (i->GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) {
if (i->GetProduced() == NPortion::EProduced::INSERTED) {
insertedPortions.AddPortion(i);
} else if (i->GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) {
} else if (i->GetProduced() == NPortion::EProduced::SPLIT_COMPACTED) {
compactedPortions.AddPortion(i);
} else {
AFL_VERIFY(false);
AFL_VERIFY(false)("portion_prod", i->GetProduced())("portion_type", i->GetPortionType());
}
}
NChanges::TGeneralCompactionCounters::OnRepackPortions(insertedPortions + compactedPortions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges,
return true;
}

virtual TPortionMeta::EProduced GetResultProducedClass() const override {
return TPortionMeta::EProduced::SPLIT_COMPACTED;
virtual NPortion::EProduced GetResultProducedClass() const override {
return NPortion::EProduced::SPLIT_COMPACTED;
}
virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override;
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
merger.SetOptimizationWritingPackMode(true);
auto localAppended = merger.Execute(stats, itGranule->second, filteredSnapshot, pathId, shardingVersion);
for (auto&& i : localAppended) {
i.GetPortionConstructor().MutablePortionConstructor().MutableMeta().UpdateRecordsMeta(NPortion::EProduced::INSERTED);
AppendedPortions.emplace_back(std::move(i));
}
}
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
for (auto& portionBuilder : AppendedPortions) {
auto& portionInfo = portionBuilder.GetPortionResult();
sb << portionInfo.GetPortionInfo().GetPortionId() << ",";
switch (portionInfo.GetPortionInfo().GetMeta().Produced) {
case NOlap::TPortionMeta::EProduced::UNSPECIFIED:
switch (portionInfo.GetPortionInfo().GetProduced()) {
case NOlap::NPortion::EProduced::UNSPECIFIED:
Y_ABORT_UNLESS(false); // unexpected
case NOlap::TPortionMeta::EProduced::INSERTED:
case NOlap::NPortion::EProduced::INSERTED:
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN);
break;
case NOlap::TPortionMeta::EProduced::COMPACTED:
case NOlap::NPortion::EProduced::COMPACTED:
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_COMPACTION_PORTIONS_WRITTEN);
break;
case NOlap::TPortionMeta::EProduced::SPLIT_COMPACTED:
case NOlap::NPortion::EProduced::SPLIT_COMPACTED:
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN);
break;
case NOlap::TPortionMeta::EProduced::EVICTED:
Y_ABORT("Unexpected evicted case");
case NOlap::NPortion::EProduced::EVICTED:
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_EVICTION_PORTIONS_WRITTEN);
break;
case NOlap::TPortionMeta::EProduced::INACTIVE:
case NOlap::NPortion::EProduced::INACTIVE:
Y_ABORT("Unexpected inactive case");
break;
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe
}
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
if (row.GetChunkIdx() == 0 && row.GetColumnId() == firstPKColumnId) {
*rowProto.MutablePortionMeta() = portion.GetMeta().SerializeToProto();
*rowProto.MutablePortionMeta() =
portion.GetMeta().SerializeToProto(portion.GetPortionType() == EPortionType::Compacted ? NPortion::EProduced::SPLIT_COMPACTED
: NPortion::EProduced::INSERTED);
}
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/portions/compacted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr::NOlap {

void TCompactedPortionInfo::DoSaveMetaToDatabase(NIceDb::TNiceDb& db) const {
auto metaProto = GetMeta().SerializeToProto();
auto metaProto = GetMeta().SerializeToProto(NPortion::EProduced::SPLIT_COMPACTED);
using IndexPortions = NColumnShard::Schema::IndexPortions;
const auto removeSnapshot = GetRemoveSnapshotOptional();
db.Table<IndexPortions>()
Expand Down
19 changes: 0 additions & 19 deletions ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ TPortionMetaConstructor::TPortionMetaConstructor(const TPortionMeta& meta, const
if (withBlobs) {
BlobIds = meta.BlobIds;
}
if (meta.Produced != NPortion::EProduced::UNSPECIFIED) {
Produced = meta.Produced;
}
}

TPortionMeta TPortionMetaConstructor::Build() {
Expand All @@ -65,7 +62,6 @@ TPortionMeta TPortionMetaConstructor::Build() {
result.BlobIds.shrink_to_fit();
result.CompactionLevel = *TValidator::CheckNotNull(CompactionLevel);
result.DeletionsCount = *TValidator::CheckNotNull(DeletionsCount);
result.Produced = *TValidator::CheckNotNull(Produced);

result.RecordsCount = *TValidator::CheckNotNull(RecordsCount);
result.ColumnRawBytes = *TValidator::CheckNotNull(ColumnRawBytes);
Expand All @@ -80,7 +76,6 @@ TPortionMeta TPortionMetaConstructor::Build() {

bool TPortionMetaConstructor::LoadMetadata(
const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo, const IBlobGroupSelector& groupSelector) {
AFL_VERIFY(!Produced)("produced", Produced);
if (portionMeta.GetTierName()) {
TierName = portionMeta.GetTierName();
}
Expand All @@ -99,20 +94,6 @@ bool TPortionMetaConstructor::LoadMetadata(
ColumnBlobBytes = TValidator::CheckNotNull(portionMeta.GetColumnBlobBytes());
IndexRawBytes = portionMeta.GetIndexRawBytes();
IndexBlobBytes = portionMeta.GetIndexBlobBytes();
if (portionMeta.GetIsInserted()) {
Produced = TPortionMeta::EProduced::INSERTED;
} else if (portionMeta.GetIsCompacted()) {
Produced = TPortionMeta::EProduced::COMPACTED;
} else if (portionMeta.GetIsSplitCompacted()) {
Produced = TPortionMeta::EProduced::SPLIT_COMPACTED;
} else if (portionMeta.GetIsEvicted()) {
Produced = TPortionMeta::EProduced::EVICTED;
} else {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")(
"meta", portionMeta.DebugString());
return false;
}
AFL_VERIFY(Produced != TPortionMeta::EProduced::UNSPECIFIED);
if (portionMeta.HasPrimaryKeyBordersV1()) {
FirstAndLastPK = NArrow::TFirstLastSpecialKeys(
portionMeta.GetPrimaryKeyBordersV1().GetFirst(), portionMeta.GetPrimaryKeyBordersV1().GetLast(), indexInfo.GetReplaceKey());
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/tx/columnshard/engines/portions/constructor_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class TPortionMetaConstructor: public TPortionMetaBase {
std::optional<TString> TierName;
std::optional<TSnapshot> RecordSnapshotMin;
std::optional<TSnapshot> RecordSnapshotMax;
std::optional<NPortion::EProduced> Produced;
std::optional<ui64> CompactionLevel;

std::optional<ui32> RecordsCount;
Expand Down Expand Up @@ -70,10 +69,6 @@ class TPortionMetaConstructor: public TPortionMetaBase {
SetTierName(tierName);
}

void UpdateRecordsMeta(const NPortion::EProduced prod) {
Produced = prod;
}

TPortionMeta Build();

[[nodiscard]] bool LoadMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ std::shared_ptr<TPortionInfo> TWrittenPortionInfoConstructor::BuildPortionImpl(T
}
AFL_VERIFY(InsertWriteId);
result->InsertWriteId = *InsertWriteId;

AFL_VERIFY(result->GetMeta().GetProduced() == NPortion::EProduced::INSERTED);
return result;
}

Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/columnshard/engines/portions/meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace NKikimr::NOlap {

NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const {
NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto(const NPortion::EProduced produced) const {
FullValidation();
NKikimrTxColumnShard::TIndexPortionMeta portionMeta;
portionMeta.SetTierName(TierName);
Expand All @@ -21,22 +21,22 @@ NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const {
portionMeta.SetColumnBlobBytes(ColumnBlobBytes);
portionMeta.SetIndexRawBytes(IndexRawBytes);
portionMeta.SetIndexBlobBytes(IndexBlobBytes);
switch (Produced) {
case TPortionMeta::EProduced::UNSPECIFIED:
switch (produced) {
case NPortion::EProduced::UNSPECIFIED:
Y_ABORT_UNLESS(false);
case TPortionMeta::EProduced::INSERTED:
case NPortion::EProduced::INSERTED:
portionMeta.SetIsInserted(true);
break;
case TPortionMeta::EProduced::COMPACTED:
case NPortion::EProduced::COMPACTED:
portionMeta.SetIsCompacted(true);
break;
case TPortionMeta::EProduced::SPLIT_COMPACTED:
case NPortion::EProduced::SPLIT_COMPACTED:
portionMeta.SetIsSplitCompacted(true);
break;
case TPortionMeta::EProduced::EVICTED:
case NPortion::EProduced::EVICTED:
portionMeta.SetIsEvicted(true);
break;
case TPortionMeta::EProduced::INACTIVE:
case NPortion::EProduced::INACTIVE:
Y_ABORT("Unexpected inactive case");
//portionMeta->SetInactive(true);
break;
Expand All @@ -59,7 +59,7 @@ NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const {

TString TPortionMeta::DebugString() const {
TStringBuilder sb;
sb << "(produced=" << Produced << ";";
sb << "(";
if (TierName) {
sb << "tier_name=" << TierName << ";";
}
Expand Down
10 changes: 1 addition & 9 deletions ydb/core/tx/columnshard/engines/portions/meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ class TPortionMeta: public TPortionMetaBase {
CompactionLevel = level;
}

using EProduced = NPortion::EProduced;

EProduced Produced = EProduced::UNSPECIFIED;

std::optional<TString> GetTierNameOptional() const;

ui64 GetMetadataMemorySize() const {
Expand All @@ -133,11 +129,7 @@ class TPortionMeta: public TPortionMetaBase {
return sizeof(TPortionMeta) + FirstPKRow.GetDataSize() + LastPKRow.GetDataSize() + TBase::GetMetadataDataSize();
}

NKikimrTxColumnShard::TIndexPortionMeta SerializeToProto() const;

EProduced GetProduced() const {
return Produced;
}
NKikimrTxColumnShard::TIndexPortionMeta SerializeToProto(const NPortion::EProduced produced) const;

TString DebugString() const;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortion
*proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto();
}

*proto.MutableMeta() = Meta.SerializeToProto();
*proto.MutableMeta() = Meta.SerializeToProto(GetProduced());
}

TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ class TPortionInfo {
if (GetTierNameDef(NBlobOperations::TGlobal::DefaultStorageId) != NBlobOperations::TGlobal::DefaultStorageId) {
return NPortion::EVICTED;
}
return GetMeta().GetProduced();
return GetPortionType() == EPortionType::Compacted ? NPortion::EProduced::SPLIT_COMPACTED : NPortion::EProduced::INSERTED;
}

bool ValidSnapshotInfo() const {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/portions/written.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr::NOlap {

void TWrittenPortionInfo::DoSaveMetaToDatabase(NIceDb::TNiceDb& db) const {
auto metaProto = GetMeta().SerializeToProto();
auto metaProto = GetMeta().SerializeToProto(NPortion::EProduced::INSERTED);
using IndexPortions = NColumnShard::Schema::IndexPortions;
const auto removeSnapshot = GetRemoveSnapshotOptional();
AFL_VERIFY(InsertWriteId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
ui64 insertedPortionsBytes = 0;
ui64 committedPortionsBytes = 0;
for (auto&& i : portions) {
if (i->GetMeta().GetProduced() == NPortion::EProduced::COMPACTED || i->GetMeta().GetProduced() == NPortion::EProduced::SPLIT_COMPACTED) {
if (i->GetPortionType() == EPortionType::Compacted) {
compactedPortionsBytes += i->GetTotalBlobBytes();
} else {
insertedPortionsBytes += i->GetTotalBlobBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
for (auto&& i : portions) {
if (i->GetMeta().GetProduced() == NPortion::EProduced::COMPACTED || i->GetMeta().GetProduced() == NPortion::EProduced::SPLIT_COMPACTED) {
if (i->GetPortionType() == EPortionType::Compacted) {
compactedPortionsBytes += i->GetTotalBlobBytes();
} else {
insertedPortionsBytes += i->GetTotalBlobBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ void TStatsIterator::AppendStats(
const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionDataAccessor& portionPtr) const {
const TPortionInfo& portion = portionPtr.GetPortionInfo();
auto portionSchema = ReadMetadata->GetLoadSchemaVerified(portion);
auto it = PortionType.find(portion.GetMeta().Produced);
auto it = PortionType.find(portion.GetProduced());
if (it == PortionType.end()) {
it = PortionType.emplace(portion.GetMeta().Produced, ::ToString(portion.GetMeta().Produced)).first;
it = PortionType.emplace(portion.GetProduced(), ::ToString(portion.GetProduced())).first;
}
const arrow::util::string_view prodView = it->second.GetView();
const bool activity = !portion.HasRemoveSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace NKikimr::NOlap::NReader::NSysView::NPortions {

void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const {
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
const std::string prod = ::ToString(portion.GetMeta().Produced);
const std::string prod = ::ToString(portion.GetProduced());
NArrow::Append<arrow::StringType>(*builders[1], prod);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
NArrow::Append<arrow::UInt64Type>(*builders[3], portion.GetRecordsCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ TConclusion<TWritePortionInfoWithBlobsResult> ISnapshotSchema::PrepareForWrite(c
constructor.GetPortionConstructor().MutablePortionConstructor().AddMetadata(*this, deletionsCount, primaryKeys, std::nullopt);
constructor.GetPortionConstructor().MutablePortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
constructor.GetPortionConstructor().MutablePortionConstructor().MutableMeta().SetCompactionLevel(0);
constructor.GetPortionConstructor().MutablePortionConstructor().MutableMeta().UpdateRecordsMeta(NPortion::EProduced::INSERTED);
return TWritePortionInfoWithBlobsResult(std::move(constructor));
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/granule/granule.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ class TGranuleAdditiveSummary {
}

void AddPortion(const TPortionInfo& info) {
if (info.GetMeta().GetProduced() == NPortion::EProduced::INSERTED) {
if (info.GetPortionType() == EPortionType::Written) {
Owner.Inserted.AddPortion(info);
} else {
Owner.Compacted.AddPortion(info);
}
}
void RemovePortion(const TPortionInfo& info) {
if (info.GetMeta().GetProduced() == NPortion::EProduced::INSERTED) {
if (info.GetPortionType() == EPortionType::Written) {
Owner.Inserted.RemovePortion(info);
} else {
Owner.Compacted.RemovePortion(info);
Expand Down
Loading
Loading