Skip to content

Commit

Permalink
push the progress
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Jan 30, 2025
1 parent dd595f7 commit f6e17a5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
27 changes: 26 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,32 @@ def on_kill(self) -> None:


class DbtLocalBaseOperator(AbstractDbtLocalBase, BaseOperator):
pass
def __init__(self, *args, **kwargs):
import inspect

abstract_dbt_local_base_kwargs = {}
base_operator_kwargs = {}
abstract_dbt_local_base_args_keys = (
inspect.getfullargspec(AbstractDbtBase.__init__).args
+ inspect.getfullargspec(AbstractDbtLocalBase.__init__).args
)
base_operator_args = set(inspect.signature(BaseOperator.__init__).parameters.keys())
# breakpoint()
for arg_key, arg_value in kwargs.items():
if arg_key in abstract_dbt_local_base_args_keys:
abstract_dbt_local_base_kwargs[arg_key] = arg_value
if arg_key in base_operator_args:
base_operator_kwargs[arg_key] = arg_value
# breakpoint()

# super().__init__(*args, **kwargs)
task_id = kwargs.pop("task_id")
# kwargs.pop("extra_context", None)
# project_dir = kwargs.pop("project_dir")
# AbstractDbtLocalBase.__init__(self, task_id=task_id, **abstract_dbt_local_base_kwargs)
AbstractDbtLocalBase.__init__(self, **abstract_dbt_local_base_kwargs)
kwargs["task_id"] = task_id
BaseOperator.__init__(self, **base_operator_kwargs)


class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator):
Expand Down
6 changes: 3 additions & 3 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
DbtBuildLocalOperator,
DbtCloneLocalOperator,
DbtDocsLocalOperator,
DbtLocalBase,
DbtLocalBaseOperator,
DbtLSLocalOperator,
DbtRunLocalOperator,
DbtRunOperationLocalOperator,
Expand Down Expand Up @@ -47,7 +47,7 @@ def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> Any:
return wrapper


class DbtVirtualenvBaseOperator(DbtLocalBase):
class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
"""
Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command
and deleted at the end of the operator execution.
Expand All @@ -62,7 +62,7 @@ class DbtVirtualenvBaseOperator(DbtLocalBase):
:param is_virtualenv_dir_temporary: Tells Cosmos if virtualenv should be persisted or not.
"""

template_fields = DbtLocalBase.template_fields + ("virtualenv_dir", "is_virtualenv_dir_temporary") # type: ignore[operator]
template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir", "is_virtualenv_dir_temporary") # type: ignore[operator]

def __init__(
self,
Expand Down

0 comments on commit f6e17a5

Please sign in to comment.