Skip to content

Commit

Permalink
Merge branch 'main' into support/3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
dsuch committed Nov 9, 2023
2 parents b65d77c + 7c12b4b commit 2941507
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions code/zato-server/src/zato/server/file_transfer/observer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

# Zato
from zato.common.api import FILE_TRANSFER
from zato.common.typing_ import cast_
from zato.common.util.api import spawn_greenlet
from zato.common.util.file_transfer import path_string_list_to_list
from zato.server.file_transfer.snapshot import default_interval, DirSnapshotDiff
Expand All @@ -29,7 +28,7 @@

if 0:
from bunch import Bunch
from zato.common.typing_ import any_, anylist, anytuple
from zato.common.typing_ import any_, anylist, anytuple, callable_
from zato.server.file_transfer.api import FileTransferAPI
from zato.server.file_transfer.snapshot import BaseRemoteSnapshotMaker

Expand All @@ -54,6 +53,11 @@ def __init__(self, src_path:'str', is_dir:'bool') -> 'None':
# ################################################################################################################################

class BaseObserver:

# Type hints
_observe_func: 'callable_'
event_handler: 'any_'

observer_type_impl = '<observer-type-impl-not-set>'
observer_type_name = '<observer-type-name-not-set>'
observer_type_name_title = observer_type_name.upper()
Expand All @@ -69,7 +73,6 @@ def __init__(self, manager:'FileTransferAPI', channel_config:'Bunch') -> 'None':
self.name = channel_config.name
self.is_active = channel_config.is_active
self.sleep_time = default_interval
self.event_handler = None
self.path_list = ['<initial-observer>']
self.is_recursive = False
self.keep_running = True
Expand Down Expand Up @@ -98,17 +101,16 @@ def stop(self, needs_log:'bool'=True) -> 'None':

# ################################################################################################################################

def _start(self, observer_start_args):
def _start(self, observer_start_args:'any_') -> 'None':

for path in self.path_list:
path = cast_('str', path)

# Start only for paths that are valid - all invalid ones
# are handled by a background path inspector.
if self.is_path_valid(path):
logger.info('Starting %s file observer `%s` for `%s` (%s)',
self.observer_type_name, path, self.name, self.observer_type_impl)
spawn_greenlet(self._observe_func, path, observer_start_args)
_ = spawn_greenlet(self._observe_func, path, observer_start_args)
else:
logger.info('Skipping invalid path `%s` for `%s` (%s)', path, self.name, self.observer_type_impl)

Expand All @@ -121,14 +123,14 @@ def is_path_valid(self, *args:'any_', **kwargs:'any_') -> 'bool':

# ################################################################################################################################

def path_exists(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker') -> 'bool':
def path_exists(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker | None'=None) -> 'bool':
""" Returns True if path exists, False otherwise.
"""
raise NotImplementedError('Must be implemented by subclasses')

# ################################################################################################################################

def path_is_directory(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker') -> 'bool':
def path_is_directory(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker | None'=None) -> 'bool':
""" Returns True if path is a directory, False otherwise.
"""
raise NotImplementedError('Must be implemented by subclasses')
Expand Down Expand Up @@ -248,14 +250,14 @@ def observe_with_snapshots(
"""
try:

# How many times to run the loop - either given on input or, essentially, infinitely.
current_iter = 0

# Local aliases to avoid namespace lookups in self
timeout = self.sleep_time
handler_func = self.event_handler.on_created
is_recursive = self.is_recursive

# How many times to run the loop - either given on input or, essentially, infinitely.
current_iter = 0

# Take an initial snapshot
snapshot = snapshot_maker.get_snapshot(path, is_recursive, True, True)

Expand All @@ -270,7 +272,7 @@ def observe_with_snapshots(
new_snapshot = snapshot_maker.get_snapshot(path, is_recursive, False, False)

# .. difference between the old and new will return, in particular, new or modified files ..
diff = DirSnapshotDiff(snapshot, new_snapshot)
diff = DirSnapshotDiff(snapshot, new_snapshot) # type: ignore

for path_created in diff.files_created:

Expand Down Expand Up @@ -336,7 +338,7 @@ def observe_with_snapshots(

if log_stop_event:
logger.warning('Stopped %s file transfer observer `%s` for `%s` (snapshot:%s/%s)',
self.observer_type_name, self.name, path, current_iter, max_iters)
self.observer_type_name, self.name, path, current_iter, max_iters) # type: ignore

# ################################################################################################################################
# ################################################################################################################################
Expand All @@ -354,7 +356,7 @@ def __init__(

def start(self):
if self.observer.is_active:
spawn_greenlet(self.observer.wait_for_path, self.path, self.observer_start_args)
_ = spawn_greenlet(self.observer.wait_for_path, self.path, self.observer_start_args)

# ################################################################################################################################
# ################################################################################################################################

0 comments on commit 2941507

Please sign in to comment.