Using eBPF to load-balance traffic across UDP sockets with Go

Vincent Bernat

Akvorado collects sFlow and IPFIX flows over UDP. Because UDP does not retransmit lost packets, Akvorado needs to process them quickly. To do so, it runs several workers listening to the same port. The kernel should load-balance received packets fairly between these workers. However, this does not work as expected. A couple of workers exhibit high packet loss:

$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
> | sed -n s/akvorado_inlet_flow_input_udp_in_dropped//p
packets_total{listener="0.0.0.0:2055",worker="0"} 0
packets_total{listener="0.0.0.0:2055",worker="1"} 0
packets_total{listener="0.0.0.0:2055",worker="2"} 0
packets_total{listener="0.0.0.0:2055",worker="3"} 1.614933572278264e+15
packets_total{listener="0.0.0.0:2055",worker="4"} 0
packets_total{listener="0.0.0.0:2055",worker="5"} 0
packets_total{listener="0.0.0.0:2055",worker="6"} 9.59964121598348e+14
packets_total{listener="0.0.0.0:2055",worker="7"} 0

eBPF can help by implementing an alternate balancing algorithm. 🐝

Options for load-balancing

There are three methods to load-balance UDP packets across workers:

  1. One worker receives the packets and dispatches them to the other workers.
  2. All workers share the same socket.
  3. Each worker has its own socket, listening to the same port, with the SO_REUSEPORT socket option.

SO_REUSEPORT option

Tom Herbert added the SO_REUSEPORT socket option in Linux 3.9. The cover letter for his patch series explains why, from a performance point of view, this new option is better than the two existing ones:

SO_REUSEPORT allows multiple listener sockets to be bound to the same port. […] Received packets are distributed to multiple sockets bound to the same port using a 4-tuple hash.

The motivating case for SO_REUSEPORT in TCP would be something like a web server binding to port 80 running with multiple threads, where each thread might have its own listener socket. This could be done as an alternative to other models:

  1. have one listener thread which dispatches completed connections to workers, or
  2. accept on a single listener socket from multiple threads.

In case #1, the listener thread can easily become the bottleneck with high connection turn-over rate. In case #2, the proportion of connections accepted per thread tends to be uneven under high connection load. […] We have seen the disproportion to be as high as 3:1 ratio between thread accepting most connections and the one accepting the fewest. With SO_REUSEPORT the distribution is uniform.

The motivating case for SO_REUSEPORT in UDP would be something like a DNS server. An alternative would be to receive on the same socket from multiple threads. As in the case of TCP, the load across these threads tends to be disproportionate and we also see a lot of contention on the socket lock.

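Akvorado uses the SO_REUSEPORT option to dispatch the packets across the workers. However, the distribution uses a 4-tuple hash, and an exporter typically sends all its packets from the same source address and port to the same destination. As a result, a single socket handles all the flows from one exporter.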

SO_ATTACH_REUSEPORT_EBPF option

In Linux 4.5, Craig Gallek added the SO_ATTACH_REUSEPORT_EBPF option to attach an eBPF program to select the target UDP socket. In Linux 4.6, he extended it to support TCP. The socket(7) manual page documents this mechanism:[1]

The BPF program must return an index between 0 and N-1 representing the socket which should receive the packet (where N is the number of sockets in the group). If the BPF program returns an invalid index, socket selection will fall back to the plain SO_REUSEPORT mechanism.
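The rule quoted from the manual page can be modeled in a few lines of Go. This is only a sketch of the documented behavior, not kernel code: selectSocket() and its hashIndex parameter (standing for whatever plain SO_REUSEPORT would have picked) are names invented for the example.

```go
package main

import "fmt"

// selectSocket models the documented rule: a valid index returned by the
// program wins, anything else falls back to the default hash-based choice.
// hashIndex is a hypothetical stand-in for the plain SO_REUSEPORT decision.
func selectSocket(progIndex uint32, groupSize int, hashIndex int) int {
    if int64(progIndex) < int64(groupSize) {
        return int(progIndex)
    }
    return hashIndex
}

func main() {
    fmt.Println(selectSocket(3, 8, 5))  // valid index, socket 3 is used
    fmt.Println(selectSocket(12, 8, 5)) // invalid index, fallback to socket 5
}
```

Note that N itself is already out of range: with 8 sockets, valid indexes go from 0 to 7.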

In Linux 4.19, Martin KaFai Lau added the BPF_PROG_TYPE_SK_REUSEPORT program type. Such an eBPF program selects the socket from a BPF_MAP_TYPE_REUSEPORT_SOCKARRAY map instead. This new approach is more reliable when switching target sockets from one instance to another. For example, when upgrading, a new instance can add its sockets and remove the old ones.

Load-balancing with eBPF and Go

Altering the load-balancing algorithm for a group of sockets requires two steps:

  1. write and compile an eBPF program in C,[2] and
  2. load it and attach it in Go.

eBPF program in C

A simple load-balancing algorithm is to randomly choose the destination socket. The kernel provides the bpf_get_prandom_u32() helper function to get a pseudo-random number.

volatile const __u32 num_sockets; // ❶

struct {
    __uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY);
    __type(key, __u32);
    __type(value, __u64);
    __uint(max_entries, 256);
} socket_map SEC(".maps"); // ❷

SEC("sk_reuseport")
int reuseport_balance_prog(struct sk_reuseport_md *reuse_md)
{
    __u32 index = bpf_get_prandom_u32() % num_sockets; // ❸
    bpf_sk_select_reuseport(reuse_md, &socket_map, &index, 0); // ❹
    return SK_PASS; // ❺
}

char _license[] SEC("license") = "GPL";

In ❶, we declare a volatile constant for the number of sockets in the group. We will initialize this constant before loading the eBPF program into the kernel. In ❷, we define the socket map. We will populate it with the socket file descriptors. In ❸, we randomly select the index of the target socket.[3] In ❹, we invoke the bpf_sk_select_reuseport() helper to record our decision. Finally, in ❺, we accept the packet.
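The random selection in ❸ reduces a 32-bit value with a modulo, which favors the lowest indexes whenever the number of sockets does not divide 2^32. The following Go sketch quantifies this bias. The moduloBias() helper is a name introduced for this example and is not part of the eBPF program.

```go
package main

import "fmt"

// moduloBias returns how many of the 2^32 possible random values map to
// the lowest index (low) and to the highest index (high) when reduced
// modulo n.
func moduloBias(n uint64) (low, high uint64) {
    const space = uint64(1) << 32
    base, extra := space/n, space%n
    low, high = base, base
    if extra > 0 {
        low = base + 1 // the first "extra" indexes get one more value
    }
    return low, high
}

func main() {
    low, high := moduloBias(10)
    fmt.Printf("index 0: %d values, index 9: %d values\n", low, high)
    fmt.Printf("relative bias: %.2e\n", float64(low-high)/float64(high))
}
```

With 10 sockets, the first six indexes each receive one more value than the last four, out of roughly 429 million values per index. When the number of sockets is a power of two, there is no bias at all.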

Header files

If you compile the C source with clang, you get errors due to missing headers. The recommended way to solve this is to generate a vmlinux.h file with bpftool:

$ bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h

Then, include the following headers:[4]

#include "vmlinux.h"
#include <bpf/bpf_helpers.h>

For my 6.17 kernel, the generated vmlinux.h is quite big: 2.7 MiB. Moreover, bpf/bpf_helpers.h is shipped with libbpf. This adds another dependency for users. As the eBPF program is quite small, I prefer to put the strict minimum in vmlinux.h by cherry-picking the definitions I need.

Compilation

The eBPF Library for Go ships bpf2go, a tool to compile eBPF programs and to generate some scaffolding code. We create a gen.go file with the following content:

package main

//go:generate go tool bpf2go -tags linux reuseport reuseport_kern.c

After running go generate ./..., we can inspect the resulting objects with readelf and llvm-objdump:

$ readelf -S reuseport_bpfeb.o
There are 14 section headers, starting at offset 0x840:
  [Nr] Name              Type             Address           Offset
[…]
  [ 3] sk_reuseport      PROGBITS         0000000000000000  00000040
  [ 6] .maps             PROGBITS         0000000000000000  000000c8
  [ 7] license           PROGBITS         0000000000000000  000000e8
[…]
$ llvm-objdump -S reuseport_bpfeb.o
reuseport_bpfeb.o:  file format elf64-bpf
Disassembly of section sk_reuseport:
0000000000000000 <reuseport_balance_prog>:
; {
       0:   bf 61 00 00 00 00 00 00     r6 = r1
;     __u32 index = bpf_get_prandom_u32() % num_sockets;
       1:   85 00 00 00 00 00 00 07     call 0x7
[…]

Usage from Go

Let’s set up 10 workers listening to the same port.[5] Each socket enables the SO_REUSEPORT option before binding:[6]

var (
    fds   []uintptr
    conns []*net.UDPConn
)
workers := 10
listenAddr := "127.0.0.1:0"
listenConfig := net.ListenConfig{
    Control: func(_, _ string, c syscall.RawConn) error {
        // sockErr carries the setsockopt() result out of the callback.
        var sockErr error
        if err := c.Control(func(fd uintptr) {
            sockErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
            fds = append(fds, fd)
        }); err != nil {
            return err
        }
        return sockErr
    },
}
for range workers {
    pconn, err := listenConfig.ListenPacket(t.Context(), "udp", listenAddr)
    if err != nil {
        t.Fatalf("ListenPacket() error:\n%+v", err)
    }
    udpConn := pconn.(*net.UDPConn)
    listenAddr = udpConn.LocalAddr().String()
    conns = append(conns, udpConn)
}

The second step is to load the eBPF program, initialize the num_sockets variable, populate the socket map, and attach the program to the first socket.[7]

// Load the eBPF collection.
spec, err := loadReuseport()
if err != nil {
    t.Fatalf("loadReuseport() error:\n%+v", err)
}

// Set "num_sockets" global variable to the number of file descriptors we will register
if err := spec.Variables["num_sockets"].Set(uint32(len(fds))); err != nil {
    t.Fatalf("NumSockets.Set() error:\n%+v", err)
}

// Load the map and the program into the kernel.
var objs reuseportObjects
if err := spec.LoadAndAssign(&objs, nil); err != nil {
    t.Fatalf("LoadAndAssign() error:\n%+v", err)
}
t.Cleanup(func() { objs.Close() })

// Assign the file descriptors to the socket map.
for worker, fd := range fds {
    if err := objs.reuseportMaps.SocketMap.Put(uint32(worker), uint64(fd)); err != nil {
        t.Fatalf("SocketMap.Put() error:\n%+v", err)
    }
}

// Attach the eBPF program to the first socket.
socketFD := int(fds[0])
progFD := objs.reuseportPrograms.ReuseportBalanceProg.FD()
if err := unix.SetsockoptInt(socketFD, unix.SOL_SOCKET, unix.SO_ATTACH_REUSEPORT_EBPF, progFD); err != nil {
    t.Fatalf("SetsockoptInt() error:\n%+v", err)
}

We are now ready to process incoming packets. Each worker is a goroutine incrementing a counter for each received packet:[8]

var wg sync.WaitGroup
receivedPackets := make([]int, workers)
for worker := range workers {
    conn := conns[worker]
    packets := &receivedPackets[worker]
    wg.Go(func() {
        payload := make([]byte, 9000)
        for {
            if _, err := conn.Read(payload); err != nil {
                if errors.Is(err, net.ErrClosed) {
                    return
                }
                t.Logf("Read() error:\n%+v", err)
                continue // do not count a failed read as a packet
            }
            *packets++
        }
    })
}

Let’s send 1000 packets:

sentPackets := 1000
conn, err := net.Dial("udp", conns[0].LocalAddr().String())
if err != nil {
    t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
for range sentPackets {
    if _, err := conn.Write([]byte("hello world!")); err != nil {
        t.Fatalf("Write() error:\n%+v", err)
    }
}

If we print the content of the receivedPackets slice, we can check that the balancing works as expected, with each worker getting about 100 packets:

=== RUN   TestUDPWorkerBalancing
    balancing_test.go:84: receivedPackets[0] = 107
    balancing_test.go:84: receivedPackets[1] = 92
    balancing_test.go:84: receivedPackets[2] = 99
    balancing_test.go:84: receivedPackets[3] = 105
    balancing_test.go:84: receivedPackets[4] = 107
    balancing_test.go:84: receivedPackets[5] = 96
    balancing_test.go:84: receivedPackets[6] = 102
    balancing_test.go:84: receivedPackets[7] = 105
    balancing_test.go:84: receivedPackets[8] = 99
    balancing_test.go:84: receivedPackets[9] = 88

    balancing_test.go:91: receivedPackets = 1000
    balancing_test.go:92: sentPackets     = 1000
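To judge whether such a spread is compatible with a uniform random choice, we can compare each counter with the standard deviation of a binomial distribution. The Go sketch below does this for the counters printed above. The maxDeviation() helper is introduced for this example only and is not part of the test suite.

```go
package main

import (
    "fmt"
    "math"
)

// maxDeviation returns the largest distance between a per-worker count
// and the expected mean, expressed in standard deviations of a binomial
// distribution with p = 1/len(counts).
func maxDeviation(counts []int) float64 {
    total := 0
    for _, c := range counts {
        total += c
    }
    p := 1 / float64(len(counts))
    mean := float64(total) * p
    sigma := math.Sqrt(float64(total) * p * (1 - p))
    worst := 0.0
    for _, c := range counts {
        worst = math.Max(worst, math.Abs(float64(c)-mean)/sigma)
    }
    return worst
}

func main() {
    counts := []int{107, 92, 99, 105, 107, 96, 102, 105, 99, 88}
    fmt.Printf("max deviation: %.2f sigma\n", maxDeviation(counts))
}
```

With 1000 packets and 10 workers, the standard deviation is about 9.5 packets. The worst counter (88) is less than 1.3 standard deviations away from the mean of 100, which is unremarkable for a random distribution.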

Graceful restart

You can also use SO_ATTACH_REUSEPORT_EBPF to gracefully restart an application. A new instance of the application binds to the same address and prepares its own version of the socket map. Once it attaches the eBPF program to the first socket, the kernel steers incoming packets to this new instance. The old instance needs to drain the already received packets before shutting down.

To check that we are not losing any packets, we spawn a goroutine to send as many packets as possible:

sentPackets := 0
notSentPackets := 0
done := make(chan bool)
conn, err := net.Dial("udp", conns1[0].LocalAddr().String())
if err != nil {
    t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
go func() {
    for {
        if _, err := conn.Write([]byte("hello world!")); err != nil {
            notSentPackets++
        } else {
            sentPackets++
        }
        select {
        case <-done:
            return
        default:
        }
    }
}()

Then, while the goroutine runs, we start the second set of workers. Once they are running, they start receiving packets. If we gracefully stop the initial set of workers, not a single packet is lost![9]

=== RUN   TestGracefulRestart
    graceful_test.go:135: receivedPackets1[0] = 165
    graceful_test.go:135: receivedPackets1[1] = 195
    graceful_test.go:135: receivedPackets1[2] = 194
    graceful_test.go:135: receivedPackets1[3] = 190
    graceful_test.go:135: receivedPackets1[4] = 213
    graceful_test.go:135: receivedPackets1[5] = 187
    graceful_test.go:135: receivedPackets1[6] = 170
    graceful_test.go:135: receivedPackets1[7] = 190
    graceful_test.go:135: receivedPackets1[8] = 194
    graceful_test.go:135: receivedPackets1[9] = 155

    graceful_test.go:139: receivedPackets2[0] = 1631
    graceful_test.go:139: receivedPackets2[1] = 1582
    graceful_test.go:139: receivedPackets2[2] = 1594
    graceful_test.go:139: receivedPackets2[3] = 1611
    graceful_test.go:139: receivedPackets2[4] = 1571
    graceful_test.go:139: receivedPackets2[5] = 1660
    graceful_test.go:139: receivedPackets2[6] = 1587
    graceful_test.go:139: receivedPackets2[7] = 1605
    graceful_test.go:139: receivedPackets2[8] = 1631
    graceful_test.go:139: receivedPackets2[9] = 1689

    graceful_test.go:147: receivedPackets = 18014
    graceful_test.go:148: sentPackets     = 18014

Unfortunately, gracefully shutting down a UDP socket is not trivial in Go.[10] Previously, we were terminating workers by closing their sockets. However, if we close them too soon, the application loses packets that were assigned to them but not yet processed. Before stopping, a worker needs to call conn.Read() until there are no more packets. A solution is to set a deadline for conn.Read() and check if we should stop the goroutine when the deadline is exceeded:

payload := make([]byte, 9000)
for {
    conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
    if _, err := conn.Read(payload); err != nil {
        if errors.Is(err, os.ErrDeadlineExceeded) {
            select {
            case <-done:
                return
            default:
                continue
            }
        }
        t.Logf("Read() error:\n%+v", err)
        continue // do not count a failed read as a packet
    }
    *packets++
}
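The drain loop can be tried outside the test suite with a self-contained program. This is a sketch under a few assumptions: drain() and sendAndDrain() are names invented for the example, a single loopback socket replaces the group of workers, and the shutdown signal is given before draining starts.

```go
package main

import (
    "errors"
    "fmt"
    "net"
    "os"
    "time"
)

// drain reads packets until the socket stays idle for one deadline period
// after done is closed. It returns the number of packets read.
func drain(conn *net.UDPConn, done <-chan struct{}) int {
    packets := 0
    payload := make([]byte, 9000)
    for {
        conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
        if _, err := conn.Read(payload); err != nil {
            if errors.Is(err, os.ErrDeadlineExceeded) {
                select {
                case <-done:
                    return packets
                default:
                    continue
                }
            }
            return packets // unexpected error, give up
        }
        packets++
    }
}

// sendAndDrain queues n packets on a loopback socket, requests the
// shutdown, then drains the socket.
func sendAndDrain(n int) (int, error) {
    conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)})
    if err != nil {
        return 0, err
    }
    defer conn.Close()
    client, err := net.Dial("udp", conn.LocalAddr().String())
    if err != nil {
        return 0, err
    }
    defer client.Close()
    for i := 0; i < n; i++ {
        if _, err := client.Write([]byte("hello world!")); err != nil {
            return 0, err
        }
    }
    done := make(chan struct{})
    close(done) // shutdown is requested while packets are still queued
    return drain(conn, done), nil
}

func main() {
    received, err := sendAndDrain(5)
    fmt.Println(received, err)
}
```

Even though shutdown is requested before the first read, the queued packets are still counted because the worker only exits once a read times out. The price is a delay of up to one deadline period at shutdown.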

With TCP, this aspect is simpler: after enabling the net.ipv4.tcp_migrate_req sysctl, the kernel automatically migrates waiting connections to a random socket in the same group. Alternatively, eBPF can also control this migration. Both features have been available since Linux 5.14.

Addendum

After implementing this strategy in Akvorado, all workers now drop packets! 😱

$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
> | sed -n s/akvorado_inlet_flow_input_udp_in_dropped//p
packets_total{listener="0.0.0.0:2055",worker="0"} 838673
packets_total{listener="0.0.0.0:2055",worker="1"} 843675
packets_total{listener="0.0.0.0:2055",worker="2"} 837922
packets_total{listener="0.0.0.0:2055",worker="3"} 841443
packets_total{listener="0.0.0.0:2055",worker="4"} 840668
packets_total{listener="0.0.0.0:2055",worker="5"} 850274
packets_total{listener="0.0.0.0:2055",worker="6"} 835488
packets_total{listener="0.0.0.0:2055",worker="7"} 834479

The root cause is the default limit of 32 records for Kafka batch sizes. This limit is too low because the brokers have a large overhead when handling each batch: they need to ensure each batch is persisted correctly before acknowledging it. Increasing the limit to 4096 records fixes this issue.
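To get a feeling for the effect of the batch size, here is a back-of-the-envelope computation in Go. The rate of 100,000 records per second is a hypothetical figure chosen for the example, not a measurement from Akvorado, and batchesPerSecond() is a helper introduced here.

```go
package main

import "fmt"

// batchesPerSecond returns how many batches are needed each second to
// ship recordsPerSecond records when a batch holds at most batchSize
// records.
func batchesPerSecond(recordsPerSecond, batchSize int) int {
    return (recordsPerSecond + batchSize - 1) / batchSize // ceiling division
}

func main() {
    const rate = 100_000 // hypothetical flow rate, not a measured value
    for _, size := range []int{32, 4096} {
        fmt.Printf("batch size %4d: %d batches/s\n", size, batchesPerSecond(rate, size))
    }
}
```

Under this assumption, the brokers would acknowledge 3125 batches per second with the default limit, against 25 with the increased one, so the per-batch overhead is paid far less often.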

While load-balancing incoming flows with eBPF remains useful, it did not solve the main issue. At least the even distribution of dropped packets helped identify the real bottleneck. 😅


  1. The current version of the manual page is incomplete and does not cover the evolution introduced in Linux 4.19. There is a pending patch about this. ↩︎

  2. Rust is another option. However, the program we use is so trivial that it does not make sense to use Rust. ↩︎

  3. As bpf_get_prandom_u32() returns a pseudo-random 32-bit unsigned value, this method exhibits a very slight bias towards the first indexes. This is unlikely to be worth fixing. ↩︎

  4. Some examples include <linux/bpf.h> instead of "vmlinux.h". This makes your eBPF program dependent on the installed kernel headers. ↩︎

  5. listenAddr is initially set to 127.0.0.1:0 to allocate a random port. After the first iteration, it is updated with the allocated port. ↩︎

  6. This is the setupSockets() function in fixtures_test.go. ↩︎

  7. This is the setupEBPF() function in fixtures_test.go. ↩︎

  8. The complete code is in balancing_test.go. ↩︎

  9. The complete code is in graceful_test.go. ↩︎

  10. In C, we would poll() both the socket and a pipe used to signal shutdown. When the second condition is triggered, we drain the socket by executing a series of non-blocking read() calls until we get EWOULDBLOCK. ↩︎