From ebbb447488f5ff4d1fed316ddf11e0482261db75 Mon Sep 17 00:00:00 2001 From: Calum Macdonald Date: Wed, 3 Nov 2021 15:57:58 +0000 Subject: [PATCH 1/2] tests for spark --- coconnect/io/__init__.py | 0 coconnect/io/plugins/#spark.py# | 93 ++++++++++++++++++++++++++++++++ coconnect/io/plugins/__init__.py | 0 coconnect/io/plugins/local.py | 70 ++++++++++++++++++++++++ coconnect/io/plugins/spark.py | 93 ++++++++++++++++++++++++++++++++ coconnect/tools/file_helpers.py | 79 +++------------------------ requirements.txt | 1 + 7 files changed, 265 insertions(+), 71 deletions(-) create mode 100644 coconnect/io/__init__.py create mode 100644 coconnect/io/plugins/#spark.py# create mode 100644 coconnect/io/plugins/__init__.py create mode 100644 coconnect/io/plugins/local.py create mode 100644 coconnect/io/plugins/spark.py diff --git a/coconnect/io/__init__.py b/coconnect/io/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/coconnect/io/plugins/#spark.py# b/coconnect/io/plugins/#spark.py# new file mode 100644 index 00000000..6c5ed232 --- /dev/null +++ b/coconnect/io/plugins/#spark.py# @@ -0,0 +1,93 @@ +import pandas as pd +from coconnect.tools.logger import Logger + +from pyspark.sql import SparkSession + +class SparkData: + def __init__(self,_map): + + self.spark = SparkSession \ + .builder \ + .appName("Python Spark SQL basic example") \ + .config("spark.some.config.option", "some-value") \ + .getOrCreate() + + + for name,obj in _map.items(): + fname = obj['file'] + df = self.spark.read.option("header",True) \ + .csv(fname) + print (df) + exit(0) + print (_map) + exit(0) + + + self.__file_readers = {} + self.__dataframe = {} + + self.logger = Logger(self.__class__.__name__) + self.logger.info("InputData Object Created") + if self.chunksize is not None: + self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows") + + def all(self): + return { + key:self[key] + for key in self.keys() + } + + def keys(self): + return self.__file_readers.keys() + + def next(self): + #loop over all loaded files + for key in self.keys(): + #get the next dataframe chunk for this file + self.__dataframe[key] = self.get_df_chunk(key) + + #check if all __dataframe objects are empty + #if they are, reaise a StopIteration as processing has finished + if all([x.empty for x in self.__dataframe.values()]): + self.logger.debug("All input files have now been processed.") + raise StopIteration + + self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})") + + + def get_df_chunk(self,key): + #obtain the file by key + obj = self.__file_readers[key] + #if it is a TextFileReader, get a dataframe chunk + if isinstance(obj,pd.io.parsers.TextFileReader): + try: + #for this file reader, get the next chunk of data and update self.__dataframe + return obj.get_chunk(self.chunksize) + except StopIteration: + #otherwise, if at the end of the file reader, return an empty frame + return pd.DataFrame(columns=self.__dataframe[key].columns) + else: + #if we're handling non-chunked data + #return an empty dataframe if we've already loaded this dataframe + if key in self.__dataframe.keys(): + return pd.DataFrame() + #otherwise return the dataframe as it's the first time we're getting it + return obj + + + def __getitem__(self,key): + if key not in self.__dataframe.keys(): + self.__dataframe[key] = self.get_df_chunk(key) + return self.__dataframe[key] + + def __setitem__(self,key,obj): + + print (key) + print (obj) + exit(0) + + if not (isinstance(obj,pd.DataFrame) or 
isinstance(obj,pd.io.parsers.TextFileReader)): + raise NotImplementedError("When using InputData, the object must be of type " + f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ") + self.logger.info(f"Registering {key} [{type(obj)}]") + self.__file_readers[key] = obj diff --git a/coconnect/io/plugins/__init__.py b/coconnect/io/plugins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/coconnect/io/plugins/local.py b/coconnect/io/plugins/local.py new file mode 100644 index 00000000..a2cc91d1 --- /dev/null +++ b/coconnect/io/plugins/local.py @@ -0,0 +1,70 @@ +import pandas as pd +from coconnect.tools.logger import Logger + +class InputData: + def __init__(self,chunksize): + self.chunksize = chunksize + + self.__file_readers = {} + self.__dataframe = {} + + self.logger = Logger(self.__class__.__name__) + self.logger.info("InputData Object Created") + if self.chunksize is not None: + self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows") + + def all(self): + return { + key:self[key] + for key in self.keys() + } + + def keys(self): + return self.__file_readers.keys() + + def next(self): + #loop over all loaded files + for key in self.keys(): + #get the next dataframe chunk for this file + self.__dataframe[key] = self.get_df_chunk(key) + + #check if all __dataframe objects are empty + #if they are, reaise a StopIteration as processing has finished + if all([x.empty for x in self.__dataframe.values()]): + self.logger.debug("All input files have now been processed.") + raise StopIteration + + self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})") + + + def get_df_chunk(self,key): + #obtain the file by key + obj = self.__file_readers[key] + #if it is a TextFileReader, get a dataframe chunk + if isinstance(obj,pd.io.parsers.TextFileReader): + try: + #for this file reader, get the next chunk of data and update self.__dataframe + return obj.get_chunk(self.chunksize) + except StopIteration: + #otherwise, if at the end of the file reader, return an empty frame + return pd.DataFrame(columns=self.__dataframe[key].columns) + else: + #if we're handling non-chunked data + #return an empty dataframe if we've already loaded this dataframe + if key in self.__dataframe.keys(): + return pd.DataFrame() + #otherwise return the dataframe as it's the first time we're getting it + return obj + + + def __getitem__(self,key): + if key not in self.__dataframe.keys(): + self.__dataframe[key] = self.get_df_chunk(key) + return self.__dataframe[key] + + def __setitem__(self,key,obj): + if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)): + raise NotImplementedError("When using InputData, the object must be of type " + f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ") + self.logger.info(f"Registering {key} [{type(obj)}]") + self.__file_readers[key] = obj diff --git a/coconnect/io/plugins/spark.py b/coconnect/io/plugins/spark.py new file mode 100644 index 00000000..6c5ed232 --- /dev/null +++ b/coconnect/io/plugins/spark.py @@ -0,0 +1,93 @@ +import pandas as pd +from coconnect.tools.logger import Logger + +from pyspark.sql import SparkSession + +class SparkData: + def __init__(self,_map): + + self.spark = SparkSession \ + .builder \ + .appName("Python Spark SQL basic example") \ + .config("spark.some.config.option", "some-value") \ + .getOrCreate() + + + for name,obj in _map.items(): + fname = obj['file'] + df = self.spark.read.option("header",True) \ + .csv(fname) + print (df) + exit(0) + print (_map) + exit(0) + + + 
self.__file_readers = {} + self.__dataframe = {} + + self.logger = Logger(self.__class__.__name__) + self.logger.info("InputData Object Created") + if self.chunksize is not None: + self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows") + + def all(self): + return { + key:self[key] + for key in self.keys() + } + + def keys(self): + return self.__file_readers.keys() + + def next(self): + #loop over all loaded files + for key in self.keys(): + #get the next dataframe chunk for this file + self.__dataframe[key] = self.get_df_chunk(key) + + #check if all __dataframe objects are empty + #if they are, reaise a StopIteration as processing has finished + if all([x.empty for x in self.__dataframe.values()]): + self.logger.debug("All input files have now been processed.") + raise StopIteration + + self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})") + + + def get_df_chunk(self,key): + #obtain the file by key + obj = self.__file_readers[key] + #if it is a TextFileReader, get a dataframe chunk + if isinstance(obj,pd.io.parsers.TextFileReader): + try: + #for this file reader, get the next chunk of data and update self.__dataframe + return obj.get_chunk(self.chunksize) + except StopIteration: + #otherwise, if at the end of the file reader, return an empty frame + return pd.DataFrame(columns=self.__dataframe[key].columns) + else: + #if we're handling non-chunked data + #return an empty dataframe if we've already loaded this dataframe + if key in self.__dataframe.keys(): + return pd.DataFrame() + #otherwise return the dataframe as it's the first time we're getting it + return obj + + + def __getitem__(self,key): + if key not in self.__dataframe.keys(): + self.__dataframe[key] = self.get_df_chunk(key) + return self.__dataframe[key] + + def __setitem__(self,key,obj): + + print (key) + print (obj) + exit(0) + + if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)): + raise NotImplementedError("When using InputData, the object must be of type " + f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ") + self.logger.info(f"Registering {key} [{type(obj)}]") + self.__file_readers[key] = obj diff --git a/coconnect/tools/file_helpers.py b/coconnect/tools/file_helpers.py index 258b0290..4c540370 100644 --- a/coconnect/tools/file_helpers.py +++ b/coconnect/tools/file_helpers.py @@ -4,6 +4,9 @@ import json import pandas as pd from coconnect.tools.logger import Logger +from coconnect.io.plugins.local import InputData +from coconnect.io.plugins.spark import SparkData + class MissingInputFiles(Exception): pass @@ -13,75 +16,7 @@ class DifferingRows(Exception): pass -class InputData: - def __init__(self,chunksize): - self.chunksize = chunksize - - self.__file_readers = {} - self.__dataframe = {} - - self.logger = Logger(self.__class__.__name__) - self.logger.info("InputData Object Created") - if self.chunksize is not None: - self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows") - - def all(self): - return { - key:self[key] - for key in self.keys() - } - - def keys(self): - return self.__file_readers.keys() - - def next(self): - #loop over all loaded files - for key in self.keys(): - #get the next dataframe chunk for this file - self.__dataframe[key] = self.get_df_chunk(key) - - #check if all __dataframe objects are empty - #if they are, reaise a StopIteration as processing has finished - if all([x.empty for x in self.__dataframe.values()]): - self.logger.debug("All input files have now been processed.") - raise StopIteration - - 
self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})") - - - def get_df_chunk(self,key): - #obtain the file by key - obj = self.__file_readers[key] - #if it is a TextFileReader, get a dataframe chunk - if isinstance(obj,pd.io.parsers.TextFileReader): - try: - #for this file reader, get the next chunk of data and update self.__dataframe - return obj.get_chunk(self.chunksize) - except StopIteration: - #otherwise, if at the end of the file reader, return an empty frame - return pd.DataFrame(columns=self.__dataframe[key].columns) - else: - #if we're handling non-chunked data - #return an empty dataframe if we've already loaded this dataframe - if key in self.__dataframe.keys(): - return pd.DataFrame() - #otherwise return the dataframe as it's the first time we're getting it - return obj - - - def __getitem__(self,key): - if key not in self.__dataframe.keys(): - self.__dataframe[key] = self.get_df_chunk(key) - return self.__dataframe[key] - - def __setitem__(self,key,obj): - if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)): - raise NotImplementedError("When using InputData, the object must be of type " - f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ") - self.logger.info(f"Registering {key} [{type(obj)}]") - self.__file_readers[key] = obj - - + def load_json_delta(f_in,original): logger = Logger("load_json_delta") logger.info(f"loading a json from '{f_in}' as a delta") @@ -173,8 +108,10 @@ def load_csv(_map,chunksize=None,nrows=None,lower_col_names=False,load_path="",r if k in source_map } - retval = InputData(chunksize) - + retval = SparkData(_map) + print (retval) + exit(0) + for key,obj in _map.items(): fields = None if isinstance(obj,str): diff --git a/requirements.txt b/requirements.txt index de51ff8a..ad8b504e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ sqlalchemy-utils pyyaml python-daemon inquirer +pyspark From c55d8cf259bdba52884cefbe202b4de5616e9b6f Mon Sep 17 00:00:00 2001 From: Calum Macdonald Date: Tue, 8 Feb 2022 13:42:10 +0000 Subject: [PATCH 2/2] remove bad file --- coconnect/io/plugins/#spark.py# | 93 --------------------------------- 1 file changed, 93 deletions(-) delete mode 100644 coconnect/io/plugins/#spark.py# diff --git a/coconnect/io/plugins/#spark.py# b/coconnect/io/plugins/#spark.py# deleted file mode 100644 index 6c5ed232..00000000 --- a/coconnect/io/plugins/#spark.py# +++ /dev/null @@ -1,93 +0,0 @@ -import pandas as pd -from coconnect.tools.logger import Logger - -from pyspark.sql import SparkSession - -class SparkData: - def __init__(self,_map): - - self.spark = SparkSession \ - .builder \ - .appName("Python Spark SQL basic example") \ - .config("spark.some.config.option", "some-value") \ - .getOrCreate() - - - for name,obj in _map.items(): - fname = obj['file'] - df = self.spark.read.option("header",True) \ - .csv(fname) - print (df) - exit(0) - print (_map) - exit(0) - - - self.__file_readers = {} - self.__dataframe = {} - - self.logger = Logger(self.__class__.__name__) - self.logger.info("InputData Object Created") - if self.chunksize is not None: - self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows") - - def all(self): - return { - key:self[key] - for key in self.keys() - } - - def keys(self): - return self.__file_readers.keys() - - def next(self): - #loop over all loaded files - for key in self.keys(): - #get the next dataframe chunk for this file - self.__dataframe[key] = self.get_df_chunk(key) - - #check if all __dataframe objects are empty - #if 
they are, reaise a StopIteration as processing has finished - if all([x.empty for x in self.__dataframe.values()]): - self.logger.debug("All input files have now been processed.") - raise StopIteration - - self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})") - - - def get_df_chunk(self,key): - #obtain the file by key - obj = self.__file_readers[key] - #if it is a TextFileReader, get a dataframe chunk - if isinstance(obj,pd.io.parsers.TextFileReader): - try: - #for this file reader, get the next chunk of data and update self.__dataframe - return obj.get_chunk(self.chunksize) - except StopIteration: - #otherwise, if at the end of the file reader, return an empty frame - return pd.DataFrame(columns=self.__dataframe[key].columns) - else: - #if we're handling non-chunked data - #return an empty dataframe if we've already loaded this dataframe - if key in self.__dataframe.keys(): - return pd.DataFrame() - #otherwise return the dataframe as it's the first time we're getting it - return obj - - - def __getitem__(self,key): - if key not in self.__dataframe.keys(): - self.__dataframe[key] = self.get_df_chunk(key) - return self.__dataframe[key] - - def __setitem__(self,key,obj): - - print (key) - print (obj) - exit(0) - - if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)): - raise NotImplementedError("When using InputData, the object must be of type " - f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ") - self.logger.info(f"Registering {key} [{type(obj)}]") - self.__file_readers[key] = obj
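
Reviewer note on the SparkData plugin introduced above: as committed it still carries debug print()/exit(0) calls, logs self.chunksize without ever setting it, and leaves the copied InputData reader logic unreachable after the early exit. The sketch below is one way the plugin might look once that scaffolding is stripped out. It is an assumption, not part of the patch: the class name SparkData and the _map layout ({name: {"file": "<csv path>"}}) come from the diff, while the app name "coconnect-spark-reader" and the toPandas() conversion are illustrative choices made so the existing pandas-based pipeline keeps working.

import pandas as pd
from pyspark.sql import SparkSession
from coconnect.tools.logger import Logger


class SparkData:
    """Hypothetical cleaned-up sketch of the Spark-backed input plugin."""

    def __init__(self, _map):
        self.logger = Logger(self.__class__.__name__)
        # one shared SparkSession per process; the app name is illustrative
        self.spark = (
            SparkSession.builder
            .appName("coconnect-spark-reader")
            .getOrCreate()
        )
        self.__dataframe = {}

        # eagerly register every CSV listed in the map
        for name, obj in _map.items():
            # load_csv passes either a plain path or a {"file": path, ...} dict
            fname = obj["file"] if isinstance(obj, dict) else obj
            sdf = self.spark.read.option("header", True).csv(fname)
            # convert to pandas so downstream transforms that expect
            # pandas DataFrames continue to work unchanged
            self.__dataframe[name] = sdf.toPandas()
            self.logger.info(f"Registered {name} from {fname}")

    def keys(self):
        return self.__dataframe.keys()

    def all(self):
        return dict(self.__dataframe)

    def __getitem__(self, key):
        return self.__dataframe[key]

    def __setitem__(self, key, df):
        if not isinstance(df, pd.DataFrame):
            raise NotImplementedError(
                f"SparkData expects a {pd.DataFrame}, got {type(df)}"
            )
        self.__dataframe[key] = df

With a reader along these lines, the load_csv() change in file_helpers.py could keep retval = SparkData(_map) (minus the print/exit(0) debugging) and hand back a mapping of plain pandas frames. What is lost is chunked iteration; whether chunking should instead map onto Spark partitions is left open by this patch.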
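
For contrast, the local.py plugin that this patch factors out of file_helpers.py keeps pandas chunking. A minimal driver loop for it might look like the following; the file name person.csv and the chunk size are placeholders, and the print() stands in for the real transform step.

import pandas as pd
from coconnect.io.plugins.local import InputData

chunksize = 1000
inputs = InputData(chunksize)
# registering a pandas TextFileReader enables chunked reads for this key
inputs["person"] = pd.read_csv("person.csv", chunksize=chunksize)

while True:
    chunk = inputs["person"]                 # current chunk for this key
    print(f"processing {len(chunk)} rows")   # stand-in for the real transform
    try:
        inputs.next()                        # advance every registered reader
    except StopIteration:
        break                                # all files fully consumed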