# Using eBPF to load-balance traffic across UDP sockets with Go
Vincent Bernat
Akvorado collects sFlow and IPFIX flows over UDP. Because UDP does not retransmit lost packets, it needs to process them quickly. Akvorado runs several workers listening to the same port. The kernel should load-balance received packets fairly between these workers. However, this does not work as expected. A couple of workers exhibit high packet loss:
```console
$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
>  | sed -n 's/akvorado_inlet_flow_input_udp_in_dropped_//p'
packets_total{listener="0.0.0.0:2055",worker="0"} 0
packets_total{listener="0.0.0.0:2055",worker="1"} 0
packets_total{listener="0.0.0.0:2055",worker="2"} 0
packets_total{listener="0.0.0.0:2055",worker="3"} 1.614933572278264e+15
packets_total{listener="0.0.0.0:2055",worker="4"} 0
packets_total{listener="0.0.0.0:2055",worker="5"} 0
packets_total{listener="0.0.0.0:2055",worker="6"} 9.59964121598348e+14
packets_total{listener="0.0.0.0:2055",worker="7"} 0
```
eBPF can help by implementing an alternate balancing algorithm. 🐝
## Options for load-balancing
There are three methods to load-balance UDP packets across workers:
- One worker receives the packets and dispatches them to the other workers.
- All workers share the same socket.
- Each worker has its own socket, listening to the same port, with the `SO_REUSEPORT` socket option.
### `SO_REUSEPORT` option
Tom Herbert added the `SO_REUSEPORT` socket option in Linux 3.9. The
cover letter for his patch series explains why this new option is better than
the two existing ones from a performance point of view:
> `SO_REUSEPORT` allows multiple listener sockets to be bound to the same port. […] Received packets are distributed to multiple sockets bound to the same port using a 4-tuple hash.
>
> The motivating case for `SO_REUSEPORT` in TCP would be something like a web server binding to port 80 running with multiple threads, where each thread might have its own listener socket. This could be done as an alternative to other models:
>
> 1. have one listener thread which dispatches completed connections to workers, or
> 2. accept on a single listener socket from multiple threads.
>
> In case #1, the listener thread can easily become the bottleneck with high connection turn-over rate. In case #2, the proportion of connections accepted per thread tends to be uneven under high connection load. […] We have seen the disproportion to be as high as 3:1 ratio between the thread accepting the most connections and the one accepting the fewest. With `SO_REUSEPORT` the distribution is uniform.
>
> The motivating case for `SO_REUSEPORT` in UDP would be something like a DNS server. An alternative would be to receive on the same socket from multiple threads. As in the case of TCP, the load across these threads tends to be disproportionate and we also see a lot of contention on the socket lock.
Akvorado uses the SO_REUSEPORT option to dispatch the packets across the
workers. However, because the distribution uses a 4-tuple hash, a single socket
handles all the flows from one exporter.
### `SO_ATTACH_REUSEPORT_EBPF` option
In Linux 4.5, Craig Gallek added the SO_ATTACH_REUSEPORT_EBPF
option to attach an eBPF program to select the target UDP socket. In Linux 4.6,
he extended it to support TCP. The socket(7) manual page
documents this mechanism:1
> The BPF program must return an index between 0 and N-1 representing the socket which should receive the packet (where N is the number of sockets in the group). If the BPF program returns an invalid index, socket selection will fall back to the plain `SO_REUSEPORT` mechanism.
In Linux 4.19, Martin KaFai Lau added the
BPF_PROG_TYPE_SK_REUSEPORT program type. Such an eBPF program
selects the socket from a BPF_MAP_TYPE_REUSEPORT_ARRAY map instead. This new
approach is more reliable when switching target sockets from one instance to
another—for example, when upgrading, a new instance can add its sockets and
remove the old ones.
## Load-balancing with eBPF and Go
Altering the load-balancing algorithm for a group of sockets requires two steps:
- write and compile an eBPF program in C,2 and
- load it and attach it in Go.
### eBPF program in C
A simple load-balancing algorithm is to randomly choose the destination socket.
The kernel provides the bpf_get_prandom_u32() helper function to get a
pseudo-random number.
```c
volatile const __u32 num_sockets; // ❶

struct {
    __uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY);
    __type(key, __u32);
    __type(value, __u64);
    __uint(max_entries, 256);
} socket_map SEC(".maps"); // ❷

SEC("sk_reuseport")
int reuseport_balance_prog(struct sk_reuseport_md *reuse_md)
{
    __u32 index = bpf_get_prandom_u32() % num_sockets;         // ❸
    bpf_sk_select_reuseport(reuse_md, &socket_map, &index, 0); // ❹
    return SK_PASS; // ❺
}

char _license[] SEC("license") = "GPL";
```
In ❶, we declare a volatile constant for the number of sockets in the group. We
will initialize this constant before loading the eBPF program into the kernel.
In ❷, we define the socket map. We will populate it with the socket file
descriptors. In ❸, we randomly select the index of the target socket.3
In ❹, we invoke the bpf_sk_select_reuseport() helper to record our decision.
Finally, in ❺, we accept the packet.
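As an aside, the slight modulo bias mentioned in the footnote can be quantified with a few lines of Go (my own sketch; the worker count of 10 is a hypothetical example):

```go
package main

import "fmt"

// bias computes, for a uniform 32-bit draw reduced modulo n, how many
// residues are favored and how often favored vs. unfavored residues occur.
// Assumes n does not evenly divide 2^32.
func bias(n uint64) (extra, favored, others uint64) {
	total := uint64(1) << 32
	extra = total % n  // residues 0..extra-1 occur one extra time
	others = total / n // hits for the remaining residues
	favored = others + 1
	return
}

func main() {
	extra, favored, others := bias(10)
	fmt.Println(extra, favored, others) // 6 429496730 429496729
	// The favored sockets are ahead by one part in ~4.3e8: negligible.
	fmt.Printf("relative bias: %.2e\n", float64(favored-others)/float64(others))
}
```

With 10 sockets, the first 6 indexes are selected once more out of 2³² draws than the others, a relative excess of about 2×10⁻⁹ — clearly not worth a rejection-sampling loop in the eBPF program.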
#### Header files
If you compile the C source with clang, you get errors due to missing headers.
The recommended way to solve this is to generate a vmlinux.h file with
bpftool:
```console
$ bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h
```
Then, include the following headers:4
```c
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
```
For my 6.17 kernel, the generated vmlinux.h is quite big: 2.7 MiB. Moreover,
bpf/bpf_helpers.h is shipped with libbpf. This adds another dependency for
users. As the eBPF program is quite small, I prefer to put the strict minimum in
vmlinux.h by cherry-picking the definitions I need.
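For this program, the cherry-picked subset can stay tiny. Here is a sketch of what such a subset might contain — illustrative only; copy the actual definitions, including `struct sk_reuseport_md` (elided here), from the bpftool-generated file rather than trusting these values:

```c
/* Sketch of a hand-picked vmlinux.h subset. Copy the real definitions
 * (notably struct sk_reuseport_md, elided here) from the file generated
 * by bpftool, and double-check enum values against your own dump. */
typedef unsigned int __u32;
typedef unsigned long long __u64;

enum sk_action {
	SK_DROP = 0,
	SK_PASS = 1,
};

enum bpf_map_type {
	BPF_MAP_TYPE_REUSEPORT_SOCKARRAY = 20, /* verify against your dump */
};
```

The struct layout must match what the kernel exposes, which is why copying from the generated file is safer than writing it by hand.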
#### Compilation
The eBPF Library for Go ships bpf2go, a tool to compile eBPF programs and
to generate some scaffolding code. We create a gen.go file with the following
content:
```go
package main

//go:generate go tool bpf2go -tags linux reuseport reuseport_kern.c
```
After running go generate ./..., we can inspect the resulting objects with
readelf and llvm-objdump:
```console
$ readelf -S reuseport_bpfeb.o
There are 14 section headers, starting at offset 0x840:
  [Nr] Name              Type            Address          Offset […]
[…]
  [ 3] sk_reuseport      PROGBITS        0000000000000000 00000040
  [ 6] .maps             PROGBITS        0000000000000000 000000c8
  [ 7] license           PROGBITS        0000000000000000 000000e8
[…]
$ llvm-objdump -S reuseport_bpfeb.o

reuseport_bpfeb.o:	file format elf64-bpf

Disassembly of section sk_reuseport:

0000000000000000 <reuseport_balance_prog>:
; {
       0:	bf 61 00 00 00 00 00 00	r6 = r1
;	__u32 index = bpf_get_prandom_u32() % num_sockets;
       1:	85 00 00 00 00 00 00 07	call 0x7
[…]
```
### Usage from Go
Let’s set up 10 workers listening to the same port.5 Each socket
enables the SO_REUSEPORT option before binding:6
```go
var (
	err   error
	fds   []uintptr
	conns []*net.UDPConn
)
workers := 10
listenAddr := "127.0.0.1:0"
listenConfig := net.ListenConfig{
	Control: func(_, _ string, c syscall.RawConn) error {
		c.Control(func(fd uintptr) {
			err = unix.SetsockoptInt(int(fd),
				unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
			fds = append(fds, fd)
		})
		return err
	},
}
for range workers {
	pconn, err := listenConfig.ListenPacket(t.Context(), "udp", listenAddr)
	if err != nil {
		t.Fatalf("ListenPacket() error:\n%+v", err)
	}
	udpConn := pconn.(*net.UDPConn)
	listenAddr = udpConn.LocalAddr().String()
	conns = append(conns, udpConn)
}
```
The second step is to load the eBPF program, initialize the num_sockets
variable, populate the socket map, and attach the program to the first
socket.7
```go
// Load the eBPF collection.
spec, err := loadReuseport()
if err != nil {
	t.Fatalf("loadReuseport() error:\n%+v", err)
}

// Set the "num_sockets" global variable to the number of
// file descriptors we will register.
if err := spec.Variables["num_sockets"].Set(uint32(len(fds))); err != nil {
	t.Fatalf("NumSockets.Set() error:\n%+v", err)
}

// Load the map and the program into the kernel.
var objs reuseportObjects
if err := spec.LoadAndAssign(&objs, nil); err != nil {
	t.Fatalf("LoadAndAssign() error:\n%+v", err)
}
t.Cleanup(func() { objs.Close() })

// Assign the file descriptors to the socket map.
for worker, fd := range fds {
	if err := objs.reuseportMaps.SocketMap.Put(uint32(worker), uint64(fd)); err != nil {
		t.Fatalf("SocketMap.Put() error:\n%+v", err)
	}
}

// Attach the eBPF program to the first socket.
socketFD := int(fds[0])
progFD := objs.reuseportPrograms.ReuseportBalanceProg.FD()
if err := unix.SetsockoptInt(socketFD,
	unix.SOL_SOCKET, unix.SO_ATTACH_REUSEPORT_EBPF, progFD); err != nil {
	t.Fatalf("SetsockoptInt() error:\n%+v", err)
}
```
We are now ready to process incoming packets. Each worker is a goroutine incrementing a counter for each received packet:8
```go
var wg sync.WaitGroup
receivedPackets := make([]int, workers)
for worker := range workers {
	conn := conns[worker]
	packets := &receivedPackets[worker]
	wg.Go(func() {
		payload := make([]byte, 9000)
		for {
			if _, err := conn.Read(payload); err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				t.Logf("Read() error:\n%+v", err)
			}
			*packets++
		}
	})
}
```
Let’s send 1000 packets:
```go
sentPackets := 1000
conn, err := net.Dial("udp", conns[0].LocalAddr().String())
if err != nil {
	t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
for range sentPackets {
	if _, err := conn.Write([]byte("hello world!")); err != nil {
		t.Fatalf("Write() error:\n%+v", err)
	}
}
```
If we print the content of the `receivedPackets` array, we can check that the
balancing works as expected, with each worker getting about 100 packets:
```
=== RUN   TestUDPWorkerBalancing
    balancing_test.go:84: receivedPackets[0] = 107
    balancing_test.go:84: receivedPackets[1] = 92
    balancing_test.go:84: receivedPackets[2] = 99
    balancing_test.go:84: receivedPackets[3] = 105
    balancing_test.go:84: receivedPackets[4] = 107
    balancing_test.go:84: receivedPackets[5] = 96
    balancing_test.go:84: receivedPackets[6] = 102
    balancing_test.go:84: receivedPackets[7] = 105
    balancing_test.go:84: receivedPackets[8] = 99
    balancing_test.go:84: receivedPackets[9] = 88
    balancing_test.go:91: receivedPackets = 1000
    balancing_test.go:92: sentPackets = 1000
```
## Graceful restart
You can also use SO_ATTACH_REUSEPORT_EBPF to gracefully restart an
application. A new instance of the application binds to the same address and
prepares its own version of the socket map. Once it attaches the eBPF program to
the first socket, the kernel steers incoming packets to this new instance. The
old instance needs to drain the already received packets before shutting down.
To check that we are not losing any packets, we spawn a goroutine to send as many packets as possible:
```go
sentPackets := 0
notSentPackets := 0
done := make(chan bool)
conn, err := net.Dial("udp", conns1[0].LocalAddr().String())
if err != nil {
	t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
go func() {
	for {
		if _, err := conn.Write([]byte("hello world!")); err != nil {
			notSentPackets++
		} else {
			sentPackets++
		}
		select {
		case <-done:
			return
		default:
		}
	}
}()
```
Then, while the goroutine runs, we start the second set of workers. Once they are running, they start receiving packets. If we gracefully stop the initial set of workers, not a single packet is lost!9
```
=== RUN   TestGracefulRestart
    graceful_test.go:135: receivedPackets1[0] = 165
    graceful_test.go:135: receivedPackets1[1] = 195
    graceful_test.go:135: receivedPackets1[2] = 194
    graceful_test.go:135: receivedPackets1[3] = 190
    graceful_test.go:135: receivedPackets1[4] = 213
    graceful_test.go:135: receivedPackets1[5] = 187
    graceful_test.go:135: receivedPackets1[6] = 170
    graceful_test.go:135: receivedPackets1[7] = 190
    graceful_test.go:135: receivedPackets1[8] = 194
    graceful_test.go:135: receivedPackets1[9] = 155
    graceful_test.go:139: receivedPackets2[0] = 1631
    graceful_test.go:139: receivedPackets2[1] = 1582
    graceful_test.go:139: receivedPackets2[2] = 1594
    graceful_test.go:139: receivedPackets2[3] = 1611
    graceful_test.go:139: receivedPackets2[4] = 1571
    graceful_test.go:139: receivedPackets2[5] = 1660
    graceful_test.go:139: receivedPackets2[6] = 1587
    graceful_test.go:139: receivedPackets2[7] = 1605
    graceful_test.go:139: receivedPackets2[8] = 1631
    graceful_test.go:139: receivedPackets2[9] = 1689
    graceful_test.go:147: receivedPackets = 18014
    graceful_test.go:148: sentPackets = 18014
```
Unfortunately, gracefully shutting down a UDP socket is not trivial in Go.10
Previously, we were terminating workers by closing their sockets. However, if we
close them too soon, the application loses packets that were assigned to them
but not yet processed. Before stopping, a worker needs to call conn.Read()
until there are no more packets. A solution is to set a deadline for
`conn.Read()` and check whether we should stop the goroutine when the deadline is
exceeded:
```go
payload := make([]byte, 9000)
for {
	conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
	if _, err := conn.Read(payload); err != nil {
		if errors.Is(err, os.ErrDeadlineExceeded) {
			select {
			case <-done:
				return
			default:
				continue
			}
		}
		t.Logf("Read() error:\n%+v", err)
	}
	*packets++
}
```
With TCP, this aspect is simpler: after enabling the net.ipv4.tcp_migrate_req
sysctl, the kernel automatically migrates waiting connections to a random socket
in the same group. Alternatively, eBPF can also control this
migration. Both features are available since Linux 5.14.
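For reference, enabling the migration behavior is a one-line sysctl change (a configuration sketch; the file path is hypothetical, and the setting requires Linux 5.14 or later):

```
# /etc/sysctl.d/99-tcp-migrate.conf (hypothetical path)
# Migrate TCP requests waiting on a closing listener to another socket
# of the same SO_REUSEPORT group (Linux >= 5.14).
net.ipv4.tcp_migrate_req = 1
```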
## Addendum
After implementing this strategy in Akvorado, all workers now drop packets! 😱
```console
$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
>  | sed -n 's/akvorado_inlet_flow_input_udp_in_dropped_//p'
packets_total{listener="0.0.0.0:2055",worker="0"} 838673
packets_total{listener="0.0.0.0:2055",worker="1"} 843675
packets_total{listener="0.0.0.0:2055",worker="2"} 837922
packets_total{listener="0.0.0.0:2055",worker="3"} 841443
packets_total{listener="0.0.0.0:2055",worker="4"} 840668
packets_total{listener="0.0.0.0:2055",worker="5"} 850274
packets_total{listener="0.0.0.0:2055",worker="6"} 835488
packets_total{listener="0.0.0.0:2055",worker="7"} 834479
```
The root cause is the default limit of 32 records per Kafka batch. This limit is too low because the brokers incur a significant overhead for each batch: they need to persist it correctly before acknowledging it. Increasing the limit to 4096 records fixes the issue.
While load-balancing incoming flows with eBPF remains useful, it did not solve the main issue. At least the even distribution of dropped packets helped identify the real bottleneck. 😅
1. The current version of the manual page is incomplete and does not cover the evolution introduced in Linux 4.19. There is a pending patch about this.
2. Rust is another option. However, the program we use is so trivial that it does not make sense to use Rust.
3. As `bpf_get_prandom_u32()` returns a pseudo-random 32-bit unsigned value, this method exhibits a very slight bias towards the first indexes. This is unlikely to be worth fixing.
4. Some examples include `<linux/bpf.h>` instead of `"vmlinux.h"`. This makes your eBPF program dependent on the installed kernel headers.
5. `listenAddr` is initially set to `127.0.0.1:0` to allocate a random port. After the first iteration, it is updated with the allocated port.
6. This is the `setupSockets()` function in `fixtures_test.go`.
7. This is the `setupEBPF()` function in `fixtures_test.go`.
8. The complete code is in `balancing_test.go`.
9. The complete code is in `graceful_test.go`.
10. In C, we would `poll()` both the socket and a pipe used to signal for shutdown. When the second condition is triggered, we drain the socket by executing a series of non-blocking `read()` until we get `EWOULDBLOCK`.