Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter function for freshness via custom sql #384

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241216-172047.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add function to run custom sql for getting freshness info
time: 2024-12-16T17:20:47.065611-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Type
from unittest.mock import MagicMock

from dbt_common.exceptions import DbtRuntimeError
import pytest

from dbt.adapters.base.impl import BaseAdapter


class BaseCalculateFreshnessMethod:
    """Functional tests for ``calculate_freshness_from_custom_sql`` on an adapter.

    A well-formed query is expected to run without raising, and a malformed
    query is expected to surface the adapter's runtime exception. Every
    fixture can be overridden by an adapter-specific subclass.

    NOTE(review): the ``adapter`` fixture is not defined here; it is presumably
    supplied by the surrounding test suite.
    """

    @pytest.fixture(scope="class")
    def valid_sql(self) -> str:
        """Return a query that should succeed as a custom freshness query.

        The default is a plain timestamp SELECT. Engines that need a different
        statement can override this fixture.
        """
        return "select now()"

    @pytest.fixture(scope="class")
    def invalid_sql(self) -> str:
        """Return text that is not SQL, so that running it should fail."""
        return "Let's run some invalid SQL and see if we get an error!"

    @pytest.fixture(scope="class")
    def expected_exception(self) -> Type[Exception]:
        """Return the exception type expected from a failed freshness query.

        ``DbtRuntimeError`` is the default because it is the most common base
        exception for adapters to throw.
        """
        return DbtRuntimeError

    @pytest.fixture(scope="class")
    def mock_relation(self):
        """Return a stand-in relation that renders as ``test.table``."""
        relation = MagicMock()
        relation.__str__.return_value = "test.table"
        return relation

    def test_calculate_freshness_from_custom_sql_success(
        self, adapter: BaseAdapter, valid_sql: str, mock_relation
    ) -> None:
        """A valid query must complete without raising."""
        with adapter.connection_named("test_freshness_custom_sql"):
            adapter.calculate_freshness_from_custom_sql(mock_relation, valid_sql)

    def test_calculate_freshness_from_custom_sql_failure(
        self,
        adapter: BaseAdapter,
        invalid_sql: str,
        expected_exception: Type[Exception],
        mock_relation,
    ) -> None:
        """An invalid query must raise the adapter's expected exception."""
        # The connection is opened inside pytest.raises so that an error raised
        # while the connection context unwinds is still captured.
        with pytest.raises(expected_exception):
            with adapter.connection_named("test_infreshness_custom_sql"):
                adapter.calculate_freshness_from_custom_sql(mock_relation, invalid_sql)


class TestCalculateFreshnessMethod(BaseCalculateFreshnessMethod):
    """Run the inherited freshness tests unchanged, using the default fixtures."""
77 changes: 40 additions & 37 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
CUSTOM_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql"
GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"
DEFAULT_BASE_BEHAVIOR_FLAGS = [
{
Expand Down Expand Up @@ -1327,6 +1328,31 @@ def cancel_open_connections(self):
"""Cancel all open connections."""
return self.connections.cancel_open()

def _process_freshness_execution(
    self,
    macro_name: str,
    kwargs: Dict[str, Any],
    macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
    """Run a freshness macro and turn its result into a ``FreshnessResponse``.

    :param macro_name: name of the macro passed to ``execute_macro``.
    :param kwargs: keyword arguments forwarded to the macro.
    :param macro_resolver: optional resolver forwarded to ``execute_macro``.
    :return: ``(adapter_response, freshness_response)``; ``adapter_response``
        is ``None`` when the macro returned a bare ``agate.Table``.
    :raises MacroResultError: if the result table is not exactly one row of
        two columns.
    """
    import agate

    macro_result = self.execute_macro(
        macro_name, kwargs=kwargs, macro_resolver=macro_resolver
    )

    if not isinstance(macro_result, agate.Table):
        # Result object carrying both the adapter response and the table.
        adapter_response = macro_result.response
        table = macro_result.table
    else:
        # A bare table has no adapter response; warn about this signature.
        warn_or_error(CollectFreshnessReturnSignature())
        adapter_response = None
        table = macro_result

    # Exactly one row with two values is required.
    if not (len(table) == 1 and len(table[0]) == 2):
        raise MacroResultError(macro_name, table)

    only_row = table[0]
    return adapter_response, self._create_freshness_response(only_row[0], only_row[1])

def calculate_freshness(
self,
source: BaseRelation,
Expand All @@ -1335,49 +1361,26 @@ def calculate_freshness(
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Calculate the freshness of sources in dbt, and return it"""
import agate

kwargs: Dict[str, Any] = {
kwargs = {
"source": source,
"loaded_at_field": loaded_at_field,
"filter": filter,
}
return self._process_freshness_execution(FRESHNESS_MACRO_NAME, kwargs, macro_resolver)

# run the macro
# in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly
# starting in v1.5, by default, we return both the table and the adapter response (metadata about the query)
result: Union[
AttrDict, # current: contains AdapterResponse + "agate.Table"
"agate.Table", # previous: just table
]
result = self.execute_macro(
FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver
)
if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
adapter_response = None
table = result
else:
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(FRESHNESS_MACRO_NAME, table)
if table[0][0] is None:
# no records in the table, so really the max_loaded_at was
# infinitely long ago. Just call it 0:00 January 1 year UTC
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(table[0][0], source, loaded_at_field)

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
def calculate_freshness_from_custom_sql(
self,
source: BaseRelation,
sql: str,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs = {
"source": source,
"loaded_at_query": sql,
}
return adapter_response, freshness
return self._process_freshness_execution(
CUSTOM_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver
)

def calculate_freshness_from_metadata_batch(
self,
Expand Down
16 changes: 16 additions & 0 deletions dbt/include/global_project/macros/adapters/freshness.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@
{% endcall %}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}

{% macro collect_freshness_custom_sql(source, loaded_at_query) %}
{# Entry point: forwards source and loaded_at_query to the implementation of
   'collect_freshness_custom_sql' chosen by adapter.dispatch in the 'dbt'
   namespace, and returns whatever that implementation returns. #}
{{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}}
{% endmacro %}

{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %}
{# Default implementation. Runs one statement that wraps loaded_at_query in a
   CTE and selects its result as max_loaded_at, next to current_timestamp() as
   snapshotted_at. The statement result is stored under the name
   'collect_freshness_custom_sql' and returned via load_result.
   NOTE(review): loaded_at_query is used as a scalar subquery, so it presumably
   must yield a single row and a single column - confirm per warehouse.
   NOTE(review): the source argument is not referenced in this body. #}
{% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%}
with source_query as (
{{ loaded_at_query }}
)
select
(select * from source_query) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
{% endcall %}
{{ return(load_result('collect_freshness_custom_sql')) }}
{% endmacro %}
148 changes: 148 additions & 0 deletions tests/unit/test_base_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport

from datetime import datetime
from unittest.mock import MagicMock, patch
import agate
import pytz
from dbt.adapters.contracts.connection import AdapterResponse


class TestBaseAdapterConstraintRendering:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -234,3 +240,145 @@ def test_render_raw_model_constraints_unsupported(

rendered_constraints = BaseAdapter.render_raw_model_constraints(constraints)
assert rendered_constraints == []


class TestCalculateFreshnessFromCustomSQL:
    """Unit tests for ``calculate_freshness_from_custom_sql`` with ``execute_macro`` patched.

    No database is involved: each test feeds a canned macro result to the
    adapter and checks the call made to ``execute_macro`` and the returned
    freshness values.
    """

    @pytest.fixture
    def adapter(self):
        """Return an instance of a throwaway ``BaseAdapter`` subclass made of stubs."""

        class TestAdapter(BaseAdapter):
            # These stubs only exist so the abstract base can be instantiated.

            # --- type conversion hooks ---
            def convert_boolean_type(self, *args, **kwargs):
                return None

            def convert_date_type(self, *args, **kwargs):
                return None

            def convert_datetime_type(self, *args, **kwargs):
                return None

            def convert_number_type(self, *args, **kwargs):
                return None

            def convert_text_type(self, *args, **kwargs):
                return None

            def convert_time_type(self, *args, **kwargs):
                return None

            # --- schema / relation operations ---
            def create_schema(self, *args, **kwargs):
                return None

            def date_function(self, *args, **kwargs):
                return None

            def drop_relation(self, *args, **kwargs):
                return None

            def drop_schema(self, *args, **kwargs):
                return None

            def expand_column_types(self, *args, **kwargs):
                return None

            def get_columns_in_relation(self, *args, **kwargs):
                return None

            def is_cancelable(self, *args, **kwargs):
                return False

            def list_relations_without_caching(self, *args, **kwargs):
                return []

            def list_schemas(self, *args, **kwargs):
                return []

            def quote(self, *args, **kwargs):
                return ""

            def rename_relation(self, *args, **kwargs):
                return None

            def truncate_relation(self, *args, **kwargs):
                return None

        # Both constructor arguments are mocks (config and context).
        mock_config = MagicMock()
        mock_context = MagicMock()
        return TestAdapter(mock_config, mock_context)

    @pytest.fixture
    def mock_relation(self):
        """Return a stand-in relation that renders as ``test.table``."""
        relation = MagicMock()
        relation.__str__.return_value = "test.table"
        return relation

    @patch("dbt.adapters.base.BaseAdapter.execute_macro")
    def test_calculate_freshness_from_customsql_success(
        self, mock_execute_macro, adapter, mock_relation
    ):
        """A non-null loaded-at value is passed through to the freshness response."""
        custom_sql = "SELECT max(updated_at) as last_modified"
        loaded_at = datetime(2023, 1, 1, tzinfo=pytz.UTC)
        snapshot_time = datetime.now(pytz.UTC)

        # Canned macro result: a one-row, two-column table plus an adapter response.
        result_table = agate.Table.from_object(
            [{"last_modified": loaded_at, "snapshotted_at": snapshot_time}]
        )
        mock_execute_macro.return_value = MagicMock(
            response=AdapterResponse("SUCCESS"), table=result_table
        )

        response, freshness = adapter.calculate_freshness_from_custom_sql(
            source=mock_relation, sql=custom_sql
        )

        # The custom-SQL macro must be called once, with the query under "loaded_at_query".
        mock_execute_macro.assert_called_once_with(
            "collect_freshness_custom_sql",
            kwargs={
                "source": mock_relation,
                "loaded_at_query": custom_sql,
            },
            macro_resolver=None,
        )

        assert response._message == "SUCCESS"

        assert freshness["max_loaded_at"] == loaded_at
        assert freshness["snapshotted_at"] == snapshot_time
        assert isinstance(freshness["age"], float)

    @patch("dbt.adapters.base.BaseAdapter.execute_macro")
    def test_calculate_freshness_from_customsql_null_last_modified(
        self, mock_execute_macro, adapter, mock_relation
    ):
        """A NULL loaded-at value is reported as 0001-01-01 00:00:00 UTC."""
        custom_sql = "SELECT max(updated_at) as last_modified"
        snapshot_time = datetime.now(pytz.UTC)

        # Same result shape as the success case, but with no loaded-at value.
        result_table = agate.Table.from_object(
            [{"last_modified": None, "snapshotted_at": snapshot_time}]
        )
        mock_execute_macro.return_value = MagicMock(
            response=AdapterResponse("SUCCESS"), table=result_table
        )

        _, freshness = adapter.calculate_freshness_from_custom_sql(
            source=mock_relation, sql=custom_sql
        )

        # The expected placeholder is the earliest datetime, made UTC-aware.
        earliest_utc = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
        assert freshness["max_loaded_at"] == earliest_utc
        assert freshness["snapshotted_at"] == snapshot_time
        assert isinstance(freshness["age"], float)
Loading