Skip to content

Commit

Permalink
Wip: egress connection
Browse files Browse the repository at this point in the history
  • Loading branch information
containerscrew committed Jan 18, 2025
1 parent 392220d commit 67bdb2c
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 88 deletions.
7 changes: 3 additions & 4 deletions nflux-ebpf/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use core::mem;

use aya_ebpf::{
bindings::TC_ACT_PIPE, helpers::bpf_get_current_pid_tgid, programs::TcContext
};
use aya_ebpf::{bindings::TC_ACT_PIPE, helpers::bpf_get_current_pid_tgid, programs::TcContext};

use network_types::{
eth::EthHdr,
ip::{IpProto, Ipv4Hdr},
Expand Down Expand Up @@ -31,7 +30,7 @@ pub fn handle_icmp_packet(
egress_config: &EgressConfig,
destination: u32,
) -> Result<i32, ()> {
let pid_tgid = { bpf_get_current_pid_tgid() };
let pid_tgid = { bpf_get_current_pid_tgid() };
let pid = pid_tgid >> 32;

if egress_config.log_icmp_connections == 1 {
Expand Down
6 changes: 3 additions & 3 deletions nflux.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ physical_interfaces = ["enp0s20f0u4"] # Physical interfaces, your LAN interface
# Traffic sniffer for virtual interfaces like wireguard/openvpn, is not working correctly
# If you are using a VPN, switch off if you want to use the egress logging feature
# You will see the egress traffic for your physical interface (wifi or ethernet)
virtual_interfaces = ["proton0"] # If using VPN, add the virtual interface here
virtual_interfaces = [] # If using VPN, add the virtual interface here

[egress.logging]
# log_only_new_connections:
# For example, if you perform 1000 requests to google.com, just log 1 (or every you execute)
# This affects to tcp and udp connections
log_only_new_connections = "true"
log_only_new_connections = "false"
log_udp_connections = "false" # Decide if udp packets should be logged
log_tcp_connections = "true" # Decide if tcp packets should be logged
log_icmp_connections = "false" # Decide if icmp packets should be logged
log_icmp_connections = "true" # Decide if icmp packets should be logged

#[egress_rules]
# TODO: filter outgoint traffic (block/deny)
Expand Down
88 changes: 15 additions & 73 deletions nflux/src/egress.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::config::{Egress, IsEnabled};
use crate::utils::{get_process_name, lookup_address};
use anyhow::Context;
use aya::maps::perf::{AsyncPerfEventArrayBuffer, PerfBufferError};
use aya::maps::{Array, MapData};
Expand Down Expand Up @@ -43,16 +42,17 @@ pub fn populate_egress_config(bpf: &mut Ebpf, config: Egress) -> anyhow::Result<
Ok(())
}

pub fn attach_tc_egress_program_virtual_interfaces(
pub fn attach_tc_egress_program(
bpf: &mut Ebpf,
virtual_interfaces: &[String],
program_name: &str,
interfaces: &[String],
) -> anyhow::Result<()> {
// Retrieve the eBPF program
let program = match bpf.program_mut("tc_egress_virtual") {
let program = match bpf.program_mut(program_name) {
Some(p) => p,
None => {
error!("Failed to find the tc_egress program in BPF object");
return Err(anyhow::anyhow!("tc_egress program not found"));
error!("Failed to find the {} program in BPF object", program_name);
return Err(anyhow::anyhow!("{} program not found", program_name));
}
};

Expand All @@ -61,21 +61,21 @@ pub fn attach_tc_egress_program_virtual_interfaces(
Ok(p) => p,
Err(e) => {
error!(
"Failed to convert tc_egress program to SchedClassifier: {:?}",
e
"Failed to convert {} program to SchedClassifier: {:?}",
program_name, e
);
return Err(e.into());
}
};

// Load the eBPF program into the kernel
if let Err(e) = program.load() {
error!("Failed to load tc_egress program: {:?}", e);
error!("Failed to load {} program: {:?}", program_name, e);
return Err(e.into());
}

// Iterate over interfaces and attach the program
for interface in virtual_interfaces {
for interface in interfaces {
// Add clsact qdisc to the interface
if let Err(e) = tc::qdisc_add_clsact(interface) {
warn!(
Expand All @@ -87,68 +87,12 @@ pub fn attach_tc_egress_program_virtual_interfaces(
// Attach the eBPF program to the egress path of the interface
if let Err(e) = program.attach(interface, TcAttachType::Egress) {
error!(
"Failed to attach tc_egress program to interface {}: {:?}",
interface, e
);
return Err(anyhow::anyhow!(
"Failed to attach tc_egress program to interface {}",
interface
));
}
}

Ok(())
}

pub fn attach_tc_egress_program_physical_interfaces(
bpf: &mut Ebpf,
physical_interfaces: &[String],
) -> anyhow::Result<()> {
// Retrieve the eBPF program
let program = match bpf.program_mut("tc_egress_physical") {
Some(p) => p,
None => {
error!("Failed to find the tc_egress program in BPF object");
return Err(anyhow::anyhow!("tc_egress program not found"));
}
};

// Try converting the program into a SchedClassifier
let program: &mut SchedClassifier = match program.try_into() {
Ok(p) => p,
Err(e) => {
error!(
"Failed to convert tc_egress program to SchedClassifier: {:?}",
e
);
return Err(e.into());
}
};

// Load the eBPF program into the kernel
if let Err(e) = program.load() {
error!("Failed to load tc_egress program: {:?}", e);
return Err(e.into());
}

// Iterate over interfaces and attach the program
for interface in physical_interfaces {
// Add clsact qdisc to the interface
if let Err(e) = tc::qdisc_add_clsact(interface) {
warn!(
"Failed to add clsact qdisc to interface {}: {:?}",
interface, e
);
}

// Attach the eBPF program to the egress path of the interface
if let Err(e) = program.attach(interface, TcAttachType::Egress) {
error!(
"Failed to attach tc_egress program to interface {}: {:?}",
interface, e
"Failed to attach {} program to interface {}: {:?}",
program_name, interface, e
);
return Err(anyhow::anyhow!(
"Failed to attach tc_egress program to interface {}",
"Failed to attach {} program to interface {}",
program_name,
interface
));
}
Expand All @@ -173,14 +117,12 @@ pub async fn process_egress_events(
match parse_egress_event(buf) {
Ok(event) => {
info!(
"new egress connection protocol={}, ip={}, src_port={}, dst_port={}, fqdn={}, pid={}, comm={}",
"event=egress protocol={}, ip={}, src_port={}, dst_port={}, pid={}",
convert_protocol(event.protocol),
Ipv4Addr::from(event.dst_ip),
event.src_port,
event.dst_port,
lookup_address(event.dst_ip),
event.pid,
get_process_name(event.pid)
);
}
Err(e) => error!("Failed to parse egress event on CPU {}: {}", cpu_id, e),
Expand Down
11 changes: 5 additions & 6 deletions nflux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use std::process;

use crate::egress::process_egress_events;
use config::{IsEnabled, Nflux};
use egress::{
attach_tc_egress_program_physical_interfaces, attach_tc_egress_program_virtual_interfaces,
populate_egress_config,
};
use egress::{attach_tc_egress_program, populate_egress_config};
use firewall::{attach_xdp_program, process_firewall_events};
use logger::setup_logger;
use tokio::task;
Expand Down Expand Up @@ -76,8 +73,9 @@ async fn main() -> anyhow::Result<()> {
"Attaching TC egress program to physical interfaces: {:?}",
config.egress.physical_interfaces
);
attach_tc_egress_program_physical_interfaces(
attach_tc_egress_program(
&mut bpf,
"tc_egress_physical",
&config.egress.physical_interfaces,
)?;
}
Expand All @@ -87,8 +85,9 @@ async fn main() -> anyhow::Result<()> {
"Attaching TC egress program to virtual interfaces: {:?}",
config.egress.virtual_interfaces
);
attach_tc_egress_program_virtual_interfaces(
attach_tc_egress_program(
&mut bpf,
"tc_egress_virtual",
&config.egress.virtual_interfaces,
)?;
}
Expand Down
4 changes: 2 additions & 2 deletions nflux/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn parse_cidr_v6(cidr: &str) -> anyhow::Result<(Ipv6Addr, u32)> {
Ok((ip, prefix_len))
}

pub fn lookup_address(ip: u32) -> String {
pub fn _lookup_address(ip: u32) -> String {
match is_private_ip(ip) {
true => "Private IP".to_string(),
false => {
Expand All @@ -79,7 +79,7 @@ pub fn lookup_address(ip: u32) -> String {
}
}

pub fn get_process_name(pid: u64) -> String {
pub fn _get_process_name(pid: u64) -> String {
let mut s = System::new_all();

// Is this causing overhead?
Expand Down

0 comments on commit 67bdb2c

Please sign in to comment.