diff options
author | trasz <trasz@FreeBSD.org> | 2012-08-10 16:19:22 +0800 |
---|---|---|
committer | trasz <trasz@FreeBSD.org> | 2012-08-10 16:19:22 +0800 |
commit | 70e8f4d66f87a69be3f3756725eb9f4762fc47ce (patch) | |
tree | 114e0ba1a99e0adf624da8863f6f483cd267ff13 /net | |
parent | de2e677bf2ceaede579bd0a541b5f1ac7dd6fc5f (diff) | |
download | freebsd-ports-gnome-70e8f4d66f87a69be3f3756725eb9f4762fc47ce.tar.gz freebsd-ports-gnome-70e8f4d66f87a69be3f3756725eb9f4762fc47ce.tar.zst freebsd-ports-gnome-70e8f4d66f87a69be3f3756725eb9f4762fc47ce.zip |
Add optional threading support; disabled by default.
Diffstat (limited to 'net')
-rw-r--r-- | net/openvswitch/Makefile | 14 | ||||
-rw-r--r-- | net/openvswitch/files/threaded.diff | 1305 |
2 files changed, 1317 insertions, 2 deletions
diff --git a/net/openvswitch/Makefile b/net/openvswitch/Makefile index bff31aa9e1d1..a7b184ddd68c 100644 --- a/net/openvswitch/Makefile +++ b/net/openvswitch/Makefile @@ -30,20 +30,30 @@ MAN8= ovs-appctl.8 ovs-brcompatd.8 ovs-bugtool.8 ovs-controller.8 \ ovs-test.8 ovs-vsctl.8 ovs-vswitchd.8 \ ovs-vlan-bug-workaround.8 ovs-vlan-test.8 +OPTIONS_DEFINE= THREADED +THREADED_DESC= Experimental high-performance threading patch + .include <bsd.port.pre.mk> .if ${OSVERSION} < 800000 BROKEN= does not compile .endif +.if ${PORT_OPTIONS:MTHREADED} +CONFIGURE_ARGS+=--enable-threaded=yes +.endif + AUTOTOOLSFILES= aclocal.m4 post-patch: @${REINPLACE_CMD} -e 's|1.11.1|%%AUTOMAKE_APIVER%%|g' \ -e 's|2.65|%%AUTOCONF_VERSION%%|g' \ ${WRKSRC}/aclocal.m4 - # Workaround for a makefile bug; if it builds without this line, remove it. - #${TOUCH} ${WRKSRC}/INSTALL +.if ${PORT_OPTIONS:MTHREADED} + @# We can't use EXTRA_PATCHES, since we need to apply this one + @# after files/patch-bsd-netdef.diff, not before. + ${PATCH} ${PATCH_ARGS} < ${FILESDIR}/threaded.diff +.endif post-install: ${INSTALL_DATA} ${WRKSRC}/vswitchd/vswitch.ovsschema ${PREFIX}/share/openvswitch/ diff --git a/net/openvswitch/files/threaded.diff b/net/openvswitch/files/threaded.diff new file mode 100644 index 000000000000..858f39494157 --- /dev/null +++ b/net/openvswitch/files/threaded.diff @@ -0,0 +1,1305 @@ +diff --git configure.ac configure.ac +index 5692b86..ff62627 100644 +--- configure.ac ++++ configure.ac +@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt]) + AC_SEARCH_LIBS([timer_create], [rt]) + AC_SEARCH_LIBS([pcap_open_live], [pcap]) + ++OVS_CHECK_THREADED + OVS_CHECK_COVERAGE + OVS_CHECK_NDEBUG + OVS_CHECK_NETLINK +diff --git lib/automake.mk lib/automake.mk +index 13622b3..87bdd8d 100644 +--- lib/automake.mk ++++ lib/automake.mk +@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \ + lib/daemon.c \ + lib/daemon.h \ + lib/dhcp.h \ ++ lib/dispatch.h \ + lib/dummy.c \ + lib/dummy.h \ + lib/dhparams.h \ +diff --git lib/dispatch.h lib/dispatch.h +new file mode 100644 +index 0000000..80ac9c7 +--- /dev/null ++++ lib/dispatch.h +@@ -0,0 +1,9 @@ ++#include <sys/types.h> ++#include "ofpbuf.h" ++ ++#ifndef DISPATCH_H ++#define DISPATCH_H 1 ++ ++typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf); ++ ++#endif /* DISPATCH_H */ +diff --git lib/dpif-netdev.c lib/dpif-netdev.c +index cade79e..509e2ef 100644 +--- lib/dpif-netdev.c ++++ lib/dpif-netdev.c +@@ -32,6 +32,15 @@ + #include <sys/stat.h> + #include <unistd.h> + ++#ifdef THREADED ++#include <signal.h> ++#include <pthread.h> ++ ++#include "socket-util.h" ++#include "fatal-signal.h" ++#include "dispatch.h" ++#endif ++ + #include "csum.h" + #include "dpif.h" + #include "dpif-provider.h" +@@ -55,6 +64,16 @@ + #include "vlog.h" + + VLOG_DEFINE_THIS_MODULE(dpif_netdev); ++/* We could use these macros instead of using #ifdef and #endif every time we ++ * need to call the pthread_mutex_lock/unlock. ++#ifdef THREADED ++#define LOCK(mutex) pthread_mutex_lock(mutex) ++#define UNLOCK(mutex) pthread_mutex_unlock(mutex) ++#else ++#define LOCK(mutex) ++#define UNLOCK(mutex) ++#endif ++*/ + + /* Configuration parameters. */ + enum { MAX_PORTS = 256 }; /* Maximum number of ports. */ +@@ -82,6 +101,21 @@ struct dp_netdev { + int open_cnt; + bool destroyed; + ++#ifdef THREADED ++ /* The pipe is used to signal the presence of a packet on the queue. ++ * - dpif_netdev_recv_wait() waits on p[0] ++ * - dpif_netdev_recv() extract from queue and read p[0] ++ * - dp_netdev_output_control() send to queue and write p[1] ++ */ ++ ++ int pipe[2]; /* signal a packet on the queue */ ++ struct pollfd *pipe_fd; ++ ++ pthread_mutex_t table_mutex; /* mutex for the flow table */ ++ pthread_mutex_t port_list_mutex; /* port list mutex */ ++ ++ /* The access to this queue is protected by the table_mutex mutex */ ++#endif + struct dp_netdev_queue queues[N_QUEUES]; + struct hmap flow_table; /* Flow table. */ + +@@ -102,6 +136,9 @@ struct dp_netdev_port { + struct list node; /* Element in dp_netdev's 'port_list'. */ + struct netdev *netdev; + char *type; /* Port type as requested by user. */ ++#ifdef THREADED ++ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */ ++#endif + }; + + /* A flow in dp_netdev's 'flow_table'. */ +@@ -127,6 +164,11 @@ struct dpif_netdev { + unsigned int dp_serial; + }; + ++#ifdef THREADED ++/* XXX global Descriptor of the thread that manages the datapaths. */ ++pthread_t thread_p; ++#endif ++ + /* All netdev-based datapaths. */ + static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs); + +@@ -204,6 +246,23 @@ create_dp_netdev(const char *name, const struct dpif_class *class, + dp->class = class; + dp->name = xstrdup(name); + dp->open_cnt = 0; ++#ifdef THREADED ++ error = pipe(dp->pipe); ++ if (error) { ++ VLOG_ERR("Unable to create datapath thread pipe: %s", strerror(errno)); ++ return errno; ++ } ++ if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) { ++ VLOG_ERR("Unable to set nonblocking on datapath thread pipe: %s", ++ strerror(errno)); ++ return errno; ++ } ++ dp->pipe_fd = NULL; ++ VLOG_DBG("Datapath thread pipe created (%d, %d)", dp->pipe[0], dp->pipe[1]); ++ ++ pthread_mutex_init(&dp->table_mutex, NULL); ++ pthread_mutex_init(&dp->port_list_mutex, NULL); ++#endif + for (i = 0; i < N_QUEUES; i++) { + dp->queues[i].head = dp->queues[i].tail = 0; + } +@@ -221,6 +280,38 @@ create_dp_netdev(const char *name, const struct dpif_class *class, + return 0; + } + ++#ifdef THREADED ++static void * dp_thread_body(void *args OVS_UNUSED); ++ ++/* This is the function that is called in response of a fatal signal (e.g. ++ * SIGTERM) */ ++static void ++dpif_netdev_exit_hook(void *aux OVS_UNUSED) ++{ ++ if (pthread_cancel(thread_p) == 0) { ++ pthread_join(thread_p, NULL); ++ } ++} ++ ++static int ++dpif_netdev_init(void) ++{ ++ static int error = -1; ++ ++ if (error < 0) { ++ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true); ++ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL); ++ if (error != 0) { ++ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno)); ++ error = errno; ++ } else { ++ VLOG_DBG("Datapath thread started"); ++ } ++ } ++ return error; ++} ++#endif ++ + static int + dpif_netdev_open(const struct dpif_class *class, const char *name, + bool create, struct dpif **dpifp) +@@ -247,9 +338,14 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, + } + + *dpifp = create_dpif_netdev(dp); ++#ifdef THREADED ++ dpif_netdev_init(); ++#endif + return 0; + } + ++/* table_mutex must be locked in THREADED mode. ++ */ + static void + dp_netdev_purge_queues(struct dp_netdev *dp) + { +@@ -273,11 +369,23 @@ dp_netdev_free(struct dp_netdev *dp) + struct dp_netdev_port *port, *next; + + dp_netdev_flow_flush(dp); ++#ifdef THREADED ++ pthread_mutex_lock(&dp->port_list_mutex); ++#endif + LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) { + do_del_port(dp, port->port_no); + } ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + dp_netdev_purge_queues(dp); + hmap_destroy(&dp->flow_table); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++ pthread_mutex_destroy(&dp->table_mutex); ++ pthread_mutex_destroy(&dp->port_list_mutex); ++#endif + free(dp->name); + free(dp); + } +@@ -306,7 +414,13 @@ static int + dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) + { + struct dp_netdev *dp = get_dp_netdev(dpif); ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + stats->n_flows = hmap_count(&dp->flow_table); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + stats->n_hit = dp->n_hit; + stats->n_missed = dp->n_missed; + stats->n_lost = dp->n_lost; +@@ -354,13 +468,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, + port->port_no = port_no; + port->netdev = netdev; + port->type = xstrdup(type); ++#ifdef THREADED ++ port->poll_fd = NULL; ++#endif + + error = netdev_get_mtu(netdev, &mtu); + if (!error) { + max_mtu = mtu; + } + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->port_list_mutex); ++#endif + list_push_back(&dp->port_list, &port->node); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++#endif + dp->ports[port_no] = port; + dp->serial++; + +@@ -448,15 +571,25 @@ get_port_by_name(struct dp_netdev *dp, + { + struct dp_netdev_port *port; + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->port_list_mutex); ++#endif + LIST_FOR_EACH (port, node, &dp->port_list) { + if (!strcmp(netdev_get_name(port->netdev), devname)) { + *portp = port; ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++#endif + return 0; + } + } ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->port_list_mutex); ++#endif + return ENOENT; + } + ++/* In THREADED mode, must be called with port_list_mutex held. */ + static int + do_del_port(struct dp_netdev *dp, uint16_t port_no) + { +@@ -531,7 +664,13 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED) + static void + dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) + { ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + hmap_remove(&dp->flow_table, &flow->node); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + free(flow->actions); + free(flow); + } +@@ -620,7 +759,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) + } + + static struct dp_netdev_flow * +-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) ++#ifdef THREADED ++dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key) ++#else ++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) ++#endif + { + struct dp_netdev_flow *flow; + +@@ -632,6 +775,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key) + return NULL; + } + ++#ifdef THREADED ++static struct dp_netdev_flow * ++dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key) ++{ ++ struct dp_netdev_flow *flow; ++ ++ pthread_mutex_lock(&dp->table_mutex); ++ flow = dp_netdev_lookup_flow_locked(dp, key); ++ pthread_mutex_unlock(&dp->table_mutex); ++ return flow; ++} ++#endif ++ + static void + get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats) + { +@@ -729,7 +885,13 @@ add_flow(struct dpif *dpif, const struct flow *key, + return error; + } + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0)); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + return 0; + } + +@@ -749,6 +911,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) + struct dp_netdev_flow *flow; + struct flow key; + int error; ++ int n_flows; + + error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key); + if (error) { +@@ -758,7 +921,14 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) + flow = dp_netdev_lookup_flow(dp, &key); + if (!flow) { + if (put->flags & DPIF_FP_CREATE) { +- if (hmap_count(&dp->flow_table) < MAX_FLOWS) { ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif ++ n_flows = hmap_count(&dp->flow_table); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif ++ if (n_flows < MAX_FLOWS) { + if (put->stats) { + memset(put->stats, 0, sizeof *put->stats); + } +@@ -843,7 +1013,13 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_, + struct dp_netdev_flow *flow; + struct hmap_node *node; + ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + if (!node) { + return EOF; + } +@@ -949,7 +1125,13 @@ static int + dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + struct ofpbuf *buf) + { +- struct dp_netdev_queue *q = find_nonempty_queue(dpif); ++ struct dp_netdev_queue *q; ++#ifdef THREADED ++ struct dp_netdev *dp = get_dp_netdev(dpif); ++ char c; ++ pthread_mutex_lock(&dp->table_mutex); ++#endif ++ q = find_nonempty_queue(dpif); + if (q) { + struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK]; + *upcall = *u; +@@ -958,8 +1140,19 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + ofpbuf_uninit(buf); + *buf = *upcall->packet; + ++#ifdef THREADED ++ /* Read a byte from the pipe to signal that a packet has been ++ * received. */ ++ if (read(dp->pipe[0], &c, 1) < 0) { ++ VLOG_ERR("Pipe read error (from datapath): %s", strerror(errno)); ++ } ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + return 0; + } else { ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + return EAGAIN; + } + } +@@ -967,19 +1160,32 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + static void + dpif_netdev_recv_wait(struct dpif *dpif) + { ++#ifdef THREADED ++ struct dp_netdev *dp = get_dp_netdev(dpif); ++ ++ poll_fd_wait(dp->pipe[0], POLLIN); ++#else + if (find_nonempty_queue(dpif)) { + poll_immediate_wake(); + } else { + /* No messages ready to be received, and dp_wait() will ensure that we + * wake up to queue new messages, so there is nothing to do. */ + } ++#endif + } + + static void + dpif_netdev_recv_purge(struct dpif *dpif) + { + struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); ++#ifdef THREADED ++ struct dp_netdev *dp = get_dp_netdev(dpif); ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + dp_netdev_purge_queues(dpif_netdev->dp); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + } + + static void +@@ -1003,7 +1209,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, + return; + } + flow_extract(packet, 0, 0, port->port_no, &key); ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++ flow = dp_netdev_lookup_flow_locked(dp, &key); ++#else + flow = dp_netdev_lookup_flow(dp, &key); ++#endif + if (flow) { + dp_netdev_flow_used(flow, &key, packet); + dp_netdev_execute_actions(dp, packet, &key, +@@ -1013,8 +1224,22 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port, + dp->n_missed++; + dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0); + } ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + } + ++#ifdef THREADED ++static void ++dpif_netdev_run(struct dpif *dpif OVS_UNUSED) ++{ ++} ++ ++static void ++dpif_netdev_wait(struct dpif *dpif OVS_UNUSED) ++{ ++} ++#else + static void + dpif_netdev_run(struct dpif *dpif) + { +@@ -1053,6 +1278,144 @@ dpif_netdev_wait(struct dpif *dpif) + netdev_recv_wait(port->netdev); + } + } ++#endif ++ ++#ifdef THREADED ++/* ++ * pcap callback argument ++ */ ++struct dispatch_arg { ++ struct dp_netdev *dp; /* update statistics */ ++ struct dp_netdev_port *port; /* argument to flow identifier function */ ++}; ++ ++/* Process a packet. ++ * ++ * The port_input function will send immediately if it finds a flow match and ++ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP. ++ * If a flow is not found or for the other actions, the packet is copied. ++ */ ++static void ++process_pkt(u_char *user, struct ofpbuf *buf) ++{ ++ struct dispatch_arg *arg = (struct dispatch_arg *)user; ++ ++ ofpbuf_padto(buf, ETH_TOTAL_MIN); ++ dp_netdev_port_input(arg->dp, arg->port, buf); ++} ++ ++/* Body of the thread that manages the datapaths */ ++static void* ++dp_thread_body(void *args OVS_UNUSED) ++{ ++ struct dp_netdev *dp; ++ struct dp_netdev_port *port; ++ struct dispatch_arg arg; ++ int error; ++ int n_fds; ++ uint32_t batch = 50; /* max number of pkts processed by the dispatch */ ++ int processed; /* actual number of pkts processed by the dispatch */ ++ char readbuf[1024]; ++ ++ sigset_t sigmask; ++ ++ /*XXX Since the poll involves all ports of all datapaths, the right fds ++ * size should be MAX_PORTS * max_number_of_datapaths */ ++ struct pollfd fds[MAX_PORTS + 1]; ++ ++ /* mask the fatal signals. In this way the main thread is delegate to ++ * manage this them. */ ++ sigemptyset(&sigmask); ++ sigaddset(&sigmask, SIGTERM); ++ sigaddset(&sigmask, SIGALRM); ++ sigaddset(&sigmask, SIGINT); ++ sigaddset(&sigmask, SIGHUP); ++ ++ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) { ++ VLOG_ERR("Error setting thread sigmask: %s", strerror(errno)); ++ } ++ ++ for(;;) { ++ struct shash_node *node; ++ n_fds = 0; ++ /* build the structure for poll */ ++ SHASH_FOR_EACH(node, &dp_netdevs) { ++ dp = (struct dp_netdev *)node->data; ++ fds[n_fds].fd = dp->pipe[1]; ++ fds[n_fds].events = POLLIN; ++ dp->pipe_fd = &fds[n_fds]; ++ n_fds++; ++ if (n_fds >= sizeof(fds) / sizeof(fds[0])) { ++ VLOG_ERR("Too many fds for poll adding pipe_fd"); ++ break; ++ } ++ pthread_mutex_lock(&dp->port_list_mutex); ++ LIST_FOR_EACH (port, node, &dp->port_list) { ++ /* insert an element in the fds structure */ ++ fds[n_fds].fd = netdev_get_fd(port->netdev); ++ fds[n_fds].events = POLLIN; ++ port->poll_fd = &fds[n_fds]; ++ n_fds++; ++ if (n_fds >= sizeof(fds) / sizeof(fds[0])) { ++ VLOG_ERR("Too many fds for poll adding port fd"); ++ break; ++ } ++ } ++ pthread_mutex_unlock(&dp->port_list_mutex); ++ } ++ ++ error = poll(fds, n_fds, 2000); ++ VLOG_DBG("dp_thread_body poll wakeup with cnt=%d", error); ++ ++ if (error < 0) { ++ if (errno == EINTR) { ++ /* XXX get this case in detach mode */ ++ continue; ++ } ++ VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno)); ++ /* XXX terminating the thread is probably not right */ ++ break; ++ } ++ pthread_testcancel(); ++ ++ SHASH_FOR_EACH (node, &dp_netdevs) { ++ dp = (struct dp_netdev *)node->data; ++ if (dp->pipe_fd && (dp->pipe_fd->revents & POLLIN)) { ++ VLOG_DBG("Signalled from main thread"); ++ while ((error = read(dp->pipe[1], readbuf, sizeof(readbuf))) > 0) ++ ; ++ if (error < 0 && errno != EAGAIN) { ++ VLOG_ERR("Pipe read error (to datapath): %s", strerror(errno)); ++ } ++ } ++ arg.dp = dp; ++ pthread_mutex_lock(&dp->port_list_mutex); ++ LIST_FOR_EACH (port, node, &dp->port_list) { ++ arg.port = port; ++ if (port->poll_fd) { ++ VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents); ++ } ++ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) { ++ /* call the dispatch and process the packet into ++ * its callback. We process 'batch' packets at time */ ++ processed = netdev_dispatch(port->netdev, batch, ++ process_pkt, (u_char *)&arg); ++ if (processed < 0) { /* pcap returns error */ ++ static struct vlog_rate_limit rl = ++ VLOG_RATE_LIMIT_INIT(1, 5); ++ VLOG_ERR_RL(&rl, ++ "error receiving data from XXX \n"); ++ } ++ } /* end of if poll */ ++ } /* end of port loop */ ++ pthread_mutex_unlock(&dp->port_list_mutex); ++ } /* end of dp loop */ ++ } /* for ;; */ ++ ++ return NULL; ++} ++ ++#endif /* THREADED */ + + static void + dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key) +@@ -1068,11 +1431,19 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet, + uint16_t out_port) + { + struct dp_netdev_port *p = dp->ports[out_port]; ++ char c = 0; ++ + if (p) { + netdev_send(p->netdev, packet); ++#ifdef THREADED ++ if (write(dp->pipe[0], &c, 1) < 0) { ++ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno)); ++ } ++#endif + } + } + ++/* In THREADED mode, must be called with table_lock_mutex held. */ + static int + dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, + int queue_no, const struct flow *flow, uint64_t arg) +@@ -1081,6 +1452,9 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, + struct dpif_upcall *upcall; + struct ofpbuf *buf; + size_t key_len; ++#ifdef THREADED ++ char c = 0; ++#endif + + if (q->head - q->tail >= MAX_QUEUE_LEN) { + dp->n_lost++; +@@ -1102,6 +1476,12 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet, + upcall->userdata = arg; + + q->upcalls[q->head++ & QUEUE_MASK] = upcall; ++#ifdef THREADED ++ /* Write a byte on the pipe to advertise that a packet is ready. */ ++ if (write(dp->pipe[1], &c, 1) < 0) { ++ VLOG_ERR("Pipe write error (from datapath): %s", strerror(errno)); ++ } ++#endif + + return 0; + } +@@ -1150,7 +1530,13 @@ dp_netdev_action_userspace(struct dp_netdev *dp, + + userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); + userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0; ++#ifdef THREADED ++ pthread_mutex_lock(&dp->table_mutex); ++#endif + dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata); ++#ifdef THREADED ++ pthread_mutex_unlock(&dp->table_mutex); ++#endif + } + + static void +diff --git lib/netdev-bsd.c lib/netdev-bsd.c +index 0b1a37c..ff79367 100644 +--- lib/netdev-bsd.c ++++ lib/netdev-bsd.c +@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_) + } + } + ++#ifdef THREADED ++ ++struct dispatch_arg { ++ pkt_handler h; ++ u_char *user; ++}; ++ ++static void ++dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata) ++{ ++ struct ofpbuf buf; ++ struct dispatch_arg *parg = (struct dispatch_arg*)user; ++ ++ ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen); ++ buf.size = phdr->caplen; ++ (*parg->h)(parg->user, &buf); ++ ofpbuf_uninit(&buf); ++} ++ ++static int ++netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int ret; ++ struct dispatch_arg arg; ++ ++ arg.h = h; ++ arg.user = user; ++ ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg); ++ return ret; ++} ++ ++static int ++netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int ret; ++ int i; ++ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; ++ OFPBUF_STACK_BUFFER(buf_, size); ++ ++ struct ofpbuf buf; ++ ofpbuf_use_stub(&buf, buf_, size); ++ for (i = 0; i < batch; i++) { ++ ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf)); ++ if (ret >= 0) { ++ buf.size += ret; ++ h(user, &buf); ++ } else if (ret != -EAGAIN) { ++ return -1; ++ } else { /* ret = EAGAIN */ ++ break; ++ } ++ ofpbuf_clear(&buf); ++ } ++ ofpbuf_uninit(&buf); ++ return i; ++} ++ ++static int ++netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h, ++ u_char *user) ++{ ++ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); ++ struct netdev_dev_bsd * netdev_dev = ++ netdev_dev_bsd_cast(netdev_get_dev(netdev_)); ++ ++ if (!strcmp(netdev_get_type(netdev_), "tap") && ++ netdev->netdev_fd == netdev_dev->tap_fd) { ++ return netdev_bsd_dispatch_tap(netdev, batch, h, user); ++ } else { ++ return netdev_bsd_dispatch_system(netdev, batch, h, user); ++ } ++} ++ ++static int ++netdev_bsd_get_fd(struct netdev *netdev_) ++{ ++ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_); ++ return netdev->netdev_fd; ++} ++#endif ++ + /* Discards all packets waiting to be received from 'netdev'. */ + static int + netdev_bsd_drain(struct netdev *netdev_) +@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = { + + netdev_bsd_recv, + netdev_bsd_recv_wait, ++#ifdef THREADED ++ netdev_bsd_dispatch, ++ netdev_bsd_get_fd, ++#endif + netdev_bsd_drain, + + netdev_bsd_send, +@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = { + + netdev_bsd_recv, + netdev_bsd_recv_wait, ++#ifdef THREADED ++ netdev_bsd_dispatch, ++ netdev_bsd_get_fd, ++#endif + netdev_bsd_drain, + + netdev_bsd_send, +diff --git lib/netdev-dummy.c lib/netdev-dummy.c +index b8c23c5..4e4801c 100644 +--- lib/netdev-dummy.c ++++ lib/netdev-dummy.c +@@ -20,6 +20,12 @@ + + #include <errno.h> + ++#ifdef THREADED ++#include <pthread.h> ++#include <unistd.h> ++#include "socket-util.h" ++#endif ++ + #include "flow.h" + #include "list.h" + #include "netdev-provider.h" +@@ -51,6 +57,10 @@ struct netdev_dummy { + struct list node; /* In netdev_dev_dummy's "devs" list. */ + struct list recv_queue; + bool listening; ++#ifdef THREADED ++ pthread_mutex_t queue_mutex; ++ int s_pipe[2]; /* used to signal packet arrivals */ ++#endif + }; + + static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs); +@@ -124,11 +134,30 @@ netdev_dummy_open(struct netdev_dev *netdev_dev_, struct netdev **netdevp) + { + struct netdev_dev_dummy *netdev_dev = netdev_dev_dummy_cast(netdev_dev_); + struct netdev_dummy *netdev; ++#ifdef THREADED ++ int error; ++#endif + + netdev = xmalloc(sizeof *netdev); + netdev_init(&netdev->netdev, netdev_dev_); + list_init(&netdev->recv_queue); + netdev->listening = false; ++#ifdef THREADED ++ error = pipe(netdev->s_pipe); ++ if (error) { ++ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno)); ++ free(netdev); ++ return errno; ++ } ++ if (set_nonblocking(netdev->s_pipe[0]) || ++ set_nonblocking(netdev->s_pipe[1])) { ++ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s", ++ strerror(errno)); ++ free(netdev); ++ return errno; ++ } ++ pthread_mutex_init(&netdev->queue_mutex, NULL); ++#endif + + *netdevp = &netdev->netdev; + list_push_back(&netdev_dev->devs, &netdev->node); +@@ -141,6 +170,13 @@ netdev_dummy_close(struct netdev *netdev_) + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + list_remove(&netdev->node); + ofpbuf_list_delete(&netdev->recv_queue); ++#ifdef THREADED ++ if (netdev->listening) { ++ close(netdev->s_pipe[0]); ++ close(netdev->s_pipe[1]); ++ } ++ pthread_mutex_destroy(&netdev->queue_mutex); ++#endif + free(netdev); + } + +@@ -158,12 +194,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size) + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); + struct ofpbuf *packet; + size_t packet_size; ++#ifdef THREADED ++ char c; ++#endif + ++#ifdef THREADED ++ pthread_mutex_lock(&netdev->queue_mutex); ++#endif + if (list_is_empty(&netdev->recv_queue)) { ++#ifdef THREADED ++ pthread_mutex_unlock(&netdev->queue_mutex); ++#endif + return -EAGAIN; + } ++#ifdef THREADED ++ if (read(netdev->s_pipe[0], &c, 1) < 0) { ++ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno)); ++ } ++#endif + + packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); ++#ifdef THREADED ++ pthread_mutex_unlock(&netdev->queue_mutex); ++#endif + if (packet->size > size) { + return -EMSGSIZE; + } +@@ -179,11 +232,60 @@ static void + netdev_dummy_recv_wait(struct netdev *netdev_) + { + struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); +- if (!list_is_empty(&netdev->recv_queue)) { ++ int empty; ++ ++#ifdef THREADED ++ pthread_mutex_lock(&netdev->queue_mutex); ++#endif ++ empty = list_is_empty(&netdev->recv_queue); ++#ifdef THREADED ++ pthread_mutex_unlock(&netdev->queue_mutex); ++#endif ++ if (!empty) { + poll_immediate_wake(); + } + } + ++#ifdef THREADED ++static int ++netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int i; ++ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); ++ struct ofpbuf *packet; ++ VLOG_DBG("dispatch %d", batch); ++ ++ for (i = 0; i < batch; i++) { ++ char c; ++ if (read(netdev->s_pipe[0], &c, 1) < 0) { ++ if (errno == EAGAIN) ++ break; ++ VLOG_ERR("%s: error reading from the pipe: %s", ++ netdev_get_name(netdev_), strerror(errno)); ++ return -1; ++ } ++ pthread_mutex_lock(&netdev->queue_mutex); ++ if (list_is_empty(&netdev->recv_queue)) { ++ pthread_mutex_unlock(&netdev->queue_mutex); ++ return -EAGAIN; ++ } ++ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue)); ++ pthread_mutex_unlock(&netdev->queue_mutex); ++ h(user, packet); ++ ofpbuf_delete(packet); ++ } ++ return i; ++} ++ ++static int ++netdev_dummy_get_fd(struct netdev *netdev_) ++{ ++ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); ++ return netdev->s_pipe[0]; ++} ++#endif ++ + static int + netdev_dummy_drain(struct netdev *netdev_) + { +@@ -316,6 +418,10 @@ static const struct netdev_class dummy_class = { + netdev_dummy_listen, + netdev_dummy_recv, + netdev_dummy_recv_wait, ++#ifdef THREADED ++ netdev_dummy_dispatch, /* dispatch */ ++ netdev_dummy_get_fd, /* get_fd */ ++#endif + netdev_dummy_drain, + + NULL, /* send */ +@@ -407,6 +513,9 @@ netdev_dummy_receive(struct unixctl_conn *conn, + struct netdev_dev_dummy *dummy_dev; + int n_listeners; + int i; ++#ifdef THREADED ++ char c = 0; ++#endif + + dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]); + if (!dummy_dev) { +@@ -414,6 +523,7 @@ netdev_dummy_receive(struct unixctl_conn *conn, + return; + } + ++ + n_listeners = 0; + for (i = 2; i < argc; i++) { + struct netdev_dummy *dev; +@@ -429,7 +539,16 @@ netdev_dummy_receive(struct unixctl_conn *conn, + LIST_FOR_EACH (dev, node, &dummy_dev->devs) { + if (dev->listening) { + struct ofpbuf *copy = ofpbuf_clone(packet); ++#ifdef THREADED ++ pthread_mutex_lock(&dev->queue_mutex); ++#endif + list_push_back(&dev->recv_queue, ©->list_node); ++#ifdef THREADED ++ pthread_mutex_unlock(&dev->queue_mutex); ++ if (write(dev->s_pipe[1], &c, 1) < 0) { ++ VLOG_ERR("Error writing dummy pipe: %s", strerror(errno)); ++ } ++#endif + n_listeners++; + } + } +diff --git lib/netdev-linux.c lib/netdev-linux.c +index 4d2f3ac..c33a801 100644 +--- lib/netdev-linux.c ++++ lib/netdev-linux.c +@@ -891,6 +891,43 @@ netdev_linux_recv_wait(struct netdev *netdev_) + } + } + ++#ifdef THREADED ++static int ++netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h, ++ u_char *user) ++{ ++ int ret; ++ int i; ++ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX; ++ OFPBUF_STACK_BUFFER(buf_, size); ++ struct ofpbuf buf; ++ VLOG_DBG("dispatch %d", batch); ++ ++ ofpbuf_use_stub(&buf, buf_, size); ++ for (i = 0; i < batch; i++) { ++ ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf)); ++ if (ret >= 0) { ++ buf.size += ret; ++ h(user, &buf); ++ } else if (ret != -EAGAIN) { ++ return -1; ++ } else { ++ break; ++ } ++ ofpbuf_clear(&buf); ++ } ++ ofpbuf_uninit(&buf); ++ return i; ++} ++ ++static int ++netdev_linux_get_fd(struct netdev *netdev_) ++{ ++ struct netdev_linux *netdev = netdev_linux_cast(netdev_); ++ return netdev->fd; ++} ++#endif ++ + /* Discards all packets waiting to be received from 'netdev'. */ + static int + netdev_linux_drain(struct netdev *netdev_) +@@ -2376,6 +2413,12 @@ netdev_linux_change_seq(const struct netdev *netdev) + return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq; + } + ++#ifdef THREADED ++#define THREADED_NULL NULL, NULL, ++#else ++#define THREADED_NULL ++#endif ++ + #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \ + GET_FEATURES, GET_STATUS) \ + { \ +@@ -2396,6 +2439,7 @@ netdev_linux_change_seq(const struct netdev *netdev) + netdev_linux_listen, \ + netdev_linux_recv, \ + netdev_linux_recv_wait, \ ++ THREADED_NULL \ + netdev_linux_drain, \ + \ + netdev_linux_send, \ +diff --git lib/netdev-provider.h lib/netdev-provider.h +index 2a91f05..ee4e757 100644 +--- lib/netdev-provider.h ++++ lib/netdev-provider.h +@@ -24,6 +24,9 @@ + #include "netdev.h" + #include "list.h" + #include "shash.h" ++#ifdef THREADED ++#include "dispatch.h" ++#endif + + #ifdef __cplusplus + extern "C" { +@@ -190,6 +193,22 @@ struct netdev_class { + * implement packet reception through the 'recv' member function. */ + void (*recv_wait)(struct netdev *netdev); + ++#ifdef THREADED ++ /* Attempts to receive 'batch' packets from 'netdev' and process them ++ * through the 'handler' callback. This function is used in the 'THREADED' ++ * version in order to optimize the forwarding process, since it permits to ++ * process packets directly in the netdev memory. ++ * ++ * Returns the number of packets processed on success; this can be 0 if no ++ * packets are available to be read. Returns -1 if an error occurred. ++ */ ++ int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler, ++ u_char *user); ++ ++ /* Return the file descriptor of the device */ ++ int (*get_fd)(struct netdev *netdev); ++#endif ++ + /* Discards all packets waiting to be received from 'netdev'. + * + * May be null if not needed, such as for a network device that does not +diff --git lib/netdev-vport.c lib/netdev-vport.c +index 1721f6b..39b26de 100644 +--- lib/netdev-vport.c ++++ lib/netdev-vport.c +@@ -889,6 +889,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, + return 0; + } + ++ ++#ifdef THREADED ++# define THREADED_NULL NULL, NULL, ++#else ++# define THREADED_NULL ++#endif ++ + #define VPORT_FUNCTIONS(GET_STATUS) \ + NULL, \ + netdev_vport_run, \ +@@ -905,6 +912,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED, + NULL, /* listen */ \ + NULL, /* recv */ \ + NULL, /* recv_wait */ \ ++ THREADED_NULL \ + NULL, /* drain */ \ + \ + netdev_vport_send, /* send */ \ +diff --git lib/netdev.c lib/netdev.c +index be7cdd2..0c54e1e 100644 +--- lib/netdev.c ++++ lib/netdev.c +@@ -423,6 +423,28 @@ netdev_recv_wait(struct netdev *netdev) + } + } + ++#ifdef THREADED ++/* Attempts to receive and process 'batch' packets from 'netdev'. */ ++int ++netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user) ++{ ++ int (*dispatch)(struct netdev*, int, pkt_handler, u_char *); ++ ++ dispatch = netdev_get_dev(netdev)->netdev_class->dispatch; ++ return dispatch ? dispatch(netdev, batch, h, user) : 0; ++} ++ ++/* Returns the file descriptor */ ++int ++netdev_get_fd(struct netdev *netdev) ++{ ++ int (*get_fd)(struct netdev *); ++ ++ get_fd = netdev_get_dev(netdev)->netdev_class->get_fd; ++ return get_fd ? get_fd(netdev) : 0; ++} ++#endif ++ + /* Discards all packets waiting to be received from 'netdev'. */ + int + netdev_drain(struct netdev *netdev) +diff --git lib/netdev.h lib/netdev.h +index 4b86c21..4fad796 100644 +--- lib/netdev.h ++++ lib/netdev.h +@@ -21,6 +21,9 @@ + #include <stddef.h> + #include <stdint.h> + #include "openvswitch/types.h" ++#ifdef THREADED ++#include "dispatch.h" ++#endif + + #ifdef __cplusplus + extern "C" { +@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *); + int netdev_listen(struct netdev *); + int netdev_recv(struct netdev *, struct ofpbuf *); + void netdev_recv_wait(struct netdev *); ++#ifdef THREADED ++int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *); ++int netdev_get_fd(struct netdev *); ++#endif + int netdev_drain(struct netdev *); + + int netdev_send(struct netdev *, const struct ofpbuf *); +diff --git lib/route-table-bsd.c lib/route-table-bsd.c +index c145091..1c29071 100644 +--- lib/route-table-bsd.c ++++ lib/route-table-bsd.c +@@ -29,6 +29,8 @@ + #include <string.h> + #include <unistd.h> + ++#include "vlog.h" ++ + VLOG_DEFINE_THIS_MODULE(route_table); + + static int pid; +diff --git lib/vlog.c lib/vlog.c +index 899072e..b6bd4ef 100644 +--- lib/vlog.c ++++ lib/vlog.c +@@ -34,6 +34,9 @@ + #include "timeval.h" + #include "unixctl.h" + #include "util.h" ++#ifdef THREADED ++#include <pthread.h> ++#endif + + VLOG_DEFINE_THIS_MODULE(vlog); + +@@ -89,6 +92,10 @@ static FILE *log_file; + /* vlog initialized? */ + static bool vlog_inited; + ++#ifdef THREADED ++static pthread_mutex_t vlog_mutex; ++#endif ++ + static void format_log_message(const struct vlog_module *, enum vlog_level, + enum vlog_facility, unsigned int msg_num, + const char *message, va_list, struct ds *) +@@ -484,6 +491,9 @@ vlog_init(void) + return; + } + vlog_inited = true; ++#ifdef THREADED ++ pthread_mutex_init(&vlog_mutex, NULL); ++#endif + + /* openlog() is allowed to keep the pointer passed in, without making a + * copy. The daemonize code sometimes frees and replaces 'program_name', +@@ -691,6 +701,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, + + ds_init(&s); + ds_reserve(&s, 1024); ++#ifdef THREADED ++ pthread_mutex_lock(&vlog_mutex); ++#endif + msg_num++; + + if (log_to_console) { +@@ -721,6 +734,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level, + fflush(log_file); + } + ++#ifdef THREADED ++ pthread_mutex_unlock(&vlog_mutex); ++#endif + ds_destroy(&s); + errno = save_errno; + } +diff --git m4/openvswitch.m4 m4/openvswitch.m4 +index dca9f5f..084ceff 100644 +--- m4/openvswitch.m4 ++++ m4/openvswitch.m4 +@@ -14,6 +14,25 @@ + # See the License for the specific language governing permissions and + # limitations under the License. + ++dnl Check for --enable-threaded and updates CFLAGS. ++AC_DEFUN([OVS_CHECK_THREADED], ++ [AC_REQUIRE([AC_PROG_CC]) ++ AC_ARG_ENABLE( ++ [threaded], ++ [AC_HELP_STRING([--enable-threaded], ++ [Enable threaded version of userspace implementation])], ++ [case "${enableval}" in ++ (yes) threaded=true ;; ++ (no) threaded=false ;; ++ (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;; ++ esac], ++ [threaded=false]) ++ if $threaded; then ++ AC_DEFINE([THREADED], [1], ++ [Define to 1 if the threaded version of userspace ++ implementation is enabled.]) ++ fi]) ++ + dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately. + AC_DEFUN([OVS_CHECK_COVERAGE], + [AC_REQUIRE([AC_PROG_CC]) |