Skip to content

Commit

Permalink
black and flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
bri3d committed Dec 14, 2024
1 parent 7dbe525 commit ec77140
Show file tree
Hide file tree
Showing 28 changed files with 249 additions and 175 deletions.
50 changes: 30 additions & 20 deletions VW_Flash_GUI.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import glob
from pathlib import Path
import wx
Expand Down Expand Up @@ -34,8 +33,7 @@
dq250mqb,
dq381,
simos16,
simosshared,
haldex4motion
haldex4motion,
)

DEFAULT_STMIN = 350000
Expand Down Expand Up @@ -69,9 +67,11 @@ def module_selection_is_dq250(selection_index):
def module_selection_is_dq381(selection_index):
return selection_index == 3


def module_selection_is_haldex(selected_index):
return selected_index == 4


def split_interface_name(interface_string: str):
parts = interface_string.split("_", 1)
interface = parts[0]
Expand Down Expand Up @@ -323,7 +323,7 @@ def on_module_changed(self, event):
simos1810.s1810_flash_info,
dq250mqb.dsg_flash_info,
dq381.dsg_flash_info,
haldex4motion.haldex_flash_info
haldex4motion.haldex_flash_info,
][module_number]

def on_get_info(self, event):
Expand Down Expand Up @@ -354,16 +354,23 @@ def on_read_dtcs(self, event):
]

def flash_unlock(self, selected_file):
if module_selection_is_dq250(
self.module_choice.GetSelection()
) or module_selection_is_dq381(self.module_choice.GetSelection()) or module_selection_is_haldex(self.module_choice.GetSelection()):
self.feedback_text.AppendText("SKIPPED: Unlocking is unnecessary for Haldex/DSG\n")
if (
module_selection_is_dq250(self.module_choice.GetSelection())
or module_selection_is_dq381(self.module_choice.GetSelection())
or module_selection_is_haldex(self.module_choice.GetSelection())
):
self.feedback_text.AppendText(
"SKIPPED: Unlocking is unnecessary for Haldex/DSG\n"
)
return

input_bytes = Path(selected_file).read_bytes()
if str.endswith(selected_file, ".frf"):
self.feedback_text.AppendText("Extracting FRF for unlock...\n")
(flash_data, allowed_boxcodes,) = extract_flash.extract_flash_from_frf(
(
flash_data,
allowed_boxcodes,
) = extract_flash.extract_flash_from_frf(
input_bytes,
self.flash_info,
is_dsg=module_selection_is_dq250(self.module_choice.GetSelection()),
Expand Down Expand Up @@ -412,7 +419,10 @@ def flash_bin_file(self, selected_file, patch_cboot=False):
input_bytes = Path(self.row_obj_dict[selected_file]).read_bytes()
if str.endswith(self.row_obj_dict[selected_file], ".frf"):
self.feedback_text.AppendText("Extracting FRF...\n")
(flash_data, allowed_boxcodes,) = extract_flash.extract_flash_from_frf(
(
flash_data,
allowed_boxcodes,
) = extract_flash.extract_flash_from_frf(
input_bytes,
self.flash_info,
is_dsg=module_selection_is_dq250(self.module_choice.GetSelection()),
Expand All @@ -426,7 +436,9 @@ def flash_bin_file(self, selected_file, patch_cboot=False):
self.flash_bin(get_info=False, should_patch_cboot=patch_cboot)
elif len(input_bytes) == self.flash_info.binfile_size:
self.input_blocks = binfile.blocks_from_bin(
self.row_obj_dict[selected_file], self.flash_info, module_selection_is_haldex(self.module_choice.GetSelection())
self.row_obj_dict[selected_file],
self.flash_info,
module_selection_is_haldex(self.module_choice.GetSelection()),
)
self.flash_bin(get_info=False, should_patch_cboot=patch_cboot)
else:
Expand Down Expand Up @@ -641,14 +653,10 @@ def prepare_file(self, selected_file, output_dir):
)
output_file = Path(output_dir, "PATCHED_" + Path(selected_file).name)
outfile_data = binfile.bin_from_blocks(output_blocks, self.flash_info)
output_file.write_bytes(
outfile_data
)
output_file.write_bytes(outfile_data)

self.feedback_text.AppendText(
"File prepared and saved as : "
+ output_file.name
+ "\n"
"File prepared and saved as : " + output_file.name + "\n"
)

self.progress_bar.SetValue(0)
Expand Down Expand Up @@ -1048,6 +1056,10 @@ def on_select_extract_frf(self, event):
title = "Choose an output directory:"
dlg = wx.DirDialog(self, title)
if dlg.ShowModal() == wx.ID_OK:

def callback(progress):
wx.CallAfter(progress_dialog.Update, progress)

output_dir = dlg.GetPath()
progress_dialog = wx.ProgressDialog(
"Extracting FRF",
Expand All @@ -1056,9 +1068,6 @@ def on_select_extract_frf(self, event):
parent=self,
style=wx.PD_APP_MODAL | wx.PD_AUTO_HIDE,
)
callback = lambda progress: wx.CallAfter(
progress_dialog.Update, progress
)
frf_thread = threading.Thread(
target=self.extract_frf_task,
args=(frf_file, output_dir, callback),
Expand All @@ -1067,6 +1076,7 @@ def on_select_extract_frf(self, event):
progress_dialog.Pulse()
progress_dialog.Show()


if __name__ == "__main__":
app = wx.App(False)
frame = VW_Flash_Frame()
Expand Down
12 changes: 8 additions & 4 deletions extract_software_info.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import argparse
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import csv
import itertools
import sys
Expand Down Expand Up @@ -156,9 +156,13 @@ def process_data(data: bytes, is_bin=False):
for block_number in flash_info.block_names_frf.keys():
flash_blocks[block_number] = constants.BlockData(
block_number,
flash_data[flash_info.block_names_frf[block_number]].block_bytes
if is_bin
else flash_data[flash_info.block_names_frf[block_number]],
(
flash_data[
flash_info.block_names_frf[block_number]
].block_bytes
if is_bin
else flash_data[flash_info.block_names_frf[block_number]]
),
block_name=flash_info.number_to_block_name[block_number],
)

Expand Down
1 change: 0 additions & 1 deletion extractodx.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from lib.modules import (
simos8,
simos10,
simos12,
simos18,
simos1810,
simos184,
Expand Down
3 changes: 1 addition & 2 deletions frf/decryptfrf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import pathlib
from sys import argv
from zipfile import ZipFile
import argparse
import io
import os
from lib import constants


# Implements a goofy "recursive xor" cypher used to encrypt FRF files, which at the end of the day are ZIP files containing either SGO (binary flash data) or ODX data.
def decrypt_data(key_material: bytes, encrypted_data: bytes):
output_data = bytearray()
Expand Down
55 changes: 35 additions & 20 deletions lib/binfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,39 +120,54 @@ def filter_blocks(input_blocks: dict[str, BlockData], flash_info: FlashInfo):
return input_blocks


def blocks_from_bin(bin_path: str, flash_info: FlashInfo, haldex_hack: bool = False) -> dict[str, BlockData]:
def blocks_from_bin(
bin_path: str, flash_info: FlashInfo, haldex_hack: bool = False
) -> dict[str, BlockData]:
bin_data = Path(bin_path).read_bytes()
return blocks_from_data(bin_data, flash_info, haldex_hack)


def blocks_from_data(data: bytes, flash_info: FlashInfo, haldex_hack: bool = False) -> dict[str, BlockData]:
def blocks_from_data(
data: bytes, flash_info: FlashInfo, haldex_hack: bool = False
) -> dict[str, BlockData]:
input_blocks = {}


for i in flash_info.block_names_frf.keys():
filename = flash_info.block_names_frf[i]
block_length = flash_info.block_lengths[i]

# TODO: Make this less awful

if haldex_hack:
if filename == 'FD_1DATA':
logger.info('Dynamically getting CAL length for Haldex...')
length = data[ (flash_info.binfile_layout[i] + 0x14) : (flash_info.binfile_layout[i] + 0x18)]
logger.info('Set Haldex CAL length to: '+length.hex())
block_length = struct.unpack('<I', length)[0]

if filename == 'FD_2DATA':
logger.info('Dynamically getting ASW length for Haldex...')
length = data[ (flash_info.binfile_layout[i] + 0x204) : (flash_info.binfile_layout[i] + 0x208)]
logger.info('Set Haldex ASW length to: '+length.hex())
block_length = struct.unpack('<I', length)[0]

if filename == 'FD_3DATA':
logger.info('Dynamically getting VERSION length for Haldex...')
length = data[ (flash_info.binfile_layout[i] + 0x4) : (flash_info.binfile_layout[i] + 0x8)]
logger.info('Set Haldex VERSION length to: '+length.hex())
block_length = struct.unpack('<I', length)[0]
if filename == "FD_1DATA":
logger.info("Dynamically getting CAL length for Haldex...")
length = data[
(flash_info.binfile_layout[i] + 0x14) : (
flash_info.binfile_layout[i] + 0x18
)
]
logger.info("Set Haldex CAL length to: " + length.hex())
block_length = struct.unpack("<I", length)[0]

if filename == "FD_2DATA":
logger.info("Dynamically getting ASW length for Haldex...")
length = data[
(flash_info.binfile_layout[i] + 0x204) : (
flash_info.binfile_layout[i] + 0x208
)
]
logger.info("Set Haldex ASW length to: " + length.hex())
block_length = struct.unpack("<I", length)[0]

if filename == "FD_3DATA":
logger.info("Dynamically getting VERSION length for Haldex...")
length = data[
(flash_info.binfile_layout[i] + 0x4) : (
flash_info.binfile_layout[i] + 0x8
)
]
logger.info("Set Haldex VERSION length to: " + length.hex())
block_length = struct.unpack("<I", length)[0]

input_blocks[filename] = BlockData(
i,
Expand Down
14 changes: 6 additions & 8 deletions lib/checksum.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import csv
import os
import struct
import logging
import sys

from . import fastcrc
from . import constants
Expand Down Expand Up @@ -222,12 +220,12 @@ def validate_ecm3(addresses, data_binary_cal: bytes, should_fix=False):
logger.warning("ECM3 Checksum did not match!")
if should_fix:
data_binary_cal = bytearray(data_binary_cal)
data_binary_cal[
checksum_location_cal : checksum_location_cal + 4
] = struct.pack("<I", checksum >> 32)
data_binary_cal[
checksum_location_cal + 4 : checksum_location_cal + 8
] = struct.pack("<I", checksum & 0xFFFFFFFF)
data_binary_cal[checksum_location_cal : checksum_location_cal + 4] = (
struct.pack("<I", checksum >> 32)
)
data_binary_cal[checksum_location_cal + 4 : checksum_location_cal + 8] = (
struct.pack("<I", checksum & 0xFFFFFFFF)
)
return (constants.ChecksumState.FIXED_CHECKSUM, data_binary_cal)
else:
return (constants.ChecksumState.INVALID_CHECKSUM, data_binary_cal)
4 changes: 2 additions & 2 deletions lib/connections/connection_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from udsoncan.connections import IsoTPSocketConnection
from .fake_connection import FakeConnection

from typing import Union
from typing import Optional

try:
from .j2534_connection import J2534Connection
Expand All @@ -27,7 +27,7 @@ def connection_setup(
txid,
rxid,
interface_path=None,
st_min: Union[int, None] = 350000,
st_min: Optional[int] = 350000,
dq3xx_hack=False,
):
if st_min is None:
Expand Down
2 changes: 0 additions & 2 deletions lib/connections/fake_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from udsoncan.exceptions import TimeoutException

import queue
import threading
import logging


class FakeConnection(BaseConnection):
Expand Down
6 changes: 2 additions & 4 deletions lib/connections/j2534.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
Structure,
WINFUNCTYPE,
POINTER,
cast,
c_long,
c_void_p,
c_ulong,
byref,
pointer,
)

import pprint
from enum import Enum

import logging
Expand Down Expand Up @@ -291,14 +289,14 @@ def PassThruStartPeriodicMsg(self, ChannelID, Data, MsgID=0, TimeInterval=100):
pMsg.Data = Data
pMsg.DataSize = len(Data)

result = dllPassThruStartPeriodicMsgMsgs(
result = dllPassThruStartPeriodicMsg(
ChannelID, byref(pMsg), byref(c_ulong(MsgID)), c_ulong(TimeInterval)
)

return Error_ID(result)

def PassThruStopPeriodicMsg(self, ChannelID, MsgID):
result = dllPassThruStopPeriodicMsgMsgs(ChannelID, MsgID)
result = dllPassThruStopPeriodicMsg(ChannelID, MsgID)

return Error_ID(result)

Expand Down
14 changes: 6 additions & 8 deletions lib/connections/j2534_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ def __init__(
self.baudrate = 500000

# Open the interface (connect to the DLL)
result, self.devID = self.interface.PassThruOpen()
_, self.devID = self.interface.PassThruOpen()

if debug:
result = self.interface.PassThruIoctl(
self.interface.PassThruIoctl(
Handle=0,
IoctlID=Ioctl_Flags.TX_IOCTL_SET_DLL_DEBUG_FLAGS,
ioctlInput=Ioctl_Flags.TX_IOCTL_DLL_DEBUG_FLAG_J2534_CALLS,
Expand Down Expand Up @@ -166,7 +166,7 @@ def rxthread_task(self):
while not self.exit_requested:

try:
result, data, numMessages = self.interface.PassThruReadMsgs(
_result, data, _numMessages = self.interface.PassThruReadMsgs(
self.channelID, self.protocol.value, 1, 1
)

Expand All @@ -179,15 +179,13 @@ def rxthread_task(self):
def close(self):
self.exit_requested = True
self.rxthread.join()
result = self.interface.PassThruDisconnect(self.channelID)
result = self.interface.PassThruClose(self.devID)
self.interface.PassThruDisconnect(self.channelID)
self.interface.PassThruClose(self.devID)
self.opened = False
self.logger.info("J2534 Connection closed")

def specific_send(self, payload):
result = self.interface.PassThruWriteMsgs(
self.channelID, payload, self.protocol.value
)
self.interface.PassThruWriteMsgs(self.channelID, payload, self.protocol.value)

def specific_wait_frame(self, timeout=4):
if not self.opened:
Expand Down
Loading

0 comments on commit ec77140

Please sign in to comment.