Skip to content

Commit ac16318

Browse files
EmmyMiao87morningman
authored andcommitted
[Bug-fix][Broker-load] Fix the bug of the label already exists when the txn has been finished (#1992)
If FE is restarted between txn committed and visible, the load job will be rescheduled and failed with label already exists. The reason is that there are inconsistency between transaction of load job and meta of load job. So, the replay of the txn attachment need to be done in function replayOnCommitted. The load job state and progress is correct after that.
1 parent d2bc47d commit ac16318

File tree

5 files changed

+20
-39
lines changed

5 files changed

+20
-39
lines changed

fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ private String increaseCounter(String key, String deltaValue) {
484484
}
485485

486486
@Override
487-
protected void executeReplayOnAborted(TransactionState txnState) {
487+
protected void replayTxnAttachment(TransactionState txnState) {
488488
if (txnState.getTxnCommitAttachment() == null) {
489489
// The txn attachment maybe null when broker load has been cancelled without attachment.
490490
// The end log of broker load has been record but the callback id of txnState hasn't been removed
@@ -494,11 +494,6 @@ protected void executeReplayOnAborted(TransactionState txnState) {
494494
unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
495495
}
496496

497-
@Override
498-
protected void executeReplayOnVisible(TransactionState txnState) {
499-
unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
500-
}
501-
502497
@Override
503498
public void write(DataOutput out) throws IOException {
504499
super.write(out);

fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,11 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
115115
protected int progress;
116116

117117
// non-persistence
118+
// This param is set true during txn is committing.
119+
// During committing, the load job could not be cancelled.
118120
protected boolean isCommitting = false;
121+
// This param is set true in mini load.
122+
// The streaming mini load could not be cancelled by frontend.
119123
protected boolean isCancellable = true;
120124

121125
// only for persistence param
@@ -743,7 +747,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
743747
public void replayOnCommitted(TransactionState txnState) {
744748
writeLock();
745749
try {
746-
isCommitting = true;
750+
replayTxnAttachment(txnState);
747751
} finally {
748752
writeUnlock();
749753
}
@@ -770,17 +774,14 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
770774
return;
771775
}
772776
// record attachment in load job
773-
executeAfterAborted(txnState);
777+
replayTxnAttachment(txnState);
774778
// cancel load job
775779
unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false);
776780
} finally {
777781
writeUnlock();
778782
}
779783
}
780784

781-
protected void executeAfterAborted(TransactionState txnState) {
782-
}
783-
784785
/**
785786
* This method is used to replay the cancelled state of load job
786787
*
@@ -790,7 +791,7 @@ protected void executeAfterAborted(TransactionState txnState) {
790791
public void replayOnAborted(TransactionState txnState) {
791792
writeLock();
792793
try {
793-
executeReplayOnAborted(txnState);
794+
replayTxnAttachment(txnState);
794795
failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnState.getReason());
795796
finishTimestamp = txnState.getFinishTime();
796797
state = JobState.CANCELLED;
@@ -799,9 +800,6 @@ public void replayOnAborted(TransactionState txnState) {
799800
}
800801
}
801802

802-
protected void executeReplayOnAborted(TransactionState txnState) {
803-
}
804-
805803
/**
806804
* This method will finish the load job without edit log.
807805
* The job will be finished by replayOnVisible when txn journal replay
@@ -814,18 +812,15 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
814812
if (!txnOperated) {
815813
return;
816814
}
817-
executeAfterVisible(txnState);
815+
replayTxnAttachment(txnState);
818816
updateState(JobState.FINISHED);
819817
}
820818

821-
protected void executeAfterVisible(TransactionState txnState) {
822-
}
823-
824819
@Override
825820
public void replayOnVisible(TransactionState txnState) {
826821
writeLock();
827822
try {
828-
executeReplayOnVisible(txnState);
823+
replayTxnAttachment(txnState);
829824
progress = 100;
830825
finishTimestamp = txnState.getFinishTime();
831826
state = JobState.FINISHED;
@@ -834,7 +829,7 @@ public void replayOnVisible(TransactionState txnState) {
834829
}
835830
}
836831

837-
protected void executeReplayOnVisible(TransactionState txnState) {
832+
protected void replayTxnAttachment(TransactionState txnState) {
838833
}
839834

840835
@Override

fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,12 @@ public void readFields(DataInput in) throws IOException {
602602
map.put(loadJob.getLabel(), jobs);
603603
}
604604
jobs.add(loadJob);
605+
// The callback of load job which is replayed by image need to be registered in callback factory.
606+
// The commit and visible txn will callback the unfinished load job.
607+
// Otherwise, the load job always does not be completed while the txn is visible.
608+
if (!loadJob.isCompleted()) {
609+
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
610+
}
605611
}
606612
}
607613
}

fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
9494
}
9595

9696
@Override
97-
protected void executeAfterAborted(TransactionState txnState) {
98-
updateLoadingStatue(txnState);
99-
}
100-
101-
@Override
102-
protected void executeAfterVisible(TransactionState txnState) {
103-
updateLoadingStatue(txnState);
104-
}
105-
106-
@Override
107-
protected void executeReplayOnAborted(TransactionState txnState) {
108-
updateLoadingStatue(txnState);
109-
}
110-
111-
@Override
112-
protected void executeReplayOnVisible(TransactionState txnState) {
97+
protected void replayTxnAttachment(TransactionState txnState) {
11398
updateLoadingStatue(txnState);
11499
}
115100

fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ public void testExecuteReplayOnAborted(@Injectable TransactionState txnState,
398398
result = JobState.CANCELLED;
399399
}
400400
};
401-
brokerLoadJob.executeReplayOnAborted(txnState);
401+
brokerLoadJob.replayTxnAttachment(txnState);
402402
Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress"));
403403
Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp());
404404
Assert.assertEquals(JobState.CANCELLED, brokerLoadJob.getState());
@@ -424,7 +424,7 @@ public void testExecuteReplayOnVisible(@Injectable TransactionState txnState,
424424
result = JobState.LOADING;
425425
}
426426
};
427-
brokerLoadJob.executeReplayOnAborted(txnState);
427+
brokerLoadJob.replayTxnAttachment(txnState);
428428
Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress"));
429429
Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp());
430430
Assert.assertEquals(JobState.LOADING, brokerLoadJob.getState());

0 commit comments

Comments
 (0)