Skip to content

Commit

Permalink
Merge pull request #94 from airalab/dev
Browse files Browse the repository at this point in the history
refactoring
  • Loading branch information
tubleronchik authored Oct 25, 2024
2 parents a5c24de + a099703 commit 4451989
Show file tree
Hide file tree
Showing 28 changed files with 442 additions and 240 deletions.
4 changes: 2 additions & 2 deletions connectivity/src/drivers/sds011.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import serial

from ...constants import MOBILE_GPS
from ..sensors import SensorSDS011
from connectivity.constants import MOBILE_GPS
from connectivity.src.sensors import SensorSDS011


def sds011_codec(data: bytes, pk: str) -> dict:
Expand Down
196 changes: 33 additions & 163 deletions connectivity/src/feeders/datalog_feeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@
Every `dump_interval` (from the config) the buffer writes to the file which pins to IPFS.
IPFS hash of the file sends to Robonomics Datalog.
"""
from crustinterface import Mainnet
import json
import logging.config
import os
import threading
import time
import typing as tp
from tempfile import NamedTemporaryFile

import ipfshttpclient2
import requests
from pinatapy import PinataPy
from prometheus_client import Enum
from robonomicsinterface import RWS, Account, Datalog

from connectivity.config.logging import LOGGING_CONFIG
from connectivity.utils.datalog_db import DatalogDB
from connectivity.utils.ipfs_db import IPFSDB
from ...constants import MOBILE_GPS
from connectivity.utils.datalog_payload import (
create_payload,
sort_payload,
create_tmp_file,
)
from connectivity.constants import PING_MODEL

from ...constants import PING_MODEL
from .ifeeder import IFeeder
from .pinning_services import PinningManager

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("sensors-connectivity")
Expand All @@ -45,123 +45,6 @@
)


def _sort_payload(data: dict) -> dict:
"""Sort measurements dict with timestamp.
:param data: Measurements dict.
:return: Sorted measurements dict.
"""

ordered = {}
for k, v in data.items():
meas = sorted(v["measurements"], key=lambda x: x["timestamp"])
if v.get("geo"):
ordered[k] = {"model": v["model"], "geo": v["geo"], "donated_by": v["donated_by"], "measurements": meas}
else:
ordered[k] = {"model": v["model"], "donated_by": v["donated_by"], "measurements": meas}
return ordered


def _get_multihash(buf: set, db: object, endpoint: str = "/ip4/127.0.0.1/tcp/5001/http") -> tuple:
    """Write sorted measurements to a temp file, add the file to IPFS and record
    the measurements and hash in the database with 'not sent' status.

    :param buf: Set of measurement objects from all sensors.
    :param db: Database object (must provide ``add_data``).
    :param endpoint: Multiaddr of the IPFS node API. Default is the local node.
    :return: Tuple of (IPFS hash, temp file path, file size).
    :raises Exception: Re-raises any failure while creating the temp file.
    """

    payload = {}
    for m in buf:
        try:
            if m.public in payload:
                payload[m.public]["measurements"].append(m.measurement)
            else:
                if m.model == MOBILE_GPS:
                    # Mobile sensors have no fixed location, so no "geo" field
                    # is stored for them.
                    payload[m.public] = {
                        "model": m.model,
                        "donated_by": m.donated_by,
                        "measurements": [m.measurement],
                    }
                else:
                    payload[m.public] = {
                        "model": m.model,
                        "geo": "{},{}".format(m.geo_lat, m.geo_lon),
                        "donated_by": m.donated_by,
                        "measurements": [m.measurement],
                    }
        except Exception as e:
            logger.warning(f"Datalog Feeder: Couldn't store data: {e}")

    logger.debug(f"Payload before sorting: {payload}")
    payload = _sort_payload(payload)
    logger.debug(f"Payload sorted: {payload}")
    try:
        temp = NamedTemporaryFile(mode="w", delete=False)
        logger.debug(f"Created temp file: {temp.name}")
        temp.write(json.dumps(payload))
        temp.close()
        DATALOG_MEMORY_METRIC.state("success")
    except Exception as e:
        DATALOG_MEMORY_METRIC.state("error")
        # Bug fix: the original swallowed this exception and then crashed below
        # with an unbound ``temp``; surface the failure to the caller instead.
        logger.warning(f"Datalog Feeder: Couldn't create temp file: {e}")
        raise

    with ipfshttpclient2.connect(endpoint) as client:
        response = client.add(temp.name)
        db.add_data("not sent", response["Hash"], time.time(), json.dumps(payload))
        return (response["Hash"], temp.name, response["Size"])


def _pin_to_pinata(file_path: str, config: dict) -> None:
    """Pin file to Pinata for better accessibility.
    Requires Pinata credentials in the config file; silently does nothing
    when they are absent.

    :param file_path: Path to the temp file.
    :param config: Configuration dictionary.
    """

    pinata_api = config["datalog"]["pinata_api"]
    pinata_secret = config["datalog"]["pinata_secret"]
    # Bug fix: both credentials are required by PinataPy; the original checked
    # only the secret and would raise on a missing/empty API key.
    if pinata_api and pinata_secret:
        try:
            logger.info("DatalogFeeder: Pinning file to Pinata")
            pinata = PinataPy(pinata_api, pinata_secret)
            pinata.pin_file_to_ipfs(path_to_file=file_path, save_absolute_paths=False)
            # pin_list() rows are assumed newest-first, so row 0 is the file
            # just pinned. NOTE(review): concurrent pins could race here.
            file_hash = pinata.pin_list()["rows"][0]["ipfs_pin_hash"]
            logger.info(f"DatalogFeeder: File sent to pinata. Hash is {file_hash}")
        except Exception as e:
            logger.warning(f"DatalogFeeder: Failed while pining file to Pinata. Error: {e}")


def _upload_to_crust(hash: str, file_size: int, seed: str) -> None:
    """Order storage of an IPFS file in the Crust Network.

    Checks that the account balance covers the approximate store price
    before submitting the storage extrinsic; logs and returns on any failure.

    :param hash: IPFS hash of the file to store.
    :param file_size: Size of the file in bytes.
    :param seed: Seed phrase of the Crust account paying for storage.
    """

    mainnet = Mainnet(seed=seed)

    try:
        balance = mainnet.get_balance()
        logger.debug(f"DatalogFeeder: Actual balance in crust network - {balance}")
        # Price is expressed in pCRUs.
        price = mainnet.get_appx_store_price(file_size)
        logger.debug(f"DatalogFeeder: Approximate cost to store the file - {price}")
    except Exception as e:
        logger.warning(f"DatalogFeeder: Error while getting account balance - {e}")
        return None

    if price >= balance:
        logger.warning(f"DatalogFeeder: Not enough account balance to store the file in Crust Network")
        return None

    try:
        logger.info(f"DatalogFeeder: Start adding {hash} to crust with size {file_size}")
        receipt = mainnet.store_file(hash, file_size)
        logger.info(f"DatalogFeeder: File stored in Crust. Extrinsic data is {receipt}")
    except Exception as e:
        logger.warning(f"error while uploading file to crust - {e}")
        return None


class DatalogFeeder(IFeeder):
"""
The feeder is responsible for collecting measurements and
Expand All @@ -180,15 +63,13 @@ def __init__(self, config) -> None:
self.last_time: float = time.time()
self.buffer: set = set()
self.interval: int = self.config["datalog"]["dump_interval"]
self.ipfs_endpoint: str = (
config["robonomics"]["ipfs_provider"]
if config["robonomics"]["ipfs_provider"]
else "/ip4/127.0.0.1/tcp/5001/http"
self.datalog_db: DatalogDB = DatalogDB(
self.config["general"]["datalog_db_path"]
)
self.datalog_db: DatalogDB = DatalogDB(self.config["general"]["datalog_db_path"])
self.ipfs_db: IPFSDB = IPFSDB(self.config["general"]["ipfs_db_path"])
self.datalog_db.create_table()
self.ipfs_db.create_table()
self.pinning_manager = PinningManager(self.config)

def feed(self, data: tp.List[dict]) -> None:
"""Main function of the feeder and it is called in `main.py`. It collects
Expand All @@ -199,54 +80,41 @@ def feed(self, data: tp.List[dict]) -> None:
"""
if self.config["datalog"]["enable"]:
if data:
for d in data:
for d in data:
if d.public and d.model != PING_MODEL:
logger.debug(f"DatalogFeeder: Adding data to buffer: {d}")
self.buffer.add(d)

if (time.time() - self.last_time) >= self.interval:
if self.buffer:
self.last_time = time.time()
logger.debug("Datalog Feeder: About to publish collected data...")
logger.debug(
"Datalog Feeder: About to publish collected data..."
)
logger.debug(f"Datalog Feeder: Buffer is {self.buffer}")
ipfs_hash, file_path, file_size = _get_multihash(self.buffer, self.datalog_db, self.ipfs_endpoint)
self.ipfs_db.add_hash(ipfs_hash)
self._pin_to_temporal(file_path)
_pin_to_pinata(file_path, self.config)
payload = create_payload(self.buffer)
sorted_payload = sort_payload(payload)
try:
tmp_file_path = create_tmp_file(sorted_payload)
DATALOG_MEMORY_METRIC.state("success")
except Exception as e:
DATALOG_MEMORY_METRIC.state("error")
logger.warning(
f"Datalog Feeder: couldn't create tmp file: {e}"
)

ipfs_hash = self.pinning_manager.pin_to_gateways(tmp_file_path)
self.datalog_db.add_data(
"not sent", ipfs_hash, time.time(), json.dumps(payload)
)
self.buffer = set()
_upload_to_crust(ipfs_hash, int(file_size), self.config["datalog"]["suri"])
os.unlink(file_path)
os.unlink(tmp_file_path)
self.to_datalog(ipfs_hash)
else:
logger.info("Datalog Feeder:Nothing to publish")
else:
logger.info("Datalog Feeder: Still collecting measurements...")

def _pin_to_temporal(self, file_path: str) -> None:
    """Pin file to Temporal Cloud for better accessibility.
    Need to provide corresponding credentials in the config file; does
    nothing when they are absent.

    :param file_path: Path to the temp file.
    """

    username = self.config["datalog"]["temporal_username"]
    password = self.config["datalog"]["temporal_password"]
    if username and password:
        auth_url = "https://api.temporal.cloud/v2/auth/login"
        token_resp = requests.post(auth_url, json={"username": username, "password": password})
        token = token_resp.json()

        url_add = "https://api.temporal.cloud/v2/ipfs/public/file/add"
        headers = {"Authorization": f"Bearer {token['token']}"}
        # Bug fix: the original leaked the file handle from open(); close it
        # deterministically once the upload request completes.
        with open(file_path) as file_obj:
            resp = requests.post(
                url_add,
                files={"file": file_obj, "hold_time": (None, 1)},
                headers=headers,
            )

        if resp.status_code == 200:
            logger.info("Datalog Feeder: File pinned to Temporal Cloud")

def to_datalog(self, ipfs_hash: str) -> None:
"""Send IPFS hash to Robonomics Datalog. It uses seed pharse from the config file.
It can be sent either with RWS or general Datalog. To use RWS the account of the provided seed
Expand Down Expand Up @@ -275,5 +143,7 @@ def to_datalog(self, ipfs_hash: str) -> None:
DATALOG_STATUS_METRIC.state("success")
self.datalog_db.update_status("sent", ipfs_hash)
except Exception as e:
logger.warning(f"Datalog Feeder: Something went wrong during extrinsic submission to Robonomics: {e}")
logger.warning(
f"Datalog Feeder: Something went wrong during extrinsic submission to Robonomics: {e}"
)
DATALOG_STATUS_METRIC.state("error")
1 change: 1 addition & 0 deletions connectivity/src/feeders/pinning_services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .pinning_manager import PinningManager
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .crust import CrustGateway
from .local import LocalGateway
from .pinata import PinataGateway
from .temporal import TemporalGateway
from .pinning_gateway import PinArgs
61 changes: 61 additions & 0 deletions connectivity/src/feeders/pinning_services/gateways/crust.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import typing as tp
import logging.config
from crustinterface import Mainnet

from connectivity.config.logging import LOGGING_CONFIG
from .pinning_gateway import (
PinningGateway,
PinArgs,
)

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("sensors-connectivity")


class CrustGateway(PinningGateway):
    """Pinning gateway that orders persistent storage of IPFS files in the
    Crust Network."""

    def __init__(self, seed: str) -> None:
        # Crust Mainnet account used to pay for storage orders.
        self.mainnet = Mainnet(seed=seed)

    def pin(self, args: PinArgs) -> None:
        """Store an already-added IPFS file in Crust if the balance allows it.

        :param args: PinArgs carrying ``hash`` (IPFS hash) and ``file_size`` (bytes).
        """
        file_hash: str = args.hash
        file_size: int = args.file_size
        if self._can_upload(file_size):
            try:
                logger.info(
                    f"CrustGateway: Start adding {file_hash} to crust with size :{file_size}"
                )
                file_stored = self.mainnet.store_file(file_hash, file_size)
                logger.info(
                    f"CrustGateway: File stored in Crust. Extrinsic data is: {file_stored}"
                )
            except Exception as e:
                logger.warning(
                    f"CrustGateway: error while uploading file to crust: {e}"
                )
                return None
        else:
            logger.warning(
                f"CrustGateway: Not enough account balance to store the file in Crust Network"
            )

    def _can_upload(self, file_size: int) -> bool:
        """Check whether there is enough tokens on balance."""
        balance = self._get_balance()
        approximately_price = self._get_store_price(file_size)
        # Bug fix: either lookup may fail and return None; the original then
        # raised TypeError on the comparison. Refuse to upload instead.
        if balance is None or approximately_price is None:
            return False
        return balance >= approximately_price

    def _get_balance(self) -> tp.Optional[int]:
        """Return the account balance in the Crust network, or None on error."""
        try:
            balance = self.mainnet.get_balance()
            logger.debug(f"CrustGateway: Actual balance in crust network: {balance}")
            return balance
        except Exception as e:
            logger.warning(f"CrustGateway: Error while getting account balance: {e}")
            return None

    def _get_store_price(self, file_size: int) -> tp.Optional[int]:
        """Check price in Main net. Price in pCRUs. Returns None on error.

        Bug fix: the original was annotated Optional but never caught
        failures, so a network error propagated out of _can_upload.
        """
        try:
            price = self.mainnet.get_appx_store_price(int(file_size))
            logger.debug(f"CrustGateway: Approximate cost to store the file: {price}")
            return price
        except Exception as e:
            logger.warning(f"CrustGateway: Error while getting store price: {e}")
            return None
31 changes: 31 additions & 0 deletions connectivity/src/feeders/pinning_services/gateways/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import typing as tp
import logging.config
import ipfshttpclient2

from connectivity.config.logging import LOGGING_CONFIG
from .pinning_gateway import (
PinningGateway,
PinArgs,
)

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("sensors-connectivity")


class LocalGateway(PinningGateway):
    """Pinning gateway backed by an IPFS node reachable over HTTP API."""

    def __init__(self, endpoint: str) -> None:
        # Multiaddr of the IPFS node API, e.g. "/ip4/127.0.0.1/tcp/5001/http".
        self.ipfs_endpoint = endpoint

    def pin(self, args: PinArgs) -> tp.Optional[tp.Tuple[str, int]]:
        """Add a file to the IPFS node.

        :param args: PinArgs with ``file_path`` pointing at the file to add.
        :return: Tuple of (IPFS hash, file size), or None on failure.
        """
        file_path: str = args.file_path
        try:
            with ipfshttpclient2.connect(self.ipfs_endpoint) as client:
                response = client.add(file_path)
                file_hash = response["Hash"]
                file_size = response["Size"]
                logger.debug(f"LocalGateway: Hash, size: {file_hash}, {file_size}")
                return (file_hash, file_size)
        except Exception as e:
            # Bug fix: corrected the garbled "cou;dn't" in the log message and
            # made the None failure return explicit.
            logger.warning(
                f"LocalGateway: couldn't add file or connect to local gateway: {e}"
            )
            return None
Loading

0 comments on commit 4451989

Please sign in to comment.