Skip to content
Merged
10 changes: 9 additions & 1 deletion taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,18 @@ def add_environment(self, f):
# Indicate the number of times the task should be retried. If 0 (the
# default), the task is tried indefinitely. A task that did not succeed
# after the given number of retries is returned with result
# "result_max_retries".
# "max retries".
def set_retries(self, max_retries):
    # Forward the retry limit for the wrapped task to the C binding and
    # hand back whatever that call returns.
    outcome = cvine.vine_task_set_retries(self._task, max_retries)
    return outcome

##
# Indicate the number of times the task can be returned to the manager
# without being executed. If less than 0 (the default), the task is tried indefinitely.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

# A task that did not succeed after the given number of retries is returned
# with result "forsaken".
def set_max_forsaken(self, max_forsaken):
    # Forward the forsaken limit for the wrapped task to the C binding and
    # hand back whatever that call returns.
    outcome = cvine.vine_task_set_max_forsaken(self._task, max_forsaken)
    return outcome

##
# Indicate the number of cores required by this task.
def set_cores(self, cores):
Expand Down
12 changes: 9 additions & 3 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ typedef enum {
VINE_RESULT_OUTPUT_TRANSFER_ERROR = 9 << 3, /**< The task failed because an output could not be transferred to the manager (not enough disk space, incorrect write permissions). */
VINE_RESULT_FIXED_LOCATION_MISSING = 10 << 3, /**< The task failed because no worker could satisfy the fixed location input file requirements. */
VINE_RESULT_CANCELLED = 11<<3, /**< The task was cancelled by the caller. */
VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/
VINE_RESULT_TRANSFER_MISSING = 13 << 3, /**< Task failed because a worker could not fetch a file. **/
VINE_RESULT_LIBRARY_EXIT = 12 << 3 /**< Task is a library that has terminated. **/
} vine_result_t;

/** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */
Expand Down Expand Up @@ -286,13 +285,20 @@ int vine_task_add_input( struct vine_task *t, struct vine_file *f, const char *r

int vine_task_add_output( struct vine_task *t, struct vine_file *f, const char *remote_name, vine_mount_flags_t flags );

/** Specify the number of times this task is retried on worker errors. If less than one, the task is retried indefinitely (this the default). A task that did not succeed after the given number of retries is returned with result VINE_RESULT_MAX_RETRIES.
/** Specify the number of times this task is retried on worker errors. If less than one, the task is retried indefinitely (this is the default). A task that did not succeed after the given number of retries is returned with the result of its last attempt.
@param t A task object.
@param max_retries The number of retries.
*/

void vine_task_set_retries( struct vine_task *t, int64_t max_retries );

/** Specify the total number of times this task can be returned to the manager without being executed. If less than zero, the task is tried indefinitely (this is the default). A task that did not succeed after the given number is returned with the result VINE_RESULT_FORSAKEN.
@param t A task object.
@param max_forsaken The maximum number of times the task may be returned without being executed.
*/

void vine_task_set_max_forsaken( struct vine_task *t, int64_t max_forsaken );

/** Specify the amount of disk space required by a task.
@param t A task object.
@param memory The amount of disk space required by the task, in megabytes.
Expand Down
3 changes: 1 addition & 2 deletions taskvine/src/manager/vine_file_replica_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ struct vine_worker_info *vine_file_replica_table_find_worker(struct vine_manager
continue;

timestamp_t current_time = timestamp_get();
if (((current_time - peer->last_transfer_failure) / 1000000) <
(long unsigned int)q->transient_error_interval) {
if (current_time - peer->last_transfer_failure < q->transient_error_interval) {
debug(D_VINE, "Skipping worker source after recent failure : %s", peer->transfer_addr);
continue;
}
Expand Down
65 changes: 34 additions & 31 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ See the file COPYING for details.
/* Default value for keepalive timeout in seconds. */
#define VINE_DEFAULT_KEEPALIVE_TIMEOUT 900

/* Default value to before entity is considered again after last failure */
#define VINE_DEFAULT_TRANSIENT_ERROR_INTERVAL 15
/* Default value before entity is considered again after last failure, in usecs */
#define VINE_DEFAULT_TRANSIENT_ERROR_INTERVAL (15 * ONE_SECOND)

/* Default value before disconnecting a worker that keeps forsaking tasks without any completions */
#define VINE_DEFAULT_MAX_FORSAKEN_PER_WORKER 10

/* Default value for maximum size of standard output from task. (If larger, send to a separate file.) */
#define MAX_TASK_STDOUT_STORAGE (1 * GIGABYTE)
Expand Down Expand Up @@ -433,8 +436,12 @@ static int handle_cache_invalid(struct vine_manager *q, struct vine_worker_info

/* Remove the replica from our records. */
struct vine_file_replica *replica = vine_file_replica_table_remove(q, w, cachename);
if (replica)
if (replica) {
vine_file_replica_delete(replica);
}

/* throttle workers that could not transfer a file */
w->last_failure_time = timestamp_get();

/* If the third argument was given, also remove the transfer record */
if (n >= 3) {
Expand Down Expand Up @@ -1016,10 +1023,6 @@ static int fetch_output_from_worker(struct vine_manager *q, struct vine_worker_i
/* If the worker didn't run the task don't bother fetching outputs. */
result = VINE_SUCCESS;
break;
case VINE_RESULT_TRANSFER_MISSING:
/* If the worker didn't run the task don't bother fetching outputs. */
result = VINE_TRANSIENT_FAILURE;
break;
case VINE_RESULT_RESOURCE_EXHAUSTION:
/* On resource exhaustion, just get the monitor files to figure out what happened. */
result = vine_manager_get_monitor_output_file(q, w, t);
Expand Down Expand Up @@ -1071,7 +1074,6 @@ static int fetch_output_from_worker(struct vine_manager *q, struct vine_worker_i
switch (t->result) {
case VINE_RESULT_INPUT_MISSING:
case VINE_RESULT_FORSAKEN:
case VINE_RESULT_TRANSFER_MISSING:
/* do not count tasks that didn't execute as complete, or finished tasks */
break;
default:
Expand Down Expand Up @@ -1129,6 +1131,13 @@ static int fetch_output_from_worker(struct vine_manager *q, struct vine_worker_i
}
}

/* XXX: temp fix. Make this a tunable parameter and/or make it more sophisticated */
if (w->forsaken_tasks > VINE_DEFAULT_MAX_FORSAKEN_PER_WORKER && w->total_tasks_complete == 0) {
debug(D_VINE, "Disconnecting worker that keeps forsaking tasks %s (%s).", w->hostname, w->addrport);
handle_failure(q, w, t, VINE_WORKER_FAILURE);
return 0;
}

return 1;
}

Expand Down Expand Up @@ -1532,10 +1541,16 @@ static vine_result_code_t get_result(struct vine_manager *q, struct vine_worker_
return VINE_SUCCESS;
}

/* If the task was forsaken by the worker, it didn't really complete, so short circuit. */
if (task_status != VINE_RESULT_SUCCESS) {
w->last_failure_time = timestamp_get();
t->time_when_last_failure = w->last_failure_time;
}

/* If the task was forsaken by the worker or couldn't execute, it didn't really complete, so short circuit. */
if (task_status == VINE_RESULT_FORSAKEN) {
t->forsaken_count++;
itable_remove(q->running_table, t->task_id);
vine_task_set_result(t, VINE_RESULT_FORSAKEN);
vine_task_set_result(t, task_status);
change_task_state(q, t, VINE_TASK_WAITING_RETRIEVAL);
return VINE_SUCCESS;
}
Expand Down Expand Up @@ -2791,10 +2806,12 @@ static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worke
static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
/* in this function, any change_task_state should only be to VINE_TASK_READY */

if (t->result == VINE_RESULT_FORSAKEN) {
/* forsaken tasks are always resubmitted. they also get a retry back as they are victims of circumstance
*/
if (t->max_forsaken > -1 && t->forsaken_count > t->max_forsaken) {
return 0;
}

/* forsaken tasks get a retry back as they are victims of circumstance */
t->try_count -= 1;
change_task_state(q, t, VINE_TASK_READY);
return 1;
Expand All @@ -2806,21 +2823,11 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w
}

/* special handlings per result. note that most results are terminal, that is tasks are not retried even if they
* have not reached max_retries. max_retries is only used for transient errors, or for modified tasks (such as a
* change in the resource request). */
* have not reached max_retries. */
switch (t->result) {
case VINE_RESULT_RESOURCE_EXHAUSTION:
return resubmit_task_on_exhaustion(q, w, t);
break;
case VINE_RESULT_TRANSFER_MISSING:
if (t->max_retries > 0 && t->try_count > t->max_retries) {
t->result = VINE_RESULT_INPUT_MISSING;
return 0;
} else {
change_task_state(q, t, VINE_TASK_READY);
return 1;
}
break;
default:
/* by default tasks are not resubmitted */
return 0;
Expand Down Expand Up @@ -3130,7 +3137,6 @@ static int send_one_task(struct vine_manager *q)

timestamp_t now_usecs = timestamp_get();
double now_secs = ((double)now_usecs) / ONE_SECOND;
timestamp_t time_failure_range = now_usecs - q->transient_error_interval * ONE_SECOND;

int tasks_to_consider = MIN(list_size(q->ready_list), q->attempt_schedule_depth);

Expand All @@ -3145,7 +3151,7 @@ static int send_one_task(struct vine_manager *q)
}

// Skip if this task failed recently
if (time_failure_range > t->time_when_last_failure) {
if (t->time_when_last_failure + q->transient_error_interval > now_usecs) {
continue;
}

Expand Down Expand Up @@ -4303,9 +4309,6 @@ const char *vine_result_string(vine_result_t result)
case VINE_RESULT_LIBRARY_EXIT:
str = "LIBRARY_EXIT";
break;
case VINE_RESULT_TRANSFER_MISSING:
str = "TRANSFER_MISSING";
break;
}

return str;
Expand Down Expand Up @@ -4432,7 +4435,7 @@ static struct vine_task *send_library_to_worker(struct vine_manager *q, struct v

timestamp_t lastfail = original->time_when_last_failure;
timestamp_t current = timestamp_get();
if (current < (lastfail + q->transient_error_interval * ONE_SECOND)) {
if (current < lastfail + q->transient_error_interval) {
return 0;
}

Expand Down Expand Up @@ -5281,7 +5284,7 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
if (value < 1) {
q->transient_error_interval = VINE_DEFAULT_TRANSIENT_ERROR_INTERVAL;
} else {
q->transient_error_interval = value;
q->transient_error_interval = value * ONE_SECOND;
}

} else {
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ struct vine_manager {

int update_interval; /* Seconds between updates to the catalog. */
int resource_management_interval; /* Seconds between measurement of manager local resources. */
int transient_error_interval; /* Seconds between new attempts on task rescheduling and using a file replica as source after a failure. */
timestamp_t transient_error_interval; /* microseconds between new attempts on task rescheduling and using a file replica as source after a failure. */

/*todo: confirm datatype. int or int64*/
int max_task_stdout_storage; /* Maximum size of standard output from task. (If larger, send to a separate file.) */
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
return 0;
}

/* Don't send tasks if a task recently failed at this worker. */
if (w->last_failure_time + q->transient_error_interval > timestamp_get()) {
return 0;
}

/* Don't send tasks if the factory is used and has too many connected workers. */
if (w->factory_name) {
struct vine_factory_info *f = vine_factory_info_lookup(q, w->factory_name);
Expand Down
13 changes: 13 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ struct vine_task *vine_task_create(const char *command_line)
t->result = VINE_RESULT_UNKNOWN;
t->exit_code = -1;

t->max_forsaken = -1;

t->time_when_last_failure = -1;

/* In the absence of additional information, a task consumes an entire worker. */
Expand Down Expand Up @@ -129,6 +131,7 @@ void vine_task_reset(struct vine_task *t)

t->resource_request = CATEGORY_ALLOCATION_FIRST;
t->try_count = 0;
t->forsaken_count = 0;
t->exhausted_attempts = 0;
t->workers_slow = 0;

Expand Down Expand Up @@ -226,6 +229,7 @@ struct vine_task *vine_task_copy(const struct vine_task *task)
vine_task_set_scheduler(new, task->worker_selection_algorithm);
vine_task_set_priority(new, task->priority);
vine_task_set_retries(new, task->max_retries);
vine_task_set_max_forsaken(new, task->max_forsaken);
vine_task_set_time_min(new, task->min_running_time);

/* Internal state of task is cleared from vine_task_create */
Expand Down Expand Up @@ -306,6 +310,15 @@ void vine_task_set_retries(struct vine_task *t, int64_t max_retries)
}
}

/* Record the forsaken limit for task t.
 * Any negative max_forsaken is stored as -1; every other value is stored as given. */
void vine_task_set_max_forsaken(struct vine_task *t, int64_t max_forsaken)
{
	t->max_forsaken = (max_forsaken < 0) ? -1 : max_forsaken;
}

void vine_task_set_memory(struct vine_task *t, int64_t memory)
{
if (memory < 0) {
Expand Down
11 changes: 7 additions & 4 deletions taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,20 @@ struct vine_task {
vine_schedule_t worker_selection_algorithm; /**< How to choose worker to run the task. */
double priority; /**< The priority of this task relative to others in the queue: higher number run earlier. */
int max_retries; /**< Number of times the task is tried to be executed on some workers until success. If less than one, the task is retried indefinitely. See try_count below.*/
int max_forsaken; /**< Number of times the task is submitted to workers without being executed. If less than zero, the task is retried indefinitely. See forsaken_count below.*/
int64_t min_running_time; /**< Minimum time (in seconds) the task needs to run. (see vine_worker --wall-time)*/

/***** Internal state of task as it works towards completion. *****/

vine_task_state_t state; /**< Current state of task: READY, RUNNING, etc */
struct vine_worker_info *worker; /**< Worker to which this task has been dispatched. */
struct vine_task* library_task; /**< Library task to which a function task has been matched. */
int try_count; /**< The number of times the task has been dispatched to a worker. If larger than max_retries, the task failes with @ref VINE_RESULT_MAX_RETRIES. */
int exhausted_attempts; /**< Number of times the task failed given exhausted resources. */
int workers_slow; /**< Number of times this task has been terminated for running too long. */
int function_slots_inuse; /**< If a library, the number of functions currently running. */
int try_count; /**< The number of times the task has been dispatched to a worker without being forsaken. If larger than max_retries, return with result of last attempt. */
int forsaken_count; /**< The number of times the task has been forsaken, i.e. returned by a worker without being executed. If larger than max_forsaken, return with VINE_RESULT_FORSAKEN. */
int exhausted_attempts; /**< Number of times the task failed given exhausted resources. */
int forsaken_attempts; /**< Number of times the task was submitted to a worker but failed to start execution. */
int workers_slow; /**< Number of times this task has been terminated for running too long. */
int function_slots_inuse; /**< If a library, the number of functions currently running. */

/***** Results of task once it has reached completion. *****/

Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_worker_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct vine_worker_info *vine_worker_create(struct link *lnk)

w->last_update_msg_time = w->start_time;
w->last_transfer_failure = 0;
w->last_failure_time = 0;

return w;
}
Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_worker_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct vine_worker_info {
int finished_tasks;
int64_t total_tasks_complete;
int64_t total_bytes_transferred;
int forsaken_tasks;
int64_t inuse_cache;

timestamp_t total_task_time;
Expand All @@ -76,6 +77,7 @@ struct vine_worker_info {
timestamp_t start_time;
timestamp_t last_msg_recv_time;
timestamp_t last_update_msg_time;
timestamp_t last_failure_time;
};

struct vine_worker_info * vine_worker_create( struct link * lnk );
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ static int start_process(struct vine_process *p, struct link *manager)
/* Create the sandbox environment for the task. */
if (!vine_sandbox_stagein(p, cache_manager)) {
p->execution_start = p->execution_end = timestamp_get();
p->result = VINE_RESULT_TRANSFER_MISSING;
p->result = VINE_RESULT_FORSAKEN;
p->exit_code = 1;
itable_insert(procs_complete, p->task->task_id, p);
return 0;
Expand Down
10 changes: 5 additions & 5 deletions taskvine/test/vine_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def next_output_name():
report_task(t, "success", 0)

# should fail in the allotted time
t = vine.Task("/bin/sleep 10")
t = vine.Task("/bin/sleep 100")
t.set_time_max(1)
q.submit(t)
t = q.wait(wait_time)
Expand Down Expand Up @@ -266,13 +266,13 @@ def next_output_name():
report_task(t, "success", 0)

# generate an invalid remote input file, should get a forsaken result.
t = vine.Task("wc -l infile")
t.set_retries(1)
t = vine.Task("wc -l infile_for_forsaken")
t.set_max_forsaken(1)
url = q.declare_url("https://pretty-sure-this-is-not-a-valid-url.com")
t.add_input(url, "infile")
t.add_input(url, "infile_for_forsaken")
q.submit(t)
t = q.wait(wait_time)
report_task(t, "transfer missing", 1)
report_task(t, "forsaken", -1)

# create a temporary output file, and then fetch its contents manually.
t = vine.Task("echo howdy > output")
Expand Down