diff --git a/client.go b/client.go index cce21b9..528a650 100644 --- a/client.go +++ b/client.go @@ -27,6 +27,7 @@ type HTTPClientSettings struct { FollowRedirects bool TCPTimeout time.Duration TLSHandshakeTimeout time.Duration + RandomLocalIP bool } type CustomHTTPClient struct { @@ -43,6 +44,7 @@ type CustomHTTPClient struct { FullOnDisk bool MaxReadBeforeTruncate int DataTotal *ratecounter.Counter + randomLocalIP bool } func (c *CustomHTTPClient) Close() error { @@ -69,6 +71,12 @@ func (c *CustomHTTPClient) Close() error { func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient *CustomHTTPClient, err error) { httpClient = new(CustomHTTPClient) + // Configure random local IP + httpClient.randomLocalIP = HTTPClientSettings.RandomLocalIP + if httpClient.randomLocalIP { + go getAvailableIPs() + } + // Init data counters httpClient.DataTotal = new(ratecounter.Counter) diff --git a/dialer.go b/dialer.go index f6074c4..9500d23 100644 --- a/dialer.go +++ b/dialer.go @@ -93,6 +93,17 @@ func (d *customDialer) CustomDial(network, address string) (conn net.Conn, err e return nil, err } } else { + if d.client.randomLocalIP { + localAddr := getLocalAddr(network, address) + if localAddr != nil { + if network == "tcp" { + d.LocalAddr = localAddr.(*net.TCPAddr) + } else if network == "udp" { + d.LocalAddr = localAddr.(*net.UDPAddr) + } + } + } + conn, err = d.Dial(network, address) if err != nil { return nil, err @@ -114,6 +125,17 @@ func (d *customDialer) CustomDialTLS(network, address string) (net.Conn, error) return nil, err } } else { + if d.client.randomLocalIP { + localAddr := getLocalAddr(network, address) + if localAddr != nil { + if network == "tcp" { + d.LocalAddr = localAddr.(*net.TCPAddr) + } else if network == "udp" { + d.LocalAddr = localAddr.(*net.UDPAddr) + } + } + } + plainConn, err = d.Dial(network, address) if err != nil { return nil, err diff --git a/random_local_ip.go b/random_local_ip.go new file mode 100644 index 0000000..c18aa8a --- /dev/null +++ b/random_local_ip.go @@ -0,0 +1,124 @@ +package warc + +import ( + "net" + "strings" + "sync" + "sync/atomic" + "time" +) + +var ( + IPv6 *availableIPs + IPv4 *availableIPs +) + +type availableIPs struct { + sync.Mutex + Index uint32 + IPs []net.IP +} + +func getAvailableIPs() (IPs []net.IP, err error) { + if IPv6 == nil { + IPv6 = &availableIPs{} + } + + if IPv4 == nil { + IPv4 = &availableIPs{} + } + + for { + // Get all network interfaces + interfaces, err := net.Interfaces() + if err != nil { + time.Sleep(time.Second) + continue + } + + // Iterate over the interfaces + var newIPv4 []net.IP + var newIPv6 []net.IP + for _, iface := range interfaces { + if strings.Contains(iface.Name, "docker") { + continue + } + + // Get the addresses associated with the interface + addrs, err := iface.Addrs() + if err != nil { + continue + } + + // Iterate over the addresses + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if ok && !ipNet.IP.IsLoopback() { + // Add IPv6 addresses to the list + if ipNet.IP.IsGlobalUnicast() { + if ipNet.IP.To4() == nil && !strings.HasPrefix(ipNet.IP.String(), "fe80") { + newIPv6 = append(newIPv6, ipNet.IP) + } else if ipNet.IP.To4() != nil { + // Add IPv4 addresses to the list + newIPv4 = append(newIPv4, ipNet.IP) + } + } + } + } + } + + // Add the new addresses to the list + IPv6.Lock() + IPv6.IPs = newIPv6 + IPv6.Unlock() + + IPv4.Lock() + IPv4.IPs = newIPv4 + IPv4.Unlock() + + time.Sleep(time.Second) + } +} + +func getNextIP(availableIPs *availableIPs) net.IP { + availableIPs.Lock() + defer availableIPs.Unlock() + + if len(availableIPs.IPs) == 0 { + return nil + } + + currentIndex := atomic.AddUint32(&availableIPs.Index, 1) - 1 + ip := availableIPs.IPs[int(currentIndex)%len(availableIPs.IPs)] + + return ip +} + +func getLocalAddr(network, address string) any { + lastColon := strings.LastIndex(address, ":") + destAddr := address[:lastColon] + + destAddr = strings.TrimPrefix(destAddr, "[") + destAddr = strings.TrimSuffix(destAddr, "]") + + destIP := net.ParseIP(destAddr) + if destIP == nil { + return nil + } + + if destIP.To4() != nil { + if network == "tcp" { + return &net.TCPAddr{IP: getNextIP(IPv4)} + } else if network == "udp" { + return &net.UDPAddr{IP: getNextIP(IPv4)} + } + return nil + } else { + if network == "tcp" { + return &net.TCPAddr{IP: getNextIP(IPv6)} + } else if network == "udp" { + return &net.UDPAddr{IP: getNextIP(IPv6)} + } + return nil + } +} diff --git a/read.go b/read.go index 3fadf15..4ac5d9e 100644 --- a/read.go +++ b/read.go @@ -19,7 +19,6 @@ type Reader struct { func (r *Reader) Close() { r.gzipReader.Close() } - type reader interface { ReadString(delim byte) (line string, err error) } diff --git a/utils.go b/utils.go index 1a6be3c..c083f89 100644 --- a/utils.go +++ b/utils.go @@ -144,7 +144,7 @@ func NewWriter(writer io.Writer, fileName string, compression string, contentLen FileWriter: bufio.NewWriter(gzipWriter), }, nil } else if compression == "ZSTD" { - zstdWriter, err := zstd.NewWriter(writer) + zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.SpeedBestCompression)) if err != nil { return nil, err }