@@ -78,7 +78,7 @@ public String getMostRecentMigrationID(MigrationContext context) {
78
78
}
79
79
80
80
@ Override
81
- public void createMigrationStatus (CopyContext context ) throws Exception {
81
+ public synchronized void createMigrationStatus (CopyContext context ) throws Exception {
82
82
String insert = "INSERT INTO " + TABLECOPYSTATUS + " (migrationId, total) VALUES (?, ?)" ;
83
83
try (Connection conn = getConnection (context ); PreparedStatement stmt = conn .prepareStatement (insert )) {
84
84
stmt .setObject (1 , context .getMigrationId ());
@@ -88,7 +88,7 @@ public void createMigrationStatus(CopyContext context) throws Exception {
88
88
}
89
89
90
90
@ Override
91
- public void resetMigration (CopyContext context ) throws Exception {
91
+ public synchronized void resetMigration (CopyContext context ) throws Exception {
92
92
String update = "UPDATE " + TABLECOPYSTATUS
93
93
+ " SET completed = total - failed, status = ?, failed=?, lastUpdate=? WHERE migrationId = ?" ;
94
94
try (Connection conn = getConnection (context ); PreparedStatement stmt = conn .prepareStatement (update )) {
@@ -106,7 +106,7 @@ public void setMigrationStatus(CopyContext context, MigrationProgress progress)
106
106
}
107
107
108
108
@ Override
109
- public boolean setMigrationStatus (CopyContext context , MigrationProgress from , MigrationProgress to )
109
+ public synchronized boolean setMigrationStatus (CopyContext context , MigrationProgress from , MigrationProgress to )
110
110
throws Exception {
111
111
final String update = "UPDATE " + TABLECOPYSTATUS + " SET status = ? WHERE status = ? AND migrationId = ?" ;
112
112
try (Connection conn = getConnection (context ); PreparedStatement stmt = conn .prepareStatement (update )) {
@@ -177,7 +177,7 @@ private LocalDateTime getDateTime(ResultSet rs, String column) throws Exception
177
177
}
178
178
179
179
@ Override
180
- public void scheduleTask (CopyContext context , CopyContext .DataCopyItem copyItem , long sourceRowCount ,
180
+ public synchronized void scheduleTask (CopyContext context , CopyContext .DataCopyItem copyItem , long sourceRowCount ,
181
181
int targetNode ) throws Exception {
182
182
String insert = "INSERT INTO " + TABLECOPYTASKS
183
183
+ " (targetnodeid, pipelinename, sourcetablename, targettablename, columnmap, migrationid, sourcerowcount, lastupdate) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" ;
@@ -195,7 +195,7 @@ public void scheduleTask(CopyContext context, CopyContext.DataCopyItem copyItem,
195
195
}
196
196
197
197
@ Override
198
- public void rescheduleTask (CopyContext context , String pipelineName , int targetNode ) throws Exception {
198
+ public synchronized void rescheduleTask (CopyContext context , String pipelineName , int targetNode ) throws Exception {
199
199
String sql = "UPDATE " + TABLECOPYTASKS
200
200
+ " SET failure='0', duration=NULL, error='', targetnodeid=?, lastupdate=? WHERE migrationId=? AND pipelinename=? " ;
201
201
try (Connection connection = getConnection (context );
@@ -209,8 +209,8 @@ public void rescheduleTask(CopyContext context, String pipelineName, int targetN
209
209
}
210
210
211
211
@ Override
212
- public void scheduleBatch (CopyContext context , CopyContext .DataCopyItem copyItem , int batchId , Object lowerBoundary ,
213
- Object upperBoundary ) throws Exception {
212
+ public synchronized void scheduleBatch (CopyContext context , CopyContext .DataCopyItem copyItem , int batchId ,
213
+ Object lowerBoundary , Object upperBoundary ) throws Exception {
214
214
LOG .debug ("Schedule Batch for {} with ID {}" , copyItem .getPipelineName (), batchId );
215
215
String insert = "INSERT INTO " + TABLECOPYBATCHES
216
216
+ " (migrationId, batchId, pipelinename, lowerBoundary, upperBoundary) VALUES (?, ?, ?, ?, ?)" ;
@@ -225,7 +225,7 @@ public void scheduleBatch(CopyContext context, CopyContext.DataCopyItem copyItem
225
225
}
226
226
227
227
@ Override
228
- public void markBatchCompleted (CopyContext context , CopyContext .DataCopyItem copyItem , int batchId )
228
+ public synchronized void markBatchCompleted (CopyContext context , CopyContext .DataCopyItem copyItem , int batchId )
229
229
throws Exception {
230
230
LOG .debug ("Mark batch completed for {} with ID {}" , copyItem .getPipelineName (), batchId );
231
231
String insert = "DELETE FROM " + TABLECOPYBATCHES + " WHERE migrationId=? AND batchId=? AND pipelinename=?" ;
@@ -241,7 +241,8 @@ public void markBatchCompleted(CopyContext context, CopyContext.DataCopyItem cop
241
241
}
242
242
243
243
@ Override
244
- public void resetPipelineBatches (CopyContext context , CopyContext .DataCopyItem copyItem ) throws Exception {
244
+ public synchronized void resetPipelineBatches (CopyContext context , CopyContext .DataCopyItem copyItem )
245
+ throws Exception {
245
246
String insert = "DELETE FROM " + TABLECOPYBATCHES + " WHERE migrationId=? AND pipelinename=?" ;
246
247
try (Connection conn = getConnection (context ); PreparedStatement stmt = conn .prepareStatement (insert )) {
247
248
stmt .setObject (1 , context .getMigrationId ());
@@ -335,7 +336,7 @@ public Set<DatabaseCopyTask> findFailedTasks(CopyContext context) throws Excepti
335
336
}
336
337
337
338
@ Override
338
- public void updateTaskProgress (CopyContext context , CopyContext .DataCopyItem copyItem , long itemCount )
339
+ public synchronized void updateTaskProgress (CopyContext context , CopyContext .DataCopyItem copyItem , long itemCount )
339
340
throws Exception {
340
341
String sql = "UPDATE " + TABLECOPYTASKS
341
342
+ " SET targetrowcount=?, lastupdate=?, avgwriterrowthroughput=?, avgreaderrowthroughput=? WHERE targetnodeid=? AND migrationid=? AND pipelinename=?" ;
@@ -362,7 +363,7 @@ public void markTaskCompleted(final CopyContext context, final CopyContext.DataC
362
363
}
363
364
364
365
@ Override
365
- public void markTaskCompleted (final CopyContext context , final CopyContext .DataCopyItem copyItem ,
366
+ public synchronized void markTaskCompleted (final CopyContext context , final CopyContext .DataCopyItem copyItem ,
366
367
final String duration , final float durationseconds ) throws Exception {
367
368
Objects .requireNonNull (duration , "duration must not be null" );
368
369
// spotless:off
@@ -384,7 +385,7 @@ public void markTaskCompleted(final CopyContext context, final CopyContext.DataC
384
385
}
385
386
386
387
@ Override
387
- public void markTaskFailed (CopyContext context , CopyContext .DataCopyItem copyItem , Exception error )
388
+ public synchronized void markTaskFailed (CopyContext context , CopyContext .DataCopyItem copyItem , Exception error )
388
389
throws Exception {
389
390
// spotless:off
390
391
String sql = "UPDATE " + TABLECOPYTASKS + " SET failure='1', duration='-1', error=?, lastupdate=? WHERE targetnodeid=? AND migrationId=? AND pipelinename=? AND failure = '0'" ;
@@ -406,7 +407,8 @@ public void markTaskFailed(CopyContext context, CopyContext.DataCopyItem copyIte
406
407
}
407
408
408
409
@ Override
409
- public void markTaskTruncated (CopyContext context , CopyContext .DataCopyItem copyItem ) throws Exception {
410
+ public synchronized void markTaskTruncated (CopyContext context , CopyContext .DataCopyItem copyItem )
411
+ throws Exception {
410
412
String sql = "UPDATE " + TABLECOPYTASKS
411
413
+ " SET truncated = '1' WHERE targetnodeid=? AND migrationId=? AND pipelinename=? " ;
412
414
try (Connection connection = getConnection (context );
@@ -419,8 +421,8 @@ public void markTaskTruncated(CopyContext context, CopyContext.DataCopyItem copy
419
421
}
420
422
421
423
@ Override
422
- public void updateTaskCopyMethod (CopyContext context , CopyContext .DataCopyItem copyItem , String copyMethod )
423
- throws Exception {
424
+ public synchronized void updateTaskCopyMethod (CopyContext context , CopyContext .DataCopyItem copyItem ,
425
+ String copyMethod ) throws Exception {
424
426
String sql = "UPDATE " + TABLECOPYTASKS
425
427
+ " SET copymethod=? WHERE targetnodeid=? AND migrationId=? AND pipelinename=? " ;
426
428
try (Connection connection = getConnection (context );
@@ -434,7 +436,7 @@ public void updateTaskCopyMethod(CopyContext context, CopyContext.DataCopyItem c
434
436
}
435
437
436
438
@ Override
437
- public void updateTaskKeyColumns (CopyContext context , CopyContext .DataCopyItem copyItem ,
439
+ public synchronized void updateTaskKeyColumns (CopyContext context , CopyContext .DataCopyItem copyItem ,
438
440
Collection <String > keyColumns ) throws Exception {
439
441
String sql = "UPDATE " + TABLECOPYTASKS
440
442
+ " SET keycolumns=? WHERE targetnodeid=? AND migrationId=? AND pipelinename=? " ;
0 commit comments