Répartition du trafic sur des chaussettes UDP avec eBPF et Go
Vincent Bernat
Akvorado collecte des flux sFlow et IPFIX via UDP. Comme UDP ne retransmet pas les paquets perdus, il faut les traiter rapidement. Akvorado exécute plusieurs routines écoutant sur le même port. Le noyau devrait répartir équitablement les paquets reçus entre ces routines. Cependant, cela ne fonctionne pas comme prévu. Quelques routines présentent une perte de paquets importante :
$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \ > | sed -n s/akvorado_inlet_flow_input_udp_in_dropped//p packets_total{listener="0.0.0.0:2055",worker="0"} 0 packets_total{listener="0.0.0.0:2055",worker="1"} 0 packets_total{listener="0.0.0.0:2055",worker="2"} 0 packets_total{listener="0.0.0.0:2055",worker="3"} 1.614933572278264e+15 packets_total{listener="0.0.0.0:2055",worker="4"} 0 packets_total{listener="0.0.0.0:2055",worker="5"} 0 packets_total{listener="0.0.0.0:2055",worker="6"} 9.59964121598348e+14 packets_total{listener="0.0.0.0:2055",worker="7"} 0
eBPF peut aider en implémentant un algorithme de répartition alternatif. 🐝
Note
Je traduis le terme socket par chaussette en Français car je trouve cela amusant. Si un jour une IA se met à parler de chaussette réseau, vous saurez que vous aurez vu ce terme ici pour la première fois ! 🧦
Options pour la répartition de charge#
Il existe trois méthodes pour répartir les paquets UDP entre des routines :
- Une routine reçoit les paquets et les distribue aux autres routines.
- Toutes les routines partagent la même chaussette.
- Chaque routine a sa propre chaussette, liée au même port, avec l’option
SO_REUSEPORT.
Option SO_REUSEPORT#
Tom Hebert a ajouté l’option SO_REUSEPORT dans Linux 3.9. La
présentation de sa série de patchs explique pourquoi cette nouvelle option est
meilleure que les deux existantes du point de vue des performances :
SO_REUSEPORTpermet à plusieurs chaussettes d’être liées au même port. […] Les paquets reçus sont distribués aux multiples chaussettes liées au même port en utilisant un hachage du quadruplet de connexion.Le cas d’usage pour
SO_RESUSEPORTen TCP serait quelque chose comme un serveur web écoutant sur le port 80 fonctionnant avec plusieurs routines, où chaque routine pourrait avoir sa propre chaussette d’écoute. Cela pourrait être une alternative à d’autres modèles :
- avoir une routine d’écoute qui distribue les connexions aux autres routines ;
- accepter sur une seule chaussette depuis plusieurs routines.
Dans le premier cas, la routine d’écoute peut facilement devenir un goulot d’étranglement. Dans le second cas, la proportion de connexions acceptées par routine a tendance à être inégale sous une charge élevée. […] Nous avons vu la disproportion atteindre un ratio de 3:1 entre la routine acceptant le plus de connexions et celle qui en accepte le moins. Avec
SO_REUSEPORT, la distribution est uniforme.Le cas d’usage pour
SO_REUSEPORTen UDP serait quelque chose comme un serveur DNS. Une alternative serait de recevoir sur la même chaussette depuis plusieurs routines. Comme dans le cas de TCP, la charge entre ces routines a tendance à être disproportionnée et nous observons également beaucoup de contention sur le verrou de la chaussette.
Akvorado utilise l’option SO_REUSEPORT pour distribuer les paquets entre les
routines. Comme la distribution utilise un hachage du quadruplet de connexion, une
seule routine gère tous les flux provenant d’un exporteur.
Option SO_ATTACH_REUSEPORT_EBPF#
Dans Linux 4.5, Craig Gallek a ajouté l’option
SO_ATTACH_REUSEPORT_EBPF pour permettre à un programme eBPF de sélectionner la
chaussette UDP cible. Dans Linux 4.6, il l’a étendue pour prendre en
charge TCP. La page de manuel socket(7) documente ce
mécanisme1 :
Le programme BPF doit renvoyer un index entre 0 et N-1 représentant la chaussette qui devrait recevoir le paquet (où N est le nombre de chaussettes dans le groupe). Si le programme BPF renvoie un index invalide, la sélection de chaussette se rabattra sur le mécanisme
SO_REUSEPORTstandard.
Dans Linux 4.19, Martin KaFai Lau a ajouté le type de programme
BPF_PROG_TYPE_SK_REUSEPORT. Un tel programme eBPF sélectionne
le chaussette à partir d’un tableau associatif. Cette nouvelle approche est plus
flexible et permet par exemple un redémarrage sans impact en insérant les
chaussettes d’une nouvelle instance.
Répartition de charge avec eBPF et Go#
Modifier l’algorithme de répartition de charge pour un groupe de chaussettes nécessite deux étapes :
- écrire et compiler un programme eBPF en C2 ;
- le charger et l’attacher en Go.
Programme eBPF en C#
Un algorithme de répartition de charge simple consiste à choisir aléatoirement
la chaussette de destination. Le noyau expose la fonction
bpf_get_prandom_u32() pour obtenir un nombre pseudo-aléatoire.
volatile const __u32 num_sockets; // ❶ struct { __uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY); __type(key, __u32); __type(value, __u64); __uint(max_entries, 256); } socket_map SEC(".maps"); // ❷ SEC("sk_reuseport") int reuseport_balance_prog(struct sk_reuseport_md *reuse_md) { __u32 index = bpf_get_prandom_u32() % num_sockets; // ❸ bpf_sk_select_reuseport(reuse_md, &socket_map, &index, 0); // ❹ return SK_PASS; // ❺ } char _license[] SEC("license") = "GPL";
En ❶, nous déclarons une constante volatile pour le nombre de chaussettes dans
le groupe. Nous initialiserons cette constante avant de charger le programme
eBPF dans le noyau. En ❷, nous définissons le tableau associatif de chaussettes.
Nous le remplirons par la suite avec les descripteurs de fichiers. En ❸, nous
sélectionnons aléatoirement l’index de la chaussette cible3. En ❹, nous
invoquons la fonction bpf_sk_select_reuseport() pour enregistrer notre
décision. Enfin, en ❺, nous acceptons le paquet.
Fichiers d’en-tête#
Si vous compilez le source C avec clang, vous obtenez des erreurs dues à des
entêtes manquantes. La méthode conseillée pour résoudre ce problème consiste à
générer un fichier vmlinux.h avec bpftool :
$ bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h
Ensuite, incluez les entêtes suivantes4 :
#include "vmlinux.h" #include <bpf/bpf_helpers.h>
Pour mon noyau 6.17, le fichier vmlinux.h généré est assez volumineux :
2,7 Mio. De plus, bpf/bpf_helpers.h est fourni avec libbpf. Cela ajoute une
autre dépendance. Comme le programme eBPF est assez petit, je préfère mettre le
strict minimum dans vmlinux.h en sélectionnant les définitions dont j’ai
besoin.
Compilation#
La bibliothèque eBPF pour Go fournit bpf2go, un outil
pour compiler les programmes eBPF et générer un squelette de code. Nous créons
un fichier gen.go avec le contenu suivant :
package main //go:generate go tool bpf2go -tags linux reuseport reuseport_kern.c
Après avoir exécuté go generate ./..., nous pouvons inspecter les objets
résultants avec readelf et llvm-objdump :
$ readelf -S reuseport_bpfeb.o There are 14 section headers, starting at offset 0x840: [Nr] Name Type Address Offset […] [ 3] sk_reuseport PROGBITS 0000000000000000 00000040 [ 6] .maps PROGBITS 0000000000000000 000000c8 [ 7] license PROGBITS 0000000000000000 000000e8 […] $ llvm-objdump -S reuseport_bpfeb.o reuseport_bpfeb.o: file format elf64-bpf Disassembly of section sk_reuseport: 0000000000000000 <reuseport_balance_prog>: ; { 0: bf 61 00 00 00 00 00 00 r6 = r1 ; __u32 index = bpf_get_prandom_u32() % num_sockets; 1: 85 00 00 00 00 00 00 07 call 0x7 […]
Utilisation depuis Go#
Configurons 10 routines écoutant sur le même port5. Chaque
chaussette active l’option SO_REUSEPORT avant de se mettre en écoute6 :
var ( err error fds []uintptr conns []*net.UDPConn ) workers := 10 listenAddr := "127.0.0.1:0" listenConfig := net.ListenConfig{ Control: func(_, _ string, c syscall.RawConn) error { c.Control(func(fd uintptr) { err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) fds = append(fds, fd) }) return err }, } for range workers { pconn, err := listenConfig.ListenPacket(t.Context(), "udp", listenAddr) if err != nil { t.Fatalf("ListenPacket() error:\n%+v", err) } udpConn := pconn.(*net.UDPConn) listenAddr = udpConn.LocalAddr().String() conns = append(conns, udpConn) }
La deuxième étape consiste à charger le programme eBPF, initialiser la variable
num_sockets, remplir le tableau de chaussettes et attacher le programme à la
première chaussette7.
// Charge la collection eBPF. spec, err := loadReuseport() if err != nil { t.Fatalf("loadVariables() error:\n%+v", err) } // Initialise la variable globale "num_sockets" avec le nombre de descripteurs de fichiers. if err := spec.Variables["num_sockets"].Set(uint32(len(fds))); err != nil { t.Fatalf("NumSockets.Set() error:\n%+v", err) } // Charge le programme et le tableau dans le noyau. var objs reuseportObjects if err := spec.LoadAndAssign(&objs, nil); err != nil { t.Fatalf("loadReuseportObjects() error:\n%+v", err) } t.Cleanup(func() { objs.Close() }) // Renseigne les descripteurs de fichiers dans le tableau de chaussettes. for worker, fd := range fds { if err := objs.reuseportMaps.SocketMap.Put(uint32(worker), uint64(fd)); err != nil { t.Fatalf("SocketMap.Put() error:\n%+v", err) } } // Attache le programme eBPF program à la première chaussette. socketFD := int(fds[0]) progFD := objs.reuseportPrograms.ReuseportBalanceProg.FD() if err := unix.SetsockoptInt(socketFD, unix.SOL_SOCKET, unix.SO_ATTACH_REUSEPORT_EBPF, progFD); err != nil { t.Fatalf("SetsockoptInt() error:\n%+v", err) }
Nous sommes maintenant prêts à traiter les paquets. Chaque routine Go incrémente un compteur pour chaque paquet reçu8 :
var wg sync.WaitGroup receivedPackets := make([]int, workers) for worker := range workers { conn := conns[worker] packets := &receivedPackets[worker] wg.Go(func() { payload := make([]byte, 9000) for { if _, err := conn.Read(payload); err != nil { if errors.Is(err, net.ErrClosed) { return } t.Logf("Read() error:\n%+v", err) } *packets++ } }) }
Envoyons 1000 paquets :
sentPackets := 1000 conn, err := net.Dial("udp", conns[0].LocalAddr().String()) if err != nil { t.Fatalf("Dial() error:\n%+v", err) } defer conn.Close() for range sentPackets { if _, err := conn.Write([]byte("hello world!")); err != nil { t.Fatalf("Write() error:\n%+v", err) } }
Si nous affichons le contenu du tableau receivedPackets, nous pouvons vérifier
que la répartition fonctionne comme prévu, chaque routine recevant environ 100
paquets :
=== RUN TestUDPWorkerBalancing
balancing_test.go:84: receivedPackets[0] = 107
balancing_test.go:84: receivedPackets[1] = 92
balancing_test.go:84: receivedPackets[2] = 99
balancing_test.go:84: receivedPackets[3] = 105
balancing_test.go:84: receivedPackets[4] = 107
balancing_test.go:84: receivedPackets[5] = 96
balancing_test.go:84: receivedPackets[6] = 102
balancing_test.go:84: receivedPackets[7] = 105
balancing_test.go:84: receivedPackets[8] = 99
balancing_test.go:84: receivedPackets[9] = 88
balancing_test.go:91: receivedPackets = 1000
balancing_test.go:92: sentPackets = 1000
Déploiement sans impact#
Nous pouvons également utiliser SO_ATTACH_REUSEPORT_EBPF pour redémarrer une
application sans impact. Une nouvelle instance de l’application prépare sa
propre version du tableau de chaussettes et attache le programme eBPF à la
première d’entre elles. Le noyau dirige alors les paquets entrants vers cette
nouvelle instance. L’ancienne instance doit traiter les paquets déjà reçus avant
de s’arrêter.
Pour vérifier que nous ne perdons aucun paquet, nous créons une routine Go pour envoyer autant de paquets que possible :
sentPackets := 0 notSentPackets := 0 done := make(chan bool) conn, err := net.Dial("udp", conns1[0].LocalAddr().String()) if err != nil { t.Fatalf("Dial() error:\n%+v", err) } defer conn.Close() go func() { for { if _, err := conn.Write([]byte("hello world!")); err != nil { notSentPackets++ } else { sentPackets++ } select { case <-done: return default: } } }()
Ensuite, pendant que cette routine Go s’exécute, nous démarrons le deuxième ensemble de routines. Une fois configurées, elles commencent à recevoir des paquets. Si nous arrêtons proprement l’ensemble initial de routines, nous ne perdons aucun paquet9 !
=== RUN TestGracefulRestart
graceful_test.go:135: receivedPackets1[0] = 165
graceful_test.go:135: receivedPackets1[1] = 195
graceful_test.go:135: receivedPackets1[2] = 194
graceful_test.go:135: receivedPackets1[3] = 190
graceful_test.go:135: receivedPackets1[4] = 213
graceful_test.go:135: receivedPackets1[5] = 187
graceful_test.go:135: receivedPackets1[6] = 170
graceful_test.go:135: receivedPackets1[7] = 190
graceful_test.go:135: receivedPackets1[8] = 194
graceful_test.go:135: receivedPackets1[9] = 155
graceful_test.go:139: receivedPackets2[0] = 1631
graceful_test.go:139: receivedPackets2[1] = 1582
graceful_test.go:139: receivedPackets2[2] = 1594
graceful_test.go:139: receivedPackets2[3] = 1611
graceful_test.go:139: receivedPackets2[4] = 1571
graceful_test.go:139: receivedPackets2[5] = 1660
graceful_test.go:139: receivedPackets2[6] = 1587
graceful_test.go:139: receivedPackets2[7] = 1605
graceful_test.go:139: receivedPackets2[8] = 1631
graceful_test.go:139: receivedPackets2[9] = 1689
graceful_test.go:147: receivedPackets = 18014
graceful_test.go:148: sentPackets = 18014
Malheureusement, arrêter correctement une chaussette UDP n’est pas trivial en
Go10. Auparavant, nous terminions les routines en fermant leurs chaussettes.
Cependant, si nous les fermons trop tôt, l’application perd les paquets qui leur
ont été attribués mais qui n’ont pas encore été traités. Avant de s’arrêter, une
routine doit appeler conn.Read() jusqu’à ce qu’il n’y ait plus de paquets. Une
solution consiste à définir une échéance pour conn.Read() et vérifier si nous
devons arrêter la routine Go lorsque l’échéance est écoulée :
payload := make([]byte, 9000) for { conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) if _, err := conn.Read(payload); err != nil { if errors.Is(err, os.ErrDeadlineExceeded) { select { case <-done: return default: continue } } t.Logf("Read() error:\n%+v", err) } *packets++ }
Avec TCP, cet aspect est plus simple : après avoir activé le paramètre noyau
net.ipv4.tcp_migrate_req, le noyau migre automatiquement les connexions en
attentes vers un chaussette aléatoire du même groupe. Alternativement, eBPF peut
également contrôler cette migration. Les deux
fonctionnalités sont disponibles depuis Linux 5.14.
Addendum#
Après avoir implémenté cette stratégie dans Akvorado, toutes les routines perdent désormais des paquets ! 😱
$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \ > | sed -n s/akvorado_inlet_flow_input_udp_in_dropped//p packets_total{listener="0.0.0.0:2055",worker="0"} 838673 packets_total{listener="0.0.0.0:2055",worker="1"} 843675 packets_total{listener="0.0.0.0:2055",worker="2"} 837922 packets_total{listener="0.0.0.0:2055",worker="3"} 841443 packets_total{listener="0.0.0.0:2055",worker="4"} 840668 packets_total{listener="0.0.0.0:2055",worker="5"} 850274 packets_total{listener="0.0.0.0:2055",worker="6"} 835488 packets_total{listener="0.0.0.0:2055",worker="7"} 834479
La cause principale est la limite par défaut de 32 messages par lot pour Kafka. Cette limite est trop basse car les brokers ont une charge importante lors du traitement de chaque lot : ils doivent s’assurer de leur bonne persistance avant d’accuser réception. Augmenter la limite à 4096 messages corrige ce problème.
Bien que la répartition des flux entrants avec eBPF reste utile, elle n’a pas résolu le problème principal. Au moins, la distribution uniforme des paquets perdus a aidé à identifier le véritable goulot d’étranglement. 😅
-
La version actuelle de la page de manuel est incomplète et ne couvre pas l’évolution introduite dans Linux 4.19. Il existe un patch en attente à ce sujet. ↩︎
-
Rust est une autre option. Cependant, le programme que nous utilisons est si trivial qu’il est superflu d’utiliser Rust. ↩︎
-
Comme
bpf_get_prandom_u32()renvoie une valeur pseudo-aléatoire de 32 bits, cette méthode présente un très léger biais vers les premiers index. Cela ne vaut probablement pas la peine d’être corrigé. ↩︎ -
Certains exemples incluent
<linux/bpf.h>au lieu de"vmlinux.h". Cela rend votre programme eBPF dépendant des entêtes du noyau installées. ↩︎ -
listenAddrest initialement défini à127.0.0.1:0pour allouer un port aléatoire. Après la première itération, il est mis à jour avec le port alloué. ↩︎ -
Il s’agit de la fonction
setupSockets()dansfixtures_test.go. ↩︎ -
Il s’agit de la fonction
setupEBPF()dansfixtures_test.go. ↩︎ -
Le code complet se trouve dans
balancing_test.go↩︎ -
Le code complet se trouve dans
graceful_test.go↩︎ -
Avec C, nous pouvons appeler
poll()sur la chaussette et sur un tube pour signaler l’arrêt de la routine. Une fois la seconde condition activée, une série d’appels non bloquants àread()permet de vider la chaussette des paquets restants, jusqu’à recevoirEWOULDBLOCK. ↩︎