diff options
Diffstat (limited to 'camel/camel-session.c')
-rw-r--r-- | camel/camel-session.c | 186 |
1 files changed, 184 insertions, 2 deletions
diff --git a/camel/camel-session.c b/camel/camel-session.c index a6e70bcd98..06b7bb8799 100644 --- a/camel/camel-session.c +++ b/camel/camel-session.c @@ -61,6 +61,12 @@ static char *get_storage_path (CamelSession *session, CamelService *service, CamelException *ex); +#ifdef ENABLE_THREADS +static void *session_thread_msg_new(CamelSession *session, CamelSessionThreadOps *ops, unsigned int size); +static void session_thread_msg_free(CamelSession *session, CamelSessionThreadMsg *msg); +static int session_thread_queue(CamelSession *session, CamelSessionThreadMsg *msg, int flags); +static void session_thread_wait(CamelSession *session, int id); +#endif /* The vfolder provider is always avilable */ static CamelProvider vee_provider = { @@ -87,6 +93,10 @@ camel_session_init (CamelSession *session) session->priv = g_malloc0(sizeof(*session->priv)); #ifdef ENABLE_THREADS session->priv->lock = g_mutex_new(); + session->priv->thread_lock = g_mutex_new(); + session->priv->thread_id = 1; + session->priv->thread_active = g_hash_table_new(NULL, NULL); + session->priv->thread_queue = NULL; #endif } @@ -105,6 +115,12 @@ camel_session_finalise (CamelObject *o) { CamelSession *session = (CamelSession *)o; +#ifdef ENABLE_THREADS + g_hash_table_destroy(session->priv->thread_active); + if (session->priv->thread_queue) + e_thread_destroy(session->priv->thread_queue); +#endif + g_free(session->storage_path); g_hash_table_foreach_remove (session->providers, camel_session_destroy_provider, NULL); @@ -112,8 +128,8 @@ camel_session_finalise (CamelObject *o) #ifdef ENABLE_THREADS g_mutex_free(session->priv->lock); -#endif - + g_mutex_free(session->priv->thread_lock); +#endif g_free(session->priv); } @@ -127,6 +143,13 @@ camel_session_class_init (CamelSessionClass *camel_session_class) camel_session_class->get_service = get_service; camel_session_class->get_storage_path = get_storage_path; +#ifdef ENABLE_THREADS + camel_session_class->thread_msg_new = session_thread_msg_new; + camel_session_class->thread_msg_free = session_thread_msg_free; + camel_session_class->thread_queue = session_thread_queue; + camel_session_class->thread_wait = session_thread_wait; +#endif + if (vee_provider.service_cache == NULL) { vee_provider.object_types[CAMEL_PROVIDER_STORE] = camel_vee_store_get_type (); vee_provider.service_cache = g_hash_table_new (camel_url_hash, camel_url_equal); @@ -686,3 +709,162 @@ camel_session_get_filter_driver (CamelSession *session, { return CS_CLASS (session)->get_filter_driver (session, type, ex); } + +#ifdef ENABLE_THREADS + +static void *session_thread_msg_new(CamelSession *session, CamelSessionThreadOps *ops, unsigned int size) +{ + CamelSessionThreadMsg *m; + + g_assert(size >= sizeof(*m)); + + m = g_malloc0(size); + m->ops = ops; + + CAMEL_SESSION_LOCK(session, thread_lock); + m->id = session->priv->thread_id++; + g_hash_table_insert(session->priv->thread_active, (void *)m->id, m); + CAMEL_SESSION_UNLOCK(session, thread_lock); + + return m; +} + +static void session_thread_msg_free(CamelSession *session, CamelSessionThreadMsg *msg) +{ + g_assert(msg->ops != NULL); + + printf("free message %p session %p\n", msg, session); + + CAMEL_SESSION_LOCK(session, thread_lock); + g_hash_table_remove(session->priv->thread_active, (void *)msg->id); + CAMEL_SESSION_UNLOCK(session, thread_lock); + + printf("free msg, ops->free = %p\n", msg->ops->free); + + if (msg->ops->free) + msg->ops->free(session, msg); + g_free(msg); +} + +static void session_thread_destroy(EThread *thread, CamelSessionThreadMsg *msg, CamelSession *session) +{ + printf("destroy message %p session %p\n", msg, session); + session_thread_msg_free(session, msg); +} + +static void session_thread_received(EThread *thread, CamelSessionThreadMsg *msg, CamelSession *session) +{ + printf("receive message %p session %p\n", msg, session); + if (msg->ops->receive) + msg->ops->receive(session, msg); +} + +static int session_thread_queue(CamelSession *session, CamelSessionThreadMsg *msg, int flags) +{ + int id; + + CAMEL_SESSION_LOCK(session, thread_lock); + if (session->priv->thread_queue == NULL) { + session->priv->thread_queue = e_thread_new(E_THREAD_QUEUE); + e_thread_set_msg_destroy(session->priv->thread_queue, (EThreadFunc)session_thread_destroy, session); + e_thread_set_msg_received(session->priv->thread_queue, (EThreadFunc)session_thread_received, session); + } + CAMEL_SESSION_UNLOCK(session, thread_lock); + + id = msg->id; + e_thread_put(session->priv->thread_queue, &msg->msg); + + return id; +} + +static void session_thread_wait(CamelSession *session, int id) +{ + int wait; + + /* we just busy wait, only other alternative is to setup a reply port? */ + do { + CAMEL_SESSION_LOCK(session, thread_lock); + wait = g_hash_table_lookup(session->priv->thread_active, (void *)id) != NULL; + CAMEL_SESSION_UNLOCK(session, thread_lock); + if (wait) { + usleep(20000); + } + } while (wait); +} + +/** + * camel_session_thread_msg_new: + * @session: + * @ops: + * @size: + * + * Create a new thread message, using ops as the receive/reply/free + * ops, of @size bytes. + * + * @ops points to the operations used to recieve/process and finally + * free the message. + **/ +void *camel_session_thread_msg_new(CamelSession *session, CamelSessionThreadOps *ops, unsigned int size) +{ + g_assert(CAMEL_IS_SESSION(session)); + g_assert(ops != NULL); + g_assert(size >= sizeof(CamelSessionThreadMsg)); + + return CS_CLASS (session)->thread_msg_new(session, ops, size); +} + +/** + * camel_session_thread_msg_free: + * @session: + * @msg: + * + * Free a @msg. Note that the message must have been allocated using + * msg_new, and must nto have been submitted to any queue function. + **/ +void camel_session_thread_msg_free(CamelSession *session, CamelSessionThreadMsg *msg) +{ + g_assert(CAMEL_IS_SESSION(session)); + g_assert(msg != NULL); + g_assert(msg->ops != NULL); + + return CS_CLASS (session)->thread_msg_free(session, msg); +} + +/** + * camel_session_thread_queue: + * @session: + * @msg: + * @flags: queue type flags, currently 0. + * + * Queue a thread message in another thread for processing. + * The operation should be (but needn't) run in a queued manner + * with other operations queued in this manner. + * + * Return value: The id of the operation queued. + **/ +int camel_session_thread_queue(CamelSession *session, CamelSessionThreadMsg *msg, int flags) +{ + g_assert(CAMEL_IS_SESSION(session)); + g_assert(msg != NULL); + + return CS_CLASS (session)->thread_queue(session, msg, flags); +} + +/** + * camel_session_thread_wait: + * @session: + * @id: + * + * Wait on an operation to complete (by id). + **/ +void camel_session_thread_wait(CamelSession *session, int id) +{ + g_assert(CAMEL_IS_SESSION(session)); + + if (id == -1) + return; + + return CS_CLASS (session)->thread_wait(session, id); +} + +#endif |