# ssd_clean.py
import os
import hashlib
import sys
import concurrent.futures

def hashfile(filename, fast=True):
    """Return an MD5 hex digest for filename, or None if it should be skipped."""
    try:
        if '.fseventsd' in filename:
            print(f"Skipping .fseventsd file: {filename}")
            return None  # skip macOS filesystem-event logs
        if 'Backups.backupdb' in filename or filename.startswith('/Volumes/.timemachine'):
            print(f"Skipping Time Machine file: {filename}")
            return None  # skip Time Machine backups
        if os.path.isdir(filename) or os.path.basename(filename).startswith('.'):
            return None  # skip directories and hidden/system files
        hasher = hashlib.md5()  # not cryptographically secure, but fine for file checksums
        with open(filename, "rb") as f:
            if fast:
                # Hash only the first 8 KB to speed things up. Files that share
                # an 8 KB prefix will collide, so fast hashes are only a screen.
                hasher.update(f.read(8192))
            else:
                while True:
                    data = f.read(8192)
                    if not data:
                        break
                    hasher.update(data)
        return hasher.hexdigest()
    except PermissionError:
        print(f"Permission denied: {filename}")
        return None
    except Exception as e:
        print(f"Error hashing file {filename}: {e}")
        return None
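
# A minimal sketch, not part of the original flow: because fast=True hashes only
# the first 8 KB, two different files can share a fast hash. A hypothetical
# confirm_duplicate() helper can re-hash both candidates in full before a match
# is trusted; it relies only on hashfile() above with fast=False.
def confirm_duplicate(file_a, file_b):
    """Re-check a fast-hash match with full-file hashes (illustrative only)."""
    full_a = hashfile(file_a, fast=False)
    full_b = hashfile(file_b, fast=False)
    return full_a is not None and full_a == full_b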

def parallel_hashing(files):
    """Hash a list of files concurrently (hashing is I/O-bound, so threads help)."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return list(executor.map(hashfile, files))
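
# Example usage (hypothetical path): this helper is not called anywhere below,
# but can be used standalone, e.g.
#   hashes = parallel_hashing(get_all_files("/Volumes/SSD"))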

def get_all_files(directory):
    """Walk directory and return every file path, pruning backup folders."""
    files = []
    for root, dirnames, filenames in os.walk(directory):
        print(f"Checking {root}")
        # Prune Time Machine and Apple system folders in place so os.walk
        # never descends into them.
        dirnames[:] = [
            d for d in dirnames
            if 'Backups.backupdb' not in d
            and not d.startswith('.Apple')
        ]
        # Only regular files are collected; directories never hash to anything,
        # so listing them would just waste hashfile() calls.
        for filename in filenames:
            files.append(os.path.join(root, filename))
    return files
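
# A minimal optional pre-filter, not used by the original script: grouping by
# file size first means only same-size files ever need hashing, which can cut
# I/O dramatically on large trees.
def group_by_size(files):
    """Map file size -> list of paths; only same-size files can be duplicates."""
    groups = {}
    for f in files:
        try:
            groups.setdefault(os.path.getsize(f), []).append(f)
        except OSError:
            pass  # unreadable or vanished files are simply skipped
    return groups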

def find_duplicates(dir_1, dir_2):
    """Return (duplicates, to_remove): files in dir_2 whose hash matches a file in dir_1."""
    dir_1_files = get_all_files(dir_1)
    dir_2_files = get_all_files(dir_2)
    hash_cache = {}
    duplicates = []
    to_remove = []

    # Helper to get a file hash with caching (only saves work if the trees overlap).
    def get_file_hash(file):
        if file in hash_cache:
            return hash_cache[file]
        file_hash = hashfile(file)
        if file_hash:
            hash_cache[file] = file_hash
        return file_hash

    # Parallel hashing for dir_1. executor.map preserves input order, so zip
    # pairs each path with its hash; if two dir_1 files share a hash, the last wins.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        dir_1_hashes = {
            file_hash: file
            for file, file_hash in zip(dir_1_files, executor.map(get_file_hash, dir_1_files))
            if file_hash
        }

    # Parallel hashing for dir_2: anything whose hash already exists in dir_1
    # is recorded as a duplicate and queued for removal.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for file, file_hash in zip(dir_2_files, executor.map(get_file_hash, dir_2_files)):
            if file_hash and file_hash in dir_1_hashes:
                duplicates.append((dir_1_hashes[file_hash], file))
                to_remove.append(file)
    return duplicates, to_remove

def record(dupl):
    """Print each (original, duplicate) pair and log them to duplicates.txt."""
    output_file = "duplicates.txt"
    with open(output_file, "w") as file:
        print(f"{len(dupl)} duplicates found")
        file.write(f"Found {len(dupl)} duplicate files:\n\n")
        for original, duplicate in dupl:
            print(f"File: {original}\nDuplicate: {duplicate}")
            file.write(f"File: {original}\nDuplicate: {duplicate}\n\n")

def remove_duplicates(to_remove, disk2_path):
    """Confirm with the user, delete the queued duplicates, and log what was removed."""
    remove = input("Do you want to remove duplicates? Y/N: ")
    log_file = "removed.txt"  # renamed from `record` so it no longer shadows record()
    if remove.strip().upper() == "Y":
        removed = []
        # to_remove only ever holds regular files (directories never hash),
        # so a plain os.remove is enough.
        for item in to_remove:
            if os.path.isfile(item):
                os.remove(item)
                removed.append(item)
        # Clean up any directories left empty by the deletions, deepest paths
        # first, but never the root of disk2 itself.
        for parent in sorted({os.path.dirname(item) for item in removed}, key=len, reverse=True):
            if (os.path.isdir(parent) and not os.listdir(parent)
                    and os.path.abspath(parent) != os.path.abspath(disk2_path)):
                os.rmdir(parent)
        print(f"{len(removed)} duplicates were removed from {disk2_path}")
        with open(log_file, "w") as file:
            file.write("Duplicate files removed:\n\n")
            for item in removed:
                file.write(item + "\n")
    else:
        print("OK Daddy-O!")

if __name__ == "__main__":
    disk1_path = "."     # directory treated as the "original" copy
    disk2_path = "path"  # placeholder: set this to the directory to deduplicate
    if not os.path.isdir(disk1_path):
        print(f"Error: Directory '{disk1_path}' does not exist or is not a valid directory.")
        sys.exit(1)
    if not os.path.isdir(disk2_path):
        print(f"Error: Directory '{disk2_path}' does not exist or is not a valid directory.")
        sys.exit(1)
    duplicates, to_remove = find_duplicates(disk1_path, disk2_path)
    if duplicates:
        record(duplicates)
        remove_duplicates(to_remove, disk2_path)
    else:
        print("No duplicate files found.")