tests for spark #128

Open · wants to merge 2 commits into master
Empty file added coconnect/io/__init__.py
70 changes: 70 additions & 0 deletions coconnect/io/plugins/local.py
@@ -0,0 +1,70 @@
import pandas as pd
from coconnect.tools.logger import Logger

class InputData:
    def __init__(self,chunksize):
        self.chunksize = chunksize

        self.__file_readers = {}
        self.__dataframe = {}

        self.logger = Logger(self.__class__.__name__)
        self.logger.info("InputData Object Created")
        if self.chunksize is not None:
            self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows")

    def all(self):
        return {
            key:self[key]
            for key in self.keys()
        }

    def keys(self):
        return self.__file_readers.keys()

    def next(self):
        #loop over all loaded files
        for key in self.keys():
            #get the next dataframe chunk for this file
            self.__dataframe[key] = self.get_df_chunk(key)

        #check if all __dataframe objects are empty
        #if they are, raise a StopIteration as processing has finished
        if all([x.empty for x in self.__dataframe.values()]):
            self.logger.debug("All input files have now been processed.")
            raise StopIteration

        self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})")


    def get_df_chunk(self,key):
        #obtain the file by key
        obj = self.__file_readers[key]
        #if it is a TextFileReader, get a dataframe chunk
        if isinstance(obj,pd.io.parsers.TextFileReader):
            try:
                #for this file reader, get the next chunk of data and update self.__dataframe
                return obj.get_chunk(self.chunksize)
            except StopIteration:
                #if at the end of the file reader, return an empty frame
                return pd.DataFrame(columns=self.__dataframe[key].columns)
        else:
            #if we're handling non-chunked data,
            #return an empty dataframe if we've already loaded this dataframe
            if key in self.__dataframe.keys():
                return pd.DataFrame()
            #otherwise return the dataframe as it's the first time we're getting it
            return obj


    def __getitem__(self,key):
        if key not in self.__dataframe.keys():
            self.__dataframe[key] = self.get_df_chunk(key)
        return self.__dataframe[key]

    def __setitem__(self,key,obj):
        if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)):
            raise NotImplementedError("When using InputData, the object must be of type "
                                      f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ")
        self.logger.info(f"Registering {key} [{type(obj)}]")
        self.__file_readers[key] = obj
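
For context, the relocated InputData plugin is driven by registering either a plain DataFrame or a pandas TextFileReader under a key, then pulling chunks until next() signals the end. A minimal sketch, assuming a CSV input; the file name person.csv and the chunk size are illustrative, not part of the package:

import pandas as pd
from coconnect.io.plugins.local import InputData

data = InputData(chunksize=1000)
#register a chunked reader for a hypothetical person.csv
data["person.csv"] = pd.read_csv("person.csv", chunksize=1000, dtype=str)

while True:
    df = data["person.csv"]   #current chunk for this file
    print(f"processing {len(df)} rows")
    try:
        data.next()           #advance every registered file to its next chunk
    except StopIteration:
        break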
93 changes: 93 additions & 0 deletions coconnect/io/plugins/spark.py
@@ -0,0 +1,93 @@
import pandas as pd
from coconnect.tools.logger import Logger

from pyspark.sql import SparkSession

class SparkData:
    def __init__(self,_map):

        self.logger = Logger(self.__class__.__name__)
        self.logger.info("SparkData Object Created")

        #get (or create) a spark session
        self.spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .getOrCreate()

        self.__file_readers = {}
        self.__dataframe = {}

        #spark reads whole files, so no chunking is used
        self.chunksize = None

        #load each input csv with spark and register it;
        #converting to pandas here lets the retrieval logic below
        #(shared with InputData) be reused unchanged
        for name,obj in _map.items():
            fname = obj['file']
            df = self.spark.read.option("header",True) \
                                .csv(fname)
            self[name] = df.toPandas()

    def all(self):
        return {
            key:self[key]
            for key in self.keys()
        }

    def keys(self):
        return self.__file_readers.keys()

    def next(self):
        #loop over all loaded files
        for key in self.keys():
            #get the next dataframe chunk for this file
            self.__dataframe[key] = self.get_df_chunk(key)

        #check if all __dataframe objects are empty
        #if they are, raise a StopIteration as processing has finished
        if all([x.empty for x in self.__dataframe.values()]):
            self.logger.debug("All input files have now been processed.")
            raise StopIteration

        self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})")


    def get_df_chunk(self,key):
        #obtain the file by key
        obj = self.__file_readers[key]
        #if it is a TextFileReader, get a dataframe chunk
        if isinstance(obj,pd.io.parsers.TextFileReader):
            try:
                #for this file reader, get the next chunk of data and update self.__dataframe
                return obj.get_chunk(self.chunksize)
            except StopIteration:
                #if at the end of the file reader, return an empty frame
                return pd.DataFrame(columns=self.__dataframe[key].columns)
        else:
            #if we're handling non-chunked data,
            #return an empty dataframe if we've already loaded this dataframe
            if key in self.__dataframe.keys():
                return pd.DataFrame()
            #otherwise return the dataframe as it's the first time we're getting it
            return obj


    def __getitem__(self,key):
        if key not in self.__dataframe.keys():
            self.__dataframe[key] = self.get_df_chunk(key)
        return self.__dataframe[key]

    def __setitem__(self,key,obj):
        if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)):
            raise NotImplementedError("When using SparkData, the object must be of type "
                                      f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ")
        self.logger.info(f"Registering {key} [{type(obj)}]")
        self.__file_readers[key] = obj
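
The class above reuses the pandas retrieval logic by converting each Spark dataframe to pandas up front, which pulls whole files onto the driver. If chunked delivery is wanted without that conversion, one possible direction is a small generator over the Spark dataframe. This is a sketch only; the helper name spark_chunks is not part of the package:

import pandas as pd

def spark_chunks(spark_df, chunk_size):
    #yield pandas DataFrames of at most chunk_size rows from a pyspark.sql.DataFrame
    columns = spark_df.columns
    buffer = []
    #toLocalIterator() streams rows back to the driver without collecting everything at once
    for row in spark_df.toLocalIterator():
        buffer.append(row.asDict())
        if len(buffer) == chunk_size:
            yield pd.DataFrame(buffer, columns=columns)
            buffer = []
    if buffer:
        yield pd.DataFrame(buffer, columns=columns)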
79 changes: 8 additions & 71 deletions coconnect/tools/file_helpers.py
@@ -4,6 +4,9 @@
import json
import pandas as pd
from coconnect.tools.logger import Logger
from coconnect.io.plugins.local import InputData
from coconnect.io.plugins.spark import SparkData


class MissingInputFiles(Exception):
    pass
@@ -13,75 +16,7 @@ class DifferingRows(Exception):
    pass


class InputData:
    def __init__(self,chunksize):
        self.chunksize = chunksize

        self.__file_readers = {}
        self.__dataframe = {}

        self.logger = Logger(self.__class__.__name__)
        self.logger.info("InputData Object Created")
        if self.chunksize is not None:
            self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows")

    def all(self):
        return {
            key:self[key]
            for key in self.keys()
        }

    def keys(self):
        return self.__file_readers.keys()

    def next(self):
        #loop over all loaded files
        for key in self.keys():
            #get the next dataframe chunk for this file
            self.__dataframe[key] = self.get_df_chunk(key)

        #check if all __dataframe objects are empty
        #if they are, raise a StopIteration as processing has finished
        if all([x.empty for x in self.__dataframe.values()]):
            self.logger.debug("All input files have now been processed.")
            raise StopIteration

        self.logger.info(f"Moving onto the next chunk of data (of size {self.chunksize})")


    def get_df_chunk(self,key):
        #obtain the file by key
        obj = self.__file_readers[key]
        #if it is a TextFileReader, get a dataframe chunk
        if isinstance(obj,pd.io.parsers.TextFileReader):
            try:
                #for this file reader, get the next chunk of data and update self.__dataframe
                return obj.get_chunk(self.chunksize)
            except StopIteration:
                #if at the end of the file reader, return an empty frame
                return pd.DataFrame(columns=self.__dataframe[key].columns)
        else:
            #if we're handling non-chunked data,
            #return an empty dataframe if we've already loaded this dataframe
            if key in self.__dataframe.keys():
                return pd.DataFrame()
            #otherwise return the dataframe as it's the first time we're getting it
            return obj


    def __getitem__(self,key):
        if key not in self.__dataframe.keys():
            self.__dataframe[key] = self.get_df_chunk(key)
        return self.__dataframe[key]

    def __setitem__(self,key,obj):
        if not (isinstance(obj,pd.DataFrame) or isinstance(obj,pd.io.parsers.TextFileReader)):
            raise NotImplementedError("When using InputData, the object must be of type "
                                      f"{pd.DataFrame} or {pd.io.parsers.TextFileReader} ")
        self.logger.info(f"Registering {key} [{type(obj)}]")
        self.__file_readers[key] = obj



def load_json_delta(f_in,original):
    logger = Logger("load_json_delta")
    logger.info(f"loading a json from '{f_in}' as a delta")
@@ -173,8 +108,10 @@ def load_csv(_map,chunksize=None,nrows=None,lower_col_names=False,load_path="",r
        if k in source_map
    }

    retval = InputData(chunksize)

    retval = SparkData(_map)

    for key,obj in _map.items():
        fields = None
        if isinstance(obj,str):
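
As committed, load_csv now always builds a SparkData object, which bypasses the existing pandas path. A guarded selection would keep both backends available while the Spark one is under test; the use_spark flag below is hypothetical and not part of the current signature:

    #hypothetical switch inside load_csv: use_spark is not an existing parameter,
    #it is only shown to illustrate keeping the pandas backend alive
    if use_spark:
        retval = SparkData(_map)
    else:
        retval = InputData(chunksize)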
1 change: 1 addition & 0 deletions requirements.txt
@@ -16,3 +16,4 @@ sqlalchemy-utils
pyyaml
python-daemon
inquirer
pyspark