From b81af3423b640d430c1948873eb74e2653d857b0 Mon Sep 17 00:00:00 2001 From: containerscrew Date: Fri, 17 Jan 2025 23:02:49 +0100 Subject: [PATCH] Wip: egress connection --- .pre-commit-config.yaml | 12 +- nflux-ebpf/src/egress.rs | 223 ++++++------------------------------- nflux-ebpf/src/handlers.rs | 109 ++++++++++++++++++ nflux-ebpf/src/logger.rs | 50 +++++++++ nflux-ebpf/src/main.rs | 2 + nflux-ebpf/src/old_main.rs | 11 +- nflux.toml | 9 +- nflux/src/main.rs | 9 +- 8 files changed, 215 insertions(+), 210 deletions(-) create mode 100644 nflux-ebpf/src/handlers.rs create mode 100644 nflux-ebpf/src/logger.rs diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2ce0012..f830d1c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,12 +18,12 @@ repos: hooks: - id: mtoc args: [ "-e", ".target/" ] -# - repo: https://github.com/doublify/pre-commit-rust -# rev: v1.0 -# hooks: -# - id: fmt -# args: ['--verbose', '--'] -# - id: cargo-check + - repo: https://github.com/doublify/pre-commit-rust + rev: v1.0 + hooks: + - id: fmt + args: ['--verbose', '--'] + - id: cargo-check # - id: clippy # args: ["--", "-D", "warnings", "-D", "unused-imports"] # - repo: local diff --git a/nflux-ebpf/src/egress.rs b/nflux-ebpf/src/egress.rs index 82e750a..2f1fd14 100644 --- a/nflux-ebpf/src/egress.rs +++ b/nflux-ebpf/src/egress.rs @@ -1,75 +1,11 @@ -use core::mem; - use aya_ebpf::bindings::TC_ACT_PIPE; -use aya_ebpf::helpers::gen::bpf_get_current_pid_tgid; use aya_ebpf::programs::TcContext; -use aya_log_ebpf::{debug, warn}; use network_types::eth::{EthHdr, EtherType}; use network_types::ip::{IpProto, Ipv4Hdr, Ipv6Hdr}; -use network_types::tcp::TcpHdr; -use network_types::udp::UdpHdr; -use nflux_common::{EgressConfig, EgressEvent}; - -use crate::maps::{ACTIVE_CONNECTIONS, EGRESS_CONFIG, EGRESS_EVENT}; - -#[inline] -fn ptr_at(ctx: &TcContext, offset: usize) -> Result<*const T, ()> { - let start = ctx.data(); - let end = ctx.data_end(); - let len = mem::size_of::(); - - if start + offset + len > end { - return Err(()); - } - - Ok((start + offset) as *const T) -} +use nflux_common::EgressConfig; -#[inline] -unsafe fn log_connection( - ctx: &TcContext, - log_new_connection: u8, - destination: u32, - src_port: u16, - dst_port: u16, - protocol: u8, - pid: u64, -) { - // If log_only_new_connections is enabled - // Only log connections to different endpoints (ips) - match log_new_connection { - 0 => { - // Log all connections - let event = EgressEvent { - dst_ip: destination, - src_port, - dst_port, - protocol, - pid, - }; - EGRESS_EVENT.output(ctx, &event, 0); - } - 1 => { - // Check if this destination is already active - if ACTIVE_CONNECTIONS.get(&destination).is_none() { - let event = EgressEvent { - dst_ip: destination, - src_port, - dst_port, - protocol, - pid, - }; - EGRESS_EVENT.output(ctx, &event, 0); - - // Mark connection as active - if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() { - return; - } - } - } - _ => {} - } -} +use crate::handlers::{handle_icmp_packet, handle_tcp_packet, handle_udp_packet}; +use crate::maps::EGRESS_CONFIG; fn handle_ipv4_packet(ctx: &TcContext, egress_config: &EgressConfig) -> Result { let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?; @@ -83,141 +19,48 @@ fn handle_ipv4_packet(ctx: &TcContext, egress_config: &EgressConfig) -> Result Result { - let pid_tgid = unsafe { bpf_get_current_pid_tgid() }; - let pid = pid_tgid >> 32; - - if egress_config.log_icmp_connections == 1 { - unsafe { - log_connection( - ctx, - egress_config.log_only_new_connections, - destination, - 0, - 0, - IpProto::Icmp as u8, - pid, - ) - }; - } - - Ok(TC_ACT_PIPE) -} - - -fn handle_tcp_packet( - ctx: &TcContext, - egress_config: &EgressConfig, - destination: u32, -) -> Result { - let tcphdr: *const TcpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; - let src_port = u16::from_be((unsafe { *tcphdr }).source); - let dst_port = u16::from_be((unsafe { *tcphdr }).dest); - let protocol = IpProto::Tcp as u8; - let pid_tgid = unsafe { bpf_get_current_pid_tgid() }; - let pid = pid_tgid >> 32; - - if egress_config.log_tcp_connections == 1 { - unsafe { - log_connection( - ctx, - egress_config.log_only_new_connections, - destination, - src_port, - dst_port, - protocol, - pid, - ) - }; - } - - Ok(TC_ACT_PIPE) -} - -fn handle_udp_packet( - ctx: &TcContext, - egress_config: &EgressConfig, - destination: u32, -) -> Result { - let udphdr: *const UdpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; - let src_port = u16::from_be((unsafe { *udphdr }).source); - let dst_port = u16::from_be((unsafe { *udphdr }).dest); - let protocol = IpProto::Udp as u8; - let pid_tgid = unsafe { bpf_get_current_pid_tgid() }; - let pid = pid_tgid >> 32; - - if egress_config.log_udp_connections == 1 { - unsafe { - log_connection( - ctx, - egress_config.log_only_new_connections, - destination, - src_port, - dst_port, - protocol, - pid, - ) - }; - } - - Ok(TC_ACT_PIPE) -} - -fn handle_non_ipv4_packet(ctx: &TcContext, ethhdr: &EthHdr) -> Result { - match ethhdr.ether_type { - EtherType::Ipv6 => { - debug!(ctx, "is an ipv6 packet!"); - Ok(TC_ACT_PIPE) - } - EtherType::FibreChannel => { - debug!(ctx, "ether type fibrechannel!!"); - Ok(TC_ACT_PIPE) - } - EtherType::Arp => { - debug!(ctx, "ARP packet!!"); - Ok(TC_ACT_PIPE) - } - _ => { - debug!(ctx, "Unknown ether type."); - Ok(TC_ACT_PIPE) - } - } -} - pub fn try_tc_egress_physical(ctx: TcContext) -> Result { let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?; let egress_config = EGRESS_CONFIG.get(0).ok_or(())?; match ethhdr.ether_type { EtherType::Ipv4 => handle_ipv4_packet(&ctx, &egress_config), - _ => handle_non_ipv4_packet(&ctx, ðhdr), + EtherType::Ipv6 => { + // IPV6 traffic is not implemented yet + + Ok(TC_ACT_PIPE) + } + _ => Ok(TC_ACT_PIPE), } } pub fn try_tc_egress_virtual(ctx: TcContext) -> Result { - // Parse IPv4 header - let ipv4hdr: Ipv4Hdr = match ctx.load(0) { - Ok(hdr) => hdr, - Err(_) => { - warn!(&ctx, "Failed to load IPv4 header"); - return Ok(TC_ACT_PIPE); - } - }; - let egress_config = EGRESS_CONFIG.get(0).ok_or(())?; - let destination = u32::from_be(ipv4hdr.dst_addr); - match ipv4hdr.proto { - IpProto::Tcp => handle_tcp_packet(&ctx, egress_config, destination), - IpProto::Udp => handle_udp_packet(&ctx, egress_config, destination), - IpProto::Icmp => handle_icmp_packet(&ctx, egress_config, destination), - _ => { - debug!(&ctx, "Probably ipv6 traffic, not implemented yet!"); - Ok(TC_ACT_PIPE) + // Parse IPv4 o IPv6 header + let ipv4hdr: Option = ctx.load(0).ok(); + let ipv6hdr: Option = ctx.load(0).ok(); + + if let Some(ipv4hdr) = ipv4hdr { + let destination = u32::from_be(ipv4hdr.dst_addr); + + match ipv4hdr.proto { + IpProto::Tcp => handle_tcp_packet(&ctx, egress_config, destination), + IpProto::Udp => handle_udp_packet(&ctx, egress_config, destination), + IpProto::Icmp => handle_icmp_packet(&ctx, egress_config, destination), + _ => Ok(TC_ACT_PIPE), } + } else if let Some(_) = ipv6hdr { + // IPV6 traffic is not implemented yet + + // match ipv6hdr.next_hdr { + // IpProto::Tcp => handle_tcp_packet(&ctx, egress_config, u32::from_be_bytes(unsafe {ipv6hdr.dst_addr.in6_u.u6_addr8[0..4].try_into().unwrap()})), + // IpProto::Udp => handle_udp_packet(&ctx, egress_config, u32::from_be_bytes(unsafe { ipv6hdr.dst_addr.in6_u.u6_addr8[0..4].try_into().unwrap() })), + // //IpProto::Icmpv6 => handle_icmpv6_packet(&ctx, egress_config, ipv6hdr.dst_addr), + // _ => Ok(TC_ACT_PIPE), + // } + Ok(TC_ACT_PIPE) + } else { + Ok(TC_ACT_PIPE) } } diff --git a/nflux-ebpf/src/handlers.rs b/nflux-ebpf/src/handlers.rs new file mode 100644 index 0000000..6d3768d --- /dev/null +++ b/nflux-ebpf/src/handlers.rs @@ -0,0 +1,109 @@ +use core::mem; + +use aya_ebpf::{bindings::TC_ACT_PIPE, helpers::bpf_get_current_pid_tgid, programs::TcContext}; +use network_types::{ + eth::EthHdr, + ip::{IpProto, Ipv4Hdr}, + tcp::TcpHdr, + udp::UdpHdr, +}; +use nflux_common::EgressConfig; + +use crate::logger::log_connection; + +#[inline] +fn ptr_at(ctx: &TcContext, offset: usize) -> Result<*const T, ()> { + let start = ctx.data(); + let end = ctx.data_end(); + let len = mem::size_of::(); + + if start + offset + len > end { + return Err(()); + } + + Ok((start + offset) as *const T) +} + +pub fn handle_icmp_packet( + ctx: &TcContext, + egress_config: &EgressConfig, + destination: u32, +) -> Result { + let pid_tgid = { bpf_get_current_pid_tgid() }; + let pid = pid_tgid >> 32; + + if egress_config.log_icmp_connections == 1 { + unsafe { + log_connection( + ctx, + egress_config.log_only_new_connections, + destination, + 0, + 0, + IpProto::Icmp as u8, + pid, + ) + }; + } + + Ok(TC_ACT_PIPE) +} + +pub fn handle_tcp_packet( + ctx: &TcContext, + egress_config: &EgressConfig, + destination: u32, +) -> Result { + let tcphdr: *const TcpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; + + let src_port = u16::from_be((unsafe { *tcphdr }).source); + let dst_port = u16::from_be((unsafe { *tcphdr }).dest); + let protocol = IpProto::Tcp as u8; + let pid_tgid = { bpf_get_current_pid_tgid() }; + let pid = pid_tgid >> 32; + + if egress_config.log_tcp_connections == 1 { + unsafe { + log_connection( + ctx, + egress_config.log_only_new_connections, + destination, + src_port, + dst_port, + protocol, + pid, + ) + }; + } + + Ok(TC_ACT_PIPE) +} + +pub fn handle_udp_packet( + ctx: &TcContext, + egress_config: &EgressConfig, + destination: u32, +) -> Result { + let udphdr: *const UdpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; + let src_port = u16::from_be((unsafe { *udphdr }).source); + let dst_port = u16::from_be((unsafe { *udphdr }).dest); + let protocol = IpProto::Udp as u8; + let pid_tgid = { bpf_get_current_pid_tgid() }; + let pid = pid_tgid >> 32; + + if egress_config.log_udp_connections == 1 { + unsafe { + log_connection( + ctx, + egress_config.log_only_new_connections, + destination, + src_port, + dst_port, + protocol, + pid, + ) + }; + } + + Ok(TC_ACT_PIPE) +} diff --git a/nflux-ebpf/src/logger.rs b/nflux-ebpf/src/logger.rs new file mode 100644 index 0000000..858e91a --- /dev/null +++ b/nflux-ebpf/src/logger.rs @@ -0,0 +1,50 @@ +use aya_ebpf::programs::TcContext; +use nflux_common::EgressEvent; + +use crate::maps::{ACTIVE_CONNECTIONS, EGRESS_EVENT}; + +#[inline] +pub unsafe fn log_connection( + ctx: &TcContext, + log_new_connection: u8, + destination: u32, + src_port: u16, + dst_port: u16, + protocol: u8, + pid: u64, +) { + // If log_only_new_connections is enabled + // Only log connections to different endpoints (ips) + match log_new_connection { + 0 => { + // Log all connections + let event = EgressEvent { + dst_ip: destination, + src_port, + dst_port, + protocol, + pid, + }; + EGRESS_EVENT.output(ctx, &event, 0); + } + 1 => { + // Check if this destination is already active + if ACTIVE_CONNECTIONS.get(&destination).is_none() { + let event = EgressEvent { + dst_ip: destination, + src_port, + dst_port, + protocol, + pid, + }; + EGRESS_EVENT.output(ctx, &event, 0); + + // Mark connection as active + if ACTIVE_CONNECTIONS.insert(&destination, &1, 0).is_err() { + return; + } + } + } + _ => {} + } +} diff --git a/nflux-ebpf/src/main.rs b/nflux-ebpf/src/main.rs index ccd53d0..68654d6 100644 --- a/nflux-ebpf/src/main.rs +++ b/nflux-ebpf/src/main.rs @@ -4,6 +4,8 @@ mod egress; mod firewall; +mod handlers; +mod logger; mod maps; use aya_ebpf::bindings::xdp_action::XDP_ABORTED; diff --git a/nflux-ebpf/src/old_main.rs b/nflux-ebpf/src/old_main.rs index 4ff9195..fc9d543 100644 --- a/nflux-ebpf/src/old_main.rs +++ b/nflux-ebpf/src/old_main.rs @@ -4,8 +4,12 @@ mod egress; +use crate::egress::try_tc_egress; +use aya_ebpf::bindings::{TC_ACT_PIPE, TC_ACT_SHOT}; +use aya_ebpf::macros::classifier; use aya_ebpf::maps::lpm_trie::Key; use aya_ebpf::maps::{Array, LpmTrie, LruHashMap}; +use aya_ebpf::programs::TcContext; use aya_ebpf::{ bindings::xdp_action, macros::{map, xdp}, @@ -13,9 +17,6 @@ use aya_ebpf::{ programs::XdpContext, }; use core::mem; -use aya_ebpf::bindings::{TC_ACT_PIPE, TC_ACT_SHOT}; -use aya_ebpf::macros::classifier; -use aya_ebpf::programs::TcContext; use network_types::ip::{IpProto, Ipv6Hdr}; use network_types::{ eth::{EthHdr, EtherType}, @@ -24,7 +25,6 @@ use network_types::{ udp::UdpHdr, }; use nflux_common::{ConnectionEvent, EgressEvent, IpRule, LpmKeyIpv4, LpmKeyIpv6}; -use crate::egress::try_tc_egress; #[map] static IPV4_RULES: LpmTrie = LpmTrie::with_max_entries(1024, 0); @@ -96,7 +96,8 @@ fn start_nflux(ctx: XdpContext) -> Result { if let Some(rule) = IPV4_RULES.get(&key) { match proto { IpProto::Tcp => { - let tcphdr: *const TcpHdr = unsafe { ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)? }; + let tcphdr: *const TcpHdr = + unsafe { ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)? }; let dst_port = u16::from_be(unsafe { (*tcphdr).dest }); let syn = unsafe { (*tcphdr).syn() }; let ack = unsafe { (*tcphdr).ack() }; diff --git a/nflux.toml b/nflux.toml index 7bd5a2c..66761c6 100644 --- a/nflux.toml +++ b/nflux.toml @@ -1,5 +1,5 @@ [logging] -log_level = "debug" # trace, debug, info, warn, or error (default: info) +log_level = "info" # trace, debug, info, warn, or error (default: info) log_type = "text" # text or json (default: text) [firewall] @@ -17,13 +17,14 @@ icmp_ping = "false" # Allow or deny ICMP ping requests [egress] enabled = "true" physical_interfaces = ["enp0s20f0u4"] # Physical interfaces, your LAN interface -virtual_interfaces = ["proton0"] # If using VPN, add the virtual interface here +virtual_interfaces = [] # If using VPN, add the virtual interface here [egress.logging] +# log_only_new_connections: # For example, if you perform 1000 requests to google.com, just log 1 (or every you execute) # This affects to tcp and udp connections -log_only_new_connections = "false" -log_udp_connections = "false" # Decide if udp packets should be logged +log_only_new_connections = "true" +log_udp_connections = "true" # Decide if udp packets should be logged log_tcp_connections = "true" # Decide if tcp packets should be logged log_icmp_connections = "true" # Decide if icmp packets should be logged diff --git a/nflux/src/main.rs b/nflux/src/main.rs index a84ad7d..baed7f7 100644 --- a/nflux/src/main.rs +++ b/nflux/src/main.rs @@ -8,7 +8,6 @@ use anyhow::Context; use aya::maps::AsyncPerfEventArray; use aya::util::online_cpus; use aya::{include_bytes_aligned, Ebpf}; -use aya_log::EbpfLogger; use std::process; use crate::egress::process_egress_events; @@ -20,7 +19,7 @@ use egress::{ use firewall::{attach_xdp_program, process_firewall_events}; use logger::setup_logger; use tokio::task; -use tracing::{error, info, warn}; +use tracing::{error, info}; use utils::{is_root_user, print_firewall_rules, set_mem_limit, wait_for_shutdown}; #[tokio::main] @@ -47,9 +46,9 @@ async fn main() -> anyhow::Result<()> { let mut bpf = Ebpf::load(include_bytes_aligned!(concat!(env!("OUT_DIR"), "/nflux")))?; // Necessary to debug something in the ebpf code - if let Err(e) = EbpfLogger::init(&mut bpf) { - warn!("failed to initialize eBPF logger: {}", e); - } + // if let Err(e) = EbpfLogger::init(&mut bpf) { + // warn!("failed to initialize eBPF logger: {}", e); + // } // Attach XDP program (monitor ingress connections to local ports) match config.firewall.enabled {