Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dorado Alignments #359

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/toml.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,40 @@ This is added using:

### Aligner

#### Built-in dorado/guppy alignment

It is possible to leverage the built in dorado/guppy alignment capabilities for readfish.
This does limit some flexibility in readfish, such as updating the reference.

In order to do this, specify two extra keys in caller settings: `align_ref` and `server_file_load_timeout`.

`align_ref` should be the absolute path to the reference file for alignment. This can be either an `.mmi` or a `FASTA` file.

`server_file_load_timeout` is the number of seconds to wait before the server throws a timeout error whilst loading the reference. The time required scales with the size of the reference, so a large value would make sense, such as 120.

An example TOML for using built in alignment would be:
```toml
[caller_settings.dorado]
# ^^^^^^
# Chooses the dorado plugin
config = "dna_r10.4.1_e8.2_400bps_5khz_fast_prom"
address = "ipc:///tmp/.guppy/5555"
debug_log = "live_reads.fq"
align_ref = "/data/refs/hg38.p14.simple.mmi"
server_file_load_timeout = 180
```

The aligner section of the TOML is required, and so this should be set to, which means the basecaller alignments are used.
```toml
[mapper_settings.no_op]
# ^^^^^^
# Chooses the mappy plugin
```

**One note** is that there is currently no easy way to change the number of basecaller alignment threads using readfish, and so when the basecaller server instance is started, the number of alignment_threads should be set to a sensible number using the `--num_alignment_threads`



#### minimap2

Currently we provide two mapping plugins, [`mappy`], and [`mappy-rs`].
Expand Down
50 changes: 49 additions & 1 deletion src/readfish/_utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""utils.py
functions and utilities used internally.
"""

from __future__ import annotations
import sys
import logging
from collections import Counter
from functools import reduce
from operator import itemgetter
from operator import itemgetter, getitem
from enum import IntEnum
import re
import base64
Expand Down Expand Up @@ -80,6 +81,53 @@ def format_bases(num: int, factor: int = 1000, suffix: str = "B") -> str:
return f"{num:3.2f} Y{suffix}"


def get_item(obj: Sequence | Mapping, key: str | int, default: str = "*") -> Any:
"""
Retrieve an item from a sequence or mapping, returning a default value if the item is not found.

Args:
:param obj: The sequence or mapping from which to retrieve the item.
:param key: The index or key of the item to retrieve.
:param default: The value to return if the item is not found. Defaults to "*".

Returns:
Any: The item from the sequence or mapping, or the default value if the item is not found.

Examples:
>>> get_item([1, 2, 3], 1)
2
>>> get_item([1, 2, 3], 5)
'*'
>>> get_item({'a': 1, 'b': 2}, 'b')
2
>>> get_item({'a': 1, 'b': 2}, 'c')
'*'
>>> get_item((1, 2, 3), 0)
1
>>> get_item((1, 2, 3), -1)
3
>>> get_item((1, 2, 3), 5)
'*'
>>> get_item((1, 2, 3), 'invalid')
'*'
>>> get_item(np.array([1, 2, 3]), 1)
2
>>> get_item(np.array([1, 2, 3]), 5)
'*'
>>> structured_array = np.array([(1, 'Alice'), (2, 'Bob')], dtype=[('id', 'i4'), ('name', 'U10')])
>>> get_item(structured_array, 1)
(2, 'Bob')
>>> get_item(structured_array, 'name')
array(['Alice', 'Bob'], dtype='<U10')
>>> get_item(structured_array, 'invalid')
'*'
"""
try:
return getitem(obj, key)
except (IndexError, KeyError, TypeError, ValueError):
return default


def nested_get(obj: Mapping, key: Any, default: Any = None, *, delim: str = ".") -> Any:
"""Get a value from a nested structure

Expand Down
30 changes: 30 additions & 0 deletions src/readfish/plugins/_mappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,36 @@

UNMAPPED_PAF = "0\t0\t*\t*\t0\t0\t0\t0\t0\t0"

FIELDS = [
"query_name",
"query_length",
"query_start",
"query_end",
"strand",
"ctg",
"target_length",
"r_st",
"r_en",
"residue_matches",
"alignment_block_length",
"mapping_quality",
"tags",
]


class _PAF:
"""Base PAF methods, can't guarantee field names here so use indices"""

def __str__(self):
"""Formats a record as a PAF line for writing to a file"""
return "{}\t{}".format("\t".join(map(str, self[:-1])), self._fmt_tags())

def blast_identity(self):
"""BLAST identity, see:
https://lh3.github.io/2018/11/25/on-the-definition-of-sequence-identity
"""
return self[9] / self[10]


class Aligners(Enum):
C_MAPPY = "mappy"
Expand Down
52 changes: 50 additions & 2 deletions src/readfish/plugins/dorado.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,28 @@
from readfish._loggers import setup_logger
from readfish.plugins.abc import CallerABC
from readfish.plugins.utils import Result
from readfish._utils import nice_join

from readfish._utils import nice_join, get_item, nested_get
from readfish.plugins._mappy import FIELDS, _PAF

_PAF_nt = namedtuple("PAF", FIELDS)
PAF = type("PAF", (_PAF, _PAF_nt), dict())

# The keys for fields which appear in a PAF Formatted record, taken from
# the nanopore alignment_data array. Second value are defaults if the field
# is somehow not present although it should always be if an alignment exists.
PAF_KEYS = [
("strand_start", 0),
("strand_end", 0),
("direction", "*"),
("genome", "*"),
("genome_length", 0),
("genome_start", 0),
("genome_end", 0),
("num_aligned", 0),
("alignment_block_length", "0"),
("mapping_quality", 0),
("tags", {}),
]

if TYPE_CHECKING:
import minknow_api
Expand Down Expand Up @@ -81,6 +101,10 @@ def __init__(

# Set our own priority
self.dorado_params = kwargs
self.dorado_params["reconnect_timeout"] = 10
self.dorado_params["server_file_load_timeout"] = self.dorado_params.get(
"server_file_load_timeout", 10
)
self.dorado_params["priority"] = PyBasecallClient.high_priority
# Set our own client name to appear in the dorado server logs
self.dorado_params["client_name"] = "Readfish_connection"
Expand Down Expand Up @@ -291,13 +315,37 @@ def basecall(
)
barcode = res["metadata"].get("barcode_arrangement", None)
# TODO: Add Filter here
als = []
# Call to list converts np array of no array to list of np arrays,
# So we can assign the result of the .get via walrus if it's true (has alignments)
# Cause "the truth value of an array is ambiguous" blah blah blah
if alignments := list(res["datasets"].get("align_results", [])):
for al in filter(
lambda x: not x["secondary_alignment"],
alignments,
):
query_sequence_length = nested_get(
res, "metadata.sequence_length", 0
)
x = [read_id, query_sequence_length] + [
(
get_item(al, key, default).decode("utf-8")
if key in {"genome", "direction"}
else get_item(al, key, default)
)
for key, default in PAF_KEYS
]
# If it's a *, it's unaligned
if not x[4] == "*":
als.append(PAF(*x))
yield Result(
channel=channel,
read_number=read_number,
read_id=res["metadata"]["read_id"],
seq=res["datasets"]["sequence"],
barcode=barcode if barcode else None,
basecall_data=res,
alignment_data=als if als else None,
)
reads_received += 1

Expand Down
Loading