Skip to content

Commit

Permalink
using autlib for fetch token added lifetime of token check renew if i…
Browse files Browse the repository at this point in the history
…t is too old
  • Loading branch information
unl0ck committed Aug 6, 2024
1 parent 4265e1a commit 996e68a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 28 deletions.
9 changes: 7 additions & 2 deletions read_live_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from viessmann_gridbox_connector import GridboxConnector
from importlib.resources import files
import json

import time
loop = True
config_file = files('viessmann_gridbox_connector').joinpath('config.json')
with open(config_file, 'r') as file:
data = json.load(file)
Expand All @@ -10,4 +11,8 @@
connector = GridboxConnector(data)
# Retrieve live data
live_data = connector.retrieve_live_data()
print(live_data)
print(live_data)
while loop:
live_data = connector.retrieve_live_data()
print(live_data)
time.sleep(60)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
requests
authlib
64 changes: 39 additions & 25 deletions viessmann_gridbox_connector/GridboxConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@
import time
import logging
import os
from authlib.integrations.requests_client import OAuth2Session

class GridboxConnector:
id_token: str = ""
gateways: list[str] = []
token: dict = {}
client: OAuth2Session = None
config: dict = {}
username: str = ""
password: str = ""

def __init__(self, config):
self.init_logging()
self.config = config
self.login_url = config["urls"]["login"]
self.login_body = config["login"]
self.gateway_url = config["urls"]["gateways"]
self.live_url = config["urls"]["live"]
self.username = os.getenv('USERNAME', self.login_body["username"])
self.password = os.getenv('PASSWORD', self.login_body["password"])
self.init_auth()

def init_logging(self):
Expand All @@ -23,37 +32,42 @@ def init_logging(self):
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)


def get_new_token(self):
self.token = self.client.fetch_token(
self.login_url,
username=self.login_body["username"],
password=self.login_body["password"],
grant_type=self.login_body['grant_type'],
audience=self.login_body['audience'],
realm=self.login_body['realm'],
scope=self.login_body['scope']
)
self.logger.debug(f"Token expires at {self.token['expires_at']}")

# Funktion zum Überprüfen und Erneuern des Tokens
def ensure_valid_token(self):
# Prüfe, ob das Token abgelaufen ist
expires_at = self.token.get('expires_at')
if expires_at is None or expires_at < time.time():
self.logger.info("Token ist abgelaufen oder nicht vorhanden, erneuern...")
self.get_new_token()

def get_header(self):
self.ensure_valid_token()
return {"Authorization": f'Bearer {self.token["id_token"]}'}

def init_auth(self):
self.get_token()
self.generate_header()
client_id = self.login_body["client_id"]
client_secret = self.login_body["client_secret"]
self.client = OAuth2Session(client_id, client_secret, scope=self.login_body["scope"])
self.get_new_token()
self.get_gateway_id()

def get_token(self):
try:
response = requests.post(self.login_url, self.login_body)
response_json = response.json()
self.logger.debug(response_json)
if "id_token" in response_json:
self.id_token = response_json["id_token"]
else:
self.logger.warn("token not found")
print(response_json)
time.sleep(60)
self.get_token()
except Exception as e:
self.logger.error(e)
time.sleep(60)
self.get_token()

def generate_header(self):
self.headers = {"Authorization": "Bearer {}".format(self.id_token)}

def get_gateway_id(self):
self.gateways.clear()
try:
response = requests.get(self.gateway_url, headers=self.headers)
response = requests.get(self.gateway_url, headers=self.get_header())
response_json = response.json()
for gateway in response_json:
self.gateways.append(gateway["system"]["id"])
Expand All @@ -66,7 +80,7 @@ def retrieve_live_data(self):
responses = []
for id in self.gateways:
try:
response = requests.get(self.live_url.format(id), headers=self.headers)
response = requests.get(self.live_url.format(id), headers=self.get_header())
if response.status_code == 200:
response_json = response.json()
responses.append(response_json)
Expand Down
3 changes: 2 additions & 1 deletion viessmann_gridbox_connector/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"audience": "my.gridx",
"client_id": "oZpr934Ikn8OZOHTJEcrgXkjio0I0Q7b",
"scope": "email openid",
"realm": "viessmann-authentication-db"
"realm": "viessmann-authentication-db",
"client_secret": ""
}
}

0 comments on commit 996e68a

Please sign in to comment.