Source-Changes-HG archive
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]
[src/trunk]: src/lib A bunch of improvements:
details: https://anonhg.NetBSD.org/src/rev/470f77f25c32
branches: trunk
changeset: 760674:470f77f25c32
user: pooka <pooka%NetBSD.org@localhost>
date: Mon Jan 10 19:49:43 2011 +0000
description:
A bunch of improvements:
* don't hold spc mutex while sending data
* use send() for the banner to avoid SIGPIPE in case a client
connects and immediately goes away
* fix error path locking
* use kevent() instead of pollts() in the client. Apparently that
is the only sensible way for a library to support both multithreading
and signal-reentrancy in a race-free manner.
(can I catch all signals with one kevent instead of installing
NSIG different ones??)
* mark client comm descriptor non-blocking so that clients have
better signal-interruptibility (we now sleep in signal-accepting
kevent() instead of signal-masked recvfrom())
diffstat:
lib/librumpclient/rumpclient.c | 120 ++++++++++++++++++++++++++--------------
lib/librumpuser/rumpuser_sp.c | 12 ++-
lib/librumpuser/sp_common.c | 7 +-
3 files changed, 91 insertions(+), 48 deletions(-)
diffs (truncated from 315 to 300 lines):
diff -r 343087c002a2 -r 470f77f25c32 lib/librumpclient/rumpclient.c
--- a/lib/librumpclient/rumpclient.c Mon Jan 10 19:30:21 2011 +0000
+++ b/lib/librumpclient/rumpclient.c Mon Jan 10 19:49:43 2011 +0000
@@ -1,4 +1,4 @@
-/* $NetBSD: rumpclient.c,v 1.14 2011/01/09 14:10:03 pooka Exp $ */
+/* $NetBSD: rumpclient.c,v 1.15 2011/01/10 19:49:43 pooka Exp $ */
/*
* Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
@@ -33,6 +33,7 @@
__RCSID("$NetBSD");
#include <sys/param.h>
+#include <sys/event.h>
#include <sys/mman.h>
#include <sys/socket.h>
@@ -60,6 +61,7 @@
int (*host_socket)(int, int, int);
int (*host_close)(int);
int (*host_connect)(int, const struct sockaddr *, socklen_t);
+int (*host_fcntl)(int, int, ...);
int (*host_poll)(struct pollfd *, nfds_t, int);
int (*host_pollts)(struct pollfd *, nfds_t, const struct timespec *,
const sigset_t *);
@@ -74,28 +76,14 @@
.spc_fd = -1,
};
-/*
- * This version of waitresp is optimized for single-threaded clients
- * and is required by signal-safe clientside rump syscalls.
- */
+static int kq;
+static sigset_t fullset;
-static void
-releasercvlock(struct spclient *spc)
+static int
+waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
{
pthread_mutex_lock(&spc->spc_mtx);
- if (spc->spc_istatus == SPCSTATUS_WANTED)
- kickall(spc);
- spc->spc_istatus = SPCSTATUS_FREE;
-}
-
-static sigset_t fullset;
-static int
-waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
-{
- struct pollfd pfd;
- int rv = 0;
-
sendunlockl(spc);
rw->rw_error = 0;
@@ -103,32 +91,41 @@
&& spc->spc_state != SPCSTATE_DYING){
/* are we free to receive? */
if (spc->spc_istatus == SPCSTATUS_FREE) {
+ struct kevent kev[8];
+ int gotresp, dosig, rv, i;
+
spc->spc_istatus = SPCSTATUS_BUSY;
pthread_mutex_unlock(&spc->spc_mtx);
- pfd.fd = spc->spc_fd;
- pfd.events = POLLIN;
+ dosig = 0;
+ for (gotresp = 0; !gotresp; ) {
+ switch (readframe(spc)) {
+ case 0:
+ rv = kevent(kq, NULL, 0,
+ kev, __arraycount(kev), NULL);
+ assert(rv > 0);
+ for (i = 0; i < rv; i++) {
+ if (kev[i].filter
+ == EVFILT_SIGNAL)
+ dosig++;
+ }
+ if (dosig)
+ goto cleanup;
- switch (readframe(spc)) {
- case 0:
- releasercvlock(spc);
- pthread_mutex_unlock(&spc->spc_mtx);
- host_pollts(&pfd, 1, NULL, mask);
- pthread_mutex_lock(&spc->spc_mtx);
- continue;
- case -1:
- releasercvlock(spc);
- rv = errno;
- spc->spc_state = SPCSTATE_DYING;
- continue;
- default:
- break;
- }
+ continue;
+ case -1:
+ spc->spc_state = SPCSTATE_DYING;
+ goto cleanup;
+ default:
+ break;
+ }
- switch (spc->spc_hdr.rsp_class) {
+ switch (spc->spc_hdr.rsp_class) {
case RUMPSP_RESP:
case RUMPSP_ERROR:
kickwaiter(spc);
+ gotresp = spc->spc_hdr.rsp_reqno ==
+ rw->rw_reqno;
break;
case RUMPSP_REQ:
handlereq(spc);
@@ -136,9 +133,22 @@
default:
/* panic */
break;
+ }
}
- releasercvlock(spc);
+ cleanup:
+ pthread_mutex_lock(&spc->spc_mtx);
+ if (spc->spc_istatus == SPCSTATUS_WANTED)
+ kickall(spc);
+ spc->spc_istatus = SPCSTATUS_FREE;
+
+ /* take one for the team */
+ if (dosig) {
+ pthread_mutex_unlock(&spc->spc_mtx);
+ pthread_sigmask(SIG_SETMASK, mask, NULL);
+ pthread_sigmask(SIG_SETMASK, &fullset, NULL);
+ pthread_mutex_lock(&spc->spc_mtx);
+ }
} else {
spc->spc_istatus = SPCSTATUS_WANTED;
pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
@@ -148,8 +158,6 @@
pthread_mutex_unlock(&spc->spc_mtx);
pthread_cond_destroy(&rw->rw_cv);
- if (rv)
- return rv;
if (spc->spc_state == SPCSTATE_DYING)
return ENOTCONN;
return rw->rw_error;
@@ -385,8 +393,9 @@
static int
doconnect(void)
{
+ struct kevent kev[NSIG+1];
char banner[MAXBANNER];
- int s, error;
+ int s, error, flags, i;
ssize_t n;
s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0);
@@ -421,8 +430,34 @@
}
banner[n] = '\0';
+ flags = host_fcntl(s, F_GETFL, 0);
+ if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
+ fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n");
+ errno = EINVAL;
+ return -1;
+ }
+
/* parse the banner some day */
+ /* setup kqueue, we want all signals and the fd */
+ if ((kq = kqueue()) == -1) {
+ error = errno;
+ fprintf(stderr, "rump_sp: cannot setup kqueue");
+ errno = error;
+ return -1;
+ }
+
+ for (i = 0; i < NSIG; i++) {
+ EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0);
+ }
+ EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0);
+ if (kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) {
+ error = errno;
+ fprintf(stderr, "rump_sp: kevent() failed");
+ errno = error;
+ return -1;
+ }
+
clispc.spc_fd = s;
TAILQ_INIT(&clispc.spc_respwait);
pthread_mutex_init(&clispc.spc_mtx, NULL);
@@ -455,6 +490,7 @@
FINDSYM2(socket,__socket30);
FINDSYM(close);
FINDSYM(connect);
+ FINDSYM(fcntl);
FINDSYM(poll);
FINDSYM(pollts);
FINDSYM(read);
@@ -522,6 +558,8 @@
int error;
host_close(clispc.spc_fd);
+ host_close(kq);
+ kq = -1;
memset(&clispc, 0, sizeof(clispc));
clispc.spc_fd = -1;
diff -r 343087c002a2 -r 470f77f25c32 lib/librumpuser/rumpuser_sp.c
--- a/lib/librumpuser/rumpuser_sp.c Mon Jan 10 19:30:21 2011 +0000
+++ b/lib/librumpuser/rumpuser_sp.c Mon Jan 10 19:49:43 2011 +0000
@@ -1,4 +1,4 @@
-/* $NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $ */
+/* $NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $ */
/*
* Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
@@ -35,7 +35,7 @@
*/
#include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $");
#include <sys/types.h>
#include <sys/atomic.h>
@@ -104,20 +104,23 @@
static int
waitresp(struct spclient *spc, struct respwait *rw)
{
+ int spcstate;
int rv = 0;
+ pthread_mutex_lock(&spc->spc_mtx);
sendunlockl(spc);
while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) {
pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
}
TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+ spcstate = spc->spc_state;
pthread_mutex_unlock(&spc->spc_mtx);
pthread_cond_destroy(&rw->rw_cv);
if (rv)
return rv;
- if (spc->spc_state == SPCSTATE_DYING)
+ if (spcstate == SPCSTATE_DYING)
return ENOTCONN;
return rw->rw_error;
}
@@ -511,7 +514,8 @@
}
/* write out a banner for the client */
- if (write(newfd, banner, strlen(banner)) != (ssize_t)strlen(banner)) {
+ if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL)
+ != (ssize_t)strlen(banner)) {
close(newfd);
return 0;
}
diff -r 343087c002a2 -r 470f77f25c32 lib/librumpuser/sp_common.c
--- a/lib/librumpuser/sp_common.c Mon Jan 10 19:30:21 2011 +0000
+++ b/lib/librumpuser/sp_common.c Mon Jan 10 19:49:43 2011 +0000
@@ -1,4 +1,4 @@
-/* $NetBSD: sp_common.c,v 1.22 2011/01/10 11:57:53 pooka Exp $ */
+/* $NetBSD: sp_common.c,v 1.23 2011/01/10 19:49:43 pooka Exp $ */
/*
* Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
@@ -212,7 +212,6 @@
sendlockl(struct spclient *spc)
{
- /* assert(pthread_mutex_owned) */
while (spc->spc_ostatus != SPCSTATUS_FREE) {
spc->spc_ostatus = SPCSTATUS_WANTED;
pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
@@ -233,7 +232,6 @@
sendunlockl(struct spclient *spc)
{
- /* assert(pthread_mutex_owned) */
if (spc->spc_ostatus == SPCSTATUS_WANTED)
pthread_cond_broadcast(&spc->spc_cv);
spc->spc_ostatus = SPCSTATUS_FREE;
@@ -298,12 +296,14 @@
TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
sendlockl(spc);
+ pthread_mutex_unlock(&spc->spc_mtx);
}
static void
Home |
Main Index |
Thread Index |
Old Index