Skip to content

Commit

Permalink
Unify Data Models for Bulk Actions in Rest API (FastAPI) (apache#46033)
Browse files Browse the repository at this point in the history
* Unify datamodels and services for bulk endpoints

* Remove commented code pieces
  • Loading branch information
bugraoz93 authored and got686-yandex committed Jan 30, 2025
1 parent 2adc219 commit 98d43b8
Show file tree
Hide file tree
Showing 22 changed files with 1,568 additions and 2,069 deletions.
96 changes: 91 additions & 5 deletions airflow/api_fastapi/core_api/datamodels/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
from __future__ import annotations

import enum
from typing import Annotated, Any, Generic, TypeVar, Union

from airflow.api_fastapi.core_api.base import BaseModel
from pydantic import Discriminator, Field, Tag

from airflow.api_fastapi.core_api.base import BaseModel

# Common Bulk Data Models
T = TypeVar("T")
K = TypeVar("K")


class BulkAction(enum.Enum):
"""Bulk Action to be performed on the used model."""

Expand All @@ -45,14 +51,94 @@ class BulkActionOnExistence(enum.Enum):


class BulkActionNotOnExistence(enum.Enum):
"""Bulk Action to be taken if the entity does not exists."""
"""Bulk Action to be taken if the entity does not exist."""

FAIL = "fail"
SKIP = "skip"


# TODO: Unify All Bulk Operation Related Base Data Models
class BulkBaseAction(BaseModel):
class BulkBaseAction(BaseModel, Generic[T]):
"""Base class for bulk actions."""

action: BulkAction
action: BulkAction = Field(..., description="The action to be performed on the entities.")


class BulkCreateAction(BulkBaseAction[T]):
"""Bulk Create entity serializer for request bodies."""

entities: list[T] = Field(..., description="A list of entities to be created.")
action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL


class BulkUpdateAction(BulkBaseAction[T]):
"""Bulk Update entity serializer for request bodies."""

entities: list[T] = Field(..., description="A list of entities to be updated.")
action_on_non_existence: BulkActionNotOnExistence = BulkActionNotOnExistence.FAIL


class BulkDeleteAction(BulkBaseAction[T]):
"""Bulk Delete entity serializer for request bodies."""

entities: list[str] = Field(..., description="A list of entity id/key to be deleted.")
action_on_non_existence: BulkActionNotOnExistence = BulkActionNotOnExistence.FAIL


def _action_discriminator(action: Any) -> str:
return BulkAction(action["action"]).value


class BulkBody(BaseModel, Generic[T]):
"""Serializer for bulk entity operations."""

actions: list[
Annotated[
Union[
Annotated[BulkCreateAction[T], Tag(BulkAction.CREATE.value)],
Annotated[BulkUpdateAction[T], Tag(BulkAction.UPDATE.value)],
Annotated[BulkDeleteAction[T], Tag(BulkAction.DELETE.value)],
],
Discriminator(_action_discriminator),
]
]


class BulkActionResponse(BaseModel):
"""
Serializer for individual bulk action responses.
Represents the outcome of a single bulk operation (create, update, or delete).
The response includes a list of successful keys and any errors encountered during the operation.
This structure helps users understand which key actions succeeded and which failed.
"""

success: list[str] = Field(
default=[], description="A list of unique id/key representing successful operations."
)
errors: list[dict[str, Any]] = Field(
default=[],
description="A list of errors encountered during the operation, each containing details about the issue.",
)


class BulkResponse(BaseModel):
"""
Serializer for responses to bulk entity operations.
This represents the results of create, update, and delete actions performed on entity in bulk.
Each action (if requested) is represented as a field containing details about successful keys and any encountered errors.
Fields are populated in the response only if the respective action was part of the request, else are set None.
"""

create: BulkActionResponse | None = Field(
default=None,
description="Details of the bulk create operation, including successful keys and errors.",
)
update: BulkActionResponse | None = Field(
default=None,
description="Details of the bulk update operation, including successful keys and errors.",
)
delete: BulkActionResponse | None = Field(
default=None,
description="Details of the bulk delete operation, including successful keys and errors.",
)
80 changes: 0 additions & 80 deletions airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,11 @@
from __future__ import annotations

import json
from typing import Any

from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import (
BulkAction,
BulkActionNotOnExistence,
BulkActionOnExistence,
BulkBaseAction,
)
from airflow.utils.log.secrets_masker import redact


Expand Down Expand Up @@ -95,76 +88,3 @@ class ConnectionBody(BaseModel):
port: int | None = Field(default=None)
password: str | None = Field(default=None)
extra: str | None = Field(default=None)


class ConnectionBulkCreateAction(BulkBaseAction):
"""Bulk Create Variable serializer for request bodies."""

action: BulkAction = BulkAction.CREATE
connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.")
action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL


class ConnectionBulkUpdateAction(BulkBaseAction):
"""Bulk Update Connection serializer for request bodies."""

action: BulkAction = BulkAction.UPDATE
connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.")
action_on_non_existence: BulkActionNotOnExistence = BulkActionNotOnExistence.FAIL


class ConnectionBulkDeleteAction(BulkBaseAction):
"""Bulk Delete Connection serializer for request bodies."""

action: BulkAction = BulkAction.DELETE
connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.")
action_on_non_existence: BulkActionNotOnExistence = BulkActionNotOnExistence.FAIL


class ConnectionBulkBody(BaseModel):
"""Request body for bulk Connection operations (create, update, delete)."""

actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = (
Field(..., description="A list of Connection actions to perform.")
)


class ConnectionBulkActionResponse(BaseModel):
"""
Serializer for individual bulk action responses.
Represents the outcome of a single bulk operation (create, update, or delete).
The response includes a list of successful connection_ids and any errors encountered during the operation.
This structure helps users understand which key actions succeeded and which failed.
"""

success: list[str] = Field(
default_factory=list, description="A list of connection_ids representing successful operations."
)
errors: list[dict[str, Any]] = Field(
default_factory=list,
description="A list of errors encountered during the operation, each containing details about the issue.",
)


class ConnectionBulkResponse(BaseModel):
"""
Serializer for responses to bulk connection operations.
This represents the results of create, update, and delete actions performed on connections in bulk.
Each action (if requested) is represented as a field containing details about successful connection_ids and any encountered errors.
Fields are populated in the response only if the respective action was part of the request, else are set None.
"""

create: ConnectionBulkActionResponse | None = Field(
default=None,
description="Details of the bulk create operation, including successful connection_ids and errors.",
)
update: ConnectionBulkActionResponse | None = Field(
default=None,
description="Details of the bulk update operation, including successful connection_ids and errors.",
)
delete: ConnectionBulkActionResponse | None = Field(
default=None,
description="Details of the bulk delete operation, including successful connection_ids and errors.",
)
90 changes: 2 additions & 88 deletions airflow/api_fastapi/core_api/datamodels/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@

from __future__ import annotations

from typing import Annotated, Any, Callable
from typing import Annotated, Callable

from pydantic import BeforeValidator, ConfigDict, Field

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import (
BulkAction,
BulkActionNotOnExistence,
BulkActionOnExistence,
BulkBaseAction,
)


def _call_function(function: Callable[[], int]) -> int:
Expand Down Expand Up @@ -77,89 +71,9 @@ class PoolPatchBody(BaseModel):
include_deferred: bool | None = None


class PoolPostBody(BasePool):
class PoolBody(BasePool):
"""Pool serializer for post bodies."""

pool: str = Field(alias="name", max_length=256)
description: str | None = None
include_deferred: bool = False


class PoolPostBulkBody(BaseModel):
"""Pools serializer for post bodies."""

pools: list[PoolPostBody]
overwrite: bool | None = Field(default=False)


class PoolBulkCreateAction(BulkBaseAction):
"""Bulk Create Pool serializer for request bodies."""

action: BulkAction = BulkAction.CREATE
pools: list[PoolPostBody] = Field(..., description="A list of pools to be created.")
action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL


class PoolBulkUpdateAction(BulkBaseAction):
"""Bulk Update Pool serializer for request bodies."""

action: BulkAction = BulkAction.UPDATE
pools: list[PoolPatchBody] = Field(..., description="A list of pools to be updated.")
action_on_non_existence: BulkActionNotOnExistence = BulkActionNotOnExistence.FAIL


class PoolBulkDeleteAction(BulkBaseAction):
"""Bulk Delete Pool serializer for request bodies."""

action: BulkAction = BulkAction.DELETE
pool_names: list[str] = Field(..., description="A list of pool names to be deleted.")
action_on_non_existence: BulkActionNotOnExistence = BulkActionNotOnExistence.FAIL


class PoolBulkBody(BaseModel):
"""Request body for bulk Pool operations (create, update, delete)."""

actions: list[PoolBulkCreateAction | PoolBulkUpdateAction | PoolBulkDeleteAction] = Field(
..., description="A list of Pool actions to perform."
)


class PoolBulkActionResponse(BaseModel):
"""
Serializer for individual bulk action responses.
Represents the outcome of a single bulk operation (create, update, or delete).
The response includes a list of successful pool names and any errors encountered during the operation.
This structure helps users understand which key actions succeeded and which failed.
"""

success: list[str] = Field(
default_factory=list, description="A list of pool names representing successful operations."
)
errors: list[dict[str, Any]] = Field(
default_factory=list,
description="A list of errors encountered during the operation, each containing details about the issue.",
)


class PoolBulkResponse(BaseModel):
"""
Serializer for responses to bulk pool operations.
This represents the results of create, update, and delete actions performed on pools in bulk.
Each action (if requested) is represented as a field containing details about successful pool names and any encountered errors.
Fields are populated in the response only if the respective action was part of the request, else are set None.
"""

create: PoolBulkActionResponse | None = Field(
default=None,
description="Details of the bulk create operation, including successful pool names and errors.",
)
update: PoolBulkActionResponse | None = Field(
default=None,
description="Details of the bulk update operation, including successful pool names and errors.",
)
delete: PoolBulkActionResponse | None = Field(
default=None,
description="Details of the bulk delete operation, including successful pool names and errors.",
)
Loading

0 comments on commit 98d43b8

Please sign in to comment.