Skip to content

Commit

Permalink
feat: #1276 add Asyncio SQLAlchemy support (#1633)
Browse files Browse the repository at this point in the history
  • Loading branch information
galuszkak authored Jan 28, 2025
1 parent fcd0a35 commit c77b863
Show file tree
Hide file tree
Showing 5 changed files with 700 additions and 43 deletions.
3 changes: 3 additions & 0 deletions requirements/testing.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ boto3<=2
# For AWS tests
moto>=4.0.13,<6
mypy<=1.14.1
# For AsyncSQLAlchemy tests
greenlet<=4
aiosqlite<=1
333 changes: 290 additions & 43 deletions slack_sdk/oauth/installation_store/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
)
from sqlalchemy.engine import Engine
from sqlalchemy.sql.sqltypes import Boolean

from sqlalchemy.ext.asyncio import AsyncEngine
from slack_sdk.oauth.installation_store.installation_store import InstallationStore
from slack_sdk.oauth.installation_store.models.bot import Bot
from slack_sdk.oauth.installation_store.models.installation import Installation
from slack_sdk.oauth.installation_store.async_installation_store import (
AsyncInstallationStore,
)


class SQLAlchemyInstallationStore(InstallationStore):
Expand Down Expand Up @@ -219,21 +222,7 @@ def find_bot(
with self.engine.connect() as conn:
result: object = conn.execute(query)
for row in result.mappings(): # type: ignore[attr-defined]
return Bot(
app_id=row["app_id"],
enterprise_id=row["enterprise_id"],
enterprise_name=row["enterprise_name"],
team_id=row["team_id"],
team_name=row["team_name"],
bot_token=row["bot_token"],
bot_id=row["bot_id"],
bot_user_id=row["bot_user_id"],
bot_scopes=row["bot_scopes"],
bot_refresh_token=row["bot_refresh_token"],
bot_token_expires_at=row["bot_token_expires_at"],
is_enterprise_install=row["is_enterprise_install"],
installed_at=row["installed_at"],
)
return self.build_bot_entity(row)
return None

def find_installation(
Expand Down Expand Up @@ -267,33 +256,7 @@ def find_installation(
with self.engine.connect() as conn:
result: object = conn.execute(query)
for row in result.mappings(): # type: ignore[attr-defined]
installation = Installation(
app_id=row["app_id"],
enterprise_id=row["enterprise_id"],
enterprise_name=row["enterprise_name"],
enterprise_url=row["enterprise_url"],
team_id=row["team_id"],
team_name=row["team_name"],
bot_token=row["bot_token"],
bot_id=row["bot_id"],
bot_user_id=row["bot_user_id"],
bot_scopes=row["bot_scopes"],
bot_refresh_token=row["bot_refresh_token"],
bot_token_expires_at=row["bot_token_expires_at"],
user_id=row["user_id"],
user_token=row["user_token"],
user_scopes=row["user_scopes"],
user_refresh_token=row["user_refresh_token"],
user_token_expires_at=row["user_token_expires_at"],
# Only the incoming webhook issued in the latest installation is set in this logic
incoming_webhook_url=row["incoming_webhook_url"],
incoming_webhook_channel=row["incoming_webhook_channel"],
incoming_webhook_channel_id=row["incoming_webhook_channel_id"],
incoming_webhook_configuration_url=row["incoming_webhook_configuration_url"],
is_enterprise_install=row["is_enterprise_install"],
token_type=row["token_type"],
installed_at=row["installed_at"],
)
installation = self.build_installation_entity(row)

has_user_installation = user_id is not None and installation is not None
no_bot_token_installation = installation is not None and installation.bot_token is None
Expand Down Expand Up @@ -362,3 +325,287 @@ def delete_installation(
)
)
conn.execute(deletion)

@classmethod
def build_installation_entity(cls, row) -> Installation:
return Installation(
app_id=row["app_id"],
enterprise_id=row["enterprise_id"],
enterprise_name=row["enterprise_name"],
enterprise_url=row["enterprise_url"],
team_id=row["team_id"],
team_name=row["team_name"],
bot_token=row["bot_token"],
bot_id=row["bot_id"],
bot_user_id=row["bot_user_id"],
bot_scopes=row["bot_scopes"],
bot_refresh_token=row["bot_refresh_token"],
bot_token_expires_at=row["bot_token_expires_at"],
user_id=row["user_id"],
user_token=row["user_token"],
user_scopes=row["user_scopes"],
user_refresh_token=row["user_refresh_token"],
user_token_expires_at=row["user_token_expires_at"],
# Only the incoming webhook issued in the latest installation is set in this logic
incoming_webhook_url=row["incoming_webhook_url"],
incoming_webhook_channel=row["incoming_webhook_channel"],
incoming_webhook_channel_id=row["incoming_webhook_channel_id"],
incoming_webhook_configuration_url=row["incoming_webhook_configuration_url"],
is_enterprise_install=row["is_enterprise_install"],
token_type=row["token_type"],
installed_at=row["installed_at"],
)

@classmethod
def build_bot_entity(cls, row) -> Bot:
return Bot(
app_id=row["app_id"],
enterprise_id=row["enterprise_id"],
enterprise_name=row["enterprise_name"],
team_id=row["team_id"],
team_name=row["team_name"],
bot_token=row["bot_token"],
bot_id=row["bot_id"],
bot_user_id=row["bot_user_id"],
bot_scopes=row["bot_scopes"],
bot_refresh_token=row["bot_refresh_token"],
bot_token_expires_at=row["bot_token_expires_at"],
is_enterprise_install=row["is_enterprise_install"],
installed_at=row["installed_at"],
)


class AsyncSQLAlchemyInstallationStore(AsyncInstallationStore):
default_bots_table_name: str = "slack_bots"
default_installations_table_name: str = "slack_installations"

client_id: str
engine: AsyncEngine
metadata: MetaData
installations: Table

def __init__(
self,
client_id: str,
engine: AsyncEngine,
bots_table_name: str = default_bots_table_name,
installations_table_name: str = default_installations_table_name,
logger: Logger = logging.getLogger(__name__),
):
self.metadata = sqlalchemy.MetaData()
self.bots = self.build_bots_table(metadata=self.metadata, table_name=bots_table_name)
self.installations = self.build_installations_table(metadata=self.metadata, table_name=installations_table_name)
self.client_id = client_id
self._logger = logger
self.engine = engine

@classmethod
def build_installations_table(cls, metadata: MetaData, table_name: str) -> Table:
return SQLAlchemyInstallationStore.build_installations_table(metadata, table_name)

@classmethod
def build_bots_table(cls, metadata: MetaData, table_name: str) -> Table:
return SQLAlchemyInstallationStore.build_bots_table(metadata, table_name)

async def create_tables(self):
async with self.engine.begin() as conn:
await conn.run_sync(self.metadata.create_all)

@property
def logger(self) -> Logger:
return self._logger

async def async_save(self, installation: Installation):
async with self.engine.begin() as conn:
i = installation.to_dict()
i["client_id"] = self.client_id

i_column = self.installations.c
installations_rows = await conn.execute(
sqlalchemy.select(i_column.id)
.where(
and_(
i_column.client_id == self.client_id,
i_column.enterprise_id == installation.enterprise_id,
i_column.team_id == installation.team_id,
i_column.installed_at == i.get("installed_at"),
)
)
.limit(1)
)
installations_row_id: Optional[str] = None
for row in installations_rows.mappings():
installations_row_id = row["id"]
if installations_row_id is None:
await conn.execute(self.installations.insert(), i)
else:
update_statement = self.installations.update().where(i_column.id == installations_row_id).values(**i)
await conn.execute(update_statement, i)

# bots
await self.async_save_bot(installation.to_bot())

async def async_save_bot(self, bot: Bot):
async with self.engine.begin() as conn:
# bots
b = bot.to_dict()
b["client_id"] = self.client_id

b_column = self.bots.c
bots_rows = await conn.execute(
sqlalchemy.select(b_column.id)
.where(
and_(
b_column.client_id == self.client_id,
b_column.enterprise_id == bot.enterprise_id,
b_column.team_id == bot.team_id,
b_column.installed_at == b.get("installed_at"),
)
)
.limit(1)
)
bots_row_id: Optional[str] = None
for row in bots_rows.mappings():
bots_row_id = row["id"]
if bots_row_id is None:
await conn.execute(self.bots.insert(), b)
else:
update_statement = self.bots.update().where(b_column.id == bots_row_id).values(**b)
await conn.execute(update_statement, b)

async def async_find_bot(
self,
*,
enterprise_id: Optional[str],
team_id: Optional[str],
is_enterprise_install: Optional[bool] = False,
) -> Optional[Bot]:
if is_enterprise_install or team_id is None:
team_id = None

c = self.bots.c
query = (
self.bots.select()
.where(
and_(
c.client_id == self.client_id,
c.enterprise_id == enterprise_id,
c.team_id == team_id,
c.bot_token.is_not(None), # the latest one that has a bot token
)
)
.order_by(desc(c.installed_at))
.limit(1)
)

async with self.engine.connect() as conn:
result: object = await conn.execute(query)
for row in result.mappings(): # type: ignore[attr-defined]
return SQLAlchemyInstallationStore.build_bot_entity(row)
return None

async def async_find_installation(
self,
*,
enterprise_id: Optional[str],
team_id: Optional[str],
user_id: Optional[str] = None,
is_enterprise_install: Optional[bool] = False,
) -> Optional[Installation]:
if is_enterprise_install or team_id is None:
team_id = None

c = self.installations.c
where_clause = and_(
c.client_id == self.client_id,
c.enterprise_id == enterprise_id,
c.team_id == team_id,
)
if user_id is not None:
where_clause = and_(
c.client_id == self.client_id,
c.enterprise_id == enterprise_id,
c.team_id == team_id,
c.user_id == user_id,
)

query = self.installations.select().where(where_clause).order_by(desc(c.installed_at)).limit(1)

installation: Optional[Installation] = None
async with self.engine.connect() as conn:
result: object = await conn.execute(query)
for row in result.mappings(): # type: ignore[attr-defined]
installation = SQLAlchemyInstallationStore.build_installation_entity(row)

has_user_installation = user_id is not None and installation is not None
no_bot_token_installation = installation is not None and installation.bot_token is None
should_find_bot_installation = has_user_installation or no_bot_token_installation
if should_find_bot_installation:
# Retrieve the latest bot token, just in case
# See also: https://github.com/slackapi/bolt-python/issues/664
latest_bot_installation = await self.async_find_bot(
enterprise_id=enterprise_id,
team_id=team_id,
is_enterprise_install=is_enterprise_install,
)
if (
latest_bot_installation is not None
and installation is not None
and installation.bot_token != latest_bot_installation.bot_token
):
installation.bot_id = latest_bot_installation.bot_id
installation.bot_user_id = latest_bot_installation.bot_user_id
installation.bot_token = latest_bot_installation.bot_token
installation.bot_scopes = latest_bot_installation.bot_scopes
installation.bot_refresh_token = latest_bot_installation.bot_refresh_token
installation.bot_token_expires_at = latest_bot_installation.bot_token_expires_at

return installation

async def async_delete_bot(
self,
*,
enterprise_id: Optional[str],
team_id: Optional[str],
) -> None:
table = self.bots
c = table.c
async with self.engine.begin() as conn:
deletion = table.delete().where(
and_(
c.client_id == self.client_id,
c.enterprise_id == enterprise_id,
c.team_id == team_id,
)
)
await conn.execute(deletion)

async def async_delete_installation(
self,
*,
enterprise_id: Optional[str],
team_id: Optional[str],
user_id: Optional[str] = None,
) -> None:
table = self.installations
c = table.c
async with self.engine.begin() as conn:
if user_id is not None:
deletion = table.delete().where(
and_(
c.client_id == self.client_id,
c.enterprise_id == enterprise_id,
c.team_id == team_id,
c.user_id == user_id,
)
)
await conn.execute(deletion)
else:
deletion = table.delete().where(
and_(
c.client_id == self.client_id,
c.enterprise_id == enterprise_id,
c.team_id == team_id,
)
)
await conn.execute(deletion)
Loading

0 comments on commit c77b863

Please sign in to comment.