-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathversion.py
44 lines (37 loc) · 1.46 KB
/
version.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from google.cloud import storage
from google.oauth2 import service_account
from .credentials import GOOGLE_APPLICATION_CREDENTIALS
class WastewaterVersion:
    """Build a release string for the aggregated wastewater files in GCS.

    The release string is assembled from the ``updated`` value of each
    aggregate blob, so it changes whenever any of those blobs is rewritten.
    """

    # Bucket holding the aggregate files.
    BUCKET_NAME = "outbreak-ww-data"

    # (label used in the release string, blob path inside the bucket), listed
    # in the order the parts appear in the release string.
    VERSIONED_BLOBS = (
        ("demix", "aggregate/aggregate_demix.json"),
        ("demix-weekly", "aggregate/aggregate_demix_weekly.json"),
        ("metadata", "aggregate/aggregate_metadata.json"),
        ("variants", "aggregate/aggregate_variants.json"),
    )

    # Single-pass equivalent of
    # .replace(",", "").replace(":", "").replace(" ", "-").
    _SANITIZE_TABLE = str.maketrans(" ", "-", ",:")

    @staticmethod
    def _blob_updated(bucket, blob_name):
        """Return the ``updated`` attribute of ``blob_name`` in ``bucket``.

        Args:
            bucket: Bucket handle obtained from ``storage.Client.bucket``.
            blob_name: Path of the blob inside the bucket.

        Raises:
            FileNotFoundError: If ``bucket.get_blob`` returns ``None``, which
                the google-cloud-storage client does for a missing blob.
        """
        blob = bucket.get_blob(blob_name)
        if blob is None:
            # Fail with the offending path instead of an opaque
            # "'NoneType' object has no attribute 'updated'".
            raise FileNotFoundError(
                "Blob not found: gs://{}/{}".format(bucket.name, blob_name)
            )
        return blob.updated

    def get_release(self) -> str:
        """Return the release string for the current aggregate files.

        The result has the form
        ``demix=<ts>_demix-weekly=<ts>_metadata=<ts>_variants=<ts>``, where
        each ``<ts>`` is ``str(blob.updated)`` with commas and colons removed
        and spaces replaced by ``-``.

        Raises:
            FileNotFoundError: If any of the expected blobs is missing.
        """
        credentials = service_account.Credentials.from_service_account_info(
            GOOGLE_APPLICATION_CREDENTIALS
        )
        # One client and one bucket handle serve every lookup; there is no
        # need to rebuild them for each blob.
        client = storage.Client(credentials=credentials)
        bucket = client.bucket(self.BUCKET_NAME)

        parts = [
            "{}={}".format(label, self._blob_updated(bucket, blob_name))
            for label, blob_name in self.VERSIONED_BLOBS
        ]
        return "_".join(parts).translate(self._SANITIZE_TABLE)