Skip to content

Commit

Permalink
fixing smaller issues
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloem committed Jun 13, 2022
1 parent 1e4d365 commit 320ca19
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ ignore =
I
N
avoid-escape = no
per-file-ignores =
*ray_runner_test.py: B008
2 changes: 1 addition & 1 deletion ray_beam_runner/portability/context_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from apache_beam.runners.portability.fn_api_runner import translations
from apache_beam.runners.portability.fn_api_runner import worker_handlers
from apache_beam.runners.portability.fn_api_runner.execution import PartitionableBuffer
from apache_beam.runners.portability.fn_api_runner.execution import OutputTimers
from apache_beam.runners.portability.fn_api_runner.fn_runner import OutputTimers
from apache_beam.runners.portability.fn_api_runner.translations import DataOutput
from apache_beam.runners.portability.fn_api_runner.translations import TimerFamilyId
from apache_beam.runners.worker import bundle_processor
Expand Down
7 changes: 4 additions & 3 deletions ray_beam_runner/portability/ray_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import ray_beam_runner.portability.ray_fn_runner
import ray


if statesampler.FAST_SAMPLER:
DEFAULT_SAMPLING_PERIOD_MS = statesampler.DEFAULT_SAMPLING_PERIOD_MS
else:
Expand Down Expand Up @@ -567,7 +568,7 @@ def is_buffered_correctly(actual):
# Assert that each grouping consists of elements belonging to the
# same window to ensure states and timers were properly partitioned.
for b in actual:
parity = set(ord(e) % 2 for e in b)
parity = {ord(e) % 2 for e in b}
self.assertEqual(1, len(parity), b)

with self.create_pipeline() as p:
Expand Down Expand Up @@ -663,7 +664,7 @@ def process(
cur += 1
return

with self.assertRaises(Exception):
with self.assertRaisesRegex(ValueError, "is not done"):
with self.create_pipeline() as p:
data = ["abc", "defghijklmno", "pqrstuv", "wxyz"]
_ = p | beam.Create(data) | beam.ParDo(ExpandingStringsDoFn())
Expand Down Expand Up @@ -1192,7 +1193,7 @@ def expand(self, pcoll):
)

counters = res.metrics().query(beam.metrics.MetricsFilter())["counters"]
step_names = set(m.key.step for m in counters if m.key.step)
step_names = {m.key.step for m in counters if m.key.step}
pipeline_options = p._options
if assert_using_counter_names:
if pipeline_options.view_as(StandardOptions).streaming:
Expand Down

0 comments on commit 320ca19

Please sign in to comment.