Skip to content

Commit

Permalink
Formatted with black
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre0512 committed Dec 20, 2022
1 parent 6f5ad02 commit fa2265f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 47 deletions.
41 changes: 29 additions & 12 deletions lidlplus/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from getpass import getpass
from pathlib import Path

if __name__ == '__main__':
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent))
from lidlplus import LidlPlusApi
from lidlplus.exceptions import WebBrowserException, LoginError
Expand All @@ -18,11 +18,17 @@ def get_arguments():
parser.add_argument("-l", "--language", help="language (de, be, nl, at, ...)")
parser.add_argument("-u", "--user", help="Lidl Plus login user")
parser.add_argument("-p", "--password", help="Lidl Plus login password")
parser.add_argument("--2fa", choices=["phone", "email"], default="phone", help="set 2fa method")
parser.add_argument(
"--2fa", choices=["phone", "email"], default="phone", help="set 2fa method"
)
parser.add_argument("-r", "--refresh-token", help="refresh token to authenticate")
subparser = parser.add_subparsers(title="commands", metavar="command", required=True)
subparser = parser.add_subparsers(
title="commands", metavar="command", required=True
)
auth = subparser.add_parser("auth", help="authenticate and get refresh_token")
auth.add_argument("auth", help="authenticate and get refresh_token", action="store_true")
auth.add_argument(
"auth", help="authenticate and get refresh_token", action="store_true"
)
receipt = subparser.add_parser("receipt", help="last receipt as json")
receipt.add_argument("receipt", help="last receipt as json", action="store_true")
receipt.add_argument("-a", "--all", help="fetch all receipts", action="store_true")
Expand All @@ -34,10 +40,13 @@ def check_auth():
import oic
import seleniumwire
import getpass
import webdriver_manager
except ImportError:
print("To login and receive a refresh token you need to install all auth requirements:")
print(" pip install \"lidl-plus[auth]\"")
print("You also need google chrome to be installed.")
print(
"To login and receive a refresh token you need to install all auth requirements:\n"
' pip install "lidl-plus[auth]"\n'
"You also need google chrome to be installed."
)
exit(1)


Expand All @@ -46,15 +55,21 @@ def lidl_plus_login(args):
country = args.get("country") or input("Enter your country (de, at, ...): ")
if args.get("refresh_token"):
return LidlPlusApi(language, country, args.get("refresh_token"))
username = args.get("username") or input("Enter your lidl plus username (phone number): ")
username = args.get("username") or input(
"Enter your lidl plus username (phone number): "
)
password = args.get("password") or getpass("Enter your lidl plus password: ")
check_auth()
lidl_plus = LidlPlusApi(language, country)
try:
text = f"Enter the verify code you received via {args['2fa']}: "
lidl_plus.login(username, password, lambda: input(text), verify_mode=args["2fa"])
lidl_plus.login(
username, password, lambda: input(text), verify_mode=args["2fa"]
)
except WebBrowserException:
print("Can't connect to web browser. Please install Chrome, Chromium or Firefox")
print(
"Can't connect to web browser. Please install Chrome, Chromium or Firefox"
)
exit(101)
except LoginError:
print("Login failed. Check your username and password")
Expand All @@ -65,7 +80,9 @@ def lidl_plus_login(args):
def print_refresh_token(args):
lidl_plus = lidl_plus_login(args)
length = len(token := lidl_plus.refresh_token) - len("refresh token")
print(f"{'-' * (length // 2)} refresh token {'-' * (length // 2 - 1)}\n{token}\n{'-' * len(token)}")
print(
f"{'-' * (length // 2)} refresh token {'-' * (length // 2 - 1)}\n{token}\n{'-' * len(token)}"
)


def print_tickets(args):
Expand All @@ -92,5 +109,5 @@ def start():
print("Aborted.")


if __name__ == '__main__':
if __name__ == "__main__":
start()
110 changes: 75 additions & 35 deletions lidlplus/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,20 @@ def token(self):
def _register_oauth_client(self):
from oic.oic import Client
from oic.utils.authn.client import CLIENT_AUTHN_METHOD

if self._login_url:
return self._login_url
client = Client(client_authn_method=CLIENT_AUTHN_METHOD, client_id=self._CLIENT_ID)
client = Client(
client_authn_method=CLIENT_AUTHN_METHOD, client_id=self._CLIENT_ID
)
client.provider_config(self._AUTH_API)
code_challenge, self._code_verifier = client.add_code_challenge()
args = {
"client_id": client.client_id,
"response_type": "code",
"scope": ["openid profile offline_access lpprofile lpapis"],
"redirect_uri": f'{self._APP}://callback',
**code_challenge
"redirect_uri": f"{self._APP}://callback",
**code_challenge,
}
auth_req = client.construct_AuthorizationRequest(request_args=args)
self._login_url = auth_req.request(client.authorization_endpoint)
Expand All @@ -56,27 +59,35 @@ def _init_chrome(self, headless=True):
from getuseragent import UserAgent
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service as ChromeService

user_agent = UserAgent(self._OS.lower()).Random()
logging.getLogger('WDM').setLevel(logging.NOTSET)
logging.getLogger("WDM").setLevel(logging.NOTSET)
options = webdriver.ChromeOptions()
if headless:
options.add_argument('headless')
options.add_argument("headless")
options.add_experimental_option("mobileEmulation", {"userAgent": user_agent})
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
return webdriver.Chrome(
service=ChromeService(ChromeDriverManager().install()), options=options
)

def _init_firefox(self, headless=True):
from seleniumwire import webdriver
from getuseragent import UserAgent
from webdriver_manager.firefox import GeckoDriverManager

user_agent = UserAgent(self._OS.lower()).Random()
logging.getLogger('WDM').setLevel(logging.NOTSET)
logging.getLogger("WDM").setLevel(logging.NOTSET)
options = webdriver.FirefoxOptions()
if headless:
options.headless = True
profile = webdriver.FirefoxProfile()
profile.set_preference("general.useragent.override", user_agent)
return webdriver.Firefox(executable_path=GeckoDriverManager().install(), firefox_binary="/usr/bin/firefox",
options=options, firefox_profile=profile)
return webdriver.Firefox(
executable_path=GeckoDriverManager().install(),
firefox_binary="/usr/bin/firefox",
options=options,
firefox_profile=profile,
)

def _get_browser(self, headless=True):
try:
Expand All @@ -89,74 +100,103 @@ def _get_browser(self, headless=True):

def _auth(self, payload):
headers = {
'Authorization': f'Basic {base64.b64encode(f"{self._CLIENT_ID}:secret".encode()).decode()}',
'Content-Type': 'application/x-www-form-urlencoded',
"Authorization": f'Basic {base64.b64encode(f"{self._CLIENT_ID}:secret".encode()).decode()}',
"Content-Type": "application/x-www-form-urlencoded",
}
response = requests.post(f"{self._AUTH_API}/connect/token", headers=headers, data=payload).json()
response = requests.post(
f"{self._AUTH_API}/connect/token", headers=headers, data=payload
).json()
self._expires = datetime.utcnow() + timedelta(seconds=response["expires_in"])
self._token = response["access_token"]
self._refresh_token = response["refresh_token"]

def _renew_token(self):
payload = {'refresh_token': self._refresh_token, 'grant_type': 'refresh_token'}
payload = {"refresh_token": self._refresh_token, "grant_type": "refresh_token"}
return self._auth(payload)

def _authorization_code(self, code):
payload = {
"grant_type": 'authorization_code',
"grant_type": "authorization_code",
"code": code,
"redirect_uri": f'{self._APP}://callback',
"code_verifier": self._code_verifier
"redirect_uri": f"{self._APP}://callback",
"code_verifier": self._code_verifier,
}
return self._auth(payload)

def login(self, phone, password, verify_token_func, headless=True, verify_mode="phone"):
def login(
self, phone, password, verify_token_func, headless=True, verify_mode="phone"
):
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException

if verify_mode not in ["phone", "email"]:
raise ValueError("Only \"phone\" or \"email\" supported")
raise ValueError('Only "phone" or "email" supported')
browser = self._get_browser(headless=headless)
browser.get(f"{self._register_oauth_client()}&Country={self._country}&language={self._language}-{self._country}")
browser.get(
f"{self._register_oauth_client()}&Country={self._country}&language={self._language}-{self._country}"
)
wait = WebDriverWait(browser, 10)
wait.until(expected_conditions.visibility_of_element_located((By.ID, "button_welcome_login"))).click()
wait.until(expected_conditions.visibility_of_element_located((By.NAME, "EmailOrPhone"))).send_keys(phone)
wait.until(
expected_conditions.visibility_of_element_located(
(By.ID, "button_welcome_login")
)
).click()
wait.until(
expected_conditions.visibility_of_element_located((By.NAME, "EmailOrPhone"))
).send_keys(phone)
browser.find_element(By.ID, "button_btn_submit_email").click()
browser.find_element(By.ID, "button_btn_submit_email").click()
try:
wait.until(expected_conditions.element_to_be_clickable((By.ID, "field_Password"))).send_keys(password)
wait.until(
expected_conditions.element_to_be_clickable((By.ID, "field_Password"))
).send_keys(password)
browser.find_element(By.ID, "button_submit").click()
element = wait.until(expected_conditions.visibility_of_element_located((By.CLASS_NAME, verify_mode)))
element = wait.until(
expected_conditions.visibility_of_element_located(
(By.CLASS_NAME, verify_mode)
)
)
except TimeoutException:
raise LoginError("Wrong credentials")
element.find_element(By.TAG_NAME, "button").click()
verify_code = verify_token_func()
browser.find_element(By.NAME, "VerificationCode").send_keys(verify_code)
browser.find_element(By.CLASS_NAME, "role_next").click()
code = re.findall("code=([0-9A-F]+)", browser.requests[-1].response.headers.get("Location", ""))[0]
code = re.findall(
"code=([0-9A-F]+)",
browser.requests[-1].response.headers.get("Location", ""),
)[0]
self._authorization_code(code)

def _default_headers(self):
if (not self._token and self._refresh_token) or datetime.utcnow() >= self._expires:
if (
not self._token and self._refresh_token
) or datetime.utcnow() >= self._expires:
self._renew_token()
if not self._token:
raise Exception("You need to login!")
return {'Authorization': f'Bearer {self._token}',
'App-Version': '999.99.9',
'Operating-System': self._OS,
'App': 'com.lidl.eci.lidl.plus',
'Accept-Language': self._language
}
return {
"Authorization": f"Bearer {self._token}",
"App-Version": "999.99.9",
"Operating-System": self._OS,
"App": "com.lidl.eci.lidl.plus",
"Accept-Language": self._language,
}

def tickets(self):
url = f"{self._TICKET_API}/{self._country}/list"
ticket = requests.get(f"{url}/1", headers=self._default_headers()).json()
tickets = ticket['records']
for i in range(2, int(ticket['totalCount'] / ticket['size'] + 2)):
tickets += requests.get(f"{url}/{i}", headers=self._default_headers()).json()['records']
tickets = ticket["records"]
for i in range(2, int(ticket["totalCount"] / ticket["size"] + 2)):
tickets += requests.get(
f"{url}/{i}", headers=self._default_headers()
).json()["records"]
return tickets

def ticket(self, ticket_id):
url = f"{self._TICKET_API}/{self._country}/tickets"
return requests.get(f"{url}/{ticket_id}", headers=self._default_headers()).json()
return requests.get(
f"{url}/{ticket_id}", headers=self._default_headers()
).json()

0 comments on commit fa2265f

Please sign in to comment.