Source-Changes-HG archive
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]
[src/trunk]: src/sys/net wg: Use a global pktqueue rather than a per-peer pcq.
details: https://anonhg.NetBSD.org/src/rev/288cd36bc0e8
branches: trunk
changeset: 1013862:288cd36bc0e8
user: riastradh <riastradh%NetBSD.org@localhost>
date: Mon Sep 07 01:14:42 2020 +0000
description:
wg: Use a global pktqueue rather than a per-peer pcq.
- Improves scalability -- won't hit the limit on softints no matter how
many peers there are.
- Improves parallelism -- the softint was kernel-locked to serialize
access to the pcq.
- Requires a per-peer queue on handshake initiation to avoid dropping the
first packet.
. The per-peer queue currently holds a single packet -- this should serve
well enough for pings, DNS queries, TCP connections, etc.
diffstat:
sys/net/if_wg.c | 248 ++++++++++++++++++++++++++++++++-----------------------
1 files changed, 145 insertions(+), 103 deletions(-)
diffs (truncated from 472 to 300 lines):
diff -r 43f0c1391817 -r 288cd36bc0e8 sys/net/if_wg.c
--- a/sys/net/if_wg.c Mon Sep 07 01:08:27 2020 +0000
+++ b/sys/net/if_wg.c Mon Sep 07 01:14:42 2020 +0000
@@ -1,4 +1,4 @@
-/* $NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $ */
+/* $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $ */
/*
* Copyright (C) Ryota Ozaki <ozaki.ryota%gmail.com@localhost>
@@ -41,7 +41,7 @@
*/
#include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $");
+__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
#ifdef _KERNEL_OPT
#include "opt_inet.h"
@@ -65,7 +65,6 @@
#include <sys/mbuf.h>
#include <sys/module.h>
#include <sys/mutex.h>
-#include <sys/pcq.h>
#include <sys/percpu.h>
#include <sys/pserialize.h>
#include <sys/psref.h>
@@ -85,6 +84,7 @@
#include <net/if.h>
#include <net/if_types.h>
#include <net/if_wg.h>
+#include <net/pktqueue.h>
#include <net/route.h>
#include <netinet/in.h>
@@ -124,8 +124,6 @@
* runs event handlers
* - It has its own two routing tables: one is for IPv4 and the other IPv6
* - struct wg_peer is a representative of a peer
- * - It has a softint that is used to send packets over an wg interface
- * to a peer
* - It has a pair of session instances (struct wg_session)
* - It has a pair of endpoint instances (struct wg_sockaddr)
* - Normally one endpoint is used and the second one is used only on
@@ -562,12 +560,12 @@
/* The preshared key (optional) */
uint8_t wgp_psk[WG_PRESHARED_KEY_LEN];
- void *wgp_si;
- pcq_t *wgp_q;
-
struct wg_session *wgp_session_stable;
struct wg_session *wgp_session_unstable;
+ /* first outgoing packet awaiting session initiation */
+ struct mbuf *wgp_pending;
+
/* timestamp in big-endian */
wg_timestamp_t wgp_timestamp_latest_init;
@@ -706,6 +704,7 @@
static int wg_init(struct ifnet *);
static void wg_stop(struct ifnet *, int);
+static void wgintr(void *);
static void wg_purge_pending_packets(struct wg_peer *);
static int wg_clone_create(struct if_clone *, int);
@@ -788,6 +787,7 @@
static struct if_clone wg_cloner =
IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
+static struct pktqueue *wg_pktq __read_mostly;
void wgattach(int);
/* ARGSUSED */
@@ -808,6 +808,10 @@
mutex_init(&wg_softcs.lock, MUTEX_DEFAULT, IPL_NONE);
LIST_INIT(&wg_softcs.list);
+
+ wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
+ KASSERT(wg_pktq != NULL);
+
if_clone_attach(&wg_cloner);
}
@@ -1650,6 +1654,9 @@
MIN(wg_rekey_timeout, INT_MAX/hz) * hz);
} else {
wg_put_session_index(wg, wgs);
+ /* Initiation failed; toss packet waiting for it if any. */
+ if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL)
+ m_freem(m);
}
return error;
@@ -1780,6 +1787,7 @@
int error;
uint8_t mac1[WG_MAC_LEN];
struct wg_session *wgs_prev;
+ struct mbuf *m;
wg_algo_mac_mac1(mac1, sizeof(mac1),
wg->wg_pubkey, sizeof(wg->wg_pubkey),
@@ -1932,17 +1940,21 @@
wg_update_endpoint_if_necessary(wgp, src);
/*
- * Send something immediately (same as the official implementation)
- * XXX if there are pending data packets, we don't need to send
- * a keepalive message.
+ * If we had a data packet queued up, send it; otherwise send a
+ * keepalive message -- either way we have to send something
+ * immediately or else the responder will never answer.
*/
- wg_send_keepalive_msg(wgp, wgs);
-
- /* Anyway run a softint to flush pending packets */
- kpreempt_disable();
- softint_schedule(wgp->wgp_si);
- kpreempt_enable();
- WG_TRACE("softint scheduled");
+ if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) {
+ const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m)
+
+ M_SETCTX(m, wgp);
+ if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+ WGLOG(LOG_ERR, "pktq full, dropping\n");
+ m_freem(m);
+ }
+ } else {
+ wg_send_keepalive_msg(wgp, wgs);
+ }
if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) {
/* Wait for wg_get_stable_session to drain. */
@@ -2874,6 +2886,7 @@
wg_task_establish_session(struct wg_softc *wg, struct wg_peer *wgp)
{
struct wg_session *wgs, *wgs_prev;
+ struct mbuf *m;
KASSERT(mutex_owned(wgp->wgp_lock));
@@ -2896,6 +2909,17 @@
wgp->wgp_last_sent_mac1_valid = false;
wgp->wgp_last_sent_cookie_valid = false;
+ /* If we had a data packet queued up, send it. */
+ if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) {
+ const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m)
+
+ M_SETCTX(m, wgp);
+ if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+ WGLOG(LOG_ERR, "pktq full, dropping\n");
+ m_freem(m);
+ }
+ }
+
if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) {
/* Wait for wg_get_stable_session to drain. */
pserialize_perform(wgp->wgp_psz);
@@ -2911,11 +2935,6 @@
wg_clear_states(wgs_prev);
wgs_prev->wgs_state = WGS_STATE_UNKNOWN;
}
-
- /* Anyway run a softint to flush pending packets */
- kpreempt_disable();
- softint_schedule(wgp->wgp_si);
- kpreempt_enable();
}
static void
@@ -3290,29 +3309,32 @@
}
static void
-wg_peer_softint(void *arg)
+wgintr(void *cookie)
{
- struct wg_peer *wgp = arg;
+ struct wg_peer *wgp;
struct wg_session *wgs;
struct mbuf *m;
struct psref psref;
- if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) {
- /* XXX how to treat? */
- WG_TRACE("skipped");
- return;
+ while ((m = pktq_dequeue(wg_pktq)) != NULL) {
+ wgp = M_GETCTX(m, struct wg_peer *);
+ if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) {
+ WG_TRACE("no stable session");
+ wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+ goto next0;
+ }
+ if (__predict_false(wg_session_hit_limits(wgs))) {
+ WG_TRACE("stable session hit limits");
+ wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+ goto next1;
+ }
+ wg_send_data_msg(wgp, wgs, m);
+ m = NULL; /* consumed */
+next1: wg_put_session(wgs, &psref);
+next0: if (m)
+ m_freem(m);
+ /* XXX Yield to avoid userland starvation? */
}
- if (wg_session_hit_limits(wgs)) {
- wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
- goto out;
- }
- WG_TRACE("running");
-
- while ((m = pcq_get(wgp->wgp_q)) != NULL) {
- wg_send_data_msg(wgp, wgs, m);
- }
-out:
- wg_put_session(wgs, &psref);
}
static void
@@ -3328,9 +3350,9 @@
{
struct mbuf *m;
- while ((m = pcq_get(wgp->wgp_q)) != NULL) {
+ if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL)
m_freem(m);
- }
+ pktq_barrier(wg_pktq);
}
static void
@@ -3351,8 +3373,6 @@
wgp = kmem_zalloc(sizeof(*wgp), KM_SLEEP);
wgp->wgp_sc = wg;
- wgp->wgp_q = pcq_create(1024, KM_SLEEP);
- wgp->wgp_si = softint_establish(SOFTINT_NET, wg_peer_softint, wgp);
callout_init(&wgp->wgp_rekey_timer, CALLOUT_MPSAFE);
callout_setfunc(&wgp->wgp_rekey_timer, wg_rekey_timer, wgp);
callout_init(&wgp->wgp_handshake_timeout_timer, CALLOUT_MPSAFE);
@@ -3430,7 +3450,6 @@
wg_purge_pending_packets(wgp);
/* Halt all packet processing and timeouts. */
- softint_disestablish(wgp->wgp_si);
callout_halt(&wgp->wgp_rekey_timer, NULL);
callout_halt(&wgp->wgp_handshake_timeout_timer, NULL);
callout_halt(&wgp->wgp_session_dtor_timer, NULL);
@@ -3467,7 +3486,6 @@
kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0));
pserialize_destroy(wgp->wgp_psz);
- pcq_destroy(wgp->wgp_q);
mutex_obj_free(wgp->wgp_lock);
kmem_free(wgp, sizeof(*wgp));
@@ -3581,21 +3599,28 @@
return 0;
}
+static void
+wg_if_detach(struct wg_softc *wg)
+{
+ struct ifnet *ifp = &wg->wg_if;
+
+ bpf_detach(ifp);
+ if_detach(ifp);
+}
+
static int
wg_clone_create(struct if_clone *ifc, int unit)
{
struct wg_softc *wg;
int error;
- wg = kmem_zalloc(sizeof(struct wg_softc), KM_SLEEP);
+ wg = kmem_zalloc(sizeof(*wg), KM_SLEEP);
if_initname(&wg->wg_if, ifc->ifc_name, unit);
error = wg_worker_init(wg);
- if (error != 0) {
- kmem_free(wg, sizeof(struct wg_softc));
- return error;
- }
+ if (error)
+ goto fail0;
rn_inithead((void **)&wg->wg_rtable_ipv4,
offsetof(struct sockaddr_in, sin_addr) * NBBY);
@@ -3613,26 +3638,34 @@
wg->wg_ops = &wg_ops_rumpkernel;
error = wg_if_attach(wg);
- if (error != 0) {
Home |
Main Index |
Thread Index |
Old Index