Skip to content

[draft] Incremental restore changes #20482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -655,4 +655,5 @@ enum ETxTypes {
TXTYPE_LOGIN_FINALIZE = 100 [(TxTypeOpts) = {Name: "TxLoginFinalize"}];

TXTYPE_PROGRESS_INCREMENTAL_RESTORE = 101 [(TxTypeOpts) = {Name: "TxProgressIncrementalRestore"}];
TXTYPE_INCREMENTAL_RESTORE_RESPONSE = 102 [(TxTypeOpts) = {Name: "TxIncrementalRestoreResponse"}];
}
35 changes: 35 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2385,3 +2385,38 @@ message TEvForceDataCleanupResult {
optional uint64 TabletId = 2;
optional EStatus Status = 3;
}

// Request asking a DataShard to restore a table from a set of incremental backups.
message TEvRestoreMultipleIncrementalBackups {
// Id of the restore transaction; the DataShard handler echoes it in the response.
optional uint64 TxId = 1;
optional NKikimrProto.TPathID PathId = 2; // Table being restored

// Description of a single incremental backup.
// NOTE(review): BackupStep/BackupTxId presumably identify the version at which
// the backup was taken, and BackupTrimmedName a shortened backup name - confirm
// with the sender, nothing in this schema pins the semantics down.
message TIncrementalBackup {
optional string BackupPath = 1;
optional uint64 BackupStep = 2;
optional uint64 BackupTxId = 3;
optional string BackupTrimmedName = 4;
}

// Incremental backups covered by this request.
// NOTE(review): the schema does not say whether the order of entries is
// significant (e.g. order of application) - confirm and document.
repeated TIncrementalBackup IncrementalBackups = 3;
}

// Reply to TEvRestoreMultipleIncrementalBackups.
message TEvRestoreMultipleIncrementalBackupsResponse {
// Transaction id copied from the request.
optional uint64 TxId = 1;
// Path id of the table the response refers to.
optional NKikimrProto.TPathID PathId = 2;
// Id of the tablet (DataShard) that produced this response.
optional uint64 TabletId = 3;

// Outcome of the request. UNKNOWN is the zero value, i.e. what a reader sees
// when Status was never set.
enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
SCHEME_ERROR = 2;
BAD_REQUEST = 3;
OVERLOADED = 4;
OPERATION_NOT_FOUND = 5;
ERROR = 6;
}

optional EStatus Status = 4;
// Human-readable details for a failed request.
repeated Ydb.Issue.IssueMessage Issues = 5;
// NOTE(review): presumably progress counters for the restore; the DataShard
// handler added in this change does not fill them - confirm who populates them.
optional uint64 ProcessedRows = 6;
optional uint64 ProcessedBytes = 7;
}
15 changes: 15 additions & 0 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ namespace TEvDataShard {
EvRecomputeKMeansRequest,
EvRecomputeKMeansResponse,

EvRestoreMultipleIncrementalBackups,
EvRestoreMultipleIncrementalBackupsResponse,

EvEnd
};

Expand Down Expand Up @@ -1548,6 +1551,18 @@ namespace TEvDataShard {
TEvDataShard::EvPrefixKMeansResponse> {
};

// Actor event carrying the NKikimrTxDataShard::TEvRestoreMultipleIncrementalBackups
// protobuf record, registered under the EvRestoreMultipleIncrementalBackups event id.
struct TEvRestoreMultipleIncrementalBackups
: public TEventPB<TEvRestoreMultipleIncrementalBackups,
NKikimrTxDataShard::TEvRestoreMultipleIncrementalBackups,
TEvDataShard::EvRestoreMultipleIncrementalBackups> {
};

// Actor event carrying the NKikimrTxDataShard::TEvRestoreMultipleIncrementalBackupsResponse
// protobuf record, registered under the EvRestoreMultipleIncrementalBackupsResponse event id.
struct TEvRestoreMultipleIncrementalBackupsResponse
: public TEventPB<TEvRestoreMultipleIncrementalBackupsResponse,
NKikimrTxDataShard::TEvRestoreMultipleIncrementalBackupsResponse,
TEvDataShard::EvRestoreMultipleIncrementalBackupsResponse> {
};

struct TEvKqpScan
: public TEventPB<TEvKqpScan,
NKikimrTxDataShard::TEvKqpScan,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#include "datashard_impl.h"
#include "datashard_active_transaction.h"
#include "incr_restore_scan.h"
#include "change_sender_incr_restore.h"

#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>

namespace NKikimr {
namespace NDataShard {

// Handles a request to start incremental restore scans on this DataShard.
//
// One incremental restore scan is created per source path id in the request;
// each scan is given a factory that registers a change sender
// (CreateIncrRestoreChangeSender) for the (source, destination) pair.
// Exactly one TEvRestoreMultipleIncrementalBackupsResponse is sent to
// ev->Sender: SUCCESS after every scan has been queued, or an error status as
// soon as validation or scan creation fails.
//
// Scans are created first and queued only once all of them exist, so a failure
// on a later table cannot leave earlier scans running while the sender is told
// that the request failed.
//
// NOTE(review): the record accessors used here (SrcTablePaths, SrcPathIds,
// DstTablePath, DstPathId, SetTabletID, SetErrorDescription) and the
// WRONG_SHARD_STATE / INTERNAL_ERROR statuses are not declared by the
// TEvRestoreMultipleIncrementalBackups* messages in tx_datashard.proto as shown
// in this change, and the same TDataShard::Handle overload is also defined in
// datashard_incremental_restore.cpp. Confirm which schema/handler is intended
// before building this file.
void TDataShard::Handle(TEvDataShard::TEvRestoreMultipleIncrementalBackups::TPtr& ev, const TActorContext& ctx) {
    using TEvResponse = TEvDataShard::TEvRestoreMultipleIncrementalBackupsResponse;
    using TResponseRecord = NKikimrTxDataShard::TEvRestoreMultipleIncrementalBackupsResponse;

    const auto& record = ev->Get()->Record;

    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
        "DataShard " << TabletID() << " received TEvRestoreMultipleIncrementalBackups"
        << " TxId: " << record.GetTxId()
        << " SrcPaths: " << record.SrcTablePathsSize()
        << " DstPath: " << (record.HasDstTablePath() ? record.GetDstTablePath() : "none"));

    auto response = MakeHolder<TEvResponse>();
    response->Record.SetTabletID(TabletID());
    response->Record.SetTxId(record.GetTxId());

    // Fills in the failure, logs it and sends the response. The response is
    // released here, so callers must return right after invoking this.
    auto errorResponse = [&](TResponseRecord::EStatus status, const TString& error) {
        response->Record.SetStatus(status);
        response->Record.SetErrorDescription(error);

        LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
            "DataShard " << TabletID() << " restore error: " << error
            << " TxId: " << record.GetTxId());

        ctx.Send(ev->Sender, response.Release());
    };

    // Validate the tablet state
    if (!IsStateActive()) {
        errorResponse(
            TResponseRecord::WRONG_SHARD_STATE,
            TStringBuilder() << "DataShard is not active, state: " << State);
        return;
    }

    // Validate that we have source and destination paths
    if (record.SrcTablePathsSize() == 0) {
        errorResponse(
            TResponseRecord::BAD_REQUEST,
            "No source table paths specified");
        return;
    }

    if (!record.HasDstTablePath() && !record.HasDstPathId()) {
        errorResponse(
            TResponseRecord::BAD_REQUEST,
            "No destination table path or path ID specified");
        return;
    }

    // Path ids (not just path strings) are required: tables are looked up
    // locally by LocalPathId below.
    if (record.SrcPathIdsSize() == 0) {
        errorResponse(
            TResponseRecord::BAD_REQUEST,
            "Source path IDs are required for DataShard-to-DataShard streaming");
        return;
    }

    TVector<TPathId> srcPathIds;
    srcPathIds.reserve(record.SrcPathIdsSize());
    for (const auto& protoPathId : record.GetSrcPathIds()) {
        srcPathIds.push_back(TPathId::FromProto(protoPathId));
    }

    if (!record.HasDstPathId()) {
        errorResponse(
            TResponseRecord::BAD_REQUEST,
            "Destination path ID is required for DataShard-to-DataShard streaming");
        return;
    }
    const TPathId dstPathId = TPathId::FromProto(record.GetDstPathId());

    // Validate that all source tables exist on this DataShard
    for (const auto& srcPathId : srcPathIds) {
        if (!GetUserTables().contains(srcPathId.LocalPathId)) {
            errorResponse(
                TResponseRecord::SCHEME_ERROR,
                TStringBuilder() << "Source table not found on this DataShard: " << srcPathId);
            return;
        }
    }

    // For DataShard-to-DataShard streaming, we start incremental restore scans
    // that will use the change exchange infrastructure to stream changes
    try {
        // A created scan together with the local table it has to run on.
        struct TPendingScan {
            ui32 LocalTid;
            THolder<NTable::IScan> Scan;
        };

        // Phase 1: create every scan; nothing is queued yet.
        TVector<TPendingScan> scans;
        scans.reserve(srcPathIds.size());

        for (const auto& srcPathId : srcPathIds) {
            // Presence was verified above, so at() cannot miss here.
            const auto& userTable = GetUserTables().at(srcPathId.LocalPathId);

            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "DataShard " << TabletID() << " starting incremental restore scan"
                << " from table: " << srcPathId << " to: " << dstPathId
                << " TxId: " << record.GetTxId());

            // Create incremental restore scan that will stream to target DataShard.
            // Everything the factory needs is captured by value explicitly, so the
            // closure does not hold `this` or any reference into this handler's frame.
            auto scan = CreateIncrementalRestoreScan(
                SelfId(),
                [srcPathId, dstPathId, tabletID = TabletID(), generation = Generation(), tabletActor = SelfId()]
                (const TActorContext& ctx, TActorId parent) {
                    // Create change sender for DataShard-to-DataShard streaming
                    return ctx.Register(
                        CreateIncrRestoreChangeSender(
                            parent,
                            NDataShard::TDataShardId{
                                .TabletId = tabletID,
                                .Generation = generation,
                                .ActorId = tabletActor,
                            },
                            srcPathId,
                            dstPathId));
                },
                srcPathId,
                userTable,
                dstPathId,
                record.GetTxId(),
                {} // Use default limits for now
            );

            if (!scan) {
                errorResponse(
                    TResponseRecord::INTERNAL_ERROR,
                    TStringBuilder() << "Failed to create incremental restore scan for table: " << srcPathId);
                return;
            }

            scans.push_back(TPendingScan{userTable->LocalTid, std::move(scan)});
        }

        // Phase 2: all scans exist, hand them over for execution.
        for (auto& pending : scans) {
            QueueScan(pending.LocalTid, std::move(pending.Scan), record.GetTxId(), TRowVersion::Min());
        }

        // Success response
        response->Record.SetStatus(TResponseRecord::SUCCESS);

        LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
            "DataShard " << TabletID() << " successfully started " << srcPathIds.size()
            << " incremental restore scans for TxId: " << record.GetTxId());

    } catch (const std::exception& ex) {
        errorResponse(
            TResponseRecord::INTERNAL_ERROR,
            TStringBuilder() << "Exception while starting incremental restore scans: " << ex.what());
        return;
    }

    ctx.Send(ev->Sender, response.Release());
}

} // namespace NDataShard
} // namespace NKikimr
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,8 @@ class TDataShard

void Handle(TEvDataShard::TEvForceDataCleanup::TPtr& ev, const TActorContext& ctx);

void Handle(TEvDataShard::TEvRestoreMultipleIncrementalBackups::TPtr& ev, const TActorContext& ctx);

void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);

void DoPeriodicTasks(const TActorContext &ctx);
Expand Down Expand Up @@ -3256,6 +3258,7 @@ class TDataShard
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
HFunc(TEvIncrementalRestoreScan::TEvFinished, Handle);
HFunc(TEvDataShard::TEvForceDataCleanup, Handle);
HFunc(TEvDataShard::TEvRestoreMultipleIncrementalBackups, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString());
Expand Down
96 changes: 96 additions & 0 deletions ydb/core/tx/datashard/datashard_incremental_restore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#include "defs.h"
#include "datashard_impl.h"
#include "incr_restore_scan.h"
#include "change_exchange_impl.h"

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tablet_flat/flat_scan_spent.h>

namespace NKikimr {
namespace NDataShard {

// Handles TEvRestoreMultipleIncrementalBackups: starts an incremental restore
// scan over the local table identified by the request's PathId and replies to
// ev->Sender with a TEvRestoreMultipleIncrementalBackupsResponse.
//
// The response always carries TxId and TabletId, and echoes the request's
// PathId when one was supplied. Status is:
//   SUCCESS      - the scan was created and queued;
//   BAD_REQUEST  - the request has no PathId;
//   ERROR        - the table is unknown to this DataShard, the scan could not
//                  be created, or an exception was thrown while starting it.
// On failure the reason is logged and attached to the response's Issues.
//
// NOTE(review): the IncrementalBackups list of the request is only counted in
// the log line; it is not passed to the scan. Confirm this is intended.
void TDataShard::Handle(TEvDataShard::TEvRestoreMultipleIncrementalBackups::TPtr& ev, const TActorContext& ctx) {
    using TResponseRecord = NKikimrTxDataShard::TEvRestoreMultipleIncrementalBackupsResponse;

    const auto& record = ev->Get()->Record;
    const ui64 txId = record.GetTxId();
    const TPathId pathId = TPathId::FromProto(record.GetPathId());

    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
        "DataShard " << TabletID() << " received incremental restore request"
        << " txId: " << txId
        << " pathId: " << pathId
        << " backups count: " << record.IncrementalBackupsSize());

    auto response = MakeHolder<TEvDataShard::TEvRestoreMultipleIncrementalBackupsResponse>();
    response->Record.SetTxId(txId);
    response->Record.SetTabletId(TabletID());
    if (record.HasPathId()) {
        // Echo the path id so the receiver can match the reply to a table
        // without keeping its own txId -> pathId mapping.
        response->Record.MutablePathId()->CopyFrom(record.GetPathId());
    }
    response->Record.SetStatus(TResponseRecord::SUCCESS);

    // Logs the failure and records it in the response (status + issue).
    // The response itself is sent once, at the end of the handler.
    const auto fail = [&](TResponseRecord::EStatus status, const TString& reason) {
        LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
            "DataShard " << TabletID() << " failed to start incremental restore: " << reason
            << " txId: " << txId
            << " pathId: " << pathId);

        response->Record.SetStatus(status);

        // Add error to Issues field
        auto* issue = response->Record.AddIssues();
        issue->set_message(reason);
        issue->set_severity(NYql::TSeverityIds::S_ERROR);
    };

    if (!record.HasPathId()) {
        // Without a path id there is no table to restore; report it as a
        // malformed request rather than as a missing table.
        fail(TResponseRecord::BAD_REQUEST, "PathId is not specified");
    } else {
        try {
            // Find the table by path ID (single lookup, reused below)
            const ui64 tableId = pathId.LocalPathId;
            const auto& userTables = GetUserTables();
            const auto tableIt = userTables.find(tableId);
            if (tableIt == userTables.end()) {
                throw yexception() << "Table not found: " << tableId;
            }

            const auto& userTable = tableIt->second;
            const ui32 localTableId = userTable->LocalTid;

            // Create incremental restore scan using existing infrastructure
            // We use the same infrastructure as CreateIncrementalRestoreSrcUnit.
            // The factory captures everything it needs by value explicitly, so the
            // closure holds neither `this` nor references into this handler's frame.
            auto scan = CreateIncrementalRestoreScan(
                SelfId(),
                [pathId, tabletID = TabletID(), generation = Generation(), tabletActor = SelfId()]
                (const TActorContext& ctx, TActorId parent) {
                    // Create change sender for DataShard-to-DataShard streaming
                    // This will stream changes to the target DataShard
                    return ctx.Register(
                        CreateIncrRestoreChangeSender(
                            parent,
                            NDataShard::TDataShardId{
                                .TabletId = tabletID,
                                .Generation = generation,
                                .ActorId = tabletActor,
                            },
                            pathId,
                            pathId // For DataShard-to-DataShard streaming, source and target path are same table
                        )
                    );
                },
                pathId,
                userTable,
                pathId, // Target path ID (same as source for DataShard streaming)
                txId,
                {} // Use default limits
            );

            if (!scan) {
                throw yexception() << "Failed to create incremental restore scan";
            }

            // Queue the scan for execution
            QueueScan(localTableId, std::move(scan), txId);

            LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
                "DataShard " << TabletID() << " successfully started incremental restore scan"
                << " txId: " << txId
                << " pathId: " << pathId);

        } catch (const std::exception& e) {
            fail(TResponseRecord::ERROR, e.what());
        }
    }

    // Send response back to SchemeShard
    ctx.Send(ev->Sender, response.Release());
}

} // namespace NDataShard
} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ SRCS(
datashard_failpoints.cpp
datashard_failpoints.h
datashard_impl.h
datashard_incremental_restore.cpp
datashard_kqp.cpp
datashard_kqp.h
datashard_kqp_compute.cpp
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5306,11 +5306,15 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}

// Check for orphaned incremental restore operations during restart
// Check for incremental restore operations that lost their control operations during restart
// This can happen if the schemeshard restarts while incremental restore operations are in flight
for (const auto& [opId, op] : Self->LongIncrementalRestoreOps) {
// Check if the corresponding control operation still exists in TxInFlight
// TxInFlight is keyed by TOperationId, so we need to check for operations with the same txId
TTxId txId = opId.GetTxId();
bool controlOperationExists = false;

// Look for any operation in TxInFlight with the same txId
for (const auto& [txOpId, txState] : Self->TxInFlight) {
if (txOpId.GetTxId() == txId) {
controlOperationExists = true;
Expand All @@ -5319,6 +5323,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}

if (!controlOperationExists) {
// Control operation is no longer active, but the long operation was already started
// and hasn't finished yet since it's still present in the local restore database.
// We need to run TTxProgress to continue the restore operation.

TPathId backupCollectionPathId;
backupCollectionPathId.OwnerId = op.GetBackupCollectionPathId().GetOwnerId();
backupCollectionPathId.LocalPathId = op.GetBackupCollectionPathId().GetLocalId();
Expand All @@ -5331,6 +5339,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", scheduling TTxProgress to continue operation"
<< ", at schemeshard: " << Self->TabletID());

// Send the event to self to trigger TTxProgress execution
// This will be handled by the Handle method in schemeshard_incremental_restore_scan.cpp
OnComplete.Send(Self->SelfId(), new TEvPrivate::TEvRunIncrementalRestore(backupCollectionPathId));
}
}
Expand Down
Loading
Loading