# l2_learning_controller.py
# Forked from hakiri/p4-learning.
# Original listing: 112 lines (94 loc), 4.36 KB.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import nnpy
import struct
from p4utils.utils.helper import load_topo
from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI
from scapy.all import Ether, sniff, Packet, BitField, raw
class CpuHeader(Packet):
    """Scapy layer describing the learning header parsed by the controller.

    The layer is made of two fixed-width fields, in this order:

    * ``macAddr``      -- 48 bits, defaults to 0
    * ``ingress_port`` -- 16 bits, defaults to 0
    """

    name = 'CpuPacket'
    fields_desc = [
        BitField('macAddr', 0, 48),
        BitField('ingress_port', 0, 16),
    ]
class L2Controller(object):
    """Control-plane companion of an L2 learning switch.

    On construction the controller resets the switch state, installs one
    multicast group per data port (used for broadcasting) and, when a CPU
    port is available, a mirroring session towards it.  Learned
    ``(mac, ingress_port)`` pairs can then be received either through
    digest notifications (``run_digest_loop``) or through packets sniffed
    on the CPU port (``run_cpu_port_loop``) and are written to the
    ``smac`` and ``dmac`` tables.
    """

    # Mirroring session id registered towards the CPU port.
    CPU_MIRROR_SESSION = 100
    # EtherType that marks a sniffed frame as carrying a CpuHeader.
    CPU_ETHER_TYPE = 0x1234

    # Digest notification layout: a 32-byte little-endian header
    # (topic, device_id, ctx_id, list_id, buffer_id, num) followed by
    # ``num`` big-endian samples of 8 bytes each (upper 32 bits of the MAC,
    # lower 16 bits of the MAC, ingress port).
    _DIGEST_HEADER = struct.Struct("<iQiiQi")
    _DIGEST_SAMPLE = struct.Struct(">LHH")

    def __init__(self, sw_name, topo_path='topology.json'):
        """Connect to switch ``sw_name`` and install the initial state.

        Args:
            sw_name: name of the switch as it appears in the topology.
            topo_path: path of the topology file handed to ``load_topo``.
                Defaults to ``'topology.json'`` in the working directory.
        """
        self.topo = load_topo(topo_path)
        self.sw_name = sw_name
        self.thrift_port = self.topo.get_thrift_port(sw_name)
        self.cpu_port = self.topo.get_cpu_port_index(self.sw_name)
        self.controller = SimpleSwitchThriftAPI(self.thrift_port)
        self.init()

    def init(self):
        """Reset the switch and install broadcast groups and the mirror."""
        self.controller.reset_state()
        self.add_broadcast_groups()
        self.add_mirror()

    def add_mirror(self):
        """Register the CPU mirroring session; skipped if ``cpu_port`` is falsy."""
        if self.cpu_port:
            self.controller.mirroring_add(self.CPU_MIRROR_SESSION, self.cpu_port)

    def add_broadcast_groups(self):
        """Create one multicast group per data port.

        For every port of the switch (ignoring ``lo`` and the CPU
        interface) a multicast group is created that contains all the
        *other* data ports, and the ``broadcast`` table is filled so that
        the ingress port selects its group through ``set_mcast_grp``.
        Group ids start at 1 and replication ids (rid) at 0.
        """
        interfaces_to_port = self.topo.get_node_intfs(fields=['port'])[self.sw_name].copy()
        # Loopback and CPU interfaces must never receive broadcast traffic.
        interfaces_to_port.pop('lo', None)
        interfaces_to_port.pop(self.topo.get_cpu_port_intf(self.sw_name), None)

        ports = list(interfaces_to_port.values())
        for rid, ingress_port in enumerate(ports):
            mc_grp_id = rid + 1

            # Replicate to every data port except the one the packet came from.
            port_list = list(ports)
            port_list.remove(ingress_port)

            self.controller.mc_mgrp_create(mc_grp_id)
            handle = self.controller.mc_node_create(rid, port_list)
            self.controller.mc_node_associate(mc_grp_id, handle)
            self.controller.table_add("broadcast", "set_mcast_grp",
                                      [str(ingress_port)], [str(mc_grp_id)])

    # Backward-compatible alias: the method was originally published under
    # this misspelled name.
    add_boadcast_groups = add_broadcast_groups

    def fill_table_test(self):
        """Statically fill ``dmac`` with four fixed MAC -> port entries."""
        static_entries = (
            ('00:00:0a:00:00:01', '1'),
            ('00:00:0a:00:00:02', '2'),
            ('00:00:0a:00:00:03', '3'),
            ('00:00:0a:00:00:04', '4'),
        )
        for mac_addr, port in static_entries:
            self.controller.table_add("dmac", "forward", [mac_addr], [port])

    def learn(self, learning_data):
        """Install table entries for every learned address.

        Args:
            learning_data: iterable of ``(mac_addr, ingress_port)`` pairs,
                with ``mac_addr`` given as an integer.

        Each pair adds a ``NoAction`` entry to ``smac`` and a ``forward``
        entry (towards ``ingress_port``) to ``dmac``.
        """
        for mac_addr, ingress_port in learning_data:
            print("mac: %012X ingress_port: %s " % (mac_addr, ingress_port))
            self.controller.table_add("smac", "NoAction", [str(mac_addr)])
            self.controller.table_add("dmac", "forward", [str(mac_addr)], [str(ingress_port)])

    def unpack_digest(self, msg, num_samples):
        """Decode the samples of a digest notification.

        Args:
            msg: raw notification bytes, including the 32-byte header.
            num_samples: number of 8-byte samples to read after the header.

        Returns:
            A list of ``(mac_addr, ingress_port)`` integer tuples, in
            message order.

        Raises:
            struct.error: if ``msg`` is too short for ``num_samples``.
        """
        sample_format = self._DIGEST_SAMPLE
        offset = self._DIGEST_HEADER.size
        digest = []
        for _ in range(num_samples):
            mac_high, mac_low, ingress_port = sample_format.unpack_from(msg, offset)
            offset += sample_format.size
            # The 48-bit MAC arrives split into its upper 32 and lower 16 bits.
            digest.append(((mac_high << 16) + mac_low, ingress_port))
        return digest

    def recv_msg_digest(self, msg):
        """Process one digest notification: learn its samples, then ack it.

        Args:
            msg: raw notification bytes as received from the socket.
        """
        _topic, _device_id, ctx_id, list_id, buffer_id, num = self._DIGEST_HEADER.unpack_from(msg)
        self.learn(self.unpack_digest(msg, num))
        # Acknowledge the digest buffer.
        self.controller.client.bm_learning_ack_buffer(ctx_id, list_id, buffer_id)

    def run_digest_loop(self):
        """Subscribe to the switch notifications and learn from them forever.

        Blocks until an exception (e.g. ``KeyboardInterrupt``) propagates;
        the subscription socket is closed on the way out.
        """
        sub = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
        try:
            notifications_socket = self.controller.client.bm_mgmt_get_info().notifications_socket
            sub.connect(notifications_socket)
            # An empty prefix subscribes to every topic.
            sub.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
            while True:
                self.recv_msg_digest(sub.recv())
        finally:
            sub.close()

    def recv_msg_cpu(self, pkt):
        """Learn from one packet sniffed on the CPU port.

        Frames whose EtherType is not ``0x1234`` are ignored; for the
        others the payload is parsed as a ``CpuHeader``.
        """
        packet = Ether(raw(pkt))
        if packet.type != self.CPU_ETHER_TYPE:
            return
        cpu_header = CpuHeader(bytes(packet.load))
        self.learn([(cpu_header.macAddr, cpu_header.ingress_port)])

    def run_cpu_port_loop(self):
        """Sniff the CPU port interface and learn from every packet seen."""
        # NOTE(review): the eth0 -> eth1 rename presumably selects the
        # controller-side end of the CPU link -- confirm against the topology.
        cpu_port_intf = str(self.topo.get_cpu_port_intf(self.sw_name).replace("eth0", "eth1"))
        sniff(iface=cpu_port_intf, prn=self.recv_msg_cpu)
if __name__ == "__main__":
    # Usage: <script> <switch_name> <digest|cpu>
    #   digest -- learn from digest notifications sent by the switch
    #   cpu    -- learn from packets sniffed on the CPU port
    import sys

    # Fail with a readable message instead of an IndexError (missing
    # arguments) or a silent no-op (unknown mode).
    if len(sys.argv) < 3 or sys.argv[2] not in ("digest", "cpu"):
        sys.exit("usage: %s <switch_name> <digest|cpu>" % sys.argv[0])

    sw_name = sys.argv[1]
    receive_from = sys.argv[2]

    controller = L2Controller(sw_name)
    if receive_from == "digest":
        controller.run_digest_loop()
    else:
        controller.run_cpu_port_loop()