/*****************************************************************************\ * job_scheduler.c - manage the scheduling of pending jobs in priority order * Note there is a global job list (job_list) ***************************************************************************** * Copyright (C) 2002-2007 The Regents of the University of California. * Copyright (C) 2008-2010 Lawrence Livermore National Security. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Morris Jette <jette1@llnl.gov> * CODE-OCEC-09-009. All rights reserved. * * This file is part of SLURM, a resource management program. * For details, see <http://www.schedmd.com/slurmdocs/>. * Please also read the included file: DISCLAIMER. * * SLURM is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free * Software Foundation; either version 2 of the License, or (at your option) * any later version. * * In addition, as a special exception, the copyright holders give permission * to link the code of portions of this program with the OpenSSL library under * certain conditions as described in each individual source file, and * distribute linked combinations including the two. You must obey the GNU * General Public License in all respects for all of the code used other than * OpenSSL. If you modify file(s) with this exception, you may extend this * exception to your version of the file(s), but you are not obligated to do * so. If you do not wish to do so, delete this exception statement from your * version. If you delete this exception statement from all source files in * the program, then also delete it here. * * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along * with SLURM; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. \*****************************************************************************/ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "src/common/assoc_mgr.h" #include "src/common/env.h" #include "src/common/gres.h" #include "src/common/list.h" #include "src/common/macros.h" #include "src/common/node_select.h" #include "src/common/slurm_accounting_storage.h" #include "src/common/timers.h" #include "src/common/uid.h" #include "src/common/xassert.h" #include "src/common/xstring.h" #include "src/slurmctld/acct_policy.h" #include "src/slurmctld/agent.h" #include "src/slurmctld/front_end.h" #include "src/slurmctld/job_scheduler.h" #include "src/slurmctld/licenses.h" #include "src/slurmctld/locks.h" #include "src/slurmctld/node_scheduler.h" #include "src/slurmctld/preempt.h" #include "src/slurmctld/proc_req.h" #include "src/slurmctld/reservation.h" #include "src/slurmctld/slurmctld.h" #include "src/slurmctld/srun_comm.h" #define _DEBUG 0 #define MAX_RETRIES 10 static char ** _build_env(struct job_record *job_ptr); static void _depend_list_del(void *dep_ptr); static void _feature_list_delete(void *x); static void _job_queue_append(List job_queue, struct job_record *job_ptr, struct part_record *part_ptr); static void _job_queue_rec_del(void *x); static void * _run_epilog(void *arg); static void * _run_prolog(void *arg); static bool _scan_depend(List dependency_list, uint32_t job_id); static int _valid_feature_list(uint32_t job_id, List feature_list); static int _valid_node_feature(char *feature); static int save_last_part_update = 0; /* * _build_user_job_list - build list of jobs for a given user * and an optional job name * IN user_id - user id * IN job_name - job name constraint * RET the job queue * NOTE: the caller must call list_destroy() on RET value to free memory */ static List _build_user_job_list(uint32_t user_id, char* job_name) { List job_queue; ListIterator job_iterator; struct job_record *job_ptr = NULL; job_queue = list_create(NULL); if (job_queue == NULL) fatal("list_create memory allocation failure"); job_iterator = list_iterator_create(job_list); if (job_iterator == NULL) fatal("list_iterator_create malloc failure"); while ((job_ptr = (struct job_record *) list_next(job_iterator))) { xassert (job_ptr->magic == JOB_MAGIC); if (job_ptr->user_id != user_id) continue; if (job_name && job_ptr->name && strcmp(job_name, job_ptr->name)) continue; list_append(job_queue, job_ptr); } list_iterator_destroy(job_iterator); return job_queue; } static void _job_queue_append(List job_queue, struct job_record *job_ptr, struct part_record *part_ptr) { job_queue_rec_t *job_queue_rec; job_queue_rec = xmalloc(sizeof(job_queue_rec_t)); job_queue_rec->job_ptr = job_ptr; job_queue_rec->part_ptr = part_ptr; list_append(job_queue, job_queue_rec); } static void _job_queue_rec_del(void *x) { xfree(x); } /* * build_job_queue - build (non-priority ordered) list of pending jobs * IN clear_start - if set then clear the start_time for pending jobs * RET the job queue * NOTE: the caller must call list_destroy() on RET value to free memory */ extern List build_job_queue(bool clear_start) { List job_queue; ListIterator job_iterator, part_iterator; struct job_record *job_ptr = NULL; struct part_record *part_ptr; bool job_is_pending; bool job_indepen = false; job_queue = list_create(_job_queue_rec_del); if (job_queue == NULL) fatal("list_create memory allocation failure"); job_iterator = list_iterator_create(job_list); if (job_iterator == NULL) fatal("list_iterator_create memory allocation failure"); while ((job_ptr = (struct job_record *) list_next(job_iterator))) { xassert (job_ptr->magic == JOB_MAGIC); job_is_pending = IS_JOB_PENDING(job_ptr); if (!job_is_pending || IS_JOB_COMPLETING(job_ptr)) continue; /* ensure dependency shows current values behind a hold */ job_indepen = job_independent(job_ptr, 0); if (job_is_pending && clear_start) job_ptr->start_time = (time_t) 0; if (job_ptr->priority == 0) { /* held */ if ((job_ptr->state_reason != WAIT_HELD) && (job_ptr->state_reason != WAIT_HELD_USER)) { job_ptr->state_reason = WAIT_HELD; xfree(job_ptr->state_desc); } debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr->state_reason), job_ptr->priority); continue; } else if ((job_ptr->priority == 1) && !job_indepen && ((job_ptr->state_reason == WAIT_HELD) || (job_ptr->state_reason == WAIT_HELD_USER))) { /* released behind active dependency? */ job_ptr->state_reason = WAIT_DEPENDENCY; xfree(job_ptr->state_desc); } if (!job_indepen) /* can not run now */ continue; if (job_ptr->part_ptr_list) { part_iterator = list_iterator_create(job_ptr-> part_ptr_list); if (part_iterator == NULL) fatal("list_iterator_create malloc failure"); while ((part_ptr = (struct part_record *) list_next(part_iterator))) { job_ptr->part_ptr = part_ptr; if (job_limits_check(&job_ptr) != WAIT_NO_REASON) continue; _job_queue_append(job_queue, job_ptr, part_ptr); } list_iterator_destroy(part_iterator); } else { if (job_ptr->part_ptr == NULL) { part_ptr = find_part_record(job_ptr->partition); if (part_ptr == NULL) { error("Could not find partition %s " "for job %u", job_ptr->partition, job_ptr->job_id); continue; } job_ptr->part_ptr = part_ptr; error("partition pointer reset for job %u, " "part %s", job_ptr->job_id, job_ptr->partition); } _job_queue_append(job_queue, job_ptr, job_ptr->part_ptr); } } list_iterator_destroy(job_iterator); return job_queue; } /* * job_is_completing - Determine if jobs are in the process of completing. * RET - True of any job is in the process of completing AND * CompleteWait is configured non-zero * NOTE: This function can reduce resource fragmentation, which is a * critical issue on Elan interconnect based systems. */ extern bool job_is_completing(void) { bool completing = false; ListIterator job_iterator; struct job_record *job_ptr = NULL; uint16_t complete_wait = slurm_get_complete_wait(); time_t recent; if ((job_list == NULL) || (complete_wait == 0)) return completing; recent = time(NULL) - complete_wait; job_iterator = list_iterator_create(job_list); while ((job_ptr = (struct job_record *) list_next(job_iterator))) { if (IS_JOB_COMPLETING(job_ptr) && (job_ptr->end_time >= recent)) { completing = true; break; } } list_iterator_destroy(job_iterator); return completing; } /* * set_job_elig_time - set the eligible time for pending jobs once their * dependencies are lifted (in job->details->begin_time) */ extern void set_job_elig_time(void) { struct job_record *job_ptr = NULL; struct part_record *part_ptr = NULL; ListIterator job_iterator; slurmctld_lock_t job_write_lock = { READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK }; lock_slurmctld(job_write_lock); job_iterator = list_iterator_create(job_list); while ((job_ptr = (struct job_record *) list_next(job_iterator))) { part_ptr = job_ptr->part_ptr; if (!IS_JOB_PENDING(job_ptr)) continue; if (part_ptr == NULL) continue; if ((job_ptr->details == NULL) || job_ptr->details->begin_time) continue; if ((part_ptr->state_up & PARTITION_SCHED) == 0) continue; if ((job_ptr->time_limit != NO_VAL) && (job_ptr->time_limit > part_ptr->max_time)) continue; if ((job_ptr->details->max_nodes != 0) && ((job_ptr->details->max_nodes < part_ptr->min_nodes) || (job_ptr->details->min_nodes > part_ptr->max_nodes))) continue; /* Job's eligible time is set in job_independent() */ if (!job_independent(job_ptr, 0)) continue; } list_iterator_destroy(job_iterator); unlock_slurmctld(job_write_lock); } /* Test of part_ptr can still run jobs or if its nodes have * already been reserved by higher priority jobs (those in * the failed_parts array) */ static bool _failed_partition(struct part_record *part_ptr, struct part_record **failed_parts, int failed_part_cnt) { int i; for (i = 0; i < failed_part_cnt; i++) { if (failed_parts[i] == part_ptr) return true; } return false; } /* * schedule - attempt to schedule all pending jobs * pending jobs for each partition will be scheduled in priority * order until a request fails * IN job_limit - maximum number of jobs to test now, avoid testing the full * queue on every job submit (0 means to use the system default, * SchedulerParameters for default_queue_depth) * RET count of jobs scheduled * Note: We re-build the queue every time. Jobs can not only be added * or removed from the queue, but have their priority or partition * changed with the update_job RPC. In general nodes will be in priority * order (by submit time), so the sorting should be pretty fast. */ extern int schedule(uint32_t job_limit) { List job_queue = NULL; int error_code, failed_part_cnt = 0, job_cnt = 0, i; uint32_t job_depth = 0; job_queue_rec_t *job_queue_rec; struct job_record *job_ptr; struct part_record *part_ptr, **failed_parts = NULL; bitstr_t *save_avail_node_bitmap; /* Locks: Read config, write job, write node, read partition */ slurmctld_lock_t job_write_lock = { READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK }; #ifdef HAVE_BG char *ionodes = NULL; char tmp_char[256]; static bool backfill_sched = false; #endif static time_t sched_update = 0; static bool wiki_sched = false; static int sched_timeout = 0; static int def_job_limit = 100; time_t now = time(NULL), sched_start; DEF_TIMERS; sched_start = now; if (sched_timeout == 0) { sched_timeout = slurm_get_msg_timeout() / 2; sched_timeout = MAX(sched_timeout, 1); sched_timeout = MIN(sched_timeout, 10); } START_TIMER; if (sched_update != slurmctld_conf.last_update) { char *sched_params, *tmp_ptr; char *sched_type = slurm_get_sched_type(); #ifdef HAVE_BG /* On BlueGene, do FIFO only with sched/backfill */ if (strcmp(sched_type, "sched/backfill") == 0) backfill_sched = true; #endif /* Disable avoiding of fragmentation with sched/wiki */ if ((strcmp(sched_type, "sched/wiki") == 0) || (strcmp(sched_type, "sched/wiki2") == 0)) wiki_sched = true; xfree(sched_type); sched_params = slurm_get_sched_params(); if (sched_params && (tmp_ptr = strstr(sched_params, "default_queue_depth="))) { /* 01234567890123456789 */ i = atoi(tmp_ptr + 20); if (i < 0) { error("ignoring SchedulerParameters: " "default_queue_depth value of %d", i); } else { def_job_limit = i; } } xfree(sched_params); sched_update = slurmctld_conf.last_update; } if (job_limit == 0) job_limit = def_job_limit; lock_slurmctld(job_write_lock); if (!avail_front_end()) { unlock_slurmctld(job_write_lock); debug("sched: schedule() returning, no front end nodes are " "available"); return SLURM_SUCCESS; } /* Avoid resource fragmentation if important */ if ((!wiki_sched) && job_is_completing()) { unlock_slurmctld(job_write_lock); debug("sched: schedule() returning, some job is still " "completing"); return SLURM_SUCCESS; } #ifdef HAVE_CRAY /* * Run a Basil Inventory immediately before scheduling, to avoid * race conditions caused by ALPS node state change (caused e.g. * by the node health checker). * This relies on the above write lock for the node state. */ if (select_g_reconfigure()) { unlock_slurmctld(job_write_lock); debug4("sched: not scheduling due to ALPS"); return SLURM_SUCCESS; } #endif failed_parts = xmalloc(sizeof(struct part_record *) * list_count(part_list)); save_avail_node_bitmap = bit_copy(avail_node_bitmap); debug("sched: Running job scheduler"); job_queue = build_job_queue(false); while ((job_queue_rec = list_pop_bottom(job_queue, sort_job_queue2))) { job_ptr = job_queue_rec->job_ptr; part_ptr = job_queue_rec->part_ptr; xfree(job_queue_rec); if ((time(NULL) - sched_start) >= sched_timeout) { debug("sched: loop taking too long, breaking out"); break; } if (job_depth++ > job_limit) { debug3("sched: already tested %u jobs, breaking out", job_depth); break; } if (!IS_JOB_PENDING(job_ptr)) continue; /* started in other partition */ if (job_ptr->priority == 0) { /* held */ debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr->state_reason), job_ptr->priority); continue; } /* If a patition update has occurred, then do a limit check. */ if (save_last_part_update != last_part_update) { int fail_reason = job_limits_check(&job_ptr); if (fail_reason != WAIT_NO_REASON) { job_ptr->state_reason = fail_reason; job_ptr->priority = 1; continue; } } else if ((job_ptr->state_reason == WAIT_PART_TIME_LIMIT) || (job_ptr->state_reason == WAIT_PART_NODE_LIMIT)) { job_ptr->start_time = 0; job_ptr->priority = 1; continue; } if (job_ptr->part_ptr != part_ptr) { /* Cycle through partitions usable for this job */ job_ptr->part_ptr = part_ptr; } if ((job_ptr->resv_name == NULL) && _failed_partition(job_ptr->part_ptr, failed_parts, failed_part_cnt)) { if (job_ptr->priority != 1) { /* not system hold */ job_ptr->state_reason = WAIT_PRIORITY; xfree(job_ptr->state_desc); } debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u. Partition=%s.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr->state_reason), job_ptr->priority, job_ptr->partition); continue; } if (bit_overlap(avail_node_bitmap, job_ptr->part_ptr->node_bitmap) == 0) { /* All nodes DRAIN, DOWN, or * reserved for jobs in higher priority partition */ job_ptr->state_reason = WAIT_RESOURCES; debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u. Partition=%s.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr->state_reason), job_ptr->priority, job_ptr->partition); continue; } if (license_job_test(job_ptr, time(NULL)) != SLURM_SUCCESS) { job_ptr->state_reason = WAIT_LICENSES; xfree(job_ptr->state_desc); debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr->state_reason), job_ptr->priority); continue; } if (assoc_mgr_validate_assoc_id(acct_db_conn, job_ptr->assoc_id, accounting_enforce)) { /* NOTE: This only happens if a user's account is * disabled between when the job was submitted and * the time we consider running it. It should be * very rare. */ info("sched: JobId=%u has invalid account", job_ptr->job_id); last_job_update = time(NULL); job_ptr->job_state = JOB_FAILED; job_ptr->exit_code = 1; job_ptr->state_reason = FAIL_ACCOUNT; xfree(job_ptr->state_desc); job_ptr->start_time = job_ptr->end_time = time(NULL); job_completion_logger(job_ptr, false); delete_job_details(job_ptr); continue; } error_code = select_nodes(job_ptr, false, NULL); if (error_code == ESLURM_NODES_BUSY) { debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u. Partition=%s.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr->state_reason), job_ptr->priority, job_ptr->partition); bool fail_by_part = true; #ifdef HAVE_BG /* When we use static or overlap partitioning on * BlueGene, each job can possibly be scheduled * independently, without impacting other jobs of * different sizes. Therefore we sort and try to * schedule every pending job unless the backfill * scheduler is configured. */ if (!backfill_sched) fail_by_part = false; #endif if (fail_by_part) { /* do not schedule more jobs in this partition * or on nodes in this partition */ failed_parts[failed_part_cnt++] = job_ptr->part_ptr; bit_not(job_ptr->part_ptr->node_bitmap); bit_and(avail_node_bitmap, job_ptr->part_ptr->node_bitmap); bit_not(job_ptr->part_ptr->node_bitmap); } } else if (error_code == ESLURM_RESERVATION_NOT_USABLE) { if (job_ptr->resv_ptr && job_ptr->resv_ptr->node_bitmap) { debug3("sched: JobId=%u. State=%s. " "Reason=%s. Priority=%u.", job_ptr->job_id, job_state_string(job_ptr->job_state), job_reason_string(job_ptr-> state_reason), job_ptr->priority); bit_not(job_ptr->resv_ptr->node_bitmap); bit_and(avail_node_bitmap, job_ptr->resv_ptr->node_bitmap); bit_not(job_ptr->resv_ptr->node_bitmap); } else { /* The job has no reservation but requires * nodes that are currently in some reservation * so just skip over this job and try running * the next lower priority job */ debug3("sched: JobId=%u State=%s. " "Reason=Required nodes are reserved." "Priority=%u",job_ptr->job_id, job_state_string(job_ptr->job_state), job_ptr->priority); } } else if (error_code == SLURM_SUCCESS) { /* job initiated */ debug3("sched: JobId=%u initiated", job_ptr->job_id); last_job_update = now; #ifdef HAVE_BG select_g_select_jobinfo_get(job_ptr->select_jobinfo, SELECT_JOBDATA_IONODES, &ionodes); if(ionodes) { sprintf(tmp_char,"%s[%s]", job_ptr->nodes, ionodes); } else { sprintf(tmp_char,"%s",job_ptr->nodes); } info("sched: Allocate JobId=%u BPList=%s", job_ptr->job_id, tmp_char); xfree(ionodes); #else info("sched: Allocate JobId=%u NodeList=%s #CPUs=%u", job_ptr->job_id, job_ptr->nodes, job_ptr->total_cpus); #endif if (job_ptr->batch_flag == 0) srun_allocate(job_ptr->job_id); else if (job_ptr->details->prolog_running == 0) launch_job(job_ptr); rebuild_job_part_list(job_ptr); job_cnt++; } else if ((error_code != ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE) && (error_code != ESLURM_NODE_NOT_AVAIL) && (error_code != ESLURM_ACCOUNTING_POLICY)) { info("sched: schedule: JobId=%u non-runnable: %s", job_ptr->job_id, slurm_strerror(error_code)); if (!wiki_sched) { last_job_update = now; job_ptr->job_state = JOB_FAILED; job_ptr->exit_code = 1; job_ptr->state_reason = FAIL_BAD_CONSTRAINTS; xfree(job_ptr->state_desc); job_ptr->start_time = job_ptr->end_time = now; job_completion_logger(job_ptr, false); delete_job_details(job_ptr); } } } save_last_part_update = last_part_update; FREE_NULL_BITMAP(avail_node_bitmap); avail_node_bitmap = save_avail_node_bitmap; xfree(failed_parts); list_destroy(job_queue); unlock_slurmctld(job_write_lock); END_TIMER2("schedule"); return job_cnt; } /* * sort_job_queue - sort job_queue in decending priority order * IN/OUT job_queue - sorted job queue */ extern void sort_job_queue(List job_queue) { list_sort(job_queue, sort_job_queue2); } /* Note this differs from the ListCmpF typedef since we want jobs sorted * in order of decreasing priority */ extern int sort_job_queue2(void *x, void *y) { job_queue_rec_t *job_rec1 = (job_queue_rec_t *) x; job_queue_rec_t *job_rec2 = (job_queue_rec_t *) y; bool has_resv1, has_resv2; if (slurm_job_preempt_check(job_rec1, job_rec2)) return -1; if (slurm_job_preempt_check(job_rec2, job_rec1)) return 1; has_resv1 = (job_rec1->job_ptr->resv_id != 0); has_resv2 = (job_rec2->job_ptr->resv_id != 0); if (has_resv1 && !has_resv2) return -1; if (!has_resv1 && has_resv2) return 1; if (job_rec1->job_ptr->priority < job_rec2->job_ptr->priority) return 1; if (job_rec1->job_ptr->priority > job_rec2->job_ptr->priority) return -1; return 0; } /* * launch_job - send an RPC to a slurmd to initiate a batch job * IN job_ptr - pointer to job that will be initiated */ extern void launch_job(struct job_record *job_ptr) { batch_job_launch_msg_t *launch_msg_ptr; agent_arg_t *agent_arg_ptr; /* Initialization of data structures */ launch_msg_ptr = (batch_job_launch_msg_t *) xmalloc(sizeof(batch_job_launch_msg_t)); launch_msg_ptr->job_id = job_ptr->job_id; launch_msg_ptr->step_id = NO_VAL; launch_msg_ptr->uid = job_ptr->user_id; launch_msg_ptr->gid = job_ptr->group_id; launch_msg_ptr->ntasks = job_ptr->details->num_tasks; launch_msg_ptr->nodes = xstrdup(job_ptr->nodes); launch_msg_ptr->overcommit = job_ptr->details->overcommit; launch_msg_ptr->open_mode = job_ptr->details->open_mode; launch_msg_ptr->acctg_freq = job_ptr->details->acctg_freq; launch_msg_ptr->cpus_per_task = job_ptr->details->cpus_per_task; launch_msg_ptr->pn_min_memory = job_ptr->details->pn_min_memory; launch_msg_ptr->restart_cnt = job_ptr->restart_cnt; if (make_batch_job_cred(launch_msg_ptr, job_ptr)) { error("aborting batch job %u", job_ptr->job_id); /* FIXME: This is a kludge, but this event indicates a serious * problem with OpenSSH and should never happen. We are * too deep into the job launch to gracefully clean up. */ job_ptr->end_time = time(NULL); job_ptr->time_limit = 0; xfree(launch_msg_ptr->nodes); xfree(launch_msg_ptr); return; } launch_msg_ptr->std_err = xstrdup(job_ptr->details->std_err); launch_msg_ptr->std_in = xstrdup(job_ptr->details->std_in); launch_msg_ptr->std_out = xstrdup(job_ptr->details->std_out); launch_msg_ptr->work_dir = xstrdup(job_ptr->details->work_dir); launch_msg_ptr->ckpt_dir = xstrdup(job_ptr->details->ckpt_dir); launch_msg_ptr->restart_dir = xstrdup(job_ptr->details->restart_dir); launch_msg_ptr->argc = job_ptr->details->argc; launch_msg_ptr->argv = xduparray(job_ptr->details->argc, job_ptr->details->argv); launch_msg_ptr->spank_job_env_size = job_ptr->spank_job_env_size; launch_msg_ptr->spank_job_env = xduparray(job_ptr->spank_job_env_size, job_ptr->spank_job_env); launch_msg_ptr->script = get_job_script(job_ptr); launch_msg_ptr->environment = get_job_env(job_ptr, &launch_msg_ptr->envc); launch_msg_ptr->job_mem = job_ptr->details->pn_min_memory; launch_msg_ptr->num_cpu_groups = job_ptr->job_resrcs->cpu_array_cnt; launch_msg_ptr->cpus_per_node = xmalloc(sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt); memcpy(launch_msg_ptr->cpus_per_node, job_ptr->job_resrcs->cpu_array_value, (sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt)); launch_msg_ptr->cpu_count_reps = xmalloc(sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt); memcpy(launch_msg_ptr->cpu_count_reps, job_ptr->job_resrcs->cpu_array_reps, (sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt)); launch_msg_ptr->select_jobinfo = select_g_select_jobinfo_copy( job_ptr->select_jobinfo); agent_arg_ptr = (agent_arg_t *) xmalloc(sizeof(agent_arg_t)); agent_arg_ptr->node_count = 1; agent_arg_ptr->retry = 0; xassert(job_ptr->batch_host); agent_arg_ptr->hostlist = hostlist_create(job_ptr->batch_host); agent_arg_ptr->msg_type = REQUEST_BATCH_JOB_LAUNCH; agent_arg_ptr->msg_args = (void *) launch_msg_ptr; /* Launch the RPC via agent */ agent_queue_request(agent_arg_ptr); } /* * make_batch_job_cred - add a job credential to the batch_job_launch_msg * IN/OUT launch_msg_ptr - batch_job_launch_msg in which job_id, step_id, * uid and nodes have already been set * IN job_ptr - pointer to job record * RET 0 or error code */ extern int make_batch_job_cred(batch_job_launch_msg_t *launch_msg_ptr, struct job_record *job_ptr) { slurm_cred_arg_t cred_arg; job_resources_t *job_resrcs_ptr; xassert(job_ptr->job_resrcs); job_resrcs_ptr = job_ptr->job_resrcs; memset(&cred_arg, 0, sizeof(slurm_cred_arg_t)); cred_arg.jobid = launch_msg_ptr->job_id; cred_arg.stepid = launch_msg_ptr->step_id; cred_arg.uid = launch_msg_ptr->uid; cred_arg.job_hostlist = job_resrcs_ptr->nodes; cred_arg.job_core_bitmap = job_resrcs_ptr->core_bitmap; cred_arg.job_mem_limit = job_ptr->details->pn_min_memory; cred_arg.job_nhosts = job_resrcs_ptr->nhosts; cred_arg.job_gres_list = job_ptr->gres_list; /* cred_arg.step_gres_list = NULL; */ #ifdef HAVE_FRONT_END xassert(job_ptr->batch_host); cred_arg.step_hostlist = job_ptr->batch_host; #else cred_arg.step_hostlist = launch_msg_ptr->nodes; #endif cred_arg.step_core_bitmap = job_resrcs_ptr->core_bitmap; cred_arg.step_mem_limit = job_ptr->details->pn_min_memory; cred_arg.cores_per_socket = job_resrcs_ptr->cores_per_socket; cred_arg.sockets_per_node = job_resrcs_ptr->sockets_per_node; cred_arg.sock_core_rep_count = job_resrcs_ptr->sock_core_rep_count; launch_msg_ptr->cred = slurm_cred_create(slurmctld_config.cred_ctx, &cred_arg); if (launch_msg_ptr->cred) return SLURM_SUCCESS; error("slurm_cred_create failure for batch job %u", cred_arg.jobid); return SLURM_ERROR; } static void _depend_list_del(void *dep_ptr) { xfree(dep_ptr); } /* Print a job's dependency information based upon job_ptr->depend_list */ extern void print_job_dependency(struct job_record *job_ptr) { ListIterator depend_iter; struct depend_spec *dep_ptr; char *dep_str; info("Dependency information for job %u", job_ptr->job_id); if ((job_ptr->details == NULL) || (job_ptr->details->depend_list == NULL)) return; depend_iter = list_iterator_create(job_ptr->details->depend_list); if (!depend_iter) fatal("list_iterator_create memory allocation failure"); while ((dep_ptr = list_next(depend_iter))) { if (dep_ptr->depend_type == SLURM_DEPEND_SINGLETON) { info(" singleton"); continue; } else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER) dep_str = "after"; else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_ANY) dep_str = "afterany"; else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_NOT_OK) dep_str = "afternotok"; else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_OK) dep_str = "afterok"; else if (dep_ptr->depend_type == SLURM_DEPEND_EXPAND) dep_str = "expand"; else dep_str = "unknown"; info(" %s:%u", dep_str, dep_ptr->job_id); } list_iterator_destroy(depend_iter); } /* * Determine if a job's dependencies are met * RET: 0 = no dependencies * 1 = dependencies remain * 2 = failure (job completion code not per dependency), delete the job */ extern int test_job_dependency(struct job_record *job_ptr) { ListIterator depend_iter, job_iterator; struct depend_spec *dep_ptr; bool failure = false, depends = false, expands = false; List job_queue = NULL; bool run_now; int count = 0; struct job_record *qjob_ptr; if ((job_ptr->details == NULL) || (job_ptr->details->depend_list == NULL)) return 0; count = list_count(job_ptr->details->depend_list); depend_iter = list_iterator_create(job_ptr->details->depend_list); if (!depend_iter) fatal("list_iterator_create memory allocation failure"); while ((dep_ptr = list_next(depend_iter))) { bool clear_dep = false; count--; if ((dep_ptr->depend_type == SLURM_DEPEND_SINGLETON) && job_ptr->name) { /* get user jobs with the same user and name */ job_queue = _build_user_job_list(job_ptr->user_id, job_ptr->name); run_now = true; job_iterator = list_iterator_create(job_queue); if (job_iterator == NULL) fatal("list_iterator_create malloc failure"); while ((qjob_ptr = (struct job_record *) list_next(job_iterator))) { /* already running/suspended job or previously * submitted pending job */ if (IS_JOB_RUNNING(qjob_ptr) || IS_JOB_SUSPENDED(qjob_ptr) || (IS_JOB_PENDING(qjob_ptr) && (qjob_ptr->job_id < job_ptr->job_id))) { run_now = false; break; } } list_iterator_destroy(job_iterator); list_destroy(job_queue); /* job can run now, delete dependency */ if (run_now) list_delete_item(depend_iter); else depends = true; } else if ((dep_ptr->job_ptr->magic != JOB_MAGIC) || (dep_ptr->job_ptr->job_id != dep_ptr->job_id)) { /* job is gone, dependency lifted */ clear_dep = true; } else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER) { if (!IS_JOB_PENDING(dep_ptr->job_ptr)) { clear_dep = true; } else depends = true; } else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_ANY) { if (IS_JOB_FINISHED(dep_ptr->job_ptr)) { clear_dep = true; } else depends = true; } else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_NOT_OK) { if (!IS_JOB_FINISHED(dep_ptr->job_ptr)) depends = true; else if (!IS_JOB_COMPLETE(dep_ptr->job_ptr)) { clear_dep = true; } else { failure = true; break; } } else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_OK) { if (!IS_JOB_FINISHED(dep_ptr->job_ptr)) depends = true; else if (IS_JOB_COMPLETE(dep_ptr->job_ptr)) { clear_dep = true; } else { failure = true; break; } } else if (dep_ptr->depend_type == SLURM_DEPEND_EXPAND) { time_t now = time(NULL); expands = true; if (IS_JOB_PENDING(dep_ptr->job_ptr)) { depends = true; } else if (IS_JOB_FINISHED(dep_ptr->job_ptr)) { failure = true; break; } else if ((dep_ptr->job_ptr->end_time != 0) && (dep_ptr->job_ptr->end_time > now)) { job_ptr->time_limit = dep_ptr->job_ptr-> end_time - now; job_ptr->time_limit /= 60; /* sec to min */ } if (job_ptr->details && dep_ptr->job_ptr->details) { job_ptr->details->shared = dep_ptr->job_ptr->details->shared; } } else failure = true; if (clear_dep) { char *rmv_dep = xstrdup_printf( ":%u", dep_ptr->job_ptr->job_id); xstrsubstitute(job_ptr->details->dependency, rmv_dep, ""); xfree(rmv_dep); list_delete_item(depend_iter); } } list_iterator_destroy(depend_iter); if (!depends && !expands && (count == 0)) xfree(job_ptr->details->dependency); if (failure) return 2; if (depends) return 1; return 0; } /* * Parse a job dependency string and use it to establish a "depend_spec" * list of dependencies. We accept both old format (a single job ID) and * new format (e.g. "afterok:123:124,after:128"). * IN job_ptr - job record to have dependency and depend_list updated * IN new_depend - new dependency description * RET returns an error code from slurm_errno.h */ extern int update_job_dependency(struct job_record *job_ptr, char *new_depend) { int rc = SLURM_SUCCESS; uint16_t depend_type = 0; uint32_t job_id = 0; char *tok = new_depend, *sep_ptr, *sep_ptr2; List new_depend_list = NULL; struct depend_spec *dep_ptr; struct job_record *dep_job_ptr; char dep_buf[32]; bool expand_cnt = 0; if (job_ptr->details == NULL) return EINVAL; /* Clear dependencies on NULL, "0", or empty dependency input */ job_ptr->details->expanding_jobid = 0; if ((new_depend == NULL) || (new_depend[0] == '\0') || ((new_depend[0] == '0') && (new_depend[1] == '\0'))) { xfree(job_ptr->details->dependency); if (job_ptr->details->depend_list) { list_destroy(job_ptr->details->depend_list); job_ptr->details->depend_list = NULL; } return rc; } new_depend_list = list_create(_depend_list_del); if (new_depend_list == NULL) fatal("list_create: malloc failure"); /* validate new dependency string */ while (rc == SLURM_SUCCESS) { /* test singleton dependency flag */ if ( strncasecmp(tok, "singleton", 9) == 0 ) { depend_type = SLURM_DEPEND_SINGLETON; dep_ptr = xmalloc(sizeof(struct depend_spec)); dep_ptr->depend_type = depend_type; /* dep_ptr->job_id = 0; set by xmalloc */ /* dep_ptr->job_ptr = NULL; set by xmalloc */ if (!list_append(new_depend_list, dep_ptr)) { fatal("list_append memory allocation " "failure for singleton"); } if ( *(tok + 9 ) == ',' ) { tok += 10; continue; } else break; } sep_ptr = strchr(tok, ':'); if ((sep_ptr == NULL) && (job_id == 0)) { job_id = strtol(tok, &sep_ptr, 10); if ((sep_ptr == NULL) || (sep_ptr[0] != '\0') || (job_id == 0) || (job_id == job_ptr->job_id)) { rc = ESLURM_DEPENDENCY; break; } /* old format, just a single job_id */ dep_job_ptr = find_job_record(job_id); if (!dep_job_ptr) /* assume already done */ break; snprintf(dep_buf, sizeof(dep_buf), "afterany:%u", job_id); new_depend = dep_buf; dep_ptr = xmalloc(sizeof(struct depend_spec)); dep_ptr->depend_type = SLURM_DEPEND_AFTER_ANY; dep_ptr->job_id = job_id; dep_ptr->job_ptr = dep_job_ptr; if (!list_append(new_depend_list, dep_ptr)) fatal("list_append memory allocation failure"); break; } else if (sep_ptr == NULL) { rc = ESLURM_DEPENDENCY; break; } if (strncasecmp(tok, "afternotok", 10) == 0) depend_type = SLURM_DEPEND_AFTER_NOT_OK; else if (strncasecmp(tok, "afterany", 8) == 0) depend_type = SLURM_DEPEND_AFTER_ANY; else if (strncasecmp(tok, "afterok", 7) == 0) depend_type = SLURM_DEPEND_AFTER_OK; else if (strncasecmp(tok, "after", 5) == 0) depend_type = SLURM_DEPEND_AFTER; else if (strncasecmp(tok, "expand", 6) == 0) { if (!select_g_job_expand_allow()) { rc = ESLURM_DEPENDENCY; break; } depend_type = SLURM_DEPEND_EXPAND; } else { rc = ESLURM_DEPENDENCY; break; } sep_ptr++; /* skip over ":" */ while (rc == SLURM_SUCCESS) { job_id = strtol(sep_ptr, &sep_ptr2, 10); if ((sep_ptr2 == NULL) || (job_id == 0) || (job_id == job_ptr->job_id) || ((sep_ptr2[0] != '\0') && (sep_ptr2[0] != ',') && (sep_ptr2[0] != ':'))) { rc = ESLURM_DEPENDENCY; break; } dep_job_ptr = find_job_record(job_id); if ((depend_type == SLURM_DEPEND_EXPAND) && ((expand_cnt++ > 0) || (dep_job_ptr == NULL) || (!IS_JOB_RUNNING(dep_job_ptr)) || (dep_job_ptr->qos_id != job_ptr->qos_id) || (dep_job_ptr->part_ptr == NULL) || (job_ptr->part_ptr == NULL) || (dep_job_ptr->part_ptr != job_ptr->part_ptr))) { /* Expand only jobs in the same QOS and * and partition */ rc = ESLURM_DEPENDENCY; break; } if (depend_type == SLURM_DEPEND_EXPAND) { job_ptr->details->expanding_jobid = job_id; /* GRES configuration of this job must match * the job being expanded */ xfree(job_ptr->gres); job_ptr->gres = xstrdup(dep_job_ptr->gres); if (job_ptr->gres_list) list_destroy(job_ptr->gres_list); gres_plugin_job_state_validate(job_ptr->gres, &job_ptr->gres_list); } if (dep_job_ptr) { /* job still active */ dep_ptr = xmalloc(sizeof(struct depend_spec)); dep_ptr->depend_type = depend_type; dep_ptr->job_id = job_id; dep_ptr->job_ptr = dep_job_ptr; if (!list_append(new_depend_list, dep_ptr)) { fatal("list_append memory allocation " "failure"); } } if (sep_ptr2[0] != ':') break; sep_ptr = sep_ptr2 + 1; /* skip over ":" */ } if (sep_ptr2[0] == ',') tok = sep_ptr2 + 1; else break; } if (rc == SLURM_SUCCESS) { /* test for circular dependencies (e.g. A -> B -> A) */ if (_scan_depend(new_depend_list, job_ptr->job_id)) rc = ESLURM_CIRCULAR_DEPENDENCY; } if (rc == SLURM_SUCCESS) { xfree(job_ptr->details->dependency); job_ptr->details->dependency = xstrdup(new_depend); if (job_ptr->details->depend_list) list_destroy(job_ptr->details->depend_list); job_ptr->details->depend_list = new_depend_list; #if _DEBUG print_job_dependency(job_ptr); #endif } else { list_destroy(new_depend_list); } return rc; } /* Return TRUE if job_id is found in dependency_list. * Execute recursively for each dependent job */ static bool _scan_depend(List dependency_list, uint32_t job_id) { bool rc = false; ListIterator iter; struct depend_spec *dep_ptr; xassert(job_id); xassert(dependency_list); iter = list_iterator_create(dependency_list); if (iter == NULL) fatal("list_iterator_create malloc failure"); while (!rc && (dep_ptr = (struct depend_spec *) list_next(iter))) { if (dep_ptr->job_id == 0) /* Singleton */ continue; if (dep_ptr->job_id == job_id) rc = true; else if (dep_ptr->job_ptr->details && dep_ptr->job_ptr->details->depend_list) { rc = _scan_depend(dep_ptr->job_ptr->details-> depend_list, job_id); if (rc) { info("circular dependency: job %u is dependent " "upon job %u", dep_ptr->job_id, job_id); } } } list_iterator_destroy(iter); return rc; } static void _pre_list_del(void *x) { xfree(x); } /* If there are higher priority queued jobs in this job's partition, then * delay the job's expected initiation time as needed to run those jobs. * NOTE: This is only a rough estimate of the job's start time as it ignores * job dependencies, feature requirements, specific node requirements, etc. */ static void _delayed_job_start_time(struct job_record *job_ptr) { uint32_t part_node_cnt, part_cpu_cnt, part_cpus_per_node; uint32_t job_size_cpus, job_size_nodes, job_time; uint64_t cume_space_time = 0; struct job_record *job_q_ptr; ListIterator job_iterator; if (job_ptr->part_ptr == NULL) return; part_node_cnt = job_ptr->part_ptr->total_nodes; part_cpu_cnt = job_ptr->part_ptr->total_cpus; if (part_node_cnt > part_cpu_cnt) part_cpus_per_node = part_node_cnt / part_cpu_cnt; else part_cpus_per_node = 1; job_iterator = list_iterator_create(job_list); if (job_iterator == NULL) fatal("list_iterator_create memory allocation failure"); while ((job_q_ptr = (struct job_record *) list_next(job_iterator))) { if (!IS_JOB_PENDING(job_q_ptr) || !job_q_ptr->details || (job_q_ptr->part_ptr != job_ptr->part_ptr) || (job_q_ptr->priority < job_ptr->priority)) continue; if (job_q_ptr->details->min_nodes == NO_VAL) job_size_nodes = 1; else job_size_nodes = job_q_ptr->details->min_nodes; if (job_q_ptr->details->min_cpus == NO_VAL) job_size_cpus = 1; else job_size_cpus = job_q_ptr->details->min_nodes; job_size_cpus = MAX(job_size_cpus, (job_size_nodes * part_cpus_per_node)); if (job_ptr->time_limit == NO_VAL) job_time = job_q_ptr->part_ptr->max_time; else job_time = job_q_ptr->time_limit; cume_space_time += job_size_cpus * job_time; } list_iterator_destroy(job_iterator); cume_space_time /= part_cpu_cnt;/* Factor out size */ cume_space_time *= 60; /* Minutes to seconds */ debug2("Increasing estimated start of job %u by %"PRIu64" secs", job_ptr->job_id, cume_space_time); job_ptr->start_time += cume_space_time; } /* Determine if a pending job will run using only the specified nodes * (in job_desc_msg->req_nodes), build response message and return * SLURM_SUCCESS on success. Otherwise return an error code. Caller * must free response message */ extern int job_start_data(job_desc_msg_t *job_desc_msg, will_run_response_msg_t **resp) { struct job_record *job_ptr; struct part_record *part_ptr; bitstr_t *avail_bitmap = NULL, *resv_bitmap = NULL; uint32_t min_nodes, max_nodes, req_nodes; int i, rc = SLURM_SUCCESS; time_t now = time(NULL), start_res, orig_start_time = (time_t) 0; List preemptee_candidates = NULL, preemptee_job_list = NULL; job_ptr = find_job_record(job_desc_msg->job_id); if (job_ptr == NULL) return ESLURM_INVALID_JOB_ID; part_ptr = job_ptr->part_ptr; if (part_ptr == NULL) return ESLURM_INVALID_PARTITION_NAME; if ((job_ptr->details == NULL) || (!IS_JOB_PENDING(job_ptr))) return ESLURM_DISABLED; if ((job_desc_msg->req_nodes == NULL) || (job_desc_msg->req_nodes == '\0')) { /* assume all nodes available to job for testing */ avail_bitmap = bit_alloc(node_record_count); bit_nset(avail_bitmap, 0, (node_record_count - 1)); } else if (node_name2bitmap(job_desc_msg->req_nodes, false, &avail_bitmap) != 0) { return ESLURM_INVALID_NODE_NAME; } /* Consider only nodes in this job's partition */ if (part_ptr->node_bitmap) bit_and(avail_bitmap, part_ptr->node_bitmap); else rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE; if (job_req_node_filter(job_ptr, avail_bitmap)) rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE; if (job_ptr->details->exc_node_bitmap) { bitstr_t *exc_node_mask = NULL; exc_node_mask = bit_copy(job_ptr->details->exc_node_bitmap); if (exc_node_mask == NULL) fatal("bit_copy malloc failure"); bit_not(exc_node_mask); bit_and(avail_bitmap, exc_node_mask); FREE_NULL_BITMAP(exc_node_mask); } if (job_ptr->details->req_node_bitmap) { if (!bit_super_set(job_ptr->details->req_node_bitmap, avail_bitmap)) { rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE; } } /* Enforce reservation: access control, time and nodes */ if (job_ptr->details->begin_time) start_res = job_ptr->details->begin_time; else start_res = now; i = job_test_resv(job_ptr, &start_res, false, &resv_bitmap); if (i != SLURM_SUCCESS) return i; bit_and(avail_bitmap, resv_bitmap); FREE_NULL_BITMAP(resv_bitmap); /* Only consider nodes that are not DOWN or DRAINED */ bit_and(avail_bitmap, avail_node_bitmap); if (rc == SLURM_SUCCESS) { min_nodes = MAX(job_ptr->details->min_nodes, part_ptr->min_nodes); if (job_ptr->details->max_nodes == 0) max_nodes = part_ptr->max_nodes; else max_nodes = MIN(job_ptr->details->max_nodes, part_ptr->max_nodes); max_nodes = MIN(max_nodes, 500000); /* prevent overflows */ if (!job_ptr->limit_set_max_nodes && job_ptr->details->max_nodes) req_nodes = max_nodes; else req_nodes = min_nodes; preemptee_candidates = slurm_find_preemptable_jobs(job_ptr); /* The orig_start is based upon the backfill scheduler data * and considers all higher priority jobs. The logic below * only considers currently running jobs, so the expected * start time will almost certainly be earlier and not as * accurate, but this algorithm is much faster. */ orig_start_time = job_ptr->start_time; rc = select_g_job_test(job_ptr, avail_bitmap, min_nodes, max_nodes, req_nodes, SELECT_MODE_WILL_RUN, preemptee_candidates, &preemptee_job_list); } if (rc == SLURM_SUCCESS) { will_run_response_msg_t *resp_data; resp_data = xmalloc(sizeof(will_run_response_msg_t)); resp_data->job_id = job_ptr->job_id; #ifdef HAVE_BG select_g_select_jobinfo_get(job_ptr->select_jobinfo, SELECT_JOBDATA_NODE_CNT, &resp_data->proc_cnt); #else resp_data->proc_cnt = job_ptr->total_cpus; #endif _delayed_job_start_time(job_ptr); resp_data->start_time = MAX(job_ptr->start_time, orig_start_time); resp_data->start_time = MAX(resp_data->start_time, start_res); job_ptr->start_time = 0; /* restore pending job start time */ resp_data->node_list = bitmap2node_name(avail_bitmap); if (preemptee_job_list) { ListIterator preemptee_iterator; uint32_t *preemptee_jid; struct job_record *tmp_job_ptr; resp_data->preemptee_job_id=list_create(_pre_list_del); if (resp_data->preemptee_job_id == NULL) fatal("list_create: malloc failure"); preemptee_iterator = list_iterator_create( preemptee_job_list); while ((tmp_job_ptr = (struct job_record *) list_next(preemptee_iterator))) { preemptee_jid = xmalloc(sizeof(uint32_t)); (*preemptee_jid) = tmp_job_ptr->job_id; list_append(resp_data->preemptee_job_id, preemptee_jid); } list_iterator_destroy(preemptee_iterator); } *resp = resp_data; } else { rc = ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE; } if (preemptee_candidates) list_destroy(preemptee_candidates); if (preemptee_job_list) list_destroy(preemptee_job_list); FREE_NULL_BITMAP(avail_bitmap); return rc; } /* * epilog_slurmctld - execute the epilog_slurmctld for a job that has just * terminated. * IN job_ptr - pointer to job that has been terminated * RET SLURM_SUCCESS(0) or error code */ extern int epilog_slurmctld(struct job_record *job_ptr) { int rc; pthread_t thread_id_epilog; pthread_attr_t thread_attr_epilog; if ((slurmctld_conf.epilog_slurmctld == NULL) || (slurmctld_conf.epilog_slurmctld[0] == '\0')) return SLURM_SUCCESS; if (access(slurmctld_conf.epilog_slurmctld, X_OK) < 0) { error("Invalid EpilogSlurmctld: %m"); return errno; } slurm_attr_init(&thread_attr_epilog); pthread_attr_setdetachstate(&thread_attr_epilog, PTHREAD_CREATE_DETACHED); while (1) { rc = pthread_create(&thread_id_epilog, &thread_attr_epilog, _run_epilog, (void *) job_ptr); if (rc == 0) { slurm_attr_destroy(&thread_attr_epilog); return SLURM_SUCCESS; } if (errno == EAGAIN) continue; error("pthread_create: %m"); slurm_attr_destroy(&thread_attr_epilog); return errno; } } static char **_build_env(struct job_record *job_ptr) { char **my_env, *name; my_env = xmalloc(sizeof(char *)); my_env[0] = NULL; /* Set SPANK env vars first so that we can overrite as needed * below. Prevent user hacking from setting SLURM_JOB_ID etc. */ if (job_ptr->spank_job_env_size) { env_array_merge(&my_env, (const char **) job_ptr->spank_job_env); } #ifdef HAVE_BG select_g_select_jobinfo_get(job_ptr->select_jobinfo, SELECT_JOBDATA_BLOCK_ID, &name); setenvf(&my_env, "MPIRUN_PARTITION", "%s", name); # ifdef HAVE_BGP { uint16_t conn_type = (uint16_t)NO_VAL; select_g_select_jobinfo_get(job_ptr->select_jobinfo, SELECT_JOBDATA_CONN_TYPE, &conn_type); if (conn_type > SELECT_SMALL) { /* SUBMIT_POOL over rides HTC_SUBMIT_POOL */ setenvf(&my_env, "SUBMIT_POOL", "%s", name); } } # endif xfree(name); #elif defined HAVE_CRAY name = select_g_select_jobinfo_xstrdup(job_ptr->select_jobinfo, SELECT_PRINT_RESV_ID); setenvf(&my_env, "BASIL_RESERVATION_ID", "%s", name); xfree(name); #endif setenvf(&my_env, "SLURM_JOB_ACCOUNT", "%s", job_ptr->account); if (job_ptr->details) { setenvf(&my_env, "SLURM_JOB_CONSTRAINTS", "%s", job_ptr->details->features); } setenvf(&my_env, "SLURM_JOB_DERIVED_EC", "%u", job_ptr->derived_ec); setenvf(&my_env, "SLURM_JOB_EXIT_CODE", "%u", job_ptr->exit_code); setenvf(&my_env, "SLURM_JOB_GID", "%u", job_ptr->group_id); name = gid_to_string((uid_t) job_ptr->group_id); setenvf(&my_env, "SLURM_JOB_GROUP", "%s", name); xfree(name); setenvf(&my_env, "SLURM_JOBID", "%u", job_ptr->job_id); setenvf(&my_env, "SLURM_JOB_ID", "%u", job_ptr->job_id); setenvf(&my_env, "SLURM_JOB_NAME", "%s", job_ptr->name); setenvf(&my_env, "SLURM_JOB_NODELIST", "%s", job_ptr->nodes); setenvf(&my_env, "SLURM_JOB_PARTITION", "%s", job_ptr->partition); setenvf(&my_env, "SLURM_JOB_UID", "%u", job_ptr->user_id); name = uid_to_string((uid_t) job_ptr->user_id); setenvf(&my_env, "SLURM_JOB_USER", "%s", name); xfree(name); return my_env; } static void *_run_epilog(void *arg) { struct job_record *job_ptr = (struct job_record *) arg; uint32_t job_id; pid_t cpid; int i, status, wait_rc; char *argv[2], **my_env; /* Locks: Read config, job */ slurmctld_lock_t config_read_lock = { READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK }; lock_slurmctld(config_read_lock); argv[0] = xstrdup(slurmctld_conf.epilog_slurmctld); argv[1] = NULL; my_env = _build_env(job_ptr); job_id = job_ptr->job_id; unlock_slurmctld(config_read_lock); if ((cpid = fork()) < 0) { error("epilog_slurmctld fork error: %m"); goto fini; } if (cpid == 0) { #ifdef SETPGRP_TWO_ARGS setpgrp(0, 0); #else setpgrp(); #endif execve(argv[0], argv, my_env); exit(127); } while (1) { wait_rc = waitpid(cpid, &status, 0); if (wait_rc < 0) { if (errno == EINTR) continue; error("epilog_slurmctld waitpid error: %m"); break; } else if (wait_rc > 0) { killpg(cpid, SIGKILL); /* kill children too */ break; } } if (status != 0) { error("epilog_slurmctld job %u epilog exit status %u:%u", job_id, WEXITSTATUS(status), WTERMSIG(status)); } else debug2("epilog_slurmctld job %u epilog completed", job_id); fini: xfree(argv[0]); for (i=0; my_env[i]; i++) xfree(my_env[i]); xfree(my_env); return NULL; } /* * prolog_slurmctld - execute the prolog_slurmctld for a job that has just * been allocated resources. * IN job_ptr - pointer to job that will be initiated * RET SLURM_SUCCESS(0) or error code */ extern int prolog_slurmctld(struct job_record *job_ptr) { int rc; pthread_t thread_id_prolog; pthread_attr_t thread_attr_prolog; if ((slurmctld_conf.prolog_slurmctld == NULL) || (slurmctld_conf.prolog_slurmctld[0] == '\0')) return SLURM_SUCCESS; if (access(slurmctld_conf.prolog_slurmctld, X_OK) < 0) { error("Invalid PrologSlurmctld: %m"); return errno; } if (job_ptr->details) job_ptr->details->prolog_running = 1; slurm_attr_init(&thread_attr_prolog); pthread_attr_setdetachstate(&thread_attr_prolog, PTHREAD_CREATE_DETACHED); while (1) { rc = pthread_create(&thread_id_prolog, &thread_attr_prolog, _run_prolog, (void *) job_ptr); if (rc == 0) { slurm_attr_destroy(&thread_attr_prolog); return SLURM_SUCCESS; } if (errno == EAGAIN) continue; error("pthread_create: %m"); slurm_attr_destroy(&thread_attr_prolog); return errno; } } static void *_run_prolog(void *arg) { struct job_record *job_ptr = (struct job_record *) arg; uint32_t job_id; pid_t cpid; int i, rc, status, wait_rc; char *argv[2], **my_env; /* Locks: Read config, job; Write nodes */ slurmctld_lock_t config_read_lock = { READ_LOCK, READ_LOCK, WRITE_LOCK, NO_LOCK }; bitstr_t *node_bitmap = NULL; static int last_job_requeue = 0; lock_slurmctld(config_read_lock); argv[0] = xstrdup(slurmctld_conf.prolog_slurmctld); argv[1] = NULL; my_env = _build_env(job_ptr); job_id = job_ptr->job_id; if (job_ptr->node_bitmap) { node_bitmap = bit_copy(job_ptr->node_bitmap); for (i=0; i<node_record_count; i++) { if (bit_test(node_bitmap, i) == 0) continue; node_record_table_ptr[i].node_state |= NODE_STATE_POWER_UP; } } unlock_slurmctld(config_read_lock); if ((cpid = fork()) < 0) { error("prolog_slurmctld fork error: %m"); goto fini; } if (cpid == 0) { #ifdef SETPGRP_TWO_ARGS setpgrp(0, 0); #else setpgrp(); #endif execve(argv[0], argv, my_env); exit(127); } while (1) { wait_rc = waitpid(cpid, &status, 0); if (wait_rc < 0) { if (errno == EINTR) continue; error("prolog_slurmctld waitpid error: %m"); break; } else if (wait_rc > 0) { killpg(cpid, SIGKILL); /* kill children too */ break; } } if (status != 0) { bool kill_job = false; slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; error("prolog_slurmctld job %u prolog exit status %u:%u", job_id, WEXITSTATUS(status), WTERMSIG(status)); lock_slurmctld(job_write_lock); if (last_job_requeue == job_id) { info("prolog_slurmctld failed again for job %u", job_id); kill_job = true; } else if ((rc = job_requeue(0, job_id, -1, (uint16_t)NO_VAL, false))) { info("unable to requeue job %u: %m", job_id); kill_job = true; } else last_job_requeue = job_id; if (kill_job) { srun_user_message(job_ptr, "PrologSlurmctld failed, job killed"); (void) job_signal(job_id, SIGKILL, 0, 0, false); } unlock_slurmctld(job_write_lock); } else debug2("prolog_slurmctld job %u prolog completed", job_id); fini: xfree(argv[0]); for (i=0; my_env[i]; i++) xfree(my_env[i]); xfree(my_env); lock_slurmctld(config_read_lock); if (job_ptr->job_id != job_id) { error("prolog_slurmctld job %u pointer invalid", job_id); job_ptr = find_job_record(job_id); if (job_ptr == NULL) error("prolog_slurmctld job %u now defunct", job_id); } if (job_ptr) { if (job_ptr->details) job_ptr->details->prolog_running = 0; if (job_ptr->batch_flag && (IS_JOB_RUNNING(job_ptr) || IS_JOB_SUSPENDED(job_ptr))) launch_job(job_ptr); } if (job_ptr && job_ptr->node_bitmap) { for (i=0; i<node_record_count; i++) { if (bit_test(job_ptr->node_bitmap, i) == 0) continue; node_record_table_ptr[i].node_state &= (~NODE_STATE_POWER_UP); } } else if (node_bitmap) { for (i=0; i<node_record_count; i++) { if (bit_test(node_bitmap, i) == 0) continue; node_record_table_ptr[i].node_state &= (~NODE_STATE_POWER_UP); } } unlock_slurmctld(config_read_lock); FREE_NULL_BITMAP(node_bitmap); return NULL; } /* * build_feature_list - Translate a job's feature string into a feature_list * IN details->features * OUT details->feature_list * RET error code */ extern int build_feature_list(struct job_record *job_ptr) { struct job_details *detail_ptr = job_ptr->details; char *tmp_requested, *str_ptr, *feature = NULL; int bracket = 0, count = 0, i; bool have_count = false, have_or = false; struct feature_record *feat; if (detail_ptr->features == NULL) /* no constraints */ return SLURM_SUCCESS; if (detail_ptr->feature_list) /* already processed */ return SLURM_SUCCESS; tmp_requested = xstrdup(detail_ptr->features); detail_ptr->feature_list = list_create(_feature_list_delete); for (i=0; ; i++) { if (tmp_requested[i] == '*') { tmp_requested[i] = '\0'; have_count = true; count = strtol(&tmp_requested[i+1], &str_ptr, 10); if ((feature == NULL) || (count <= 0)) { info("Job %u invalid constraint %s", job_ptr->job_id, detail_ptr->features); xfree(tmp_requested); return ESLURM_INVALID_FEATURE; } i = str_ptr - tmp_requested - 1; } else if (tmp_requested[i] == '&') { tmp_requested[i] = '\0'; if ((feature == NULL) || (bracket != 0)) { info("Job %u invalid constraint %s", job_ptr->job_id, detail_ptr->features); xfree(tmp_requested); return ESLURM_INVALID_FEATURE; } feat = xmalloc(sizeof(struct feature_record)); feat->name = xstrdup(feature); feat->count = count; feat->op_code = FEATURE_OP_AND; list_append(detail_ptr->feature_list, feat); feature = NULL; count = 0; } else if (tmp_requested[i] == '|') { tmp_requested[i] = '\0'; have_or = true; if (feature == NULL) { info("Job %u invalid constraint %s", job_ptr->job_id, detail_ptr->features); xfree(tmp_requested); return ESLURM_INVALID_FEATURE; } feat = xmalloc(sizeof(struct feature_record)); feat->name = xstrdup(feature); feat->count = count; if (bracket) feat->op_code = FEATURE_OP_XOR; else feat->op_code = FEATURE_OP_OR; list_append(detail_ptr->feature_list, feat); feature = NULL; count = 0; } else if (tmp_requested[i] == '[') { tmp_requested[i] = '\0'; if ((feature != NULL) || bracket) { info("Job %u invalid constraint %s", job_ptr->job_id, detail_ptr->features); xfree(tmp_requested); return ESLURM_INVALID_FEATURE; } bracket++; } else if (tmp_requested[i] == ']') { tmp_requested[i] = '\0'; if ((feature == NULL) || (bracket == 0)) { info("Job %u invalid constraint %s", job_ptr->job_id, detail_ptr->features); xfree(tmp_requested); return ESLURM_INVALID_FEATURE; } bracket = 0; } else if (tmp_requested[i] == '\0') { if (feature) { feat = xmalloc(sizeof(struct feature_record)); feat->name = xstrdup(feature); feat->count = count; feat->op_code = FEATURE_OP_END; list_append(detail_ptr->feature_list, feat); } break; } else if (tmp_requested[i] == ',') { info("Job %u invalid constraint %s", job_ptr->job_id, detail_ptr->features); xfree(tmp_requested); return ESLURM_INVALID_FEATURE; } else if (feature == NULL) { feature = &tmp_requested[i]; } } xfree(tmp_requested); if (have_count && have_or) { info("Job %u invalid constraint (OR with feature count): %s", job_ptr->job_id, detail_ptr->features); return ESLURM_INVALID_FEATURE; } return _valid_feature_list(job_ptr->job_id, detail_ptr->feature_list); } static void _feature_list_delete(void *x) { struct feature_record *feature = (struct feature_record *)x; xfree(feature->name); xfree(feature); } static int _valid_feature_list(uint32_t job_id, List feature_list) { ListIterator feat_iter; struct feature_record *feat_ptr; char *buf = NULL, tmp[16]; int bracket = 0; int rc = SLURM_SUCCESS; if (feature_list == NULL) { debug2("Job %u feature list is empty", job_id); return rc; } feat_iter = list_iterator_create(feature_list); while ((feat_ptr = (struct feature_record *)list_next(feat_iter))) { if (feat_ptr->op_code == FEATURE_OP_XOR) { if (bracket == 0) xstrcat(buf, "["); bracket = 1; } xstrcat(buf, feat_ptr->name); if (rc == SLURM_SUCCESS) rc = _valid_node_feature(feat_ptr->name); if (feat_ptr->count) { snprintf(tmp, sizeof(tmp), "*%u", feat_ptr->count); xstrcat(buf, tmp); } if (bracket && (feat_ptr->op_code != FEATURE_OP_XOR)) { xstrcat(buf, "]"); bracket = 0; } if (feat_ptr->op_code == FEATURE_OP_AND) xstrcat(buf, "&"); else if ((feat_ptr->op_code == FEATURE_OP_OR) || (feat_ptr->op_code == FEATURE_OP_XOR)) xstrcat(buf, "|"); } list_iterator_destroy(feat_iter); if (rc == SLURM_SUCCESS) debug("Job %u feature list: %s", job_id, buf); else info("Job %u has invalid feature list: %s", job_id, buf); xfree(buf); return rc; } static int _valid_node_feature(char *feature) { int rc = ESLURM_INVALID_FEATURE; struct features_record *feature_ptr; ListIterator feature_iter; /* Clear these nodes from the feature_list record, * then restore as needed */ feature_iter = list_iterator_create(feature_list); if (feature_iter == NULL) fatal("list_inerator_create malloc failure"); while ((feature_ptr = (struct features_record *) list_next(feature_iter))) { if (strcmp(feature_ptr->name, feature)) continue; rc = SLURM_SUCCESS; break; } list_iterator_destroy(feature_iter); return rc; } /* If a job can run in multiple partitions, make sure that the one * actually used is first in the string. Needed for job state save/restore */ extern void rebuild_job_part_list(struct job_record *job_ptr) { ListIterator part_iterator; struct part_record *part_ptr; if ((job_ptr->part_ptr_list == NULL) || (job_ptr->part_ptr == NULL)) return; xfree(job_ptr->partition); job_ptr->partition = xstrdup(job_ptr->part_ptr->name); part_iterator = list_iterator_create(job_ptr->part_ptr_list); if (part_iterator == NULL) fatal("list_iterator_create malloc failure"); while ((part_ptr = (struct part_record *) list_next(part_iterator))) { if (part_ptr == job_ptr->part_ptr) continue; xstrcat(job_ptr->partition, ","); xstrcat(job_ptr->partition, part_ptr->name); } list_iterator_destroy(part_iterator); }