Skip to content

Commit 143dd07

Browse files
authored
transfer missing to forsaken (#3747)
* vine: do not count as tasks completed missing transfers * vine: transient error to usecs to make it easier to use * vine: adds failure time to worker * transfer missing into forsaken * record time task failure on status * throttle worker that could not get file (all cases) * keep track of number of forsaken attempts * correctly initialize max forsaken * temp fix: disconnect workers that forsake tasks with no completions * fix typo
1 parent 0f9390d commit 143dd07

File tree

12 files changed

+89
-49
lines changed

12 files changed

+89
-49
lines changed

taskvine/src/bindings/python3/ndcctools/taskvine/task.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,13 +420,21 @@ def add_environment(self, f):
420420
return cvine.vine_task_add_environment(self._task, f._file)
421421

422422
##
423-
# Indicate the number of times the task should be retried. If 0 (the
423+
# Indicate the number of times the task should be retried. If less than 1 (the
424424
# default), the task is tried indefinitely. A task that did not succeed
425425
# after the given number of retries is returned with result
426-
# "result_max_retries".
426+
# "max retries".
427427
def set_retries(self, max_retries):
428428
return cvine.vine_task_set_retries(self._task, max_retries)
429429

430+
##
431+
# Indicate the number of times the task can be returned to the manager
432+
# without being executed. If less than 0 (the default), the task is tried indefinitely.
433+
# A task that did not succeed after the given number of retries is returned
434+
# with result "forsaken".
435+
def set_max_forsaken(self, max_forsaken):
436+
return cvine.vine_task_set_max_forsaken(self._task, max_forsaken)
437+
430438
##
431439
# Indicate the number of cores required by this task.
432440
def set_cores(self, cores):

taskvine/src/manager/taskvine.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ typedef enum {
9797
VINE_RESULT_OUTPUT_TRANSFER_ERROR = 9 << 3, /**< The task failed because an output could be transfered to the manager (not enough disk space, incorrect write permissions. */
9898
VINE_RESULT_FIXED_LOCATION_MISSING = 10 << 3, /**< The task failed because no worker could satisfy the fixed location input file requirements. */
9999
VINE_RESULT_CANCELLED = 11<<3, /**< The task was cancelled by the caller. */
100-
VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/
101-
VINE_RESULT_TRANSFER_MISSING = 13 << 3, /**< Task failed because a worker could not fetch a file. **/
100+
VINE_RESULT_LIBRARY_EXIT = 12 << 3 /**< Task is a library that has terminated. **/
102101
} vine_result_t;
103102

104103
/** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */
@@ -286,13 +285,20 @@ int vine_task_add_input( struct vine_task *t, struct vine_file *f, const char *r
286285

287286
int vine_task_add_output( struct vine_task *t, struct vine_file *f, const char *remote_name, vine_mount_flags_t flags );
288287

289-
/** Specify the number of times this task is retried on worker errors. If less than one, the task is retried indefinitely (this the default). A task that did not succeed after the given number of retries is returned with result VINE_RESULT_MAX_RETRIES.
288+
/** Specify the number of times this task is retried on worker errors. If less than one, the task is retried indefinitely (this the default). A task that did not succeed after the given number of retries is returned with the result of its last attempt.
290289
@param t A task object.
291290
@param max_retries The number of retries.
292291
*/
293292

294293
void vine_task_set_retries( struct vine_task *t, int64_t max_retries );
295294

295+
/** Specify the total number of times this task can be return to the manager without being executed. If less than zero, the task is tried indefinitely (this the default). A task that did not succeed after the given number is returned with the result VINE_RESULT_FORSAKEN.
296+
@param t A task object.
297+
@param max_retries The number of retries.
298+
*/
299+
300+
void vine_task_set_max_forsaken( struct vine_task *t, int64_t max_forsaken );
301+
296302
/** Specify the amount of disk space required by a task.
297303
@param t A task object.
298304
@param memory The amount of disk space required by the task, in megabytes.

taskvine/src/manager/vine_file_replica_table.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ struct vine_worker_info *vine_file_replica_table_find_worker(struct vine_manager
9090
continue;
9191

9292
timestamp_t current_time = timestamp_get();
93-
if (((current_time - peer->last_transfer_failure) / 1000000) <
94-
(long unsigned int)q->transient_error_interval) {
93+
if (current_time - peer->last_transfer_failure < q->transient_error_interval) {
9594
debug(D_VINE, "Skipping worker source after recent failure : %s", peer->transfer_addr);
9695
continue;
9796
}

taskvine/src/manager/vine_manager.c

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,11 @@ See the file COPYING for details.
9090
/* Default value for keepalive timeout in seconds. */
9191
#define VINE_DEFAULT_KEEPALIVE_TIMEOUT 900
9292

93-
/* Default value to before entity is considered again after last failure */
94-
#define VINE_DEFAULT_TRANSIENT_ERROR_INTERVAL 15
93+
/* Default value before entity is considered again after last failure, in usecs */
94+
#define VINE_DEFAULT_TRANSIENT_ERROR_INTERVAL (15 * ONE_SECOND)
95+
96+
/* Default value before disconnecting a worker that keeps forsaking tasks without any completions */
97+
#define VINE_DEFAULT_MAX_FORSAKEN_PER_WORKER 10
9598

9699
/* Default value for maximum size of standard output from task. (If larger, send to a separate file.) */
97100
#define MAX_TASK_STDOUT_STORAGE (1 * GIGABYTE)
@@ -433,8 +436,12 @@ static int handle_cache_invalid(struct vine_manager *q, struct vine_worker_info
433436

434437
/* Remove the replica from our records. */
435438
struct vine_file_replica *replica = vine_file_replica_table_remove(q, w, cachename);
436-
if (replica)
439+
if (replica) {
437440
vine_file_replica_delete(replica);
441+
}
442+
443+
/* throttle workers that could transfer a file */
444+
w->last_failure_time = timestamp_get();
438445

439446
/* If the third argument was given, also remove the transfer record */
440447
if (n >= 3) {
@@ -1016,10 +1023,6 @@ static int fetch_output_from_worker(struct vine_manager *q, struct vine_worker_i
10161023
/* If the worker didn't run the task don't bother fetching outputs. */
10171024
result = VINE_SUCCESS;
10181025
break;
1019-
case VINE_RESULT_TRANSFER_MISSING:
1020-
/* If the worker didn't run the task don't bother fetching outputs. */
1021-
result = VINE_TRANSIENT_FAILURE;
1022-
break;
10231026
case VINE_RESULT_RESOURCE_EXHAUSTION:
10241027
/* On resource exhaustion, just get the monitor files to figure out what happened. */
10251028
result = vine_manager_get_monitor_output_file(q, w, t);
@@ -1071,7 +1074,6 @@ static int fetch_output_from_worker(struct vine_manager *q, struct vine_worker_i
10711074
switch (t->result) {
10721075
case VINE_RESULT_INPUT_MISSING:
10731076
case VINE_RESULT_FORSAKEN:
1074-
case VINE_RESULT_TRANSFER_MISSING:
10751077
/* do not count tasks that didn't execute as complete, or finished tasks */
10761078
break;
10771079
default:
@@ -1129,6 +1131,13 @@ static int fetch_output_from_worker(struct vine_manager *q, struct vine_worker_i
11291131
}
11301132
}
11311133

1134+
/* XXX: temp fix. Make this a tunable parameter and/or make it more sophisticated */
1135+
if (w->forsaken_tasks > VINE_DEFAULT_MAX_FORSAKEN_PER_WORKER && w->total_tasks_complete == 0) {
1136+
debug(D_VINE, "Disconnecting worker that keeps forsaking tasks %s (%s).", w->hostname, w->addrport);
1137+
handle_failure(q, w, t, VINE_WORKER_FAILURE);
1138+
return 0;
1139+
}
1140+
11321141
return 1;
11331142
}
11341143

@@ -1532,10 +1541,16 @@ static vine_result_code_t get_result(struct vine_manager *q, struct vine_worker_
15321541
return VINE_SUCCESS;
15331542
}
15341543

1535-
/* If the task was forsaken by the worker, it didn't really complete, so short circuit. */
1544+
if (task_status != VINE_RESULT_SUCCESS) {
1545+
w->last_failure_time = timestamp_get();
1546+
t->time_when_last_failure = w->last_failure_time;
1547+
}
1548+
1549+
/* If the task was forsaken by the worker or couldn't exeute, it didn't really complete, so short circuit. */
15361550
if (task_status == VINE_RESULT_FORSAKEN) {
1551+
t->forsaken_count++;
15371552
itable_remove(q->running_table, t->task_id);
1538-
vine_task_set_result(t, VINE_RESULT_FORSAKEN);
1553+
vine_task_set_result(t, task_status);
15391554
change_task_state(q, t, VINE_TASK_WAITING_RETRIEVAL);
15401555
return VINE_SUCCESS;
15411556
}
@@ -2791,10 +2806,12 @@ static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worke
27912806
static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
27922807
{
27932808
/* in this function, any change_task_state should only be to VINE_TASK_READY */
2794-
27952809
if (t->result == VINE_RESULT_FORSAKEN) {
2796-
/* forsaken tasks are always resubmitted. they also get a retry back as they are victims of circumstance
2797-
*/
2810+
if (t->max_forsaken > -1 && t->forsaken_count > t->max_forsaken) {
2811+
return 0;
2812+
}
2813+
2814+
/* forsaken tasks get a retry back as they are victims of circumstance */
27982815
t->try_count -= 1;
27992816
change_task_state(q, t, VINE_TASK_READY);
28002817
return 1;
@@ -2806,21 +2823,11 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w
28062823
}
28072824

28082825
/* special handlings per result. note that most results are terminal, that is tasks are not retried even if they
2809-
* have not reached max_retries. max_retries is only used for transient errors, or for modified tasks (such as a
2810-
* change in the resource request). */
2826+
* have not reached max_retries. */
28112827
switch (t->result) {
28122828
case VINE_RESULT_RESOURCE_EXHAUSTION:
28132829
return resubmit_task_on_exhaustion(q, w, t);
28142830
break;
2815-
case VINE_RESULT_TRANSFER_MISSING:
2816-
if (t->max_retries > 0 && t->try_count > t->max_retries) {
2817-
t->result = VINE_RESULT_INPUT_MISSING;
2818-
return 0;
2819-
} else {
2820-
change_task_state(q, t, VINE_TASK_READY);
2821-
return 1;
2822-
}
2823-
break;
28242831
default:
28252832
/* by default tasks are not resumitted */
28262833
return 0;
@@ -3130,7 +3137,6 @@ static int send_one_task(struct vine_manager *q)
31303137

31313138
timestamp_t now_usecs = timestamp_get();
31323139
double now_secs = ((double)now_usecs) / ONE_SECOND;
3133-
timestamp_t time_failure_range = now_usecs - q->transient_error_interval * ONE_SECOND;
31343140

31353141
int tasks_to_consider = MIN(list_size(q->ready_list), q->attempt_schedule_depth);
31363142

@@ -3145,7 +3151,7 @@ static int send_one_task(struct vine_manager *q)
31453151
}
31463152

31473153
// Skip if this task failed recently
3148-
if (time_failure_range > t->time_when_last_failure) {
3154+
if (t->time_when_last_failure + q->transient_error_interval > now_usecs) {
31493155
continue;
31503156
}
31513157

@@ -4303,9 +4309,6 @@ const char *vine_result_string(vine_result_t result)
43034309
case VINE_RESULT_LIBRARY_EXIT:
43044310
str = "LIBRARY_EXIT";
43054311
break;
4306-
case VINE_RESULT_TRANSFER_MISSING:
4307-
str = "TRANSFER_MISSING";
4308-
break;
43094312
}
43104313

43114314
return str;
@@ -4432,7 +4435,7 @@ static struct vine_task *send_library_to_worker(struct vine_manager *q, struct v
44324435

44334436
timestamp_t lastfail = original->time_when_last_failure;
44344437
timestamp_t current = timestamp_get();
4435-
if (current < (lastfail + q->transient_error_interval * ONE_SECOND)) {
4438+
if (current < lastfail + q->transient_error_interval) {
44364439
return 0;
44374440
}
44384441

@@ -5281,7 +5284,7 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
52815284
if (value < 1) {
52825285
q->transient_error_interval = VINE_DEFAULT_TRANSIENT_ERROR_INTERVAL;
52835286
} else {
5284-
q->transient_error_interval = value;
5287+
q->transient_error_interval = value * ONE_SECOND;
52855288
}
52865289

52875290
} else {

taskvine/src/manager/vine_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ struct vine_manager {
206206

207207
int update_interval; /* Seconds between updates to the catalog. */
208208
int resource_management_interval; /* Seconds between measurement of manager local resources. */
209-
int transient_error_interval; /* Seconds between new attempts on task rescheduling and using a file replica as source after a failure. */
209+
timestamp_t transient_error_interval; /* microseconds between new attempts on task rescheduling and using a file replica as source after a failure. */
210210

211211
/*todo: confirm datatype. int or int64*/
212212
int max_task_stdout_storage; /* Maximum size of standard output from task. (If larger, send to a separate file.) */

taskvine/src/manager/vine_schedule.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
142142
return 0;
143143
}
144144

145+
/* Don't send tasks if a task recently failed at this worker. */
146+
if (w->last_failure_time + q->transient_error_interval > timestamp_get()) {
147+
return 0;
148+
}
149+
145150
/* Don't send tasks if the factory is used and has too many connected workers. */
146151
if (w->factory_name) {
147152
struct vine_factory_info *f = vine_factory_info_lookup(q, w->factory_name);

taskvine/src/manager/vine_task.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ struct vine_task *vine_task_create(const char *command_line)
6161
t->result = VINE_RESULT_UNKNOWN;
6262
t->exit_code = -1;
6363

64+
t->max_forsaken = -1;
65+
6466
t->time_when_last_failure = -1;
6567

6668
/* In the absence of additional information, a task consumes an entire worker. */
@@ -129,6 +131,7 @@ void vine_task_reset(struct vine_task *t)
129131

130132
t->resource_request = CATEGORY_ALLOCATION_FIRST;
131133
t->try_count = 0;
134+
t->forsaken_count = 0;
132135
t->exhausted_attempts = 0;
133136
t->workers_slow = 0;
134137

@@ -226,6 +229,7 @@ struct vine_task *vine_task_copy(const struct vine_task *task)
226229
vine_task_set_scheduler(new, task->worker_selection_algorithm);
227230
vine_task_set_priority(new, task->priority);
228231
vine_task_set_retries(new, task->max_retries);
232+
vine_task_set_max_forsaken(new, task->max_forsaken);
229233
vine_task_set_time_min(new, task->min_running_time);
230234

231235
/* Internal state of task is cleared from vine_task_create */
@@ -306,6 +310,15 @@ void vine_task_set_retries(struct vine_task *t, int64_t max_retries)
306310
}
307311
}
308312

313+
void vine_task_set_max_forsaken(struct vine_task *t, int64_t max_forsaken)
314+
{
315+
if (max_forsaken < 0) {
316+
t->max_forsaken = -1;
317+
} else {
318+
t->max_forsaken = max_forsaken;
319+
}
320+
}
321+
309322
void vine_task_set_memory(struct vine_task *t, int64_t memory)
310323
{
311324
if (memory < 0) {

taskvine/src/manager/vine_task.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,20 @@ struct vine_task {
6161
vine_schedule_t worker_selection_algorithm; /**< How to choose worker to run the task. */
6262
double priority; /**< The priority of this task relative to others in the queue: higher number run earlier. */
6363
int max_retries; /**< Number of times the task is tried to be executed on some workers until success. If less than one, the task is retried indefinitely. See try_count below.*/
64+
int max_forsaken; /**< Number of times the task is submitted to workers without being executed. If less than one, the task is retried indefinitely. See forsaken_count below.*/
6465
int64_t min_running_time; /**< Minimum time (in seconds) the task needs to run. (see vine_worker --wall-time)*/
6566

6667
/***** Internal state of task as it works towards completion. *****/
6768

6869
vine_task_state_t state; /**< Current state of task: READY, RUNNING, etc */
6970
struct vine_worker_info *worker; /**< Worker to which this task has been dispatched. */
7071
struct vine_task* library_task; /**< Library task to which a function task has been matched. */
71-
int try_count; /**< The number of times the task has been dispatched to a worker. If larger than max_retries, the task failes with @ref VINE_RESULT_MAX_RETRIES. */
72-
int exhausted_attempts; /**< Number of times the task failed given exhausted resources. */
73-
int workers_slow; /**< Number of times this task has been terminated for running too long. */
74-
int function_slots_inuse; /**< If a library, the number of functions currently running. */
72+
int try_count; /**< The number of times the task has been dispatched to a worker without being forsaken. If larger than max_retries, return with result of last attempt. */
73+
int forsaken_count; /**< The number of times the task has been dispatched to a worker. If larger than max_forsaken, return with VINE_RESULT_FORSAKEN. */
74+
int exhausted_attempts; /**< Number of times the task failed given exhausted resources. */
75+
int forsaken_attempts; /**< Number of times the task was submitted to a worker but failed to start execution. */
76+
int workers_slow; /**< Number of times this task has been terminated for running too long. */
77+
int function_slots_inuse; /**< If a library, the number of functions currently running. */
7578

7679
/***** Results of task once it has reached completion. *****/
7780

taskvine/src/manager/vine_worker_info.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct vine_worker_info *vine_worker_create(struct link *lnk)
3939

4040
w->last_update_msg_time = w->start_time;
4141
w->last_transfer_failure = 0;
42+
w->last_failure_time = 0;
4243

4344
return w;
4445
}

taskvine/src/manager/vine_worker_info.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ struct vine_worker_info {
6868
int finished_tasks;
6969
int64_t total_tasks_complete;
7070
int64_t total_bytes_transferred;
71+
int forsaken_tasks;
7172
int64_t inuse_cache;
7273

7374
timestamp_t total_task_time;
@@ -76,6 +77,7 @@ struct vine_worker_info {
7677
timestamp_t start_time;
7778
timestamp_t last_msg_recv_time;
7879
timestamp_t last_update_msg_time;
80+
timestamp_t last_failure_time;
7981
};
8082

8183
struct vine_worker_info * vine_worker_create( struct link * lnk );

0 commit comments

Comments
 (0)