Source-Changes-HG archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[src/trunk]: src/sys/net pktqueue(9): Use percpu_create to allow early initia...



details:   https://anonhg.NetBSD.org/src/rev/4260ea57fbbb
branches:  trunk
changeset: 943823:4260ea57fbbb
user:      riastradh <riastradh%NetBSD.org@localhost>
date:      Fri Sep 11 14:29:00 2020 +0000

description:
pktqueue(9): Use percpu_create to allow early initialization.

Otherwise pktqueues can't be created before all CPUs are detected --
they will have a queue only for the primary CPU, not for others.

This will also be necessary if we want to add CPU hotplug (still need
some way to block hotplug during pktq_set_maxlen but it's a start).

diffstat:

 sys/net/pktqueue.c |  146 ++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 106 insertions(+), 40 deletions(-)

diffs (284 lines):

diff -r b34779b91faf -r 4260ea57fbbb sys/net/pktqueue.c
--- a/sys/net/pktqueue.c        Fri Sep 11 14:06:00 2020 +0000
+++ b/sys/net/pktqueue.c        Fri Sep 11 14:29:00 2020 +0000
@@ -1,4 +1,4 @@
-/*     $NetBSD: pktqueue.c,v 1.11 2020/02/07 12:35:33 thorpej Exp $    */
+/*     $NetBSD: pktqueue.c,v 1.12 2020/09/11 14:29:00 riastradh Exp $  */
 
 /*-
  * Copyright (c) 2014 The NetBSD Foundation, Inc.
@@ -36,7 +36,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.11 2020/02/07 12:35:33 thorpej Exp $");
+__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.12 2020/09/11 14:29:00 riastradh Exp $");
 
 #include <sys/param.h>
 #include <sys/types.h>
@@ -74,7 +74,7 @@
        void *          pq_sih;
 
        /* Finally, per-CPU queues. */
-       pcq_t *         pq_queue[];
+       struct percpu * pq_pcq; /* struct pcq * */
 };
 
 /* The counters of the packet queue. */
@@ -90,17 +90,46 @@
 /* Special marker value used by pktq_barrier() mechanism. */
 #define        PKTQ_MARKER     ((void *)(~0ULL))
 
-/*
- * The total size of pktqueue_t which depends on the number of CPUs.
- */
-#define        PKTQUEUE_STRUCT_LEN(ncpu)       \
-    roundup2(offsetof(pktqueue_t, pq_queue[ncpu]), coherency_unit)
+static void
+pktq_init_cpu(void *vqp, void *vpq, struct cpu_info *ci)
+{
+       struct pcq **qp = vqp;
+       struct pktqueue *pq = vpq;
+
+       *qp = pcq_create(pq->pq_maxlen, KM_SLEEP);
+}
+
+static void
+pktq_fini_cpu(void *vqp, void *vpq, struct cpu_info *ci)
+{
+       struct pcq **qp = vqp, *q = *qp;
+
+       KASSERT(pcq_peek(q) == NULL);
+       pcq_destroy(q);
+       *qp = NULL;             /* paranoia */
+}
+
+static struct pcq *
+pktq_pcq(struct pktqueue *pq, struct cpu_info *ci)
+{
+       struct pcq **qp, *q;
+
+       /*
+        * As long as preemption is disabled, the xcall to swap percpu
+        * buffers can't complete, so it is safe to read the pointer.
+        */
+       KASSERT(kpreempt_disabled());
+
+       qp = percpu_getptr_remote(pq->pq_pcq, ci);
+       q = *qp;
+
+       return q;
+}
 
 pktqueue_t *
 pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
 {
        const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
-       const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
        pktqueue_t *pq;
        percpu_t *pc;
        void *sih;
@@ -111,14 +140,13 @@
                return NULL;
        }
 
-       pq = kmem_zalloc(len, KM_SLEEP);
-       for (u_int i = 0; i < ncpu; i++) {
-               pq->pq_queue[i] = pcq_create(maxlen, KM_SLEEP);
-       }
+       pq = kmem_zalloc(sizeof(*pq), KM_SLEEP);
        mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
        pq->pq_maxlen = maxlen;
        pq->pq_counters = pc;
        pq->pq_sih = sih;
+       pq->pq_pcq = percpu_create(sizeof(struct pcq *),
+           pktq_init_cpu, pktq_fini_cpu, pq);
 
        return pq;
 }
@@ -126,17 +154,12 @@
 void
 pktq_destroy(pktqueue_t *pq)
 {
-       const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
 
-       for (u_int i = 0; i < ncpu; i++) {
-               pcq_t *q = pq->pq_queue[i];
-               KASSERT(pcq_peek(q) == NULL);
-               pcq_destroy(q);
-       }
+       percpu_free(pq->pq_pcq, sizeof(struct pcq *));
        percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
        softint_disestablish(pq->pq_sih);
        mutex_destroy(&pq->pq_lock);
-       kmem_free(pq, len);
+       kmem_free(pq, sizeof(*pq));
 }
 
 /*
@@ -213,18 +236,18 @@
 pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
 {
 #if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
-       const unsigned cpuid = curcpu()->ci_index;
+       struct cpu_info *ci = curcpu();
 #else
-       const unsigned cpuid = hash % ncpu;
+       struct cpu_info *ci = cpu_lookup(hash % ncpu);
 #endif
 
        KASSERT(kpreempt_disabled());
 
-       if (__predict_false(!pcq_put(pq->pq_queue[cpuid], m))) {
+       if (__predict_false(!pcq_put(pktq_pcq(pq, ci), m))) {
                pktq_inc_count(pq, PQCNT_DROP);
                return false;
        }
-       softint_schedule_cpu(pq->pq_sih, cpu_lookup(cpuid));
+       softint_schedule_cpu(pq->pq_sih, ci);
        pktq_inc_count(pq, PQCNT_ENQUEUE);
        return true;
 }
@@ -238,11 +261,12 @@
 struct mbuf *
 pktq_dequeue(pktqueue_t *pq)
 {
-       const struct cpu_info *ci = curcpu();
-       const unsigned cpuid = cpu_index(ci);
+       struct cpu_info *ci = curcpu();
        struct mbuf *m;
 
-       m = pcq_get(pq->pq_queue[cpuid]);
+       KASSERT(kpreempt_disabled());
+
+       m = pcq_get(pktq_pcq(pq, ci));
        if (__predict_false(m == PKTQ_MARKER)) {
                /* Note the marker entry. */
                atomic_inc_uint(&pq->pq_barrier);
@@ -262,13 +286,19 @@
 void
 pktq_barrier(pktqueue_t *pq)
 {
+       CPU_INFO_ITERATOR cii;
+       struct cpu_info *ci;
        u_int pending = 0;
 
        mutex_enter(&pq->pq_lock);
        KASSERT(pq->pq_barrier == 0);
 
-       for (u_int i = 0; i < ncpu; i++) {
-               pcq_t *q = pq->pq_queue[i];
+       for (CPU_INFO_FOREACH(cii, ci)) {
+               struct pcq *q;
+
+               kpreempt_disable();
+               q = pktq_pcq(pq, ci);
+               kpreempt_enable();
 
                /* If the queue is empty - nothing to do. */
                if (pcq_peek(q) == NULL) {
@@ -279,7 +309,7 @@
                        kpause("pktqsync", false, 1, NULL);
                }
                kpreempt_disable();
-               softint_schedule_cpu(pq->pq_sih, cpu_lookup(i));
+               softint_schedule_cpu(pq->pq_sih, ci);
                kpreempt_enable();
                pending++;
        }
@@ -300,19 +330,50 @@
 void
 pktq_flush(pktqueue_t *pq)
 {
+       CPU_INFO_ITERATOR cii;
+       struct cpu_info *ci;
        struct mbuf *m;
 
-       for (u_int i = 0; i < ncpu; i++) {
-               while ((m = pcq_get(pq->pq_queue[i])) != NULL) {
+       for (CPU_INFO_FOREACH(cii, ci)) {
+               struct pcq *q;
+
+               kpreempt_disable();
+               q = pktq_pcq(pq, ci);
+               kpreempt_enable();
+
+               /*
+                * XXX This can't be right -- if the softint is running
+                * then pcq_get isn't safe here.
+                */
+               while ((m = pcq_get(q)) != NULL) {
                        pktq_inc_count(pq, PQCNT_DEQUEUE);
                        m_freem(m);
                }
        }
 }
 
+static void
+pktq_set_maxlen_cpu(void *vpq, void *vqs)
+{
+       struct pktqueue *pq = vpq;
+       struct pcq **qp, *q, **qs = vqs;
+       unsigned i = cpu_index(curcpu());
+       int s;
+
+       s = splnet();
+       qp = percpu_getref(pq->pq_pcq);
+       q = *qp;
+       *qp = qs[i];
+       qs[i] = q;
+       percpu_putref(pq->pq_pcq);
+       splx(s);
+}
+
 /*
  * pktq_set_maxlen: create per-CPU queues using a new size and replace
  * the existing queues without losing any packets.
+ *
+ * XXX ncpu must remain stable throughout.
  */
 int
 pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
@@ -325,18 +386,18 @@
        if (pq->pq_maxlen == maxlen)
                return 0;
 
-       /* First, allocate the new queues and replace them. */
+       /* First, allocate the new queues. */
        qs = kmem_zalloc(slotbytes, KM_SLEEP);
        for (u_int i = 0; i < ncpu; i++) {
                qs[i] = pcq_create(maxlen, KM_SLEEP);
        }
+
+       /*
+        * Issue an xcall to replace the queue pointers on each CPU.
+        * This implies all the necessary memory barriers.
+        */
        mutex_enter(&pq->pq_lock);
-       for (u_int i = 0; i < ncpu; i++) {
-               /* Swap: store of a word is atomic. */
-               pcq_t *q = pq->pq_queue[i];
-               pq->pq_queue[i] = qs[i];
-               qs[i] = q;
-       }
+       xc_wait(xc_broadcast(XC_HIGHPRI, pktq_set_maxlen_cpu, pq, qs));
        pq->pq_maxlen = maxlen;
        mutex_exit(&pq->pq_lock);
 
@@ -355,10 +416,15 @@
        pktq_barrier(pq);
 
        for (u_int i = 0; i < ncpu; i++) {
+               struct pcq *q;
                struct mbuf *m;
 
+               kpreempt_disable();
+               q = pktq_pcq(pq, cpu_lookup(i));
+               kpreempt_enable();
+
                while ((m = pcq_get(qs[i])) != NULL) {
-                       while (!pcq_put(pq->pq_queue[i], m)) {
+                       while (!pcq_put(q, m)) {
                                kpause("pktqrenq", false, 1, NULL);
                        }
                }



Home | Main Index | Thread Index | Old Index