From f44e2e7ccda049ef003a89e7c2cebdd50f32522a Mon Sep 17 00:00:00 2001 From: Eric Junyuan Xie Date: Thu, 26 Nov 2020 13:43:01 +0800 Subject: [PATCH] Add log for success tag check (#431) * Add log for success tag check * fix * fix * fix --- .../data_join/data_portal_job_manager.py | 47 ++++++++++++++----- test/data_join/test_data_portal_master.py | 7 ++- 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/fedlearner/data_join/data_portal_job_manager.py b/fedlearner/data_join/data_portal_job_manager.py index f99235301..05331f103 100644 --- a/fedlearner/data_join/data_portal_job_manager.py +++ b/fedlearner/data_join/data_portal_job_manager.py @@ -264,11 +264,7 @@ def _update_portal_manifest(self, new_portal_manifest): def _launch_new_portal_job(self): assert self._sync_processing_job() is None - all_fpaths = self._list_input_dir() - rest_fpaths = [] - for fpath in all_fpaths: - if fpath not in self._processed_fpath: - rest_fpaths.append(fpath) + rest_fpaths = self._list_input_dir() if len(rest_fpaths) == 0: logging.info("no file left for portal") return @@ -296,21 +292,50 @@ def _list_input_dir(self): all_inputs = [] wildcard = self._portal_manifest.input_file_wildcard dirs = [self._portal_manifest.input_base_dir] + + num_dirs = 0 + num_files = 0 + num_target_files = 0 while len(dirs) > 0: fdir = dirs[0] dirs = dirs[1:] - has_succ = gfile.Exists(path.join(fdir, '_SUCCESS')) fnames = gfile.ListDirectory(fdir) for fname in fnames: fpath = path.join(fdir, fname) + # OSS does not retain folder structure. + # For example, if we have file oss://test/1001/a.txt + # list(oss://test) returns 1001/a.txt instead of 1001 + basename = path.basename(fpath) + if basename == '_SUCCESS': + continue if gfile.IsDirectory(fpath): dirs.append(fpath) - elif fname != '_SUCCESS' and ( - len(wildcard) == 0 or fnmatch(fname, wildcard)): - if self._check_success_tag and not has_succ: - continue + num_dirs += 1 + continue + num_files += 1 + if len(wildcard) == 0 or fnmatch(basename, wildcard): + num_target_files += 1 + if self._check_success_tag: + has_succ = gfile.Exists( + path.join(path.dirname(fpath), '_SUCCESS')) + if not has_succ: + logging.warning( + 'File %s skipped because _SUCCESS file is ' + 'missing under %s', + fpath, fdir) + continue all_inputs.append(fpath) - return all_inputs + + rest_fpaths = [] + for fpath in all_inputs: + if fpath not in self._processed_fpath: + rest_fpaths.append(fpath) + logging.info( + 'Listing %s: found %d dirs, %d files, %d files matching wildcard, ' + '%d files with success tag, %d new files to process', + self._portal_manifest.input_base_dir, num_dirs, num_files, + num_target_files, len(all_inputs), len(rest_fpaths)) + return rest_fpaths def _sync_job_part(self, job_id, partition_id): if partition_id not in self._job_part_map or \ diff --git a/test/data_join/test_data_portal_master.py b/test/data_join/test_data_portal_master.py index ab8024037..3fe7f7731 100644 --- a/test/data_join/test_data_portal_master.py +++ b/test/data_join/test_data_portal_master.py @@ -67,16 +67,19 @@ def test_api(self): if gfile.Exists(portal_input_base_dir): gfile.DeleteRecursively(portal_input_base_dir) gfile.MakeDirs(portal_input_base_dir) - all_fnames = ['{}.done'.format(i) for i in range(100)] + all_fnames = ['1001/{}.done'.format(i) for i in range(100)] all_fnames.append('{}.xx'.format(100)) + all_fnames.append('1001/_SUCCESS') for fname in all_fnames: fpath = os.path.join(portal_input_base_dir, fname) + gfile.MakeDirs(os.path.dirname(fpath)) with gfile.Open(fpath, "w") as f: f.write('xxx') portal_master_addr = 'localhost:4061' portal_options = dp_pb.DataPotraMasterlOptions( use_mock_etcd=True, - long_running=False + long_running=False, + check_success_tag=True, ) data_portal_master = DataPortalMasterService( int(portal_master_addr.split(':')[1]),