Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PatchWork GenerateREADME #1191

Closed
21 changes: 21 additions & 0 deletions patchwork/common/utils/input_parsing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from collections.abc import Iterable, Mapping

from typing_extensions import AnyStr, Union
Expand Down Expand Up @@ -69,3 +70,23 @@ def parse_to_list(
continue
rv.append(stripped_value)
return rv


def parse_to_dict(possible_dict, limit=-1):
    """Recursively decode JSON text found in ``possible_dict``.

    Dicts are rebuilt with every value processed recursively; strings are
    passed through ``json.loads`` and, when they decode, the decoded value is
    processed again. Anything else is returned unchanged.

    Note that any string holding valid JSON is decoded, not only JSON
    objects: ``"5432"`` becomes the int ``5432`` and ``"null"`` becomes
    ``None``.

    Args:
        possible_dict: A dict, a string that may hold JSON, or any other value.
        limit: Remaining recursion budget. Each dict level and each successful
            string decode consumes one unit; once the budget reaches ``0`` the
            current value is returned untouched. A negative value (the
            default) means no limit.

    Returns:
        The decoded structure, or ``possible_dict`` itself when there is
        nothing to decode or the budget is exhausted.
    """
    # Nothing to decode, or the depth budget is spent: hand the value back as-is.
    if possible_dict is None or limit == 0:
        return possible_dict

    if isinstance(possible_dict, dict):
        return {key: parse_to_dict(value, limit - 1) for key, value in possible_dict.items()}

    if isinstance(possible_dict, str):
        try:
            # strict=False tolerates raw control characters inside JSON strings.
            decoded = json.loads(possible_dict, strict=False)
        except json.JSONDecodeError:
            # Not JSON: treat it as an ordinary string.
            return possible_dict
        return parse_to_dict(decoded, limit - 1)

    return possible_dict
18 changes: 18 additions & 0 deletions patchwork/common/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import atexit
import dataclasses
import random
import signal
import string
import tempfile
from collections.abc import Mapping
from pathlib import Path

import chevron
import tiktoken
from chardet.universaldetector import UniversalDetector
from git import Head, Repo
Expand All @@ -19,6 +23,20 @@
_NEWLINES = {"\n", "\r\n", "\r"}


def mustache_render(template: str, data: Mapping) -> str:
    """Render ``template`` with chevron, substituting values from ``data``.

    Args:
        template: Mustache template text.
        data: Mapping of template variable names to values.

    Returns:
        ``template`` unchanged when ``data`` has no keys, otherwise the text
        produced by ``chevron.render``.
    """
    if not data.keys():
        return template

    # Replace the ``_html_escape`` global that chevron's renderer looks up with
    # an identity function, so substituted values are emitted verbatim.
    # This mutates chevron's module globals and so affects every later render.
    chevron.render.__globals__["_html_escape"] = lambda text: text

    # A random 32-character partials extension, no partials path and an empty
    # partials dict -- presumably so that no partial can ever be resolved.
    alphabet = string.ascii_uppercase + string.digits
    throwaway_ext = "".join(random.choices(alphabet, k=32))

    return chevron.render(
        template=template,
        data=data,
        partials_path=None,
        partials_ext=throwaway_ext,
        partials_dict=dict(),
    )


def detect_newline(path: str | Path) -> str | None:
with open(path, "r", newline="") as f:
lines = f.read().splitlines(keepends=True)
Expand Down
46 changes: 30 additions & 16 deletions patchwork/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@

from enum import Enum

from typing_extensions import Any, Dict, List, Optional, Union, is_typeddict
from typing_extensions import (
Any,
Collection,
Dict,
List,
Optional,
Type,
Union,
is_typeddict,
)

from patchwork.logger import logger

Expand Down Expand Up @@ -45,10 +54,9 @@ def __init__(self, inputs: DataPoint):
"""

# check if the inputs have the required keys
if self.__input_class is not None:
missing_keys = self.__input_class.__required_keys__.difference(inputs.keys())
if len(missing_keys) > 0:
raise ValueError(f"Missing required data: {list(missing_keys)}")
missing_keys = self.find_missing_inputs(inputs)
if len(missing_keys) > 0:
raise ValueError(f"Missing required data: {list(missing_keys)}")

# store the inputs
self.inputs = inputs
Expand All @@ -64,19 +72,25 @@ def __init__(self, inputs: DataPoint):
self.original_run = self.run
self.run = self.__managed_run

def __init_subclass__(cls, **kwargs):
input_class = kwargs.get("input_class", None) or getattr(cls, "input_class", None)
output_class = kwargs.get("output_class", None) or getattr(cls, "output_class", None)
def __init_subclass__(cls, input_class: Optional[Type] = None, output_class: Optional[Type] = None, **kwargs):
if cls.__name__ == "PreparePR":
print(1)
input_class = input_class or getattr(cls, "input_class", None)
if input_class is not None and not is_typeddict(input_class):
input_class = None

if input_class is not None and is_typeddict(input_class):
cls.__input_class = input_class
else:
cls.__input_class = None
output_class = output_class or getattr(cls, "output_class", None)
if output_class is not None and not is_typeddict(output_class):
output_class = None

if output_class is not None and is_typeddict(output_class):
cls.__output_class = output_class
else:
cls.__output_class = None
cls._input_class = input_class
cls._output_class = output_class

@classmethod
def find_missing_inputs(cls, inputs: DataPoint) -> Collection:
    """Return the required input keys that ``inputs`` does not contain.

    Args:
        inputs: The data a step is about to be constructed with.

    Returns:
        An empty list when the class has no ``_input_class`` schema, otherwise
        the schema's ``__required_keys__`` minus the keys of ``inputs``.
    """
    schema = getattr(cls, "_input_class", None)
    if schema is not None:
        return schema.__required_keys__.difference(inputs.keys())
    return []

def __managed_run(self, *args, **kwargs) -> Any:
self.debug(self.inputs)
Expand Down
57 changes: 57 additions & 0 deletions patchwork/steps/CallSQL/CallSQL.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

from sqlalchemy import URL, create_engine, exc, text

from patchwork.common.utils.input_parsing import parse_to_dict
from patchwork.common.utils.utils import mustache_render
from patchwork.logger import logger
from patchwork.step import Step, StepStatus
from patchwork.steps.CallSQL.typed import CallSQLInputs, CallSQLOutputs


class CallSQL(Step, input_class=CallSQLInputs, output_class=CallSQLOutputs):
    """Step that runs one SQL statement through SQLAlchemy and returns its rows."""

    def __init__(self, inputs: dict):
        """Render the query template and open (and probe) the database engine.

        Args:
            inputs: Step inputs; ``db_dialect`` and ``db_query`` are read
                unconditionally, everything else is optional.
        """
        super().__init__(inputs)
        # NOTE(review): template values are spliced into the SQL text by
        # mustache_render, not bound as parameters -- only feed trusted values.
        query_template_data = inputs.get("db_query_template_values", {})
        self.query = mustache_render(inputs["db_query"], query_template_data)
        self.__build_engine(inputs)

    def __build_engine(self, inputs: dict):
        """Create ``self.engine`` from the ``db_*`` inputs and verify it with ``SELECT 1``.

        Returns:
            The created engine (also stored on ``self.engine``).
        """
        dialect = inputs["db_dialect"]
        driver = inputs.get("db_driver")
        dialect_plus_driver = f"{dialect}+{driver}" if driver is not None else dialect

        # The input schema declares ``db_name`` while this step historically
        # read ``db_database``; accept either, preferring ``db_database``.
        database = inputs.get("db_database")
        if database is None:
            database = inputs.get("db_name")

        # NOTE(review): host/port fall back to localhost:5432 for every
        # dialect, including ones where that default may not make sense.
        url_parts = dict(
            username=inputs.get("db_username"),
            host=inputs.get("db_host", "localhost"),
            port=inputs.get("db_port", 5432),
            password=inputs.get("db_password"),
            database=database,
            query=parse_to_dict(inputs.get("db_params")),
        )
        connection_url = URL.create(
            dialect_plus_driver,
            **{k: v for k, v in url_parts.items() if v is not None},
        )

        # create_engine expects a dict here; passing None breaks when it is
        # merged into the DBAPI connect parameters, so default to empty.
        driver_args = inputs.get("db_driver_args")
        connect_args = parse_to_dict(driver_args) if driver_args is not None else {}

        self.engine = create_engine(connection_url, connect_args=connect_args)
        # Fail fast on bad credentials / unreachable hosts.
        with self.engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return self.engine

    def run(self) -> dict:
        """Execute the rendered query inside a transaction.

        Returns:
            ``{"results": [...]}`` with one dict per returned row. The list is
            empty for statements that return no rows, and when the statement
            fails (in which case the step status is set to FAILED).
        """
        try:
            with self.engine.begin() as conn:
                cursor = conn.execute(text(self.query))
                # Iterating a result that returns no rows (INSERT/UPDATE/DDL)
                # raises and would roll the transaction back.
                rv = [row._asdict() for row in cursor] if cursor.returns_rows else []
        except exc.SQLAlchemyError as e:
            self.set_status(StepStatus.FAILED, f"`{self.query}` failed with message:\n{e}")
            return dict(results=[])

        logger.info(f"Retrieved {len(rv)} rows!")
        return dict(results=rv)
Empty file.
24 changes: 24 additions & 0 deletions patchwork/steps/CallSQL/typed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from typing_extensions import Any, TypedDict


class __RequiredCallSQLInputs(TypedDict):
    """Keys every CallSQL invocation must supply."""

    db_dialect: str
    db_query: str


class CallSQLInputs(__RequiredCallSQLInputs, total=False):
    """Full CallSQL input schema: the required keys plus optional settings."""

    db_driver: str
    db_username: str
    db_password: str
    db_host: str
    db_port: int
    db_name: str
    # Key the CallSQL step actually reads the database name from; declared so
    # the schema matches the step. ``db_name`` is kept for compatibility.
    db_database: str
    db_params: dict[str, Any]
    db_driver_args: dict[str, Any]
    db_query_template_values: dict[str, Any]


class CallSQLOutputs(TypedDict):
    """Output schema of the CallSQL step."""

    # One dict per returned row.
    results: list[dict[str, Any]]
58 changes: 58 additions & 0 deletions patchwork/steps/CallShell/CallShell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

import os
import shlex
import subprocess
from pathlib import Path

from patchwork.common.utils.utils import mustache_render
from patchwork.logger import logger
from patchwork.step import Step, StepStatus
from patchwork.steps.CallShell.typed import CallShellInputs, CallShellOutputs


class CallShell(Step, input_class=CallShellInputs, output_class=CallShellOutputs):
    """Step that runs a (templated) shell script and captures its output."""

    def __init__(self, inputs: dict):
        """Render the script template and prepare working directory and env.

        Args:
            inputs: Step inputs; ``script`` is read unconditionally,
                ``script_template_values``, ``working_dir`` and ``env`` are
                optional.
        """
        super().__init__(inputs)
        # NOTE(review): template values are substituted into the script text
        # without shell escaping -- only feed trusted values.
        script_template_values = inputs.get("script_template_values", {})
        self.script = mustache_render(inputs["script"], script_template_values)
        self.working_dir = inputs.get("working_dir", Path.cwd())
        self.env = self.__parse_env_text(inputs.get("env", ""))

    @staticmethod
    def __parse_env_text(env_text: str) -> dict[str, str]:
        """Parse ``KEY=VALUE`` assignments separated by whitespace or ``;``.

        Shell-style quoting is honoured when splitting assignments, so
        ``A="b c"`` yields ``{"A": "b c"}``. Each assignment is split on its
        first ``=`` only, so values may themselves contain ``=``. Assignments
        without a name or without a value are logged and skipped.

        Args:
            env_text: The raw assignment text.

        Returns:
            Mapping of variable names to values.
        """
        env_spliter = shlex.shlex(env_text, posix=True)
        env_spliter.whitespace_split = True
        env_spliter.whitespace += ";"

        env: dict[str, str] = dict()
        for env_assign in env_spliter:
            # Quotes were already resolved by the splitter above; re-tokenising
            # the assignment would break values containing spaces or "=".
            name, separator, value = env_assign.partition("=")
            if not name:
                logger.error(f"`{env_assign}` has no variable name, skipping...")
                continue
            if not separator or not value:
                logger.error(f"{name} is not assigned anything, skipping...")
                continue
            env[name] = value

        return env

    def run(self) -> dict:
        """Run the script through the shell and capture its output.

        A non-zero exit code sets the step status to FAILED; the captured
        output is returned either way.

        Returns:
            ``{"stdout_output": ..., "stderr_output": ...}``.
        """
        # Overlay the requested variables on the inherited environment;
        # passing only self.env would strip PATH, HOME, etc. from the script.
        env = {**os.environ, **self.env}
        p = subprocess.run(self.script, shell=True, capture_output=True, text=True, cwd=self.working_dir, env=env)
        if p.returncode != 0:
            self.set_status(
                StepStatus.FAILED,
                "Script failed.",
            )
        logger.info(f"stdout: \n{p.stdout}")
        logger.info(f"stderr:\n{p.stderr}")
        return dict(stdout_output=p.stdout, stderr_output=p.stderr)
71 changes: 71 additions & 0 deletions patchwork/steps/CallShell/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Documentation for CallShell Module

This documentation provides an overview of the `CallShell` module, explaining its purpose, how it can be used, and detailing its input and output parameters.

## Overview

The `CallShell` module is part of the `patchwork` library, designed to execute shell scripts from a Python context. It offers a structured way of defining the inputs necessary for running shell commands, handles output capture, and provides error logging.

This module can be particularly useful in automated pipelines or scenarios where shell interaction within a Python script is needed.

## Inputs

### Required Inputs

- **script** (`str`): The shell script or command to be executed.

### Optional Inputs

- **working_dir** (`str`): The directory in which the shell command is executed. Defaults to the current working directory.
- **env** (`str`): Environment variables for the command, written as `KEY=VALUE` assignments separated by semicolons or whitespace.
- **script_template_values** (`dict[str, Any]`): A dictionary of values to be used in template rendering the script.

## Outputs

- **stdout_output** (`str`): Captures the standard output produced by the shell command.
- **stderr_output** (`str`): Captures the standard error output produced by the shell command.

Note: While not explicitly defined in `typed.py`, `stderr_output` is captured in the `CallShell` class's `run()` method.

## Implementation Details

### Class: `CallShell`

This class inherits from the `Step` base class, using `CallShellInputs` as the input type and `CallShellOutputs` as the output type. It processes the specified script and runs it within a subprocess.

#### Methods

- **`__init__`**: Initializes the `CallShell` instance with given inputs, rendering the script if template values are provided, and preparing the environment.

- **`__parse_env_text`**: A static method to parse environment variable assignments from a text string.

- **`run`**: Executes the shell command, logs output and error details, and sets the status based on the execution outcome. Uses Python's `subprocess.run` for executing the command.

### Utility Imports and Logging

The module makes use of utility functions like `mustache_render` for template processing and leverages a `logger` for logging standard output and error messages during execution.

This integration is essential for efficiently managing and debugging shell script executions embedded within Python workflows.

## Usage

To use this module:

1. Instantiate the `CallShell` class with a dictionary of inputs.
2. Invoke the `run` method to execute the shell script.
3. Capture the outputs, particularly `stdout_output` and `stderr_output`, for further processing or analysis.

```python
inputs = {
"script": "echo Hello World",
"working_dir": "/path/to/dir",
"env": "VAR1=value1;VAR2=value2",
"script_template_values": {"some_placeholder": "some_value"}
}

call_shell = CallShell(inputs)
outputs = call_shell.run()
print(outputs["stdout_output"])
```

This module provides a robust interface for handling shell commands within Python, ensuring that inputs are properly managed, and outputs are consistently captured and logged.
Empty file.
19 changes: 19 additions & 0 deletions patchwork/steps/CallShell/typed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

from typing_extensions import Annotated, Any, TypedDict

from patchwork.common.utils.step_typing import StepTypeConfig


class __RequiredCallShellInputs(TypedDict):
    """Keys every CallShell invocation must supply."""

    # The shell script text (may contain mustache placeholders).
    script: str


class CallShellInputs(__RequiredCallShellInputs, total=False):
    """Full CallShell input schema: the required keys plus optional settings."""

    # Directory to run the script in; flagged as a path via StepTypeConfig.
    working_dir: Annotated[str, StepTypeConfig(is_path=True)]
    # Environment variable assignments in KEY=VALUE form.
    env: str
    # Values substituted into the script template.
    script_template_values: dict[str, Any]


class CallShellOutputs(TypedDict):
    """Output schema of the CallShell step."""

    # Captured standard output of the script.
    stdout_output: str
    # Captured standard error of the script; returned by CallShell.run()
    # alongside stdout_output, so it is declared here as well.
    stderr_output: str
Loading