Skip to content

Commit

Permalink
Wip: egress connections
Browse files Browse the repository at this point in the history
  • Loading branch information
containerscrew committed Jan 12, 2025
1 parent c98e64e commit d54ad3f
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 156 deletions.
3 changes: 2 additions & 1 deletion nflux-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ pub const MAX_RULES_PORT: usize = 32;
pub const MAX_ALLOWED_PORTS: usize = 1024;
pub const MAX_ALLOWED_IPV4: usize = 1024;

pub mod utils;

#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EgressConfig {
pub log_only_new_connections: u8, // 0 = no, 1 = yes
pub log_udp_connections: u8, // 0 = no, 1 = yes
pub log_tcp_connections: u8, // 0 = no, 1 = yes
pub log_private_connections: u8, // 0 = no, 1 = yes
}

#[repr(C)]
Expand Down
42 changes: 42 additions & 0 deletions nflux-common/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Checks if a given IPv4 address in u32 format is private.
// Returns `true` if the IP is private, otherwise `false`.
pub fn is_private_ip(ip: u32) -> bool {
// Convert u32 to the octet form for easier comparison
let octets = [
(ip >> 24) as u8, // First octet
(ip >> 16) as u8, // Second octet
(ip >> 8) as u8, // Third octet
ip as u8, // Fourth octet
];

// Check for private IP ranges
match octets {
[10, ..] => true, // 10.0.0.0/8
[172, 16..=31, ..] => true, // 172.16.0.0/12
[192, 168, ..] => true, // 192.168.0.0/16
[127, ..] => true, // 127.0.0.0/8 (loopback)
[169, 254, ..] => true, // 169.254.0.0/16 (link-local)
_ => false, // All others are public
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_private_ips() {
assert!(is_private_ip(0x0A000001)); // 10.0.0.1
assert!(is_private_ip(0xAC100001)); // 172.16.0.1
assert!(is_private_ip(0xC0A80001)); // 192.168.0.1
assert!(is_private_ip(0x7F000001)); // 127.0.0.1
assert!(is_private_ip(0xA9FE0001)); // 169.254.0.1
}

#[test]
fn test_public_ips() {
assert!(!is_private_ip(0x08080808)); // 8.8.8.8
assert!(!is_private_ip(0xC0000201)); // 192.0.2.1
assert!(!is_private_ip(0x64400001)); // 100.64.0.1
}
}
106 changes: 30 additions & 76 deletions nflux-ebpf/src/egress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::mem;
use aya_ebpf::bindings::TC_ACT_PIPE;
use aya_ebpf::helpers::gen::bpf_get_current_pid_tgid;
use aya_ebpf::programs::TcContext;
use aya_log_ebpf::info;
use aya_log_ebpf::debug;
use network_types::eth::{EthHdr, EtherType};
use network_types::ip::{IpProto, Ipv4Hdr};
use network_types::tcp::TcpHdr;
Expand All @@ -26,17 +26,28 @@ fn ptr_at<T>(ctx: &TcContext, offset: usize) -> Result<*const T, ()> {
}

#[inline]
unsafe fn log_connection(ctx: &TcContext, destination: u32, src_port: u16, dst_port: u16, protocol: u8, pid: u64) {
// Check if this destination is already active
if ACTIVE_CONNECTIONS.get(&destination).is_none() {
// Log only new connections
let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
EGRESS_EVENT.output(ctx, &event, 0);

// Mark connection as active
if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
return;
unsafe fn log_connection(ctx: &TcContext, log_new_connection: u8, destination: u32, src_port: u16, dst_port: u16, protocol: u8, pid: u64) {
// If log_only_new_connections is enabled
// Only log connections to different endpoints (ips)
match log_new_connection{
0 => {
// Log all connections
let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
EGRESS_EVENT.output(ctx, &event, 0);
}
1 => {
// Check if this destination is already active
if ACTIVE_CONNECTIONS.get(&destination).is_none() {
let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
EGRESS_EVENT.output(ctx, &event, 0);

// Mark connection as active
if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
return;
}
}
}
_ => {}
}
}

Expand All @@ -45,13 +56,6 @@ pub fn try_tc_egress(ctx: TcContext) -> Result<i32, ()> {

let egress_config = EGRESS_CONFIG.get(0).ok_or(())?;

// if let Some(&egress_config) = EGRESS_CONFIG.get(0) {
// let egress_config = egres
// if egress_config.log_udp == 0 {
// info!(&ctx, "log_udp is disabled");
// }
// }

match ethhdr.ether_type {
EtherType::Ipv4 => unsafe {
let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?;
Expand All @@ -68,29 +72,7 @@ pub fn try_tc_egress(ctx: TcContext) -> Result<i32, ()> {

// If log_tcp_connections is enabled, log the connection
if egress_config.log_tcp_connections == 1 {
log_connection(&ctx, destination, src_port, dst_port, protocol, pid);
// If log_only_new_connections is enabled, only log new connections
// match egress_config.log_only_new_connections {
// 0 => {
// // Check if this destination is already active
// if ACTIVE_CONNECTIONS.get(&destination).is_none() {
// // Log only new connections
// let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
// EGRESS_EVENT.output(&ctx, &event, 0);

// // Mark connection as active
// if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
// return Err(());
// }
// }
// }
// 1 => {
// // Log all connections
// let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
// EGRESS_EVENT.output(&ctx, &event, 0);
// }
// _ => {}
// }
log_connection(&ctx, egress_config.log_only_new_connections, destination, src_port, dst_port, protocol, pid);
}
return Ok(TC_ACT_PIPE)
}
Expand All @@ -102,30 +84,9 @@ pub fn try_tc_egress(ctx: TcContext) -> Result<i32, ()> {
let pid_tgid = bpf_get_current_pid_tgid();
let pid = pid_tgid >> 32;

// If log_tcp_connections is enabled, log the connection
// If log_udp_connections is enabled, log the connection
if egress_config.log_udp_connections == 1 {
// If log_only_new_connections is enabled, only log new connections
match egress_config.log_only_new_connections {
0 => {
// Check if this destination is already active
if ACTIVE_CONNECTIONS.get(&destination).is_none() {
// Log only new connections
let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
EGRESS_EVENT.output(&ctx, &event, 0);

// Mark connection as active
if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() {
return Err(());
}
}
}
1 => {
// Log all connections
let event = EgressEvent { dst_ip: destination, src_port, dst_port, protocol, pid };
EGRESS_EVENT.output(&ctx, &event, 0);
}
_ => {}
}
log_connection(&ctx, egress_config.log_only_new_connections, destination, src_port, dst_port, protocol, pid);
}

return Ok(TC_ACT_PIPE)
Expand All @@ -134,27 +95,20 @@ pub fn try_tc_egress(ctx: TcContext) -> Result<i32, ()> {
}
}
EtherType::Ipv6 => {
info!(&ctx, "is an ipv6 packet!");
//let ipv6hdr: *const Ipv6Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? };
//let proto = unsafe { (*ipv6hdr).next_hdr };
//let destination = unsafe { (*ipv6hdr).dst_addr.in6_u.u6_addr8 };

// Create here a fake event
// IPV6 is not implemented yet
let event = EgressEvent { dst_ip: u32::from_be_bytes([192, 67, 4, 2]), src_port: 111, dst_port: 99, protocol: 6, pid: 1234};
EGRESS_EVENT.output(&ctx, &event, 0);
// Ipv6 not implemented yet
debug!(&ctx, "is an ipv6 packet!");
return Ok(TC_ACT_PIPE)
}
EtherType::FibreChannel => {
info!(&ctx, "ether type fibrechannel!!");
debug!(&ctx, "ether type fibrechannel!!");
return Ok(TC_ACT_PIPE)
}
EtherType::Arp => {
info!(&ctx, "ARP packet!!");
debug!(&ctx, "ARP packet!!");
return Ok(TC_ACT_PIPE)
}
_ => {
//info!(&ctx, "other ether type!");
debug!(&ctx, "Unknown ether type.");
return Ok(TC_ACT_PIPE)
},
}
Expand Down
3 changes: 1 addition & 2 deletions nflux.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ virtual_interfaces = ["protonvpn"] # If using VPN, add the virtual interface her
# For example, if you perform 1000 requests to google.com, just log 1 (or every you execute)
# This affects to tcp and udp connections
log_only_new_connections = "true"
log_udp_connections = "true" # Decide if udp packets should be logged
log_udp_connections = "false" # Decide if udp packets should be logged
log_tcp_connections = "true" # Decide if tcp packets should be logged
log_private_connections = "true" # Log private connections (10.X, 172.X, 192.X...) not only external ips

#[egress_rules]
# TODO: filter outgoint traffic (block/deny)
Expand Down
2 changes: 1 addition & 1 deletion nflux/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ pub struct EgressLogging {
pub log_only_new_connections: IsEnabled,
pub log_udp_connections: IsEnabled,
pub log_tcp_connections: IsEnabled,
pub log_private_connections: IsEnabled,
}

#[derive(Debug, Deserialize)]
pub struct Egress {
pub enabled: IsEnabled,
pub physical_interfaces: Vec<String>,
#[allow(dead_code)]
pub virtual_interfaces: Vec<String>,
pub logging: EgressLogging,
}
Expand Down
42 changes: 10 additions & 32 deletions nflux/src/egress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bytes::BytesMut;
use tracing::{error, info, warn};
use nflux_common::{convert_protocol, EgressConfig, EgressEvent};
use crate::config::{Egress, IsEnabled};
use crate::utils::{is_private_ip, lookup_address};
use crate::utils::lookup_address;

pub fn populate_egress_config(bpf: &mut Ebpf, config: Egress) -> anyhow::Result<()> {
let mut egress_config = Array::<_, EgressConfig>::try_from(
Expand All @@ -29,10 +29,6 @@ pub fn populate_egress_config(bpf: &mut Ebpf, config: Egress) -> anyhow::Result<
IsEnabled::True => 1,
IsEnabled::False => 0,
},
log_private_connections: match config.logging.log_private_connections {
IsEnabled::True => 1,
IsEnabled::False => 0,
},
};

egress_config
Expand Down Expand Up @@ -96,7 +92,6 @@ pub fn attach_tc_egress_program(bpf: &mut Ebpf, interface_names: &[String]) -> a
pub async fn process_egress_events(
mut buf: AsyncPerfEventArrayBuffer<MapData>,
cpu_id: u32,
log_private_connections: &IsEnabled,
) -> Result<(), PerfBufferError> {
let mut buffers = vec![BytesMut::with_capacity(1024); 10];

Expand All @@ -109,32 +104,15 @@ pub async fn process_egress_events(
let buf = &buffers[i];
match parse_egress_event(buf) {
Ok(event) => {
match log_private_connections {
IsEnabled::True => {
info!(
"program=tc_egress protocol={}, ip={}, src_port={}, dst_port={}, fqdn={}, pid={}",
convert_protocol(event.protocol),
Ipv4Addr::from(event.dst_ip),
event.src_port,
event.dst_port,
"Private IP",
event.pid,
);
}
IsEnabled::False => {
if ! is_private_ip(event.dst_ip) {
info!(
"program=tc_egress protocol={}, ip={}, src_port={}, dst_port={}, fqdn={}, pid={}",
convert_protocol(event.protocol),
Ipv4Addr::from(event.dst_ip),
event.src_port,
event.dst_port,
lookup_address(event.dst_ip),
event.pid,
);
}
}
}
info!(
"program=tc_egress protocol={}, ip={}, src_port={}, dst_port={}, fqdn={}, pid={}",
convert_protocol(event.protocol),
Ipv4Addr::from(event.dst_ip),
event.src_port,
event.dst_port,
lookup_address(event.dst_ip),
event.pid,
);
}
Err(e) => error!("Failed to parse egress event on CPU {}: {}", cpu_id, e),
}
Expand Down
2 changes: 1 addition & 1 deletion nflux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn main() -> anyhow::Result<()> {
// Spawn task for egress events
{
let buf = egress_events.open(cpu_id, None)?;
task::spawn(process_egress_events(buf, cpu_id, &IsEnabled::False));
task::spawn(process_egress_events(buf, cpu_id));
}
}

Expand Down
45 changes: 2 additions & 43 deletions nflux/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashMap, net::{IpAddr, Ipv4Addr, Ipv6Addr}};
use dns_lookup::lookup_addr;
use libc::getuid;
use nflux_common::utils::is_private_ip;
use tokio::signal;
use tracing::{info, warn};

Expand Down Expand Up @@ -57,6 +58,7 @@ pub fn parse_cidr_v6(cidr: &str) -> anyhow::Result<(Ipv6Addr, u32)> {
let prefix_len = parts[1].parse::<u32>()?;
Ok((ip, prefix_len))
}

pub fn lookup_address(ip: u32) -> String {
match is_private_ip(ip) {
true => "Private IP".to_string(),
Expand All @@ -72,46 +74,3 @@ pub fn lookup_address(ip: u32) -> String {
},
}
}

// Checks if a given IPv4 address in u32 format is private.
// Returns `true` if the IP is private, otherwise `false`.
pub fn is_private_ip(ip: u32) -> bool {
// Convert u32 to the octet form for easier comparison
let octets = [
(ip >> 24) as u8, // First octet
(ip >> 16) as u8, // Second octet
(ip >> 8) as u8, // Third octet
ip as u8, // Fourth octet
];

// Check for private IP ranges
match octets {
[10, ..] => true, // 10.0.0.0/8
[172, 16..=31, ..] => true, // 172.16.0.0/12
[192, 168, ..] => true, // 192.168.0.0/16
[127, ..] => true, // 127.0.0.0/8 (loopback)
[169, 254, ..] => true, // 169.254.0.0/16 (link-local)
_ => false, // All others are public
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_private_ips() {
assert!(is_private_ip(0x0A000001)); // 10.0.0.1
assert!(is_private_ip(0xAC100001)); // 172.16.0.1
assert!(is_private_ip(0xC0A80001)); // 192.168.0.1
assert!(is_private_ip(0x7F000001)); // 127.0.0.1
assert!(is_private_ip(0xA9FE0001)); // 169.254.0.1
}

#[test]
fn test_public_ips() {
assert!(!is_private_ip(0x08080808)); // 8.8.8.8
assert!(!is_private_ip(0xC0000201)); // 192.0.2.1
assert!(!is_private_ip(0x64400001)); // 100.64.0.1
}
}

0 comments on commit d54ad3f

Please sign in to comment.