Skip to content

Commit

Permalink
Refactor nflux.toml & implement egress monitoring for vpn virtual int…
Browse files Browse the repository at this point in the history
…erfaces
  • Loading branch information
containerscrew committed Dec 26, 2024
1 parent b493ee2 commit 73ee8f1
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 59 deletions.
1 change: 1 addition & 0 deletions nflux-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct ConnectionEvent {
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EgressEvent {
pub dst_ip: u32,
pub dst_port: u16,
}

#[repr(C)]
Expand Down
82 changes: 70 additions & 12 deletions nflux-ebpf/src/egress.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,92 @@
use core::mem;

use aya_ebpf::bindings::TC_ACT_PIPE;
use aya_ebpf::macros::map;
use aya_ebpf::maps::{LruHashMap, PerfEventArray};
use aya_ebpf::programs::TcContext;
use aya_log_ebpf::info;
use network_types::eth::{EthHdr, EtherType};
use network_types::ip::Ipv4Hdr;
use network_types::ip::{IpProto, Ipv4Hdr, Ipv6Hdr};
use network_types::tcp::TcpHdr;
use network_types::udp::UdpHdr;
use nflux_common::EgressEvent;

use crate::maps::{ACTIVE_CONNECTIONS, EGRESS_EVENT};


#[inline]
fn ptr_at<T>(ctx: &TcContext, offset: usize) -> Result<*const T, ()> {
let start = ctx.data();
let end = ctx.data_end();
let len = mem::size_of::<T>();

if start + offset + len > end {
return Err(());
}

Ok((start + offset) as *const T)
}


pub fn try_tc_egress(ctx: TcContext) -> Result<i32, ()> {
let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?;

match ethhdr.ether_type {
EtherType::Ipv4 => unsafe {
info!(&ctx, "is an ipv4 packet!");
let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?;
let destination = u32::from_be(ipv4hdr.dst_addr);

// Check if this destination is already active
if ACTIVE_CONNECTIONS.get(&destination).is_none() {
// Log only new connections
let event = EgressEvent { dst_ip: destination };
EGRESS_EVENT.output(&ctx, &event, 0);
match ipv4hdr.proto {
IpProto::Tcp => {
let tcphdr: *const TcpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?;
let dst_port = u16::from_be((*tcphdr).dest);

// Check if this destination is already active
if ACTIVE_CONNECTIONS.get(&destination).is_none() {
// Log only new connections
let event = EgressEvent { dst_ip: destination, dst_port: dst_port };
EGRESS_EVENT.output(&ctx, &event, 0);

// Mark connection as active
if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
return Err(());
// Mark connection as active
if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
return Err(());
}
}
return Ok(TC_ACT_PIPE)
}
IpProto::Udp => {
let udphdr: *const UdpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?;
let dst_port = u16::from_be((*udphdr).dest);
let event = EgressEvent { dst_ip: destination, dst_port: dst_port };
EGRESS_EVENT.output(&ctx, &event, 0);

return Ok(TC_ACT_PIPE)
}
_ => {}
}
}
_ => return Ok(TC_ACT_PIPE),
EtherType::Ipv6 => {
info!(&ctx, "is an ipv6 packet!");
//let ipv6hdr: *const Ipv6Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? };
//let proto = unsafe { (*ipv6hdr).next_hdr };
//let destination = unsafe { (*ipv6hdr).dst_addr.in6_u.u6_addr8 };

// Create here a fake event
let event = EgressEvent { dst_ip: u32::from_be_bytes([192, 67, 4, 2]), dst_port: 99 };
EGRESS_EVENT.output(&ctx, &event, 0);
return Ok(TC_ACT_PIPE)
}
EtherType::FibreChannel => {
info!(&ctx, "ether type fibrechannel!!");
return Ok(TC_ACT_PIPE)
}
EtherType::Arp => {
info!(&ctx, "ARP!!");
return Ok(TC_ACT_PIPE)
}
_ => {
//info!(&ctx, "other ether type!");
return Ok(TC_ACT_PIPE)
},
}

Ok(TC_ACT_PIPE)
Expand Down
78 changes: 78 additions & 0 deletions nflux-ebpf/src/egress_vpn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use core::mem;

use aya_ebpf::bindings::TC_ACT_PIPE;
use aya_ebpf::programs::TcContext;
use aya_log_ebpf::info;
use network_types::ip::{IpProto, Ipv4Hdr};
use network_types::tcp::TcpHdr;
use network_types::udp::UdpHdr;
use nflux_common::EgressEvent;

use crate::maps::{ACTIVE_CONNECTIONS, EGRESS_EVENT};

#[inline]
fn ptr_at<T>(ctx: &TcContext, offset: usize) -> Result<*const T, ()> {
let start = ctx.data();
let end = ctx.data_end();
let len = mem::size_of::<T>();

if start + offset + len > end {
return Err(());
}

Ok((start + offset) as *const T)
}

pub fn try_tc_egress_vpn(ctx: TcContext) -> Result<i32, ()> {
// Parse IPv4 header
let ipv4hdr: Ipv4Hdr = match ctx.load(0) {
Ok(hdr) => hdr,
Err(_) => {
info!(&ctx, "Failed to load IPv4 header");
return Ok(TC_ACT_PIPE);
}
};

let destination = u32::from_be(ipv4hdr.dst_addr);
match ipv4hdr.proto {
IpProto::Tcp => {
let tcphdr: *const TcpHdr = ptr_at(&ctx, Ipv4Hdr::LEN)?;
let dst_port = u16::from_be(unsafe { (*tcphdr).dest });
// Check if this destination is already active
if unsafe { ACTIVE_CONNECTIONS.get(&destination).is_none() } {
// Log only new connections
let event = EgressEvent {
dst_ip: destination,
dst_port: dst_port,
};
EGRESS_EVENT.output(&ctx, &event, 0);

// Mark connection as active
if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
return Err(());
}
return Ok(TC_ACT_PIPE)
}

//info!(&ctx, "TCP packet to port {}", dst_port);
}
IpProto::Udp => {
let udphdr: *const UdpHdr = ptr_at(&ctx, Ipv4Hdr::LEN)?;
let dst_port = u16::from_be(unsafe { (*udphdr).dest });

let event = EgressEvent {
dst_ip: destination,
dst_port: dst_port,
};
EGRESS_EVENT.output(&ctx, &event, 0);

return Ok(TC_ACT_PIPE)
//info!(&ctx, "UDP packet to port {}", dst_port);
}
_ => {
return Ok(TC_ACT_PIPE)
}
}

Ok(TC_ACT_PIPE)
}
6 changes: 3 additions & 3 deletions nflux-ebpf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(nonstandard_style, dead_code)]

mod egress;
mod egress_vpn;
mod maps;

use aya_ebpf::bindings::xdp_action::{XDP_ABORTED, XDP_DROP, XDP_PASS};
Expand All @@ -12,11 +13,11 @@ use aya_ebpf::{
macros::xdp,
programs::XdpContext,
};
use egress_vpn::try_tc_egress_vpn;
use maps::{ACTIVE_CONNECTIONS, CONNECTION_EVENTS, CONNECTION_TRACKER, ICMP_RULE, IPV4_RULES};
use core::mem;
use aya_ebpf::bindings::TC_ACT_SHOT;
use aya_ebpf::macros::classifier;
use aya_log_ebpf::info;
use aya_ebpf::programs::TcContext;
use network_types::ip::IpProto;
use network_types::{
Expand All @@ -26,7 +27,6 @@ use network_types::{
udp::UdpHdr,
};
use nflux_common::{ConnectionEvent, LpmKeyIpv4};
use crate::egress::try_tc_egress;

#[xdp]
pub fn nflux(ctx: XdpContext) -> u32 {
Expand All @@ -38,7 +38,7 @@ pub fn nflux(ctx: XdpContext) -> u32 {

#[classifier]
pub fn tc_egress(ctx: TcContext) -> i32 {
try_tc_egress(ctx).unwrap_or_else(|_| TC_ACT_SHOT)
try_tc_egress_vpn(ctx).unwrap_or_else(|_| TC_ACT_SHOT)
}

#[inline(always)]
Expand Down
13 changes: 8 additions & 5 deletions nflux.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
[nflux]
# Global configuration for nflux
[ingress]
enabled = "false"
interface_name = "wlp2s0"
# Control ICMP ping packets for each IP I think is not necessary by the moment
# Lets decide if ICMP packets will be allowed or denied globally
icmp_ping = "true"

[egress]
enabled = "true"
interface_name = "proton0"


[logging]
log_level = "info" # trace, debug, info, warn, or error. Defaults to info if not set
log_type = "json" # text or json. Defaults to text if not set

[ip_rules]
# The /32 CIDR block is used to represent a single IP address rather than a range
"192.168.0.0/24" = { priority = 1, action = "allow", ports = [22, 8000, 80], protocol = "tcp", log = true, description = "Allow some web servers" }
"192.168.0.0/24" = { priority = 1, action = "allow", ports = [22, 8000, 80], protocol = "tcp", description = "Allow some web servers" }

# curl -6 -v http://\[::ffff:192.168.0.26\]:80
#"fe80::5bc2:662b:ac2f:7e8b/128" = { priority = 3, action = "allow", ports = [80], protocol = "tcp", log = false, description = "Deny HTTP for specific IPv6 address" }
Expand Down
20 changes: 12 additions & 8 deletions nflux/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ pub enum IsEnabled {
True,
False,
}

// General firewall configuration
#[derive(Debug, Deserialize)]
pub struct NfluxConfig {
pub struct Ingress {
pub enabled: IsEnabled,
pub interface_name: String,
pub icmp_ping: IsEnabled,
}

#[derive(Debug, Deserialize)]
pub struct Egress {
pub enabled: IsEnabled,
pub interface_name: String,
}

// Logging config
#[derive(Debug, Deserialize)]
pub struct LoggingConfig {
Expand All @@ -50,15 +55,15 @@ pub struct IpRules {
pub action: Action,
pub ports: Vec<u16>,
pub protocol: Protocol,
pub log: bool,
pub description: String,
}

// Top-level configuration structure
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct Nflux {
pub nflux: NfluxConfig,
pub ingress: Ingress,
pub egress: Egress,
pub logging: LoggingConfig,
pub ip_rules: HashMap<String, IpRules>,
}
Expand Down Expand Up @@ -146,8 +151,8 @@ mod tests {
let config = Nflux::load_config().unwrap();

// Assertions
assert_eq!(config.nflux.interface_name, "wlan0");
assert_eq!(config.nflux.icmp_ping, IsEnabled::True);
assert_eq!(config.ingress.interface_name, "wlan0");
assert_eq!(config.ingress.icmp_ping, IsEnabled::True);
assert_eq!(config.logging.log_level, "debug");
assert_eq!(config.logging.log_type, "json");

Expand All @@ -156,7 +161,6 @@ mod tests {
assert_eq!(rule.action, Action::Allow);
assert_eq!(rule.ports, vec![22]);
assert_eq!(rule.protocol, Protocol::Tcp);
assert_eq!(rule.log, true);
assert_eq!(rule.description, "SSH rule");
}

Expand Down
Loading

0 comments on commit 73ee8f1

Please sign in to comment.