
Snakemake ignore failed path and redefine inputs for a common rule

I'm currently writing a pipeline that looks like the minimal example below (the input files are just empty files whose names are in the SAMPLES list).

What I would like is this: if a sample fails in one of the first two steps (the minimal example makes sample1 fail in rule two), the pipeline should carry on with all the following steps as if that sample were not there (meaning it would run gather_and_do_something and split_final only on sample2 and sample3 here).

I'm already using the --keep-going option so that independent jobs continue, but I have trouble defining the input for the common rule so that it ignores the files from a failed path.

SAMPLES = ["sample1", "sample2", "sample3"]

rule all:
    input:
        expand("{sample}_final", sample=SAMPLES)

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

rule two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        if input[0] != 'sample1_ruleOne':
            with open(output[0], 'w') as fh:
                fh.write(f'written {output[0]}')

rule gather_and_do_something:
    input:
        expand(rules.two.output, sample=SAMPLES)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'

I tried writing a custom function to use as the input, but that does not seem to work:

def get_files(wildcards):
    import os
    return [f for f in expand(rules.two.output, sample=SAMPLES) if f in os.listdir(os.getcwd())]

rule gather_and_do_something:
    input:
        unpack(get_files)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

Asked by Plopp, Feb 03 '26 19:02


1 Answer

By default, Snakemake builds its DAG of jobs before anything runs, so it expects every input file requested by rule all (and every dependency of those files) to be produced. You can get around this requirement by defining checkpoints and matching input functions, which let Snakemake re-evaluate the DAG after a checkpoint job finishes, based on that checkpoint's output.

Here's an updated example that turns rule two into a checkpoint and uses input functions in the aggregation steps (gather_and_do_something and all) to skip samples that fail rule two. The example treats an empty output file as a failure (a sample passes only if os.stat(fn).st_size > 0), but you could use a different test instead. Note that checkpoint two now always creates its output file and leaves it empty for the failing sample, so the job itself finishes successfully.

import shlex
import os

SAMPLES = ["sample1", "sample2", "sample3"]

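def all_input(wldc):
    # Request a final file only for samples whose checkpoint output is non-empty.
    files = []
    for sample in SAMPLES:
        # .get() defers evaluation until checkpoint two has finished for this sample.
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append('{sample}_final'.format(sample=sample))
    return files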

rule all:
    input:
        all_input

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

checkpoint two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        with open(output[0], 'w') as fh:
            if input[0] != 'sample1_ruleOne':
                fh.write(f'written {output[0]}')

def gather_input(wldc):
    files = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append("{sample}_ruleTwo".format(sample=sample))
    return files

rule gather_and_do_something:
    input:
        gather_files = gather_input,
    output:
        'merged'
    params:
        gather_files = lambda wldc, input: ' '.join(map(shlex.quote, input.gather_files)) # Handle samples/files with spaces
    shell:
        """
        cat {params.gather_files} > '{output}'
        """

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'

With these changes, rule gather_and_do_something will only read rule two output from passing samples (sample2 and sample3) and rule split_final will only run jobs for samples 2 and 3.
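
The pass/fail test is repeated in both input functions (all_input and gather_input). As a minimal sketch, it could be factored into a plain-Python helper that can also be checked outside Snakemake. The name passing_samples and its path_for argument are hypothetical and not part of Snakemake; the extra os.path.isfile check is an assumption added so that a missing file also counts as a failure.

```python
import os


def passing_samples(samples, path_for):
    """Return the samples whose output file exists and is non-empty.

    `path_for` is a callable mapping a sample name to the path of its
    rule-two output (hypothetical helper, not a Snakemake API).
    """
    passed = []
    for sample in samples:
        path = path_for(sample)
        # A missing or zero-byte file is treated as a failed sample.
        if os.path.isfile(path) and os.path.getsize(path) > 0:
            passed.append(sample)
    return passed
```

Inside an input function, path_for would presumably be something like lambda s: checkpoints.two.get(sample=s).output[0], so that the checkpoint is still requested for every sample before its file is inspected.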

Answered by dofree, Feb 05 '26 08:02


