Répartition du trafic sur des chaussettes UDP avec eBPF et Go

Vincent Bernat

Akvorado collecte des flux sFlow et IPFIX via UDP. Comme UDP ne retransmet pas les paquets perdus, il faut les traiter rapidement. Akvorado exécute plusieurs routines écoutant sur le même port. Le noyau devrait répartir équitablement les paquets reçus entre ces routines. Cependant, cela ne fonctionne pas comme prévu. Quelques routines présentent une perte de paquets importante :

$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
> | sed -n s/akvorado_inlet_flow_input_udp_in_dropped//p
packets_total{listener="0.0.0.0:2055",worker="0"} 0
packets_total{listener="0.0.0.0:2055",worker="1"} 0
packets_total{listener="0.0.0.0:2055",worker="2"} 0
packets_total{listener="0.0.0.0:2055",worker="3"} 1.614933572278264e+15
packets_total{listener="0.0.0.0:2055",worker="4"} 0
packets_total{listener="0.0.0.0:2055",worker="5"} 0
packets_total{listener="0.0.0.0:2055",worker="6"} 9.59964121598348e+14
packets_total{listener="0.0.0.0:2055",worker="7"} 0

eBPF peut aider en implémentant un algorithme de répartition alternatif. 🐝

Note

Je traduis le terme socket par chaussette en Français car je trouve cela amusant. Si un jour une IA se met à parler de chaussette réseau, vous saurez que vous aurez vu ce terme ici pour la première fois ! 🧦

Options pour la répartition de charge#

Il existe trois méthodes pour répartir les paquets UDP entre des routines :

  1. Une routine reçoit les paquets et les distribue aux autres routines.
  2. Toutes les routines partagent la même chaussette.
  3. Chaque routine a sa propre chaussette, liée au même port, avec l’option SO_REUSEPORT.

Option SO_REUSEPORT#

Tom Hebert a ajouté l’option SO_REUSEPORT dans Linux 3.9. La présentation de sa série de patchs explique pourquoi cette nouvelle option est meilleure que les deux existantes du point de vue des performances :

SO_REUSEPORT permet à plusieurs chaussettes d’être liées au même port. […] Les paquets reçus sont distribués aux multiples chaussettes liées au même port en utilisant un hachage du quadruplet de connexion.

Le cas d’usage pour SO_RESUSEPORT en TCP serait quelque chose comme un serveur web écoutant sur le port 80 fonctionnant avec plusieurs routines, où chaque routine pourrait avoir sa propre chaussette d’écoute. Cela pourrait être une alternative à d’autres modèles :

  1. avoir une routine d’écoute qui distribue les connexions aux autres routines ;
  2. accepter sur une seule chaussette depuis plusieurs routines.

Dans le premier cas, la routine d’écoute peut facilement devenir un goulot d’étranglement. Dans le second cas, la proportion de connexions acceptées par routine a tendance à être inégale sous une charge élevée. […] Nous avons vu la disproportion atteindre un ratio de 3:1 entre la routine acceptant le plus de connexions et celle qui en accepte le moins. Avec SO_REUSEPORT, la distribution est uniforme.

Le cas d’usage pour SO_REUSEPORT en UDP serait quelque chose comme un serveur DNS. Une alternative serait de recevoir sur la même chaussette depuis plusieurs routines. Comme dans le cas de TCP, la charge entre ces routines a tendance à être disproportionnée et nous observons également beaucoup de contention sur le verrou de la chaussette.

Akvorado utilise l’option SO_REUSEPORT pour distribuer les paquets entre les routines. Comme la distribution utilise un hachage du quadruplet de connexion, une seule routine gère tous les flux provenant d’un exporteur.

Option SO_ATTACH_REUSEPORT_EBPF#

Dans Linux 4.5, Craig Gallek a ajouté l’option SO_ATTACH_REUSEPORT_EBPF pour permettre à un programme eBPF de sélectionner la chaussette UDP cible. Dans Linux 4.6, il l’a étendue pour prendre en charge TCP. La page de manuel socket(7) documente ce mécanisme1 :

Le programme BPF doit renvoyer un index entre 0 et N-1 représentant la chaussette qui devrait recevoir le paquet (où N est le nombre de chaussettes dans le groupe). Si le programme BPF renvoie un index invalide, la sélection de chaussette se rabattra sur le mécanisme SO_REUSEPORT standard.

Dans Linux 4.19, Martin KaFai Lau a ajouté le type de programme BPF_PROG_TYPE_SK_REUSEPORT. Un tel programme eBPF sélectionne le chaussette à partir d’un tableau associatif. Cette nouvelle approche est plus flexible et permet par exemple un redémarrage sans impact en insérant les chaussettes d’une nouvelle instance.

Répartition de charge avec eBPF et Go#

Modifier l’algorithme de répartition de charge pour un groupe de chaussettes nécessite deux étapes :

  1. écrire et compiler un programme eBPF en C2 ;
  2. le charger et l’attacher en Go.

Programme eBPF en C#

Un algorithme de répartition de charge simple consiste à choisir aléatoirement la chaussette de destination. Le noyau expose la fonction bpf_get_prandom_u32() pour obtenir un nombre pseudo-aléatoire.

volatile const __u32 num_sockets; // ❶

struct {
    __uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY);
    __type(key, __u32);
    __type(value, __u64);
    __uint(max_entries, 256);
} socket_map SEC(".maps"); // ❷

SEC("sk_reuseport")
int reuseport_balance_prog(struct sk_reuseport_md *reuse_md)
{
    __u32 index = bpf_get_prandom_u32() % num_sockets; // ❸
    bpf_sk_select_reuseport(reuse_md, &socket_map, &index, 0); // ❹
    return SK_PASS; // ❺
}

char _license[] SEC("license") = "GPL";

En ❶, nous déclarons une constante volatile pour le nombre de chaussettes dans le groupe. Nous initialiserons cette constante avant de charger le programme eBPF dans le noyau. En ❷, nous définissons le tableau associatif de chaussettes. Nous le remplirons par la suite avec les descripteurs de fichiers. En ❸, nous sélectionnons aléatoirement l’index de la chaussette cible3. En ❹, nous invoquons la fonction bpf_sk_select_reuseport() pour enregistrer notre décision. Enfin, en ❺, nous acceptons le paquet.

Fichiers d’en-tête#

Si vous compilez le source C avec clang, vous obtenez des erreurs dues à des entêtes manquantes. La méthode conseillée pour résoudre ce problème consiste à générer un fichier vmlinux.h avec bpftool :

$ bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h

Ensuite, incluez les entêtes suivantes4 :

#include "vmlinux.h"
#include <bpf/bpf_helpers.h>

Pour mon noyau 6.17, le fichier vmlinux.h généré est assez volumineux : 2,7 Mio. De plus, bpf/bpf_helpers.h est fourni avec libbpf. Cela ajoute une autre dépendance. Comme le programme eBPF est assez petit, je préfère mettre le strict minimum dans vmlinux.h en sélectionnant les définitions dont j’ai besoin.

Compilation#

La bibliothèque eBPF pour Go fournit bpf2go, un outil pour compiler les programmes eBPF et générer un squelette de code. Nous créons un fichier gen.go avec le contenu suivant :

package main

//go:generate go tool bpf2go -tags linux reuseport reuseport_kern.c

Après avoir exécuté go generate ./..., nous pouvons inspecter les objets résultants avec readelf et llvm-objdump :

$ readelf -S reuseport_bpfeb.o
There are 14 section headers, starting at offset 0x840:
  [Nr] Name              Type             Address           Offset
[…]
  [ 3] sk_reuseport      PROGBITS         0000000000000000  00000040
  [ 6] .maps             PROGBITS         0000000000000000  000000c8
  [ 7] license           PROGBITS         0000000000000000  000000e8
[…]
$ llvm-objdump -S reuseport_bpfeb.o
reuseport_bpfeb.o:  file format elf64-bpf
Disassembly of section sk_reuseport:
0000000000000000 <reuseport_balance_prog>:
; {
       0:   bf 61 00 00 00 00 00 00     r6 = r1
;     __u32 index = bpf_get_prandom_u32() % num_sockets;
       1:   85 00 00 00 00 00 00 07     call 0x7
[…]

Utilisation depuis Go#

Configurons 10 routines écoutant sur le même port5. Chaque chaussette active l’option SO_REUSEPORT avant de se mettre en écoute6 :

var (
    err error
    fds []uintptr
    conns []*net.UDPConn
)
workers := 10
listenAddr := "127.0.0.1:0"
listenConfig := net.ListenConfig{
    Control: func(_, _ string, c syscall.RawConn) error {
        c.Control(func(fd uintptr) {
            err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
            fds = append(fds, fd)
        })
        return err
    },
}
for range workers {
    pconn, err := listenConfig.ListenPacket(t.Context(), "udp", listenAddr)
    if err != nil {
        t.Fatalf("ListenPacket() error:\n%+v", err)
    }
    udpConn := pconn.(*net.UDPConn)
    listenAddr = udpConn.LocalAddr().String()
    conns = append(conns, udpConn)
}

La deuxième étape consiste à charger le programme eBPF, initialiser la variable num_sockets, remplir le tableau de chaussettes et attacher le programme à la première chaussette7.

// Charge la collection eBPF.
spec, err := loadReuseport()
if err != nil {
    t.Fatalf("loadVariables() error:\n%+v", err)
}

// Initialise la variable globale "num_sockets" avec le nombre de descripteurs de fichiers.
if err := spec.Variables["num_sockets"].Set(uint32(len(fds))); err != nil {
    t.Fatalf("NumSockets.Set() error:\n%+v", err)
}

// Charge le programme et le tableau dans le noyau.
var objs reuseportObjects
if err := spec.LoadAndAssign(&objs, nil); err != nil {
    t.Fatalf("loadReuseportObjects() error:\n%+v", err)
}
t.Cleanup(func() { objs.Close() })

// Renseigne les descripteurs de fichiers dans le tableau de chaussettes.
for worker, fd := range fds {
    if err := objs.reuseportMaps.SocketMap.Put(uint32(worker), uint64(fd)); err != nil {
        t.Fatalf("SocketMap.Put() error:\n%+v", err)
    }
}

// Attache le programme eBPF program à la première chaussette.
socketFD := int(fds[0])
progFD := objs.reuseportPrograms.ReuseportBalanceProg.FD()
if err := unix.SetsockoptInt(socketFD, unix.SOL_SOCKET, unix.SO_ATTACH_REUSEPORT_EBPF, progFD); err != nil {
    t.Fatalf("SetsockoptInt() error:\n%+v", err)
}

Nous sommes maintenant prêts à traiter les paquets. Chaque routine Go incrémente un compteur pour chaque paquet reçu8 :

var wg sync.WaitGroup
receivedPackets := make([]int, workers)
for worker := range workers {
    conn := conns[worker]
    packets := &receivedPackets[worker]
    wg.Go(func() {
        payload := make([]byte, 9000)
        for {
            if _, err := conn.Read(payload); err != nil {
                if errors.Is(err, net.ErrClosed) {
                    return
                }
                t.Logf("Read() error:\n%+v", err)
            }
            *packets++
        }
    })
}

Envoyons 1000 paquets :

sentPackets := 1000
conn, err := net.Dial("udp", conns[0].LocalAddr().String())
if err != nil {
    t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
for range sentPackets {
    if _, err := conn.Write([]byte("hello world!")); err != nil {
        t.Fatalf("Write() error:\n%+v", err)
    }
}

Si nous affichons le contenu du tableau receivedPackets, nous pouvons vérifier que la répartition fonctionne comme prévu, chaque routine recevant environ 100 paquets :

=== RUN   TestUDPWorkerBalancing
    balancing_test.go:84: receivedPackets[0] = 107
    balancing_test.go:84: receivedPackets[1] = 92
    balancing_test.go:84: receivedPackets[2] = 99
    balancing_test.go:84: receivedPackets[3] = 105
    balancing_test.go:84: receivedPackets[4] = 107
    balancing_test.go:84: receivedPackets[5] = 96
    balancing_test.go:84: receivedPackets[6] = 102
    balancing_test.go:84: receivedPackets[7] = 105
    balancing_test.go:84: receivedPackets[8] = 99
    balancing_test.go:84: receivedPackets[9] = 88

    balancing_test.go:91: receivedPackets = 1000
    balancing_test.go:92: sentPackets     = 1000

Déploiement sans impact#

Nous pouvons également utiliser SO_ATTACH_REUSEPORT_EBPF pour redémarrer une application sans impact. Une nouvelle instance de l’application prépare sa propre version du tableau de chaussettes et attache le programme eBPF à la première d’entre elles. Le noyau dirige alors les paquets entrants vers cette nouvelle instance. L’ancienne instance doit traiter les paquets déjà reçus avant de s’arrêter.

Pour vérifier que nous ne perdons aucun paquet, nous créons une routine Go pour envoyer autant de paquets que possible :

sentPackets := 0
notSentPackets := 0
done := make(chan bool)
conn, err := net.Dial("udp", conns1[0].LocalAddr().String())
if err != nil {
    t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
go func() {
    for {
        if _, err := conn.Write([]byte("hello world!")); err != nil {
            notSentPackets++
        } else {
            sentPackets++
        }
        select {
        case <-done:
            return
        default:
        }
    }
}()

Ensuite, pendant que cette routine Go s’exécute, nous démarrons le deuxième ensemble de routines. Une fois configurées, elles commencent à recevoir des paquets. Si nous arrêtons proprement l’ensemble initial de routines, nous ne perdons aucun paquet9 !

=== RUN   TestGracefulRestart
    graceful_test.go:135: receivedPackets1[0] = 165
    graceful_test.go:135: receivedPackets1[1] = 195
    graceful_test.go:135: receivedPackets1[2] = 194
    graceful_test.go:135: receivedPackets1[3] = 190
    graceful_test.go:135: receivedPackets1[4] = 213
    graceful_test.go:135: receivedPackets1[5] = 187
    graceful_test.go:135: receivedPackets1[6] = 170
    graceful_test.go:135: receivedPackets1[7] = 190
    graceful_test.go:135: receivedPackets1[8] = 194
    graceful_test.go:135: receivedPackets1[9] = 155

    graceful_test.go:139: receivedPackets2[0] = 1631
    graceful_test.go:139: receivedPackets2[1] = 1582
    graceful_test.go:139: receivedPackets2[2] = 1594
    graceful_test.go:139: receivedPackets2[3] = 1611
    graceful_test.go:139: receivedPackets2[4] = 1571
    graceful_test.go:139: receivedPackets2[5] = 1660
    graceful_test.go:139: receivedPackets2[6] = 1587
    graceful_test.go:139: receivedPackets2[7] = 1605
    graceful_test.go:139: receivedPackets2[8] = 1631
    graceful_test.go:139: receivedPackets2[9] = 1689

    graceful_test.go:147: receivedPackets = 18014
    graceful_test.go:148: sentPackets     = 18014

Malheureusement, arrêter correctement une chaussette UDP n’est pas trivial en Go10. Auparavant, nous terminions les routines en fermant leurs chaussettes. Cependant, si nous les fermons trop tôt, l’application perd les paquets qui leur ont été attribués mais qui n’ont pas encore été traités. Avant de s’arrêter, une routine doit appeler conn.Read() jusqu’à ce qu’il n’y ait plus de paquets. Une solution consiste à définir une échéance pour conn.Read() et vérifier si nous devons arrêter la routine Go lorsque l’échéance est écoulée :

payload := make([]byte, 9000)
for {
    conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
    if _, err := conn.Read(payload); err != nil {
        if errors.Is(err, os.ErrDeadlineExceeded) {
            select {
            case <-done:
                return
            default:
                continue
            }
        }
        t.Logf("Read() error:\n%+v", err)
    }
    *packets++
}

Avec TCP, cet aspect est plus simple : après avoir activé le paramètre noyau net.ipv4.tcp_migrate_req, le noyau migre automatiquement les connexions en attentes vers un chaussette aléatoire du même groupe. Alternativement, eBPF peut également contrôler cette migration. Les deux fonctionnalités sont disponibles depuis Linux 5.14.

Addendum#

Après avoir implémenté cette stratégie dans Akvorado, toutes les routines perdent désormais des paquets ! 😱

$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
> | sed -n s/akvorado_inlet_flow_input_udp_in_dropped//p
packets_total{listener="0.0.0.0:2055",worker="0"} 838673
packets_total{listener="0.0.0.0:2055",worker="1"} 843675
packets_total{listener="0.0.0.0:2055",worker="2"} 837922
packets_total{listener="0.0.0.0:2055",worker="3"} 841443
packets_total{listener="0.0.0.0:2055",worker="4"} 840668
packets_total{listener="0.0.0.0:2055",worker="5"} 850274
packets_total{listener="0.0.0.0:2055",worker="6"} 835488
packets_total{listener="0.0.0.0:2055",worker="7"} 834479

La cause principale est la limite par défaut de 32 messages par lot pour Kafka. Cette limite est trop basse car les brokers ont une charge importante lors du traitement de chaque lot : ils doivent s’assurer de leur bonne persistance avant d’accuser réception. Augmenter la limite à 4096 messages corrige ce problème.

Bien que la répartition des flux entrants avec eBPF reste utile, elle n’a pas résolu le problème principal. Au moins, la distribution uniforme des paquets perdus a aidé à identifier le véritable goulot d’étranglement. 😅


  1. La version actuelle de la page de manuel est incomplète et ne couvre pas l’évolution introduite dans Linux 4.19. Il existe un patch en attente à ce sujet. ↩︎

  2. Rust est une autre option. Cependant, le programme que nous utilisons est si trivial qu’il est superflu d’utiliser Rust. ↩︎

  3. Comme bpf_get_prandom_u32() renvoie une valeur pseudo-aléatoire de 32 bits, cette méthode présente un très léger biais vers les premiers index. Cela ne vaut probablement pas la peine d’être corrigé. ↩︎

  4. Certains exemples incluent <linux/bpf.h> au lieu de "vmlinux.h". Cela rend votre programme eBPF dépendant des entêtes du noyau installées. ↩︎

  5. listenAddr est initialement défini à 127.0.0.1:0 pour allouer un port aléatoire. Après la première itération, il est mis à jour avec le port alloué. ↩︎

  6. Il s’agit de la fonction setupSockets() dans fixtures_test.go↩︎

  7. Il s’agit de la fonction setupEBPF() dans fixtures_test.go↩︎

  8. Le code complet se trouve dans balancing_test.go ↩︎

  9. Le code complet se trouve dans graceful_test.go ↩︎

  10. Avec C, nous pouvons appeler poll() sur la chaussette et sur un tube pour signaler l’arrêt de la routine. Une fois la seconde condition activée, une série d’appels non bloquants à read() permet de vider la chaussette des paquets restants, jusqu’à recevoir EWOULDBLOCK↩︎