Merge pull request #47 from gjmoore/ian
Small changes to enable creation of LLC snapshots
ifenty authored Nov 13, 2024
2 parents 5c7a29d + 499c3ea commit 9c491b5
Showing 10 changed files with 195 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -9,4 +9,5 @@ job_logs*.json
*.DS_Store
# emacs temporary files:
*~
*._*
\#*\#
53 changes: 39 additions & 14 deletions processing/src/ecco_dataset_production/apps/create_job_task_list.py
@@ -7,6 +7,7 @@
import importlib.resources
import json
import logging
import numpy as np
import os
import pandas as pd
import re
@@ -20,6 +21,7 @@
from .. import ecco_metadata_store
from .. import ecco_time
from .. import metadata
from pprint import pprint


logging.basicConfig(
@@ -262,9 +264,7 @@ def create_job_task_list(
Job = collections.namedtuple(
'Job',['metadata_groupings_id','product_type','frequency','time_steps'])


with open(jobfile,'r') as fh:

for line in fh:

#
@@ -303,7 +303,8 @@ def create_job_task_list(
time_coverage_duration = time_coverage_resolution = 'P1M'
dataset_description_head = 'This dataset contains monthly-averaged '
elif job.frequency.lower() == 'snap':
#TODO: path/file_freq_pat
path_freq_pat = 'diags_inst'
file_freq_pat = 'day_snap'
time_long_name = 'snapshot time'
time_coverage_duration = time_coverage_resolution = 'PT0S'
dataset_description_head = 'This dataset contains instantaneous '
@@ -349,6 +350,7 @@ def create_job_task_list(
one_to_one = False

if not one_to_one:
#('IAN not one_to_one')

# variable depends on component inputs; determine
# availability of input files and gather accordingly:
@@ -371,12 +373,14 @@
if isinstance(job.time_steps,str) and 'all'==job.time_steps.lower():
# get all possible time matches:
if aws.ecco_aws.is_s3_uri(ecco_source_root):

s3_key_pat = re.compile(
s3_parts.path.strip('/') # remove leading '/' from urlpath
+ '.*' # allow anything between path and filename
+ ecco_file.ECCOMDSFilestr(
prefix=variable_input_component_key,
averaging_period=file_freq_pat).re_filestr)

variable_input_component_files.extend(
[os.path.join(
urllib.parse.urlunparse(
@@ -489,13 +493,15 @@ def create_job_task_list(
variable_files = []

if aws.ecco_aws.is_s3_uri(ecco_source_root):
prefix=os.path.join(
s3_parts.path,
path_freq_pat,
'_'.join([variable,file_freq_pat]))
all_var_files_in_bucket = s3_list_files(
s3_client=s3c,
bucket=s3_parts.netloc,
prefix=os.path.join(
s3_parts.path,
path_freq_pat,
'_'.join([variable,file_freq_pat])))
prefix=prefix)


if isinstance(job.time_steps,str) and 'all'==job.time_steps.lower():
#if 'all' == job.time_steps.lower():
@@ -551,6 +557,9 @@ def create_job_task_list(

variable_files.sort()
variable_files_as_list_of_lists = []

print('number of files ', len(variable_files))

for f in variable_files:
if ecco_file.ECCOMDSFilestr(os.path.basename(f)).ext == 'data':
tmplist = [f]
@@ -607,13 +616,29 @@ def create_job_task_list(
# ECCOTask()'; subsequent operations using class functions.
task = {}

tb,center_time = ecco_time.make_time_bounds_metadata(
granule_time=time,
model_start_time=cfg['model_start_time'],
model_end_time=cfg['model_end_time'],
model_timestep=cfg['model_timestep'],
model_timestep_units=cfg['model_timestep_units'],
averaging_period=job.frequency)
model_start_time=cfg['model_start_time']
model_end_time=cfg['model_end_time']
model_timestep=cfg['model_timestep']
model_timestep_units=cfg['model_timestep_units']

if 'snap' in job.frequency.lower():
mst = np.datetime64(model_start_time)
td64 = np.timedelta64(int(time)*model_timestep, model_timestep_units)
center_time = mst + td64
# instantaneous snapshot: both time bounds equal the snapshot time itself
tb = [center_time, center_time]

else:
tb,center_time = ecco_time.make_time_bounds_metadata(
granule_time=time,
model_start_time=model_start_time,
model_end_time=model_end_time,
model_timestep=model_timestep,
model_timestep_units=model_timestep_units,
averaging_period=job.frequency)

if file_freq_pat == 'mon_mean':
# in the case of monthly means, ensure file date stamp is
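The snapshot branch added above builds the granule time stamp directly from the model start time and the time-step index, rather than calling ecco_time.make_time_bounds_metadata. A minimal standalone sketch of that arithmetic, with illustrative values standing in for what the production code reads from cfg and from the filename:

```python
import numpy as np

# illustrative values only; in create_job_task_list.py these come from cfg
# and from the time-step field of the MDS filename
model_start_time = '1992-01-01T12:00:00'   # hypothetical model start
model_timestep = 3600                      # hypothetical: one model step = 3600 s
model_timestep_units = 's'
time = '0000000732'                        # time-step index parsed from the filename

mst = np.datetime64(model_start_time)
center_time = mst + np.timedelta64(int(time) * model_timestep, model_timestep_units)

# for an instantaneous snapshot both time bounds collapse to the snapshot time
tb = [center_time, center_time]
print(center_time)   # 1992-02-01T00:00:00 for the values above
```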
7 changes: 5 additions & 2 deletions processing/src/ecco_dataset_production/ecco_file.py
@@ -44,10 +44,11 @@ def __init__(self,filestr=None,**kwargs):
"""
if filestr:
#print('IAN filestr ', filestr)
# use filestr to set all attributes (a little complicated because
# ECCO variable names may include underscores):
try:
re_so = re.search('_day_inst|_day_mean|_mon_mean',filestr)
re_so = re.search('_day_snap|_day_mean|_mon_mean',filestr)
self.prefix = filestr[:re_so.span()[0]]
self.averaging_period = filestr[re_so.span()[0]+1:re_so.span()[1]]
time_and_ext = filestr[re_so.span()[1]+1:]
@@ -151,10 +152,12 @@ def __init__(self,filestr=None,**kwargs):
"""
if filestr:
#print('IAN ECCOGranuleFilestr filestr ', filestr)

# use filestr to set all attributes (a little complicated because
# ECCO variable names may include underscores):
try:
re_so = re.search('_day_inst|_day_mean|_mon_mean',filestr)
re_so = re.search('_day_snap|_day_mean|_mon_mean',filestr)
self.prefix = filestr[:re_so.span()[0]]
self.averaging_period = filestr[re_so.span()[0]+1:re_so.span()[1]]
date_version_grid_type_grid_label_and_ext = filestr[re_so.span()[1]+1:]
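Both classes above split an input filename on the averaging-period token, so the change from '_day_inst' to '_day_snap' is what lets snapshot files be recognized. A small standalone illustration of the split (the filename below is hypothetical):

```python
import re

filestr = 'ETAN_day_snap.0000000732.data'   # hypothetical MDS filename
re_so = re.search('_day_snap|_day_mean|_mon_mean', filestr)

prefix = filestr[:re_so.span()[0]]                              # 'ETAN'
averaging_period = filestr[re_so.span()[0]+1:re_so.span()[1]]   # 'day_snap'
time_and_ext = filestr[re_so.span()[1]+1:]                      # '0000000732.data'
print(prefix, averaging_period, time_and_ext)
```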
@@ -36,6 +36,7 @@ def ecco_make_granule( task, cfg,
task (dict):
cfg
"""

log = logging.getLogger('edp.'+__name__)
if log_level:
log.setLevel(log_level)
@@ -60,8 +61,10 @@ def ecco_make_granule( task, cfg,
merged_variable_dataset = xr.merge(variable_datasets)

elif this_task.is_native:

log.info('generating %s ...', os.path.basename(this_task['granule']))
for variable in this_task.variable_names:

log.debug('... adding %s using:', variable)
for infile in itertools.chain.from_iterable(this_task.variable_inputs(variable)):
log.debug(' %s', infile)
@@ -93,12 +96,14 @@ def ecco_make_granule( task, cfg,
this_task['granule'], encoding=encoding)
else:
with tempfile.TemporaryDirectory() as tmpdir:
# temporary directory will self-destruct at end of with block
_src = os.path.basename(this_task['granule'])
_dest = this_task['granule']
merged_variable_dataset_with_all_metadata.to_netcdf(
os.path.join(tmpdir,_src), encoding=encoding)
log.info('uploading %s to %s', os.path.join(tmpdir,_src), _dest)
ecco_aws_s3_cp.aws_s3_cp( src=os.path.join(tmpdir,_src), dest=_dest, **kwargs)

log.info('... done')


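For reference, the S3 branch above follows a write-locally-then-copy pattern. A sketch of it in isolation (the import path of the repo's aws_s3_cp helper is assumed; function and variable names here are illustrative):

```python
import os
import tempfile

from ecco_dataset_production import ecco_aws_s3_cp   # assumed import path

def write_granule_to_s3(ds, s3_dest, encoding=None):
    """Write xarray Dataset `ds` to a temporary local netCDF file, then copy it
    to the S3 URI `s3_dest`; the temporary directory (and the local file) are
    removed automatically when the 'with' block exits."""
    with tempfile.TemporaryDirectory() as tmpdir:
        local_file = os.path.join(tmpdir, os.path.basename(s3_dest))
        ds.to_netcdf(local_file, encoding=encoding)
        ecco_aws_s3_cp.aws_s3_cp(src=local_file, dest=s3_dest)
```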
@@ -0,0 +1,24 @@
#!/usr/bin/env bash


# Run in a directory with one or more JSON task files (extension .json).
# Takes one argument, the name of a task file, and runs edp_generate_dataproducts
# on it; to process several task files in parallel, launch one instance per file
# (see the wrapper sketch following this script).


EDP_root_dir=/home/jpluser/edp
CFGFILE="${EDP_root_dir}/ECCO-Dataset-Production/processing/configs/product_generation_config_updated.yaml"

# task file passed as the first argument
TASKFILE=${1}

echo $TASKFILE

#KEYGEN='/usr/local/bin/aws-login.darwin.amd64'
#PROFILE='saml-pub'

edp_generate_dataproducts --tasklist ${TASKFILE} --cfgfile ${CFGFILE} --log DEBUG
#> LOG_$TASKFILE.log 2> LOG_$TASKFILE.log
# #--keygen ${KEYGEN} \
# #--profile ${PROFILE} \
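To get the looping/parallel behavior described in the header comment, one option is a small wrapper along the following lines (a sketch only, assuming the task files share the root name passed as the first argument; the log-file redirection mirrors the commented-out line above):

```bash
#!/usr/bin/env bash
# Hypothetical wrapper: launch edp_generate_dataproducts on every matching
# task file in the current directory, backgrounding each run, then wait.

EDP_root_dir=/home/jpluser/edp
CFGFILE="${EDP_root_dir}/ECCO-Dataset-Production/processing/configs/product_generation_config_updated.yaml"

for TASKFILE in "${1}"*.json; do
    echo "launching ${TASKFILE}"
    edp_generate_dataproducts --tasklist "${TASKFILE}" --cfgfile "${CFGFILE}" --log DEBUG \
        > "LOG_${TASKFILE}.log" 2>&1 &
done
wait   # block until all background runs finish
```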

70 changes: 70 additions & 0 deletions processing/src/ecco_dataset_production/utils/split_task_json.py
@@ -0,0 +1,70 @@
import json
import math
import argparse

def split_json(input_file, num_files, output_base):
"""
Splits a JSON file into multiple smaller JSON files.
Arguments:
- input_file (str): Path to the input JSON file to split.
- num_files (int): Number of output JSON files to create.
- output_base (str): Base name for the output files. Each file will be named
as `output_base_001.json`, `output_base_002.json`, etc.
The script reads a JSON file, divides it into `num_files` parts, and writes
each part into a new JSON file. Each new file will contain a roughly equal
number of entries from the original JSON file.
"""

# Load the original JSON file
with open(input_file, 'r') as infile:
data = json.load(infile)

# Total number of entries in the input JSON
n = len(data)

# Calculate the number of entries per file (ceiling division to ensure all data is covered)
entries_per_file = math.ceil(n / num_files)

# Split the data and write to new JSON files
for i in range(num_files):
start_index = i * entries_per_file
end_index = start_index + entries_per_file

# Create a subset of the data for the current split
subset = data[start_index:end_index]

# Output filename with zero-padded numbers (e.g., output_base_001.json)
output_filename = f'{output_base}_{i+1:03}.json'

# Write the subset to the output file
with open(output_filename, 'w') as outfile:
json.dump(subset, outfile, indent=4)

print(f"Split into {num_files} files.")

# Set up argument parsing
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Split a JSON file into multiple smaller files.'
)

# Positional argument for input file
parser.add_argument('input_file', type=str,
help='Path to the input JSON file to be split.')

# Positional argument for number of files to split into
parser.add_argument('num_files', type=int,
help='Number of output JSON files to create.')

# Positional argument for output base name
parser.add_argument('output_base', type=str,
help='Base name for the output files (e.g., "output" will result in "output_001.json", "output_002.json", etc.)')

# Parse the command-line arguments
args = parser.parse_args()

# Call the function to split the JSON
split_json(args.input_file, args.num_files, args.output_base)
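Because entries_per_file uses ceiling division, a 10-entry task list split into 3 files yields 4, 4, and 2 entries (ceil(10/3) = 4). A usage sketch with hypothetical filenames:

```python
# command-line form (run from processing/src/ecco_dataset_production/utils):
#   python split_task_json.py tasks.json 3 tasks_part
# writes tasks_part_001.json, tasks_part_002.json, tasks_part_003.json

# or call the function directly:
from split_task_json import split_json
split_json('tasks.json', 3, 'tasks_part')
```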

7 changes: 7 additions & 0 deletions tests/SSH_native_latlon_local/README.txt
@@ -0,0 +1,7 @@
# two steps

# 1. create task list
./edp_create_job_task_list_SSH_native_latlon_mon_mean.sh SSH_native_latlon_mon_mean_jobs.txt SSH_native_latlon_mon_mean_tasks.json.sav

# 2. generate data products from task list
./edp_generate_dataproducts_SSH_native_latlon_mon_mean.sh SSH_native_latlon_mon_mean_tasks.json.sav