Skip to content

Commit

Permalink
Allow users to opt-out dbtRunner during DAG parsing (#1495)
Browse files Browse the repository at this point in the history
Introduce `RenderConfig(invocation_mode)` so users can opt out of the now default behaviour of Cosmos of running dbt ls using `dbtRunner` as opposed to the Python subprocess  - similar to what was done for task execution in `ExecutionConfig`.

While the change introduced in  #1484 is a significant performance gain, and we believe we should be Cosmos' default behaviour, we realise it may break some users, as described below.

A customer mentioned they had the following setup:
- `dbt-databricks` installed in the same Python virtualenv as Cosmos/Airflow
- `dbt-bigquery` installed in a separate Python virtualenv using Astro Dockerfile

And run DAGs using both with the same image. This means 1.9.0a3 breaks
them since they use `LoadMode.DBT_LS` and only `dbt-data bricks` can be
parsed.

Users can now opt-out of using `dbtRunner` to run `dbt ls` during DAG parsing by setting:

```
    DbtDag(
        render_config=RenderConfig(
            load_method=LoadMode.DBT_LS, invocation_mode=InvocationMode.SUBPROCESS
        )
        # ...,
    )

```
  • Loading branch information
tatiana authored Jan 29, 2025
1 parent e11e5ae commit c4e4abc
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 12 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ Changelog
1.9.0a1 (2025-01-20)
--------------------

Breaking changes

* When using ``LoadMode.DBT_LS``, Cosmos will now attempt to use the ``dbtRunner`` as opposed to subprocess to run ``dbt ls``.
While this represents significant performance improvements (half the vCPU usage and some memory consumption improvement), this may not work in
scenarios where users had multiple Python virtual environments to manage different versions of dbt and its adaptors. In those cases,
please, set ``RenderConfig(invocation_mode=InvocationMode.SUBPROCESS)`` to have the same behaviour Cosmos had in previous versions.

Features

* Use ``dbtRunner`` in the DAG Processor when using ``LoadMode.DBT_LS`` if dbt-core is available by @tatiana in #1484
* Allow users to opt-out of ``dbtRunner`` during DAG parsing with ``InvocationMode.SUBPROCESS`` by @tatiana in #1495

Bug Fixes

* Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RenderConfig:
dependencies. Defaults to True
:param test_behavior: The behavior for running tests. Defaults to after each (model)
:param load_method: The parsing method for loading the dbt model. Defaults to AUTOMATIC
:param invocation_mode: If `LoadMode.DBT_LS` is used, define how dbt ls should be run: using dbtRunner (default) or Python subprocess.
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param selector: Name of a dbt YAML selector to use for parsing. Only supported when using ``load_method=LoadMode.DBT_LS``.
Expand All @@ -70,6 +71,7 @@ class RenderConfig:
emit_datasets: bool = True
test_behavior: TestBehavior = TestBehavior.AFTER_EACH
load_method: LoadMode = LoadMode.AUTOMATIC
invocation_mode: InvocationMode = InvocationMode.DBT_RUNNER
select: list[str] = field(default_factory=list)
exclude: list[str] = field(default_factory=list)
selector: str | None = None
Expand Down
15 changes: 11 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
DBT_TARGET_PATH_ENVVAR,
DbtResourceType,
ExecutionMode,
InvocationMode,
LoadMode,
SourceRenderingBehavior,
)
Expand Down Expand Up @@ -218,15 +219,21 @@ def run_command_with_dbt_runner(command: list[str], tmp_dir: Path | None, env_va
return stdout


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str], log_dir: Path | None = None) -> str:
def run_command(
command: list[str],
tmp_dir: Path,
env_vars: dict[str, str],
invocation_mode: InvocationMode,
log_dir: Path | None = None,
) -> str:
"""Run a command either with dbtRunner or Python subprocess, returning the stdout."""

runner = "dbt Runner" if dbt_runner.is_available() else "Python subprocess"
command = [str(arg) if arg is not None else "<None>" for arg in command]
logger.info("Running command with %s: `%s`", runner, " ".join(command))
logger.debug("Environment variable keys: %s", env_vars.keys())

if dbt_runner.is_available():
if invocation_mode == InvocationMode.DBT_RUNNER and dbt_runner.is_available():
stdout = run_command_with_dbt_runner(command, tmp_dir, env_vars)
else:
stdout = run_command_with_subprocess(command, tmp_dir, env_vars)
Expand Down Expand Up @@ -530,7 +537,7 @@ def run_dbt_ls(
ls_command.extend(self.local_flags)
ls_command.extend(ls_args)

stdout = run_command(ls_command, tmp_dir, env_vars, self.log_dir)
stdout = run_command(ls_command, tmp_dir, env_vars, self.render_config.invocation_mode, self.log_dir)

if self.should_use_dbt_ls_cache():
self.save_dbt_ls_cache(stdout)
Expand Down Expand Up @@ -591,7 +598,7 @@ def run_dbt_deps(self, dbt_cmd: str, dbt_project_path: Path, env: dict[str, str]
deps_command = [dbt_cmd, "deps"]
deps_command.extend(self.local_flags)
self._add_vars_arg(deps_command)
run_command(deps_command, dbt_project_path, env, self.log_dir)
run_command(deps_command, dbt_project_path, env, self.render_config.invocation_mode, self.log_dir)

def load_via_dbt_ls_without_cache(self) -> None:
"""
Expand Down
23 changes: 21 additions & 2 deletions docs/configuration/parsing-methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,39 @@ If you don't have a ``manifest.json`` file, Cosmos will attempt to generate one

When Cosmos runs ``dbt ls``, it also passes your ``select`` and ``exclude`` arguments to the command. This means that Cosmos will only generate a manifest for the models you want to run.

To use this:

.. code-block:: python
DbtDag(
render_config=RenderConfig(
load_method=LoadMode.DBT_LS,
)
# ...,
)
Starting in Cosmos 1.5, Cosmos will cache the output of the ``dbt ls`` command, to improve the performance of this
parsing method. Learn more `here <./caching.html>`_.

To use this:
Since Cosmos 1.9, it will attempt to use dbt as a library, and run ``dbt ls`` using the ``dbtRunner`` that is available for `dbt programmatic invocations <https://docs.getdbt.com/reference/programmatic-invocations>`__. This mode requires dbt version 1.5.0 or higher.
This mode, named ``InvocationMode.DBT_RUNNER``, also depends on dbt being installed in the same Python virtual environment as Airflow.
In previous Cosmos versions, Cosmos would always run ``dbt ls`` using the Python ``subprocess`` module, which can lead to significant CPU and memory usage.

Users can force Cosmos to run ``dbt ls`` with subprocess and not ``dbtRunner``, by setting:

.. code-block:: python
DbtDag(
render_config=RenderConfig(
load_method=LoadMode.DBT_LS,
load_method=LoadMode.DBT_LS, invocation_mode=InvocationMode.SUBPROCESS
)
# ...,
)
For more information, check the `RenderConfig docs <./render-config.html>`_.


``dbt_ls_file``
----------------

Expand Down
23 changes: 23 additions & 0 deletions docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies <lineage.html>`_. If a model in the project has a name containing multibyte characters, the dataset name will be URL-encoded.
- ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior <testing-behavior.html>`_ section.
- ``load_method``: how to load your dbt project. See `Parsing Methods <parsing-methods.html>`_ for more information.
- ``invocation_mode``: (new in v1.9) how to run ``dbt ls``, when using ``LoadMode.DBT_LS``. Learn more about this below.
- ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
- ``selector``: (new in v1.3) name of a dbt YAML selector to use for DAG parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
- ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True
Expand All @@ -23,6 +24,28 @@ The ``RenderConfig`` class takes the following arguments:
- ``load_method``: how to load your dbt project. See `Parsing Methods <parsing-methods.html>`_ for more information.
- ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Parsing Methods <testing-behavior.html>`_ for more information.


How to run dbt ls (invocation mode)
-----------------------------------

When using ``LoadMode.DBT_LS``, Cosmos runs ``dbt ls`` to parse the dbt project.

Since Cosmos 1.9, it will attempt to use dbt as a library, and run ``dbt ls`` using the ``dbtRunner`` that is available for `dbt programmatic invocations <https://docs.getdbt.com/reference/programmatic-invocations>`__. This mode requires dbt version 1.5.0 or higher.
This mode, named ``InvocationMode.DBT_RUNNER``, also depends on dbt being installed in the same Python virtual environment as Airflow.
In previous Cosmos versions, Cosmos would always run ``dbt ls`` using the Python ``subprocess`` module, which can lead to significant CPU and memory usage (including OOM errors), both in the scheduler and worker nodes.

Although ``InvocationMode.DBT_RUNNER`` is the default behaviour in Cosmos 1.9, users can still specify which mode they would like to use:

1. ``InvocationMode.SUBPROCESS``: (behaviour before Cosmos 1.9) In this mode, Cosmos runs dbt cli commands using the Python ``subprocess`` module and parses the output to capture logs and to raise exceptions.

2. ``InvocationMode.DBT_RUNNER``: (default since Cosmos 1.9) In this mode, Cosmos uses the ``dbtRunner`` available for `dbt programmatic invocations <https://docs.getdbt.com/reference/programmatic-invocations>`__ to run dbt commands. \
In order to use this mode, dbt must be installed in the same local environment. This mode does not have the overhead of spawning new subprocesses or parsing the output of dbt commands and is faster than ``InvocationMode.SUBPROCESS``. \
This mode requires dbt version 1.5.0 or higher. It is up to the user to resolve :ref:`execution-modes-local-conflicts` when using this mode.

Users may opt to use ``InvocationMode.SUBPROCESS`` when they have multiple Python virtual environments with different versions of dbt and its adaptors,
and do not want Cosmos to use the dbt version installed in the same Python Virtualenv as Airflow to parse the DAG.


Customizing how nodes are rendered (experimental)
-------------------------------------------------

Expand Down
58 changes: 52 additions & 6 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DBT_TARGET_DIR_NAME,
DbtResourceType,
ExecutionMode,
InvocationMode,
SourceRenderingBehavior,
)
from cosmos.dbt.graph import (
Expand Down Expand Up @@ -1162,7 +1163,7 @@ def test_run_command(mock_popen, stdout, returncode):
mock_popen.return_value.communicate.return_value = (stdout, "")
mock_popen.return_value.returncode = returncode

return_value = run_command(fake_command, fake_dir, env_vars)
return_value = run_command(fake_command, fake_dir, env_vars, InvocationMode.DBT_RUNNER)
args, kwargs = mock_popen.call_args
assert args[0] == fake_command
assert kwargs["cwd"] == fake_dir
Expand All @@ -1176,14 +1177,55 @@ def test_run_command(mock_popen, stdout, returncode):
def test_run_command_success_with_log(tmp_dbt_project_dir):
project_dir = tmp_dbt_project_dir / DBT_PROJECT_NAME
(project_dir / DBT_LOG_FILENAME).touch()
response = run_command(command=["dbt", "deps"], env_vars=os.environ, tmp_dir=project_dir, log_dir=project_dir)
response = run_command(
command=["dbt", "deps"],
env_vars=os.environ,
tmp_dir=project_dir,
invocation_mode=InvocationMode.SUBPROCESS,
log_dir=project_dir,
)
assert "Installing dbt-labs/dbt_utils" in response


@patch("cosmos.dbt.graph.run_command_with_subprocess")
@patch("cosmos.dbt.graph.run_command_with_dbt_runner")
def test_run_command_forcing_subprocess(mock_dbt_runner, mock_subprocess, tmp_dbt_project_dir):
project_dir = tmp_dbt_project_dir / DBT_PROJECT_NAME
run_command(
command=["dbt", "deps"],
env_vars=os.environ,
tmp_dir=project_dir,
invocation_mode=InvocationMode.SUBPROCESS,
log_dir=project_dir,
)
assert mock_subprocess.called
assert not mock_dbt_runner.called


@patch("cosmos.dbt.graph.run_command_with_subprocess")
@patch("cosmos.dbt.graph.run_command_with_dbt_runner")
def test_run_command_forcing_dbt_runner(mock_dbt_runner, mock_subprocess, tmp_dbt_project_dir):
project_dir = tmp_dbt_project_dir / DBT_PROJECT_NAME
run_command(
command=["dbt", "deps"],
env_vars=os.environ,
tmp_dir=project_dir,
invocation_mode=InvocationMode.DBT_RUNNER,
log_dir=project_dir,
)
assert not mock_subprocess.called
assert mock_dbt_runner.called


@pytest.mark.integration
def test_run_command_with_dbt_runner_exception(tmp_dbt_project_dir):
with pytest.raises(CosmosLoadDbtException) as err_info:
run_command(command=["dbt", "ls"], env_vars=os.environ, tmp_dir=tmp_dbt_project_dir / DBT_PROJECT_NAME)
run_command(
command=["dbt", "ls"],
env_vars=os.environ,
invocation_mode=InvocationMode.DBT_RUNNER,
tmp_dir=tmp_dbt_project_dir / DBT_PROJECT_NAME,
)
err_msg = "Unable to run dbt ls command due to missing dbt_packages"
assert err_msg in str(err_info.value)

Expand All @@ -1199,7 +1241,9 @@ def test_run_command_with_dbt_runner_error(tmp_dbt_project_dir):
fp.writelines("select 1 as id")

with pytest.raises(CosmosLoadDbtException) as err_info:
run_command(command=["dbt", "run"], env_vars=os.environ, tmp_dir=project_dir)
run_command(
command=["dbt", "run"], env_vars=os.environ, invocation_mode=InvocationMode.DBT_RUNNER, tmp_dir=project_dir
)
assert "Unable to run ['dbt', 'run']" in str(err_info.value)


Expand All @@ -1212,7 +1256,7 @@ def test_run_command_none_argument(mock_popen, caplog):

mock_popen.return_value.communicate.return_value = ("Invalid None argument", None)
with pytest.raises(CosmosLoadDbtException) as exc_info:
run_command(fake_command, fake_dir, env_vars)
run_command(fake_command, fake_dir, env_vars, InvocationMode.SUBPROCESS)

expected = "Unable to run ['invalid-cmd', '<None>'] due to the error:\nstderr: None\nstdout: Invalid None argument"
assert str(exc_info.value) == expected
Expand Down Expand Up @@ -1656,7 +1700,9 @@ def test_run_dbt_deps(run_command_mock):
graph = DbtGraph(project=project_config)
graph.local_flags = []
graph.run_dbt_deps("dbt", "/some/path", {})
run_command_mock.assert_called_with(["dbt", "deps", "--vars", '{"var-key": "var-value"}'], "/some/path", {}, None)
run_command_mock.assert_called_with(
["dbt", "deps", "--vars", '{"var-key": "var-value"}'], "/some/path", {}, InvocationMode.DBT_RUNNER, None
)


@pytest.fixture()
Expand Down

0 comments on commit c4e4abc

Please sign in to comment.