diff --git a/deploy/scripts/data_join/run_data_join_worker.sh b/deploy/scripts/data_join/run_data_join_worker.sh index f4bca467e..e2778c259 100755 --- a/deploy/scripts/data_join/run_data_join_worker.sh +++ b/deploy/scripts/data_join/run_data_join_worker.sh @@ -36,7 +36,8 @@ example_id_dump_threshold=$(normalize_env_to_args "--example_id_dump_threshold" data_block_builder=$(normalize_env_to_args "--data_block_builder" $DATA_BLOCK_BUILDER) data_block_compressed_type=$(normalize_env_to_args "--data_block_compressed_type" $DATA_BLOCK_COMPRESSED_TYPE) kvstore_type=$(normalize_env_to_args '--kvstore_type' $KVSTORE_TYPE) - +max_conversion_delay=$(normalize_env_to_args '--max_conversion_delay' $MAX_CONVERSION_DELAY) +enable_negative_example_generator=$(normalize_env_to_args '--enable_negative_example_generator' $ENABLE_NEGATIVE_EXAMPLE_GENERATOR) python -m fedlearner.data_join.cmd.data_join_worker_service \ $PEER_ADDR \ $MASTER_POD_NAMES \ @@ -47,4 +48,5 @@ python -m fedlearner.data_join.cmd.data_join_worker_service \ $data_block_dump_interval $data_block_dump_threshold \ $example_id_dump_interval $example_id_dump_threshold \ $data_block_builder $data_block_compressed_type \ - $kvstore_type + $kvstore_type $max_conversion_delay \ + $enable_negative_example_generator diff --git a/fedlearner/common/argparse_util.py b/fedlearner/common/argparse_util.py new file mode 100644 index 000000000..a279452a1 --- /dev/null +++ b/fedlearner/common/argparse_util.py @@ -0,0 +1,26 @@ +# Copyright 2020 The FedLearner Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# coding: utf-8 + +import argparse + +def str_as_bool(v): + if isinstance(v, bool): + return v + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + if v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + raise argparse.ArgumentTypeError('Boolean value expected.') diff --git a/fedlearner/data_join/cmd/data_join_worker_service.py b/fedlearner/data_join/cmd/data_join_worker_service.py index e5bdd0e10..d5aa3dfa4 100644 --- a/fedlearner/data_join/cmd/data_join_worker_service.py +++ b/fedlearner/data_join/cmd/data_join_worker_service.py @@ -20,8 +20,10 @@ import tensorflow from fedlearner.common import data_join_service_pb2 as dj_pb +from fedlearner.common.argparse_util import str_as_bool from fedlearner.data_join.common import get_kvstore_config from fedlearner.data_join.data_join_worker import DataJoinWorkerService +from fedlearner.data_join.common import interval_to_timestamp tensorflow.compat.v1.enable_eager_execution() if __name__ == "__main__": @@ -76,6 +78,14 @@ parser.add_argument('--data_block_compressed_type', type=str, default='', choices=['', 'ZLIB', 'GZIP'], help='the compressed type for data block') + parser.add_argument('--max_conversion_delay', type=str, default="7D", + help='the max delay of an impression occurred '\ + 'before a conversion as an attribution pair, unit: '\ + '{Y|M|D|H|N|S}, i.e. 
1N20S equals 80 seconds') + parser.add_argument('--enable_negative_example_generator', type=str_as_bool, + default=False, const=True, nargs='?', + help="enable the negative example auto-generator, "\ + "filled with label: 0") args = parser.parse_args() worker_options = dj_pb.DataJoinWorkerOptions( use_mock_etcd=(args.kvstore_type == 'mock'), @@ -91,6 +101,10 @@ max_matching_window=args.max_matching_window, data_block_dump_interval=args.data_block_dump_interval, data_block_dump_threshold=args.data_block_dump_threshold, + max_conversion_delay=interval_to_timestamp(\ + args.max_conversion_delay), + enable_negative_example_generator=\ + args.enable_negative_example_generator, ), example_id_dump_options=dj_pb.ExampleIdDumpOptions( example_id_dump_interval=args.example_id_dump_interval, diff --git a/fedlearner/data_join/common.py b/fedlearner/data_join/common.py index 450a08fbd..edfe9e37a 100644 --- a/fedlearner/data_join/common.py +++ b/fedlearner/data_join/common.py @@ -371,3 +371,29 @@ def get_kvstore_config(kvstore_type): addr = os.environ.get('ETCD_ADDR', 'localhost:2379') base_dir = os.environ.get('ETCD_BASE_DIR', 'fedlearner') return name, addr, None, None, base_dir + +def interval_to_timestamp(itv): + unit = ["Y", "M", "D", "H", "N", "S"] + multiple = [3600*24*30*12, 3600*24*30, 3600*24, 3600, 60, 1] + unit_order, unit_no = {}, {} + for i, item in enumerate(unit): + unit_order[item] = len(unit) - i + s_no = "" + prv_order = len(unit) + 1 + for c in itv: + if c.isdigit(): + s_no += c + else: + c = c.upper() + if c not in unit_order or prv_order <= unit_order[c]: + return None + unit_no[c] = s_no + prv_order = unit_order[c] + s_no = "" + tmstmp = 0 + if len(s_no) > 0 and "S" not in unit_no: + unit_no["S"] = s_no + for i, item in enumerate(unit): + if item in unit_no: + tmstmp += int(unit_no[item]) * multiple[i] + return tmstmp diff --git a/fedlearner/data_join/data_block_dumper.py b/fedlearner/data_join/data_block_dumper.py index bcc413985..7f2d4e266 100644 --- a/fedlearner/data_join/data_block_dumper.py +++ b/fedlearner/data_join/data_block_dumper.py @@ -164,7 +164,9 @@ def _dump_data_block_by_meta(self, meta): example_num = len(meta.example_ids) for (index, item) in self._raw_data_visitor: example_id = item.example_id - if example_id == meta.example_ids[match_index]: + # ELements in meta.example_ids maybe duplicated + while match_index < example_num and\ + example_id == meta.example_ids[match_index]: data_block_builder.write_item(item) match_index += 1 if match_index >= example_num: diff --git a/fedlearner/data_join/data_block_manager.py b/fedlearner/data_join/data_block_manager.py index c7c3c3d8c..de4692562 100644 --- a/fedlearner/data_join/data_block_manager.py +++ b/fedlearner/data_join/data_block_manager.py @@ -63,7 +63,8 @@ def init_by_meta(self, meta): def set_data_block_manager(self, data_block_manager): self._data_block_manager = data_block_manager - def append_item(self, item, leader_index, follower_index, event_time=None): + def append_item(self, item, leader_index, follower_index, event_time=None,\ + allow_dup=False): example_id = item.example_id if event_time is None: event_time = item.event_time @@ -75,10 +76,18 @@ def append_item(self, item, leader_index, follower_index, event_time=None): self._data_block_meta.start_time = event_time self._data_block_meta.end_time = event_time else: - assert self._data_block_meta.leader_start_index < leader_index, \ - "leader start index should be incremental" - assert self._data_block_meta.leader_end_index < leader_index, \ - "leader end 
index should be incremental"
+            if not allow_dup:
+                assert self._data_block_meta.leader_start_index < leader_index,\
+                    "leader start index should be incremental"
+                assert self._data_block_meta.leader_end_index < leader_index, \
+                    "leader end index should be incremental"
+            else:
+                assert self._data_block_meta.leader_start_index <= \
+                        leader_index,\
+                    "leader start index should be non-decreasing"
+                assert self._data_block_meta.leader_end_index <= leader_index, \
+                    "leader end index should be non-decreasing"
+
         self._data_block_meta.leader_end_index = leader_index
         if event_time < self._data_block_meta.start_time:
             self._data_block_meta.start_time = event_time
diff --git a/fedlearner/data_join/joiner_impl/attribution_joiner.py b/fedlearner/data_join/joiner_impl/attribution_joiner.py
new file mode 100644
index 000000000..7a25b32cc
--- /dev/null
+++ b/fedlearner/data_join/joiner_impl/attribution_joiner.py
@@ -0,0 +1,562 @@
+# Copyright 2020 The FedLearner Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# coding: utf-8
+
+import logging
+import time
+
+from fedlearner.common import metrics
+
+import fedlearner.data_join.common as common
+from fedlearner.data_join.joiner_impl.example_joiner import ExampleJoiner
+
+class NegativeExampleGenerator(object):
+    def __init__(self):
+        self._buf = {}
+
+    def update(self, mismatches):
+        self._buf.update(mismatches)
+
+    def generate(self, fe, prev_leader_idx, leader_idx):
+        for idx in range(prev_leader_idx, leader_idx):
+            if idx not in self._buf:
+                continue
+            example_id = self._buf[idx].example_id
+            if isinstance(example_id, bytes):
+                example_id = example_id.decode()
+            event_time = self._buf[idx].event_time
+            example = type(fe).make(example_id, event_time,
+                                    example_id, ["label"], [0])
+            yield (example, idx, 0)
+            del self._buf[idx]
+
+        del_keys = [k for k in self._buf if k < prev_leader_idx]
+        for k in del_keys:
+            del self._buf[k]
+
+class _Attributor(object):
+    """
+        How we attribute the conversion event to the show event
+    """
+    def __init__(self, max_conversion_delay):
+        self._max_conversion_delay = max_conversion_delay
+
+    def match(self, conv, show):
+        assert hasattr(conv, "example_id"), "invalid item, no example id"
+        assert hasattr(show, "example_id"), "invalid item, no example id"
+        return conv.example_id == show.example_id and \
+                conv.event_time > show.event_time and \
+                conv.event_time <= show.event_time + \
+                self._max_conversion_delay
+
+class _Accumulator(object):
+    def __init__(self, attributor):
+        self._attributor = attributor
+
+    def join(self, conv_window, show_window):
+        """
+            Assume show stream is exponentially larger than convert stream,
+            we cache all the events from now back to watermark. 
example id + maybe duplicated in convert stream + Return: + show_matches: an array with tuples(convet, show), in same order + with show event + """ + show_matches = [] + show_mismatches = {} + conv_dict = conv_window.as_dict() + idx = 0 + while idx < show_window.size(): + show = show_window[idx][1] + if show.example_id in conv_dict: + cd = conv_dict[show.example_id] + for i in reversed(range(len(cd))): + #A show can match multiple conversion event, add + # all the matched conversion-show pair to result + conv = conv_window[cd[i]][1] + if self._attributor.match(conv, show): + show_matches.append((cd[i], idx)) + else: + show_mismatches[show_window[idx][0]] = show + idx += 1 + return show_matches, show_mismatches + +class _Trigger(object): + """ + Decide how to move forward the watermark + """ + def __init__(self, max_conversion_delay): + self._max_conversion_delay = max_conversion_delay + self._watermark = 0 + + def watermark(self): + return self._watermark + + def shrink(self, conv_window): + ed = conv_window[conv_window.size() - 1] + idx = 0 + while idx < conv_window.size() and ed[1].event_time > \ + conv_window[idx][1].event_time + self._max_conversion_delay: + self._watermark = conv_window[idx][1].event_time + idx += 1 + return idx + + def trigger(self, conv_window, show_window): + conv_stride, show_stride = 0, 0 + ## step can be increased to accelerate this + step = 1 + show_win_size = show_window.size() - 1 + conv_win_size = conv_window.size() - 1 + sid = 0 + while show_stride <= show_win_size and sid <= show_win_size \ + and conv_win_size >= 0 \ + and conv_window[0][1].event_time > \ + show_window[sid][1].event_time + \ + self._max_conversion_delay: + show_stride += step + sid += 1 + + cid = 0 + while conv_stride <= conv_win_size and show_win_size >= 0 and \ + 0 <= cid <= conv_win_size and \ + show_window[show_win_size][1].event_time > \ + conv_window[cid][1].event_time + \ + self._max_conversion_delay: + #FIXME current et is not always the watermark + self._watermark = max(conv_window[cid][1].event_time, \ + self._watermark) + conv_stride += step + cid += 1 + + logging.info("Watermark triggered forward to %d by " \ + "(conv: %d, show: %d)", \ + self._watermark, conv_stride, show_stride) + return (conv_stride, show_stride) + +class _SlidingWindow(object): + """ + Sliding and unfixed-size window + """ + def __init__(self, init_window_size, max_window_size): + self._init_window_size = max(init_window_size, 1) + self._max_window_size = max_window_size + self._ring_buffer = list(range(self._init_window_size)) + self._start = 0 + self._end = 0 + self._alloc_size = self._init_window_size + self._size = 0 + self._debug_extend_cnt = 0 + + def __str__(self): + return "start: %d, end: %d, size: %d, alloc_size: " \ + "%d, ring_buffer: %s" % \ + (self._start, self._end, self._size, self._alloc_size, \ + self._ring_buffer[self.start(): \ + (self.start()+20)%self._alloc_size]) + def as_dict(self): + buf = {} + idx = 0 + ## assume that the duplicated example is few + while idx < self.size(): + item = self.__getitem__(idx)[1] + if item.example_id not in buf: + buf[item.example_id] = [idx] + else: + buf[item.example_id].append(idx) + idx += 1 + return buf + + def start(self): + return self._start + + def end(self): + return self._end + + def size(self): + return self._size + + def is_full(self): + return self._size == self._alloc_size + + def et_span(self): + if self._size == 0: + return 0 + st = self._ring_buffer[self._start][1].event_time + ed = self._ring_buffer[self._index(self._size - 
1)][1].event_time + return ed - st + + def reserved_size(self): + return self._max_window_size - self._size + + def append(self, index, item): + if self._size >= self._alloc_size: + self.extend() + assert self._size < self._alloc_size, "Window extend failed" + self._ring_buffer[self._end] = (index, item) + self._end = (self._end + 1) % self._alloc_size + self._size += 1 + + def _defragment(self, new_buf): + """ + defragment the ring buffer, and copy it to new_buf + """ + if self._end == 0: + new_buf[0:self._size] = \ + self._ring_buffer[self._start:self._alloc_size] + elif self._start < self._end: + new_buf[0:self._size] = \ + self._ring_buffer[self._start:self._end] + else: + part_1 = self._alloc_size - self._start + new_buf[0:part_1] = self._ring_buffer[self._start:self._alloc_size] + part_2 = self._end + if part_2 >= 0: + new_buf[part_1:part_1+part_2] = self._ring_buffer[0:part_2] + + def extend(self): + logging.info("%s extend begin, begin=%d, end=%d, size=%d, " + "alloc_size=%d, len(ring_buffer)=%d, extend_cnt=%d", \ + self.__class__.__name__, self._start, self._end, \ + self._size, self._alloc_size, len(self._ring_buffer), \ + self._debug_extend_cnt) + assert self._alloc_size < self._max_window_size, \ + "Can't extend ring buffer due to max_window_size limit" + new_alloc_size = min(self._alloc_size * 2, self._max_window_size) + new_buf = list(range(new_alloc_size)) + self._defragment(new_buf) + self._start = 0 + self._end = self._size + self._alloc_size = new_alloc_size + self._ring_buffer = new_buf + self._debug_extend_cnt += 1 + logging.info("%s extend end, begin=%d, end=%d, size=%d, " + "alloc_size=%d, len(ring_buffer)=%d, extend_cnt=%d", \ + self.__class__.__name__, self._start, self._end, \ + self._size, self._alloc_size, len(self._ring_buffer), \ + self._debug_extend_cnt) + + def reset(self, new_buffer, state_stale): + self._start = 0 + self._end = len(new_buffer) + self._size = len(new_buffer) + self._ring_buffer[0:self._size-1] = new_buffer[0:self._size-1] + + def _index(self, index): + return (self._start + index) % self._alloc_size + + def __getitem__(self, index): + if index > self._alloc_size: + logging.warning("index %d out of range %d, be truncated", \ + index, self._alloc_size) + return self._ring_buffer[self._index(index)] + + def forward(self, step): + if self._size < step: + return False + self._start = self._index(step) + self._size -= step + return True + +class AttributionJoiner(ExampleJoiner): + def __init__(self, example_joiner_options, raw_data_options, + data_block_builder_options, kvstore, data_source, + partition_id): + super(AttributionJoiner, self).__init__(example_joiner_options, + raw_data_options, + data_block_builder_options, + kvstore, data_source, + partition_id) + self._min_window_size = example_joiner_options.min_matching_window + # max_window_size must be lesser than max_conversion_delay + self._max_window_size = example_joiner_options.max_matching_window + self._max_conversion_delay = \ + example_joiner_options.max_conversion_delay + self._leader_join_window = _SlidingWindow(self._min_window_size, + 1000000) + self._follower_join_window = _SlidingWindow( + self._min_window_size, self._max_window_size) + self._leader_restart_index = -1 + self._sorted_buf_by_leader_index = [] + self._dedup_by_follower_index = {} + + self._trigger = _Trigger(self._max_conversion_delay) + attri = _Attributor(self._max_conversion_delay) + self._acc = _Accumulator(attri) + + self._enable_negative_example_generator = \ + 
example_joiner_options.enable_negative_example_generator + if self._enable_negative_example_generator: + self._negative_example_generator = NegativeExampleGenerator() + + @classmethod + def name(cls): + return 'ATTRIBUTION_JOINER' + + def _inner_joiner(self, state_stale): + if self.is_join_finished(): + return + sync_example_id_finished, raw_data_finished = \ + self._prepare_join(state_stale) + join_data_finished = False + + leader_fstep = 1 + while True: + leader_filled = self._fill_leader_join_window() + leader_exhausted = sync_example_id_finished and \ + self._leader_join_window.et_span() < \ + self._max_conversion_delay + follower_filled = self._fill_follower_join_window() + follower_exhausted = raw_data_finished and \ + self._follower_join_window.et_span() < \ + self._max_conversion_delay + logging.info("Fill: leader_filled=%s, leader_exhausted=%s,"\ + " follower_filled=%s, follower_exhausted=%s,"\ + " sync_example_id_finished=%s, raw_data_finished=%s",\ + leader_filled, leader_exhausted, \ + follower_filled, follower_exhausted,\ + sync_example_id_finished, raw_data_finished) + + watermark = self._trigger.watermark() + #1. find all the matched pairs in current window + raw_pairs, mismatches = self._acc.join(self._follower_join_window,\ + self._leader_join_window) + if self._enable_negative_example_generator: + self._negative_example_generator.update(mismatches) + #2. cache the pairs, evict the show events which are out of + # watermark + pairs = self._sort_and_evict_attri_buf(raw_pairs, watermark) + #3. push the result into builder + if len(pairs) > 0: + for meta in self._dump_joined_items(pairs): + yield meta + self._leader_restart_index = pairs[len(pairs) - 1][1] + self._follower_restart_index = pairs[len(pairs) - 1][2] + logging.info("Restart index for leader %d, follwer %d", \ + self._leader_restart_index, \ + self._follower_restart_index) + + #4. 
update the watermark + stride = self._trigger.trigger(self._follower_join_window, \ + self._leader_join_window) + self._follower_join_window.forward(stride[0]) + self._leader_join_window.forward(stride[1]) + logging.info("Stat: pair_buf=%d, raw_pairs=%d, pairs=%d, " \ + "stride=%s", \ + len(self._sorted_buf_by_leader_index), \ + len(raw_pairs), len(pairs), stride) + + if not leader_filled and not sync_example_id_finished: + logging.info("Wait for Leader syncing example id...") + break + if leader_exhausted: + join_data_finished = True + break + + if stride == (0, 0): + if raw_data_finished: + self._leader_join_window.forward(leader_fstep) + leader_fstep = min(leader_fstep * 2, + (self._leader_join_window.size()+1)//2) + + if sync_example_id_finished: + force_stride = \ + self._trigger.shrink(self._follower_join_window) + self._follower_join_window.forward(force_stride) + else: + leader_fstep = 1 + + if self._get_data_block_builder(False) is not None and \ + (self._need_finish_data_block_since_interval() or + join_data_finished): + yield self._finish_data_block() + if join_data_finished: + self._set_join_finished() + logging.info("finish join example for partition %d by %s", + self._partition_id, self.name()) + + def _latest_attri(self, index): + lf, rt = 0, len(self._sorted_buf_by_leader_index) + while lf < rt: + mid = (lf + rt) // 2 + if index < self._sorted_buf_by_leader_index[mid][1]: + rt = mid + else: + lf = mid + 1 + return lf + + def _sort_and_evict_attri_buf(self, raw_matches, watermark): + """ + Push the matched pairs to order-by-leader-index list, + and evict the pairs which are out of watermark + """ + for (cid, sid) in raw_matches: + #fi: follower index, fe: follower example + assert cid < self._follower_join_window.size(), "Invalid l index" + assert sid < self._leader_join_window.size(), "Invalid f index" + (fi, fe) = self._follower_join_window[cid] + (li, le) = self._leader_join_window[sid] + assert fe.example_id == le.example_id, "Example id must be equal" + if li <= self._leader_restart_index: + logging.warning("Unordered event ignored, leader index should"\ + " be greater %d > %d for follower idx %d is" \ + " false", li, self._leader_restart_index, fi) + continue + + # cache the latest show event + updated = False + if fi in self._dedup_by_follower_index: + if self._dedup_by_follower_index[fi][1] > le.event_time: + self._dedup_by_follower_index[fi] = (li, le.event_time) + updated = True + else: + self._dedup_by_follower_index[fi] = (li, le.event_time) + updated = True + # sort by leader index + if not updated: + continue + latest_pos = self._latest_attri(li) + if latest_pos > 0: + # remove the dups + latest_item = \ + self._sorted_buf_by_leader_index[latest_pos - 1] + if latest_item[1] == li and latest_item[2] == fi: + continue + self._sorted_buf_by_leader_index.insert(latest_pos, \ + (fe, li, fi)) + matches = [] + idx = 0 + for (fe, li, fi) in self._sorted_buf_by_leader_index: + if fe.event_time <= watermark: + assert fi in self._dedup_by_follower_index, "Invalid f index" + (leader_index, _) = self._dedup_by_follower_index[fi] + if leader_index == li: + matches.append((fe, li, fi)) + del self._dedup_by_follower_index[fi] + else: + logging.info("Example %s matching leader index %s is"\ + " older than %d", fe.example_id, li, \ + leader_index) + else: + # FIXME: Assume the unordered range is limited, + # or this will bring an out-of-memory crash + break + idx += 1 + self._sorted_buf_by_leader_index \ + = self._sorted_buf_by_leader_index[idx:] + return matches + + # useless + 
def _reset_joiner_state(self, state_stale):
+        self._leader_join_window.reset([], state_stale)
+        if state_stale:
+            self._follower_join_window.reset([], True)
+
+    def _prepare_join(self, state_stale):
+        if state_stale:
+            self._reset_joiner_state(True)
+        return super(AttributionJoiner, self)._prepare_join(state_stale)
+
+    def _dump_joined_items(self, matching_list):
+        start_tm = time.time()
+        prev_leader_idx = self._leader_restart_index + 1
+        for item in matching_list:
+            (fe, li, fi) = item
+            if self._enable_negative_example_generator and li > prev_leader_idx:
+                logging.info("Neg example generating from %d to %d",
+                             prev_leader_idx, li)
+                for example in \
+                    self._negative_example_generator.generate(
+                        fe, prev_leader_idx, li):
+
+                    builder = self._get_data_block_builder(True)
+                    assert builder is not None, "data block builder must not "\
+                                                "be None before dumping"
+                    builder.append_item(example[0], example[1],
+                                        example[2], None, True)
+                    if builder.check_data_block_full():
+                        yield self._finish_data_block()
+            prev_leader_idx = li + 1
+
+            builder = self._get_data_block_builder(True)
+            assert builder is not None, "data block builder must not "\
+                                        "be None before dumping"
+            builder.append_item(fe, li, fi, None, True)
+            if builder.check_data_block_full():
+                yield self._finish_data_block()
+        metrics.emit_timer(name='attribution_joiner_dump_joined_items',
+                           value=int(time.time()-start_tm),
+                           tags=self._metrics_tags)
+
+    def _fill_leader_join_window(self):
+        start_tm = time.time()
+        idx = self._leader_join_window.size()
+        filled_new_example = self._fill_join_windows(self._leader_visitor,
+                                                     self._leader_join_window)
+        eids = []
+        while idx < self._leader_join_window.size():
+            eids.append((self._leader_join_window[idx][0],
+                        self._leader_join_window[idx][1].example_id))
+            idx += 1
+
+        self._joiner_stats.fill_leader_example_ids(eids)
+        metrics.emit_timer(name=\
+                           'attribution_joiner_fill_leader_join_window',
+                           value=int(time.time()-start_tm),
+                           tags=self._metrics_tags)
+        return filled_new_example
+
+    def _fill_follower_join_window(self):
+        start_tm = time.time()
+        idx = self._follower_join_window.size()
+        filled_new_example = self._fill_join_windows(self._follower_visitor,
+                                                    self._follower_join_window)
+        eids = []
+        while idx < self._follower_join_window.size():
+            eids.append((self._follower_join_window[idx][0],
+                        self._follower_join_window[idx][1].example_id))
+            idx += 1
+
+        self._joiner_stats.fill_follower_example_ids(eids)
+        metrics.emit_timer(name=\
+                           'attribution_joiner_fill_follower_join_window',
+                           value=int(time.time()-start_tm),
+                           tags=self._metrics_tags)
+        return filled_new_example
+
+    def _fill_join_windows(self, visitor, join_window):
+        size = join_window.size()
+        while not visitor.finished() and not join_window.is_full():
+            required_item_count = join_window.reserved_size()
+            self._consume_item_until_count(
+                    visitor, join_window,
+                    required_item_count
+                )
+        return join_window.size() > size
+
+    def _consume_item_until_count(self, visitor, windows,
+                                  required_item_count):
+        for (index, item) in visitor:
+            if item.example_id == common.InvalidExampleId:
+                logging.warning("ignore item indexed as %d from %s since "\
+                                "invalid example id", index, visitor.name())
+            elif item.event_time == common.InvalidEventTime:
+                logging.warning("ignore item indexed as %d from %s since "\
+                                "invalid event time", index, visitor.name())
+            else:
+                windows.append(index, item)
+            if windows.size() >= required_item_count:
+                return
+        assert visitor.finished(), "visitor should be finished if "\
+                                   "required_item_count is not satisfied"
satisfied" diff --git a/fedlearner/data_join/raw_data_iter_impl/csv_dict_iter.py b/fedlearner/data_join/raw_data_iter_impl/csv_dict_iter.py index b8823bb6e..0ab9b978a 100644 --- a/fedlearner/data_join/raw_data_iter_impl/csv_dict_iter.py +++ b/fedlearner/data_join/raw_data_iter_impl/csv_dict_iter.py @@ -33,6 +33,19 @@ def __init__(self, raw): self._raw = raw self._tf_record = None + @classmethod + def make(cls, example_id, event_time, raw_id, fname=None, fvalue=None): + raw = OrderedDict() + raw["example_id"] = example_id + raw["event_time"] = event_time + raw["raw_id"] = raw_id + if not fname: + assert len(fname) == len(fvalue), \ + "Field name should match field value" + for i, v in enumerate(fname): + raw[v] = fvalue[i] + return cls(raw) + @property def example_id(self): if 'example_id' not in self._raw: diff --git a/fedlearner/data_join/raw_data_iter_impl/raw_data_iter.py b/fedlearner/data_join/raw_data_iter_impl/raw_data_iter.py index 74c1bfdd5..8c7b32187 100644 --- a/fedlearner/data_join/raw_data_iter_impl/raw_data_iter.py +++ b/fedlearner/data_join/raw_data_iter_impl/raw_data_iter.py @@ -48,6 +48,10 @@ def csv_record(self): "csv_record not implement for basic Item" ) + @classmethod + def make(cls, example_id, event_time, raw_id, fname=None, fvalue=None): + raise NotImplementedError("make not implement for basic Item") + def __init__(self, options): self._fiter = None self._index_meta = None diff --git a/fedlearner/data_join/raw_data_iter_impl/tf_record_iter.py b/fedlearner/data_join/raw_data_iter_impl/tf_record_iter.py index a12da8aa6..65b662641 100644 --- a/fedlearner/data_join/raw_data_iter_impl/tf_record_iter.py +++ b/fedlearner/data_join/raw_data_iter_impl/tf_record_iter.py @@ -15,6 +15,7 @@ # coding: utf-8 import logging +from collections import OrderedDict from contextlib import contextmanager import tensorflow.compat.v1 as tf @@ -33,6 +34,20 @@ def __init__(self, record_str): self._csv_record = None self._gc_example(example) + @classmethod + def make(cls, example_id, event_time, raw_id, fname=None, fvalue=None): + fields = OrderedDict() + fields["example_id"] = example_id + fields["event_time"] = event_time + fields["raw_id"] = raw_id + if not fname: + assert len(fname) == len(fvalue), \ + "Field name should match field value" + for i, v in enumerate(fname): + fields[v] = fvalue[i] + ex = common.convert_dict_to_tf_example(fields) + return cls(ex.SerializeToString()) + @property def example_id(self): if self._example_id == common.InvalidExampleId: diff --git a/protocols/fedlearner/common/data_join_service.proto b/protocols/fedlearner/common/data_join_service.proto index 3d6646ac7..fa9c2e9a1 100644 --- a/protocols/fedlearner/common/data_join_service.proto +++ b/protocols/fedlearner/common/data_join_service.proto @@ -248,6 +248,8 @@ message ExampleJoinerOptions { int64 max_matching_window = 3; int64 data_block_dump_interval = 4; int64 data_block_dump_threshold = 5; + int64 max_conversion_delay = 6; + bool enable_negative_example_generator = 7; } message ExampleIdDumpOptions { diff --git a/test/data_join/test_attribution_join.py b/test/data_join/test_attribution_join.py new file mode 100644 index 000000000..2e657e897 --- /dev/null +++ b/test/data_join/test_attribution_join.py @@ -0,0 +1,141 @@ +# Copyright 2020 The FedLearner Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# coding: utf-8 + +import unittest +import random +import numpy as np +import time +import logging +import sys + +from fedlearner.data_join.common import interval_to_timestamp +from fedlearner.data_join.joiner_impl import attribution_joiner as aj +from fedlearner.data_join.raw_data_iter_impl.csv_dict_iter import CsvItem + +class _Item(object): + def __init__(self, eid, et, idx): + self.example_id = eid + self.event_time = et + self.index = idx + + @classmethod + def make(cls, example_id, event_time, raw_id, fname=None, fvalue=None): + assert fname and fvalue, "Invalid fname" + assert fname[0] == "label" and fvalue[0] == 0, "Invalid fvalue" + return cls(example_id, event_time, 0) + +class TestAttributionJoiner(unittest.TestCase): + def setUp(self): + pass + + #@unittest.skip("21") + def test_sliding_window_append(self): + sliding_win = aj._SlidingWindow(0, 100000000) + idx = 0 + skips = 0 + size, steps, forward, dt = 100, 20, 10, 1 + for it in [size for i in range(steps)]: + forward_step = random.randint(1, forward) + if sliding_win.forward(forward_step): + skips += forward_step + for i in range(it): + sliding_win.append(idx, _Item(idx, idx-dt, idx)) + self.assertEqual(idx, sliding_win[idx - skips][0]) + idx += 1 + self.assertEqual(skips + sliding_win.size(), idx) + return sliding_win, skips + + + def make_sliding_window(self, size, steps, forward, dt, repeated=False): + st = time.time() + sliding_win = aj._SlidingWindow(0, 100000000) + idx = 0 + skips = 0 + repeated_ev = [] + for it in [size for i in range(steps)]: + forward_step = forward #random.randint(1, forward) + luckydog= [] + if sliding_win.forward(forward_step): + skips += forward_step + for i in range(it): + ## disorder event + if i % 7 == 0: + luckydog.append((idx, _Item(idx, idx-dt, idx))) + else: + sliding_win.append(idx, _Item(idx, idx-dt, idx)) + self.assertEqual(idx, sliding_win[idx - skips - len(luckydog)][0]) + idx += 1 + for ld in luckydog: + sliding_win.append(ld[0], ld[1]) + if repeated: + repeated_ev.append((ld[0], ld[1])) + + dur = time.time() - st + print("window mem usage %dM" % (sys.getsizeof(sliding_win._ring_buffer) / 1024 / 1024)) + print("time costs: %d(s), append num %d, skips %d, window_size=%d, alloc_size=%d" %\ + (dur, idx, skips, sliding_win.size(), sliding_win._alloc_size)) + self.assertEqual(skips + sliding_win.size(), idx) + + for (k, v) in repeated_ev: + sliding_win.append(k, v) + return sliding_win, skips, len(repeated_ev) + + #@unittest.skip("121") + def test_sliding_window_accumulator(self): + watermark = 100000000 + follower_start_index = 0 + conv_size, conv_steps = 1000, 1000 + attri = aj._Attributor(watermark) + acc = aj._Accumulator(attri) + conv_window, conv_skips, _ = self.make_sliding_window(conv_size, conv_steps, 10, 1) + show_window, show_skips, _ = self.make_sliding_window(conv_size, conv_steps, 1, 2) + res, _ = acc.join(conv_window, show_window) + self.assertEqual(conv_size * conv_steps - conv_skips, len(res)) + + + def test_sliding_window_accumulator_with_repeated_conversion(self): + watermark = 100000000 + follower_start_index = 0 + conv_size, conv_steps = 1000, 1 + attri = 
aj._Attributor(watermark) + acc = aj._Accumulator(attri) + conv_window, conv_skips, repeated = self.make_sliding_window(conv_size, conv_steps, 10, 1, True) + show_window, show_skips, _ = self.make_sliding_window(conv_size, conv_steps, 1, 2) + res, _ = acc.join(conv_window, show_window) + self.assertEqual(conv_size * conv_steps - conv_skips + repeated, len(res)) + + def test_interval_to_timestamp(self): + test_in = ["1000s", "1n", "10D", "1Y2M1d3h", "1d1y", "1s1S", "111111s", "1234", "0y1n1"] + test_out = [1000, 60, 864000, 36385200, None, None, 111111, 1234, 61] + test_out_ = list(map(interval_to_timestamp, test_in)) + self.assertEqual(test_out, test_out_) + + def test_negative_example_generator(self): + gen = aj.NegativeExampleGenerator() + index_list = list(range(128)) + for i in index_list: + gen.update({i: _Item("example_id_%d"%i, i, i)}) + item_tpl = _Item("example_id", 100, 0) + + for item in gen.generate(item_tpl, 1, 128): + pass + + def tearDown(self): + pass + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main() diff --git a/test/data_join/test_attribution_join_integrity.py b/test/data_join/test_attribution_join_integrity.py new file mode 100644 index 000000000..397ab3341 --- /dev/null +++ b/test/data_join/test_attribution_join_integrity.py @@ -0,0 +1,263 @@ +# Copyright 2020 The FedLearner Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# coding: utf-8 + +import unittest +import os +import random +import logging + +import tensorflow.compat.v1 as tf +tf.enable_eager_execution() +import tensorflow_io +from tensorflow.compat.v1 import gfile +from google.protobuf import timestamp_pb2 + +from fedlearner.common import mysql_client +from fedlearner.common import common_pb2 as common_pb +from fedlearner.common import data_join_service_pb2 as dj_pb +from fedlearner.data_join.common import interval_to_timestamp + +from fedlearner.data_join import ( + data_block_manager, common, data_block_dumper, + raw_data_manifest_manager, joiner_impl, + example_id_dumper, raw_data_visitor, visitor +) +from fedlearner.data_join.data_block_manager import DataBlockBuilder +from fedlearner.data_join.raw_data_iter_impl.tf_record_iter import TfExampleItem + +class TestAttributionJoin(unittest.TestCase): + def setUp(self): + data_source = common_pb.DataSource() + data_source.data_source_meta.name = "milestone-f" + data_source.data_source_meta.partition_num = 1 + data_source.output_base_dir = "./ds_output" + self.raw_data_dir = "./raw_data" + self.data_source = data_source + self.raw_data_options = dj_pb.RawDataOptions( + raw_data_iter='TF_RECORD', + compressed_type='' + ) + self.example_id_dump_options = dj_pb.ExampleIdDumpOptions( + example_id_dump_interval=1, + example_id_dump_threshold=1024 + ) + self.example_joiner_options = dj_pb.ExampleJoinerOptions( + example_joiner='ATTRIBUTION_JOINER', + min_matching_window=32, + max_matching_window=512, + max_conversion_delay=interval_to_timestamp("124"), + enable_negative_example_generator=True, + data_block_dump_interval=32, + data_block_dump_threshold=128 + ) + if gfile.Exists(self.data_source.output_base_dir): + gfile.DeleteRecursively(self.data_source.output_base_dir) + if gfile.Exists(self.raw_data_dir): + gfile.DeleteRecursively(self.raw_data_dir) + self.kvstore = mysql_client.DBClient('test_cluster', 'localhost:2379', + 'test_user', 'test_password', + 'fedlearner', True) + self.kvstore.delete_prefix(common.data_source_kvstore_base_dir(self.data_source.data_source_meta.name)) + self.total_raw_data_count = 0 + self.total_example_id_count = 0 + self.manifest_manager = raw_data_manifest_manager.RawDataManifestManager( + self.kvstore, self.data_source) + self.g_data_block_index = 0 + + def generate_raw_data(self, begin_index, item_count): + raw_data_dir = os.path.join(self.raw_data_dir, common.partition_repr(0)) + if not gfile.Exists(raw_data_dir): + gfile.MakeDirs(raw_data_dir) + self.total_raw_data_count += item_count + useless_index = 0 + rdm = raw_data_visitor.RawDataManager(self.kvstore, self.data_source, 0) + fpaths = [] + for block_index in range(0, item_count // 2048): + builder = DataBlockBuilder( + self.raw_data_dir, + self.data_source.data_source_meta.name, + 0, block_index, dj_pb.WriterOptions(output_writer='TF_RECORD'), None + ) + cands = list(range(begin_index + block_index * 2048, + begin_index + (block_index + 1) * 2048)) + start_index = cands[0] + for i in range(len(cands)): + if random.randint(1, 4) > 2: + continue + a = random.randint(i - 32, i + 32) + b = random.randint(i - 32, i + 32) + if a < 0: + a = 0 + if a >= len(cands): + a = len(cands) - 1 + if b < 0: + b = 0 + if b >= len(cands): + b = len(cands) - 1 + if (abs(cands[a]-i-start_index) <= 32 and + abs(cands[b]-i-start_index) <= 32): + cands[a], cands[b] = cands[b], cands[a] + for example_idx in cands: + feat = {} + example_id = '{}'.format(example_idx).encode() + feat['example_id'] = tf.train.Feature( + 
bytes_list=tf.train.BytesList(value=[example_id])) + event_time = 150000000 + example_idx + 1 + feat['event_time'] = tf.train.Feature( + int64_list=tf.train.Int64List(value=[event_time])) + example = tf.train.Example(features=tf.train.Features(feature=feat)) + builder.append_item(TfExampleItem(example.SerializeToString()), + useless_index, useless_index) + useless_index += 1 + + feat = {} + feat['example_id'] = tf.train.Feature( + bytes_list=tf.train.BytesList(value=[example_id])) + event_time = 150000000 + example_idx + 111 + feat['event_time'] = tf.train.Feature( + int64_list=tf.train.Int64List(value=[event_time])) + example = tf.train.Example(features=tf.train.Features(feature=feat)) + builder.append_item(TfExampleItem(example.SerializeToString()), + useless_index, useless_index) + useless_index += 1 + + + meta = builder.finish_data_block() + fname = common.encode_data_block_fname( + self.data_source.data_source_meta.name, meta + ) + fpath = os.path.join(raw_data_dir, fname) + fpaths.append(dj_pb.RawDataMeta(file_path=fpath, + timestamp=timestamp_pb2.Timestamp(seconds=3))) + self.g_data_block_index += 1 + all_files = [os.path.join(raw_data_dir, f) + for f in gfile.ListDirectory(raw_data_dir) + if not gfile.IsDirectory(os.path.join(raw_data_dir, f))] + for fpath in all_files: + if not fpath.endswith(common.DataBlockSuffix): + gfile.Remove(fpath) + self.manifest_manager.add_raw_data(0, fpaths, False) + + def generate_example_id(self, dumper, start_index, item_count): + self.total_example_id_count += item_count + for req_index in range(start_index // 512, self.total_example_id_count // 512): + example_id_batch = dj_pb.LiteExampleIds( + partition_id=0, + begin_index=req_index * 512 + ) + cands = list(range(req_index * 512, (req_index + 1) * 512)) + start_index = cands[0] + for i in range(len(cands)): + if random.randint(1, 4) > 1: + continue + a = random.randint(i - 64, i + 64) + b = random.randint(i - 64, i + 64) + if a < 0: + a = 0 + if a >= len(cands): + a = len(cands) - 1 + if b < 0: + b = 0 + if b >= len(cands): + b = len(cands) - 1 + if (abs(cands[a]-i-start_index) <= 64 and + abs(cands[b]-i-start_index) <= 64): + cands[a], cands[b] = cands[b], cands[a] + for example_idx in cands: + example_id_batch.example_id.append('{}'.format(example_idx).encode()) + example_id_batch.event_time.append(150000000 + example_idx) + packed_example_id_batch = dj_pb.PackedLiteExampleIds( + partition_id=0, + begin_index=req_index*512, + example_id_num=len(cands), + sered_lite_example_ids=example_id_batch.SerializeToString() + ) + dumper.add_example_id_batch(packed_example_id_batch) + self.assertEqual(dumper.get_next_index(), (req_index + 1) * 512) + self.assertTrue(dumper.need_dump()) + with dumper.make_example_id_dumper() as eid: + eid() + + def test_example_joiner(self): + sei = joiner_impl.create_example_joiner( + self.example_joiner_options, + self.raw_data_options, + dj_pb.WriterOptions(output_writer='TF_RECORD'), + self.kvstore, self.data_source, 0 + ) + metas = [] + with sei.make_example_joiner() as joiner: + for meta in joiner: + metas.append(meta) + self.assertEqual(len(metas), 0) + self.generate_raw_data(0, 2 * 2048) + dumper = example_id_dumper.ExampleIdDumperManager( + self.kvstore, self.data_source, 0, self.example_id_dump_options + ) + self.generate_example_id(dumper, 0, 3 * 2048) + with sei.make_example_joiner() as joiner: + for meta in joiner: + metas.append(meta) + self.generate_raw_data(2 * 2048, 2048) + self.generate_example_id(dumper, 3 * 2048, 3 * 2048) + with sei.make_example_joiner() 
as joiner: + for meta in joiner: + metas.append(meta) + self.generate_raw_data(3 * 2048, 5 * 2048) + self.generate_example_id(dumper, 6 * 2048, 2048) + with sei.make_example_joiner() as joiner: + for meta in joiner: + metas.append(meta) + self.generate_raw_data(8 * 2048, 2 * 2048) + with sei.make_example_joiner() as joiner: + for meta in joiner: + metas.append(meta) + self.generate_example_id(dumper, 7 * 2048, 3 * 2048) + with sei.make_example_joiner() as joiner: + for meta in joiner: + metas.append(meta) + sei.set_sync_example_id_finished() + sei.set_raw_data_finished() + with sei.make_example_joiner() as joiner: + for meta in joiner: + metas.append(meta) + + dbm = data_block_manager.DataBlockManager(self.data_source, 0) + data_block_num = dbm.get_dumped_data_block_count() + self.assertEqual(len(metas), data_block_num) + join_count = 0 + for data_block_index in range(data_block_num): + meta = dbm.get_data_block_meta_by_index(data_block_index) + self.assertEqual(meta, metas[data_block_index]) + join_count += len(meta.example_ids) + + print("join rate {}/{}({}), min_matching_window {}, "\ + "max_matching_window {}".format( + join_count, 20480 * 2, + (join_count+.0)/(10 * 2048 * 2), + self.example_joiner_options.min_matching_window, + self.example_joiner_options.max_matching_window)) + + def tearDown(self): + if gfile.Exists(self.data_source.output_base_dir): + gfile.DeleteRecursively(self.data_source.output_base_dir) + if gfile.Exists(self.raw_data_dir): + gfile.DeleteRecursively(self.raw_data_dir) + self.kvstore.delete_prefix(common.data_source_kvstore_base_dir(self.data_source.data_source_meta.name)) + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main()
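
Note (illustration only, not part of the patch): a quick sketch of how the new interval_to_timestamp helper is expected to behave, using values exercised by test_interval_to_timestamp above.

    from fedlearner.data_join.common import interval_to_timestamp

    # Units are case-insensitive: Y(ear), M(onth), D(ay), H(our), N (minute), S(econd);
    # a trailing bare number counts as seconds, and a month is treated as 30 days.
    assert interval_to_timestamp("7D") == 7 * 24 * 3600     # default for --max_conversion_delay
    assert interval_to_timestamp("1N20S") == 80             # 1 minute + 20 seconds
    assert interval_to_timestamp("1Y2M1d3h") == 36385200
    assert interval_to_timestamp("1234") == 1234            # plain seconds
    assert interval_to_timestamp("0y1n1") == 61             # trailing digits default to seconds
    assert interval_to_timestamp("1d1y") is None            # units must go from large to small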
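
Note (illustration only, not part of the patch): the attribution rule implemented by _Attributor.match — a conversion is attributed to a show with the same example_id only if it happens strictly later and within max_conversion_delay seconds. The Event namedtuple here is a hypothetical stand-in for a raw-data item.

    from collections import namedtuple
    from fedlearner.data_join.joiner_impl import attribution_joiner as aj

    Event = namedtuple("Event", ["example_id", "event_time"])  # stand-in for a raw-data item

    attributor = aj._Attributor(max_conversion_delay=30)
    show = Event("e1", 100)
    assert attributor.match(Event("e1", 120), show)      # within the delay window
    assert not attributor.match(Event("e1", 100), show)  # conversion must be strictly later
    assert not attributor.match(Event("e1", 131), show)  # beyond max_conversion_delay
    assert not attributor.match(Event("e2", 110), show)  # example ids must match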
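
Note (illustration only, not part of the patch): how NegativeExampleGenerator is meant to be fed and drained — leader items that _Accumulator.join reports as mismatches are cached by leader index, and generate() later emits a label-0 example for every cached index below the next matched leader index. The _Stub class is a hypothetical stand-in mirroring _Item in test_attribution_join.py.

    from fedlearner.data_join.joiner_impl import attribution_joiner as aj

    class _Stub(object):
        """Minimal raw-data item with the attributes the generator touches."""
        def __init__(self, example_id, event_time):
            self.example_id = example_id
            self.event_time = event_time

        @classmethod
        def make(cls, example_id, event_time, raw_id, fname=None, fvalue=None):
            # real Item types also write the label-0 feature here
            return cls(example_id, event_time)

    gen = aj.NegativeExampleGenerator()
    # leader items that found no follower match, keyed by leader index
    gen.update({3: _Stub("miss_3", 100), 5: _Stub("miss_5", 101)})
    fe = _Stub("joined", 200)  # follower item whose type is used to build negatives
    negatives = list(gen.generate(fe, 0, 6))  # yields (example, leader_index, 0) tuples
    assert [n[1] for n in negatives] == [3, 5]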