diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e026817ee..3927c31442 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: # Nextflow versions: check pipeline minimum and current latest nxf_ver: ['21.04.0', ''] engine: ['docker'] - test: ['default', 'aligner', 'gatk4_spark', 'targeted', 'tumor_normal_pair', 'variant_calling', 'annotation'] + test: ['default', 'aligner', 'gatk4_spark', 'targeted', 'skip_markduplicates', 'tumor_normal_pair', 'variant_calling', 'annotation'] steps: - name: Check out pipeline code uses: actions/checkout@v2 diff --git a/conf/modules.config b/conf/modules.config index 70b707eaa2..5c21954be7 100644 --- a/conf/modules.config +++ b/conf/modules.config @@ -97,6 +97,11 @@ params { args2 = 'sort' publish_files = false } + 'samtools_index_mapping' { + publish_by_meta = true + publish_files = ['bai':'mapped'] + publish_dir = 'preprocessing' + } // MARKDUPLICATES 'markduplicates' { args = 'REMOVE_DUPLICATES=false VALIDATION_STRINGENCY=LENIENT' diff --git a/conf/test.config b/conf/test.config index d7aa7fbeb7..09659607f8 100644 --- a/conf/test.config +++ b/conf/test.config @@ -44,9 +44,16 @@ profiles { pair { params.input = 'https://raw.githubusercontent.com/nf-core/test-datasets/sarek/testdata/csv/tiny-manta-https.csv' } + prepare_recalibration { + params.input = 'https://raw.githubusercontent.com/nf-core/test-datasets/sarek/testdata/csv/tiny-mapped-normal-https.csv' + params.step = 'prepare_recalibration' + } save_bam_mapped { params.save_bam_mapped = true } + skip_markduplicates { + params.skip_markduplicates = true + } split_fastq { params.split_fastq = 2 } diff --git a/modules/local/gatk4/baserecalibrator/main.nf b/modules/local/gatk4/baserecalibrator/main.nf index 508138a5ab..b81d81a7a7 100644 --- a/modules/local/gatk4/baserecalibrator/main.nf +++ b/modules/local/gatk4/baserecalibrator/main.nf @@ -23,8 +23,8 @@ process GATK4_BASERECALIBRATOR { path fasta path fai path dict - path knownSitesTBI path knownSites + path knownSites_tbi output: tuple val(meta), path("*.table"), emit: table diff --git a/subworkflows/local/build_indices.nf b/subworkflows/local/build_indices.nf index 0ab42d8c46..2063e6a01f 100644 --- a/subworkflows/local/build_indices.nf +++ b/subworkflows/local/build_indices.nf @@ -65,7 +65,7 @@ workflow BUILD_INDICES { result_dbsnp_tbi = Channel.empty() version_dbsnp_tbi = Channel.empty() - if (!(params.dbsnp_tbi) && params.dbsnp && ('mapping' in step || 'prepare_recalibration' in step || 'controlfreec' in tools || 'haplotypecaller' in tools|| 'mutect2' in tools || 'tnscope' in tools)) { + if (!(params.dbsnp_tbi) && params.dbsnp && ('mapping' in step || 'preparerecalibration' in step || 'controlfreec' in tools || 'haplotypecaller' in tools|| 'mutect2' in tools || 'tnscope' in tools)) { dbsnp_id = dbsnp.map {it -> [[id:"$it.baseName"], it]} (result_dbsnp_tbi, version_dbsnp_tbi) = TABIX_DBSNP(dbsnp_id) result_dbsnp_tbi = result_dbsnp_tbi.map {meta, tbi -> [tbi]} @@ -81,14 +81,14 @@ workflow BUILD_INDICES { result_germline_resource_tbi = Channel.empty() version_germline_resource_tbi = Channel.empty() - if (!(params.germline_resource_tbi) && params.germline_resource && 'mutect2' in tools){ + if (!(params.germline_resource_tbi) && params.germline_resource && 'mutect2' in tools) { germline_resource_id = germline_resource.map {it -> [[id:"$it.baseName"], it]} (result_germline_resource_tbi, version_germline_resource_tbi) = TABIX_GERMLINE_RESOURCE(germline_resource_id) } result_known_indels_tbi = Channel.empty() version_known_indels_tbi = Channel.empty() - if (!(params.known_indels_tbi) && params.known_indels && ('mapping' in step || 'prepare_recalibration' in step)){ + if (!(params.known_indels_tbi) && params.known_indels && ('mapping' in step || 'preparerecalibration' in step)) { known_indels_id = known_indels.map {it -> [[id:"$it.baseName"], it]} (result_known_indels_tbi, version_known_indels_tbi) = TABIX_KNOWN_INDELS(known_indels_id) result_known_indels_tbi = result_known_indels_tbi.map {meta, tbi -> [tbi]} @@ -101,7 +101,7 @@ workflow BUILD_INDICES { result_pon_tbi = Channel.empty() version_pon_tbi = Channel.empty() - if (!(params.pon_tbi) && params.pon && ('tnscope' in tools || 'mutect2' in tools)){ + if (!(params.pon_tbi) && params.pon && ('tnscope' in tools || 'mutect2' in tools)) { pon_id = pon.map {it -> [[id:"$it.baseName"], it]} (result_pon_tbi, version_pon_tbi) = TABIX_PON(pon_id) } diff --git a/subworkflows/local/mapping_csv.nf b/subworkflows/local/mapping_csv.nf index a622622914..8bf8802b55 100644 --- a/subworkflows/local/mapping_csv.nf +++ b/subworkflows/local/mapping_csv.nf @@ -6,13 +6,13 @@ workflow MAPPING_CSV { take: - bam_mapped // channel: [mandatory] meta, bam, bai + bam_indexed // channel: [mandatory] meta, bam, bai save_bam_mapped // boolean: [mandatory] save_bam_mapped skip_markduplicates // boolean: [mandatory] skip_markduplicates main: if (save_bam_mapped) { - csv_bam_mapped = bam_mapped.map { meta, bam, bai -> [meta] } + csv_bam_mapped = bam_indexed.map { meta, bam, bai -> [meta] } // Creating csv files to restart from this step csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta -> patient = meta.patient[0] @@ -26,7 +26,7 @@ workflow MAPPING_CSV { } if (skip_markduplicates) { - csv_bam_mapped = bam_mapped.map { meta, bam, bai -> [meta] } + csv_bam_mapped = bam_indexed.map { meta, bam, bai -> [meta] } // Creating csv files to restart from this step csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta -> patient = meta.patient[0] diff --git a/subworkflows/nf-core/mapping.nf b/subworkflows/nf-core/mapping.nf index ae43c3fc93..44e322446e 100644 --- a/subworkflows/nf-core/mapping.nf +++ b/subworkflows/nf-core/mapping.nf @@ -4,33 +4,36 @@ ======================================================================================== */ -params.seqkit_split2_options = [:] params.bwamem1_mem_options = [:] params.bwamem1_mem_tumor_options = [:] params.bwamem2_mem_options = [:] params.bwamem2_mem_tumor_options = [:] +params.merge_bam_options = [:] +params.samtools_index_options = [:] +params.seqkit_split2_options = [:] -include { SEQKIT_SPLIT2 } from '../../modules/nf-core/modules/seqkit/split2/main.nf' addParams(options: params.seqkit_split2_options) +include { BWAMEM2_MEM as BWAMEM2_MEM_T } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_tumor_options) +include { BWAMEM2_MEM } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_options) include { BWA_MEM as BWAMEM1_MEM } from '../../modules/local/bwa/mem/main' addParams(options: params.bwamem1_mem_options) include { BWA_MEM as BWAMEM1_MEM_T } from '../../modules/local/bwa/mem/main' addParams(options: params.bwamem1_mem_tumor_options) -include { BWAMEM2_MEM } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_options) -include { BWAMEM2_MEM as BWAMEM2_MEM_T } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_tumor_options) +include { SAMTOOLS_INDEX } from '../../modules/local/samtools/index/main' addParams(options: params.samtools_index_options) +include { SAMTOOLS_MERGE } from '../../modules/nf-core/modules/samtools/merge/main' addParams(options: params.merge_bam_options) +include { SEQKIT_SPLIT2 } from '../../modules/nf-core/modules/seqkit/split2/main.nf' addParams(options: params.seqkit_split2_options) workflow MAPPING { take: - aligner // string: [mandatory] "bwa-mem" or "bwa-mem2" - bwa // channel: [mandatory] bwa - fai // channel: [mandatory] fai - fasta // channel: [mandatory] fasta - reads_input // channel: [mandatory] meta, reads_input + aligner // string: [mandatory] "bwa-mem" or "bwa-mem2" + bwa // channel: [mandatory] bwa + fai // channel: [mandatory] fai + fasta // channel: [mandatory] fasta + reads_input // channel: [mandatory] meta, reads_input + skip_markduplicates // boolean: true/false main: - bam_mapped_index = Channel.empty() - bam_reports = Channel.empty() - + bam_indexed = Channel.empty() - if(params.split_fastq > 1){ + if (params.split_fastq > 1) { reads_input_split = SEQKIT_SPLIT2(reads_input).reads.map{ key, reads -> //TODO maybe this can be replaced by a regex to include part_001 etc. @@ -38,10 +41,9 @@ workflow MAPPING { //sorts list of split fq files by : //[R1.part_001, R2.part_001, R1.part_002, R2.part_002,R1.part_003, R2.part_003,...] //TODO: determine whether it is possible to have an uneven number of parts, so remainder: true woud need to be used, I guess this could be possible for unfiltered reads, reads that don't have pairs etc. - return [key, reads.sort{ a,b -> a.getName().tokenize('.')[ a.getName().tokenize('.').size() - 3] <=> b.getName().tokenize('.')[ b.getName().tokenize('.').size() - 3]} - .collate(2)] + return [key, reads.sort{ a,b -> a.getName().tokenize('.')[ a.getName().tokenize('.').size() - 3] <=> b.getName().tokenize('.')[ b.getName().tokenize('.').size() - 3]}.collate(2)] }.transpose() - }else{ + } else { reads_input_split = reads_input } @@ -77,14 +79,31 @@ workflow MAPPING { bam_bwa.map{ meta, bam -> meta.remove('read_group') meta.id = meta.sample + // groupKey is to makes sure that the correct group can advance as soon as it is complete + // and not stall the workflow until all pieces are mapped def groupKey = groupKey(meta, meta.numLanes * params.split_fastq) tuple(groupKey, bam) [meta, bam] - }.groupTuple() //groupKey above is somehow makes sure, the workflow doesn't stall until all pieces are mapped, but that the correct group can advance as soon as it is complete + }.groupTuple() .set{bam_mapped} - // STEP 1.5: MERGING AND INDEXING BAM FROM MULTIPLE LANES // MarkDuplicates can take care of this + // MarkDuplicates can handle multiple BAMS as input, so no merging/indexing at this step + // Except if and only if skipping MarkDuplicates + + if (skip_markduplicates) { + bam_mapped.branch{ + single: it[1].size() == 1 + multiple: it[1].size() > 1 + }.set{ bam_to_merge } + + SAMTOOLS_MERGE(bam_to_merge.multiple) + bam_merged = bam_to_merge.single.mix(SAMTOOLS_MERGE.out.bam) + + SAMTOOLS_INDEX(bam_merged) + bam_indexed = bam_merged.join(SAMTOOLS_INDEX.out.bai) + } emit: - bam = bam_mapped + bam = bam_mapped + bam_indexed = bam_indexed } diff --git a/subworkflows/nf-core/prepare_recalibration.nf b/subworkflows/nf-core/prepare_recalibration.nf index 1f4e3adba9..10d0b98b11 100644 --- a/subworkflows/nf-core/prepare_recalibration.nf +++ b/subworkflows/nf-core/prepare_recalibration.nf @@ -24,30 +24,23 @@ workflow PREPARE_RECALIBRATION { known_sites // channel: [optional] known_sites known_sites_tbi // channel: [optional] known_sites_tbi no_intervals // value: [mandatory] no_intervals - known_indels - dbsnp main: cram_markduplicates.combine(intervals) - .map{ meta, cram, crai, intervals -> - new_meta = meta.clone() - new_meta.id = meta.sample + "_" + intervals.baseName - [new_meta, cram, crai, intervals] - } - .set{cram_markduplicates_intervals} + .map{ meta, cram, crai, intervals -> + new_meta = meta.clone() + new_meta.id = meta.sample + "_" + intervals.baseName + [new_meta, cram, crai, intervals] + }.set{cram_markduplicates_intervals} - if(use_gatk_spark){ + if (use_gatk_spark) { BASERECALIBRATOR_SPARK(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi) table_baserecalibrator = BASERECALIBRATOR_SPARK.out.table - }else{ - BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites_tbi, known_sites) + } else { + BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi) table_baserecalibrator = BASERECALIBRATOR.out.table } - //num_intervals = intervals.toList().size.view() //Integer.valueOf() - //.view() - //println(intervals.toList().getClass()) //.value.getClass()) - //STEP 3.5: MERGING RECALIBRATION TABLES if (no_intervals) { table_baserecalibrator.map { meta, table -> @@ -62,7 +55,6 @@ workflow PREPARE_RECALIBRATION { GATHERBQSRREPORTS(recaltable) table_bqsr = GATHERBQSRREPORTS.out.table - } emit: diff --git a/subworkflows/nf-core/qc_markduplicates.nf b/subworkflows/nf-core/qc_markduplicates.nf index 45c9e24f9a..46e8dd7abe 100644 --- a/subworkflows/nf-core/qc_markduplicates.nf +++ b/subworkflows/nf-core/qc_markduplicates.nf @@ -16,7 +16,6 @@ params.samtools_index_options = [:] include { GATK4_MARKDUPLICATES } from '../../modules/local/gatk4/markduplicates/main' addParams(options: params.markduplicates_options) include { GATK4_MARKDUPLICATES_SPARK } from '../../modules/local/gatk4/markduplicatesspark/main' addParams(options: params.markduplicatesspark_options) include { GATK4_ESTIMATELIBRARYCOMPLEXITY } from '../../modules/local/gatk4/estimatelibrarycomplexity/main' addParams(options: params.estimatelibrarycomplexity_options) -include { SAMTOOLS_MERGE } from '../../modules/nf-core/modules/samtools/merge/main' addParams(options: params.merge_bam_options) include { QUALIMAP_BAMQC } from '../../modules/local/qualimap/bamqc/main' addParams(options: params.qualimap_bamqc_options) include { SAMTOOLS_STATS } from '../../modules/local/samtools/stats/main' addParams(options: params.samtools_stats_options) include { SAMTOOLS_VIEW as SAMTOOLS_BAM_TO_CRAM } from '../../modules/local/samtools/view/main.nf' addParams(options: params.samtools_view_options) @@ -25,12 +24,14 @@ include { SAMTOOLS_INDEX } from '../../modules/loca workflow QC_MARKDUPLICATES { take: - bam_mapped // channel: [mandatory] meta, bam, bai + bam_mapped // channel: [mandatory] meta, bam + bam_indexed // channel: [mandatory] meta, bam, bai use_gatk_spark // value: [mandatory] use gatk spark save_metrics // value: [mandatory] save metrics fasta // channel: [mandatory] fasta fai // channel: [mandatory] fai dict // channel: [mandatory] dict + skip_markduplicates // boolean: true/false skip_bamqc // boolean: true/false skip_samtools // boolean: true/false target_bed // channel: [optional] target_bed @@ -38,38 +39,28 @@ workflow QC_MARKDUPLICATES { main: report_markduplicates = Channel.empty() - if(params.skip_markduplicates){ - - bam_mapped.branch{ - single: it[1].size() == 1 - multiple: it[1].size() > 1 - }.set{ bam_to_merge } - - SAMTOOLS_MERGE(bam_to_merge.multiple) - bam_merged = bam_to_merge.single.mix(SAMTOOLS_MERGE.out.bam) - - SAMTOOLS_INDEX(bam_merged) - bam_markduplicates = bam_merged.join(SAMTOOLS_INDEX.out.bai) - cram_markduplicates = SAMTOOLS_BAM_TO_CRAM(bam_markduplicates, fasta, fai) - } else{ + if (skip_markduplicates) { + bam_markduplicates = bam_indexed + SAMTOOLS_BAM_TO_CRAM(bam_markduplicates, fasta, fai) + cram_markduplicates = SAMTOOLS_BAM_TO_CRAM.out.cram + } else { if (use_gatk_spark) { - //If BAMQC should be run on MD output, then don't use MDSpark to convert to cram, but use bam output instead - if(!skip_bamqc){ + if (!skip_bamqc) { GATK4_MARKDUPLICATES_SPARK(bam_mapped, fasta, fai, dict, "bam") SAMTOOLS_INDEX(GATK4_MARKDUPLICATES_SPARK.out.output) bam_markduplicates = GATK4_MARKDUPLICATES_SPARK.out.output.join(SAMTOOLS_INDEX.out.bai) SAMTOOLS_BAM_TO_CRAM_SPARK(bam_markduplicates, fasta, fai) cram_markduplicates = SAMTOOLS_BAM_TO_CRAM_SPARK.out.cram - }else{ + } else { GATK4_MARKDUPLICATES_SPARK(bam_mapped, fasta, fai, dict, "cram") SAMTOOLS_INDEX(GATK4_MARKDUPLICATES_SPARK.out.output) cram_markduplicates = GATK4_MARKDUPLICATES_SPARK.out.output.join(SAMTOOLS_INDEX.out.crai) } - if(save_metrics){ + if (save_metrics) { GATK4_ESTIMATELIBRARYCOMPLEXITY(bam_mapped, fasta, fai, dict) report_markduplicates = GATK4_ESTIMATELIBRARYCOMPLEXITY.out.metrics } @@ -88,12 +79,12 @@ workflow QC_MARKDUPLICATES { //if !skip_markduplicates, then QC tools are run on duplicate marked bams //After bamqc finishes, convert to cram for further analysis qualimap_bamqc = Channel.empty() - if(!skip_bamqc){ + if (!skip_bamqc && !skip_markduplicates) { + //TODO: after adding CI tests, allow bamqc on mapped bams if no duplicate marking is done QUALIMAP_BAMQC(bam_markduplicates, target_bed, params.target_bed) qualimap_bamqc = QUALIMAP_BAMQC.out } - samtools_stats = Channel.empty() if (!skip_samtools) { SAMTOOLS_STATS(cram_markduplicates, fasta) diff --git a/tests/test_annotation.yml b/tests/test_annotation.yml index 6ebcf4db14..c042e190e4 100644 --- a/tests/test_annotation.yml +++ b/tests/test_annotation.yml @@ -6,6 +6,7 @@ files: - path: results/annotation/1234N/1234N_snpEff.ann.gz - path: results/annotation/1234N/1234N_snpEff.ann.gz.tbi + - path: results/multiqc - name: Run VEP command: nextflow run main.nf -profile test,annotation,docker --tools vep tags: @@ -14,6 +15,7 @@ files: - path: results/annotation/1234N/1234N_VEP.ann.gz - path: results/annotation/1234N/1234N_VEP.ann.gz.tbi + - path: results/multiqc - name: Run snpEff followed by VEP command: nextflow run main.nf -profile test,annotation,docker --tools merge tags: @@ -26,3 +28,4 @@ - path: results/annotation/1234N/1234N_snpEff.ann.gz.tbi - path: results/annotation/1234N/1234N_snpEff_VEP.ann.gz - path: results/annotation/1234N/1234N_snpEff_VEP.ann.gz.tbi + - path: results/multiqc diff --git a/tests/test_skip_markduplicates.yml b/tests/test_skip_markduplicates.yml new file mode 100644 index 0000000000..a07d2a3d42 --- /dev/null +++ b/tests/test_skip_markduplicates.yml @@ -0,0 +1,50 @@ +- name: Run default pipeline while skipping MarkDuplicates + command: nextflow run main.nf -profile test,docker,skip_markduplicates + tags: + - preprocessing + - markduplicates + - skip_markduplicates + files: + - path: results/multiqc + - path: results/preprocessing/1234N/markduplicates/1234N.md.cram + - path: results/preprocessing/1234N/markduplicates/1234N.md.cram.crai + - path: results/preprocessing/1234N/recalibrated/1234N.recal.cram + - path: results/preprocessing/1234N/recalibrated/1234N.recal.cram.crai + - path: results/preprocessing/csv/markduplicates.csv + - path: results/preprocessing/csv/markduplicates_1234N.csv + - path: results/preprocessing/csv/markduplicates_no_table.csv + - path: results/preprocessing/csv/markduplicates_no_table_1234N.csv + - path: results/preprocessing/csv/recalibrated.csv + - path: results/preprocessing/csv/recalibrated_1234N.csv + - path: results/reports/fastqc/1234N-1234N_M1 + - path: results/reports/fastqc/1234N-1234N_M2 + - path: results/reports/fastqc/1234N-1234N_M4 + - path: results/reports/fastqc/1234N-1234N_M5 + - path: results/reports/fastqc/1234N-1234N_M6 + - path: results/reports/fastqc/1234N-1234N_M7 + - path: results/reports/qualimap/1234N + - path: results/reports/samtools_stats/1234N/1234N.md.cram.stats + - path: results/reports/samtools_stats/1234N/1234N.recal.cram.stats +- name: Run default pipeline while skipping MarkDuplicates, starting with prepare_recalibration + command: nextflow run main.nf -profile test,docker,prepare_recalibration,skip_markduplicates + tags: + - preprocessing + - prepare_recalibration + - markduplicates + - skip_markduplicates + files: + - path: results/multiqc + - path: results/preprocessing/1234N/mapped/1234N.recal.table + - path: results/preprocessing/1234N/markduplicates/1234N.md.cram + - path: results/preprocessing/1234N/markduplicates/1234N.md.cram.crai + - path: results/preprocessing/1234N/recalibrated/1234N.recal.cram + - path: results/preprocessing/1234N/recalibrated/1234N.recal.cram.crai + - path: results/preprocessing/csv/markduplicates.csv + - path: results/preprocessing/csv/markduplicates_1234N.csv + - path: results/preprocessing/csv/markduplicates_no_table.csv + - path: results/preprocessing/csv/markduplicates_no_table_1234N.csv + - path: results/preprocessing/csv/recalibrated.csv + - path: results/preprocessing/csv/recalibrated_1234N.csv + - path: results/reports/qualimap/1234N + - path: results/reports/samtools_stats/1234N/1234N.md.cram.stats + - path: results/reports/samtools_stats/1234N/1234N.recal.cram.stats diff --git a/workflows/sarek.nf b/workflows/sarek.nf index fb55963628..993c78a3fd 100644 --- a/workflows/sarek.nf +++ b/workflows/sarek.nf @@ -52,10 +52,10 @@ else { switch (step) { case 'mapping': break case 'prepare_recalibration': csv_file = file("${params.outdir}/preprocessing/csv/markduplicates_no_table.csv", checkIfExists: true); break - case 'recalibrate': csv_file = file("${params.outdir}/preprocessing/csv/markduplicates.csv", checkIfExists: true); break - case 'variant_calling': csv_file = file("${params.outdir}/preprocessing/csv/recalibrated.csv", checkIfExists: true); break + case 'recalibrate': csv_file = file("${params.outdir}/preprocessing/csv/markduplicates.csv" , checkIfExists: true); break + case 'variant_calling': csv_file = file("${params.outdir}/preprocessing/csv/recalibrated.csv" , checkIfExists: true); break // case 'controlfreec': csv_file = file("${params.outdir}/variant_calling/csv/control-freec_mpileup.csv", checkIfExists: true); break - case 'annotate': csv_file = file("${params.outdir}/variant_calling/csv/recalibrated.csv", checkIfExists: true); break + case 'annotate': csv_file = file("${params.outdir}/variant_calling/csv/recalibrated.csv" , checkIfExists: true); break default: exit 1, "Unknown step $step" } } @@ -96,7 +96,6 @@ if (params.save_reference) { modules['tabix_known_indels'].publish_files = ['vcf.gz.tbi':'known_indels'] modules['tabix_pon'].publish_files = ['vcf.gz.tbi':'pon'] } -if (save_bam_mapped) modules['samtools_index_mapping'].publish_files = ['bam':'mapped', 'bai':'mapped'] if (params.skip_markduplicates) { modules['baserecalibrator'].publish_files = ['recal.table':'mapped'] modules['gatherbqsrreports'].publish_files = ['recal.table':'mapped'] @@ -321,26 +320,38 @@ workflow SAREK { FASTQC_TRIMGALORE.out.trim_zip) // STEP 1: MAPPING READS TO REFERENCE GENOME - MAPPING( - params.aligner, + MAPPING(params.aligner, bwa, fai, fasta, - reads_input) + reads_input, + params.skip_markduplicates) - bam_mapped = MAPPING.out.bam + bam_mapped = MAPPING.out.bam + bam_indexed = MAPPING.out.bam_indexed // Create CSV to restart from this step - // MAPPING_CSV(bam_mapped, save_bam_mapped, params.skip_markduplicates) + MAPPING_CSV(bam_indexed, save_bam_mapped, params.skip_markduplicates) + } + + if (step == 'preparerecalibration') { + bam_mapped = Channel.empty() + if (params.skip_markduplicates) { + bam_indexed = input_sample + } else cram_markduplicates = input_sample + } + if (step in ['mapping', 'preparerecalibration']) { // STEP 2: MARKING DUPLICATES AND/OR QC, conversion to CRAM QC_MARKDUPLICATES(bam_mapped, - ('markduplicates' in params.use_gatk_spark), - !('markduplicates' in params.skip_qc), - fasta, fai, dict, - 'bamqc' in params.skip_qc, - 'samtools' in params.skip_qc, - target_bed) + bam_indexed, + ('markduplicates' in params.use_gatk_spark), + !('markduplicates' in params.skip_qc), + fasta, fai, dict, + params.skip_markduplicates, + 'bamqc' in params.skip_qc, + 'samtools' in params.skip_qc, + target_bed) cram_markduplicates = QC_MARKDUPLICATES.out.cram @@ -348,14 +359,9 @@ workflow SAREK { MARKDUPLICATES_CSV(cram_markduplicates) qc_reports = qc_reports.mix(QC_MARKDUPLICATES.out.qc) - } - - if (step == 'preparerecalibration') bam_markduplicates = input_sample - if (step in ['mapping', 'preparerecalibration']) { // STEP 3: CREATING RECALIBRATION TABLES - PREPARE_RECALIBRATION( - cram_markduplicates, + PREPARE_RECALIBRATION(cram_markduplicates, ('bqsr' in params.use_gatk_spark), dict, fai, @@ -364,9 +370,7 @@ workflow SAREK { num_intervals, known_sites, known_sites_tbi, - params.no_intervals, - known_indels, - dbsnp) + params.no_intervals) table_bqsr = PREPARE_RECALIBRATION.out.table_bqsr PREPARE_RECALIBRATION_CSV(table_bqsr) @@ -507,9 +511,11 @@ workflow.onComplete { // Function to extract information (meta data + file(s)) from csv file(s) def extract_csv(csv_file) { Channel.from(csv_file).splitCsv(header: true) - //Retrieves number of lanes by grouping together by patient and sample and counting how many entries there are for this combination - .map{ row -> [[row.patient.toString(), row.sample.toString()], row]} - .groupTuple() + //Retrieves number of lanes by grouping together by patient and sample and counting how many entries there are for this combination + .map{ row -> + if (!(row.patient && row.sample)) log.warn "Missing or unknown field in csv file header" + [[row.patient.toString(), row.sample.toString()], row] + }.groupTuple() .map{ meta, rows -> size = rows.size() return [rows, size] @@ -517,8 +523,6 @@ def extract_csv(csv_file) { .map{ row, numLanes -> //from here do the usual thing for csv parsing def meta = [:] - meta.numLanes = numLanes.toInteger() - //TODO since it is mandatory: error/warning if not present? // Meta data to identify samplesheet // Both patient and sample are mandatory @@ -536,30 +540,44 @@ def extract_csv(csv_file) { if (row.status) meta.status = row.status.toInteger() else meta.status = 0 - if (row.lane && row.fastq2) { // mapping with fastq + if (row.lane && row.fastq2) { meta.id = "${row.sample}-${row.lane}".toString() def fastq1 = file(row.fastq1, checkIfExists: true) def fastq2 = file(row.fastq2, checkIfExists: true) def CN = params.sequencing_center ? "CN:${params.sequencing_center}\\t" : '' def read_group = "\"@RG\\tID:${row.lane}\\t${CN}PU:${row.lane}\\tSM:${row.sample}\\tLB:${row.sample}\\tPL:ILLUMINA\"" + meta.numLanes = numLanes.toInteger() meta.read_group = read_group.toString() return [meta, [fastq1, fastq2]] - } else if (row.table) { // recalibration + } else if (row.table && row.cram) { meta.id = meta.sample def cram = file(row.cram, checkIfExists: true) def crai = file(row.crai, checkIfExists: true) def table = file(row.table, checkIfExists: true) return [meta, cram, crai, table] - } else if (row.cram) { + // recalibration when skipping MarkDuplicates + } else if (row.table && row.bam) { + meta.id = meta.sample + def bam = file(row.bam, checkIfExists: true) + def bai = file(row.bai, checkIfExists: true) + def table = file(row.table, checkIfExists: true) + return [meta, bam, bai, table] // prepare_recalibration or variant_calling + } else if (row.cram) { meta.id = meta.sample def cram = file(row.cram, checkIfExists: true) def crai = file(row.crai, checkIfExists: true) return [meta, cram, crai] - } else if (row.vcf) { + // prepare_recalibration when skipping MarkDuplicates + } else if (row.bam) { + meta.id = meta.sample + def bam = file(row.bam, checkIfExists: true) + def bai = file(row.bai, checkIfExists: true) + return [meta, bam, bai] // annotation + } else if (row.vcf) { meta.id = meta.sample def vcf = file(row.vcf, checkIfExists: true) return [meta, vcf]