diff --git a/docker/Dockerfile b/docker/Dockerfile index 8116ed4..1572a89 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -8,6 +8,7 @@ RUN set -eux ;\ cargo install bpf-linker ;\ rustup install stable && rustup toolchain install nightly --component rust-src +# cargo xtask build --release RUN cargo xtask build --release FROM gcr.io/distroless/cc-debian12 diff --git a/docs/todo.md b/docs/todo.md index 3da2760..88f411a 100644 --- a/docs/todo.md +++ b/docs/todo.md @@ -9,3 +9,4 @@ - [3] Implement in how much time a log_connection_event will be sended to Perf Ring Buffer. - [3] Allow the user to change the config in the runtime. - [4] Implement default values in nflux.toml (config.rs) +- [5] Rate limiting to avoid DoS attacks. diff --git a/nflux-common/src/lib.rs b/nflux-common/src/lib.rs index 1795dfa..f8a75e0 100644 --- a/nflux-common/src/lib.rs +++ b/nflux-common/src/lib.rs @@ -14,6 +14,12 @@ pub struct ConnectionEvent { pub action: u8, // 0 for deny, 1 for allow } +#[repr(C)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct EgressEvent { + pub dst_ip: u32, +} + #[repr(C)] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct IpRule { diff --git a/nflux-ebpf/src/main.rs b/nflux-ebpf/src/main.rs index 12ff16a..23bfb6d 100644 --- a/nflux-ebpf/src/main.rs +++ b/nflux-ebpf/src/main.rs @@ -11,6 +11,9 @@ use aya_ebpf::{ programs::XdpContext, }; use core::mem; +use aya_ebpf::bindings::{TC_ACT_PIPE, TC_ACT_SHOT}; +use aya_ebpf::macros::classifier; +use aya_ebpf::programs::TcContext; use network_types::ip::{IpProto, Ipv6Hdr}; use network_types::{ eth::{EthHdr, EtherType}, @@ -18,13 +21,7 @@ use network_types::{ tcp::TcpHdr, udp::UdpHdr, }; -use nflux_common::{ConnectionEvent, IpRule, LpmKeyIpv4, LpmKeyIpv6}; - -#[cfg(not(test))] -#[panic_handler] -fn panic(_info: &core::panic::PanicInfo) -> ! { - loop {} -} +use nflux_common::{ConnectionEvent, EgressEvent, IpRule, LpmKeyIpv4, LpmKeyIpv6}; #[map] static IPV4_RULES: LpmTrie = LpmTrie::with_max_entries(1024, 0); @@ -38,6 +35,9 @@ static CONNECTION_EVENTS: PerfEventArray = PerfEventArray::new( #[map] static ICMP_RULE: Array = Array::with_max_entries(1, 0); +#[map] +static EGRESS_EVENT: PerfEventArray = PerfEventArray::new(0); + #[xdp] pub fn nflux(ctx: XdpContext) -> u32 { match start_nflux(ctx) { @@ -46,6 +46,11 @@ pub fn nflux(ctx: XdpContext) -> u32 { } } +#[classifier] +pub fn tc_egress(ctx: TcContext) -> i32 { + try_tc_egress(ctx).unwrap_or_else(|_| TC_ACT_SHOT) +} + #[inline(always)] unsafe fn ptr_at(ctx: &XdpContext, offset: usize) -> Result<*const T, ()> { let start = ctx.data(); @@ -70,6 +75,23 @@ fn log_new_connection(ctx: XdpContext, src_addr: u32, dst_port: u16, protocol: u CONNECTION_EVENTS.output(&ctx, &event, 0); } +fn try_tc_egress(ctx: TcContext) -> Result { + let ethhdr: EthHdr = ctx.load(0).map_err(|_| ())?; + match ethhdr.ether_type { + EtherType::Ipv4 => {} + _ => return Ok(TC_ACT_PIPE), + } + + let ipv4hdr: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?; + let destination = u32::from_be(ipv4hdr.dst_addr); + + let event = EgressEvent { dst_ip: destination }; + + EGRESS_EVENT.output(&ctx, &event, 0); + + Ok(TC_ACT_PIPE) +} + fn start_nflux(ctx: XdpContext) -> Result { let ethhdr: *const EthHdr = unsafe { ptr_at(&ctx, 0)? }; @@ -249,3 +271,9 @@ fn start_nflux(ctx: XdpContext) -> Result { _ => Ok(xdp_action::XDP_DROP), } } + +#[cfg(not(test))] +#[panic_handler] +fn panic(_info: &core::panic::PanicInfo) -> ! { + loop {} +} diff --git a/nflux/src/main.rs b/nflux/src/main.rs index 2325363..3368bc3 100644 --- a/nflux/src/main.rs +++ b/nflux/src/main.rs @@ -8,7 +8,7 @@ use anyhow::Context; use aya::maps::lpm_trie::Key; use aya::maps::perf::{AsyncPerfEventArrayBuffer, PerfBufferError}; use aya::maps::{AsyncPerfEventArray, LpmTrie, MapData}; -use aya::programs::{Xdp, XdpFlags}; +use aya::programs::{tc, SchedClassifier, TcAttachType, Xdp, XdpFlags}; use aya::util::online_cpus; use aya::{include_bytes_aligned, Ebpf}; use bytes::BytesMut; @@ -56,19 +56,36 @@ async fn main() -> anyhow::Result<()> { .context( "Failed to attach XDP program. Ensure the interface is physical and not virtual.", )?; - - // Log startup info - info!("nflux started successfully!"); info!( "XDP program attached to interface: {:?}", config.nflux.interface_names[0] ); + // Attach TC program + let _ = tc::qdisc_add_clsact(&config.nflux.interface_names[0]); + let program: &mut SchedClassifier = + bpf.program_mut("tc_egress").unwrap().try_into()?; + program.load()?; + program.attach(&config.nflux.interface_names[0], TcAttachType::Egress)?; + info!( + "TC egress program attached to interface: {:?}", + config.nflux.interface_names[0] + ); + + // Log startup info + info!("nflux started successfully!"); + // Start processing events from the eBPF program let mut events = AsyncPerfEventArray::try_from( bpf.take_map("CONNECTION_EVENTS") .context("Failed to find CONNECTION_EVENTS map")?, )?; + + let mut egress_events = AsyncPerfEventArray::try_from( + bpf.take_map("EGRESS_EVENTS") + .context("Failed to find EGRESS_EVENTS map")?, + )?; + let cpus = online_cpus().map_err(|(_, error)| error)?; for cpu_id in cpus { @@ -97,8 +114,7 @@ async fn process_events( match parse_connection_event(buf) { Ok(event) => { info!( - "CPU={} program=xdp protocol={} port={} ip={} action={}", - cpu_id, + "direction=incoming protocol={} port={} ip={} action={}", convert_protocol(event.protocol), event.dst_port, Ipv4Addr::from(event.src_addr),