Skip to content

Commit

Permalink
re-add missing partial commits (list below)
Browse files Browse the repository at this point in the history
  • Loading branch information
swelborn committed Jan 17, 2025
1 parent 2bd4570 commit 60e15c3
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 25 deletions.
9 changes: 2 additions & 7 deletions backend/agent/interactem/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@
PipelineJSON,
)
from interactem.core.models.uri import URI, CommBackend, URILocation
from interactem.core.nats import (
create_or_update_stream,
get_agents_bucket,
)
from interactem.core.nats import create_or_update_stream, get_agents_bucket, nc
from interactem.core.nats.config import (
AGENTS_STREAM_CONFIG,
OPERATORS_STREAM_CONFIG,
Expand Down Expand Up @@ -164,9 +161,7 @@ async def run(self):
*[self.setup_signal_handlers(), self._start_podman_service()]
)

self.nc = await nats.connect(
servers=[str(cfg.NATS_SERVER_URL)], name=f"agent-{id}"
)
self.nc = await nc(servers=[str(cfg.NATS_SERVER_URL)], name=f"agent-{id}")
self.js = self.nc.jetstream()

await create_or_update_stream(PARAMETERS_STREAM_CONFIG, self.js)
Expand Down
35 changes: 33 additions & 2 deletions backend/app/interactem/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.security import (
HTTPAuthorizationCredentials,
HTTPBearer,
OAuth2PasswordBearer,
)
from jwt.exceptions import InvalidTokenError
from pydantic import ValidationError
from sqlmodel import Session

from interactem.app.core import security
from interactem.app.core.config import settings
from interactem.app.core.db import engine
from interactem.app.models import TokenPayload, User
from interactem.app.models import ExternalTokenPayload, TokenPayload, User

reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
Expand Down Expand Up @@ -55,3 +59,30 @@ def get_current_active_superuser(current_user: CurrentUser) -> User:
status_code=403, detail="The user doesn't have enough privileges"
)
return current_user


external_bearer = HTTPBearer(auto_error=True)
ExternalTokenDep = Annotated[HTTPAuthorizationCredentials, Depends(external_bearer)]


async def verify_external_token(auth: ExternalTokenDep) -> ExternalTokenPayload:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
auth.credentials,
settings.EXTERNAL_SECRET_KEY,
algorithms=[settings.EXTERNAL_ALGORITHM],
)
username: str = payload.get("sub")
exp: int = payload.get("exp")

if username is None or exp is None:
raise credentials_exception

return ExternalTokenPayload(username=username, exp=exp)
except InvalidTokenError:
raise credentials_exception
67 changes: 65 additions & 2 deletions backend/app/interactem/app/api/routes/login.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import timedelta
import secrets
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException
Expand All @@ -10,17 +11,28 @@
CurrentUser,
SessionDep,
get_current_active_superuser,
verify_external_token,
)
from interactem.app.core import security
from interactem.app.core.config import settings
from interactem.app.core.security import get_password_hash
from interactem.app.models import Message, NewPassword, Token, UserPublic
from interactem.app.models import (
ExternalTokenPayload,
Message,
NewPassword,
Token,
UserCreate,
UserPublic,
)
from interactem.app.utils import (
generate_password_reset_token,
generate_reset_password_email,
send_email,
verify_password_reset_token,
)
from interactem.core.logger import get_logger

logger = get_logger()

router = APIRouter()

Expand All @@ -46,6 +58,57 @@ def login_access_token(
)
)

@router.post("/login/external-token")
async def login_with_external_token(
session: SessionDep,
external_user: Annotated[ExternalTokenPayload, Depends(verify_external_token)],
) -> Token:
"""
Login with an external token (e.g., distiller)
"""
logger.info("Received login request from external user")
email = f"{external_user.username}@external.com"

user = crud.get_user_by_email(session=session, email=email)

if not user:
logger.info("Creating external user in db...")
# password doesn't matter
random_password = secrets.token_urlsafe(16)[:32]

user_create = UserCreate(
email=email,
password=random_password,
is_superuser=False,
is_external=True,
)

try:
user = crud.create_user(session=session, user_create=user_create)
except Exception:
raise HTTPException(
status_code=400, detail="Could not create user in second server"
)
logger.info("External user created in db.")

logger.info(f"External user `{external_user.username}` logged in.")

if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")

now = datetime.now(timezone.utc)
exp_datetime = datetime.fromtimestamp(external_user.exp, tz=timezone.utc)
access_token_expires = exp_datetime - now

logger.info(
f"Token will expire at: {exp_datetime.strftime('%Y-%m-%d %I:%M:%S %p %Z')}"
)
return Token(
access_token=security.create_access_token(
user.id, expires_delta=access_token_expires
)
)


@router.post("/login/test-token", response_model=UserPublic)
def test_token(current_user: CurrentUser) -> Any:
Expand Down
12 changes: 2 additions & 10 deletions backend/app/interactem/app/events/producer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import logging

import nats
import nats.errors
from nats.aio.client import Client as NATSClient
from nats.js import JetStreamContext
from pydantic import BaseModel
from sqlmodel import SQLModel

from interactem.core.constants import STREAM_PIPELINES, SUBJECT_PIPELINES_RUN
from interactem.core.events.pipelines import PipelineRunEvent
from interactem.core.nats import create_or_update_stream
from interactem.core.nats import create_or_update_stream, nc
from interactem.core.nats.config import PIPELINES_STREAM_CONFIG

from ..core.config import settings
Expand All @@ -18,15 +14,11 @@
logger = logging.getLogger(__name__)


nats_client: NATSClient | None = None
nats_jetstream: JetStreamContext | None = None


async def start():
global nats_client
global nats_jetstream
logger.info(f"Connecting to NATS server: {settings.NATS_SERVER_URL}")
nats_client = await nats.connect(settings.NATS_SERVER_URL.unicode_string())
nats_client = await nc([str(settings.NATS_SERVER_URL)], "api")
nats_jetstream = nats_client.jetstream()
info = await create_or_update_stream(PIPELINES_STREAM_CONFIG, nats_jetstream)
logger.info(f"Stream information: {info}")
Expand Down
21 changes: 19 additions & 2 deletions backend/core/interactem/core/nats/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Awaitable, Callable

import nats

from interactem.core.constants import (
BUCKET_AGENTS,
BUCKET_AGENTS_TTL,
Expand All @@ -17,20 +20,34 @@
from nats.js.errors import BucketNotFoundError, KeyNotFoundError, NoKeysError
from nats.js.kv import KeyValue
from nats.aio.msg import Msg as NATSMsg
from nats.aio.client import Client as NATSClient
from nats.js.api import StreamConfig, StreamInfo
from nats.js.errors import BadRequestError
from typing import TypeVar, Type
from pydantic import BaseModel, ValidationError
from nats.js.kv import KeyValue
from nats.js.errors import KeyNotFoundError

ValType = TypeVar("ValType", bound=BaseModel)

from interactem.core.logger import get_logger
from interactem.core.config import cfg

ValType = TypeVar("ValType", bound=BaseModel)
logger = get_logger()


async def nc(servers: list[str], name: str) -> NATSClient:
options_map = {
cfg.NATS_MODE.NKEYS: {
"nkeys_seed_str": cfg.NKEYS_SEED_STR,
},
cfg.NATS_MODE.CREDS: {
"user_credentials": str(cfg.NATS_CREDS_FILE),
},
}
options = options_map[cfg.NATS_MODE]
return await nats.connect(servers=servers, name=name, **options)


async def create_bucket_if_doesnt_exist(
js: JetStreamContext, bucket_name: str, ttl: int
) -> KeyValue:
Expand Down
8 changes: 6 additions & 2 deletions backend/operators/interactem/operators/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
OperatorTiming,
ParameterType,
)
from interactem.core.nats import create_bucket_if_doesnt_exist, create_or_update_stream
from interactem.core.nats import (
create_bucket_if_doesnt_exist,
create_or_update_stream,
nc,
)
from interactem.core.nats.config import (
METRICS_STREAM_CONFIG,
OPERATORS_STREAM_CONFIG,
Expand Down Expand Up @@ -187,7 +191,7 @@ async def execute_dependencies_startup(self):

async def connect_to_nats(self):
logger.info(f"Connecting to NATS at {cfg.NATS_SERVER_URL}...")
self.nc = await nats.connect(
self.nc = await nc(
servers=[str(cfg.NATS_SERVER_URL)], name=f"operator-{self.id}"
)
logger.info("Connected to NATS...")
Expand Down

0 comments on commit 60e15c3

Please sign in to comment.