forked from jeffhacks/smbscan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmbscan.py
168 lines (146 loc) · 6.32 KB
/
smbscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/python
__author__ = "jeffhacks"
import getpass
import ipaddress
import logging
import os
import re
import sys
import time
from impacket.smb import SMB_DIALECT
from slugify import slugify
from logging import handlers #import logging.handlers as handlers
from arg_parser import setup_command_line_args, Options
from scan import scan_single, User
from multiprocessing.pool import Pool
from multiprocessing.pool import ThreadPool
from multiprocessing import set_start_method
from threading import Thread
def valid_ip(addr):
try:
ipaddress.IPv4Network(str(addr))
return True
except ipaddress.AddressValueError:
return False
def main():
args = setup_command_line_args()
# Refactor Options - config file?
options = Options()
options.jitter = args.jitter
options.jitterTarget = args.jitter if args.jitter_target is None else args.jitter_target
options.jitterOperation = args.jitter if args.jitter_operation is None else args.jitter_operation
options.timeout = args.timeout
options.threads = args.threads
options.logDirectory = os.path.abspath(args.log_directory)
os.makedirs(args.log_directory, exist_ok=True)
options.csvFile = os.path.join(
options.logDirectory, "smbscan-" + time.strftime("%Y%m%d-%H%M%S") + ".log"
)
if args.state_file == None:
options.stateFile = os.path.join(
options.logDirectory, "smbscan-" + time.strftime("%Y%m%d-%H%M%S") + ".state"
)
else:
state_filename = args.state_file
filename, file_extension = os.path.splitext(state_filename)
if not file_extension == '.state':
state_filename = f'{state_filename}.state'
options.stateFile = os.path.join(
options.logDirectory, state_filename
)
options.crawlShares = not args.shares_only
options.maxDepth = args.max_depth
options.patternsFile = args.patterns_file
options.downloadFiles = args.download_files
options.logLevel = logging.DEBUG if args.debug else logging.INFO
if str(args.include_paths) != "None":
options.includePaths = str(args.include_paths).split(",")
if str(args.exclude_paths) != "None":
options.excludePaths = str(args.exclude_paths).split(",")
if str(args.include_shares) != "None":
options.includeShares = str(args.include_shares).split(",")
if str(args.exclude_shares) != "None":
options.excludeShares = str(args.exclude_shares).split(",")
if str(args.exclude_hosts) != "None":
options.excludeHosts = str(args.exclude_hosts).split(",")
with open(options.patternsFile, "r") as k_file:
for line in k_file:
options.patterns.append(re.compile(line.strip(), re.IGNORECASE))
user = User()
if args.user:
user.username = args.user
ntlmHash = args.hash
if ntlmHash:
user.lmhash = ntlmHash.split(':')[0]
user.nthash = ntlmHash.split(':')[1]
elif args.no_pass:
pass
else:
user.password = args.password if args.password else getpass.getpass()
user.domain = args.domain if args.domain else ""
options.kerberos = args.kerberos
if args.kerberos:
user.domain = args.domain if args.domain else ""
options.dc_ip = args.dc_ip if args.dc_ip else ""
options.aesKey = args.aesKey if args.aesKey else ""
logger = logging.getLogger('smbscan')
logger.setLevel(options.logLevel)
formatter = logging.Formatter("[%(asctime)s %(threadName)s %(levelname)s] %(message)s",
"%Y-%m-%d %H:%M:%S")
logFileHandler = handlers.RotatingFileHandler(options.csvFile,
maxBytes=1024 * 1024 * 5,
backupCount=2)
logFileHandler.setLevel(options.logLevel)
logFileHandler.setFormatter(formatter)
logger.addHandler(logFileHandler)
stdoutHandler = logging.StreamHandler(sys.stdout)
stdoutHandler.setLevel(options.logLevel)
stdoutHandler.setFormatter(formatter)
logger.addHandler(stdoutHandler)
# Record arguments
logger.info(' '.join(sys.argv[0:]))
# Load targets into list
target_list = []
if args.target:
logger.info(f"Scanning {args.target}")
if valid_ip(args.target):
for targetIP in ipaddress.IPv4Network(str(args.target)):
target_list.append((targetIP, user, options))
else:
target_list.append((str(args.target), user, options))
else:
with args.file as file:
target = file.readline().strip()
while target:
if len(target.split(',')) == 3:
hash_target = target.split(',')[0]
hash_username = target.split(',')[1]
hash_hash = target.split(',')[2]
logger.debug(f'Hash Target: {hash_target}, User: {hash_username}, Hash: {hash_hash}')
hash_user = User()
hash_user.username = hash_username
ntlmHash = hash_hash
if ntlmHash:
hash_user.lmhash = ntlmHash.split(':')[0]
hash_user.nthash = ntlmHash.split(':')[1]
if valid_ip(hash_target):
for targetIP in ipaddress.IPv4Network(str(hash_target)):
target_list.append((targetIP, hash_user, options))
else:
target_list.append((str(hash_target), hash_user, options))
else:
logger.debug(f'Standard Target: {target}, User: {user.username}')
if valid_ip(target):
for targetIP in ipaddress.IPv4Network(str(target)):
target_list.append((targetIP, user, options))
else:
target_list.append((str(target), user, options))
target = file.readline().strip()
logger.info(f'Scanning with {options.threads} threads')
set_start_method("spawn")
with ThreadPool(processes=options.threads) as pool:
result = pool.starmap_async(scan_single, target_list, chunksize=1)
result.wait()
logger.info("Scan completed")
if __name__ == "__main__":
main()