diff options
Diffstat (limited to 'e-util/e-msgport.c')
-rw-r--r-- | e-util/e-msgport.c | 97 |
1 files changed, 89 insertions, 8 deletions
diff --git a/e-util/e-msgport.c b/e-util/e-msgport.c index a63e371558..58d75b16f8 100644 --- a/e-util/e-msgport.c +++ b/e-util/e-msgport.c @@ -14,7 +14,7 @@ #include <pthread.h> #define m(x) /* msgport debug */ -#define t(x) /* thread debug */ +#define t(x) x /* thread debug */ void e_dlist_init(EDList *v) { @@ -227,6 +227,11 @@ void e_msgport_reply(EMsg *msg) /* else lost? */ } +struct _thread_info { + pthread_t id; + int busy; +}; + struct _EThread { EMsgPort *server_port; EMsgPort *reply_port; @@ -234,9 +239,9 @@ struct _EThread { e_thread_t type; int queue_limit; - int waiting; /* if we are waiting for a new message */ + int waiting; /* if we are waiting for a new message, count of waiting processes */ pthread_t id; /* id of our running child thread */ - GList *id_list; /* if THREAD_NEW, then a list of our child threads */ + GList *id_list; /* if THREAD_NEW, then a list of our child threads in thread_info structs */ EThreadFunc destroy; void *destroy_data; @@ -252,6 +257,39 @@ struct _EThread { static void thread_destroy_msg(EThread *e, EMsg *m); +static struct _thread_info *thread_find(EThread *e, pthread_t id) +{ + GList *node; + struct _thread_info *info; + + node = e->id_list; + while (node) { + info = node->data; + if (info->id == id) + return info; + node = node->next; + } + return NULL; +} + +#if 0 +static void thread_remove(EThread *e, pthread_t id) +{ + GList *node; + struct _thread_info *info; + + node = e->id_list; + while (node) { + info = node->data; + if (info->id == id) { + e->id_list = g_list_remove(e->id_list, info); + g_free(info); + } + node = node->next; + } +} +#endif + EThread *e_thread_new(e_thread_t type) { EThread *e; @@ -272,6 +310,7 @@ void e_thread_destroy(EThread *e) int tries = 0; int busy = FALSE; EMsg *msg; + struct _thread_info *info; /* make sure we soak up all the messages first */ while ( (msg = e_msgport_get(e->server_port)) ) { @@ -285,7 +324,7 @@ void e_thread_destroy(EThread *e) case E_THREAD_DROP: /* if we have a thread, 'kill' it */ while (e->id != E_THREAD_NONE && tries < 5) { - if (e->waiting == 1) { + if (e->waiting > 0) { pthread_t id = e->id; e->id = E_THREAD_NONE; pthread_mutex_unlock(&e->mutex); @@ -303,12 +342,33 @@ void e_thread_destroy(EThread *e) busy = e->id != E_THREAD_NONE; break; case E_THREAD_NEW: + while (e->id_list && tries < 5) { + info = e->id_list->data; + if (!info->busy) { + e->id_list = g_list_remove(e->id_list, info); + printf("cleaning up pool thread %d\n", info->id); + pthread_mutex_unlock(&e->mutex); + if (pthread_cancel(info->id) == 0) + /*pthread_join(info->id, 0);*/ + pthread_mutex_lock(&e->mutex); + printf("cleaned up ok\n"); + g_free(info); + } else { + (printf("thread(s) still active, waiting for it to finish\n")); + tries++; + pthread_mutex_unlock(&e->mutex); + sleep(1); + pthread_mutex_lock(&e->mutex); + } + } +#if 0 while (g_list_length(e->id_list) && tries < 5) { (printf("thread(s) still active, waiting for them to finish\n")); pthread_mutex_unlock(&e->mutex); sleep(1); pthread_mutex_lock(&e->mutex); } +#endif busy = g_list_length(e->id_list) != 0; break; } @@ -421,6 +481,7 @@ thread_dispatch(void *din) { EThread *e = din; EMsg *m; + struct _thread_info *info; t(printf("dispatch thread started: %ld\n", pthread_self())); @@ -430,21 +491,34 @@ thread_dispatch(void *din) if (m == NULL) { /* nothing to do? If we are a 'new' type thread, just quit. Otherwise, go into waiting (can be cancelled here) */ + info = NULL; switch (e->type) { + case E_THREAD_NEW: case E_THREAD_QUEUE: case E_THREAD_DROP: - e->waiting = 1; + info = thread_find(e, pthread_self()); + if (info) + info->busy = FALSE; + e->waiting++; pthread_mutex_unlock(&e->mutex); e_msgport_wait(e->server_port); - e->waiting = 0; + pthread_mutex_lock(&e->mutex); + e->waiting--; + pthread_mutex_unlock(&e->mutex); break; +#if 0 case E_THREAD_NEW: e->id_list = g_list_remove(e->id_list, (void *)pthread_self()); pthread_mutex_unlock(&e->mutex); return 0; +#endif } continue; + } else { + info = thread_find(e, pthread_self()); + if (info) + info->busy = TRUE; } pthread_mutex_unlock(&e->mutex); @@ -465,6 +539,7 @@ thread_dispatch(void *din) but to do this we need to use the fd interface of the msgport, and its utility is probably debatable anyway */ +#if 0 /* signify we are no longer running */ /* This code isn't used yet, but would be if we ever had a 'quit now' message implemented */ pthread_mutex_lock(&e->mutex); @@ -478,6 +553,7 @@ thread_dispatch(void *din) break; } pthread_mutex_unlock(&e->mutex); +#endif return 0; } @@ -525,9 +601,14 @@ void e_thread_put(EThread *e, EMsg *msg) we might create a thread with no work to do. but that doesn't matter, the other alternative that it be lost is worse */ e_msgport_put(e->server_port, msg); - if (g_list_length(e->id_list) < e->queue_limit + if (e->waiting == 0 + && g_list_length(e->id_list) < e->queue_limit && pthread_create(&id, NULL, thread_dispatch, e) == 0) { - e->id_list = g_list_append(e->id_list, (void *)id); + struct _thread_info *info = g_malloc0(sizeof(*info)); + printf("created NEW thread %ld\n", id); + info->id = id; + info->busy = TRUE; + e->id_list = g_list_append(e->id_list, info); } pthread_mutex_unlock(&e->mutex); return; |