Skip to content

Commit

Permalink
making executors start containers explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Jan 5, 2025
1 parent 0ea9024 commit 864f356
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 186 deletions.
51 changes: 20 additions & 31 deletions python-sdk/indexify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,32 +155,6 @@ def build_image(
_create_image(obj, python_sdk_path)


@app.command(help="Build platform images for function names")
def build_platform_image(
    workflow_file_path: Annotated[str, typer.Argument()],
    image_names: Optional[List[str]] = None,
    build_service="https://api.tensorlake.ai/images/v1",
):
    """Execute a workflow file and build a platform image for each Image it defines.

    Args:
        workflow_file_path: Path of the Python workflow file to execute.
        image_names: When given, only images whose ``_image_name`` is in this
            list are built; when ``None`` every image found is built.
        build_service: Value forwarded unchanged to ``_create_platform_image``
            as its second argument.

    Raises:
        Exception: If ``workflow_file_path`` does not exist. The original
            ``FileNotFoundError`` is attached as the cause.
    """
    # Add the folder in the workflow file path to the current Python path
    # so that imports made by the workflow file can be resolved.
    folder_path = os.path.dirname(workflow_file_path)
    if folder_path not in sys.path:
        sys.path.append(folder_path)

    # Only the file read is guarded: a FileNotFoundError raised by the
    # workflow code itself must not be reported as a missing workflow file.
    try:
        with open(workflow_file_path) as workflow_file:
            workflow_source = workflow_file.read()
    except FileNotFoundError as e:
        raise Exception(
            f"Could not find workflow file to execute at: " f"`{workflow_file_path}`"
        ) from e

    # NOTE(security): exec runs the workflow file with full privileges; only
    # point this command at files you trust.
    globals_dict = {}
    exec(workflow_source, globals_dict)

    for obj in globals_dict.values():
        if not isinstance(obj, Image):
            continue
        if image_names is None or obj._image_name in image_names:
            _create_platform_image(obj, build_service)


@app.command(help="Build default image for indexify")
def build_default_image(
python_version: Optional[str] = typer.Option(
Expand All @@ -206,6 +180,18 @@ def executor(
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
compute_graph: Optional[str] = typer.Option(
None, help="Compute Graph that the executor will run"
),
function: Optional[str] = typer.Option(
None, help="Function that the executor will run"
),
namespace: Optional[str] = typer.Option(
"default", help="Namespace for the Compute Graph"
),
graph_version: Optional[int] = typer.Option(
None, help="Version of the Compute Graph"
),
config_path: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
),
Expand All @@ -215,9 +201,6 @@ def executor(
name_alias: Optional[str] = typer.Option(
None, help="Image name override for the executor"
),
image_hash: Optional[str] = typer.Option(
None, help="Image hash override for the executor"
),
):
if not dev:
configure_production_logging()
Expand All @@ -232,8 +215,11 @@ def executor(
executor_version=executor_version,
executor_cache=executor_cache,
name_alias=name_alias,
image_hash=image_hash,
namespace=namespace,
compute_graph=compute_graph,
function=function,
dev_mode=dev,
graph_version=graph_version,
)

from pathlib import Path
Expand All @@ -248,8 +234,11 @@ def executor(
server_addr=server_addr,
config_path=config_path,
code_path=executor_cache,
compute_graph=compute_graph,
function=function,
namespace=namespace,
graph_version=graph_version,
name_alias=name_alias,
image_hash=image_hash,
function_executor_server_factory=SubprocessFunctionExecutorServerFactory(
development_mode=dev
),
Expand Down
8 changes: 8 additions & 0 deletions python-sdk/indexify/executor/api_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ class Task(BaseModel):
"image_uri defines the URI of the image of this task. Optional since some executors do not require it."


class Function(BaseModel):
    """Describes one function entry listed in ``ExecutorMetadata.functions``.

    NOTE(review): field meanings below are inferred from the field names and
    from how ``TaskFetcher`` populates them - confirm against the server API.
    """

    # Namespace the compute graph lives in (presumably).
    namespace: str
    # Name of the compute graph.
    compute_graph: str
    # Name of the function inside the compute graph.
    compute_fn: str
    # Compute graph version; TaskFetcher fills this from `graph_version`.
    version: int


class ExecutorMetadata(BaseModel):
id: str
executor_version: str
addr: str
functions: List[Function]
image_name: str
image_hash: str
labels: Dict[str, Any]
Expand Down
16 changes: 11 additions & 5 deletions python-sdk/indexify/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ def __init__(
self,
executor_id: str,
code_path: Path,
compute_graph: str,
namespace: Optional[str],
function: Optional[str],
graph_version: Optional[int],
function_executor_server_factory: FunctionExecutorServerFactory,
server_addr: str = "localhost:8900",
config_path: Optional[str] = None,
name_alias: Optional[str] = None,
image_hash: Optional[str] = None,
):
self._logger = structlog.get_logger(module=__name__)
self._should_run = True
Expand All @@ -41,7 +44,7 @@ def __init__(
self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._task_runnner = TaskRunner(
self._task_runner = TaskRunner(
function_executor_server_factory=function_executor_server_factory,
base_url=self._base_url,
config_path=config_path,
Expand All @@ -53,8 +56,11 @@ def __init__(
protocol=protocol,
indexify_server_addr=self._server_addr,
executor_id=executor_id,
namespace=namespace,
compute_graph=compute_graph,
function=function,
graph_version=graph_version,
name_alias=name_alias,
image_hash=image_hash,
config_path=config_path,
)
self._task_reporter = TaskReporter(
Expand Down Expand Up @@ -95,7 +101,7 @@ async def _run_task(self, task: Task) -> None:
await self._downloader.download_init_value(task)
)
logger.info("task_execution_started")
output: TaskOutput = await self._task_runnner.run(
output: TaskOutput = await self._task_runner.run(
TaskInput(
task=task,
graph=graph,
Expand Down Expand Up @@ -131,7 +137,7 @@ async def _report_task_outcome(self, output: TaskOutput, logger: Any) -> None:
async def _shutdown(self, loop):
self._logger.info("shutting_down")
self._should_run = False
await self._task_runnner.shutdown()
await self._task_runner.shutdown()
for task in asyncio.all_tasks(loop):
task.cancel()

Expand Down
17 changes: 15 additions & 2 deletions python-sdk/indexify/executor/task_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import socket
from importlib.metadata import version
from typing import AsyncGenerator, Optional

Expand All @@ -7,7 +8,7 @@

from indexify.common_util import get_httpx_client

from .api_objects import ExecutorMetadata, Task
from .api_objects import ExecutorMetadata, Function, Task
from .runtime_probes import ProbeInfo, RuntimeProbes


Expand All @@ -19,6 +20,10 @@ def __init__(
protocol: str,
indexify_server_addr: str,
executor_id: str,
namespace: Optional[str] = None,
compute_graph: Optional[str] = None,
function: Optional[str] = None,
graph_version: Optional[int] = None,
name_alias: Optional[str] = None,
image_hash: Optional[int] = None,
config_path: Optional[str] = None,
Expand All @@ -28,11 +33,19 @@ def __init__(
self.config_path = config_path
self._logger = structlog.get_logger(module=__name__)

hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)

functions = []
if namespace is not None and compute_graph is not None and function is not None and graph_version is not None:
functions = [Function(namespace=namespace, compute_graph=compute_graph, compute_fn=function, version=graph_version)]

probe_info: ProbeInfo = RuntimeProbes().probe()
self._executor_metadata: ExecutorMetadata = ExecutorMetadata(
id=executor_id,
executor_version=version("indexify"),
addr="",
addr=ip_address,
functions=functions,
image_name=probe_info.image_name if name_alias is None else name_alias,
image_hash=(probe_info.image_hash if image_hash is None else image_hash),
labels=probe_info.labels,
Expand Down
132 changes: 9 additions & 123 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,38 +199,6 @@ impl Node {
}
}

/// Reports whether `executor` is acceptable for this node, judged by image
/// name only.
///
/// When the executor's image name differs from this node's image name, an
/// explanatory message is appended to `diagnostic_msgs` and `false` is
/// returned. Otherwise `true` is returned and `diagnostic_msgs` is left
/// untouched.
pub fn matches_executor(
    &self,
    executor: &ExecutorMetadata,
    diagnostic_msgs: &mut Vec<String>,
) -> bool {
    let name_mismatch = executor.image_name != self.image_name();
    if name_mismatch {
        let msg = format!(
            "executor {}, image name: {} does not match function image name {}. Make sure the executor is running the latest image.",
            executor.id,
            executor.image_name,
            self.image_name()
        );
        diagnostic_msgs.push(msg);
    }

    // FIXME: the image hash check below is disabled as a temporary hack to
    // unblock some internal migrations. An empty executor image hash means
    // the executor can accept any image version (needed for backwards
    // compatibility).
    //
    // if !executor.image_hash.is_empty() && executor.image_hash != self.image_hash() {
    //     diagnostic_msgs.push(format!(
    //         "executor {}, image hash: {} does not match function image hash {}. Make sure the executor is running the latest image.",
    //         executor.id,
    //         executor.image_hash,
    //         self.image_hash()
    //     ));
    //     return false;
    // }

    !name_mismatch
}

pub fn reducer(&self) -> bool {
match self {
Node::Router(_) => false,
Expand Down Expand Up @@ -905,11 +873,20 @@ fn default_executor_ver() -> String {
"0.2.17".to_string()
}

/// One entry of `ExecutorMetadata::containers`.
///
/// NOTE(review): presumably identifies a single function (namespace, compute
/// graph, compute function and graph version) that an executor has a
/// container for - confirm against the scheduler code that consumes it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FunctionContainer {
    /// Namespace of the compute graph.
    pub namespace: String,
    /// Name of the compute graph.
    pub compute_graph_name: String,
    /// Name of the compute function within the graph.
    pub compute_fn_name: String,
    /// Version of the compute graph.
    pub version: GraphVersion,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ExecutorMetadata {
pub id: ExecutorId,
#[serde(default = "default_executor_ver")]
pub executor_version: String,
pub containers: Vec<FunctionContainer>,
pub image_name: String,
#[serde(default)]
pub image_hash: String,
Expand Down Expand Up @@ -1025,106 +1002,15 @@ mod tests {

use crate::{
test_objects::tests::test_compute_fn,
ComputeFn,
ComputeGraph,
ComputeGraphCode,
ComputeGraphVersion,
DynamicEdgeRouter,
ExecutorMetadata,
GraphVersion,
ImageInformation,
Node,
RuntimeInformation,
};

#[test]
fn test_node_matches_executor_scenarios() {
    /// Runs a single scenario against both a compute node and a router node
    /// that share the same image name and hash.
    fn run_case(
        test_name: &str,
        image_name: &str,
        image_hash: &str,
        executor_image_name: &str,
        executor_image_hash: &str,
        expected: bool,
    ) {
        // Both node flavours are built from the same image information.
        let node_image = || ImageInformation {
            image_name: image_name.to_string(),
            image_hash: image_hash.to_string(),
            ..Default::default()
        };

        let executor_metadata = ExecutorMetadata {
            image_name: executor_image_name.to_string(),
            image_hash: executor_image_hash.to_string(),
            ..Default::default()
        };
        let mut diagnostic_msgs = vec![];

        let compute_fn = ComputeFn {
            name: "fn1".to_string(),
            image_information: node_image(),
            ..Default::default()
        };
        let router = DynamicEdgeRouter {
            name: "router1".to_string(),
            image_information: node_image(),
            ..Default::default()
        };

        print!("{:?}", executor_metadata);

        // Compute node first, then router; the diagnostics buffer is shared.
        for node in [Node::Compute(compute_fn), Node::Router(router)] {
            assert_eq!(
                node.matches_executor(&executor_metadata, &mut diagnostic_msgs),
                expected,
                "Failed for test: {}, {}",
                test_name,
                diagnostic_msgs.join(", ")
            );
        }
    }

    // (test name, image name, image hash, executor image name,
    //  executor image hash, expected result)
    let scenarios = [
        // Image name does not match
        (
            "Image name does not match",
            "some_image_name",
            "some_image_hash",
            "some_image_name1",
            "some_image_hash",
            false,
        ),
        // Image name and hash match
        (
            "Image name and hash match",
            "some_image_name",
            "some_image_hash",
            "some_image_name",
            "some_image_hash",
            true,
        ),
        // Executor image hash is empty so it should match any image hash
        (
            "Executor Image hash is empty",
            "some_image_name",
            "some_image_hash",
            "some_image_name",
            "", // empty hash
            true,
        ),
    ];

    for (test_name, image_name, image_hash, exec_image_name, exec_image_hash, expected) in
        scenarios
    {
        run_case(
            test_name,
            image_name,
            image_hash,
            exec_image_name,
            exec_image_hash,
            expected,
        );
    }
}

#[test]
fn test_compute_graph_update() {
const TEST_NAMESPACE: &str = "namespace1";
Expand Down
1 change: 1 addition & 0 deletions server/data_model/src/test_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ pub mod tests {
id: mock_executor_id(),
executor_version: "1.0.0".to_string(),
image_name: TEST_EXECUTOR_IMAGE_NAME.to_string(),
containers: vec![],
addr: "".to_string(),
labels: Default::default(),
image_hash: "image_hash".to_string(),
Expand Down
Loading

0 comments on commit 864f356

Please sign in to comment.