Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7 from moka-guys/v1.1
Browse files Browse the repository at this point in the history
one upload command per folder. no default ignore statement. doesn't w…
  • Loading branch information
andyb3 authored Jan 18, 2019
2 parents 90c8536 + 5e9003b commit 1c84e8a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 47 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Scripts to manage data on the NGS workstation

---

## backup_runfolder.py v1.0
## backup_runfolder.py v1.1
Uploads an Illumina runfolder to DNANexus.

### Quickstart
Expand All @@ -18,9 +18,9 @@ This tool requires the DNAnexus utilities `ua` (upload agent) and `dx` (DNAnexus
### How does this tool work?
* The script parses the input parameters, asserting that the given runfolder exists.
* If the `-p` option is given, the script attempts to find a matching DNAnexus project. Otherwise, it looks for a single project matching the runfolder name. If more or less than 1 project matches, the script logs an error and exits.
* From a list of files in the runfolder, files matching any of the comma-separated strings passed to `--ignore` are removed. The default is '/L00', which ignores BCL files in directories with this prefix. To upload all files in a runfolder, pass the argument `--ignore ""`.
* Finally, each remaining file is passed to the DNAnexus `ua` utility. This will attempt an upload if the files are not found in the expected location in the DNAnexus project. The number of upload tries is set to 100 with the `--tries` flag.
* Logs from this and the script are written to STDERR and a logfile, named after the runfolder. A destination for this file can be passed to the `--logpath` flag.
* The runfolder is traversed and a list of files in each folder is obtained. If any comma-separated strings passed to the `--ignore` argument are present within the filepath, or filename the file is excluded.
* Finally, the list of files in each folder is passed to the DNAnexus `ua` utility. This will attempt to upload all files in a folder in a single command. The number of upload tries is set to 100 with the `--tries` flag.
* Logs from this and the script are written to a logfile, named after the runfolder. A destination for this file can be passed to the `--logpath` flag.

---

Expand Down
160 changes: 117 additions & 43 deletions backup_runfolder.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sys
import subprocess
from distutils.spawn import find_executable
import math

import logging
from logging.config import dictConfig
Expand Down Expand Up @@ -55,7 +56,7 @@ def log_setup(args):
'stream_handler': {'class': 'logging.StreamHandler', 'formatter': 'log_formatter', 'level': logging.DEBUG},
'file_handler': {'class': 'logging.FileHandler', 'formatter': 'log_formatter', 'level': logging.DEBUG,
'filename': os.path.join(logpath, logfile_name)}},
root={'handlers': ['file_handler', 'stream_handler'], 'level': logging.DEBUG}
root={'handlers': ['file_handler'], 'level': logging.DEBUG}
)

# Read the logging config and initaite root logger for the script.
Expand All @@ -80,7 +81,7 @@ def cli_arguments(args):
# Os.path.expanduser allows expands tilde signs (~) to a string containing the user home directory.
parser.add_argument('-i', '--runfolder', required=True, help='An Illumina runfolder directory', type=os.path.expanduser)
parser.add_argument('-a', '--auth-token', help='A DNAnexus authorisation key for the upload agent', default='~/.dnanexus_auth_token', type=os.path.expanduser)
parser.add_argument('--ignore', default="/L00", help="Comma-separated string. Regular expressions for files to ignore.")
parser.add_argument('--ignore', default=None, help="Comma-separated string. Regular expressions for files to ignore.")
# Note: When no project is given to the -p argument below, this script searches for a project in DNAnexus. See UAcaller.find_nexus_project() for details.
parser.add_argument('-p', '--project', default=None, help='The name of an existing DNAnexus project for the given runfolder')
parser.add_argument('--logpath', help='Logfile output directory', type=os.path.expanduser)
Expand Down Expand Up @@ -187,40 +188,80 @@ def find_nexus_project(self, project):
'Repeat script by giving explicit project to -p/--project flag', len(project_matches), pattern)
raise ValueError('Invalid DNAnexus project name. 0 or >1 matching projects found.')

def get_nexus_filepath(self, input_file):
"""Get the DNAneuxs project directory path from a local runfolder input file path. This is required
to replicate the local directory structure for the file being uploaded in the DNAnexus project.
The output of this function is passed to the upload agent --folder flag.
def get_nexus_filepath(self, folder_path):
"""
To recreate the directory structure in DNA Nexus need to take relative path of each the subfolder.
This subfolder path is prefixed with the top level folder in DNA Nexus(the project name without the first four characters (002_)).
Returns a tuple (DNAnexus upload folder path, full DNAnexus file path)
DNAnexus upload folder path is used in the upload agent's '--folder' argument.
Args:
input_file - The full path of a local runfolder file to be uploaded to DNAnexus.
folder_path - The path of a local folder containing files to be uploaded to DNAnexus.
Returns:
A tuple: (DNAnexus upload folder path, full DNAneuxs file path)
Example:
self.get_nexus_filepath('/media/data1/share/runfolder/RTALogs/logfile.tsv')
>>> (runfolder/RTALogs, PROJECT:/runfolder/RTALogs/logfile.tsv)
self.get_nexus_filepath('/media/data1/share/runfolder/RTALogs/')
>>> (runfolder/RTALogs, PROJECT:/runfolder/RTALogs/)
"""
# Clean the runfolder name (and prefixes) from the input file path. Features of the regular expression below:
# Clean the runfolder name and parent folders from the input file path. Features of the regular expression below:
# {} - Replaced with the runfolder name by call to str.format(self.runfolder)
# [\/] - Looks a forward or backward slash in this position, accounting for linux or windows filesystems
# (.*)$ - Capture all characters to the end of the line.
# Parentheses in regular expressions capture a group, the first of which can be returned from re.search().group(1)
clean_runfolder_path = re.search(r'{}[\/](.*)$'.format(self.runfolder), input_file).group(1)
# Prepend the nexus folder path. This is the project name without the first four characters (002).
nexus_path_full = os.path.join(self.project[4:], clean_runfolder_path)
# Remove the filename extension
nexus_folder = os.path.dirname(nexus_path_full)
# if we are uploading files in the root of a runfolder need to skip this step
if folder_path == self.runfolder:
clean_runfolder_path = ""
else:
clean_runfolder_path = re.search(r'{}[\/](.*)$'.format(self.runfolder), folder_path).group(1)

# Prepend the nexus folder path to cleaned path. the nexus folder path is the project name without the first four characters (002_).
nexus_path = "'/" + os.path.join(self.project[4:],clean_runfolder_path) + "'"

# Return the nexus folder and full project filepath
return nexus_folder, "{}:/{}".format(self.project, nexus_path_full)
return nexus_path, "{}:{}".format(self.project, nexus_path)

def call_upload_agent(self):
"""Call the DNAnexus upload agent using the class attributes."""
# List the full paths of files in the runfolder
files = [os.path.join(root, file) for (root, dir, files) in os.walk(self.runfolder) for file in files]
self.logger.info('Listing files in input runfolder: %s', files)
# Filter out files based on regular expressions if self.ignore is set
self.logger.info('Filtering files that match the regex "%s"...', self.ignore)
if self.ignore:
upload_files = list(filter(
"""
Loop through the runfolder and build the upload agent command.
It is quicker to upload files in paralell so all files in a folder are added to a list and a single command issued per folder
"""
# create a dictionary to hold the directories as a key, and the list of files as the value
file_dict = {}
# walk through run folder
for root, subfolders, files in os.walk(self.runfolder):
# for any subfolders
for folder in subfolders:
# build path to the folder
path = os.path.join(root, folder)
# for some reason if a folder is empty the files list contains the list of files from the previous dirctory
# therefore create a list of all the files in this subfolder
file_list = [file for file in os.listdir(path) if os.path.isfile(os.path.join(path, file))]
upload_files = []
# if any ignore terms given
if self.ignore:
# check if any of the ignore terms are present in the folder path
if not any([re.search(pattern, path) for pattern in self.ignore.split(",")]):
upload_files = list(filter(
lambda file:
# For each filename in files, search it against every regular expression passed to the script.
# These are comma-separated in the string variable self.ignore.
# The list of True/False returned by the searches is passed to any(),
# which evaulates True if the filename matches any regular expression. Then, 'not' is
# used to set this as False, telling filter() to exclude the file from the list (files)
not any(
[re.search(pattern, file) for pattern in self.ignore.split(",")]
),
file_list
))
# if there are any files in that folder to upload add to dictionary
if upload_files:
file_dict[path] = upload_files
# if nothing given to ignore add all files to the dictionary
else:
file_dict[path] = file_list
# also need to loop through any files present in the root
# as above, check to see if any files contain any of the ignore terms
if self.ignore:
upload_files = list(filter(
lambda file:
# For each filename in files, search it against every regular expression passed to the script.
# These are comma-separated in the string variable self.ignore.
Expand All @@ -232,27 +273,60 @@ def call_upload_agent(self):
),
files
))
else:
upload_files = files
self.logger.info('Files for upload: %s', upload_files)
# if any files to upload add to dictionary
if upload_files:
file_dict[root] = upload_files
# if no ignore terms given add all files to dictionary
else:
file_dict[root] = files

# report the folders and files to be uploaded
self.logger.info('Files for upload: %s', file_dict)

# call upload agent on each folder
for path in file_dict:
if file_dict[path]:
# create the nexus path for each dir
nexus_path, project_filepath = self.get_nexus_filepath(path)
self.logger.info('Calling upload agent on %s to location %s', path, project_filepath)
# upload agent has a max number of uploads of 1000 per command
# count number of files in list and divide by 1000.0 eg 20/1000.0 = 0.02. ceil rounds up to the nearest integer (0.02->1). If there are 1000, ceil(1000/1000.0)=1.0
iterations_needed = math.ceil(len(file_dict[path]) / 1000.0)
# set the iterations count to 1
iteration_count = 1
# will pass a slice of the file list to the upload agent so set variables for start and stop so it uploads files 0-999
start = 0
stop = 1000
# while we haven't finished the iterations
while iteration_count <= iterations_needed:
# if it's the last iteration, set stop == length of list so not to ask for elements that aren't in the list (if 4 items in list len(list)=4 and slice of 0:4 won't miss the last element)
if iteration_count == iterations_needed:
stop = len(file_dict[path])
self.logger.info('uploading files %d to %d', start, stop)
# the upload agent command can take multiple files seperated by a space. the full file path is required for each file
files_string = ""
# take a slice of list using from and to
for file in file_dict[path][start:stop]:
files_string = files_string + " '" + os.path.join(path, file) + "'"

# increase the iteration_count and start and stop by 1000 for the next iteration so second iteration will do files 1000-1999
iteration_count += 1
start += 1000
stop += 1000

# Iterate over filtered files in runfolder
for input_file in upload_files:
nexus_folder, project_filepath = self.get_nexus_filepath(input_file)
self.logger.info('Calling upload agent on %s to location %s:%s', input_file, self.project, project_filepath)
# Create DNAnexus upload command
nexus_upload_command = ('ua --auth-token {auth_token} --project {nexus_project} --folder {nexus_folder} --do-not-compress --upload-threads 10 --tries 100 {files}'.format(
auth_token=self.auth_token, nexus_project=self.project, nexus_folder=nexus_path, files=files_string))

# Create DNAnexus upload command
nexus_upload_command = ('ua --auth-token {auth_token} --project {nexus_project} --folder /{nexus_folder} --do-not-compress --upload-threads 10 --tries 100 -v "{file}"'.format(
auth_token=self.auth_token, nexus_project=self.project, nexus_folder=nexus_folder, file=input_file))
# Mask the autentication key in the upload command and log
masked_nexus_upload_command = nexus_upload_command.replace(self.auth_token, "")
self.logger.info(masked_nexus_upload_command)
# Call upload command redirecting stderr to stdout
proc = subprocess.Popen([nexus_upload_command], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
# Capture output streams (err is redirected to out above)
(out, err) = proc.communicate()
# Write output stream to logfile and terminal
self.logger.debug(out.decode())
# Mask the autentication key in the upload command and log
masked_nexus_upload_command = nexus_upload_command.replace(self.auth_token, "")
self.logger.info(masked_nexus_upload_command)
# Call upload command redirecting stderr to stdout
proc = subprocess.Popen([nexus_upload_command], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
# Capture output streams (err is redirected to out above)
(out, err) = proc.communicate()
# Write output stream to logfile and terminal
self.logger.debug(out.decode())


def main(args):
Expand Down

0 comments on commit 1c84e8a

Please sign in to comment.