diff --git a/lidlplus/__main__.py b/lidlplus/__main__.py index fa9cab0..f68a3d1 100755 --- a/lidlplus/__main__.py +++ b/lidlplus/__main__.py @@ -5,7 +5,7 @@ from getpass import getpass from pathlib import Path -if __name__ == '__main__': +if __name__ == "__main__": sys.path.insert(0, str(Path(__file__).parent.parent)) from lidlplus import LidlPlusApi from lidlplus.exceptions import WebBrowserException, LoginError @@ -18,11 +18,17 @@ def get_arguments(): parser.add_argument("-l", "--language", help="language (de, be, nl, at, ...)") parser.add_argument("-u", "--user", help="Lidl Plus login user") parser.add_argument("-p", "--password", help="Lidl Plus login password") - parser.add_argument("--2fa", choices=["phone", "email"], default="phone", help="set 2fa method") + parser.add_argument( + "--2fa", choices=["phone", "email"], default="phone", help="set 2fa method" + ) parser.add_argument("-r", "--refresh-token", help="refresh token to authenticate") - subparser = parser.add_subparsers(title="commands", metavar="command", required=True) + subparser = parser.add_subparsers( + title="commands", metavar="command", required=True + ) auth = subparser.add_parser("auth", help="authenticate and get refresh_token") - auth.add_argument("auth", help="authenticate and get refresh_token", action="store_true") + auth.add_argument( + "auth", help="authenticate and get refresh_token", action="store_true" + ) receipt = subparser.add_parser("receipt", help="last receipt as json") receipt.add_argument("receipt", help="last receipt as json", action="store_true") receipt.add_argument("-a", "--all", help="fetch all receipts", action="store_true") @@ -34,10 +40,13 @@ def check_auth(): import oic import seleniumwire import getpass + import webdriver_manager except ImportError: - print("To login and receive a refresh token you need to install all auth requirements:") - print(" pip install \"lidl-plus[auth]\"") - print("You also need google chrome to be installed.") + print( + "To login and receive a refresh token you need to install all auth requirements:\n" + ' pip install "lidl-plus[auth]"\n' + "You also need google chrome to be installed." + ) exit(1) @@ -46,15 +55,21 @@ def lidl_plus_login(args): country = args.get("country") or input("Enter your country (de, at, ...): ") if args.get("refresh_token"): return LidlPlusApi(language, country, args.get("refresh_token")) - username = args.get("username") or input("Enter your lidl plus username (phone number): ") + username = args.get("username") or input( + "Enter your lidl plus username (phone number): " + ) password = args.get("password") or getpass("Enter your lidl plus password: ") check_auth() lidl_plus = LidlPlusApi(language, country) try: text = f"Enter the verify code you received via {args['2fa']}: " - lidl_plus.login(username, password, lambda: input(text), verify_mode=args["2fa"]) + lidl_plus.login( + username, password, lambda: input(text), verify_mode=args["2fa"] + ) except WebBrowserException: - print("Can't connect to web browser. Please install Chrome, Chromium or Firefox") + print( + "Can't connect to web browser. Please install Chrome, Chromium or Firefox" + ) exit(101) except LoginError: print("Login failed. Check your username and password") @@ -65,7 +80,9 @@ def lidl_plus_login(args): def print_refresh_token(args): lidl_plus = lidl_plus_login(args) length = len(token := lidl_plus.refresh_token) - len("refresh token") - print(f"{'-' * (length // 2)} refresh token {'-' * (length // 2 - 1)}\n{token}\n{'-' * len(token)}") + print( + f"{'-' * (length // 2)} refresh token {'-' * (length // 2 - 1)}\n{token}\n{'-' * len(token)}" + ) def print_tickets(args): @@ -92,5 +109,5 @@ def start(): print("Aborted.") -if __name__ == '__main__': +if __name__ == "__main__": start() diff --git a/lidlplus/api.py b/lidlplus/api.py index bd264d6..5d39cf9 100644 --- a/lidlplus/api.py +++ b/lidlplus/api.py @@ -35,17 +35,20 @@ def token(self): def _register_oauth_client(self): from oic.oic import Client from oic.utils.authn.client import CLIENT_AUTHN_METHOD + if self._login_url: return self._login_url - client = Client(client_authn_method=CLIENT_AUTHN_METHOD, client_id=self._CLIENT_ID) + client = Client( + client_authn_method=CLIENT_AUTHN_METHOD, client_id=self._CLIENT_ID + ) client.provider_config(self._AUTH_API) code_challenge, self._code_verifier = client.add_code_challenge() args = { "client_id": client.client_id, "response_type": "code", "scope": ["openid profile offline_access lpprofile lpapis"], - "redirect_uri": f'{self._APP}://callback', - **code_challenge + "redirect_uri": f"{self._APP}://callback", + **code_challenge, } auth_req = client.construct_AuthorizationRequest(request_args=args) self._login_url = auth_req.request(client.authorization_endpoint) @@ -56,27 +59,35 @@ def _init_chrome(self, headless=True): from getuseragent import UserAgent from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.service import Service as ChromeService + user_agent = UserAgent(self._OS.lower()).Random() - logging.getLogger('WDM').setLevel(logging.NOTSET) + logging.getLogger("WDM").setLevel(logging.NOTSET) options = webdriver.ChromeOptions() if headless: - options.add_argument('headless') + options.add_argument("headless") options.add_experimental_option("mobileEmulation", {"userAgent": user_agent}) - return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) + return webdriver.Chrome( + service=ChromeService(ChromeDriverManager().install()), options=options + ) def _init_firefox(self, headless=True): from seleniumwire import webdriver from getuseragent import UserAgent from webdriver_manager.firefox import GeckoDriverManager + user_agent = UserAgent(self._OS.lower()).Random() - logging.getLogger('WDM').setLevel(logging.NOTSET) + logging.getLogger("WDM").setLevel(logging.NOTSET) options = webdriver.FirefoxOptions() if headless: options.headless = True profile = webdriver.FirefoxProfile() profile.set_preference("general.useragent.override", user_agent) - return webdriver.Firefox(executable_path=GeckoDriverManager().install(), firefox_binary="/usr/bin/firefox", - options=options, firefox_profile=profile) + return webdriver.Firefox( + executable_path=GeckoDriverManager().install(), + firefox_binary="/usr/bin/firefox", + options=options, + firefox_profile=profile, + ) def _get_browser(self, headless=True): try: @@ -89,74 +100,103 @@ def _get_browser(self, headless=True): def _auth(self, payload): headers = { - 'Authorization': f'Basic {base64.b64encode(f"{self._CLIENT_ID}:secret".encode()).decode()}', - 'Content-Type': 'application/x-www-form-urlencoded', + "Authorization": f'Basic {base64.b64encode(f"{self._CLIENT_ID}:secret".encode()).decode()}', + "Content-Type": "application/x-www-form-urlencoded", } - response = requests.post(f"{self._AUTH_API}/connect/token", headers=headers, data=payload).json() + response = requests.post( + f"{self._AUTH_API}/connect/token", headers=headers, data=payload + ).json() self._expires = datetime.utcnow() + timedelta(seconds=response["expires_in"]) self._token = response["access_token"] self._refresh_token = response["refresh_token"] def _renew_token(self): - payload = {'refresh_token': self._refresh_token, 'grant_type': 'refresh_token'} + payload = {"refresh_token": self._refresh_token, "grant_type": "refresh_token"} return self._auth(payload) def _authorization_code(self, code): payload = { - "grant_type": 'authorization_code', + "grant_type": "authorization_code", "code": code, - "redirect_uri": f'{self._APP}://callback', - "code_verifier": self._code_verifier + "redirect_uri": f"{self._APP}://callback", + "code_verifier": self._code_verifier, } return self._auth(payload) - def login(self, phone, password, verify_token_func, headless=True, verify_mode="phone"): + def login( + self, phone, password, verify_token_func, headless=True, verify_mode="phone" + ): from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions from selenium.webdriver.support.ui import WebDriverWait from selenium.common.exceptions import TimeoutException + if verify_mode not in ["phone", "email"]: - raise ValueError("Only \"phone\" or \"email\" supported") + raise ValueError('Only "phone" or "email" supported') browser = self._get_browser(headless=headless) - browser.get(f"{self._register_oauth_client()}&Country={self._country}&language={self._language}-{self._country}") + browser.get( + f"{self._register_oauth_client()}&Country={self._country}&language={self._language}-{self._country}" + ) wait = WebDriverWait(browser, 10) - wait.until(expected_conditions.visibility_of_element_located((By.ID, "button_welcome_login"))).click() - wait.until(expected_conditions.visibility_of_element_located((By.NAME, "EmailOrPhone"))).send_keys(phone) + wait.until( + expected_conditions.visibility_of_element_located( + (By.ID, "button_welcome_login") + ) + ).click() + wait.until( + expected_conditions.visibility_of_element_located((By.NAME, "EmailOrPhone")) + ).send_keys(phone) browser.find_element(By.ID, "button_btn_submit_email").click() browser.find_element(By.ID, "button_btn_submit_email").click() try: - wait.until(expected_conditions.element_to_be_clickable((By.ID, "field_Password"))).send_keys(password) + wait.until( + expected_conditions.element_to_be_clickable((By.ID, "field_Password")) + ).send_keys(password) browser.find_element(By.ID, "button_submit").click() - element = wait.until(expected_conditions.visibility_of_element_located((By.CLASS_NAME, verify_mode))) + element = wait.until( + expected_conditions.visibility_of_element_located( + (By.CLASS_NAME, verify_mode) + ) + ) except TimeoutException: raise LoginError("Wrong credentials") element.find_element(By.TAG_NAME, "button").click() verify_code = verify_token_func() browser.find_element(By.NAME, "VerificationCode").send_keys(verify_code) browser.find_element(By.CLASS_NAME, "role_next").click() - code = re.findall("code=([0-9A-F]+)", browser.requests[-1].response.headers.get("Location", ""))[0] + code = re.findall( + "code=([0-9A-F]+)", + browser.requests[-1].response.headers.get("Location", ""), + )[0] self._authorization_code(code) def _default_headers(self): - if (not self._token and self._refresh_token) or datetime.utcnow() >= self._expires: + if ( + not self._token and self._refresh_token + ) or datetime.utcnow() >= self._expires: self._renew_token() if not self._token: raise Exception("You need to login!") - return {'Authorization': f'Bearer {self._token}', - 'App-Version': '999.99.9', - 'Operating-System': self._OS, - 'App': 'com.lidl.eci.lidl.plus', - 'Accept-Language': self._language - } + return { + "Authorization": f"Bearer {self._token}", + "App-Version": "999.99.9", + "Operating-System": self._OS, + "App": "com.lidl.eci.lidl.plus", + "Accept-Language": self._language, + } def tickets(self): url = f"{self._TICKET_API}/{self._country}/list" ticket = requests.get(f"{url}/1", headers=self._default_headers()).json() - tickets = ticket['records'] - for i in range(2, int(ticket['totalCount'] / ticket['size'] + 2)): - tickets += requests.get(f"{url}/{i}", headers=self._default_headers()).json()['records'] + tickets = ticket["records"] + for i in range(2, int(ticket["totalCount"] / ticket["size"] + 2)): + tickets += requests.get( + f"{url}/{i}", headers=self._default_headers() + ).json()["records"] return tickets def ticket(self, ticket_id): url = f"{self._TICKET_API}/{self._country}/tickets" - return requests.get(f"{url}/{ticket_id}", headers=self._default_headers()).json() + return requests.get( + f"{url}/{ticket_id}", headers=self._default_headers() + ).json()