Import iter_subproc() #23

Merged 16 commits on Jun 14, 2024
13 changes: 10 additions & 3 deletions .github/workflows/ruff.yml
@@ -4,7 +4,14 @@ jobs:
   ruff:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v4
-      - uses: chartboost/ruff-action@v1
+      - name: Setup Python
+        uses: actions/setup-python@v5
         with:
-          src: './datasalad'
+          # run on a "fresh" python
+          python-version: 3.12
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Install hatch
+        run: python -m pip install hatch
+      - name: Check code
+        run: hatch fmt --check
21 changes: 21 additions & 0 deletions datasalad/iterable_subprocess/LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2021 Department for International Trade

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
81 changes: 81 additions & 0 deletions datasalad/iterable_subprocess/README.md
@@ -0,0 +1,81 @@
# iterable-subprocess

[![PyPI package](https://img.shields.io/pypi/v/iterable-subprocess?label=PyPI%20package&color=%234c1)](https://pypi.org/project/iterable-subprocess/) [![Test suite](https://img.shields.io/github/actions/workflow/status/uktrade/iterable-subprocess/test.yml?label=Test%20suite)](https://github.com/uktrade/iterable-subprocess/actions/workflows/test.yml) [![Code coverage](https://img.shields.io/codecov/c/github/uktrade/iterable-subprocess?label=Code%20coverage)](https://app.codecov.io/gh/uktrade/iterable-subprocess)

Python context manager to communicate with a subprocess using iterables. It offers a higher-level interface to subprocesses than Python's built-in subprocess module, and is particularly helpful when data won't fit in memory and has to be streamed.

This also allows an external subprocess to be naturally placed in a chain of iterables as part of a data processing pipeline.


## Installation

```bash
pip install iterable-subprocess
```


## Usage

A single context manager `iterable_subprocess` is exposed. The first parameter is the `args` argument passed to the [Popen Constructor](https://docs.python.org/3/library/subprocess.html#popen-constructor), and the second is an iterable whose items must be `bytes` instances and are sent to the subprocess's standard input.

The function returns an iterable whose items are `bytes` instances read from the process's standard output.

```python
from iterable_subprocess import iterable_subprocess

# In a real case this could be a generator function that reads from the filesystem or the network
iterable_of_bytes = (
    b'first\n',
    b'second\n',
    b'third\n',
)

with iterable_subprocess(['cat'], iterable_of_bytes) as output:
    for chunk in output:
        print(chunk)
```


## Exceptions

Python's `subprocess.Popen` is used to start the process, and any exceptions it raises are propagated without transformation. For example, if the subprocess can't be found, then a `FileNotFoundError` is raised.

If the process starts, but exits with a non-zero return code, then an `iterable_subprocess.IterableSubprocessError` exception will be raised with two members:

- `returncode` - the return code of the process
- `stderr` - the final 65536 bytes of the standard error of the process

However, if the process starts, but an exception is raised from inside the context or from the source iterable, then this exception is propagated, even if the process subsequently exits with a non-zero return code.
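The error contract described above (a return code plus a bounded tail of standard error) can be sketched with the standard library alone. This is a hypothetical helper for illustration, not this package's implementation, and note that the vendored datasalad copy raises datasalad's `CommandError` instead of `IterableSubprocessError`:

```python
from subprocess import PIPE, Popen

def run_with_stderr_tail(cmd, tail_bytes=65536):
    # Hypothetical helper, not part of iterable-subprocess: run cmd and
    # report its return code together with the final tail_bytes of its
    # standard error, mirroring the returncode/stderr members above.
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
    _, stderr = proc.communicate()
    return proc.returncode, stderr[-tail_bytes:]

rc, tail = run_with_stderr_tail(['sh', '-c', 'echo oops >&2; exit 3'])
```

Unlike `iterable_subprocess`, this buffers all output in memory; it only illustrates the shape of the reported error information.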


## Example: unzip the first file of a ZIP archive while downloading

It's possible to download the bytes of a ZIP file in Python, and unzip its first file by passing the bytes to `funzip`, as in the following example.

```python
import httpx
from iterable_subprocess import iterable_subprocess

with \
        httpx.stream('GET', 'https://www.example.com/my.zip') as r, \
        iterable_subprocess(['funzip'], r.iter_bytes()) as unzipped_chunks:

    for chunk in unzipped_chunks:
        print(chunk)
```

Note that it's also possible to stream unzip files without resorting to another process using [stream-unzip](https://github.com/uktrade/stream-unzip).


## Example: download file using curl and process in Python

You would usually download directly from Python, but as an example, you can download using the curl executable and process its output in Python.

```python
from iterable_subprocess import iterable_subprocess

url = 'https://data.api.trade.gov.uk/v1/datasets/uk-tariff-2021-01-01/versions/v3.0.212/tables/measures-on-declarable-commodities/data?format=csv'
with iterable_subprocess(['curl', '--no-progress-meter', '--fail-with-body', url], ()) as output:
    for chunk in output:
        print(chunk)
```
26 changes: 26 additions & 0 deletions datasalad/iterable_subprocess/__init__.py
@@ -0,0 +1,26 @@
"""Context manager to communicate with a subprocess using iterables

This module provides a higher-level interface to subprocesses than Python's
built-in subprocess module, and is particularly helpful when data won't fit
in memory and has to be streamed.

This also allows an external subprocess to be naturally placed in a chain of
iterables as part of a data processing pipeline.

This code has been taken from https://pypi.org/project/iterable-subprocess/
and was subsequently adjusted for cross-platform compatibility and performance,
and for using datasalad's ``CommandError`` exception.

The original code was made available under the terms of the MIT License,
and was written by Michal Charemza.

.. currentmodule:: datasalad.iterable_subprocess
.. autosummary::
:toctree: generated

iterable_subprocess
"""

__all__ = ['iterable_subprocess']

from .iterable_subprocess import iterable_subprocess
242 changes: 242 additions & 0 deletions datasalad/iterable_subprocess/iterable_subprocess.py
@@ -0,0 +1,242 @@
from __future__ import annotations

from collections import deque
from collections.abc import Generator
from contextlib import contextmanager
from subprocess import PIPE, Popen
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Iterable,
)

if TYPE_CHECKING:
    from os import PathLike

from datasalad.runners import CommandError

# Errno22 indicates an IO failure with a
# file descriptor
ERRCODE_IO_FAILURE = 22


class OutputFrom(Generator):
    def __init__(self, stdout, stderr_deque, chunk_size=65536):
        self.stdout = stdout
        self.stderr_deque = stderr_deque
        self.chunk_size = chunk_size
        self.returncode = None

    def send(self, _):
        chunk = self.stdout.read(self.chunk_size)
        if not chunk:
            raise StopIteration
        return chunk

    def throw(self, typ, value=None, traceback=None):
        return super().throw(typ, value, traceback)


@contextmanager
def iterable_subprocess(
    program: list[str],
    input_chunks: Iterable[bytes],
    chunk_size: int = 65536,
    cwd: PathLike | str | None = None,
    bufsize: int = -1,
):
    """Subprocess execution context manager with iterable IO

    This context starts a thread that populates the subprocess's standard
    input. It also starts a thread that reads the process's standard error.
    Otherwise we risk a deadlock - there is no output because the process is
    waiting for more input.

    This itself introduces its own complications and risks, but they are
    hopefully mitigated by having a well-defined start and stop mechanism
    that also avoids sending data to the process if it's not running.

    To start, i.e. on entry to the context from client code:

    - The process is started
    - The thread to read from standard error is started
    - The thread to populate input is started

    When running:

    - The standard input thread iterates over the input, passing chunks to
      the process
    - While the standard error thread fetches the error output
    - And while this thread iterates over the process's output from client
      code in the context

    To stop, i.e. on exit of the context from client code:

    - This thread closes the process's standard output
    - Wait for the standard input thread to exit
    - Wait for the standard error thread to exit
    - Wait for the process to exit

    By using context managers internally, this also gives quite strong
    guarantees that the above order is enforced to make sure the thread
    doesn't send data to the process whose standard input is closed and so
    we don't get ``BrokenPipe`` errors.

    Writing to the process can result in a ``BrokenPipeError``. If this then
    results in a non-zero exit code from the process, the process's standard
    error probably has useful information on the cause. However, the
    non-zero exit code happens after the ``BrokenPipeError``, so propagating
    "what happened first" isn't helpful in this case. So, we re-raise
    ``BrokenPipeError`` as ``_BrokenPipeError`` so we can catch it after the
    process ends to then allow us to branch on its exit code:

    - if it's non-zero, raise a ``CommandError`` containing its standard error
    - if it's zero, re-raise the original ``BrokenPipeError``

    >>> # regular execution, no input iterable
    >>> with iterable_subprocess(['printf', 'test'], []) as proc:
    ...     for chunk in proc:
    ...         print(chunk)
    b'test'
    >>> # feed subprocess stdin from an iterable
    >>> with iterable_subprocess(['cat'], [b'test']) as proc:
    ...     for chunk in proc:
    ...         print(chunk)
    b'test'
    """

    class _BrokenPipeError(Exception):
        pass

    @contextmanager
    def thread(target, *args):
        exception = None

        def wrapper():
            nonlocal exception
            try:
                target(*args)
            except BaseException as e:  # noqa: BLE001
                # ok to catch any exception here, we are reporting them
                exception = e

        t = Thread(target=wrapper)

        def start():
            t.start()

        def join():
            if t.ident:
                t.join()
            return exception

        yield start, join

    def input_to(stdin):
        try:
            for chunk in input_chunks:
                try:
                    stdin.write(chunk)
                except BrokenPipeError:
                    raise _BrokenPipeError  # noqa: B904
                except OSError as e:
                    if e.errno != ERRCODE_IO_FAILURE:
                        # no idea what this could be, let it bubble up
                        raise
                    # maybe process is dead already
                    raise _BrokenPipeError  # noqa: B904
        finally:
            try:
                stdin.close()
            except BrokenPipeError:
                raise _BrokenPipeError  # noqa: B904
            except OSError as e:
                # silently ignore Errno22, which happens on
                # windows when trying to interact with file descriptors
                # associated with a process that exited already
                if e.errno != ERRCODE_IO_FAILURE:
                    raise

    def keep_only_most_recent(stderr, stderr_deque):
        total_length = 0
        while True:
            chunk = stderr.read(chunk_size)
            total_length += len(chunk)
            if not chunk:
                break
            stderr_deque.append(chunk)
            if total_length - len(stderr_deque[0]) >= chunk_size:
                total_length -= len(stderr_deque[0])
                stderr_deque.popleft()

    def raise_if_not_none(exception):
        if exception is not None:
            raise exception from None

    proc: Popen | None = None
    stderr_deque: deque[bytes] = deque()
    chunk_generator = None
    exception_stdin = None
    exception_stderr = None

    try:
        with Popen(  # nosec - all arguments are controlled by the caller
            program,
            stdin=PIPE,
            stdout=PIPE,
            stderr=PIPE,
            cwd=cwd,
            bufsize=bufsize,
        ) as proc, thread(
            keep_only_most_recent,
            proc.stderr,
            stderr_deque,
        ) as (start_t_stderr, join_t_stderr), thread(
            input_to,
            proc.stdin,
        ) as (start_t_stdin, join_t_stdin):
            try:
                start_t_stderr()
                start_t_stdin()
                chunk_generator = OutputFrom(proc.stdout, stderr_deque, chunk_size)
                yield chunk_generator
            except BaseException:
                proc.terminate()
                raise
            finally:
                if TYPE_CHECKING:
                    assert proc is not None
                    assert proc.stdout is not None
                proc.stdout.close()
                exception_stdin = join_t_stdin()
                exception_stderr = join_t_stderr()

                raise_if_not_none(exception_stdin)
                raise_if_not_none(exception_stderr)

    except _BrokenPipeError as e:
        if TYPE_CHECKING:
            assert proc is not None
        if chunk_generator:
            chunk_generator.returncode = proc.returncode
        if proc.returncode == 0:
            if TYPE_CHECKING:
                assert isinstance(e.__context__, BrokenPipeError)  # noqa: PT017
            raise e.__context__ from None
    except BaseException:
        if TYPE_CHECKING:
            assert proc is not None
        if chunk_generator:
            chunk_generator.returncode = proc.returncode
        raise

    if TYPE_CHECKING:
        assert chunk_generator is not None
    chunk_generator.returncode = proc.returncode
    if proc.returncode:
        raise CommandError(
            cmd=program,
            returncode=proc.returncode,
            stderr=b''.join(stderr_deque)[-chunk_size:],
            cwd=cwd,
        )