Source-Changes-HG archive
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]
[src/trunk]: src/lib/librumpuser Sneeze some locking into connect/disconnect.
details: https://anonhg.NetBSD.org/src/rev/2f18a2846001
branches: trunk
changeset: 759006:2f18a2846001
user: pooka <pooka%NetBSD.org@localhost>
date: Wed Nov 24 11:40:24 2010 +0000
description:
Sneeze some locking into connect/disconnect.
diffstat:
lib/librumpuser/rumpuser_sp.c | 147 +++++++++++++++++++++++++++--------------
1 files changed, 97 insertions(+), 50 deletions(-)
diffs (271 lines):
diff -r 95f8367ba11a -r 2f18a2846001 lib/librumpuser/rumpuser_sp.c
--- a/lib/librumpuser/rumpuser_sp.c Tue Nov 23 22:14:27 2010 +0000
+++ b/lib/librumpuser/rumpuser_sp.c Wed Nov 24 11:40:24 2010 +0000
@@ -1,4 +1,4 @@
-/* $NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $ */
+/* $NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $ */
/*
* Copyright (c) 2010 Antti Kantee. All Rights Reserved.
@@ -38,9 +38,10 @@
*/
#include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $");
#include <sys/types.h>
+#include <sys/atomic.h>
#include <sys/mman.h>
#include <sys/socket.h>
@@ -67,7 +68,7 @@
static struct pollfd pfdlist[MAXCLI];
static struct spclient spclist[MAXCLI];
-static unsigned int nfds, maxidx;
+static unsigned int disco;
static struct rumpuser_sp_ops spops;
@@ -287,66 +288,81 @@
}
static void
-serv_handledisco(unsigned int idx)
+spcref(struct spclient *spc)
+{
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ spc->spc_refcnt++;
+ pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+spcrelease(struct spclient *spc)
{
- struct spclient *spc = &spclist[idx];
- int fd = spc->spc_fd;
+ int ref;
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ ref = --spc->spc_refcnt;
+ pthread_mutex_unlock(&spc->spc_mtx);
- DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
+ if (ref > 0)
+ return;
+
+ _DIAGASSERT(TAILQ_EMPTY(&spclist[i].spc_respwait));
+ _DIAGASSERT(spc->spc_buf == NULL);
lwproc_switch(spc->spc_mainlwp);
lwproc_release();
+ spc->spc_mainlwp = NULL;
- pthread_mutex_destroy(&spc->spc_mtx);
- pthread_cond_destroy(&spc->spc_cv);
- free(spc->spc_buf);
- memset(spc, 0, sizeof(*spc));
- close(fd);
- pfdlist[idx].fd = -1;
- nfds--;
+ close(spc->spc_fd);
+ spc->spc_fd = -1;
- if (idx == maxidx) {
- while (idx--) {
- if (pfdlist[idx].fd != -1) {
- maxidx = idx;
- break;
- }
- assert(idx != 0);
- }
- DPRINTF(("rump_sp: set maxidx to [%u]\n", maxidx));
- }
+ spc->spc_pfd->fd = -1;
+ membar_producer();
+ atomic_inc_uint(&disco);
+
}
-static int
-serv_handleconn(int fd, connecthook_fn connhook)
+static void
+serv_handledisco(unsigned int idx)
+{
+ struct spclient *spc = &spclist[idx];
+
+ DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
+
+ spcrelease(spc);
+}
+
+static unsigned
+serv_handleconn(int fd, connecthook_fn connhook, int busy)
{
struct sockaddr_storage ss;
socklen_t sl = sizeof(ss);
- int newfd, flags, error;
+ int newfd, flags;
unsigned i;
/*LINTED: cast ok */
newfd = accept(fd, (struct sockaddr *)&ss, &sl);
if (newfd == -1)
- return errno;
+ return 0;
+
+ if (busy) {
+ close(newfd); /* EBUSY */
+ return 0;
+ }
/* XXX: should do some sort of handshake too */
- if (nfds == MAXCLI) {
- close(newfd); /* EBUSY */
- return EBUSY;
- }
-
flags = fcntl(newfd, F_GETFL, 0);
if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
close(newfd);
- return errno;
+ return 0;
}
- flags = 1;
- if ((error = connhook(newfd)) != 0) {
+ if (connhook(newfd) != 0) {
close(newfd);
- return error;
+ return 0;
}
/* find empty slot the simple way */
@@ -355,33 +371,28 @@
break;
}
- if ((error = lwproc_newproc(&spclist[i])) != 0) {
+ if (lwproc_newproc(&spclist[i]) != 0) {
close(newfd);
- return error;
+ return 0;
}
assert(i < MAXCLI);
- nfds++;
pfdlist[i].fd = newfd;
spclist[i].spc_fd = newfd;
spclist[i].spc_mainlwp = lwproc_curlwp();
spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
spclist[i].spc_pid = lwproc_getpid();
+ spclist[i].spc_refcnt = 1;
TAILQ_INIT(&spclist[i].spc_respwait);
- pthread_mutex_init(&spclist[i].spc_mtx, NULL);
- pthread_cond_init(&spclist[i].spc_cv, NULL);
-
- if (maxidx < i)
- maxidx = i;
DPRINTF(("rump_sp: added new connection at idx %u, pid %d\n",
i, lwproc_getpid()));
lwproc_switch(NULL);
- return 0;
+ return i;
}
static void
@@ -503,6 +514,7 @@
pthread_attr_init(&pattr);
pthread_attr_setdetachstate(&pattr, 1);
+ spcref(spc);
if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) {
/* panic */
abort();
@@ -513,13 +525,21 @@
spserver(void *arg)
{
struct spservarg *sarg = arg;
+ struct spclient *spc;
unsigned idx;
int seen;
int rv;
+ unsigned int nfds, maxidx;
- for (idx = 1; idx < MAXCLI; idx++) {
+ for (idx = 0; idx < MAXCLI; idx++) {
pfdlist[idx].fd = -1;
pfdlist[idx].events = POLLIN;
+
+ spc = &spclist[idx];
+
+ spc->spc_pfd = &pfdlist[idx];
+ pthread_mutex_init(&spc->spc_mtx, NULL);
+ pthread_cond_init(&spc->spc_cv, NULL);
}
pfdlist[0].fd = sarg->sps_sock;
pfdlist[0].events = POLLIN;
@@ -529,6 +549,27 @@
DPRINTF(("rump_sp: server mainloop\n"));
for (;;) {
+ /* g/c hangarounds (eventually) */
+ if (disco) {
+ int discoed;
+
+ membar_consumer();
+ discoed = atomic_swap_uint(&disco, 0);
+ while (discoed--) {
+ nfds--;
+ idx = maxidx;
+ while (idx--) {
+ if (pfdlist[idx].fd != -1) {
+ maxidx = idx;
+ break;
+ }
+ assert(idx != 0);
+ }
+ DPRINTF(("rump_sp: set maxidx to [%u]\n",
+ maxidx));
+ }
+ }
+
DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
seen = 0;
rv = poll(pfdlist, maxidx+1, INFTIM);
@@ -552,7 +593,7 @@
DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
idx, seen, rv));
if (idx > 0) {
- struct spclient *spc = &spclist[idx];
+ spc = &spclist[idx];
DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
switch (readframe(spc)) {
@@ -576,10 +617,16 @@
}
break;
}
+
} else {
DPRINTF(("rump_sp: mainloop new connection\n"));
- serv_handleconn(pfdlist[0].fd,
- sarg->sps_connhook);
+
+ idx = serv_handleconn(pfdlist[0].fd,
+ sarg->sps_connhook, nfds == MAXCLI);
+ if (idx)
+ nfds++;
+ if (idx > maxidx)
+ maxidx = idx;
}
}
}
Home |
Main Index |
Thread Index |
Old Index