aboutsummaryrefslogblamecommitdiffstats
path: root/e-util/e-msgport.c
blob: 2bb4311b18473920dd33dee3b0dd139225f18c9c (plain) (tree)
1
2
3
4
5
6
7
8
9
10
 
                      



                      
                  
                    
                 
                                                   
                                                  
 






























































































                                                                  










































                                                                  
               





                                                            


                                 
                                                                          
                                 
         
 

                                 






                                 








                                                                  
                                                                                                                   
                                                        
                                               











































                                                                            



                     





                              
                                                                                                     
                                                                    
                                                                                                            









                             
                                     
                                                        
 
                                                    































                                                                     






                                         
                              






                                            
                         
                                  
                 










                                                         














                                                                            
                 
                                              
                          







                                                                                  
                 



                                                                     
                                                        

                                                                     
                                                      
                                     















































































































                                                                                     
                                  
                                        
 
                                                                    





                                                                                    
                                    
                                          
                                          
                                            
                                                            

                                                           
                                                                

                                                                
                                      
     


                                                                                               
      

                                 







                                                                                                                    
                        
                                                    
                                                  

                                                
                                                              










                                                                               
                    











































                                                                                             
                                                                 
                                                                            
                                                                             
                                                                  

                                                                     




                                                                   
                                     
                                                                                                              
                                              































                                                                    
                                 

















































                                                                                
                                                        






























                                                                                 
                                                 








                                                              






                                                  










































































































                                                                                         

#include "e-msgport.h"

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>

#include <pthread.h>

#include <glib.h>

#define m(x)            /* msgport debug */
#define t(x)            /* thread debug */

void e_dlist_init(EDList *v)
{
        v->head = (EDListNode *)&v->tail;
        v->tail = 0;
        v->tailpred = (EDListNode *)&v->head;
}

EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
{
        n->next = l->head;
        n->prev = (EDListNode *)&l->head;
        l->head->prev = n;
        l->head = n;
        return n;
}

EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
{
        n->next = (EDListNode *)&l->tail;
        n->prev = l->tailpred;
        l->tailpred->next = n;
        l->tailpred = n;
        return n;
}

EDListNode *e_dlist_remove(EDListNode *n)
{
        n->next->prev = n->prev;
        n->prev->next = n->next;
        return n;
}

EDListNode *e_dlist_remhead(EDList *l)
{
    EDListNode *n, *nn;

    n = l->head;
    nn = n->next;
    if (nn) {
        nn->prev = n->prev;
        l->head = nn;
        return n;
    }
    return NULL;
}

EDListNode *e_dlist_remtail(EDList *l)
{
    EDListNode *n, *np;

    n = l->tailpred;
    np = n->prev;
    if (np) {
        np->next = n->next;
        l->tailpred = np;
        return n;
    }
    return NULL;
}

int e_dlist_empty(EDList *l)
{
    return (l->head == (EDListNode *)&l->tail);
}

int e_dlist_length(EDList *l)
{
    EDListNode *n, *nn;
    int count = 0;

    n = l->head;
    nn = n->next;
    while (nn) {
        count++;
        n = nn;
        nn = n->next;
    }

    return 0;
}

struct _EMsgPort {
    EDList queue;
    int condwait;       /* how many waiting in condwait */
    union {
        int pipe[2];
        struct {
            int read;
            int write;
        } fd;
    } pipe;
    /* @#@$#$ glib stuff */
    GCond *cond;
    GMutex *lock;
};

EMsgPort *e_msgport_new(void)
{
    EMsgPort *mp;

    mp = g_malloc(sizeof(*mp));
    e_dlist_init(&mp->queue);
    mp->lock = g_mutex_new();
    mp->cond = g_cond_new();
    mp->pipe.fd.read = -1;
    mp->pipe.fd.write = -1;
    mp->condwait = 0;

    return mp;
}

void e_msgport_destroy(EMsgPort *mp)
{
    g_mutex_free(mp->lock);
    g_cond_free(mp->cond);
    if (mp->pipe.fd.read != -1) {
        close(mp->pipe.fd.read);
        close(mp->pipe.fd.write);
    }
    g_free(mp);
}

/* get a fd that can be used to wait on the port asynchronously */
int e_msgport_fd(EMsgPort *mp)
{
    int fd;

    g_mutex_lock(mp->lock);
    fd = mp->pipe.fd.read;
    if (fd == -1) {
        pipe(mp->pipe.pipe);
        fd = mp->pipe.fd.read;
    }
    g_mutex_unlock(mp->lock);

    return fd;
}

void e_msgport_put(EMsgPort *mp, EMsg *msg)
{
    int fd;

    m(printf("put:\n"));
    g_mutex_lock(mp->lock);
    e_dlist_addtail(&mp->queue, &msg->ln);
    if (mp->condwait > 0) {
        m(printf("put: condwait > 0, waking up\n"));
        g_cond_signal(mp->cond);
    }
    fd = mp->pipe.fd.write;
    g_mutex_unlock(mp->lock);

    if (fd != -1) {
        m(printf("put: have pipe, writing notification to it\n"));
        write(fd, "", 1);
    }

    m(printf("put: done\n"));
}

static void
msgport_cleanlock(void *data)
{
    EMsgPort *mp = data;

    g_mutex_unlock(mp->lock);
}

EMsg *e_msgport_wait(EMsgPort *mp)
{
    EMsg *msg;

    m(printf("wait:\n"));
    g_mutex_lock(mp->lock);
    while (e_dlist_empty(&mp->queue)) {
        if (mp->pipe.fd.read == -1) {
            m(printf("wait: waiting on condition\n"));
            mp->condwait++;
            /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
            pthread_cleanup_push(msgport_cleanlock, mp);
            g_cond_wait(mp->cond, mp->lock);
            pthread_cleanup_pop(0);
            m(printf("wait: got condition\n"));
            mp->condwait--;
        } else {
            fd_set rfds;

            m(printf("wait: waitng on pipe\n"));
            FD_ZERO(&rfds);
            FD_SET(mp->pipe.fd.read, &rfds);
            g_mutex_unlock(mp->lock);
            select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL);
            pthread_testcancel();
            g_mutex_lock(mp->lock);
            m(printf("wait: got pipe\n"));
        }
    }
    msg = (EMsg *)mp->queue.head;
    m(printf("wait: message = %p\n", msg));
    g_mutex_unlock(mp->lock);
    m(printf("wait: done\n"));
    return msg;
}

EMsg *e_msgport_get(EMsgPort *mp)
{
    EMsg *msg;
    char dummy[1];

    g_mutex_lock(mp->lock);
    msg = (EMsg *)e_dlist_remhead(&mp->queue);
    if (msg && mp->pipe.fd.read != -1)
        read(mp->pipe.fd.read, dummy, 1);
    m(printf("get: message = %p\n", msg));
    g_mutex_unlock(mp->lock);

    return msg;
}

void e_msgport_reply(EMsg *msg)
{
    if (msg->reply_port) {
        e_msgport_put(msg->reply_port, msg);
    }
    /* else lost? */
}

struct _thread_info {
    pthread_t id;
    int busy;
};

struct _EThread {
    EMsgPort *server_port;
    EMsgPort *reply_port;
    pthread_mutex_t mutex;
    e_thread_t type;
    int queue_limit;

    int waiting;        /* if we are waiting for a new message, count of waiting processes */
    pthread_t id;       /* id of our running child thread */
    GList *id_list;     /* if THREAD_NEW, then a list of our child threads in thread_info structs */

    EThreadFunc destroy;
    void *destroy_data;

    EThreadFunc received;
    void *received_data;

    EThreadFunc lost;
    void *lost_data;
};

#define E_THREAD_NONE ((pthread_t)~0)
#define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)

static void thread_destroy_msg(EThread *e, EMsg *m);

static struct _thread_info *thread_find(EThread *e, pthread_t id)
{
    GList *node;
    struct _thread_info *info;

    node = e->id_list;
    while (node) {
        info = node->data;
        if (info->id == id)
            return info;
        node = node->next;
    }
    return NULL;
}

#if 0
static void thread_remove(EThread *e, pthread_t id)
{
    GList *node;
    struct _thread_info *info;

    node = e->id_list;
    while (node) {
        info = node->data;
        if (info->id == id) {
            e->id_list = g_list_remove(e->id_list, info);
            g_free(info);
        }
        node = node->next;
    }
}
#endif

EThread *e_thread_new(e_thread_t type)
{
    EThread *e;

    e = g_malloc0(sizeof(*e));
    pthread_mutex_init(&e->mutex, 0);
    e->type = type;
    e->server_port = e_msgport_new();
    e->id = E_THREAD_NONE;
    e->queue_limit = INT_MAX;

    return e;
}

/* close down the threads & resources etc */
void e_thread_destroy(EThread *e)
{
    int busy = FALSE;
    EMsg *msg;
    struct _thread_info *info;
    GList *l;

    /* make sure we soak up all the messages first */
    while ( (msg = e_msgport_get(e->server_port)) ) {
        thread_destroy_msg(e, msg);
    }

    pthread_mutex_lock(&e->mutex);

    switch(e->type) {
    case E_THREAD_QUEUE:
    case E_THREAD_DROP:
        /* if we have a thread, 'kill' it */
        if (e->id != E_THREAD_NONE) {
            pthread_t id = e->id;

            t(printf("Sending thread '%d' quit message\n", id));

            e->id = E_THREAD_NONE;

            msg = g_malloc0(sizeof(*msg));
            msg->reply_port = E_THREAD_QUIT_REPLYPORT;
            e_msgport_put(e->server_port, msg);

            pthread_mutex_unlock(&e->mutex);
            t(printf("Joining thread '%d'\n", id));
            pthread_join(id, 0);
            t(printf("Joined thread '%d'!\n", id));
            pthread_mutex_lock(&e->mutex);
        }
        busy = e->id != E_THREAD_NONE;
        break;
    case E_THREAD_NEW:
        /* first, send everyone a quit message */
        l = e->id_list;
        while (l) {
            info = l->data;
            t(printf("Sending thread '%d' quit message\n", info->id));
            msg = g_malloc0(sizeof(*msg));
            msg->reply_port = E_THREAD_QUIT_REPLYPORT;
            e_msgport_put(e->server_port, msg);
            l = l->next;            
        }

        /* then, wait for everyone to quit */
        while (e->id_list) {
            info = e->id_list->data;
            e->id_list = g_list_remove(e->id_list, info);
            pthread_mutex_unlock(&e->mutex);
            t(printf("Joining thread '%d'\n", info->id));
            pthread_join(info->id, 0);
            t(printf("Joined thread '%d'!\n", info->id));
            pthread_mutex_lock(&e->mutex);
            g_free(info);
        }
        busy = g_list_length(e->id_list) != 0;
        break;
    }

    pthread_mutex_unlock(&e->mutex);

    /* and clean up, if we can */
    if (busy) {
        g_warning("threads were busy, leaked EThread");
        return;
    }

    e_msgport_destroy(e->server_port);
    g_free(e);
}

/* set the queue maximum depth, what happens when the queue
   fills up depends on the queue type */
void e_thread_set_queue_limit(EThread *e, int limit)
{
    e->queue_limit = limit;
}

/* set a msg destroy callback, this can not call any e_thread functions on @e */
void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
{
    pthread_mutex_lock(&e->mutex);
    e->destroy = destroy;
    e->destroy_data = data;
    pthread_mutex_unlock(&e->mutex);
}

/* set a message lost callback, called if any message is discarded */
void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
{
    pthread_mutex_lock(&e->mutex);
    e->lost = lost;
    e->lost_data = lost;
    pthread_mutex_unlock(&e->mutex);
}

/* set a reply port, if set, then send messages back once finished */
void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
{
    e->reply_port = reply_port;
}

/* set a received data callback */
void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
{
    pthread_mutex_lock(&e->mutex);
    e->received = received;
    e->received_data = data;
    pthread_mutex_unlock(&e->mutex);
}

static void
thread_destroy_msg(EThread *e, EMsg *m)
{
    EThreadFunc func;
    void *func_data;

    /* we do this so we never get an incomplete/unmatched callback + data */
    pthread_mutex_lock(&e->mutex);
    func = e->destroy;
    func_data = e->destroy_data;
    pthread_mutex_unlock(&e->mutex);
    
    if (func)
        func(e, m, func_data);
}

static void
thread_received_msg(EThread *e, EMsg *m)
{
    EThreadFunc func;
    void *func_data;

    /* we do this so we never get an incomplete/unmatched callback + data */
    pthread_mutex_lock(&e->mutex);
    func = e->received;
    func_data = e->received_data;
    pthread_mutex_unlock(&e->mutex);
    
    if (func)
        func(e, m, func_data);
    else
        g_warning("No processing callback for EThread, message unprocessed");
}

static void
thread_lost_msg(EThread *e, EMsg *m)
{
    EThreadFunc func;
    void *func_data;

    /* we do this so we never get an incomplete/unmatched callback + data */
    pthread_mutex_lock(&e->mutex);
    func = e->lost;
    func_data = e->lost_data;
    pthread_mutex_unlock(&e->mutex);
    
    if (func)
        func(e, m, func_data);
}

/* the actual thread dispatcher */
static void *
thread_dispatch(void *din)
{
    EThread *e = din;
    EMsg *m;
    struct _thread_info *info;
    pthread_t self = pthread_self();

    t(printf("dispatch thread started: %ld\n", pthread_self()));

    while (1) {
        pthread_mutex_lock(&e->mutex);
        m = e_msgport_get(e->server_port);
        if (m == NULL) {
            /* nothing to do?  If we are a 'new' type thread, just quit.
               Otherwise, go into waiting (can be cancelled here) */
            info = NULL;
            switch (e->type) {
            case E_THREAD_NEW:
            case E_THREAD_QUEUE:
            case E_THREAD_DROP:
                info = thread_find(e, self);
                if (info)
                    info->busy = FALSE;
                e->waiting++;
                pthread_mutex_unlock(&e->mutex);
                e_msgport_wait(e->server_port);
                pthread_mutex_lock(&e->mutex);
                e->waiting--;
                pthread_mutex_unlock(&e->mutex);
                break;
#if 0
            case E_THREAD_NEW:
                e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
                pthread_mutex_unlock(&e->mutex);
                return 0;
#endif
            }

            continue;
        } else if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
            t(printf("Thread %d got quit message\n", self));
            /* Handle a quit message, say we're quitting, free the message, and break out of the loop */
            info = thread_find(e, self);
            if (info)
                info->busy = 2;
            pthread_mutex_unlock(&e->mutex);
            g_free(m);
            break;
        } else {
            info = thread_find(e, self);
            if (info)
                info->busy = TRUE;
        }
        pthread_mutex_unlock(&e->mutex);

        t(printf("got message in dispatch thread\n"));

        /* process it */
        thread_received_msg(e, m);

        /* if we have a reply port, send it back, otherwise, lose it */
        if (m->reply_port) {
            e_msgport_reply(m);
        } else {
            thread_destroy_msg(e, m);
        }
    }

    return NULL;
}

/* send a message to the thread, start thread if necessary */
void e_thread_put(EThread *e, EMsg *msg)
{
    pthread_t id;
    EMsg *dmsg = NULL;

    pthread_mutex_lock(&e->mutex);

    /* the caller forgot to tell us what to do, well, we can't do anything can we */
    if (e->received == NULL) {
        pthread_mutex_unlock(&e->mutex);
        g_warning("EThread called with no receiver function, no work to do!");
        thread_destroy_msg(e, msg);
        return;
    }

    msg->reply_port = e->reply_port;

    switch(e->type) {
    case E_THREAD_QUEUE:
        /* if the queue is full, lose this new addition */
        if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
            e_msgport_put(e->server_port, msg);
        } else {
            printf("queue limit reached, dropping new message\n");
            dmsg = msg;
        }
        break;
    case E_THREAD_DROP:
        /* if the queue is full, lose the oldest (unprocessed) message */
        if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
            e_msgport_put(e->server_port, msg);
        } else {
            printf("queue limit reached, dropping old message\n");
            e_msgport_put(e->server_port, msg);
            dmsg = e_msgport_get(e->server_port);
        }
        break;
    case E_THREAD_NEW:
        /* it is possible that an existing thread can catch this message, so
           we might create a thread with no work to do.
           but that doesn't matter, the other alternative that it be lost is worse */
        e_msgport_put(e->server_port, msg);
        if (e->waiting == 0
            && g_list_length(e->id_list) < e->queue_limit
            && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
            struct _thread_info *info = g_malloc0(sizeof(*info));
            t(printf("created NEW thread %ld\n", id));
            info->id = id;
            info->busy = TRUE;
            e->id_list = g_list_append(e->id_list, info);
        }
        pthread_mutex_unlock(&e->mutex);
        return;
    }

    /* create the thread, if there is none to receive it yet */
    if (e->id == E_THREAD_NONE) {
        if (pthread_create(&e->id, NULL, thread_dispatch, e) == -1) {
            g_warning("Could not create dispatcher thread, message queued?: %s", strerror(errno));
            e->id = E_THREAD_NONE;
        }
    }

    pthread_mutex_unlock(&e->mutex);

    if (dmsg) {
        thread_lost_msg(e, dmsg);
        thread_destroy_msg(e, dmsg);
    }
}

/* yet-another-mutex interface */
struct _EMutex {
    int type;
    pthread_t owner;
    short waiters;
    short depth;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
};

/* sigh, this is just painful to have to need, but recursive
   read/write, etc mutexes just aren't very common in thread
   implementations */
/* TODO: Just make it use recursive mutexes if they are available */
EMutex *e_mutex_new(e_mutex_t type)
{
    struct _EMutex *m;

    m = g_malloc(sizeof(*m));
    m->type = type;
    m->waiters = 0;
    m->depth = 0;
    m->owner = E_THREAD_NONE;

    switch (type) {
    case E_MUTEX_SIMPLE:
        pthread_mutex_init(&m->mutex, 0);
        break;
    case E_MUTEX_REC:
        pthread_mutex_init(&m->mutex, 0);
        pthread_cond_init(&m->cond, 0);
        break;
        /* read / write ?  flags for same? */
    }

    return m;
}

int e_mutex_destroy(EMutex *m)
{
    int ret = 0;

    switch (m->type) {
    case E_MUTEX_SIMPLE:
        ret = pthread_mutex_destroy(&m->mutex);
        if (ret == -1)
            g_warning("EMutex destroy failed: %s", strerror(errno));
        g_free(m);
        break;
    case E_MUTEX_REC:
        ret = pthread_mutex_destroy(&m->mutex);
        if (ret == -1)
            g_warning("EMutex destroy failed: %s", strerror(errno));
        ret = pthread_cond_destroy(&m->cond);
        if (ret == -1)
            g_warning("EMutex destroy failed: %s", strerror(errno));
        g_free(m);

    }
    return ret;
}

int e_mutex_lock(EMutex *m)
{
    pthread_t id;

    switch (m->type) {
    case E_MUTEX_SIMPLE:
        return pthread_mutex_lock(&m->mutex);
    case E_MUTEX_REC:
        id = pthread_self();
        if (pthread_mutex_lock(&m->mutex) == -1)
            return -1;
        while (1) {
            if (m->owner == E_THREAD_NONE) {
                m->owner = id;
                m->depth = 1;
                break;
            } else if (id == m->owner) {
                m->depth++;
                break;
            } else {
                m->waiters++;
                if (pthread_cond_wait(&m->cond, &m->mutex) == -1)
                    return -1;
                m->waiters--;
            }
        }
        return pthread_mutex_unlock(&m->mutex);
    }

    errno = EINVAL;
    return -1;
}

int e_mutex_unlock(EMutex *m)
{
    switch (m->type) {
    case E_MUTEX_SIMPLE:
        return pthread_mutex_unlock(&m->mutex);
    case E_MUTEX_REC:
        if (pthread_mutex_lock(&m->mutex) == -1)
            return -1;
        g_assert(m->owner == pthread_self());

        m->depth--;
        if (m->depth == 0) {
            m->owner = E_THREAD_NONE;
            if (m->waiters > 0)
                pthread_cond_signal(&m->cond);
        }
        return pthread_mutex_unlock(&m->mutex);
    }

    errno = EINVAL;
    return -1;
}

void e_mutex_assert_locked(EMutex *m)
{
    g_return_if_fail (m->type == E_MUTEX_REC);
    pthread_mutex_lock(&m->mutex);
    g_assert(m->owner == pthread_self());
    pthread_mutex_unlock(&m->mutex);
}

#ifdef STANDALONE
EMsgPort *server_port;


void *fdserver(void *data)
{
    int fd;
    EMsg *msg;
    int id = (int)data;
    fd_set rfds;

    fd = e_msgport_fd(server_port);

    while (1) {
        int count = 0;

        printf("server %d: waiting on fd %d\n", id, fd);
        FD_ZERO(&rfds);
        FD_SET(fd, &rfds);
        select(fd+1, &rfds, NULL, NULL, NULL);
        printf("server %d: Got async notification, checking for messages\n", id);
        while ((msg = e_msgport_get(server_port))) {
            printf("server %d: got message\n", id);
            sleep(1);
            printf("server %d: replying\n", id);
            e_msgport_reply(msg);
            count++;
        }
        printf("server %d: got %d messages\n", id, count);
    }
}

void *server(void *data)
{
    EMsg *msg;
    int id = (int)data;

    while (1) {
        printf("server %d: waiting\n", id);
        msg = e_msgport_wait(server_port);
        msg = e_msgport_get(server_port);
        if (msg) {
            printf("server %d: got message\n", id);
            sleep(1);
            printf("server %d: replying\n", id);
            e_msgport_reply(msg);
        } else {
            printf("server %d: didn't get message\n", id);
        }
    }
}

void *client(void *data)
{
    EMsg *msg;
    EMsgPort *replyport;
    int i;

    replyport = e_msgport_new();
    msg = g_malloc0(sizeof(*msg));
    msg->reply_port = replyport;
    for (i=0;i<10;i++) {
        /* synchronous operation */
        printf("client: sending\n");
        e_msgport_put(server_port, msg);
        printf("client: waiting for reply\n");
        e_msgport_wait(replyport);
        e_msgport_get(replyport);
        printf("client: got reply\n");
    }

    printf("client: sleeping ...\n");
    sleep(2);
    printf("client: sending multiple\n");

    for (i=0;i<10;i++) {
        msg = g_malloc0(sizeof(*msg));
        msg->reply_port = replyport;
        e_msgport_put(server_port, msg);
    }

    printf("client: receiving multiple\n");
    for (i=0;i<10;i++) {
        e_msgport_wait(replyport);
        msg = e_msgport_get(replyport);
        g_free(msg);
    }

    printf("client: done\n");
}

int main(int argc, char **argv)
{
    pthread_t serverid, clientid;

    g_thread_init(NULL);

    server_port = e_msgport_new();

    /*pthread_create(&serverid, NULL, server, (void *)1);*/
    pthread_create(&serverid, NULL, fdserver, (void *)1);
    pthread_create(&clientid, NULL, client, NULL);

    sleep(60);

    return 0;
}
#endif