-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathverify_pdfs.py
315 lines (268 loc) · 9.04 KB
/
verify_pdfs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import csv
from multiprocessing import Manager, Pool, Process
from multiprocessing.synchronize import Event as EventType
from pathlib import Path
from queue import Empty, Queue
import pypdf
from pypdf._doc_common import DocumentInformation
from pypdf.errors import EmptyFileError, PdfStreamError
from tqdm import tqdm
import pandas as pd
def is_valid_pdf(file_path: Path) -> bool:
    """Report whether *file_path* starts with the PDF magic header.

    Parameters
    ----------
    file_path : Path
        Candidate file to inspect.

    Returns
    -------
    bool
        True when the first five bytes are ``%PDF-``; False for any other
        content or when the file cannot be opened/read at all.
    """
    try:
        with file_path.open("rb") as handle:
            magic = handle.read(5)
    except Exception:
        # Missing or unreadable files count as invalid PDFs.
        return False
    return magic == b"%PDF-"
def validate_pdfs(directory: Path, num_processes: int = 4) -> dict:
    """Validate all PDFs under *directory* (recursively) in parallel.

    Parameters
    ----------
    directory : Path
        Directory searched recursively (``rglob``) for ``*.pdf`` files.
    num_processes : int, optional
        Size of the worker pool, by default 4.

    Returns
    -------
    dict
        Dictionary with keys:

        - ``total_files``: total number of PDF files found
        - ``valid_files``: number that passed the header check
        - ``invalid_files``: number that failed
        - ``invalid_paths``: list of string paths of the failures
    """
    # Find all PDF files, including those in subdirectories.
    pdf_files = list(directory.rglob("*.pdf"))
    if not pdf_files:
        return {
            "total_files": 0,
            "valid_files": 0,
            "invalid_files": 0,
            "invalid_paths": [],
        }
    # Header checks are independent, so fan them out across processes.
    # (Pool comes from the top-of-file import; the original re-imported it
    # locally, shadowing the module-level name for no reason.)
    with Pool(processes=num_processes) as pool:
        results = pool.map(is_valid_pdf, pdf_files)
    # pool.map preserves input order, so results align with pdf_files.
    invalid_pdfs = [
        str(pdf) for pdf, ok in zip(pdf_files, results) if not ok
    ]
    return {
        "total_files": len(pdf_files),
        "valid_files": sum(results),
        "invalid_files": len(invalid_pdfs),
        "invalid_paths": invalid_pdfs,
    }
def segregate_pdfs(validation_results: dict) -> None:
    """Move invalid PDFs into an ``invalid`` subdirectory.

    The subdirectory is created next to the first invalid file. Because
    validation searches recursively, two invalid files from different
    directories can share a basename; destination names are therefore
    uniquified (``name_1.pdf``, ``name_2.pdf``, ...) instead of letting
    ``rename`` silently overwrite an existing file.

    Parameters
    ----------
    validation_results : dict
        Dictionary containing validation results with:
        - invalid_paths: List of paths to invalid PDFs
    """
    invalid_paths = validation_results["invalid_paths"]
    if not invalid_paths:
        return
    # Get directory from first invalid path.
    base_dir = Path(invalid_paths[0]).parent
    # Create invalid subdirectory.
    invalid_dir = base_dir / "invalid"
    invalid_dir.mkdir(exist_ok=True)
    # Move invalid files, never clobbering an existing destination.
    for invalid_path in invalid_paths:
        src_path = Path(invalid_path)
        dst_path = invalid_dir / src_path.name
        # Path.rename replaces an existing file silently on POSIX, so pick
        # a fresh name when the destination is already taken.
        counter = 1
        while dst_path.exists():
            dst_path = invalid_dir / f"{src_path.stem}_{counter}{src_path.suffix}"
            counter += 1
        src_path.rename(dst_path)
def extract_hhs_info(pdf: Path) -> dict[str, str | None]:
    """
    Extract HHS (Health and Human Services) related information from a
    PDF file.

    Parameters
    ----------
    pdf : Path
        Path to the PDF file from which to extract information.

    Returns
    -------
    dict[str, str | None]
        A row-shaped dictionary that always contains the keys:

        - 'name': absolute path of the PDF file
        - 'producer': PDF producer metadata, or None
        - 'creator': PDF creator metadata, or None
        - 'header': PDF header (version) string, or None
        - 'has_hhs_text': str(bool) — whether 'HHS Public Access' occurs
          on the first page — or None when text extraction failed
        - 'error': description of any failure, or None on success

    Notes
    -----
    All anticipated failures are reported through the 'error' field rather
    than raised, so callers always get a dict suitable for one CSV row.
    """
    try:
        with open(pdf, "rb") as file:
            pdf_reader = pypdf.PdfReader(file)
            metadata = pdf_reader.metadata
            try:
                # Only the first page is scanned for the HHS banner text.
                page = pdf_reader.pages[0]
                text = page.extract_text()
                has_hhs_text: bool = "HHS Public Access" in text
            except Exception as e:
                # Text extraction failed: report the error and leave every
                # other field unset, even though metadata may be available.
                hhs_info: dict[str, str | None] = {
                    "name": str(pdf.absolute()),
                    "producer": None,
                    "creator": None,
                    "header": None,
                    "has_hhs_text": None,
                    "error": str(e),
                }
                return hhs_info
            if isinstance(metadata, DocumentInformation):
                hhs_info = {
                    "name": str(pdf.absolute()),
                    "producer": metadata.producer,
                    "creator": metadata.creator,
                    "header": pdf_reader.pdf_header,
                    "has_hhs_text": str(has_hhs_text),
                    "error": None,
                }
            else:
                # Readable PDF but no document-information dictionary.
                hhs_info = {
                    "name": str(pdf.absolute()),
                    "producer": None,
                    "creator": None,
                    "header": None,
                    "has_hhs_text": str(has_hhs_text),
                    "error": "No metadata",
                }
            return hhs_info
    except (PdfStreamError, OSError, EmptyFileError) as e:
        # File is missing, unreadable, or not parseable as a PDF at all.
        hhs_info = {
            "name": str(pdf.absolute()),
            "producer": None,
            "creator": None,
            "header": None,
            "has_hhs_text": None,
            "error": str(e),
        }
        return hhs_info
def writer_process(
    queue: Queue, done_event: EventType, output_file: Path
) -> None:
    """Drain *queue* and write each result row to *output_file* as CSV.

    Runs until *done_event* is set AND the queue is empty, so rows that
    were enqueued before shutdown are never dropped.
    """
    columns = [
        "name",
        "producer",
        "creator",
        "header",
        "has_hhs_text",
        "error",
    ]
    with open(output_file, "w", newline="") as sink:
        csv_writer = csv.DictWriter(sink, fieldnames=columns)
        csv_writer.writeheader()
        # Poll with a timeout so the shutdown condition is re-checked at
        # least once per second even when no rows arrive.
        while True:
            if done_event.is_set() and queue.empty():
                break
            try:
                row = queue.get(timeout=1)
                csv_writer.writerow(row)
            except Empty:
                continue
            except Exception as e:
                print(f"Error writing to CSV: {e}")
def process_pdf(args: tuple[Path, Queue]) -> bool:
    """Extract HHS info for one PDF and enqueue the resulting row.

    Parameters
    ----------
    args : tuple[Path, Queue]
        The PDF path to process and the queue feeding the CSV writer.

    Returns
    -------
    bool
        True when extraction succeeded; False when a fallback error row
        was enqueued instead.
    """
    pdf_path, queue = args
    try:
        queue.put(extract_hhs_info(pdf_path))
    except Exception as e:
        # Never let one bad file kill a pool worker; report it as a row.
        error_row = {
            "name": str(pdf_path),
            "producer": None,
            "creator": None,
            "header": None,
            "has_hhs_text": None,
            "error": f"Processing error: {str(e)}",
        }
        queue.put(error_row)
        return False
    return True
def process_pdfs_parallel(
    directory: Path, output_csv: Path, num_processes: int = 12
) -> None:
    """Extract HHS info from every PDF in *directory* and write a CSV.

    Parameters
    ----------
    directory : Path
        Directory scanned (top level only, ``glob`` not ``rglob``) for
        ``*.pdf`` files.
    output_csv : Path
        Destination CSV; overwritten if it already exists.
    num_processes : int, optional
        Size of the extraction worker pool, by default 12.

    Notes
    -----
    A dedicated writer process drains a shared queue so pool workers never
    contend for the output file.
    """
    # Find all PDF files.
    pdf_files = list(directory.glob("*.pdf"))
    total_files = len(pdf_files)
    if total_files == 0:
        print("No PDF files found")
        return
    # Manager-backed queue/event are proxies, so they can be pickled and
    # shared with both the writer process and the pool workers.
    with Manager() as manager:
        result_queue = manager.Queue()
        done_event = manager.Event()
        writer_proc = Process(
            target=writer_process, args=(result_queue, done_event, output_csv)
        )
        writer_proc.start()
        try:
            with Pool(processes=num_processes) as pool:
                args = [(pdf, result_queue) for pdf in pdf_files]
                # list() forces the lazy imap iterator so every task runs
                # and tqdm can display progress as results arrive.
                list(
                    tqdm(
                        pool.imap_unordered(process_pdf, args),
                        total=total_files,
                        desc="Processing PDFs",
                    )
                )
        finally:
            # Signal completion only after all workers finish, then wait
            # for the writer to drain any remaining queued rows.
            done_event.set()
            writer_proc.join()
    print(f"Results written to {output_csv}")
def segregate_hhs(csv_path: Path) -> None:
    """Segregate PDFs based on HHS status recorded in a results CSV.

    Parameters
    ----------
    csv_path : Path
        Path to CSV file containing HHS extraction results; must have
        ``name`` and ``has_hhs_text`` columns.

    Notes
    -----
    Creates 'hhs' and 'unknown' subdirectories in the same directory as the PDFs.
    Files with a missing status (extraction failed) move to 'unknown', files
    whose status is true move to 'hhs', and all others stay in place.
    """
    # Read CSV.
    df = pd.read_csv(csv_path)
    if df.empty:
        # Nothing to segregate; also avoids IndexError on iloc[0] below.
        return
    # Get base directory from first file path.
    base_dir = Path(df["name"].iloc[0]).parent
    # Create subdirectories.
    hhs_dir = base_dir / "hhs"
    unknown_dir = base_dir / "unknown"
    hhs_dir.mkdir(exist_ok=True)
    unknown_dir.mkdir(exist_ok=True)

    status = df["has_hhs_text"]
    # Move files with no recorded status (extraction errors) to 'unknown'.
    for file_path in df.loc[status.isna(), "name"]:
        src = Path(file_path)
        src.rename(unknown_dir / src.name)
    # The writer stored str(bool), so after the CSV round-trip the column
    # may hold bools or the strings "True"/"False" depending on how pandas
    # parsed it; normalise to str so both forms match (a bare `== True`
    # silently matches nothing when the values are strings).
    is_hhs = status.notna() & (status.astype(str) == "True")
    for file_path in df.loc[is_hhs, "name"]:
        src = Path(file_path)
        src.rename(hhs_dir / src.name)