aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authortrasz <trasz@FreeBSD.org>2012-08-10 16:19:22 +0800
committertrasz <trasz@FreeBSD.org>2012-08-10 16:19:22 +0800
commit70e8f4d66f87a69be3f3756725eb9f4762fc47ce (patch)
tree114e0ba1a99e0adf624da8863f6f483cd267ff13 /net
parentde2e677bf2ceaede579bd0a541b5f1ac7dd6fc5f (diff)
downloadfreebsd-ports-gnome-70e8f4d66f87a69be3f3756725eb9f4762fc47ce.tar.gz
freebsd-ports-gnome-70e8f4d66f87a69be3f3756725eb9f4762fc47ce.tar.zst
freebsd-ports-gnome-70e8f4d66f87a69be3f3756725eb9f4762fc47ce.zip
Add optional threading support; disabled by default.
Diffstat (limited to 'net')
-rw-r--r--net/openvswitch/Makefile14
-rw-r--r--net/openvswitch/files/threaded.diff1305
2 files changed, 1317 insertions, 2 deletions
diff --git a/net/openvswitch/Makefile b/net/openvswitch/Makefile
index bff31aa9e1d1..a7b184ddd68c 100644
--- a/net/openvswitch/Makefile
+++ b/net/openvswitch/Makefile
@@ -30,20 +30,30 @@ MAN8= ovs-appctl.8 ovs-brcompatd.8 ovs-bugtool.8 ovs-controller.8 \
ovs-test.8 ovs-vsctl.8 ovs-vswitchd.8 \
ovs-vlan-bug-workaround.8 ovs-vlan-test.8
+OPTIONS_DEFINE= THREADED
+THREADED_DESC= Experimental high-performance threading patch
+
.include <bsd.port.pre.mk>
.if ${OSVERSION} < 800000
BROKEN= does not compile
.endif
+.if ${PORT_OPTIONS:MTHREADED}
+CONFIGURE_ARGS+=--enable-threaded=yes
+.endif
+
AUTOTOOLSFILES= aclocal.m4
post-patch:
@${REINPLACE_CMD} -e 's|1.11.1|%%AUTOMAKE_APIVER%%|g' \
-e 's|2.65|%%AUTOCONF_VERSION%%|g' \
${WRKSRC}/aclocal.m4
- # Workaround for a makefile bug; if it builds without this line, remove it.
- #${TOUCH} ${WRKSRC}/INSTALL
+.if ${PORT_OPTIONS:MTHREADED}
+ @# We can't use EXTRA_PATCHES, since we need to apply this one
+ @# after files/patch-bsd-netdef.diff, not before.
+ ${PATCH} ${PATCH_ARGS} < ${FILESDIR}/threaded.diff
+.endif
post-install:
${INSTALL_DATA} ${WRKSRC}/vswitchd/vswitch.ovsschema ${PREFIX}/share/openvswitch/
diff --git a/net/openvswitch/files/threaded.diff b/net/openvswitch/files/threaded.diff
new file mode 100644
index 000000000000..858f39494157
--- /dev/null
+++ b/net/openvswitch/files/threaded.diff
@@ -0,0 +1,1305 @@
+diff --git configure.ac configure.ac
+index 5692b86..ff62627 100644
+--- configure.ac
++++ configure.ac
+@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt])
+ AC_SEARCH_LIBS([timer_create], [rt])
+ AC_SEARCH_LIBS([pcap_open_live], [pcap])
+
++OVS_CHECK_THREADED
+ OVS_CHECK_COVERAGE
+ OVS_CHECK_NDEBUG
+ OVS_CHECK_NETLINK
+diff --git lib/automake.mk lib/automake.mk
+index 13622b3..87bdd8d 100644
+--- lib/automake.mk
++++ lib/automake.mk
+@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \
+ lib/daemon.c \
+ lib/daemon.h \
+ lib/dhcp.h \
++ lib/dispatch.h \
+ lib/dummy.c \
+ lib/dummy.h \
+ lib/dhparams.h \
+diff --git lib/dispatch.h lib/dispatch.h
+new file mode 100644
+index 0000000..80ac9c7
+--- /dev/null
++++ lib/dispatch.h
+@@ -0,0 +1,9 @@
++#include <sys/types.h>
++#include "ofpbuf.h"
++
++#ifndef DISPATCH_H
++#define DISPATCH_H 1
++
++typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf);
++
++#endif /* DISPATCH_H */
+diff --git lib/dpif-netdev.c lib/dpif-netdev.c
+index cade79e..509e2ef 100644
+--- lib/dpif-netdev.c
++++ lib/dpif-netdev.c
+@@ -32,6 +32,15 @@
+ #include <sys/stat.h>
+ #include <unistd.h>
+
++#ifdef THREADED
++#include <signal.h>
++#include <pthread.h>
++
++#include "socket-util.h"
++#include "fatal-signal.h"
++#include "dispatch.h"
++#endif
++
+ #include "csum.h"
+ #include "dpif.h"
+ #include "dpif-provider.h"
+@@ -55,6 +64,16 @@
+ #include "vlog.h"
+
+ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
++/* We could use these macros instead of using #ifdef and #endif every time we
++ * need to call the pthread_mutex_lock/unlock.
++#ifdef THREADED
++#define LOCK(mutex) pthread_mutex_lock(mutex)
++#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
++#else
++#define LOCK(mutex)
++#define UNLOCK(mutex)
++#endif
++*/
+
+ /* Configuration parameters. */
+ enum { MAX_PORTS = 256 }; /* Maximum number of ports. */
+@@ -82,6 +101,21 @@ struct dp_netdev {
+ int open_cnt;
+ bool destroyed;
+
++#ifdef THREADED
++ /* The pipe is used to signal the presence of a packet on the queue.
++ * - dpif_netdev_recv_wait() waits on p[0]
++ * - dpif_netdev_recv() extract from queue and read p[0]
++ * - dp_netdev_output_control() send to queue and write p[1]
++ */
++
++ int pipe[2]; /* signal a packet on the queue */
++ struct pollfd *pipe_fd;
++
++ pthread_mutex_t table_mutex; /* mutex for the flow table */
++ pthread_mutex_t port_list_mutex; /* port list mutex */
++
++ /* The access to this queue is protected by the table_mutex mutex */
++#endif
+ struct dp_netdev_queue queues[N_QUEUES];
+ struct hmap flow_table; /* Flow table. */
+
+@@ -102,6 +136,9 @@ struct dp_netdev_port {
+ struct list node; /* Element in dp_netdev's 'port_list'. */
+ struct netdev *netdev;
+ char *type; /* Port type as requested by user. */
++#ifdef THREADED
++ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */
++#endif
+ };
+
+ /* A flow in dp_netdev's 'flow_table'. */
+@@ -127,6 +164,11 @@ struct dpif_netdev {
+ unsigned int dp_serial;
+ };
+
++#ifdef THREADED
++/* XXX global Descriptor of the thread that manages the datapaths. */
++pthread_t thread_p;
++#endif
++
+ /* All netdev-based datapaths. */
+ static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
+
+@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
+ dp->class = class;
+ dp->name = xstrdup(name);
+ dp->open_cnt = 0;
++#ifdef THREADED
++ error = pipe(dp->pipe);
++ if (error) {
++ VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno));
++ return errno;
++ }
++ if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
++ VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s",
++ strerror(errno));
++ return errno;
++ }
++ dp->pipe_fd = NULL;
++ VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]);
++
++ pthread_mutex_init(&dp->table_mutex, NULL);
++ pthread_mutex_init(&dp->port_list_mutex, NULL);
++#endif
+ for (i = 0; i < N_QUEUES; i++) {
+ dp->queues[i].head = dp->queues[i].tail = 0;
+ }
+@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
+ return 0;
+ }
+
++#ifdef THREADED
++static void * dp_thread_body(void *args OVS_UNUSED);
++
++/* This is the function that is called in response of a fatal signal (e.g.
++ * SIGTERM) */
++static void
++dpif_netdev_exit_hook(void *aux OVS_UNUSED)
++{
++ if (pthread_cancel(thread_p) == 0) {
++ pthread_join(thread_p, NULL);
++ }
++}
++
++static int
++dpif_netdev_init(void)
++{
++ static int error = -1;
++
++ if (error < 0) {
++ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true);
++ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
++ if (error != 0) {
++ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno));
++ error = errno;
++ } else {
++ VLOG_DBG("Datapath thread started");
++ }
++ }
++ return error;
++}
++#endif
++
+ static int
+ dpif_netdev_open(const struct dpif_class *class, const char *name,
+ bool create, struct dpif **dpifp)
+@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
+ }
+
+ *dpifp = create_dpif_netdev(dp);
++#ifdef THREADED
++ dpif_netdev_init();
++#endif
+ return 0;
+ }
+
++/* table_mutex must be locked in THREADED mode.
++ */
+ static void
+ dp_netdev_purge_queues(struct dp_netdev *dp)
+ {
+@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp)
+ struct dp_netdev_port *port, *next;
+
+ dp_netdev_flow_flush(dp);
++#ifdef THREADED
++ pthread_mutex_lock(&dp->port_list_mutex);
++#endif
+ LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
+ do_del_port(dp, port->port_no);
+ }
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->port_list_mutex);
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ dp_netdev_purge_queues(dp);
+ hmap_destroy(&dp->flow_table);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++ pthread_mutex_destroy(&dp->table_mutex);
++ pthread_mutex_destroy(&dp->port_list_mutex);
++#endif
+ free(dp->name);
+ free(dp);
+ }
+@@ -306,7 +414,13 @@ static int
+ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
+ {
+ struct dp_netdev *dp = get_dp_netdev(dpif);
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ stats->n_flows = hmap_count(&dp->flow_table);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ stats->n_hit = dp->n_hit;
+ stats->n_missed = dp->n_missed;
+ stats->n_lost = dp->n_lost;
+@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
+ port->port_no = port_no;
+ port->netdev = netdev;
+ port->type = xstrdup(type);
++#ifdef THREADED
++ port->poll_fd = NULL;
++#endif
+
+ error = netdev_get_mtu(netdev, &mtu);
+ if (!error) {
+ max_mtu = mtu;
+ }
+
++#ifdef THREADED
++ pthread_mutex_lock(&dp->port_list_mutex);
++#endif
+ list_push_back(&dp->port_list, &port->node);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->port_list_mutex);
++#endif
+ dp->ports[port_no] = port;
+ dp->serial++;
+
+@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp,
+ {
+ struct dp_netdev_port *port;
+
++#ifdef THREADED
++ pthread_mutex_lock(&dp->port_list_mutex);
++#endif
+ LIST_FOR_EACH (port, node, &dp->port_list) {
+ if (!strcmp(netdev_get_name(port->netdev), devname)) {
+ *portp = port;
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->port_list_mutex);
++#endif
+ return 0;
+ }
+ }
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->port_list_mutex);
++#endif
+ return ENOENT;
+ }
+
++/* In THREADED mode, must be called with port_list_mutex held. */
+ static int
+ do_del_port(struct dp_netdev *dp, uint16_t port_no)
+ {
+@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
+ static void
+ dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
+ {
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ hmap_remove(&dp->flow_table, &flow->node);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ free(flow->actions);
+ free(flow);
+ }
+@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_)
+ }
+
+ static struct dp_netdev_flow *
+-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
++#ifdef THREADED
++dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key)
++#else
++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
++#endif
+ {
+ struct dp_netdev_flow *flow;
+
+@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
+ return NULL;
+ }
+
++#ifdef THREADED
++static struct dp_netdev_flow *
++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
++{
++ struct dp_netdev_flow *flow;
++
++ pthread_mutex_lock(&dp->table_mutex);
++ flow = dp_netdev_lookup_flow_locked(dp, key);
++ pthread_mutex_unlock(&dp->table_mutex);
++ return flow;
++}
++#endif
++
+ static void
+ get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
+ {
+@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key,
+ return error;
+ }
+
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ return 0;
+ }
+
+@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
+ struct dp_netdev_flow *flow;
+ struct flow key;
+ int error;
++ int n_flows;
+
+ error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
+ if (error) {
+@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
+ flow = dp_netdev_lookup_flow(dp, &key);
+ if (!flow) {
+ if (put->flags & DPIF_FP_CREATE) {
+- if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
++ n_flows = hmap_count(&dp->flow_table);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
++ if (n_flows < MAX_FLOWS) {
+ if (put->stats) {
+ memset(put->stats, 0, sizeof *put->stats);
+ }
+@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
+ struct dp_netdev_flow *flow;
+ struct hmap_node *node;
+
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ if (!node) {
+ return EOF;
+ }
+@@ -949,7 +1125,13 @@ static int
+ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+ struct ofpbuf *buf)
+ {
+- struct dp_netdev_queue *q = find_nonempty_queue(dpif);
++ struct dp_netdev_queue *q;
++#ifdef THREADED
++ struct dp_netdev *dp = get_dp_netdev(dpif);
++ char c;
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
++ q = find_nonempty_queue(dpif);
+ if (q) {
+ struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK];
+ *upcall = *u;
+@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+ ofpbuf_uninit(buf);
+ *buf = *upcall->packet;
+
++#ifdef THREADED
++ /* Read a byte from the pipe to signal that a packet has been
++ * received. */
++ if (read(dp->pipe[0], &c, 1) < 0) {
++ VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno));
++ }
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ return 0;
+ } else {
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ return EAGAIN;
+ }
+ }
+@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+ static void
+ dpif_netdev_recv_wait(struct dpif *dpif)
+ {
++#ifdef THREADED
++ struct dp_netdev *dp = get_dp_netdev(dpif);
++
++ poll_fd_wait(dp->pipe[0], POLLIN);
++#else
+ if (find_nonempty_queue(dpif)) {
+ poll_immediate_wake();
+ } else {
+ /* No messages ready to be received, and dp_wait() will ensure that we
+ * wake up to queue new messages, so there is nothing to do. */
+ }
++#endif
+ }
+
+ static void
+ dpif_netdev_recv_purge(struct dpif *dpif)
+ {
+ struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
++#ifdef THREADED
++ struct dp_netdev *dp = get_dp_netdev(dpif);
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ dp_netdev_purge_queues(dpif_netdev->dp);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ }
+
+ static void
+@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
+ return;
+ }
+ flow_extract(packet, 0, 0, port->port_no, &key);
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++ flow = dp_netdev_lookup_flow_locked(dp, &key);
++#else
+ flow = dp_netdev_lookup_flow(dp, &key);
++#endif
+ if (flow) {
+ dp_netdev_flow_used(flow, &key, packet);
+ dp_netdev_execute_actions(dp, packet, &key,
+@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
+ dp->n_missed++;
+ dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0);
+ }
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ }
+
++#ifdef THREADED
++static void
++dpif_netdev_run(struct dpif *dpif OVS_UNUSED)
++{
++}
++
++static void
++dpif_netdev_wait(struct dpif *dpif OVS_UNUSED)
++{
++}
++#else
+ static void
+ dpif_netdev_run(struct dpif *dpif)
+ {
+@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif)
+ netdev_recv_wait(port->netdev);
+ }
+ }
++#endif
++
++#ifdef THREADED
++/*
++ * pcap callback argument
++ */
++struct dispatch_arg {
++ struct dp_netdev *dp; /* update statistics */
++ struct dp_netdev_port *port; /* argument to flow identifier function */
++};
++
++/* Process a packet.
++ *
++ * The port_input function will send immediately if it finds a flow match and
++ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
++ * If a flow is not found or for the other actions, the packet is copied.
++ */
++static void
++process_pkt(u_char *user, struct ofpbuf *buf)
++{
++ struct dispatch_arg *arg = (struct dispatch_arg *)user;
++
++ ofpbuf_padto(buf, ETH_TOTAL_MIN);
++ dp_netdev_port_input(arg->dp, arg->port, buf);
++}
++
++/* Body of the thread that manages the datapaths */
++static void*
++dp_thread_body(void *args OVS_UNUSED)
++{
++ struct dp_netdev *dp;
++ struct dp_netdev_port *port;
++ struct dispatch_arg arg;
++ int error;
++ int n_fds;
++ uint32_t batch = 50; /* max number of pkts processed by the dispatch */
++ int processed; /* actual number of pkts processed by the dispatch */
++ char readbuf[1024];
++
++ sigset_t sigmask;
++
++ /*XXX Since the poll involves all ports of all datapaths, the right fds
++ * size should be MAX_PORTS * max_number_of_datapaths */
++ struct pollfd fds[MAX_PORTS + 1];
++
++ /* mask the fatal signals. In this way the main thread is delegate to
++ * manage this them. */
++ sigemptyset(&sigmask);
++ sigaddset(&sigmask, SIGTERM);
++ sigaddset(&sigmask, SIGALRM);
++ sigaddset(&sigmask, SIGINT);
++ sigaddset(&sigmask, SIGHUP);
++
++ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
++ VLOG_ERR("Error setting thread sigmask: %s", strerror(errno));
++ }
++
++ for(;;) {
++ struct shash_node *node;
++ n_fds = 0;
++ /* build the structure for poll */
++ SHASH_FOR_EACH(node, &dp_netdevs) {
++ dp = (struct dp_netdev *)node->data;
++ fds[n_fds].fd = dp->pipe[1];
++ fds[n_fds].events = POLLIN;
++ dp->pipe_fd = &fds[n_fds];
++ n_fds++;
++ if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
++ VLOG_ERR("Too many fds for poll adding pipe_fd");
++ break;
++ }
++ pthread_mutex_lock(&dp->port_list_mutex);
++ LIST_FOR_EACH (port, node, &dp->port_list) {
++ /* insert an element in the fds structure */
++ fds[n_fds].fd = netdev_get_fd(port->netdev);
++ fds[n_fds].events = POLLIN;
++ port->poll_fd = &fds[n_fds];
++ n_fds++;
++ if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
++ VLOG_ERR("Too many fds for poll adding port fd");
++ break;
++ }
++ }
++ pthread_mutex_unlock(&dp->port_list_mutex);
++ }
++
++ error = poll(fds, n_fds, 2000);
++ VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error);
++
++ if (error < 0) {
++ if (errno == EINTR) {
++ /* XXX get this case in detach mode */
++ continue;
++ }
++ VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno));
++ /* XXX terminating the thread is probably not right */
++ break;
++ }
++ pthread_testcancel();
++
++ SHASH_FOR_EACH (node, &dp_netdevs) {
++ dp = (struct dp_netdev *)node->data;
++ if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) {
++ VLOG_DBG("Signalled from main thread");
++ while ((error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0)
++ ;
++ if (error < 0 && errno != EAGAIN) {
++ VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno));
++ }
++ }
++ arg.dp = dp;
++ pthread_mutex_lock(&dp->port_list_mutex);
++ LIST_FOR_EACH (port, node, &dp->port_list) {
++ arg.port = port;
++ if (port->poll_fd) {
++ VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents);
++ }
++ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
++ /* call the dispatch and process the packet into
++ * its callback. We process 'batch' packets at time */
++ processed = netdev_dispatch(port->netdev, batch,
++ process_pkt, (u_char *)&arg);
++ if (processed < 0) { /* pcap returns error */
++ static struct vlog_rate_limit rl =
++ VLOG_RATE_LIMIT_INIT(1, 5);
++ VLOG_ERR_RL(&rl,
++ "error receiving data from XXX \n");
++ }
++ } /* end of if poll */
++ } /* end of port loop */
++ pthread_mutex_unlock(&dp->port_list_mutex);
++ } /* end of dp loop */
++ } /* for ;; */
++
++ return NULL;
++}
++
++#endif /* THREADED */
+
+ static void
+ dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key)
+@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
+ uint16_t out_port)
+ {
+ struct dp_netdev_port *p = dp->ports[out_port];
++ char c = 0;
++
+ if (p) {
+ netdev_send(p->netdev, packet);
++#ifdef THREADED
++ if (write(dp->pipe[0], &c, 1) < 0) {
++ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno));
++ }
++#endif
+ }
+ }
+
++/* In THREADED mode, must be called with table_lock_mutex held. */
+ static int
+ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
+ int queue_no, const struct flow *flow, uint64_t arg)
+@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
+ struct dpif_upcall *upcall;
+ struct ofpbuf *buf;
+ size_t key_len;
++#ifdef THREADED
++ char c = 0;
++#endif
+
+ if (q->head - q->tail >= MAX_QUEUE_LEN) {
+ dp->n_lost++;
+@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
+ upcall->userdata = arg;
+
+ q->upcalls[q->head++ & QUEUE_MASK] = upcall;
++#ifdef THREADED
++ /* Write a byte on the pipe to advertise that a packet is ready. */
++ if (write(dp->pipe[1], &c, 1) < 0) {
++ VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno));
++ }
++#endif
+
+ return 0;
+ }
+@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp,
+
+ userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
+ userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0;
++#ifdef THREADED
++ pthread_mutex_lock(&dp->table_mutex);
++#endif
+ dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
++#ifdef THREADED
++ pthread_mutex_unlock(&dp->table_mutex);
++#endif
+ }
+
+ static void
+diff --git lib/netdev-bsd.c lib/netdev-bsd.c
+index 0b1a37c..ff79367 100644
+--- lib/netdev-bsd.c
++++ lib/netdev-bsd.c
+@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_)
+ }
+ }
+
++#ifdef THREADED
++
++struct dispatch_arg {
++ pkt_handler h;
++ u_char *user;
++};
++
++static void
++dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
++{
++ struct ofpbuf buf;
++ struct dispatch_arg *parg = (struct dispatch_arg*)user;
++
++ ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen);
++ buf.size = phdr->caplen;
++ (*parg->h)(parg->user, &buf);
++ ofpbuf_uninit(&buf);
++}
++
++static int
++netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h,
++ u_char *user)
++{
++ int ret;
++ struct dispatch_arg arg;
++
++ arg.h = h;
++ arg.user = user;
++ ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg);
++ return ret;
++}
++
++static int
++netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h,
++ u_char *user)
++{
++ int ret;
++ int i;
++ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
++ OFPBUF_STACK_BUFFER(buf_, size);
++
++ struct ofpbuf buf;
++ ofpbuf_use_stub(&buf, buf_, size);
++ for (i = 0; i < batch; i++) {
++ ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf));
++ if (ret >= 0) {
++ buf.size += ret;
++ h(user, &buf);
++ } else if (ret != -EAGAIN) {
++ return -1;
++ } else { /* ret = EAGAIN */
++ break;
++ }
++ ofpbuf_clear(&buf);
++ }
++ ofpbuf_uninit(&buf);
++ return i;
++}
++
++static int
++netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
++ u_char *user)
++{
++ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
++ struct netdev_dev_bsd * netdev_dev =
++ netdev_dev_bsd_cast(netdev_get_dev(netdev_));
++
++ if (!strcmp(netdev_get_type(netdev_), "tap") &&
++ netdev->netdev_fd == netdev_dev->tap_fd) {
++ return netdev_bsd_dispatch_tap(netdev, batch, h, user);
++ } else {
++ return netdev_bsd_dispatch_system(netdev, batch, h, user);
++ }
++}
++
++static int
++netdev_bsd_get_fd(struct netdev *netdev_)
++{
++ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
++ return netdev->netdev_fd;
++}
++#endif
++
+ /* Discards all packets waiting to be received from 'netdev'. */
+ static int
+ netdev_bsd_drain(struct netdev *netdev_)
+@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = {
+
+ netdev_bsd_recv,
+ netdev_bsd_recv_wait,
++#ifdef THREADED
++ netdev_bsd_dispatch,
++ netdev_bsd_get_fd,
++#endif
+ netdev_bsd_drain,
+
+ netdev_bsd_send,
+@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = {
+
+ netdev_bsd_recv,
+ netdev_bsd_recv_wait,
++#ifdef THREADED
++ netdev_bsd_dispatch,
++ netdev_bsd_get_fd,
++#endif
+ netdev_bsd_drain,
+
+ netdev_bsd_send,
+diff --git lib/netdev-dummy.c lib/netdev-dummy.c
+index b8c23c5..4e4801c 100644
+--- lib/netdev-dummy.c
++++ lib/netdev-dummy.c
+@@ -20,6 +20,12 @@
+
+ #include <errno.h>
+
++#ifdef THREADED
++#include <pthread.h>
++#include <unistd.h>
++#include "socket-util.h"
++#endif
++
+ #include "flow.h"
+ #include "list.h"
+ #include "netdev-provider.h"
+@@ -51,6 +57,10 @@ struct netdev_dummy {
+ struct list node; /* In netdev_dev_dummy's "devs" list. */
+ struct list recv_queue;
+ bool listening;
++#ifdef THREADED
++ pthread_mutex_t queue_mutex;
++ int s_pipe[2]; /* used to signal packet arrivals */
++#endif
+ };
+
+ static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs);
+@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp)
+ {
+ struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_);
+ struct netdev_dummy *netdev;
++#ifdef THREADED
++ int error;
++#endif
+
+ netdev = xmalloc(sizeof *netdev);
+ netdev_init(&netdev->netdev, netdev_dev_);
+ list_init(&netdev->recv_queue);
+ netdev->listening = false;
++#ifdef THREADED
++ error = pipe(netdev->s_pipe);
++ if (error) {
++ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno));
++ free(netdev);
++ return errno;
++ }
++ if (set_nonblocking(netdev->s_pipe[0]) ||
++ set_nonblocking(netdev->s_pipe[1])) {
++ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s",
++ strerror(errno));
++ free(netdev);
++ return errno;
++ }
++ pthread_mutex_init(&netdev->queue_mutex, NULL);
++#endif
+
+ *netdevp = &netdev->netdev;
+ list_push_back(&netdev_dev->devs, &netdev->node);
+@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_)
+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+ list_remove(&netdev->node);
+ ofpbuf_list_delete(&netdev->recv_queue);
++#ifdef THREADED
++ if (netdev->listening) {
++ close(netdev->s_pipe[0]);
++ close(netdev->s_pipe[1]);
++ }
++ pthread_mutex_destroy(&netdev->queue_mutex);
++#endif
+ free(netdev);
+ }
+
+@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size)
+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+ struct ofpbuf *packet;
+ size_t packet_size;
++#ifdef THREADED
++ char c;
++#endif
+
++#ifdef THREADED
++ pthread_mutex_lock(&netdev->queue_mutex);
++#endif
+ if (list_is_empty(&netdev->recv_queue)) {
++#ifdef THREADED
++ pthread_mutex_unlock(&netdev->queue_mutex);
++#endif
+ return -EAGAIN;
+ }
++#ifdef THREADED
++ if (read(netdev->s_pipe[0], &c, 1) < 0) {
++ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno));
++ }
++#endif
+
+ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
++#ifdef THREADED
++ pthread_mutex_unlock(&netdev->queue_mutex);
++#endif
+ if (packet->size > size) {
+ return -EMSGSIZE;
+ }
+@@ -179,11 +232,60 @@ static void
+ netdev_dummy_recv_wait(struct netdev *netdev_)
+ {
+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+- if (!list_is_empty(&netdev->recv_queue)) {
++ int empty;
++
++#ifdef THREADED
++ pthread_mutex_lock(&netdev->queue_mutex);
++#endif
++ empty = list_is_empty(&netdev->recv_queue);
++#ifdef THREADED
++ pthread_mutex_unlock(&netdev->queue_mutex);
++#endif
++ if (!empty) {
+ poll_immediate_wake();
+ }
+ }
+
++#ifdef THREADED
++static int
++netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
++ u_char *user)
++{
++ int i;
++ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
++ struct ofpbuf *packet;
++ VLOG_DBG("dispatch %d", batch);
++
++ for (i = 0; i < batch; i++) {
++ char c;
++ if (read(netdev->s_pipe[0], &c, 1) < 0) {
++ if (errno == EAGAIN)
++ break;
++ VLOG_ERR("%s: error reading from the pipe: %s",
++ netdev_get_name(netdev_), strerror(errno));
++ return -1;
++ }
++ pthread_mutex_lock(&netdev->queue_mutex);
++ if (list_is_empty(&netdev->recv_queue)) {
++ pthread_mutex_unlock(&netdev->queue_mutex);
++ return -EAGAIN;
++ }
++ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
++ pthread_mutex_unlock(&netdev->queue_mutex);
++ h(user, packet);
++ ofpbuf_delete(packet);
++ }
++ return i;
++}
++
++static int
++netdev_dummy_get_fd(struct netdev *netdev_)
++{
++ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
++ return netdev->s_pipe[0];
++}
++#endif
++
+ static int
+ netdev_dummy_drain(struct netdev *netdev_)
+ {
+@@ -316,6 +418,10 @@ static const struct netdev_class dummy_class = {
+ netdev_dummy_listen,
+ netdev_dummy_recv,
+ netdev_dummy_recv_wait,
++#ifdef THREADED
++ netdev_dummy_dispatch, /* dispatch */
++ netdev_dummy_get_fd, /* get_fd */
++#endif
+ netdev_dummy_drain,
+
+ NULL, /* send */
+@@ -407,6 +513,9 @@ netdev_dummy_receive(struct unixctl_conn *conn,
+ struct netdev_dev_dummy *dummy_dev;
+ int n_listeners;
+ int i;
++#ifdef THREADED
++ char c = 0;
++#endif
+
+ dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]);
+ if (!dummy_dev) {
+@@ -414,6 +523,7 @@ netdev_dummy_receive(struct unixctl_conn *conn,
+ return;
+ }
+
++
+ n_listeners = 0;
+ for (i = 2; i < argc; i++) {
+ struct netdev_dummy *dev;
+@@ -429,7 +539,16 @@ netdev_dummy_receive(struct unixctl_conn *conn,
+ LIST_FOR_EACH (dev, node, &dummy_dev->devs) {
+ if (dev->listening) {
+ struct ofpbuf *copy = ofpbuf_clone(packet);
++#ifdef THREADED
++ pthread_mutex_lock(&dev->queue_mutex);
++#endif
+ list_push_back(&dev->recv_queue, &copy->list_node);
++#ifdef THREADED
++ pthread_mutex_unlock(&dev->queue_mutex);
++ if (write(dev->s_pipe[1], &c, 1) < 0) {
++ VLOG_ERR("Error writing dummy pipe: %s", strerror(errno));
++ }
++#endif
+ n_listeners++;
+ }
+ }
+diff --git lib/netdev-linux.c lib/netdev-linux.c
+index 4d2f3ac..c33a801 100644
+--- lib/netdev-linux.c
++++ lib/netdev-linux.c
+@@ -891,6 +891,43 @@ netdev_linux_recv_wait(struct netdev *netdev_)
+ }
+ }
+
++#ifdef THREADED
++static int
++netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
++ u_char *user)
++{
++ int ret;
++ int i;
++ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
++ OFPBUF_STACK_BUFFER(buf_, size);
++ struct ofpbuf buf;
++ VLOG_DBG("dispatch %d", batch);
++
++ ofpbuf_use_stub(&buf, buf_, size);
++ for (i = 0; i < batch; i++) {
++ ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf));
++ if (ret >= 0) {
++ buf.size += ret;
++ h(user, &buf);
++ } else if (ret != -EAGAIN) {
++ return -1;
++ } else {
++ break;
++ }
++ ofpbuf_clear(&buf);
++ }
++ ofpbuf_uninit(&buf);
++ return i;
++}
++
++static int
++netdev_linux_get_fd(struct netdev *netdev_)
++{
++ struct netdev_linux *netdev = netdev_linux_cast(netdev_);
++ return netdev->fd;
++}
++#endif
++
+ /* Discards all packets waiting to be received from 'netdev'. */
+ static int
+ netdev_linux_drain(struct netdev *netdev_)
+@@ -2376,6 +2413,12 @@ netdev_linux_change_seq(const struct netdev *netdev)
+ return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq;
+ }
+
++#ifdef THREADED
++#define THREADED_NULL NULL, NULL,
++#else
++#define THREADED_NULL
++#endif
++
+ #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \
+ GET_FEATURES, GET_STATUS) \
+ { \
+@@ -2396,6 +2439,7 @@ netdev_linux_change_seq(const struct netdev *netdev)
+ netdev_linux_listen, \
+ netdev_linux_recv, \
+ netdev_linux_recv_wait, \
++ THREADED_NULL \
+ netdev_linux_drain, \
+ \
+ netdev_linux_send, \
+diff --git lib/netdev-provider.h lib/netdev-provider.h
+index 2a91f05..ee4e757 100644
+--- lib/netdev-provider.h
++++ lib/netdev-provider.h
+@@ -24,6 +24,9 @@
+ #include "netdev.h"
+ #include "list.h"
+ #include "shash.h"
++#ifdef THREADED
++#include "dispatch.h"
++#endif
+
+ #ifdef __cplusplus
+ extern "C" {
+@@ -190,6 +193,22 @@ struct netdev_class {
+ * implement packet reception through the 'recv' member function. */
+ void (*recv_wait)(struct netdev *netdev);
+
++#ifdef THREADED
++ /* Attempts to receive 'batch' packets from 'netdev' and process them
++ * through the 'handler' callback. This function is used in the 'THREADED'
++ * version in order to optimize the forwarding process, since it permits to
++ * process packets directly in the netdev memory.
++ *
++ * Returns the number of packets processed on success; this can be 0 if no
++ * packets are available to be read. Returns -1 if an error occurred.
++ */
++ int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler,
++ u_char *user);
++
++ /* Return the file descriptor of the device */
++ int (*get_fd)(struct netdev *netdev);
++#endif
++
+ /* Discards all packets waiting to be received from 'netdev'.
+ *
+ * May be null if not needed, such as for a network device that does not
+diff --git lib/netdev-vport.c lib/netdev-vport.c
+index 1721f6b..39b26de 100644
+--- lib/netdev-vport.c
++++ lib/netdev-vport.c
+@@ -889,6 +889,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
+ return 0;
+ }
+
++
++#ifdef THREADED
++# define THREADED_NULL NULL, NULL,
++#else
++# define THREADED_NULL
++#endif
++
+ #define VPORT_FUNCTIONS(GET_STATUS) \
+ NULL, \
+ netdev_vport_run, \
+@@ -905,6 +912,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
+ NULL, /* listen */ \
+ NULL, /* recv */ \
+ NULL, /* recv_wait */ \
++ THREADED_NULL \
+ NULL, /* drain */ \
+ \
+ netdev_vport_send, /* send */ \
+diff --git lib/netdev.c lib/netdev.c
+index be7cdd2..0c54e1e 100644
+--- lib/netdev.c
++++ lib/netdev.c
+@@ -423,6 +423,28 @@ netdev_recv_wait(struct netdev *netdev)
+ }
+ }
+
++#ifdef THREADED
++/* Attempts to receive and process 'batch' packets from 'netdev'. */
++int
++netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user)
++{
++ int (*dispatch)(struct netdev*, int, pkt_handler, u_char *);
++
++ dispatch = netdev_get_dev(netdev)->netdev_class->dispatch;
++ return dispatch ? dispatch(netdev, batch, h, user) : 0;
++}
++
++/* Returns the file descriptor */
++int
++netdev_get_fd(struct netdev *netdev)
++{
++ int (*get_fd)(struct netdev *);
++
++ get_fd = netdev_get_dev(netdev)->netdev_class->get_fd;
++ return get_fd ? get_fd(netdev) : 0;
++}
++#endif
++
+ /* Discards all packets waiting to be received from 'netdev'. */
+ int
+ netdev_drain(struct netdev *netdev)
+diff --git lib/netdev.h lib/netdev.h
+index 4b86c21..4fad796 100644
+--- lib/netdev.h
++++ lib/netdev.h
+@@ -21,6 +21,9 @@
+ #include <stddef.h>
+ #include <stdint.h>
+ #include "openvswitch/types.h"
++#ifdef THREADED
++#include "dispatch.h"
++#endif
+
+ #ifdef __cplusplus
+ extern "C" {
+@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *);
+ int netdev_listen(struct netdev *);
+ int netdev_recv(struct netdev *, struct ofpbuf *);
+ void netdev_recv_wait(struct netdev *);
++#ifdef THREADED
++int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *);
++int netdev_get_fd(struct netdev *);
++#endif
+ int netdev_drain(struct netdev *);
+
+ int netdev_send(struct netdev *, const struct ofpbuf *);
+diff --git lib/route-table-bsd.c lib/route-table-bsd.c
+index c145091..1c29071 100644
+--- lib/route-table-bsd.c
++++ lib/route-table-bsd.c
+@@ -29,6 +29,8 @@
+ #include <string.h>
+ #include <unistd.h>
+
++#include "vlog.h"
++
+ VLOG_DEFINE_THIS_MODULE(route_table);
+
+ static int pid;
+diff --git lib/vlog.c lib/vlog.c
+index 899072e..b6bd4ef 100644
+--- lib/vlog.c
++++ lib/vlog.c
+@@ -34,6 +34,9 @@
+ #include "timeval.h"
+ #include "unixctl.h"
+ #include "util.h"
++#ifdef THREADED
++#include <pthread.h>
++#endif
+
+ VLOG_DEFINE_THIS_MODULE(vlog);
+
+@@ -89,6 +92,10 @@ static FILE *log_file;
+ /* vlog initialized? */
+ static bool vlog_inited;
+
++#ifdef THREADED
++static pthread_mutex_t vlog_mutex;
++#endif
++
+ static void format_log_message(const struct vlog_module *, enum vlog_level,
+ enum vlog_facility, unsigned int msg_num,
+ const char *message, va_list, struct ds *)
+@@ -484,6 +491,9 @@ vlog_init(void)
+ return;
+ }
+ vlog_inited = true;
++#ifdef THREADED
++ pthread_mutex_init(&vlog_mutex, NULL);
++#endif
+
+ /* openlog() is allowed to keep the pointer passed in, without making a
+ * copy. The daemonize code sometimes frees and replaces 'program_name',
+@@ -691,6 +701,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
+
+ ds_init(&s);
+ ds_reserve(&s, 1024);
++#ifdef THREADED
++ pthread_mutex_lock(&vlog_mutex);
++#endif
+ msg_num++;
+
+ if (log_to_console) {
+@@ -721,6 +734,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
+ fflush(log_file);
+ }
+
++#ifdef THREADED
++ pthread_mutex_unlock(&vlog_mutex);
++#endif
+ ds_destroy(&s);
+ errno = save_errno;
+ }
+diff --git m4/openvswitch.m4 m4/openvswitch.m4
+index dca9f5f..084ceff 100644
+--- m4/openvswitch.m4
++++ m4/openvswitch.m4
+@@ -14,6 +14,25 @@
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
++dnl Check for --enable-threaded and updates CFLAGS.
++AC_DEFUN([OVS_CHECK_THREADED],
++ [AC_REQUIRE([AC_PROG_CC])
++ AC_ARG_ENABLE(
++ [threaded],
++ [AC_HELP_STRING([--enable-threaded],
++ [Enable threaded version of userspace implementation])],
++ [case "${enableval}" in
++ (yes) threaded=true ;;
++ (no) threaded=false ;;
++ (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;;
++ esac],
++ [threaded=false])
++ if $threaded; then
++ AC_DEFINE([THREADED], [1],
++ [Define to 1 if the threaded version of userspace
++ implementation is enabled.])
++ fi])
++
+ dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately.
+ AC_DEFUN([OVS_CHECK_COVERAGE],
+ [AC_REQUIRE([AC_PROG_CC])