-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsharppyify.py
103 lines (93 loc) · 4.92 KB
/
sharppyify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
# Converts csv files generated by NOAA's SkySonde to SPC/SHARPpy compatible format
# Created 12 July 2022 by Sam Gardner <[email protected]>
import pandas as pd
import numpy as np
from os import path, remove
import sys
from datetime import datetime as dt, timedelta
from metpy import calc as mpcalc
from metpy.units import units
# python3 sharppyify.py <input> <ICAO> <output>
def generateSounding(data, type):
validTime = data.index[0]
if type == "down":
data = data[::-1]
sharppyHeader = "%TITLE%\n "+sys.argv[2]+" "+validTime.strftime("%Y%m%d/%H%M")[2:]+"\n\n LEVEL HGHT TEMP DWPT WDIR WSPD\n-------------------------------------------------------------------\n%RAW%\n"
outputData = pd.DataFrame()
outputData["PRES"] = data["iMet pressure [mb]"]
outputData["HGHT"] = data["altitude (from iMet PTU) [km]"] * 1000
outputData["TEMP"] = data["iMet air temperature (corrected) [deg C]"]
outputData["DWPT"] = [mpcalc.dewpoint_from_relative_humidity((row["iMet air temperature (corrected) [deg C]"] * units.degC), (row["iMet humidity [RH %]"] / 100)).magnitude for _, row in data.iterrows()]
outputData["WDIR"] = data["GPS wind direction [deg]"]
outputData["WSPD"] = [(speed * units.meters / units.seconds).to("knots").magnitude for speed in data["GPS wind speed [m/s]"]]
outputData = outputData.dropna(axis=0, how="any")
sharppyFooter = "\n%END%"
csvStr = outputData.to_csv(index=False, columns=["PRES", "HGHT", "TEMP", "DWPT", "WDIR", "WSPD"], header=False)
toExport = sharppyHeader+csvStr+sharppyFooter
outputPath = sys.argv[3].replace(".txt", "")+"_"+type.upper()+".txt"
if path.exists(outputPath):
remove(outputPath)
outputFile = open(outputPath, "w")
outputFile.write(toExport)
outputFile.close()
if __name__ == "__main__":
fileHandle = open(sys.argv[1])
lastLine = ""
while "date" not in lastLine:
lastLine = fileHandle.readline()
csvData = pd.read_csv(fileHandle, header=None, names=lastLine.split(", "))
fileHandle.close()
csvData = csvData.replace(99999, np.nan)
launchTime = None
burstTime = None
landTime = None
pyDts = list()
for idx, row in csvData.iterrows():
pyDts.append(dt.strptime(row["date [y-m-d GMT]"]+row["time [h:m:s GMT]"], "%Y-%m-%d %H:%M:%S") + timedelta(milliseconds=int(row["milliseconds"])))
if idx < 2:
continue
if launchTime == None:
if csvData.iloc[idx-2]["iMet ascent rate [m/s]"] >= 0.7:
if csvData.iloc[idx-1]["iMet ascent rate [m/s]"] >= 0.7:
if csvData.iloc[idx]["iMet ascent rate [m/s]"] >= 0.7:
launchRow = csvData.iloc[idx-2]
launchTime = dt.strptime(launchRow["date [y-m-d GMT]"]+launchRow["time [h:m:s GMT]"], "%Y-%m-%d %H:%M:%S") + timedelta(milliseconds=int(launchRow["milliseconds"]))
print("Launch detected at time "+launchTime.strftime("%Y-%m-%d %H:%M:%S.%f"))
elif burstTime == None:
if csvData.iloc[idx-2]["iMet ascent rate [m/s]"] <= -0.7:
if csvData.iloc[idx-1]["iMet ascent rate [m/s]"] <= -0.7:
if csvData.iloc[idx]["iMet ascent rate [m/s]"] <= -0.7:
burstRow = csvData.iloc[idx-2]
burstTime = dt.strptime(burstRow["date [y-m-d GMT]"]+burstRow["time [h:m:s GMT]"], "%Y-%m-%d %H:%M:%S") + timedelta(milliseconds=int(burstRow["milliseconds"]))
print("Burst detected at time "+burstTime.strftime("%Y-%m-%d %H:%M:%S.%f"))
elif landTime == None:
if csvData.iloc[idx-2]["iMet ascent rate [m/s]"] >= -0.2:
if csvData.iloc[idx-1]["iMet ascent rate [m/s]"] >= -0.2:
if csvData.iloc[idx]["iMet ascent rate [m/s]"] >= -0.2:
landRow = csvData.iloc[idx-2]
landTime = dt.strptime(landRow["date [y-m-d GMT]"]+landRow["time [h:m:s GMT]"], "%Y-%m-%d %H:%M:%S") + timedelta(milliseconds=int(landRow["milliseconds"]))
print("Landing detected at time "+landTime.strftime("%Y-%m-%d %H:%M:%S.%f"))
csvData["pyDateTimes"] = pyDts
csvData = csvData.set_index(["pyDateTimes"], drop=True)
upSounding = None
downSounding = None
if launchTime == None:
print("No launch detected, I guess try launching another balloon :/")
else:
if burstTime == None:
upSounding = csvData[launchTime:]
else:
upSounding = csvData[launchTime:burstTime]
if landTime == None:
downSounding = csvData[burstTime:]
else:
downSounding = csvData[burstTime:landTime]
if type(upSounding) == type(None):
pass
else:
generateSounding(upSounding, "up")
if type(downSounding) == type(None):
pass
else:
generateSounding(downSounding, "down")