From 1e484563d62049364599951c708ee84d457b2c44 Mon Sep 17 00:00:00 2001
From: Ian Fenty
Date: Thu, 13 Jun 2024 04:46:31 +0000
Subject: [PATCH] added code to make single kerchunk and mzz json versions of
 ecco v4r4 datasets on pocloud

---
 .../generate_ecco_v4r4_jsons.ipynb            | 1352 +++++++++++++++++
 1 file changed, 1352 insertions(+)
 create mode 100644 AWS/NetCDF_to_JSON/generate_ecco_v4r4_jsons.ipynb

diff --git a/AWS/NetCDF_to_JSON/generate_ecco_v4r4_jsons.ipynb b/AWS/NetCDF_to_JSON/generate_ecco_v4r4_jsons.ipynb
new file mode 100644
index 0000000..92c8a46
--- /dev/null
+++ b/AWS/NetCDF_to_JSON/generate_ecco_v4r4_jsons.ipynb
@@ -0,0 +1,1352 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 36,
+   "id": "21bd2789-8523-43e2-bbf5-f21e6f0cfafd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# kerchunk ECCO v4r4 netcdfs stored on podaac s3\n",
+    "# to single jsons and multi-zarr jsons\n",
+    "\n",
+    "# -- Ian Fenty\n",
+    "# -- 2024-05-29, 5-30, 6-10, 6-11, 6-12\n",
+    "\n",
+    "# executed mainly on a g4dn.8xlarge with 32 cpus and 128 GB ram\n",
+    "\n",
+    "# performance ranges from about 0.1-1 second per granule,\n",
+    "# depending on granule size (number of fields x number of levels)\n",
+    "\n",
+    "# total processing time of all ECCO v4r4 granules is about 2 days\n",
+    "\n",
+    "\n",
+    "# Fastest Way to Crunch Through the Granules\n",
+    "############################################\n",
+    "# BEST: dask with \"ncpu\" workers as \"processes\", each with 1 thread.\n",
+    "# much slower: using 8 workers, each with 4 threads\n",
+    "# much slower: using \"threadpool\" multiprocessing\n",
+    "\n",
+    "# Why?\n",
+    "# I think the reason is that the kerchunk calculations are\n",
+    "# \"embarrassingly parallel\" and involve zero memory sharing.\n",
+    "# We have 32 cores available that can work in parallel\n",
+    "# to read one netcdf file from s3 and make the kerchunk json.\n",
+    "# When we tell dask to have 8 workers, each with 4 threads,\n",
+    "# I think they often have to wait for the release of the\n",
+    "# Python 'global interpreter lock' (GIL).\n",
+    "# See https://dask.discourse.group/t/understanding-how-dask-is-executing-processes-vs-threads/666/4\n",
+    "#\n",
+    "# Also see: https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers\n",
+    "# \"Typically one decides between these choices based on the workload.\n",
+    "# The difference here is due to Python's Global Interpreter Lock,\n",
+    "# which limits parallelism for some kinds of data. If you are working\n",
+    "# mostly with Numpy, Pandas, Scikit-Learn, or other numerical programming\n",
+    "# libraries in Python then you don't need to worry about the GIL, and you\n",
+    "# probably want to prefer few processes with many threads each. This helps\n",
+    "# because it allows data to move freely between your cores because it all\n",
+    "# lives in the same process. However, if you're doing mostly Pure Python\n",
+    "# programming, like dealing with text data, dictionaries/lists/sets, and\n",
+    "# doing most of your computation in tight Python for loops then you'll\n",
+    "# want to prefer having many processes with few threads each.\n",
+    "# This incurs extra communication costs, but lets you bypass the GIL.\"\n",
+    "\n",
+    "# Also: https://docs.dask.org/en/latest/best-practices.html\n",
+    "# \"If you're doing mostly numeric work with Numpy, pandas, Scikit-learn,\n",
+    "# Numba, and other libraries that release the GIL, then use mostly threads.\n",
+    "# If you're doing work on text data or Python collections like lists\n",
+    "# and dicts then use mostly processes.\n",
+    "# If you're on larger machines with a high thread count (greater than 10),\n",
+    "# then you should probably split things up into at least a few processes\n",
+    "# regardless. Python can be highly productive with 10 threads per process with\n",
+    "# numeric work, but not 50 threads.\"\n",
+    "\n",
+    "\n",
+    "\n",
+    "import boto3\n",
+    "from concurrent.futures import ThreadPoolExecutor\n",
+    "from concurrent.futures import wait\n",
+    "from concurrent.futures import ProcessPoolExecutor\n",
+    "from concurrent.futures import as_completed\n",
+    "\n",
+    "import dask\n",
+    "import distributed\n",
+    "from dask.distributed import Client, LocalCluster\n",
+    "from datetime import datetime\n",
+    "import fsspec\n",
+    "from getpass import getpass\n",
+    "from glob import glob\n",
+    "from http.cookiejar import CookieJar\n",
+    "from kerchunk.hdf import SingleHdf5ToZarr\n",
+    "from kerchunk.combine import MultiZarrToZarr\n",
+    "import logging\n",
+    "import matplotlib.pyplot as plt\n",
+    "from netrc import netrc\n",
+    "import numpy as np\n",
+    "import os\n",
+    "from os.path import basename, isfile, isdir, join, expanduser\n",
+    "import pandas as pd\n",
+    "import pathlib\n",
+    "from pathlib import Path\n",
+    "from platform import system\n",
+    "from pprint import pprint\n",
+    "import shutil\n",
+    "import subprocess\n",
+    "import requests\n",
+    "import s3fs\n",
+    "import time\n",
+    "from tqdm import tqdm\n",
+    "import ujson\n",
+    "from urllib import request\n",
+    "import warnings\n",
+    "import xarray as xr\n",
+    "\n",
+    "# https://pypi.org/project/python-cmr/\n",
+    "from cmr import CollectionQuery, GranuleQuery, ToolQuery, ServiceQuery, VariableQuery\n",
+    "\n",
+    "warnings.filterwarnings(\"ignore\")\n",
+    "\n",
+    "# Dask often throws probably harmless warnings when the client/cluster leave\n",
+    "# the context manager, about \"stream closed\", connectionPool, heartbeat_worker, etc.\n",
+    "# Silence the warning with the following, as per:\n",
+    "# https://github.com/dask/distributed/issues/7105\n",
+    "warnings.simplefilter(\"ignore\", distributed.comm.core.CommClosedError)"
+   ]
+  },
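+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-1111-4e5f-8a9b-cde012345601",
+   "metadata": {},
+   "source": [
+    "A minimal, self-contained sketch of the processes-vs-threads tradeoff described above. `count_up` is a hypothetical stand-in for GIL-bound pure-Python work (like kerchunk's `.translate`); it is not part of the v4r4 pipeline and the timings are only illustrative."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-1111-4e5f-8a9b-cde012345602",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# toy benchmark of the processes-vs-threads tradeoff discussed above;\n",
+    "# count_up is a hypothetical stand-in for GIL-bound pure-Python work\n",
+    "import time\n",
+    "import dask\n",
+    "from dask.distributed import Client, LocalCluster\n",
+    "\n",
+    "def count_up(n=2_000_000):\n",
+    "    # tight pure-Python loop: holds the GIL the whole time\n",
+    "    total = 0\n",
+    "    for i in range(n):\n",
+    "        total += i\n",
+    "    return total\n",
+    "\n",
+    "def time_cluster(n_workers, threads_per_worker, n_tasks=32):\n",
+    "    with LocalCluster(n_workers=n_workers, processes=True,\n",
+    "                      threads_per_worker=threads_per_worker) as cluster, Client(cluster) as client:\n",
+    "        t0 = time.time()\n",
+    "        dask.compute(*[dask.delayed(count_up)() for _ in range(n_tasks)])\n",
+    "        return time.time() - t0\n",
+    "\n",
+    "# expect the process-heavy layout to win for GIL-bound tasks\n",
+    "print('32 procs x 1 thread :', time_cluster(32, 1))\n",
+    "print(' 8 procs x 4 threads:', time_cluster(8, 4))"
+   ]
+  },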
\n", + "# We have 32 cores available that can work in parallel \n", + "# to read one netcdf file from s3 and make the kerchunk json\n", + "# When tell dask to have 8 workers each with 4 threads,\n", + "# I think they often have to wait for the release of the\n", + "# python 'global interpreter lock' GIL.\n", + "# See https://dask.discourse.group/t/understanding-how-dask-is-executing-processes-vs-threads/666/4\n", + "# \n", + "# Also see: https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers\n", + "# \"Typically one decides between these choices based on the workload. \n", + "# The difference here is due to Python's Global Interpreter Lock, \n", + "# which limits parallelism for some kinds of data. If you are working\n", + "# mostly with Numpy, Pandas, Scikit-Learn, or other numerical programming \n", + "# libraries in Python then you don't need to worry about the GIL, and you \n", + "# probably want to prefer few processes with many threads each. This helps \n", + "# because it allows data to move freely between your cores because it all \n", + "# lives in the same process. However, if you're doing mostly Pure Python \n", + "# programming, like dealing with text data, dictionaries/lists/sets, and \n", + "# doing most of your computation in tight Python for loops then you'll \n", + "# want to prefer having many processes with few threads each. \n", + "# This incurs extra communication costs, but lets you bypass the GIL.\"\n", + "\n", + "# Also: https://docs.dask.org/en/latest/best-practices.html\n", + "#If you’re doing mostly numeric work with Numpy, pandas, Scikit-learn,\n", + "# Numba, and other libraries that release the GIL, then use mostly threads.\n", + "# If you’re doing work on text data or Python collections like lists\n", + "# and dicts then use mostly processes.\n", + "#If you’re on larger machines with a high thread count (greater than 10),\n", + "# then you should probably split things up into at least a few processes \n", + "# regardless. 
+  {
+   "cell_type": "markdown",
+   "id": "2c8fdb90-60e6-4eec-ad6c-7638cdd3abe8",
+   "metadata": {},
+   "source": [
+    "# subroutines"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "id": "40ddf61a-75ea-4009-aa97-d1051a8c383c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "## Define Helper Subroutines\n",
+    "### Helper subroutine to log into and access files on NASA EarthData\n",
+    "\n",
+    "# not pretty but it works\n",
+    "def setup_earthdata_login_auth(url: str='urs.earthdata.nasa.gov'):\n",
+    "    # look for the netrc file and use the login/password\n",
+    "    try:\n",
+    "        username, _, password = netrc(file=_netrc).authenticators(url)\n",
+    "\n",
+    "    # if the file is not found, prompt the user for the login/password\n",
+    "    except (FileNotFoundError, TypeError):\n",
+    "        print('Please provide Earthdata Login credentials for access.')\n",
+    "        username, password = input('Username: '), getpass('Password: ')\n",
+    "\n",
+    "    manager = request.HTTPPasswordMgrWithDefaultRealm()\n",
+    "    manager.add_password(None, url, username, password)\n",
+    "    auth = request.HTTPBasicAuthHandler(manager)\n",
+    "    jar = CookieJar()\n",
+    "    processor = request.HTTPCookieProcessor(jar)\n",
+    "    opener = request.build_opener(auth, processor)\n",
+    "    request.install_opener(opener)\n",
+    "\n",
+    "\n",
+    "def init_S3FileSystem():\n",
+    "    \"\"\"\n",
+    "    Automatically pulls your EDL credential from the .netrc file and uses it\n",
+    "    to obtain an AWS S3 credential through a PO.DAAC service accessible at\n",
+    "    https://archive.podaac.earthdata.nasa.gov/s3credentials.\n",
+    "    From the PO.DAAC Github (https://podaac.github.io/tutorials/external/July_2022_Earthdata_Webinar.html).\n",
+    "\n",
+    "    Returns:\n",
+    "    =======\n",
+    "\n",
+    "    s3: an AWS S3 filesystem\n",
+    "    credentials: dictionary with the temporary AWS credentials\n",
+    "    \"\"\"\n",
+    "\n",
+    "    import requests, s3fs\n",
+    "    credentials = requests.get('https://archive.podaac.earthdata.nasa.gov/s3credentials').json()\n",
+    "    s3 = s3fs.S3FileSystem(anon=False,\n",
+    "                           key=credentials['accessKeyId'],\n",
+    "                           secret=credentials['secretAccessKey'],\n",
+    "                           token=credentials['sessionToken'])\n",
+    "    return s3, credentials\n",
+    "\n",
+    "\n",
+    "# Uses https://pypi.org/project/python-cmr/\n",
+    "def get_granule_urls(ShortName):\n",
+    "    gq = GranuleQuery()\n",
+    "    gq.short_name(ShortName)\n",
+    "    tmp = gq.get_all()\n",
+    "\n",
+    "    s3_files_list = []\n",
+    "    for t in tmp:\n",
+    "        s3_files_list.append(t['links'][0]['href'])\n",
+    "\n",
+    "    s3_urls = [x.split('s3://')[1] for x in s3_files_list]\n",
+    "\n",
+    "    return s3_urls, s3_files_list\n",
+    "\n",
+    "# Uses https://pypi.org/project/python-cmr/\n",
+    "def get_dataset_names():\n",
+    "    cq = CollectionQuery()\n",
+    "    ds_names = []\n",
+    "    tmp = cq.keyword('ECCO*V4R4').get_all()\n",
+    "    for t in tmp:\n",
+    "        ds_names.append(t['short_name'])\n",
+    "    return ds_names\n"
+   ]
+  },
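+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-2222-4e5f-8a9b-cde012345603",
+   "metadata": {},
+   "source": [
+    "Quick smoke test of the helpers above (assumes a valid Earthdata login in `~/.netrc`; the ShortName is just one example dataset)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-2222-4e5f-8a9b-cde012345604",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# quick smoke test of the helpers above\n",
+    "# (assumes a valid Earthdata login in ~/.netrc)\n",
+    "s3, credentials = init_S3FileSystem()\n",
+    "\n",
+    "s3_urls, full_urls = get_granule_urls('ECCO_L4_SSH_05DEG_DAILY_V4R4')\n",
+    "print(f'found {len(s3_urls)} granules, e.g. {full_urls[0]}')\n",
+    "\n",
+    "# confirm the temporary AWS credential works by statting one object\n",
+    "pprint(s3.info(s3_urls[0]))"
+   ]
+  },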
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "id": "ac587828-4f2b-400e-9c00-ca1eef5038c0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# recursively delete a directory (found somewhere on the internet)\n",
+    "def rm_tree(pth: Path):\n",
+    "    try:\n",
+    "        for child in pth.iterdir():\n",
+    "            if child.is_file():\n",
+    "                child.unlink()\n",
+    "            else:\n",
+    "                rm_tree(child)\n",
+    "        pth.rmdir()\n",
+    "    except Exception:\n",
+    "        print('could not delete ', pth)\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "489335a1-a479-400a-a72f-864ada938665",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def make_output_dir(output_dir, make_clean=False):\n",
+    "    # prepare output directory\n",
+    "\n",
+    "    if not isinstance(output_dir, Path):\n",
+    "        output_dir = Path(output_dir)\n",
+    "\n",
+    "    # ... delete output directory if it already exists\n",
+    "    if make_clean:\n",
+    "        try:\n",
+    "            print(f'deleting {output_dir}')\n",
+    "            rm_tree(output_dir)\n",
+    "        except Exception:\n",
+    "            print(f'could not delete {output_dir}')\n",
+    "\n",
+    "    # ... make new output directory\n",
+    "    try:\n",
+    "        print(f'making {output_dir}')\n",
+    "\n",
+    "        output_dir.mkdir(exist_ok=True, parents=True)\n",
+    "    except Exception:\n",
+    "        print(f'could not make {output_dir}')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 16,
+   "id": "755b6375-c5fb-4a2d-8682-7b4029a83428",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# loads a dataset from the 'kerchunk' json where the\n",
+    "# netcdf files are stored on s3\n",
+    "def load_dataset_from_json_s3(json_file_path, credentials, time_chunk=20):\n",
+    "\n",
+    "    fs = fsspec.filesystem(\n",
+    "        \"reference\",\n",
+    "        fo=json_file_path,\n",
+    "        remote_protocol=\"s3\",\n",
+    "        remote_options={'anon': False,\n",
+    "                        'key': credentials['accessKeyId'],\n",
+    "                        'secret': credentials['secretAccessKey'],\n",
+    "                        'token': credentials['sessionToken']},\n",
+    "        skip_instance_cache=True\n",
+    "    )\n",
+    "    m = fs.get_mapper(\"\")\n",
+    "    if time_chunk > 0:\n",
+    "        ds = xr.open_dataset(m, engine='zarr', consolidated=False, chunks={'time': time_chunk})\n",
+    "    else:\n",
+    "        ds = xr.open_dataset(m, engine='zarr', consolidated=False)\n",
+    "\n",
+    "    ds.close()\n",
+    "\n",
+    "    return ds"
+   ]
+  },
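+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-3333-4e5f-8a9b-cde012345605",
+   "metadata": {},
+   "source": [
+    "Example usage of `load_dataset_from_json_s3`. The json path below is hypothetical; substitute any reference json produced by `gen_json` further down (and it assumes `credentials` from the smoke test above)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-3333-4e5f-8a9b-cde012345606",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# example: lazily open a dataset from one single-granule reference json\n",
+    "# (hypothetical path -- substitute any json written by gen_json below)\n",
+    "example_json = '/tmp/kerchunk_test/SEA_SURFACE_HEIGHT_day_mean_1992-01-01_ECCO_V4r4_latlon_0p50deg.nc.json'\n",
+    "ds = load_dataset_from_json_s3(example_json, credentials, time_chunk=0)\n",
+    "print(ds)"
+   ]
+  },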
\"inline_threshold – Byte size below which an array will be embedded in the output. Use 0 to disable inlining.\"\n", + " # from https://fsspec.github.io/kerchunk/reference.html\n", + " \n", + " so = dict(\n", + " mode=\"rb\", anon=False, \n", + " default_fill_cache=False,\n", + " default_cache_type=\"readahead\", \n", + " key=credentials['accessKeyId'], \n", + " secret=credentials['secretAccessKey'],\n", + " token=credentials['sessionToken'],\n", + " )\n", + " with fsspec.open(url, **so) as inf:\n", + " # h5chunks translate operation takes the vast majority of the time\n", + " h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=inline_threshold).translate()\n", + "\n", + " # in contrast, ujson.dumps and write to disk are super fast\n", + " with open(f\"{output_dir}/{url.split('/')[-1]}.json\", 'wb') as outf:\n", + " outf.write(ujson.dumps(h5chunks).encode())" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "2e12cfd0-8a53-47f7-9085-f5b18e5a3637", + "metadata": {}, + "outputs": [], + "source": [ + "import concurrent.futures\n", + "import urllib.request\n", + "\n", + "def single_kerchunk_driver_pool(ShortName, \n", + " output_base_dir = Path('/home/jpluser/efs-mount-point/ifenty/ecco_v4r4'),\n", + " inline_threshold=10000,\n", + " debug=True):\n", + " \n", + " print('\\n***************************************')\n", + " print('\\nSelected dataset ', ShortName)\n", + " print('\\n***************************************')\n", + " \n", + " print('locating urls:')\n", + " s3_urls, full_urls = get_granule_urls(ShortName) \n", + " print(f'\\n{ShortName} has {len(s3_urls)} files')\n", + "\n", + " # make output dir\n", + " output_dir = output_base_dir / ShortName\n", + " make_output_dir(output_dir, make_clean=True)\n", + "\n", + " urls_to_process = full_urls\n", + " if debug:\n", + " urls_to_process = full_urls[:ncpus*2]\n", + " n_urls = len(urls_to_process)\n", + " print(f'processing urls {n_urls}')\n", + "\n", + " # split into batches because of podaac credential expiration\n", + " # each batch would process 128 granules\n", + " nmzz_jobs = int(n_urls/(312))+1\n", + "\n", + " # divide into batches\n", + " url_split = np.array_split(np.array(urls_to_process), nmzz_jobs) \n", + " url_split = [list(x) for x in url_split]\n", + " print(f'splitting urls into {nmzz_jobs} batches')\n", + "\n", + " # begin processing granules\n", + " start_time=time.time()\n", + "\n", + " url_split_count = 0\n", + " for url_subset in url_split:\n", + " # update credentials if necessary\n", + " update_credential(credentials)\n", + " \n", + " url_split_count = url_split_count +1\n", + " print(f'... 
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "id": "131343a5-4ba8-4eb3-a1e3-bb28b298b464",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# use Kerchunk to make a local json corresponding to\n",
+    "# a single netcdf file stored on s3\n",
+    "\n",
+    "# inputs:\n",
+    "#   url: netcdf url on s3\n",
+    "#   output_dir: where to save this bad boy\n",
+    "#   credentials: podaac aws credentials\n",
+    "#   inline_threshold: see below\n",
+    "\n",
+    "def gen_json(url, output_dir, credentials, inline_threshold=10000):\n",
+    "\n",
+    "    # not sure of the implication of using a nonzero inline threshold\n",
+    "    # ... \"inline_threshold – int Size below which binary blocks are included directly in the output\"\n",
+    "    # ... \"inline_threshold – Byte size below which an array will be embedded in the output. Use 0 to disable inlining.\"\n",
+    "    # from https://fsspec.github.io/kerchunk/reference.html\n",
+    "\n",
+    "    so = dict(\n",
+    "        mode=\"rb\", anon=False,\n",
+    "        default_fill_cache=False,\n",
+    "        default_cache_type=\"readahead\",\n",
+    "        key=credentials['accessKeyId'],\n",
+    "        secret=credentials['secretAccessKey'],\n",
+    "        token=credentials['sessionToken'],\n",
+    "    )\n",
+    "    with fsspec.open(url, **so) as inf:\n",
+    "        # the h5chunks translate operation takes the vast majority of the time\n",
+    "        h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=inline_threshold).translate()\n",
+    "\n",
+    "    # in contrast, ujson.dumps and the write to disk are super fast\n",
+    "    with open(f\"{output_dir}/{url.split('/')[-1]}.json\", 'wb') as outf:\n",
+    "        outf.write(ujson.dumps(h5chunks).encode())"
+   ]
+  },
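+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-4444-4e5f-8a9b-cde012345607",
+   "metadata": {},
+   "source": [
+    "One-granule test of `gen_json` before committing to the full drivers below. `/tmp/kerchunk_test` is just a scratch directory, and `credentials` is assumed from the smoke test above."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-4444-4e5f-8a9b-cde012345608",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# one-granule test of gen_json before running the full drivers below\n",
+    "# (/tmp/kerchunk_test is just a scratch directory)\n",
+    "test_dir = '/tmp/kerchunk_test'\n",
+    "make_output_dir(test_dir)\n",
+    "\n",
+    "_, full_urls = get_granule_urls('ECCO_L4_GEOMETRY_05DEG_V4R4')\n",
+    "gen_json(full_urls[0], test_dir, credentials)\n",
+    "print(os.listdir(test_dir))"
+   ]
+  },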
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "id": "2e12cfd0-8a53-47f7-9085-f5b18e5a3637",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import concurrent.futures\n",
+    "import urllib.request\n",
+    "\n",
+    "def single_kerchunk_driver_pool(ShortName,\n",
+    "                                output_base_dir=Path('/home/jpluser/efs-mount-point/ifenty/ecco_v4r4'),\n",
+    "                                inline_threshold=10000,\n",
+    "                                debug=True):\n",
+    "\n",
+    "    print('\\n***************************************')\n",
+    "    print('\\nSelected dataset ', ShortName)\n",
+    "    print('\\n***************************************')\n",
+    "\n",
+    "    print('locating urls:')\n",
+    "    s3_urls, full_urls = get_granule_urls(ShortName)\n",
+    "    print(f'\\n{ShortName} has {len(s3_urls)} files')\n",
+    "\n",
+    "    # make output dir\n",
+    "    output_dir = output_base_dir / ShortName\n",
+    "    make_output_dir(output_dir, make_clean=True)\n",
+    "\n",
+    "    urls_to_process = full_urls\n",
+    "    if debug:\n",
+    "        urls_to_process = full_urls[:ncpus*2]\n",
+    "    n_urls = len(urls_to_process)\n",
+    "    print(f'processing {n_urls} urls')\n",
+    "\n",
+    "    # split into batches because of podaac credential expiration;\n",
+    "    # each batch processes up to ~312 granules\n",
+    "    nmzz_jobs = int(n_urls/312) + 1\n",
+    "\n",
+    "    # divide into batches\n",
+    "    url_split = np.array_split(np.array(urls_to_process), nmzz_jobs)\n",
+    "    url_split = [list(x) for x in url_split]\n",
+    "    print(f'splitting urls into {nmzz_jobs} batches')\n",
+    "\n",
+    "    # begin processing granules\n",
+    "    start_time = time.time()\n",
+    "\n",
+    "    url_split_count = 0\n",
+    "    for url_subset in url_split:\n",
+    "        # update credentials if necessary\n",
+    "        update_credential(credentials)\n",
+    "\n",
+    "        url_split_count = url_split_count + 1\n",
+    "        print(f'... url_subset {url_split_count}, n={len(url_subset)}')\n",
+    "\n",
+    "        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:\n",
+    "            start_time_b = time.time()\n",
+    "\n",
+    "            futures = []\n",
+    "            for url in url_subset:\n",
+    "                futures.append(executor.submit(gen_json, url=url, output_dir=output_dir, credentials=credentials))\n",
+    "\n",
+    "            n_returned = 0\n",
+    "            for future in concurrent.futures.as_completed(futures):\n",
+    "                n_returned = n_returned + 1\n",
+    "                if n_returned % 100 == 0:\n",
+    "                    print('   ', n_returned)\n",
+    "\n",
+    "            end_time_b = time.time()\n",
+    "            tt = end_time_b - start_time_b\n",
+    "            tpg = tt/len(url_subset)\n",
+    "\n",
+    "            print(f'** subset {url_split_count} finished processing {len(url_subset)} in {tt:.2f}s, time per granule {tpg:.2f}s')\n",
+    "\n",
+    "    end_time = time.time()\n",
+    "    tt = end_time - start_time\n",
+    "    tpg = tt/n_urls\n",
+    "    print(f'\\n** SINGLE KERCHUNK finished processing {n_urls} in {tt:.2f}s, time per granule {tpg:.2f}s')\n",
+    "\n",
+    "\n",
+    "    return output_dir\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 29,
+   "id": "17b559f8-ea1f-4483-9785-382210169fee",
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [],
+   "source": [
+    "# Makes a kerchunk json for every granule in the podaac dataset with ShortName.\n",
+    "# dask only slightly speeds up the processing: the lag is in kerchunk's\n",
+    "# .translate command, which doesn't seem to parallelize -- may be related to fsspec\n",
+    "\n",
+    "def single_kerchunk_driver_dask(ShortName,\n",
+    "                                output_base_dir=Path('/home/jpluser/efs-mount-point/ifenty/ecco_v4r4'),\n",
+    "                                debug=True):\n",
+    "\n",
+    "    # spin up a local cluster\n",
+    "    with LocalCluster(n_workers=32,\n",
+    "                      processes=True,\n",
+    "                      threads_per_worker=1,\n",
+    "                      memory_limit='auto',\n",
+    "                      dashboard_address=':8787') as cluster, Client(cluster) as client:\n",
+    "\n",
+    "        print(f'dask dashboard at : {cluster.dashboard_link}')\n",
+    "        ncores = client.ncores()\n",
+    "        num_dask_threads = np.sum([ncores[key] for key in ncores])\n",
+    "\n",
+    "        print('\\n***************************************')\n",
+    "        print('Selected dataset ', ShortName)\n",
+    "        print('***************************************')\n",
+    "\n",
+    "        # get url list\n",
+    "        s3_urls, full_urls = get_granule_urls(ShortName)\n",
+    "        print(f'\\n{ShortName} has {len(s3_urls)} files')\n",
+    "\n",
+    "        # make output dir\n",
+    "        output_dir = output_base_dir / ShortName\n",
+    "        make_output_dir(output_dir)\n",
+    "\n",
+    "        # process nc files at urls to single kerchunk json files\n",
+    "        if debug:\n",
+    "            urls_to_process = full_urls[:128]\n",
+    "        else:\n",
+    "            urls_to_process = full_urls\n",
+    "\n",
+    "        n_urls = len(urls_to_process)\n",
+    "        print(f'\\nProcessing {n_urls} files!')\n",
+    "\n",
+    "        # split into batches because of podaac credential expiration\n",
+    "        nmzz_jobs = int(n_urls/312) + 1\n",
+    "        print(f'** splitting url master list into {nmzz_jobs} jobs')\n",
+    "\n",
+    "        # divide into batches\n",
+    "        url_split = np.array_split(np.array(urls_to_process), nmzz_jobs)\n",
+    "        url_split = [list(x) for x in url_split]\n",
+    "\n",
+    "        # process in batches\n",
+    "        start_time = time.time()\n",
+    "        url_split_count = 1\n",
+    "\n",
+    "        print(\"** starting single kerchunk processing at \", datetime.now())\n",
+    "\n",
+    "        # ... loop through batches\n",
+    "        for url_subset in url_split:\n",
+    "            # update credentials if necessary\n",
+    "            update_credential(credentials)\n",
+    "\n",
+    "            print(f'... starting url_subset {url_split_count} of {nmzz_jobs}')\n",
+    "            stb = time.time()\n",
+    "\n",
+    "            # loop through urls in this batch, appending a dask delayed job to the \"results\" list\n",
+    "            results = []\n",
+    "            for url in url_subset:\n",
+    "                results.append(dask.delayed(gen_json)(url=url, output_dir=output_dir, credentials=credentials))\n",
+    "            # compute all of the delayed gen_json commands\n",
+    "            d = dask.compute(*results)\n",
+    "\n",
+    "            # ... stats\n",
+    "            etb = time.time(); ttb = etb-stb; tpgb = ttb/len(url_subset)\n",
+    "            print(f'... url_subset finished processing {len(url_subset)} in {ttb:.2f}s, time per granule {tpgb:.2f}s')\n",
+    "\n",
+    "            # increment counter\n",
+    "            url_split_count = url_split_count + 1\n",
+    "\n",
+    "        # status\n",
+    "        end_time = time.time(); tt = end_time-start_time; tpg = tt/n_urls\n",
+    "        print(f'** SINGLE KERCHUNK finished processing {n_urls} in {tt:.2f}s, time per granule {tpg:.2f}s')\n",
+    "\n",
+    "        client.close(timeout=360)\n",
+    "        cluster.close(timeout=360)\n",
+    "        client = None; cluster = None\n",
+    "\n",
+    "    return str(output_dir)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "id": "74a99523-a20c-4bdb-aefe-b4dc22a7f10d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Multi-Zarr to Zarr. Combines the many individual jsons (one for each file) into a single mega json.\n",
+    "# Very important to specify identical_dims so the routine doesn't add\n",
+    "# time dimensions to non-time coordinates and dimensions (like XC, YC).\n",
+    "\n",
+    "# this version just returns the translated jsons so I can\n",
+    "# use dask distributed to parallelize this step\n",
+    "def multi_multizarr(params, inline_threshold=10000):\n",
+    "    json_list = params[0]\n",
+    "    credentials = params[1]\n",
+    "    time_invariant_dims_and_coords = params[2]\n",
+    "\n",
+    "    mzz = MultiZarrToZarr(\n",
+    "        json_list,\n",
+    "        remote_protocol=\"s3\",\n",
+    "        remote_options={'anon': False,\n",
+    "                        'key': credentials['accessKeyId'],\n",
+    "                        'secret': credentials['secretAccessKey'],\n",
+    "                        'token': credentials['sessionToken']},\n",
+    "        concat_dims='time',\n",
+    "        inline_threshold=inline_threshold,\n",
+    "        identical_dims=time_invariant_dims_and_coords\n",
+    "    )\n",
+    "\n",
+    "    return mzz.translate()\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 31,
+   "id": "b52c6e5f-b531-46d8-afe8-69992dd118d7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def mzz_driver(ShortName,\n",
+    "               credentials,\n",
+    "               single_json_output_dir,\n",
+    "               mzz_output_dir,\n",
+    "               use_dask=False, debug=True):\n",
+    "\n",
+    "    ##### inputs:\n",
+    "    # ShortName: podaac shortname\n",
+    "    # credentials: dictionary of aws credentials\n",
+    "    # single_json_output_dir: directory with all the single kerchunk json files for this ShortName dataset\n",
+    "    # mzz_output_dir: directory where we will save the final single kerchunk json file for the dataset\n",
+    "    # use_dask: boolean\n",
+    "    # debug: boolean\n",
+    "    ####\n",
+    "\n",
+    "    ##### outputs:\n",
+    "    # final_json_fname: the full path to the new single kerchunk json file\n",
+    "\n",
+    "    ## MAKE LIST OF ALL SINGLE JSON KERCHUNK FILES\n",
+    "    print(single_json_output_dir, type(single_json_output_dir))\n",
+    "    json_list = np.sort(list(single_json_output_dir.glob('*json')))\n",
+    "    json_list = [str(p) for p in json_list]\n",
+    "\n",
+    "    ## MAKE OUTPUT DIRECTORY FOR THE NEW MZZ FILE\n",
+    "    make_output_dir(mzz_output_dir)\n",
+    "\n",
+    "    # MAKE THE NAME OF THE NEW MZZ FILE\n",
+    "    final_json_fname = f'{mzz_output_dir}/{ShortName}.json'\n",
+    "    print(f'\\n... final reference output json filename: {final_json_fname}')\n",
+    "\n",
+    "    # FIND THE DIMENSIONS AND COORDS OF THIS DATASET THAT DO NOT VARY IN TIME\n",
+    "    # ... these are passed to MultiZarrToZarr as identical_dims\n",
+    "    # ... load one example json file (the first one)\n",
+    "    print('\\nfinding time invariant dims:')\n",
+    "    example_dataset = load_dataset_from_json_s3(str(json_list[0]), credentials)\n",
+    "\n",
+    "    if 'time' not in example_dataset.dims:\n",
+    "        return\n",
+    "\n",
+    "    # ... then find all coords and dims without 'time' in the name\n",
+    "    time_invariant_dims_and_coords = find_time_invariant_dims_and_coords(example_dataset)\n",
+    "\n",
+    "    start_time = time.time()\n",
+    "    n_jsons = len(json_list)\n",
+    "    print(f'found {n_jsons} json files in {single_json_output_dir}')\n",
+    "\n",
+    "    print(\"** starting mzz processing at \", datetime.now())\n",
+    "    if use_dask:  # IF WE USE DASK\n",
+    "        # split up the MZZ into n parallel jobs,\n",
+    "        # inspired by this https://gist.github.com/peterm790/5f901453ed7ac75ac28ed21a7138dcf8\n",
+    "\n",
+    "        # spin up a cluster\n",
+    "        with LocalCluster(n_workers=8,\n",
+    "                          processes=True,\n",
+    "                          threads_per_worker=1,\n",
+    "                          memory_limit='auto',\n",
+    "                          dashboard_address=':8787') as cluster, Client(cluster) as client:\n",
+    "\n",
+    "            print(f'dask dashboard at : {cluster.dashboard_link}')\n",
+    "            ncores = client.ncores()\n",
+    "            num_dask_threads = np.sum([ncores[key] for key in ncores])\n",
+    "            #print(ncores, num_dask_threads)\n",
+    "\n",
+    "            # default: process all jsons, use all threads;\n",
+    "            # split into batches because of podaac credential expiration\n",
+    "\n",
+    "            jsons_to_process = json_list\n",
+    "            n_jsons = len(jsons_to_process)\n",
+    "\n",
+    "            nmzz_jobs = int(n_jsons/32) + 1\n",
+    "            print(f'** splitting json master list into {nmzz_jobs} jobs')\n",
+    "\n",
+    "            # split the long list of jsons into nmzz_jobs shorter lists\n",
+    "            json_split = np.array_split(np.array(jsons_to_process), nmzz_jobs)\n",
+    "            json_split = [list(x) for x in json_split]\n",
+    "\n",
+    "            print(f'** using dask we split jsons_to_process into {len(json_split)} batches')\n",
+    "\n",
+    "            credentials = update_credential(credentials)\n",
+    "            # using dask, we send each shorter list to multi_multizarr\n",
+    "            # for parallel processing\n",
+    "            intermediate_jsons = []\n",
+    "            for json_sublist in json_split:\n",
+    "                params = [json_sublist, credentials, time_invariant_dims_and_coords]\n",
+    "                x = dask.delayed(multi_multizarr)(params)\n",
+    "                intermediate_jsons.append(x)\n",
+    "\n",
+    "            print('... making second level dask delayed')\n",
+    "            # this is the trippy bit: we don't compute the intermediate jsons here;\n",
+    "            # we pass the *delayed* intermediate jsons to a second dask.delayed\n",
+    "            # multi_multizarr and compute the whole tree at once\n",
+    "            # ... a kind of recursive json merging (see the toy example below)\n",
+    "            params = [intermediate_jsons, credentials, time_invariant_dims_and_coords]\n",
+    "            d = dask.delayed(multi_multizarr)(params)\n",
+    "            print('... ordering dask compute')\n",
+    "            d = d.compute()\n",
+    "\n",
+    "\n",
+    "    else:  # IF WE DON'T USE DASK\n",
+    "        if debug:\n",
+    "            n_jsons = 6\n",
+    "\n",
+    "        jsons_to_process = json_list[:n_jsons]\n",
+    "        d = multi_multizarr([jsons_to_process, credentials, time_invariant_dims_and_coords])\n",
+    "\n",
+    "    ## FINISHED PROCESSING ... SAVE TO DISK\n",
+    "    print('... writing to disk ', final_json_fname)\n",
+    "    with open(final_json_fname, 'wb') as f:\n",
+    "        f.write(ujson.dumps(d).encode())\n",
+    "\n",
+    "    ## STATS\n",
+    "    end_time = time.time()\n",
+    "    tt = end_time - start_time\n",
+    "    tpg = tt/n_jsons\n",
+    "\n",
+    "    print(f'** MZZ finished processing {n_jsons} in {tt:.2f}s')\n",
+    "    print(f'** MZZ time per json file {tpg:.2f}s')\n",
+    "\n",
+    "    return final_json_fname\n"
+   ]
+  },
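+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-5555-4e5f-8a9b-cde012345609",
+   "metadata": {},
+   "source": [
+    "A toy version of the two-level merge inside `mzz_driver`: the leaf merges are `dask.delayed`, and the root merge consumes the *delayed* leaves, so the whole tree is built lazily and computed in one `.compute()` call. Here `merge_dicts` is a stand-in for `multi_multizarr`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-5555-4e5f-8a9b-cde012345610",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# toy version of the two-level delayed merge used in mzz_driver;\n",
+    "# merge_dicts stands in for multi_multizarr\n",
+    "def merge_dicts(parts):\n",
+    "    # combine a list of dicts into one dict\n",
+    "    out = {}\n",
+    "    for p in parts:\n",
+    "        out.update(p)\n",
+    "    return out\n",
+    "\n",
+    "# leaf tasks are delayed ...\n",
+    "leaves = [dask.delayed(merge_dicts)([{i: i**2}]) for i in range(8)]\n",
+    "# ... and the root task takes the delayed leaves as its inputs\n",
+    "root = dask.delayed(merge_dicts)(leaves)\n",
+    "print(root.compute())"
+   ]
+  },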
+  {
+   "cell_type": "code",
+   "execution_count": 32,
+   "id": "945ae237-453e-4e02-8f64-f79c9ae58f94",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# find the set of time invariant dimensions and coordinates for this dataset\n",
+    "# ... the only coords and dimensions that vary with time have the name time in them!\n",
+    "# ... this is needed for MZZ\n",
+    "# ... so we don't duplicate time-invariant dims and coords in the multi-zarr json\n",
+    "\n",
+    "def find_time_invariant_dims_and_coords(example_dataset):\n",
+    "    # ... use a set instead of a list to avoid duplicates\n",
+    "    time_invariant_dims_and_coords = set()\n",
+    "    for c in example_dataset.coords:\n",
+    "        if 'time' not in c:\n",
+    "            time_invariant_dims_and_coords.add(c)\n",
+    "    for d in example_dataset.dims:\n",
+    "        if 'time' not in d:\n",
+    "            time_invariant_dims_and_coords.add(d)\n",
+    "\n",
+    "    # ... convert set to tuple\n",
+    "    time_invariant_dims_and_coords = tuple(time_invariant_dims_and_coords)\n",
+    "    print(f'\\ntime_invariant_dims_and_coords: {time_invariant_dims_and_coords}')\n",
+    "\n",
+    "    return time_invariant_dims_and_coords\n"
+   ]
+  },
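+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-6666-4e5f-8a9b-cde012345611",
+   "metadata": {},
+   "source": [
+    "Toy check of `find_time_invariant_dims_and_coords` on a small synthetic dataset; only `latitude` should be reported as time invariant."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-6666-4e5f-8a9b-cde012345612",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# toy check of find_time_invariant_dims_and_coords on a synthetic dataset\n",
+    "toy = xr.Dataset(\n",
+    "    {'SSH': (('time', 'latitude'), np.zeros((2, 3)))},\n",
+    "    coords={'time': pd.date_range('1992-01-01', periods=2),\n",
+    "            'latitude': [-0.25, 0.25, 0.75]})\n",
+    "\n",
+    "# expect ('latitude',)\n",
+    "print(find_time_invariant_dims_and_coords(toy))"
+   ]
+  },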
+  {
+   "cell_type": "markdown",
+   "id": "2709802e-388d-4891-a527-07fa84a60db8",
+   "metadata": {},
+   "source": [
+    "# Run from here"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 33,
+   "id": "dab59e61-3ec9-4636-8877-98b5fa775ce9",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "/home/jpluser/.netrc <class 'str'>\n"
+     ]
+    }
+   ],
+   "source": [
+    "_netrc = join(expanduser('~'), \"_netrc\" if system()==\"Windows\" else \".netrc\")\n",
+    "print(_netrc, type(_netrc))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 34,
+   "id": "addd538d-9299-4ea5-92e3-541e2e96b52d",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "32\n"
+     ]
+    }
+   ],
+   "source": [
+    "import multiprocessing\n",
+    "ncpus = multiprocessing.cpu_count()\n",
+    "print(ncpus)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 25,
+   "id": "9496d31f-eed2-47b8-9159-c6e36fd75bf8",
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "=============== 05DEG_DAILY ===========\n",
+      "13\n",
+      "['ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_BOLUS_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_HEAT_FLUX_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_OBP_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_VEL_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_CONC_THICKNESS_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_VELOCITY_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_SSH_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_STRESS_05DEG_DAILY_V4R4',\n",
+      " 'ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4']\n",
+      "=============== LLC0090GRID_DAILY ===========\n",
+      "20\n",
+      "['ECCO_L4_ATM_STATE_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_BOLUS_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_DENS_STRAT_PRESS_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_FRESH_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_HEAT_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_MIXED_LAYER_DEPTH_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OBP_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_3D_MOMENTUM_TEND_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_3D_SALINITY_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_3D_TEMPERATURE_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_3D_VOLUME_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_BOLUS_STREAMFUNCTION_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_OCEAN_VEL_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_CONC_THICKNESS_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_HORIZ_VOLUME_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_SALT_PLUME_FLUX_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_VELOCITY_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_SSH_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_STRESS_LLC0090GRID_DAILY_V4R4',\n",
+      " 'ECCO_L4_TEMP_SALINITY_LLC0090GRID_DAILY_V4R4']\n",
+      "=============== GRID_SNAP ===========\n",
+      "5\n",
+      "['ECCO_L4_OBP_LLC0090GRID_SNAPSHOT_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_CONC_THICKNESS_LLC0090GRID_SNAPSHOT_V4R4',\n",
+      " 'ECCO_L4_SEA_ICE_VELOCITY_LLC0090GRID_SNAPSHOT_V4R4',\n",
+      " 'ECCO_L4_SSH_LLC0090GRID_SNAPSHOT_V4R4',\n",
+      " 'ECCO_L4_TEMP_SALINITY_LLC0090GRID_SNAPSHOT_V4R4']\n",
+      "=============== GEO ===========\n",
+      "2\n",
+      "['ECCO_L4_GEOMETRY_05DEG_V4R4', 'ECCO_L4_GEOMETRY_LLC0090GRID_V4R4']\n",
+      "=============== MIX_COEFFS ===========\n",
+      "2\n",
+      "['ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4',\n",
+      " 'ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4']\n"
+     ]
+    }
+   ],
+   "source": [
+    "## time varying\n",
+    "\n",
+    "#globs = []\n",
+    "#globs = ['GEOMETRY', 'MIX_COEFFS', '05DEG_MONTHLY', 'LLC0090GRID_MONTHLY']\n",
+    "\n",
+    "#globs = ['05DEG_MONTHLY', 'LLC0090GRID_MONTHLY']\n",
+    "globs = ['05DEG_DAILY', 'LLC0090GRID_DAILY', 'GRID_SNAP',\n",
+    "         'GEO', 'MIX_COEFFS']\n",
+    "\n",
+    "ds_names_all = get_dataset_names()\n",
+    "\n",
+    "ds_names_dict = dict()\n",
+    "for glob in globs:\n",
+    "    ds_names_dict[glob] = []\n",
+    "\n",
+    "for ds_name in ds_names_all:\n",
+    "    for glob in globs:\n",
+    "\n",
+    "        if glob in ds_name:\n",
+    "            ds_names_dict[glob].append(ds_name)\n",
+    "\n",
+    "for glob in globs:\n",
+    "    print(f'=============== {glob} ===========')\n",
+    "    print(len(ds_names_dict[glob]))\n",
+    "    pprint(ds_names_dict[glob])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 40,
+   "id": "01f0317c-1307-412a-834f-e16f045cc98d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cfg.set({'distributed.scheduler.worker-ttl': None})"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 41,
+   "id": "2e474151-893f-4e0e-a884-29ff62c0b295",
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n",
+      "**** Processing ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.179394s remaining\n",
+      "found ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_BOLUS_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.178401s remaining\n",
+      "found ECCO_L4_BOLUS_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.177916s remaining\n",
+      "found ECCO_L4_DENS_STRAT_PRESS_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.177501s remaining\n",
+      "found ECCO_L4_FRESH_FLUX_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_HEAT_FLUX_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.177089s remaining\n",
+      "found ECCO_L4_HEAT_FLUX_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.176707s remaining\n",
+      "found ECCO_L4_MIXED_LAYER_DEPTH_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OBP_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.176299s remaining\n",
+      "found ECCO_L4_OBP_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_VEL_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.175888s remaining\n",
+      "found ECCO_L4_OCEAN_VEL_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_CONC_THICKNESS_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.175509s remaining\n",
+      "found ECCO_L4_SEA_ICE_CONC_THICKNESS_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_VELOCITY_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.175106s remaining\n",
+      "found ECCO_L4_SEA_ICE_VELOCITY_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SSH_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.174698s remaining\n",
+      "found ECCO_L4_SSH_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_STRESS_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.17429s remaining\n",
+      "found ECCO_L4_STRESS_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4\n",
+      "... not updating credentials, 3599.173887s remaining\n",
+      "found ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_ATM_STATE_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.173507s remaining\n",
+      "found ECCO_L4_ATM_STATE_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_BOLUS_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.172698s remaining\n",
+      "found ECCO_L4_BOLUS_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_DENS_STRAT_PRESS_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.172264s remaining\n",
+      "found ECCO_L4_DENS_STRAT_PRESS_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_FRESH_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.171837s remaining\n",
+      "found ECCO_L4_FRESH_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_HEAT_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.171381s remaining\n",
+      "found ECCO_L4_HEAT_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_MIXED_LAYER_DEPTH_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.170907s remaining\n",
+      "found ECCO_L4_MIXED_LAYER_DEPTH_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OBP_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.170504s remaining\n",
+      "found ECCO_L4_OBP_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_3D_MOMENTUM_TEND_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.17009s remaining\n",
+      "found ECCO_L4_OCEAN_3D_MOMENTUM_TEND_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_3D_SALINITY_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.169717s remaining\n",
+      "found ECCO_L4_OCEAN_3D_SALINITY_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_3D_TEMPERATURE_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.169322s remaining\n",
+      "found ECCO_L4_OCEAN_3D_TEMPERATURE_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_3D_VOLUME_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.168931s remaining\n",
+      "found ECCO_L4_OCEAN_3D_VOLUME_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_BOLUS_STREAMFUNCTION_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.168542s remaining\n",
+      "found ECCO_L4_OCEAN_BOLUS_STREAMFUNCTION_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_VEL_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.16817s remaining\n",
+      "found ECCO_L4_OCEAN_VEL_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_CONC_THICKNESS_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.1678s remaining\n",
+      "found ECCO_L4_SEA_ICE_CONC_THICKNESS_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_HORIZ_VOLUME_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.16746s remaining\n",
+      "found ECCO_L4_SEA_ICE_HORIZ_VOLUME_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_SALT_PLUME_FLUX_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.167068s remaining\n",
+      "found ECCO_L4_SEA_ICE_SALT_PLUME_FLUX_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_VELOCITY_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.166671s remaining\n",
+      "found ECCO_L4_SEA_ICE_VELOCITY_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SSH_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.166278s remaining\n",
+      "found ECCO_L4_SSH_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_STRESS_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.165899s remaining\n",
+      "found ECCO_L4_STRESS_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_TEMP_SALINITY_LLC0090GRID_DAILY_V4R4\n",
+      "... not updating credentials, 3599.165471s remaining\n",
+      "found ECCO_L4_TEMP_SALINITY_LLC0090GRID_DAILY_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_OBP_LLC0090GRID_SNAPSHOT_V4R4\n",
+      "... not updating credentials, 3599.165007s remaining\n",
+      "found ECCO_L4_OBP_LLC0090GRID_SNAPSHOT_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_CONC_THICKNESS_LLC0090GRID_SNAPSHOT_V4R4\n",
+      "... not updating credentials, 3599.164279s remaining\n",
+      "found ECCO_L4_SEA_ICE_CONC_THICKNESS_LLC0090GRID_SNAPSHOT_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SEA_ICE_VELOCITY_LLC0090GRID_SNAPSHOT_V4R4\n",
+      "... not updating credentials, 3599.163874s remaining\n",
+      "found ECCO_L4_SEA_ICE_VELOCITY_LLC0090GRID_SNAPSHOT_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_SSH_LLC0090GRID_SNAPSHOT_V4R4\n",
+      "... not updating credentials, 3599.163517s remaining\n",
+      "found ECCO_L4_SSH_LLC0090GRID_SNAPSHOT_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_TEMP_SALINITY_LLC0090GRID_SNAPSHOT_V4R4\n",
+      "... not updating credentials, 3599.163138s remaining\n",
+      "found ECCO_L4_TEMP_SALINITY_LLC0090GRID_SNAPSHOT_V4R4.json!, skipping\n",
+      "\n",
+      "**** Processing ECCO_L4_GEOMETRY_05DEG_V4R4\n",
+      "... not updating credentials, 3599.162694s remaining\n",
+      "dask dashboard at : http://127.0.0.1:8787/status\n",
+      "\n",
+      "***************************************\n",
+      "Selected dataset ECCO_L4_GEOMETRY_05DEG_V4R4\n",
+      "***************************************\n",
+      "\n",
+      "ECCO_L4_GEOMETRY_05DEG_V4R4 has 1 files\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_05DEG_V4R4\n",
+      "\n",
+      "Processing 1 files!\n",
+      "** splitting url master list into 1 jobs\n",
+      "** starting single kerchunk processing at 2024-06-13 04:41:16.846780\n",
+      "... not updating credentials, 3597.153191s remaining\n",
+      "... starting url_subset 1 of 1\n",
+      "... url_subset finished processing 1 in 0.60s, time per granule 0.60s\n",
+      "** SINGLE KERCHUNK finished processing 1 in 0.60s, time per granule 0.60s\n",
+      "... updating credentials, 3596.120413s remaining\n",
+      "... after credential update, 3601.120413s remaining\n",
+      "mzz /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_05DEG_V4R4\n",
+      "/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_05DEG_V4R4 <class 'pathlib.PosixPath'>\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO\n",
+      "\n",
+      "... final reference output json filename: /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_05DEG_V4R4.json\n",
+      "\n",
+      "finding time invariant dims:\n",
+      "** finished processing ECCO_L4_GEOMETRY_05DEG_V4R4\n",
+      "\n",
+      "\n",
+      "\n",
+      "**** Processing ECCO_L4_GEOMETRY_LLC0090GRID_V4R4\n",
+      "... not updating credentials, 3598.915288s remaining\n",
+      "dask dashboard at : http://127.0.0.1:8787/status\n",
+      "\n",
+      "***************************************\n",
+      "Selected dataset ECCO_L4_GEOMETRY_LLC0090GRID_V4R4\n",
+      "***************************************\n",
+      "\n",
+      "ECCO_L4_GEOMETRY_LLC0090GRID_V4R4 has 1 files\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_LLC0090GRID_V4R4\n",
+      "\n",
+      "Processing 1 files!\n",
+      "** splitting url master list into 1 jobs\n",
+      "** starting single kerchunk processing at 2024-06-13 04:41:22.105731\n",
+      "... not updating credentials, 3596.89424s remaining\n",
+      "... starting url_subset 1 of 1\n",
+      "... url_subset finished processing 1 in 1.58s, time per granule 1.58s\n",
+      "** SINGLE KERCHUNK finished processing 1 in 1.58s, time per granule 1.58s\n",
+      "... updating credentials, 3594.908645s remaining\n",
+      "... after credential update, 3600.908645s remaining\n",
+      "mzz /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_LLC0090GRID_V4R4\n",
+      "/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_LLC0090GRID_V4R4 <class 'pathlib.PosixPath'>\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO\n",
+      "\n",
+      "... final reference output json filename: /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_LLC0090GRID_V4R4.json\n",
+      "\n",
+      "finding time invariant dims:\n",
+      "** finished processing ECCO_L4_GEOMETRY_LLC0090GRID_V4R4\n",
+      "\n",
+      "\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4\n",
+      "... not updating credentials, 3598.858753s remaining\n",
+      "dask dashboard at : http://127.0.0.1:8787/status\n",
+      "\n",
+      "***************************************\n",
+      "Selected dataset ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4\n",
+      "***************************************\n",
+      "\n",
+      "ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4 has 1 files\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4\n",
+      "\n",
+      "Processing 1 files!\n",
+      "** splitting url master list into 1 jobs\n",
+      "** starting single kerchunk processing at 2024-06-13 04:41:28.497030\n",
+      "... not updating credentials, 3596.502941s remaining\n",
+      "... starting url_subset 1 of 1\n",
+      "... url_subset finished processing 1 in 1.71s, time per granule 1.71s\n",
+      "** SINGLE KERCHUNK finished processing 1 in 1.71s, time per granule 1.71s\n",
+      "... updating credentials, 3594.343234s remaining\n",
+      "... after credential update, 3601.343234s remaining\n",
+      "mzz /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4\n",
+      "/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4 <class 'pathlib.PosixPath'>\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS\n",
+      "\n",
+      "... final reference output json filename: /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4.json\n",
+      "\n",
+      "finding time invariant dims:\n",
+      "** finished processing ECCO_L4_OCEAN_3D_MIX_COEFFS_05DEG_V4R4\n",
+      "\n",
+      "\n",
+      "\n",
+      "**** Processing ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4\n",
+      "... not updating credentials, 3599.509189s remaining\n",
+      "dask dashboard at : http://127.0.0.1:8787/status\n",
+      "\n",
+      "***************************************\n",
+      "Selected dataset ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4\n",
+      "***************************************\n",
+      "\n",
+      "ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4 has 1 files\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4\n",
+      "\n",
+      "Processing 1 files!\n",
+      "** splitting url master list into 1 jobs\n",
+      "** starting single kerchunk processing at 2024-06-13 04:41:34.404592\n",
+      "... not updating credentials, 3597.595379s remaining\n",
+      "... starting url_subset 1 of 1\n",
+      "... url_subset finished processing 1 in 1.96s, time per granule 1.96s\n",
+      "** SINGLE KERCHUNK finished processing 1 in 1.96s, time per granule 1.96s\n",
+      "... updating credentials, 3595.227853s remaining\n",
+      "... after credential update, 3601.227853s remaining\n",
+      "mzz /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4\n",
+      "/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4 <class 'pathlib.PosixPath'>\n",
+      "making /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS\n",
+      "\n",
+      "... final reference output json filename: /home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_MIX_COEFFS/ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4.json\n",
+      "\n",
+      "finding time invariant dims:\n",
+      "** finished processing ECCO_L4_OCEAN_3D_MIX_COEFFS_LLC0090GRID_V4R4\n",
+      "\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "# loop through each dataset type\n",
+    "debug = False\n",
+    "use_dask = True\n",
+    "\n",
+    "s3, credentials = init_S3FileSystem()\n",
+    "for glob in globs:\n",
+    "    if debug:\n",
+    "        output_base_dir = Path(f'/tmp/ecco_v4r4/MZZ_{glob}/')\n",
+    "    else:\n",
+    "        output_base_dir = Path(f'/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_{glob}/')\n",
+    "\n",
+    "    ds_names_to_process = ds_names_dict[glob]\n",
+    "\n",
+    "    # loop through each dataset of this type and convert\n",
+    "    for ShortName in ds_names_to_process:\n",
+    "        print('\\n**** Processing ', ShortName)\n",
+    "\n",
+    "        credentials = update_credential(credentials)\n",
+    "\n",
+    "        test_path = output_base_dir / f'{ShortName}.json'\n",
+    "        if test_path.exists():\n",
+    "            print(f'found {ShortName}.json!, skipping')\n",
+    "            continue\n",
+    "\n",
+    "        if use_dask:\n",
+    "            try:\n",
+    "                output_dir = single_kerchunk_driver_dask(ShortName,\n",
+    "                                                         output_base_dir=output_base_dir,\n",
+    "                                                         debug=debug)\n",
+    "            except Exception as ex:\n",
+    "                print('single_kerchunk_driver_dask failed with ', ex)\n",
+    "\n",
+    "        else:\n",
+    "            output_dir = single_kerchunk_driver_pool(ShortName,\n",
+    "                                                     output_base_dir=output_base_dir,\n",
+    "                                                     debug=debug)\n",
+    "\n",
+    "\n",
+    "        # update credentials before mzz\n",
+    "        credentials = update_credential(credentials, force=True)\n",
+    "        print('mzz ', output_dir)\n",
+    "        final_mzz_fname = mzz_driver(ShortName,\n",
+    "                                     credentials=credentials,\n",
+    "                                     single_json_output_dir=Path(output_dir),\n",
+    "                                     mzz_output_dir=output_base_dir,\n",
+    "                                     use_dask=use_dask, debug=debug)\n",
+    "\n",
+    "        print(f'** finished processing {ShortName}\\n\\n')"
+   ]
+  },
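+  {
+   "cell_type": "markdown",
+   "id": "0a1b2c3d-7777-4e5f-8a9b-cde012345613",
+   "metadata": {},
+   "source": [
+    "Sanity check after the run: lazily open one of the finished dataset-level reference jsons (the path assumes the `MZZ_GEO` output directory used above)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0a1b2c3d-7777-4e5f-8a9b-cde012345614",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# sanity check: lazily open one finished dataset-level reference json\n",
+    "# (path assumes the MZZ_GEO output directory used above)\n",
+    "credentials = update_credential(credentials)\n",
+    "mzz_json = '/home/jpluser/efs-mount-point/ifenty/ecco_v4r4/MZZ_GEO/ECCO_L4_GEOMETRY_05DEG_V4R4.json'\n",
+    "ds = load_dataset_from_json_s3(mzz_json, credentials, time_chunk=0)\n",
+    "print(ds)"
+   ]
+  },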
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3c126d93-a343-4bc4-8b11-0e2565502108",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.8"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}