Skip to content

Commit

Permalink
Parquet (#608)
Browse files Browse the repository at this point in the history
* first pass at sd2pq() sasdata2parquet method

* add partitioning, static columns and parameterization options (#601)

* add partitioning, static columns and parameterization options

* ignore pandas performance warning

* fix parameter order

* new version; take 2

* update to latest version (#607)

* integrate the latest into other access methods and base, data

---------

Co-authored-by: Rainer Mensing <[email protected]>
  • Loading branch information
tomweber-sas and rainermensing authored Jun 27, 2024
1 parent 139bed9 commit e96339e
Show file tree
Hide file tree
Showing 6 changed files with 1,667 additions and 6 deletions.
177 changes: 171 additions & 6 deletions saspy/sasbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1757,9 +1757,16 @@ def sd2df(self, table: str, libref: str = '', dsopts: dict = None,
from bytes to chars. If the variables in the SAS data set have invalid characters (from truncation or other)
then you can provide values like 'replace' or 'ignore' to load the invalid data instead of failing.
:param kwargs: a dictionary. These vary per access method, and are generally NOT needed.
They are either access method specific parms or specific pandas parms.
See the specific sasdata2dataframe* method in the access method for valid possibilities.
These vary per access method, and are generally NOT needed. They are either access method specific parms or specific \
pandas parms. See the specific sasdata2dataframe* method in the access method for valid possibilities.
:param kwargs: a dictionary.
These two options are for advanced usage. They override how saspy imports data. For more info
see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques
:param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
:param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
:return: Pandas DataFrame
"""
Expand Down Expand Up @@ -1813,6 +1820,13 @@ def sd2df_CSV(self, table: str, libref: str = '', dsopts: dict = None, tempfile:
They are either access method specific parms or specific pandas parms.
See the specific sasdata2dataframe* method in the access method for valid possibilities.
These two options are for advanced usage. They override how saspy imports data. For more info
see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques
:param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
:param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
:return: Pandas DataFrame
"""
dsopts = dsopts if dsopts is not None else {}
Expand Down Expand Up @@ -1864,6 +1878,12 @@ def sd2df_DISK(self, table: str, libref: str = '', dsopts: dict = None, tempfile
They are either access method specific parms or specific pandas parms.
See the specific sasdata2dataframe* method in the access method for valid possibilities.
These two options are for advanced usage. They override how saspy imports data. For more info
see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques
:param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
:param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
:return: Pandas DataFrame
"""
dsopts = dsopts if dsopts is not None else {}
Expand Down Expand Up @@ -1924,9 +1944,16 @@ def sasdata2dataframe(self, table: str, libref: str = '', dsopts: dict = None,
from bytes to chars. If the variables in the SAS data set have invalid characters (from truncation or other)
then you can provide values like 'replace' or 'ignore' to load the invalid data instead of failing.
:param kwargs: a dictionary. These vary per access method, and are generally NOT needed.
They are either access method specific parms or specific pandas parms.
See the specific sasdata2dataframe* method in the access method for valid possibilities.
These vary per access method, and are generally NOT needed. They are either access method specific parms or specific \
pandas parms. See the specific sasdata2dataframe* method in the access method for valid possibilities.
:param kwargs: a dictionary.
These two options are for advanced usage. They override how saspy imports data. For more info
see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques
:param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
:param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
:return: Pandas DataFrame
"""
Expand Down Expand Up @@ -1955,6 +1982,144 @@ def sasdata2dataframe(self, table: str, libref: str = '', dsopts: dict = None,
self._lastlog = self._io._log[lastlog:]
return df

def sd2pq(self, parquet_file_path: str, table: str, libref: str = '', dsopts: dict = None,
          pa_parquet_kwargs: dict = None,
          pa_pandas_kwargs: dict = None,
          partitioned: bool = False,
          partition_size_mb: int = 128,
          chunk_size_mb: int = 4,
          coerce_timestamp_errors: bool = True,
          static_columns: list = None,
          rowsep: str = '\x01', colsep: str = '\x02',
          rowrep: str = ' ', colrep: str = ' ',
          **kwargs) -> None:
    """
    This method exports the SAS Data Set to a Parquet file. This is an alias for sasdata2parquet.

    :param parquet_file_path: path of the parquet file to create
    :param table: the name of the SAS Data Set you want to export to a Parquet file
    :param libref: the libref for the SAS Data Set.
    :param dsopts: data set options for the input SAS Data Set
    :param pa_parquet_kwargs: Additional parameters to pass to pyarrow.parquet.ParquetWriter
        (default is {"compression": 'snappy', "flavor": "spark", "write_statistics": False}).
    :param pa_pandas_kwargs: Additional parameters to pass to pyarrow.Table.from_pandas (default is {}).
    :param partitioned: Boolean indicating whether the parquet file should be written in partitions (default is False).
    :param partition_size_mb: The size in MB of each partition in memory (default is 128).
    :param chunk_size_mb: The chunk size in MB at which the stream is processed (default is 4).
    :param coerce_timestamp_errors: Whether to coerce errors when converting timestamps (default is True).
    :param static_columns: List of tuples (name, value) representing static columns that will be added to the parquet file (default is None).
    :param rowsep: the row separator character to use; defaults to '\x01'
    :param colsep: the column separator character to use; defaults to '\x02'
    :param rowrep: the char to convert to for any embedded rowsep chars, defaults to ' '
    :param colrep: the char to convert to for any embedded colsep chars, defaults to ' '

    These two options are for advanced usage. They override how saspy imports data. For more info
    see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques

    :param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
    :param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
    :return: None
    """
    # Build fresh defaults per call: dict literals in the signature would be a
    # single shared mutable object across every invocation (and every caller).
    if pa_parquet_kwargs is None:
        pa_parquet_kwargs = {"compression": 'snappy',
                             "flavor": "spark",
                             "write_statistics": False}
    if pa_pandas_kwargs is None:
        pa_pandas_kwargs = {}
    dsopts = dsopts if dsopts is not None else {}

    return self.sasdata2parquet(parquet_file_path     = parquet_file_path,
                                table                 = table,
                                libref                = libref,
                                dsopts                = dsopts,
                                pa_parquet_kwargs     = pa_parquet_kwargs,
                                pa_pandas_kwargs      = pa_pandas_kwargs,
                                partitioned           = partitioned,
                                partition_size_mb     = partition_size_mb,
                                chunk_size_mb         = chunk_size_mb,
                                coerce_timestamp_errors = coerce_timestamp_errors,
                                static_columns        = static_columns,
                                rowsep                = rowsep,
                                colsep                = colsep,
                                rowrep                = rowrep,
                                colrep                = colrep,
                                **kwargs)


def sasdata2parquet(self,
                    parquet_file_path: str,
                    table: str,
                    libref: str = '',
                    dsopts: dict = None,
                    pa_parquet_kwargs: dict = None,
                    pa_pandas_kwargs: dict = None,
                    partitioned: bool = False,
                    partition_size_mb: int = 128,
                    chunk_size_mb: int = 4,
                    coerce_timestamp_errors: bool = True,
                    static_columns: list = None,
                    rowsep: str = '\x01',
                    colsep: str = '\x02',
                    rowrep: str = ' ',
                    colrep: str = ' ',
                    **kwargs) -> None:
    """
    This method exports the SAS Data Set to a Parquet file.

    :param parquet_file_path: path of the parquet file to create
    :param table: the name of the SAS Data Set you want to export to a Parquet file
    :param libref: the libref for the SAS Data Set.
    :param dsopts: data set options for the input SAS Data Set
    :param pa_parquet_kwargs: Additional parameters to pass to pyarrow.parquet.ParquetWriter
        (default is {"compression": 'snappy', "flavor": "spark", "write_statistics": False}).
    :param pa_pandas_kwargs: Additional parameters to pass to pyarrow.Table.from_pandas (default is {}).
    :param partitioned: Boolean indicating whether the parquet file should be written in partitions (default is False).
    :param partition_size_mb: The size in MB of each partition in memory (default is 128).
    :param chunk_size_mb: The chunk size in MB at which the stream is processed (default is 4).
    :param coerce_timestamp_errors: Whether to coerce errors when converting timestamps (default is True).
    :param static_columns: List of tuples (name, value) representing static columns that will be added to the parquet file (default is None).
    :param rowsep: the row separator character to use; defaults to '\x01'
    :param colsep: the column separator character to use; defaults to '\x02'
    :param rowrep: the char to convert to for any embedded rowsep chars, defaults to ' '
    :param colrep: the char to convert to for any embedded colsep chars, defaults to ' '

    These two options are for advanced usage. They override how saspy imports data. For more info
    see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques

    :param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
    :param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
    :return: None
    """
    lastlog = len(self._io._log)

    # Build fresh defaults per call; mutable defaults in the signature would be
    # shared (and potentially mutated) across every invocation.
    if pa_parquet_kwargs is None:
        pa_parquet_kwargs = {"compression": 'snappy',
                             "flavor": "spark",
                             "write_statistics": False}
    if pa_pandas_kwargs is None:
        pa_pandas_kwargs = {}
    dsopts = dsopts if dsopts is not None else {}

    if self.exist(table, libref) == 0:
        logger.error('The SAS Data Set ' + libref + '.' + table + ' does not exist')
        # older clients can't raise here without breaking back-compat; newer ones raise
        if self.sascfg.bcv < 3007009:
            return None
        else:
            raise FileNotFoundError('The SAS Data Set ' + libref + '.' + table + ' does not exist')

    if self.nosub:
        print("too complicated to show the code, read the source :), sorry.")
    else:
        self._io.sasdata2parquet(
            parquet_file_path     = parquet_file_path,
            table                 = table,
            libref                = libref,
            dsopts                = dsopts,
            pa_parquet_kwargs     = pa_parquet_kwargs,
            pa_pandas_kwargs      = pa_pandas_kwargs,
            partitioned           = partitioned,
            partition_size_mb     = partition_size_mb,
            chunk_size_mb         = chunk_size_mb,
            coerce_timestamp_errors = coerce_timestamp_errors,
            static_columns        = static_columns,
            rowsep                = rowsep,
            colsep                = colsep,
            rowrep                = rowrep,
            colrep                = colrep,
            **kwargs)
    self._lastlog = self._io._log[lastlog:]
    return None

def _dsopts(self, dsopts):
"""
:param dsopts: a dictionary containing any of the following SAS data set options(where, drop, keep, obs, firstobs):
Expand Down
63 changes: 63 additions & 0 deletions saspy/sasdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,69 @@ def score(self, file: str = '', code: str = '', out: 'SASdata' = None) -> 'SASda
else:
return ll

def to_pq(self, parquet_file_path: str,
          pa_parquet_kwargs: dict = None,
          pa_pandas_kwargs: dict = None,
          partitioned: bool = False,
          partition_size_mb: int = 128,
          chunk_size_mb: int = 4,
          coerce_timestamp_errors: bool = True,
          static_columns: list = None,
          rowsep: str = '\x01', colsep: str = '\x02',
          rowrep: str = ' ', colrep: str = ' ',
          **kwargs) -> None:
    """
    This method exports the SAS Data Set to a Parquet file. This is an alias for sasdata2parquet.

    :param parquet_file_path: path of the parquet file to create
    :param pa_parquet_kwargs: Additional parameters to pass to pyarrow.parquet.ParquetWriter
        (default is {"compression": 'snappy', "flavor": "spark", "write_statistics": False}).
    :param pa_pandas_kwargs: Additional parameters to pass to pyarrow.Table.from_pandas (default is {}).
    :param partitioned: Boolean indicating whether the parquet file should be written in partitions (default is False).
    :param partition_size_mb: The size in MB of each partition in memory (default is 128).
    :param chunk_size_mb: The chunk size in MB at which the stream is processed (default is 4).
    :param coerce_timestamp_errors: Whether to coerce errors when converting timestamps (default is True).
    :param static_columns: List of tuples (name, value) representing static columns that will be added to the parquet file (default is None).
    :param rowsep: the row separator character to use; defaults to '\x01'
    :param colsep: the column separator character to use; defaults to '\x02'
    :param rowrep: the char to convert to for any embedded rowsep chars, defaults to ' '
    :param colrep: the char to convert to for any embedded colsep chars, defaults to ' '

    These two options are for advanced usage. They override how saspy imports data. For more info
    see https://sassoftware.github.io/saspy/advanced-topics.html#advanced-sd2df-and-df2sd-techniques

    :param dtype: this is the parameter to Pandas read_csv, overriding what saspy generates and uses
    :param my_fmts: bool, if True, overrides the formats saspy would use, using those on the data set or in dsopts=
    :return: None
    """
    # Build fresh defaults per call; mutable defaults in the signature would be
    # shared across every SASdata object and call.
    if pa_parquet_kwargs is None:
        pa_parquet_kwargs = {"compression": 'snappy',
                             "flavor": "spark",
                             "write_statistics": False}
    if pa_pandas_kwargs is None:
        pa_pandas_kwargs = {}

    lastlog = len(self.sas._io._log)
    ll = self._is_valid()
    self.sas._lastlog = self.sas._io._log[lastlog:]
    if ll:
        print(ll['LOG'])
        return None
    else:
        self.sas.sasdata2parquet(parquet_file_path = parquet_file_path,
                                 table             = self.table,
                                 libref            = self.libref,
                                 dsopts            = self.dsopts,
                                 pa_parquet_kwargs = pa_parquet_kwargs,
                                 pa_pandas_kwargs  = pa_pandas_kwargs,
                                 partitioned       = partitioned,
                                 partition_size_mb = partition_size_mb,
                                 chunk_size_mb     = chunk_size_mb,
                                 coerce_timestamp_errors = coerce_timestamp_errors,
                                 static_columns    = static_columns,
                                 rowsep            = rowsep,
                                 colsep            = colsep,
                                 rowrep            = rowrep,
                                 colrep            = colrep,
                                 **kwargs)
        self.sas._lastlog = self.sas._io._log[lastlog:]
        return None

def to_frame(self, **kwargs) -> 'pandas.DataFrame':
"""
This is just an alias for to_df()
Expand Down
12 changes: 12 additions & 0 deletions saspy/sasiocom.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,18 @@ def sasdata2dataframeCSV(self, table: str, libref: str ='', dsopts: dict = None,

return df

def sasdata2parquet(self, parquet_file_path: str, table: str, libref: str ='',
                    dsopts: dict = None, pa_schema: 'pa_schema' = None,
                    static_columns:list = None,
                    partitioned = False, partition_size_mb = 128,
                    chunk_size_mb = 4, compression = 'snappy',
                    rowsep: str = '\x01', colsep: str = '\x02',
                    rowrep: str = ' ', colrep: str = ' ',
                    **kwargs) -> None:
    """
    Parquet export is not implemented for the COM access method.

    This stub exists so the access-method interface is uniform; it logs an
    error directing the user to the IOM access method and returns None
    without touching the SAS session. All parameters are accepted for
    signature compatibility but are ignored.

    :return: None
    """

    logger.error("This access method doesn't support this method. Try the IOM access method instead.")
    return None

def upload(self, local: str, remote: str, overwrite: bool=True, permission: str='', **kwargs):
"""
Upload a file to the SAS server.
Expand Down
Loading

0 comments on commit e96339e

Please sign in to comment.