added h5 and metadata readers, included systemkernel tests in test_env
Vedant P Iyer committed Dec 18, 2024
1 parent 82e8948 commit 0eb9f32
Showing 4 changed files with 138 additions and 29 deletions.
2 changes: 1 addition & 1 deletion dsi/core.py
@@ -24,7 +24,7 @@ class Terminal():
    BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'parquet']
    PLUGIN_PREFIX = ['dsi.plugins']
    PLUGIN_IMPLEMENTATIONS = ['env', 'file_reader', 'file_writer']
    VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML1', 'TOML1', "Table_Plot", "Schema", "Csv_Writer"]
    VALID_PLUGINS = ['Hostname', 'SystemKernel', 'GitInfo', 'Bueno', 'Csv', 'ER_Diagram', 'YAML1', 'TOML1', "Table_Plot", "Schema", "Csv_Writer", "HDF5Reader1", "MetadataReader1"]
    VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet']
    VALID_MODULES = VALID_PLUGINS + VALID_BACKENDS
    VALID_MODULE_FUNCTIONS = {'plugin': ['reader', 'writer'],
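
With HDF5Reader1 and MetadataReader1 registered in VALID_PLUGINS, the new readers can be loaded like any other plugin. A minimal sketch (the file paths and prefix are placeholders), following the load_module pattern used in examples/coreterminal.py below:

    from dsi.core import Terminal

    a = Terminal(debug_flag=False)
    # placeholder paths; any HDF5 / metadata JSON files would do
    a.load_module('plugin', 'HDF5Reader1', 'reader', filenames='data/sim.h5', target_table_prefix='sim')
    a.load_module('plugin', 'MetadataReader1', 'reader', filenames='data/run_meta.json')
    a.load_module('backend', 'Sqlite', 'back-write', filename='data/data.db')
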
123 changes: 116 additions & 7 deletions dsi/plugins/file_reader.py
@@ -8,6 +8,7 @@
import yaml
try: import tomllib
except ModuleNotFoundError: import pip._vendor.tomli as tomllib
import h5py
# import ast

from dsi.plugins.metadata import StructuredMetadata
@@ -224,12 +225,12 @@ def add_rows(self) -> None:
            objs = []
            for idx, filename in enumerate(self.filenames):
                with open(filename, 'r') as fh:
                    file_content = json.load(fh)
                    objs.append(file_content)
                    for key, val in file_content.items():
                        # Check if column already exists
                        if key not in self.key_data:
                            self.key_data.append(key)
            if not self.schema_is_set():
                self.pack_header()
            for key in self.key_data:
@@ -584,4 +585,112 @@ def add_rows(self) -> None:
            self.text_file_data[f"{self.target_table_prefix}__text_file"] = OrderedDict(df.to_dict(orient='list'))
        else:
            self.text_file_data["text_file"] = OrderedDict(df.to_dict(orient='list'))
        self.set_schema_2(self.text_file_data)

class HDF5Reader1(FileReader):

    def __init__(self, filenames, target_table_prefix = None, **kwargs):
        '''
        `filenames`: one HDF5 file or a list of HDF5 files to be ingested
        `target_table_prefix`: prefix to be added to every table created to differentiate between other HDF5 file sources
        '''
        super().__init__(filenames, **kwargs)
        if isinstance(filenames, str):
            self.hdf5_files = [filenames]
        else:
            self.hdf5_files = filenames
        self.hdf5_file_data = OrderedDict()
        self.target_table_prefix = target_table_prefix

    def add_rows(self) -> None:
        """
        Parses HDF5 files and creates an ordered dict whose keys are group names and values are an ordered dict of each group's data (metadata of several datasets).
        """
        file_counter = 0
        for filename in self.hdf5_files:
            with h5py.File(filename, 'r') as h5_file:
                for group_name in h5_file.keys():
                    # fetch the group before the table name is prefixed, so the original HDF5 key is used
                    group = h5_file[group_name]
                    if self.target_table_prefix is not None:
                        group_name = self.target_table_prefix + "__" + group_name
                    if group_name not in self.hdf5_file_data.keys():
                        self.hdf5_file_data[group_name] = OrderedDict()

                    for col_name, col_data in group.items():
                        dataset = col_data[:]

                        # backfill with None so a dataset first seen in a later file stays aligned with earlier rows
                        if f"{col_name}_shape" not in self.hdf5_file_data[group_name].keys():
                            self.hdf5_file_data[group_name][col_name + "_shape"] = [None] * file_counter
                            self.hdf5_file_data[group_name][col_name + "_min"] = [None] * file_counter
                            self.hdf5_file_data[group_name][col_name + "_max"] = [None] * file_counter
                        self.hdf5_file_data[group_name][col_name + "_shape"].append(dataset.shape)
                        self.hdf5_file_data[group_name][col_name + "_min"].append(dataset.min())
                        self.hdf5_file_data[group_name][col_name + "_max"].append(dataset.max())

                    # pad mismatched column sizes within the group
                    max_length = max(len(lst) for lst in self.hdf5_file_data[group_name].values())
                    for key, value in self.hdf5_file_data[group_name].items():
                        if len(value) < max_length:
                            self.hdf5_file_data[group_name][key] = value + [None] * (max_length - len(value))
            file_counter += 1
        self.set_schema_2(self.hdf5_file_data)
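
A small usage sketch for the new reader (the file, group, and dataset names below are made up for illustration): write a tiny HDF5 file with h5py, then collect per-dataset shape/min/max metadata with add_rows.

    import h5py
    import numpy as np
    from dsi.plugins.file_reader import HDF5Reader1

    # build a tiny example file: one group "run1" with one dataset "temperature"
    with h5py.File('example.h5', 'w') as f:
        f.create_group('run1').create_dataset('temperature', data=np.array([10.0, 25.5, 17.2]))

    reader = HDF5Reader1(filenames='example.h5', target_table_prefix='sim')
    reader.add_rows()
    # one table per HDF5 group, one row per input file:
    # reader.hdf5_file_data['sim__run1'] ->
    #     {'temperature_shape': [(3,)], 'temperature_min': [10.0], 'temperature_max': [25.5]}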

class MetadataReader1(FileReader):

    def __init__(self, filenames, target_table_prefix = None, **kwargs):
        '''
        `filenames`: one metadata JSON file or a list of metadata JSON files to be ingested
        `target_table_prefix`: prefix to be added to every table created to differentiate between other metadata JSON file sources
        '''
        super().__init__(filenames, **kwargs)
        if isinstance(filenames, str):
            self.metadata_files = [filenames]
        else:
            self.metadata_files = filenames
        self.metadata_file_data = OrderedDict()
        self.target_table_prefix = target_table_prefix

    def add_rows(self) -> None:
        """
        Parses metadata JSON files and creates an ordered dict whose keys are file names and values are an ordered dict of that file's data.
        """
        for filename in self.metadata_files:
            # build a fresh OrderedDict for each file; it is stored by reference below
            json_data = OrderedDict()
            with open(filename, 'r') as meta_file:
                file_content = json.load(meta_file)
            for key, col_data in file_content.items():
                col_name = key
                if isinstance(col_data, dict):
                    for inner_key, inner_val in col_data.items():
                        old_col_name = col_name
                        col_name = col_name + "__" + inner_key
                        if isinstance(inner_val, dict):
                            for key2, val2 in inner_val.items():
                                old_col_name2 = col_name
                                col_name = col_name + "__" + key2
                                json_data[col_name] = [val2]
                                col_name = old_col_name2
                        elif isinstance(inner_val, list):
                            json_data[col_name] = [str(inner_val)]
                        else:
                            json_data[col_name] = [inner_val]
                        col_name = old_col_name

                elif isinstance(col_data, list):
                    json_data[col_name] = [str(col_data)]
                else:
                    json_data[col_name] = [col_data]

            if self.target_table_prefix is not None:
                filename = self.target_table_prefix + "__" + filename
            self.metadata_file_data[filename] = json_data

        self.set_schema_2(self.metadata_file_data)
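
The metadata reader flattens up to two levels of nesting into double-underscore column names and stringifies lists. A sketch with a made-up metadata file:

    import json
    from dsi.plugins.file_reader import MetadataReader1

    # hypothetical run metadata
    meta = {"app": "cloverleaf", "resources": {"nodes": 4, "gpu": {"count": 8}}, "tags": ["test", "small"]}
    with open('run_meta.json', 'w') as f:
        json.dump(meta, f)

    reader = MetadataReader1(filenames='run_meta.json')
    reader.add_rows()
    # nested dicts become "outer__inner" columns; lists are stored as strings
    # reader.metadata_file_data['run_meta.json'] ->
    #     {'app': ['cloverleaf'], 'resources__nodes': [4], 'resources__gpu__count': [8], 'tags': ["['test', 'small']"]}
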
38 changes: 19 additions & 19 deletions dsi/plugins/tests/test_env.py
@@ -30,31 +30,31 @@ def test_hostname_plugin_row_shape():
    for col in column_values[1:]:
        assert len(col) == row_shape == row_cnt

# SYSTEM KERNEL FUNCTIONS DONT WORK
# def test_systemkernel_plugin_type():
# plug = SystemKernel()
# assert type(plug.output_collector) == collections.OrderedDict
# SYSTEM KERNEL FUNCTIONS ONLY WORK ON LINUX
def test_systemkernel_plugin_type():
    plug = SystemKernel()
    assert type(plug.output_collector["SystemKernel"]) == collections.OrderedDict

# def test_systemkernel_plugin_adds_rows():
# plug = SystemKernel()
# plug.add_rows()
# plug.add_rows()
def test_systemkernel_plugin_adds_rows():
    plug = SystemKernel()
    plug.add_rows()
    plug.add_rows()

# for key, val in plug.output_collector.items():
# assert len(val) == 2
    for key, val in plug.output_collector["SystemKernel"].items():
        assert len(val) == 2

# # 1 SystemKernel column + 4 inherited Env cols
# assert len(plug.output_collector.keys()) == 5
    # 1 SystemKernel column + 4 inherited Env cols
    assert len(plug.output_collector["SystemKernel"].keys()) == 5

# def test_systemkernel_plugin_blob_is_big():
# plug = SystemKernel()
# plug.add_rows()
def test_systemkernel_plugin_blob_is_big():
    plug = SystemKernel()
    plug.add_rows()

# blob = plug.output_collector["kernel_info"][0]
# info_dict = loads(blob)
    blob = plug.output_collector["SystemKernel"]["kernel_info"][0]
    # info_dict = loads(blob)

# # dict should have more than 1000 (~7000) keys
# assert len(info_dict.keys()) > 1000
    # dict should have more than 1000 (~7000) keys
    assert len(blob.keys()) > 1000

def test_git_plugin_type():
    root = get_git_root('.')
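
Since, as noted above, the SystemKernel plugin only works on Linux, the re-enabled tests will fail on other platforms; a platform guard is one way to keep the suite green elsewhere. A sketch, not part of this commit:

    import platform
    import pytest

    # skip SystemKernel tests on non-Linux platforms (illustrative; not in this commit)
    requires_linux = pytest.mark.skipif(platform.system() != "Linux",
                                        reason="SystemKernel plugin only works on Linux")

    @requires_linux
    def test_systemkernel_plugin_type():
        plug = SystemKernel()
        assert type(plug.output_collector["SystemKernel"]) == collections.OrderedDict
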
4 changes: 2 additions & 2 deletions examples/coreterminal.py
@@ -3,7 +3,7 @@

'''This is an example workflow using core.py'''

a=Terminal(debug_flag=False, backup_db_flag=False)
a=Terminal(debug_flag=False)

a.load_module('plugin','Bueno','reader', filenames=['data/bueno1.data', 'data/bueno2.data'])
a.load_module('plugin','Hostname','reader')
@@ -14,7 +14,7 @@

# a.load_module('plugin', "Table_Plot", "writer", table_name = "student__physics", filename = "student__physics")
# a.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.pdf')#, target_table_prefix = "physics")
a.transload()
# a.transload()

a.load_module('backend','Sqlite','back-write', filename='data/data.db')
# a.load_module('backend','Parquet','back-write',filename='data/bueno.pq')
