aboutsummaryrefslogtreecommitdiffstats
path: root/e-util/e-msgport.c
diff options
context:
space:
mode:
Diffstat (limited to 'e-util/e-msgport.c')
-rw-r--r--e-util/e-msgport.c97
1 files changed, 89 insertions, 8 deletions
diff --git a/e-util/e-msgport.c b/e-util/e-msgport.c
index a63e371558..58d75b16f8 100644
--- a/e-util/e-msgport.c
+++ b/e-util/e-msgport.c
@@ -14,7 +14,7 @@
#include <pthread.h>
#define m(x) /* msgport debug */
-#define t(x) /* thread debug */
+#define t(x) x /* thread debug */
void e_dlist_init(EDList *v)
{
@@ -227,6 +227,11 @@ void e_msgport_reply(EMsg *msg)
/* else lost? */
}
+struct _thread_info {
+ pthread_t id;
+ int busy;
+};
+
struct _EThread {
EMsgPort *server_port;
EMsgPort *reply_port;
@@ -234,9 +239,9 @@ struct _EThread {
e_thread_t type;
int queue_limit;
- int waiting; /* if we are waiting for a new message */
+ int waiting; /* if we are waiting for a new message, count of waiting processes */
pthread_t id; /* id of our running child thread */
- GList *id_list; /* if THREAD_NEW, then a list of our child threads */
+ GList *id_list; /* if THREAD_NEW, then a list of our child threads in thread_info structs */
EThreadFunc destroy;
void *destroy_data;
@@ -252,6 +257,39 @@ struct _EThread {
static void thread_destroy_msg(EThread *e, EMsg *m);
+static struct _thread_info *thread_find(EThread *e, pthread_t id)
+{
+ GList *node;
+ struct _thread_info *info;
+
+ node = e->id_list;
+ while (node) {
+ info = node->data;
+ if (info->id == id)
+ return info;
+ node = node->next;
+ }
+ return NULL;
+}
+
+#if 0
+static void thread_remove(EThread *e, pthread_t id)
+{
+ GList *node;
+ struct _thread_info *info;
+
+ node = e->id_list;
+ while (node) {
+ info = node->data;
+ if (info->id == id) {
+ e->id_list = g_list_remove(e->id_list, info);
+ g_free(info);
+ }
+ node = node->next;
+ }
+}
+#endif
+
EThread *e_thread_new(e_thread_t type)
{
EThread *e;
@@ -272,6 +310,7 @@ void e_thread_destroy(EThread *e)
int tries = 0;
int busy = FALSE;
EMsg *msg;
+ struct _thread_info *info;
/* make sure we soak up all the messages first */
while ( (msg = e_msgport_get(e->server_port)) ) {
@@ -285,7 +324,7 @@ void e_thread_destroy(EThread *e)
case E_THREAD_DROP:
/* if we have a thread, 'kill' it */
while (e->id != E_THREAD_NONE && tries < 5) {
- if (e->waiting == 1) {
+ if (e->waiting > 0) {
pthread_t id = e->id;
e->id = E_THREAD_NONE;
pthread_mutex_unlock(&e->mutex);
@@ -303,12 +342,33 @@ void e_thread_destroy(EThread *e)
busy = e->id != E_THREAD_NONE;
break;
case E_THREAD_NEW:
+ while (e->id_list && tries < 5) {
+ info = e->id_list->data;
+ if (!info->busy) {
+ e->id_list = g_list_remove(e->id_list, info);
+ printf("cleaning up pool thread %d\n", info->id);
+ pthread_mutex_unlock(&e->mutex);
+ if (pthread_cancel(info->id) == 0)
+ /*pthread_join(info->id, 0);*/
+ pthread_mutex_lock(&e->mutex);
+ printf("cleaned up ok\n");
+ g_free(info);
+ } else {
+ (printf("thread(s) still active, waiting for it to finish\n"));
+ tries++;
+ pthread_mutex_unlock(&e->mutex);
+ sleep(1);
+ pthread_mutex_lock(&e->mutex);
+ }
+ }
+#if 0
while (g_list_length(e->id_list) && tries < 5) {
(printf("thread(s) still active, waiting for them to finish\n"));
pthread_mutex_unlock(&e->mutex);
sleep(1);
pthread_mutex_lock(&e->mutex);
}
+#endif
busy = g_list_length(e->id_list) != 0;
break;
}
@@ -421,6 +481,7 @@ thread_dispatch(void *din)
{
EThread *e = din;
EMsg *m;
+ struct _thread_info *info;
t(printf("dispatch thread started: %ld\n", pthread_self()));
@@ -430,21 +491,34 @@ thread_dispatch(void *din)
if (m == NULL) {
/* nothing to do? If we are a 'new' type thread, just quit.
Otherwise, go into waiting (can be cancelled here) */
+ info = NULL;
switch (e->type) {
+ case E_THREAD_NEW:
case E_THREAD_QUEUE:
case E_THREAD_DROP:
- e->waiting = 1;
+ info = thread_find(e, pthread_self());
+ if (info)
+ info->busy = FALSE;
+ e->waiting++;
pthread_mutex_unlock(&e->mutex);
e_msgport_wait(e->server_port);
- e->waiting = 0;
+ pthread_mutex_lock(&e->mutex);
+ e->waiting--;
+ pthread_mutex_unlock(&e->mutex);
break;
+#if 0
case E_THREAD_NEW:
e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
pthread_mutex_unlock(&e->mutex);
return 0;
+#endif
}
continue;
+ } else {
+ info = thread_find(e, pthread_self());
+ if (info)
+ info->busy = TRUE;
}
pthread_mutex_unlock(&e->mutex);
@@ -465,6 +539,7 @@ thread_dispatch(void *din)
but to do this we need to use the fd interface of the msgport, and its utility
is probably debatable anyway */
+#if 0
/* signify we are no longer running */
/* This code isn't used yet, but would be if we ever had a 'quit now' message implemented */
pthread_mutex_lock(&e->mutex);
@@ -478,6 +553,7 @@ thread_dispatch(void *din)
break;
}
pthread_mutex_unlock(&e->mutex);
+#endif
return 0;
}
@@ -525,9 +601,14 @@ void e_thread_put(EThread *e, EMsg *msg)
we might create a thread with no work to do.
but that doesn't matter, the other alternative that it be lost is worse */
e_msgport_put(e->server_port, msg);
- if (g_list_length(e->id_list) < e->queue_limit
+ if (e->waiting == 0
+ && g_list_length(e->id_list) < e->queue_limit
&& pthread_create(&id, NULL, thread_dispatch, e) == 0) {
- e->id_list = g_list_append(e->id_list, (void *)id);
+ struct _thread_info *info = g_malloc0(sizeof(*info));
+ printf("created NEW thread %ld\n", id);
+ info->id = id;
+ info->busy = TRUE;
+ e->id_list = g_list_append(e->id_list, info);
}
pthread_mutex_unlock(&e->mutex);
return;