Processing huge data robustly with SLURM on clusters
In scientific computing you often need to either apply a single algorithm or pipeline to a huge dataset, or execute a complex parameter sweep over said data. A problem like this is deceptively simple, but has stumbling blocks that can cost you an enormous amount of wasted time. In imperative style it would look like
for part ∈ data
    out = f(part)
    save(file, out)
end
or with a parameter sweep
for part ∈ data
    for x ∈ 1:100, y ∈ 2:200000
        out = f(part; p1=x, p2=y)
        save(file, out)
    end
end
The equivalent in functional style
s = x -> save(file, x)
ap = map(s, map(f, data))
# or with a sweep
sweep = Iterators.product(data, 1:100, 2:200000)
map(res -> save(file, res), map(((d, x, y),) -> f(d; p1=x, p2=y), sweep))
So far so good, but this assumes nothing will go wrong. If you knew the outcome, you would not need to run the code in the first place, so you can’t know that it won’t crash, run out of memory, or suddenly run out of disk space. If you write it in this style, you need to make each step 100% foolproof, catch all errors, and handle them in a sane way, probably with some logging thrown in as well. I’m not going to add the code here, but suffice it to say it’ll explode to ~20 lines or so, and even then you can’t protect against out-of-memory errors: Julia allocates dynamically, so you would need a direct interface to a custom allocator to even have a chance of detecting and responding to that kind of error. On a cluster, resources are scarce. If you ask for more, you wait (exponentially) longer, so the risk of underestimating resources is very real.
Batch jobs to the rescue
If the above were a Julia script, the submit script would look like
#!/bin/bash
#SBATCH --mem=120G
#SBATCH --cpus-per-task=6
#SBATCH --time=18:00:00
module load julia
set -euo pipefail
export JULIA_NUM_THREADS=$SLURM_CPUS_PER_TASK
julia --project=/my/project myscript.jl
Because our script is embarrassingly parallel, with no dependencies between tasks, you can add
using Base.Threads
@threads for ...
to make it run in parallel. But note that the memory I specify is for the entire script: if you have 6 threads each using at most 100MB, you need 600MB + 20% margin = 720MB, which I'd round up to 800MB. The margin accounts for allocator/GC overhead, but threaded code by itself can increase allocations as well. However, we’re using SLURM, a scheduler used in most HPC clusters, which is honed to optimally schedule a huge number of jobs at the same time, and yet we hand it one big job. On top of that, we still risk a corrupt/halfway state if only 1 task fails. Instead, we use SLURM’s array support
#!/bin/bash
#SBATCH --mem=5G
#SBATCH --cpus-per-task=1
#SBATCH --time=00:30:00
#SBATCH --array=1-1000%10
echo "Starting task $SLURM_ARRAY_TASK_ID"
IDIR=$(sed -n "${SLURM_ARRAY_TASK_ID}p" inlist.txt)
ODIR=$(sed -n "${SLURM_ARRAY_TASK_ID}p" outlist.txt)
module load julia
set -euo pipefail
export JULIA_NUM_THREADS=$SLURM_CPUS_PER_TASK
julia --project=/my/project myscript.jl --infile="$IDIR" --outfile="$ODIR" --task="$SLURM_ARRAY_TASK_ID"
First, we tell SLURM this is an array of tasks, not 1 task. We give it the number of tasks, as well as how many should be run at the same time (10).
#SBATCH --array=1-1000%10
Next we need a way to tell our adapted script (which I’m leaving out here) what file to read and save, and where in the sweep we are, with the SLURM_ARRAY_TASK_ID variable. inlist.txt and outlist.txt are simple text files with 1 file path per line. At runtime, sed picks the line whose number matches the task ID, and that line is passed to the script. Simple, and elegant. But you may have missed the most important part: the requirements in time and memory dropped by more than an order of magnitude (120G to 5G, 18 hours to 30 minutes). When scheduling on a busy system with heterogeneous tasks, it is (almost) always easier and faster to run 1000 x 1h tasks than 1 x 1000h task. It’s better both for your time-to-result and for the cluster overall.
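The two lists do not have to be written by hand. What follows is a minimal sketch, assuming the inputs are .dat files in one directory and each output takes the same base name with a .out extension in another directory. The helper names make_lists and nth_line, the extensions, and the directory layout are all hypothetical and not part of the original setup.

```shell
#!/bin/bash
# Sketch: build inlist.txt / outlist.txt for an array job.
# make_lists, nth_line and the .dat/.out naming are illustrative assumptions.
make_lists () {
    local indir=$1 outdir=$2
    : > inlist.txt
    : > outlist.txt
    # Glob expansion is sorted, so line N of both lists refers to the same file
    for f in "$indir"/*.dat; do
        [ -e "$f" ] || continue    # no matches: skip the unexpanded pattern
        echo "$f" >> inlist.txt
        echo "$outdir/$(basename "$f" .dat).out" >> outlist.txt
    done
}

# What each array task does with its ID: print line N of a list
nth_line () { sed -n "${1}p" "$2"; }
```

After running make_lists data results, the array range can follow the list length instead of being hard-coded, for example with sbatch --array=1-$(wc -l < inlist.txt)%10 array_job.sh, since options given on the sbatch command line take precedence over the #SBATCH lines in the script.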
Dealing with failure
SLURM will execute all tasks and, unless you tell it otherwise, will continue even if some fail. One of the most painful things in cluster computing is having to requeue a huge job because 1/1000 tasks failed on some edge case you could not foresee. With arrays, that’s irrelevant: we only need to reschedule the failed tasks.
So let’s find out what, if anything, failed. First, we write a small shell function, because this is a command we’ll need multiple times.
findfailed () { sacct -X -j $1 -o JobID,State,ExitCode,DerivedExitCode | grep -v COMPLETED ; }
In short, for a specific array job ID ($1 = 1st arg), print the lines that don’t contain “COMPLETED”. Note that the two header lines pass through as well. It can be written more precisely and more elegantly, but for now it works.
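One way to make it more precise is to compare the State column itself rather than grepping the whole line. The sketch below assumes sacct's pipe-separated output (-P, which also avoids truncated states such as OUT_OF_ME+) without the header (-n). The names filter_failed and findfailed_precise are hypothetical.

```shell
#!/bin/bash
# Sketch: filter on the State column instead of grepping the whole line.
# Reads "JobID|State|ExitCode" lines, as printed by sacct with -n -P.
filter_failed () {
    awk -F'|' '$2 != "COMPLETED" { print $1, $2, $3 }'
}

# Assumed wiring to sacct: -X one line per task, -n no header,
# -P pipe-separated fields without truncation.
findfailed_precise () {
    sacct -X -n -P -j "$1" -o JobID,State,ExitCode | filter_failed
}
```

Tasks that are still RUNNING or PENDING also pass this filter, so it is best used once the whole array has finished.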
Example output of a 1000-task job taking a week of compute time
findfailed 26951251
JobID State ExitCode DerivedExitCode
------------ ---------- -------- ---------------
26951251_4 OUT_OF_ME+ 0:125 0:0
26951251_5 OUT_OF_ME+ 0:125 0:0
26951251_56 FAILED 1:0 0:0
26951251_63 FAILED 1:0 0:0
Now we have 4 failed jobs, meaning we need to reschedule (assuming we fixed the cause) those 4, with more resources, and have inlist.txt and outlist.txt only contain those 4 lines. A crude shell script can do this fairly well:
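#!/bin/bash
# Usage: ./findrerun.sh <array job ID> <input list> <output list>
# bash rather than sh: "set -o pipefail" is not available in every sh
set -euo pipefail
ID=$1
IN=$2
OUT=$3
findfailed () { sacct -X -j "$1" -o JobID,State,ExitCode,DerivedExitCode | grep -v COMPLETED ; }
findfailed "$ID" > failed.txt
# Line count minus the two sacct header lines
FN=$(wc -l < failed.txt)
N=$((FN - 2))
echo "Found $N failed array job ids, recreating sub lists for reprocessing"
# Keep only the array index after the underscore, e.g. 26951251_56 -> 56
tail -n "$N" failed.txt | awk '{print $1}' | cut -d_ -f 2 > lines.txt
echo "Writing new lists"
# Turn each index into an awk pattern (NR==56) that prints that line of the list
sed 's/^/NR==/' lines.txt | awk -f - "$IN" > inlist_rerun.txt
sed 's/^/NR==/' lines.txt | awk -f - "$OUT" > outlist_rerun.txt
echo "Done"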
The more attentive among you will note the many ways this can fail, for example if no jobs failed, but then again it’s a tool for failed jobs. A quick walkthrough: first we use the findfailed function, then we parse its output to get the array IDs of the failed jobs. Next we select only those lines of the in/out files, and create new ones.
./findrerun.sh $ID inlist.txt outlist.txt
Then update the number of tasks (here --array=1-4), point the array script at inlist_rerun.txt and outlist_rerun.txt, and resubmit
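The "no jobs failed" case can be guarded explicitly. This is a sketch of the line-selection step only, and it swaps the generated awk program for the common two-file awk idiom. select_lines is a hypothetical helper, not part of the original script.

```shell
#!/bin/bash
# Sketch: print the lines of a list whose numbers appear in an index file.
# select_lines is a hypothetical helper, not part of the script above.
select_lines () {
    local indices=$1 list=$2
    if [ ! -s "$indices" ]; then
        # Empty or missing index file: report and produce no output
        echo "No failed tasks, nothing to rerun" >&2
        return 0
    fi
    # First file: remember the wanted line numbers.
    # Second file: print the lines whose number was remembered.
    awk 'NR==FNR { want[$1]; next } (FNR in want)' "$indices" "$list"
}
```

It would be used as select_lines lines.txt inlist.txt > inlist_rerun.txt, and likewise for the output list.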
sbatch array_job.sh
From the time you get the email from SLURM notifying you that not all jobs completed, to rescheduling only the ones you need, takes no more than 30 seconds.
There is a lot more you can do with array jobs: there’s support for conditional execution, for example, which is nice to have in more advanced cases, your IDs do not need to be sequential, you can reference the max/min/total tasks, and so on.
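Because array IDs do not need to be sequential, the failed tasks can also be resubmitted by index against the original, unmodified lists. A minimal sketch, assuming a file with one failed index per line (such as the lines.txt produced earlier). array_spec is a hypothetical helper.

```shell
#!/bin/bash
# Sketch: turn a file of indices (one per line) into a comma-separated
# --array specification. array_spec is a hypothetical helper.
array_spec () {
    paste -sd, "$1"
}

# Intended use, not executed here:
#   sbatch --array="$(array_spec lines.txt)" array_job.sh
```

For the example failures above this would request tasks 4, 5, 56 and 63 only. Inside a task, the min, max and total are available as SLURM_ARRAY_TASK_MIN, SLURM_ARRAY_TASK_MAX and SLURM_ARRAY_TASK_COUNT.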
Credit
The support staff at Compute Canada, who taught me 1/2 of what I shared here, mostly the half I got wrong. Snippets of the above script come from Stack Overflow, but I couldn’t find the URL; credit for those snippets is due to the original authors.