I'm currently writing a pipeline that looks like this (code for the minimal example is below; the input files are just blank files whose names are listed in SAMPLES in the example).

What I would like is this: if a sample fails in one of the first two steps (the minimal example makes sample1 fail on rule two), keep going with all the following steps as if that sample were not there (meaning gather_and_do_something and split_final would only run on sample2 and sample3 here).
I'm already using the --keep-going option to carry on with independent jobs, but I have trouble defining the input for the common rule and making it ignore the files that were on a failing path.
SAMPLES = ["sample1", "sample2", "sample3"]

rule all:
    input:
        expand("{sample}_final", sample=SAMPLES)

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

rule two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        # sample1 fails here on purpose: no output file is written for it,
        # so Snakemake reports the job as failed (missing output)
        if input[0] != 'sample1_ruleOne':
            with open(output[0], 'w') as fh:
                fh.write(f'written {output[0]}')

rule gather_and_do_something:
    input:
        expand(rules.two.output, sample=SAMPLES)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'
I tried writing a custom function to use as an input, but that does not seem to work:
def get_files(wildcards):
    import os
    return [f for f in expand(rules.two.output, sample=SAMPLES) if f in os.listdir(os.getcwd())]
rule gather_and_do_something:
    input:
        unpack(get_files)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))
By default, Snakemake's DAG is static, so it will expect all of the input files requested by the all rule (and their dependencies) to be present. (This is also why your custom input function doesn't work: input functions are evaluated once, when the DAG is built, before rule two has produced anything.) You can get around this requirement by defining checkpoints and corresponding input functions, which can alter the DAG dynamically based on the output of the checkpoint rule(s).
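The key mechanism is calling checkpoints.<name>.get(**wildcards) inside an input function: the call suspends evaluation of that function until the corresponding checkpoint job has finished, after which the function is re-evaluated and can inspect the files the checkpoint actually produced. A minimal sketch of the pattern (checkpoint, command, and function names here are illustrative only, not part of your pipeline):

checkpoint filter_step:
    output:
        "{sample}_filtered"
    shell:
        "some_filter_command {wildcards.sample} > {output}"  # placeholder; may yield an empty file

def surviving(wildcards):
    # only runs to completion once every filter_step job has finished
    files = []
    for sample in SAMPLES:
        fn = checkpoints.filter_step.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:  # keep samples with non-empty output
            files.append(fn)
    return files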
Here's an updated example that turns rule two into a checkpoint and uses input functions in the aggregation steps (gather_and_do_something and all) to skip samples that fail rule two. The updated example treats an empty output file as a failure (os.stat(fn).st_size > 0), but you could use a different test instead (see the note after the code).
import shlex
import os

SAMPLES = ["sample1", "sample2", "sample3"]

def all_input(wldc):
    files = []
    for sample in SAMPLES:
        # get() defers this function until the checkpoint has run for `sample`
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append('{sample}_final'.format(sample=sample))
    return files

rule all:
    input:
        all_input

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

checkpoint two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        # always create the output file, but leave it empty for the
        # "failing" sample so the input functions can detect the failure
        with open(output[0], 'w') as fh:
            if input[0] != 'sample1_ruleOne':
                fh.write(f'written {output[0]}')

def gather_input(wldc):
    files = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append("{sample}_ruleTwo".format(sample=sample))
    return files

rule gather_and_do_something:
    input:
        gather_files = gather_input,
    output:
        'merged'
    params:
        # handle samples/files with spaces in their names
        gather_files = lambda wldc, input: ' '.join(map(shlex.quote, input.gather_files))
    shell:
        """
        cat {params.gather_files} > '{output}'
        """

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'
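A note on the failure test: st_size > 0 is just a convention, and the input functions can apply any check to the checkpoint's output. For instance (an illustrative variant, not part of the example above), checkpoint two could write an explicit status line that a small helper parses:

def sample_passed(sample):
    # hypothetical helper: treat a rule two output whose first line
    # is "FAILED" as a failed sample
    fn = checkpoints.two.get(sample=sample).output[0]
    with open(fn) as fh:
        return fh.readline().strip() != "FAILED"

Both all_input and gather_input would then test sample_passed(sample) instead of the file size.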
With these changes, rule gather_and_do_something will only read rule two output from the passing samples (sample2 and sample3), and rule split_final will only run jobs for those two samples.
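To try it end to end, something like this should work from the directory containing the Snakefile (flag spellings as in recent Snakemake versions):

touch sample1 sample2 sample3   # the blank input files
snakemake --cores 1

Note that in this version no job actually fails: checkpoint two always creates its output file and merely leaves it empty for sample1. That design choice (always produce the output, encode failure in its content) is what lets the input functions run at all, since checkpoints.two.get() only returns once the checkpoint job has completed successfully, so --keep-going is no longer doing the heavy lifting here.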