ParallelTable Musings #185

Open · wants to merge 12 commits into master
95 changes: 95 additions & 0 deletions daskms/parallel_table.py
@@ -0,0 +1,95 @@
import logging
import threading
from pyrap.tables import table as Table
from weakref import finalize
from pathlib import Path


log = logging.getLogger(__name__)


def _parallel_table_finalizer(_cached_tables):
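    """Close each cached per-thread table and remove its temporary symlink."""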

for table in _cached_tables.values():
link_path = Path(table.name())
table.close()
link_path.unlink()


class ParallelTable(Table):
sjperkins (Member) commented on Feb 24, 2022:

Suggested change
class ParallelTable(Table):
class ParallelTable(metaclass=...)

As I read the ParallelTable class, it is predicated on proxying the encapsulated Table objects, as opposed to overriding inherited methods of the Table class.

Therefore it's unnecessary to inherit from Table, as the proxied table objects are created in _get_table.

This way it's also possible to use a metaclass without getting tangled up with a Boost.Python subclass.
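A minimal sketch of the proxy-without-inheritance idea described above (illustrative only, not part of the diff: the metaclass name and the forwarded-method list are placeholders, and the per-thread symlink trick is left out):

```python
import threading

from pyrap.tables import table as Table

# Placeholder subset of the get* methods to forward per thread.
_GETTERS = ("getcol", "getcolnp", "getcell", "getkeywords")


class PerThreadGetters(type):
    """Metaclass that injects get* methods forwarding to a per-thread Table."""

    def __new__(mcs, name, bases, namespace):
        for method in _GETTERS:
            namespace[method] = mcs._make_forwarder(method)
        return super().__new__(mcs, name, bases, namespace)

    @staticmethod
    def _make_forwarder(method):
        def forwarder(self, *args, **kwargs):
            table = self._get_table(threading.get_ident())
            return getattr(table, method)(*args, **kwargs)
        forwarder.__name__ = method
        return forwarder


class ParallelTable(metaclass=PerThreadGetters):
    """Proxies per-thread Table objects instead of inheriting from Table."""

    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs
        self._cached_tables = {}

    def _get_table(self, thread_id):
        try:
            return self._cached_tables[thread_id]
        except KeyError:
            table = Table(*self._args, **self._kwargs)
            self._cached_tables[thread_id] = table
            return table
```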


JSKenyon (Collaborator, author) commented:

I am trying to grasp this, but I am still struggling. The problem I see is that failing to inherit from pyrap.tables.table means that the ParallelTable object will not in fact appear to be a table, i.e. self._table_future = table = ex.impl.submit(factory, *args, **kwargs) will not be a future pointing at a table. What I could see working is defining a ParallelTableProxy which simply inherits from TableProxy but defines its own metaclass which modifies the behaviour of get* operations. Currently, ParallelTable is itself a table (i.e. you can do something like pt.taql(query, ParallelTable)) in addition to simply rerouting get* operations through a cache. In other words, there will always be one extra copy open - the "root" table which gets used for unmodified methods. I will take a swing at the ParallelTableProxy idea.

JSKenyon (Collaborator, author) commented on Feb 25, 2022:

Would you mind clarifying what the problem is with this approach? Currently, all the ParallelTable does is override some inherited methods of pyrap.tables.table prior (!!) to being embedded in a TableProxy. This just means that the proxy proxies these special methods, rather than those on the base class. This yields a really simple solution, as subsequent operations proceed as normal. The ParallelTable is a table, supports all pyrap.tables.table methods, and will have all the relevant multiton patterns applied inside the TableProxy. I have tried creating a ParallelTableProxy, but that becomes difficult as one needs to access the cache inside the get* methods.
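For reference, a small sketch of the wiring described here, mirroring the reads.py change further down in this diff (the MS path is a placeholder):

```python
from daskms.table_proxy import TableProxy
from daskms.parallel_table import ParallelTable

# The factory handed to TableProxy is ParallelTable rather than pt.table, so
# the table object built on the proxy's executor already carries the
# per-thread get* overrides; the rest of the proxy machinery is unchanged.
proxy = TableProxy(ParallelTable, "/path/to/test.ms",  # placeholder path
                   ack=False, readonly=True, lockoptions="user")
```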

JSKenyon (Collaborator, author) commented:

I have implemented a ParallelTableProxy. At present it segfaults, but I know why. The problem stems from the fact that getter_wrapper in reads.py calls methods directly on the underlying table object, rather than via the TableProxy. This is problematic as TableProxy.method may not be the same as pyrap.tables.table.method. This is precisely where my current segfaults come from, as getter_wrapper calls the non-threadsafe get* functions on the underlying table.


@classmethod
def _map_create_parallel_table(cls, args, kwargs):
""" Support pickling of kwargs in ParallelTable.__reduce__ """
return cls(*args, **kwargs)

def __init__(self, *args, **kwargs):

self._args = args
self._kwargs = kwargs

self._cached_tables = {}
self._table_path = args[0] # TODO: This should be checked.

super().__init__(*args, **kwargs)
sjperkins (Member) commented:

Suggested change
super().__init__(*args, **kwargs)


finalize(self, _parallel_table_finalizer, self._cached_tables)

def __reduce__(self):
""" Defer to _map_create_parallel_table to support kwarg pickling """
return (
ParallelTable._map_create_parallel_table,
(ParallelTable, self._args, self._kwargs)
)

def getcol(self, *args, **kwargs):
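        """Forward getcol to this thread's table handle (the other get* methods below do likewise)."""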
table = self._get_table(threading.get_ident())
return table.getcol(*args, **kwargs)

def getcolslice(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getcolslice(*args, **kwargs)

def getcolnp(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getcolnp(*args, **kwargs)

def getvarcol(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getvarcol(*args, **kwargs)

def getcell(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getcell(*args, **kwargs)

def getcellslice(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getcellslice(*args, **kwargs)

def getkeywords(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getkeywords(*args, **kwargs)

def getcolkeywords(self, *args, **kwargs):
table = self._get_table(threading.get_ident())
return table.getcolkeywords(*args, **kwargs)

def _get_table(self, thread_id):
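        """Return this thread's table, opening it via a private symlink on first use."""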

try:
table = self._cached_tables[thread_id]
except KeyError:
            log.debug(f"Opening table for thread {thread_id}")

table_path = Path(self._table_path)
table_name = table_path.name
link_path = Path(f"/tmp/{thread_id}-{table_name}")

link_path.symlink_to(table_path)

self._cached_tables[thread_id] = table = Table(
str(link_path),
**self._kwargs
)

return table
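A minimal standalone usage sketch of the class above (illustrative, outside the TableProxy machinery; the MS path is a placeholder):

```python
from concurrent.futures import ThreadPoolExecutor

from daskms.parallel_table import ParallelTable

ms_path = "/path/to/test.ms"  # placeholder
tab = ParallelTable(ms_path, ack=False, readonly=True, lockoptions="user")

# Each worker thread that calls getcol triggers _get_table, which opens its
# own casacore table handle through a per-thread symlink in /tmp.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(tab.getcol, "TIME") for _ in range(4)]
    times = [f.result() for f in futures]
```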
3 changes: 2 additions & 1 deletion daskms/reads.py
@@ -17,6 +17,7 @@
from daskms.dataset import Dataset
from daskms.table_executor import executor_key
from daskms.table import table_exists
from daskms.parallel_table import ParallelTable
from daskms.table_proxy import TableProxy, READLOCK
from daskms.table_schemas import lookup_table_schema
from daskms.utils import table_path_split
@@ -305,7 +306,7 @@ def __init__(self, table, select_cols, group_cols, index_cols, **kwargs):
raise ValueError(f"Unhandled kwargs: {kwargs}")

def _table_proxy_factory(self):
-        return TableProxy(pt.table, self.table_path, ack=False,
+        return TableProxy(ParallelTable, self.table_path, ack=False,
readonly=True, lockoptions='user',
__executor_key__=executor_key(self.canonical_name))

26 changes: 25 additions & 1 deletion daskms/table_executor.py
@@ -38,7 +38,7 @@ def __call__(cls, key=STANDARD_EXECUTOR):
class Executor(object, metaclass=ExecutorMetaClass):
def __init__(self, key=STANDARD_EXECUTOR):
# Initialise a single thread
-        self.impl = impl = cf.ThreadPoolExecutor(1)
+        self.impl = impl = DummyThreadPoolExecutor(1)
self.key = key

# Register a finaliser shutting down the
@@ -54,6 +54,30 @@ def __repr__(self):
__str__ = __repr__


class DummyThreadPoolExecutor(object):
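    """Stand-in for cf.ThreadPoolExecutor that runs submitted callables synchronously on the calling thread."""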

def __init__(self, nthread):
pass

def submit(self, fn, *args, **kwargs):

return DummyFuture(fn(*args, **kwargs))

def shutdown(self, wait=True):
pass


class DummyFuture(object):
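    """A pre-resolved future wrapping an already-computed value."""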

def __init__(self, value):

self.value = value

def result(self):

return self.value


def executor_key(table_name):
"""
Product an executor key from table_name
33 changes: 33 additions & 0 deletions daskms/tests/test_table_proxy.py
@@ -222,3 +222,36 @@ def _getcol(tp, column):

del futures, data, u, tab_fut
assert_liveness(0, 0)


@pytest.mark.parametrize("scheduler",
["sync", "threads", "processes", "distributed"])
def test_softlinks(ms, scheduler):
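    """Read a small block of DATA from each group under several dask schedulers."""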

import dask.array as da
from dask.distributed import Client, LocalCluster
from daskms import xds_from_ms

if scheduler == "distributed":

cluster = LocalCluster(
processes=True,
n_workers=2,
threads_per_worker=2,
memory_limit=0
)

client = Client(cluster) # noqa

    # NOTE: originally pointed at large local MSes for manual testing, e.g.
    # ms = "/home/jonathan/reductions/3C147/msdir/C147_unflagged.MS"
    # ms = "/home/jonathan/recipes/proxy_experiments/vla_empty.ms"
    # The ms fixture is used here so the test is runnable elsewhere.

xdsl = xds_from_ms(
ms,
group_cols=["DATA_DESC_ID", "SCAN_NUMBER", "FIELD_ID"],
chunks={"row": 30000, "chan": -1, "corr": -1}
)

result = [xds.DATA.data.map_blocks(lambda x: x[:1, :1, :1]) for xds in xdsl]

da.compute(result, scheduler=scheduler, num_workers=4)