"""This module defines the AmpliconCoverageDataPreparator class
and associated components for analyzing sequencing data coverage
within specified genomic regions.
Modules and Classes:
- PositionNotFoundError:
Custom exception raised when a specific genomic
position cannot be found in a mpileup file.
- AmpliconCoverageDataPreparator:
A class responsible for generating mpileup files
for given sample regions, calculating coverage metrics,
and analyzing variant coverage at specific positions.
It inherits from LoggerMixin for logging capabilities
and IDataPreparator to adhere to a data preparation interface.
Key functionalities:
- Generating mpileup files for sample regions using samtools.
- Counting coverage over regions based on mpileup data.
- Counting variant coverage at specific genomic positions.
- Performing the entire process of mpileup generation
and coverage calculation.
- Managing configuration and executing system commands
for data processing.
Usage:
Instantiate the class with a configuration object and
a filter function, then call `perform()` with a sample data container,
target regions, and an executor to process coverage analysis
for sequencing samples.
Note:
Ensure that the configuration file contains correct paths
and parameters, especially for 'samtools' and 'bedtools'.
The class also relies on the presence of mpileup files
and the ability to generate them via command-line tools.
"""
# region Imports
import os
import sys
import re
import mmap
from os import PathLike
from typing import Union, AnyStr
from src.configurator import Configurator
from src.core.base import LoggerMixin
from src.core.base import CommandExecutor
from src.core.base import execute
from src.core.base import touch
from src.core.base import get_platform
from src.core.sample_data_container import SampleDataContainer
from src.core.analyzer.i_data_preparator import IDataPreparator
from src.utils.util import depth_filter
# endregion
[docs]
class PositionNotFoundError(Exception):
"""Base exception for handling source file positions management process"""
[docs]
class AmpliconCoverageDataPreparator(LoggerMixin, IDataPreparator):
"""The AmpliconCoverageDataPreparator class is designed to
generate mpileup files for specified regions of a sequenced sample
and calculate coverage metrics within those regions.
It also provides methods to count coverage over regions and analyze
variant coverage at specific positions.
Inherits from LoggerMixin and IDataPreparator,
ensuring logging capabilities and adherence to
data preparation interface.
"""
def __init__(
self,
configurator: Configurator,
filter_func: callable
):
"""Initializes the AmpliconCoverageDataPreparator
with configuration and filter function.
Args:
configurator (Configurator):
The configuration object containing settings and paths.
filter_func (callable):
A function to filter coverage data, e.g.,
calculating average or median.
"""
super().__init__(logger=configurator.logger)
self.configurator = configurator
self.config = self.configurator.parse_configuration(
base_config_filepath=self.configurator.args.configFilepath,
target_section='samtools')
# a file with lines in format
# "chrom_number, region_start, region_end, depth"
self.coords = self.config['coords-file']
self.filter_func = filter_func
self.mpileup_files = {}
self.results = []
[docs]
def generate_mpileup(
self,
sample: SampleDataContainer,
executor: Union[CommandExecutor, callable]
) -> list[PathLike[AnyStr]]:
"""Generates mpileup files for specified regions of the sample.
Args:
sample (SampleDataContainer):
The sample containing sequencing data.
executor (callable):
The command executor or function to run system commands.
Returns:
list of PathLike: Paths to the generated mpileup files.
"""
mp_files = []
sample.bam_filepath = os.path.join(
sample.processing_path,
sample.sid+".sorted.read_groups.recalibrated.bam")
try:
for region, out_name in sample.target_regions:
out_path = os.path.join(
sample.processing_path, f"{sample.sid}.{out_name}")
cmd = ' '.join([
self.configurator.config['samtools'], "mpileup",
sample.bam_filepath,
# skip bases with baseQ/BAQ smaller than value was given
# '--min-BQ', self.config['min-bq'],
'--no-BAQ' if 'no-BAQ' in self.config else '',
'--max-depth', self.config['max-depth'],
'--region', region,
'--reference', self.configurator.config['reference'],
'--count-orphans', # do not discard anomalous read pairs
'--output', out_path])
execute(executor, cmd)
try:
depth_filter(
filepath=out_path,
depth=2,
logger=self.logger
)
mp_files.append(out_path)
except FileNotFoundError:
self.logger.info(
'Skip mpileup performing for "%s"', out_path
)
return mp_files
except TypeError:
return None
[docs]
def count_region_coverage(
self,
mpileup: PathLike[AnyStr],
chromosome: Union[int, str],
start: Union[int, str],
end: Union[int, str]
) -> float:
"""Counts the coverage within a specified region from a mpileup file.
Args:
mpileup (PathLike):
Path to the mpileup file.
chromosome (int or str):
Chromosome identifier.
start (int or str):
Start position of the region.
end (int or str):
End position of the region.
Returns:
float:
The filtered average coverage within the region.
"""
try:
start, end = int(start), int(end)
if start > end:
start, end = end, start
chromosome = str(chromosome).upper()
try:
with open(mpileup, 'r', encoding='utf-8') as fd:
coverages = []
last_position = start - 1
for line in fd:
chrom, position, _, depth = \
line.strip().split('\t')[:4]
position, depth = int(position), int(depth)
chrom = chrom.upper().replace('CHR', '')
if chromosome != chrom or \
start > position or \
'X' in chrom:
continue
if position > end > 0:
break
if position > last_position + 1:
coverages.extend(
[0] * (position - last_position - 1))
coverages.append(depth)
last_position = position
if coverages:
if len(coverages) < end - start + 1:
coverages.extend(
[0] * (end - start + 1 - len(coverages)))
return self.filter_func(coverages)
return 0.0
except FileNotFoundError:
self.logger.warning(
"There is no mpileup-file for chromosome %s", chromosome)
return 0.0
except (SyntaxError, TypeError, OSError, IOError) as e:
self.logger.critical(
"An error '%s' occurred in '%s.%s'. Abort",
e, self.__class__.__name__,
self.perform.__func__.__name__)
raise e
[docs]
@staticmethod
def count_indels(data: str) -> dict[str, int]:
"""Counts the number of insertions and deletions
for two replicates (r1 and r2) based on the input data string.
Args:
data (str):
A string containing insertion and deletion patterns
in the form '+<number><bases>' or '-<number><bases>',
where <bases> is a sequence of [ACTGNactgn] characters.
Returns:
dict[str, int]: \\
key:
An indel signature. \\
value:
Count of the key in pileup line
"""
matches = re.findall(
r'([+-])(\d+)([ACTGNactgn]+)', data)
cov_dict = {}
if matches:
for sign, number, bases in matches:
var_match = str(sign)+str(number)+str(bases[:int(number)])
if var_match:
if var_match in cov_dict:
cov_dict[var_match] += 1
else:
cov_dict[var_match] = 1
return cov_dict
[docs]
@staticmethod
def count_target_char(
src: AnyStr,
target_char: AnyStr = '*'
) -> int:
pattern = re.compile(rf'([+-]\d+[actgnACTGN]*)|({target_char})')
count = 0
for match in pattern.finditer(src):
if match.group(2):
count += 1
return count
[docs]
def count_variant_coverage(
self,
chromosome: Union[int, str],
position: Union[int, str],
ref: str,
alt: str
) -> tuple[int, int, float]:
"""Calculates coverage information and variant counts
at a specific genomic position from mpileup data.
Args:
chromosome (int or str):
The chromosome identifier (number or string).
position (int or str):
The genomic position to analyze.
ref (str):
The reference allele at the position.
alt (str):
The alternative allele at the position.
Returns:
tuple: \\
depth (int):
The total read depth at the position. \\
total_alt_count (int):
The total count of reads supporting
the alternative allele, including indels. \\
alt_ratio (float):
The ratio of reads supporting the alternative allele
to total depth, rounded to 3 decimal places.
Note:
- This method searches for the specified position
in a chromosome-specific mpileup file.
- It uses memory-mapped file access for efficiency.
- It counts reference matches ('.' and ',')
and mismatches (based on alt allele).
- It also calls `count_indels()` to count insertions
and deletions supporting the variant.
- Returns (-1, -1, -1) if the position is not found
or an error occurs.
- Raises FileNotFoundError if the mpileup file
for the chromosome does not exist.
"""
self.logger.debug(
"Starting to determine (%s:%s>%s, %s) variant coverage",
chromosome, ref, alt, position)
chromosome = str(chromosome) # .replace('chr', '').strip()
position = str(position).strip()
chromosome = '0'+chromosome if chromosome in [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] else chromosome
if chromosome in self.mpileup_files:
self.logger.debug(
"Chromosome %s found on %s",
chromosome,
os.path.abspath(self.mpileup_files[chromosome]))
try:
with open(
self.mpileup_files[chromosome],
mode='r',
encoding='utf-8'
) as fd:
self.logger.debug(
"File '%s' opened with 'r' flag",
os.path.abspath(self.mpileup_files[chromosome]))
with mmap.mmap(
fd.fileno(), 0, access=mmap.ACCESS_READ
) as mm:
b_position = mm.find(bytes(
position, encoding='utf-8'))
if b_position != 0:
line_number = mm.rfind(b'\n', 0, b_position) + 1
line_number = mm[:line_number].count(b'\n') + 1
self.logger.debug(
"Position '%s' found at line %s",
position, line_number)
# region Rules:
# Forward Reverse Meaning
# . dot , comma Base matches the reference base
# ACGTN acgtn Base is a mismatch to the
# reference base
# > < Reference skip (due to CIGAR “N”)
# * */# Deletion of the
# reference base (CIGAR “D”)
#
# Deleted bases are shown as “*” on both strands
# unless --reverse-del is used,
# in which case they are shown as “#”
# on the reverse strand.
#
# If there is an insertion after this read base,
# text matching “\+[0-9]+[ACGTNacgtn*#]+”:
# a “+” character followed by
# an integer giving the length
# of the insertion and then
# the inserted sequence.
#
# Pads are shown as “*” # unless --reverse-del
# is used, in which case pads
# on the reverse strand will be shown as “#”.
#
# If there is a deletion after this read base,
# text matching “-[0-9]+[ACGTNacgtn]+”:
# a “-” character followed by the deleted
# reference bases represented similarly.
# (Subsequent pileup lines will contain “*”
# for this read indicating the deleted bases.)
#
# If this is the last position covered by the read,
# a “$” character.
# endregion
depth, pileup_data = fd.readlines()[line_number-1]\
.split('\t')[3:5]
depth = int(depth)
# r1_ref_count = pileup_data.count('.')
# r2_ref_count = pileup_data.count(',')
r1_alt_count = AmpliconCoverageDataPreparator\
.count_target_char(
src=pileup_data, target_char=alt.upper())
r2_alt_count = AmpliconCoverageDataPreparator\
.count_target_char(
src=pileup_data, target_char=alt.lower())
indels_dict = self.count_indels(pileup_data)
r1_ins_count, r1_del_count = 0, 0
r2_ins_count, r2_del_count = 0, 0
if indels_dict:
number_regex = re.compile(r"([+-])(\d+)")
for key, value in indels_dict.items():
sign, number = number_regex.search(key)\
.groups()
bases = key[-int(number):]
if sign == '-':
if bases.isupper():
r1_del_count += value
elif bases.islower():
r2_del_count += value
elif sign == '+':
if bases.isupper():
r1_ins_count += value
elif bases.islower():
r2_ins_count += value
total_alt_count = (
r1_alt_count + r2_alt_count +
r1_ins_count + r1_del_count +
r2_ins_count + r2_del_count)
return (
depth,
total_alt_count,
round(total_alt_count/depth, 3))
self.logger.warning(
"Can't find position '%s' in mpileup data '%s'",
position, os.path.basename(
self.mpileup_files[chromosome]))
return -1, -1, -1
except (FileNotFoundError, ValueError):
self.logger.critical(
"File '%s' not found or it is empty",
self.mpileup_files[chromosome])
return -1, -1, -1
else:
msg = "The mpileup file for " \
f"chromosome {chromosome} doesn't exist."
self.logger.critical(
"%s You have to execute '%s.%s' at first",
msg,
self.__class__.__name__,
self.perform.__func__.__name__)
raise FileNotFoundError(msg)