#include "e-msgport.h"

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>

#include <pthread.h>

#include <glib.h>

#define m(x)			/* msgport debug */
#define t(x) 			/* thread debug */

/*
 * Put @v into its empty state.
 *
 * head is pointed at the address of the tail field and tailpred at the
 * address of the head field, with tail itself cleared.  The other list
 * functions rely on this: walking ->next from head, or ->prev from
 * tailpred, reaches a NULL link when there are no real nodes.
 * (Presumably EDList is laid out so these casts overlay sentinel nodes
 * on the header -- confirm against the EDList declaration.)
 */
void e_dlist_init(EDList *v)
{
	EDListNode *head_side = (EDListNode *)&v->head;
	EDListNode *tail_side = (EDListNode *)&v->tail;

	v->tail = NULL;
	v->tailpred = head_side;
	v->head = tail_side;
}

/*
 * Insert @n at the front of @l.
 *
 * The node's prev link is set to the address of the list's head field,
 * which is how this file marks "no predecessor".
 *
 * Returns @n.
 */
EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
{
	EDListNode *old_first = l->head;

	n->prev = (EDListNode *)&l->head;
	n->next = old_first;
	old_first->prev = n;
	l->head = n;

	return n;
}

/*
 * Append @n to the end of @l.
 *
 * The node's next link is set to the address of the list's tail field,
 * which is how this file marks "no successor".
 *
 * Returns @n.
 */
EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
{
	EDListNode *old_last = l->tailpred;

	n->next = (EDListNode *)&l->tail;
	n->prev = old_last;
	old_last->next = n;
	l->tailpred = n;

	return n;
}

/*
 * Unlink @n from whatever list it is on by joining its two neighbours.
 * The links stored inside @n itself are left untouched.
 *
 * Returns @n.
 */
EDListNode *e_dlist_remove(EDListNode *n)
{
	EDListNode *after = n->next;
	EDListNode *before = n->prev;

	after->prev = before;
	before->next = after;

	return n;
}

/*
 * Detach and return the first node of @l, or NULL if the list is empty.
 *
 * Emptiness is detected by the head node having a NULL next link.
 */
EDListNode *e_dlist_remhead(EDList *l)
{
	EDListNode *first = l->head;
	EDListNode *second = first->next;

	if (second == NULL)
		return NULL;

	second->prev = first->prev;
	l->head = second;

	return first;
}

/*
 * Detach and return the last node of @l, or NULL if the list is empty.
 *
 * Emptiness is detected by the tailpred node having a NULL prev link.
 */
EDListNode *e_dlist_remtail(EDList *l)
{
	EDListNode *last = l->tailpred;
	EDListNode *second_last = last->prev;

	if (second_last == NULL)
		return NULL;

	second_last->next = last->next;
	l->tailpred = second_last;

	return last;
}

/*
 * Returns non-zero when @l holds no nodes, i.e. head still points at the
 * address of the list's own tail field as set up by e_dlist_init().
 */
int e_dlist_empty(EDList *l)
{
	EDListNode *end_marker = (EDListNode *)&l->tail;

	return l->head == end_marker;
}

/*
 * Count the nodes currently linked on @l.
 *
 * Walks from head following ->next and stops at the first node whose
 * next link is NULL (the end marker), which is not counted.
 *
 * Returns the number of nodes; 0 for an empty list.
 */
int e_dlist_length(EDList *l)
{
	EDListNode *n;
	int count = 0;

	for (n = l->head; n->next != NULL; n = n->next)
		count++;

	/* the walk used to be followed by "return 0", discarding the tally */
	return count;
}

/*
 * A message port: a queue of messages plus the two mechanisms used to
 * wake a receiver -- a condition variable, and optionally a pipe whose
 * read end can be handed to select() (see e_msgport_fd()).
 */
struct _EMsgPort {
	EDList queue;		/* queued messages, linked through their list node */
	int condwait;		/* how many waiting in condwait */
	/* notification pipe; both descriptors are -1 until e_msgport_fd() creates it.
	   The union lets the same two ints be passed to pipe() as an array
	   or accessed by name. */
	union {
		int pipe[2];
		struct {
			int read;
			int write;
		} fd;
	} pipe;
	/* GLib synchronisation primitives */
	GCond *cond;		/* signalled by e_msgport_put() when condwait > 0 */
	GMutex *lock;		/* held by put/get/wait/fd while touching the fields above */
};

/*
 * Allocate a message port with an empty queue, no waiters and no
 * notification pipe (both descriptors start at -1; the pipe is only
 * created on demand by e_msgport_fd()).
 *
 * Returns the new port; release it with e_msgport_destroy().
 */
EMsgPort *e_msgport_new(void)
{
	EMsgPort *port = g_malloc(sizeof(*port));

	port->condwait = 0;
	port->pipe.fd.read = -1;
	port->pipe.fd.write = -1;
	port->lock = g_mutex_new();
	port->cond = g_cond_new();
	e_dlist_init(&port->queue);

	return port;
}

/*
 * Free @mp: its mutex, its condition variable, both ends of the
 * notification pipe if one was created, and the port structure itself.
 *
 * Messages still on the queue are not touched here.
 */
void e_msgport_destroy(EMsgPort *mp)
{
	int have_pipe = (mp->pipe.fd.read != -1);

	g_mutex_free(mp->lock);
	g_cond_free(mp->cond);

	if (have_pipe) {
		close(mp->pipe.fd.read);
		close(mp->pipe.fd.write);
	}

	g_free(mp);
}

/*
 * Get a file descriptor that can be used to wait on the port
 * asynchronously (e.g. with select()).
 *
 * The notification pipe is created the first time this is called; later
 * calls return the same read descriptor.  Once the pipe exists,
 * e_msgport_put() writes one byte per message and e_msgport_get() reads
 * one byte per message removed.
 *
 * Returns the read end of the pipe, or -1 if the pipe could not be
 * created (errno is left as set by pipe()).
 */
int e_msgport_fd(EMsgPort *mp)
{
	int fd;

	g_mutex_lock(mp->lock);
	fd = mp->pipe.fd.read;
	if (fd == -1) {
		if (pipe(mp->pipe.pipe) == -1) {
			/* make sure the port still reads as "no pipe" whatever
			   the failed call left in the array */
			mp->pipe.fd.read = -1;
			mp->pipe.fd.write = -1;
		}
		fd = mp->pipe.fd.read;
	}
	g_mutex_unlock(mp->lock);

	return fd;
}

/*
 * Queue @msg at the tail of @mp and notify receivers.
 *
 * Under the port lock: the message is linked in, the condition variable
 * is signalled if any thread is counted in condwait, and, if the
 * notification pipe exists, a single byte is written to it.
 */
void e_msgport_put(EMsgPort *mp, EMsg *msg)
{
	int notify_fd;

	m(printf("put:\n"));
	g_mutex_lock(mp->lock);

	e_dlist_addtail(&mp->queue, &msg->ln);

	/* only signal when someone is actually blocked on the condition */
	if (mp->condwait > 0) {
		m(printf("put: condwait > 0, waking up\n"));
		g_cond_signal(mp->cond);
	}

	/* one byte per message for receivers watching the pipe */
	notify_fd = mp->pipe.fd.write;
	if (notify_fd != -1) {
		m(printf("put: have pipe, writing notification to it\n"));
		write(notify_fd, "", 1);
	}

	g_mutex_unlock(mp->lock);
	m(printf("put: done\n"));
}

static void
msgport_cleanlock(void *data)
{
	EMsgPort *mp = data;

	g_mutex_unlock(mp->lock);
}

/*
 * Block until @mp has at least one queued message.
 *
 * How it blocks depends on whether the notification pipe exists: without
 * one it waits on the port's condition variable; with one it releases
 * the lock and select()s on the pipe's read end.  Either way the queue
 * is re-checked in a loop after waking.
 *
 * Returns the message at the head of the queue WITHOUT removing it; use
 * e_msgport_get() to actually dequeue.  Since the lock is dropped before
 * returning, another receiver may take that message first.
 */
EMsg *e_msgport_wait(EMsgPort *mp)
{
	EMsg *msg;

	m(printf("wait:\n"));
	g_mutex_lock(mp->lock);
	while (e_dlist_empty(&mp->queue)) {
		if (mp->pipe.fd.read == -1) {
			m(printf("wait: waiting on condition\n"));
			/* lets e_msgport_put() know a signal is worth sending */
			mp->condwait++;
			/* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
			pthread_cleanup_push(msgport_cleanlock, mp);
			g_cond_wait(mp->cond, mp->lock);
			/* 0: drop the handler without running it, we still want the lock */
			pthread_cleanup_pop(0);
			m(printf("wait: got condition\n"));
			/* NOTE: not reached if the thread was cancelled in the wait,
			   so condwait stays incremented in that case */
			mp->condwait--;
		} else {
			fd_set rfds;

			m(printf("wait: waitng on pipe\n"));
			FD_ZERO(&rfds);
			FD_SET(mp->pipe.fd.read, &rfds);
			/* the lock is not held while blocked in select() */
			g_mutex_unlock(mp->lock);
			select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL);
			/* explicit cancellation point, taken while the lock is not held */
			pthread_testcancel();
			g_mutex_lock(mp->lock);
			m(printf("wait: got pipe\n"));
		}
	}
	/* peek only: the message stays on the queue */
	msg = (EMsg *)mp->queue.head;
	m(printf("wait: message = %p\n", msg));
	g_mutex_unlock(mp->lock);
	m(printf("wait: done\n"));
	return msg;
}

/*
 * Remove and return the message at the head of @mp's queue without
 * blocking on an empty queue.
 *
 * When a message was removed and the notification pipe exists, one byte
 * is read from the pipe to balance the byte written by e_msgport_put().
 *
 * Returns the dequeued message, or NULL if the queue was empty.
 */
EMsg *e_msgport_get(EMsgPort *mp)
{
	EMsg *msg;
	char token;

	g_mutex_lock(mp->lock);

	msg = (EMsg *)e_dlist_remhead(&mp->queue);
	if (msg) {
		if (mp->pipe.fd.read != -1)
			read(mp->pipe.fd.read, &token, 1);
	}
	m(printf("get: message = %p\n", msg));

	g_mutex_unlock(mp->lock);

	return msg;
}

/*
 * Send @msg back to its originator by queueing it on the message's own
 * reply port.  When the message carries no reply port nothing happens:
 * the message is neither queued nor freed.
 */
void e_msgport_reply(EMsg *msg)
{
	if (!msg->reply_port) {
		/* nowhere to send it -- lost? */
		return;
	}

	e_msgport_put(msg->reply_port, msg);
}

/* Per-thread record kept on EThread.id_list for E_THREAD_NEW threads. */
struct _thread_info {
	pthread_t id;	/* handle returned by pthread_create() in e_thread_put() */
	int busy;	/* TRUE while handling a message, FALSE while waiting for one */
};

/*
 * A message-driven worker: messages put on server_port are handed to the
 * 'received' callback by one or more dispatcher threads.
 */
struct _EThread {
	EMsgPort *server_port;	/* incoming messages; created in e_thread_new() */
	EMsgPort *reply_port;	/* copied into each message by e_thread_put(); may be NULL */
	pthread_mutex_t mutex;	/* guards the callbacks, waiting, id and id_list */
	e_thread_t type;	/* E_THREAD_QUEUE, E_THREAD_DROP or E_THREAD_NEW */
	int queue_limit;	/* max queue depth (QUEUE/DROP) or max thread count (NEW) */

	int waiting;		/* if we are waiting for a new message, count of waiting processes */
	pthread_t id;		/* id of our running child thread */
	GList *id_list;		/* if THREAD_NEW, then a list of our child threads in thread_info structs */

	/* called to dispose of a message that will not be replied to */
	EThreadFunc destroy;
	void *destroy_data;

	/* called in the dispatcher thread to process each message */
	EThreadFunc received;
	void *received_data;

	/* called for a message discarded because the queue limit was hit */
	EThreadFunc lost;
	void *lost_data;
};

#define E_THREAD_NONE ((pthread_t)~0)

static void thread_destroy_msg(EThread *e, EMsg *m);

/*
 * Look up the bookkeeping record for thread @id on @e's id_list.
 *
 * Returns the matching record, or NULL when @id is not on the list.
 * Callers in this file hold e->mutex around the call.
 */
static struct _thread_info *thread_find(EThread *e, pthread_t id)
{
	GList *link;

	for (link = e->id_list; link != NULL; link = link->next) {
		struct _thread_info *candidate = link->data;

		if (candidate->id == id)
			return candidate;
	}

	return NULL;
}

#if 0
/*
 * Currently compiled out (inside #if 0).
 *
 * Remove and free every record on @e's id_list whose thread id is @id.
 *
 * NOTE(review): after g_list_remove() the loop still advances through
 * 'node', which was the link just removed; if this code is ever
 * re-enabled, fetch node->next before removing.
 */
static void thread_remove(EThread *e, pthread_t id)
{
	GList *node;
	struct _thread_info *info;

	node = e->id_list;
	while (node) {
		info = node->data;
		if (info->id == id) {
			e->id_list = g_list_remove(e->id_list, info);
			g_free(info);
		}
		node = node->next;
	}
}
#endif

/*
 * Create a worker of the given @type.
 *
 * The structure starts zeroed (no callbacks, no reply port, no threads)
 * with its own server port, no dispatcher thread yet (id is
 * E_THREAD_NONE) and an effectively unlimited queue (INT_MAX).  No
 * thread is started here; e_thread_put() starts one when needed.
 *
 * Returns the new EThread; release it with e_thread_destroy().
 */
EThread *e_thread_new(e_thread_t type)
{
	EThread *thread = g_malloc0(sizeof(*thread));

	thread->type = type;
	thread->id = E_THREAD_NONE;
	thread->queue_limit = INT_MAX;
	thread->server_port = e_msgport_new();
	pthread_mutex_init(&thread->mutex, NULL);

	return thread;
}

/*
 * Close down the threads and resources of @e.
 *
 * Steps:
 *  1. Every message still queued on the server port is removed and
 *     passed to the msg-destroy callback.
 *  2. Worker threads are cancelled and joined, but only while they are
 *     idle (waiting for a message).  A thread that is still processing
 *     is polled once a second, for up to 500 tries.
 *  3. If every thread was reaped, the server port, the mutex and the
 *     EThread itself are released.  Otherwise a warning is logged and
 *     the EThread is deliberately leaked, because a live thread may
 *     still be using it.
 *
 * The reply port is not owned by the EThread and is not destroyed.
 */
void e_thread_destroy(EThread *e)
{
	int tries = 0;
	int busy = FALSE;
	EMsg *msg;
	struct _thread_info *info;

	/* make sure we soak up all the messages first */
	while ( (msg = e_msgport_get(e->server_port)) ) {
		thread_destroy_msg(e, msg);
	}

	pthread_mutex_lock(&e->mutex);

	switch(e->type) {
	case E_THREAD_QUEUE:
	case E_THREAD_DROP:
		/* if we have a thread, 'kill' it */
		while (e->id != E_THREAD_NONE && tries < 500) {
			if (e->waiting > 0) {
				pthread_t id = e->id;

				e->id = E_THREAD_NONE;
				/* never hold our mutex across cancel/join */
				pthread_mutex_unlock(&e->mutex);
				if (pthread_cancel(id) == 0)
					pthread_join(id, 0);
				pthread_mutex_lock(&e->mutex);
			} else {
				(printf("thread still active, waiting for it to finish\n"));
				pthread_mutex_unlock(&e->mutex);
				sleep(1);
				pthread_mutex_lock(&e->mutex);
			}
			tries++;
		}
		busy = e->id != E_THREAD_NONE;
		break;
	case E_THREAD_NEW:
		/* always look at the list head: idle threads are removed as we go,
		   a busy head thread is waited for */
		while (e->id_list && tries < 500) {
			info = e->id_list->data;
			if (!info->busy) {
				e->id_list = g_list_remove(e->id_list, info);
				/* pthread_t is treated as an integer throughout this file;
				   cast so the argument matches %ld */
				printf("cleaning up pool thread %ld\n", (long)info->id);
				pthread_mutex_unlock(&e->mutex);
				if (pthread_cancel(info->id) == 0)
					pthread_join(info->id, 0);
				pthread_mutex_lock(&e->mutex);
				printf("cleaned up ok\n");
				g_free(info);
			} else {
				(printf("thread(s) still active, waiting for it to finish\n"));
				tries++;
				pthread_mutex_unlock(&e->mutex);
				sleep(1);
				pthread_mutex_lock(&e->mutex);
			}
		}
		busy = g_list_length(e->id_list) != 0;
		break;
	}

	pthread_mutex_unlock(&e->mutex);

	/* and clean up, if we can */
	if (busy) {
		g_warning("threads were busy, leaked EThread");
		return;
	}

	e_msgport_destroy(e->server_port);
	/* all workers have been joined and the mutex is unlocked, so it can
	   be released along with the structure that contains it */
	pthread_mutex_destroy(&e->mutex);
	g_free(e);
}

/*
 * Set the limit consulted by e_thread_put().
 *
 * Its meaning depends on the thread type: for E_THREAD_QUEUE a new
 * message is dropped once the queue has reached @limit; for
 * E_THREAD_DROP the oldest queued message is dropped instead; for
 * E_THREAD_NEW it caps how many threads are created.
 *
 * Unlike the callback setters, this store is not done under e->mutex.
 */
void e_thread_set_queue_limit(EThread *e, int limit)
{
	e->queue_limit = limit;
}

/*
 * Install the callback used to dispose of a message (@destroy, invoked
 * with @data as its last argument).  The callback must not call any
 * e_thread functions on @e.
 *
 * Both fields are stored under e->mutex so a reader never sees a
 * callback paired with the wrong data pointer.
 */
void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
{
	pthread_mutex_lock(&e->mutex);
	e->destroy_data = data;
	e->destroy = destroy;
	pthread_mutex_unlock(&e->mutex);
}

/*
 * Install the callback invoked when a message is discarded because the
 * queue limit was reached (@lost, invoked with @data as its last
 * argument).
 *
 * Both fields are stored under e->mutex so a reader never sees a
 * callback paired with the wrong data pointer.
 */
void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
{
	pthread_mutex_lock(&e->mutex);
	e->lost = lost;
	/* store the caller's data; this used to store the callback pointer
	   itself, so the callback never received @data */
	e->lost_data = data;
	pthread_mutex_unlock(&e->mutex);
}

/*
 * Set the port that processed messages are sent back to.
 *
 * e_thread_put() copies this into each message as it is queued; the
 * dispatcher replies to messages that carry a reply port and destroys
 * the rest.  Pass NULL for no replies.  The store is not done under
 * e->mutex.
 */
void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
{
	e->reply_port = reply_port;
}

/*
 * Install the callback that processes each message in the dispatcher
 * thread (@received, invoked with @data as its last argument).
 * e_thread_put() refuses to queue anything until this has been set.
 *
 * Both fields are stored under e->mutex so a reader never sees a
 * callback paired with the wrong data pointer.
 */
void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
{
	pthread_mutex_lock(&e->mutex);
	e->received_data = data;
	e->received = received;
	pthread_mutex_unlock(&e->mutex);
}

/*
 * Hand @m to the msg-destroy callback of @e, if one is installed.
 *
 * The callback and its data are snapshotted together under e->mutex so
 * they always match; the callback itself is invoked with the mutex
 * released.
 */
static void
thread_destroy_msg(EThread *e, EMsg *m)
{
	EThreadFunc cb;
	void *cb_data;

	pthread_mutex_lock(&e->mutex);
	cb_data = e->destroy_data;
	cb = e->destroy;
	pthread_mutex_unlock(&e->mutex);

	if (!cb)
		return;

	cb(e, m, cb_data);
}

/*
 * Hand @m to the msg-received callback of @e.  If none is installed a
 * warning is logged and the message is left unprocessed.
 *
 * The callback and its data are snapshotted together under e->mutex so
 * they always match; the callback itself is invoked with the mutex
 * released.
 */
static void
thread_received_msg(EThread *e, EMsg *m)
{
	EThreadFunc cb;
	void *cb_data;

	pthread_mutex_lock(&e->mutex);
	cb_data = e->received_data;
	cb = e->received;
	pthread_mutex_unlock(&e->mutex);

	if (!cb) {
		g_warning("No processing callback for EThread, message unprocessed");
		return;
	}

	cb(e, m, cb_data);
}

/*
 * Hand @m to the msg-lost callback of @e, if one is installed.
 *
 * The callback and its data are snapshotted together under e->mutex so
 * they always match; the callback itself is invoked with the mutex
 * released.
 */
static void
thread_lost_msg(EThread *e, EMsg *m)
{
	EThreadFunc cb;
	void *cb_data;

	pthread_mutex_lock(&e->mutex);
	cb_data = e->lost_data;
	cb = e->lost;
	pthread_mutex_unlock(&e->mutex);

	if (!cb)
		return;

	cb(e, m, cb_data);
}

/*
 * The actual thread dispatcher: the start routine of every worker thread.
 *
 * @din is the owning EThread.  The thread loops forever: it takes the
 * next message from the server port, runs the 'received' callback on it,
 * then either replies (message has a reply port) or hands it to the
 * destroy callback.  When the queue is empty it blocks in
 * e_msgport_wait(), which is where e_thread_destroy() expects to be able
 * to cancel it.
 *
 * The loop has no exit, so the trailing "return 0" is never reached; the
 * thread only ends by being cancelled.
 */
static void *
thread_dispatch(void *din)
{
	EThread *e = din;
	EMsg *m;
	struct _thread_info *info;

	t(printf("dispatch thread started: %ld\n", pthread_self()));

	while (1) {
		/* the dequeue and the busy/waiting bookkeeping happen under e->mutex */
		pthread_mutex_lock(&e->mutex);
		m = e_msgport_get(e->server_port);
		if (m == NULL) {
			/* nothing to do?  All thread types currently go into waiting
			   (can be cancelled here); the code that made a 'new' type
			   thread quit instead is disabled below. */
			info = NULL;
			switch (e->type) {
			case E_THREAD_NEW:
			case E_THREAD_QUEUE:
			case E_THREAD_DROP:
				/* only E_THREAD_NEW threads have a record on id_list */
				info = thread_find(e, pthread_self());
				if (info)
					info->busy = FALSE;
				e->waiting++;
				/* release our mutex before blocking so others can make progress */
				pthread_mutex_unlock(&e->mutex);
				e_msgport_wait(e->server_port);
				/* NOTE: if cancelled inside the wait, 'waiting' is never decremented */
				pthread_mutex_lock(&e->mutex);
				e->waiting--;
				pthread_mutex_unlock(&e->mutex);
				break;
#if 0
			case E_THREAD_NEW:
				e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
				pthread_mutex_unlock(&e->mutex);
				return 0;
#endif
			}
			/* NOTE(review): a type matching none of the cases above would reach
			   the 'continue' with e->mutex still locked -- confirm e_thread_t
			   has no other values */

			continue;
		} else {
			info = thread_find(e, pthread_self());
			if (info)
				info->busy = TRUE;
		}
		pthread_mutex_unlock(&e->mutex);

		t(printf("got message in dispatch thread\n"));

		/* process it */
		thread_received_msg(e, m);

		/* if we have a reply port, send it back, otherwise, lose it */
		if (m->reply_port) {
			e_msgport_reply(m);
		} else {
			thread_destroy_msg(e, m);
		}
	}

	/* if we run out of things to process we could conceivably 'hang around' for a bit,
	   but to do this we need to use the fd interface of the msgport, and its utility
	   is probably debatable anyway */

#if 0
	/* signify we are no longer running */
	/* This code isn't used yet, but would be if we ever had a 'quit now' message implemented */
	pthread_mutex_lock(&e->mutex);
	switch (e->type) {
	case E_THREAD_QUEUE:
	case E_THREAD_DROP:
		e->id = E_THREAD_NONE;
		break;
	case E_THREAD_NEW:
		e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
		break;
	}
	pthread_mutex_unlock(&e->mutex);
#endif

	return 0;
}

/*
 * Send a message to the thread, starting a thread if necessary.
 *
 * If no 'received' callback has been set the message is passed straight
 * to the destroy callback and nothing is queued.  Otherwise the
 * EThread's reply port is copied into @msg and the message is handled
 * according to the thread type:
 *
 *  E_THREAD_QUEUE: queued, unless the queue has reached queue_limit, in
 *                  which case this new message is dropped.
 *  E_THREAD_DROP:  always queued; if the queue had reached queue_limit
 *                  the oldest queued message is dropped instead.
 *  E_THREAD_NEW:   always queued; an extra thread is created when none
 *                  is waiting and fewer than queue_limit exist.
 *
 * A dropped message is passed to the lost callback and then to the
 * destroy callback, both outside e->mutex.
 */
void e_thread_put(EThread *e, EMsg *msg)
{
	pthread_t id;
	EMsg *dmsg = NULL;
	int err;

	pthread_mutex_lock(&e->mutex);

	/* the caller forgot to tell us what to do, well, we can't do anything can we */
	if (e->received == NULL) {
		pthread_mutex_unlock(&e->mutex);
		g_warning("EThread called with no receiver function, no work to do!");
		thread_destroy_msg(e, msg);
		return;
	}

	msg->reply_port = e->reply_port;

	switch(e->type) {
	case E_THREAD_QUEUE:
		/* if the queue is full, lose this new addition */
		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
			e_msgport_put(e->server_port, msg);
		} else {
			printf("queue limit reached, dropping new message\n");
			dmsg = msg;
		}
		break;
	case E_THREAD_DROP:
		/* if the queue is full, lose the oldest (unprocessed) message */
		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
			e_msgport_put(e->server_port, msg);
		} else {
			printf("queue limit reached, dropping old message\n");
			e_msgport_put(e->server_port, msg);
			dmsg = e_msgport_get(e->server_port);
		}
		break;
	case E_THREAD_NEW:
		/* it is possible that an existing thread can catch this message, so
		   we might create a thread with no work to do.
		   but that doesn't matter, the other alternative that it be lost is worse */
		e_msgport_put(e->server_port, msg);
		if (e->waiting == 0
		    && g_list_length(e->id_list) < e->queue_limit
		    && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
			struct _thread_info *info = g_malloc0(sizeof(*info));
			t(printf("created NEW thread %ld\n", id));
			info->id = id;
			info->busy = TRUE;
			e->id_list = g_list_append(e->id_list, info);
		}
		pthread_mutex_unlock(&e->mutex);
		return;
	}

	/* create the thread, if there is none to receive it yet */
	if (e->id == E_THREAD_NONE) {
		/* pthread_create() reports failure by returning an error number;
		   it does not return -1 and does not set errno */
		err = pthread_create(&e->id, NULL, thread_dispatch, e);
		if (err != 0) {
			g_warning("Could not create dispatcher thread, message queued?: %s", strerror(err));
			/* the id is unspecified after a failed create */
			e->id = E_THREAD_NONE;
		}
	}

	pthread_mutex_unlock(&e->mutex);

	if (dmsg) {
		thread_lost_msg(e, dmsg);
		thread_destroy_msg(e, dmsg);
	}
}

/*
 * Yet-another-mutex interface.
 *
 * For E_MUTEX_SIMPLE only 'mutex' is used and is the lock itself.  For
 * E_MUTEX_REC, 'mutex' merely protects owner/depth/waiters, and the
 * logical lock is "owner != E_THREAD_NONE".
 */
struct _EMutex {
	int type;		/* E_MUTEX_SIMPLE or E_MUTEX_REC */
	pthread_t owner;	/* REC: thread holding the lock, or E_THREAD_NONE */
	short waiters;		/* REC: threads blocked on cond in e_mutex_lock() */
	short depth;		/* REC: how many times the owner has locked it */
	pthread_mutex_t mutex;
	pthread_cond_t cond;	/* REC: signalled when the lock becomes free */
};

/*
 * Create a mutex of the given @type.
 *
 * Recursive mutexes are emulated by hand (an owner, a depth count and a
 * condition variable) because native recursive mutexes were not commonly
 * available across thread implementations.
 * TODO: Just make it use recursive mutexes if they are available.
 *
 * E_MUTEX_SIMPLE initialises only the pthread mutex; E_MUTEX_REC also
 * initialises the condition variable.  Any other @type gets neither.
 *
 * Returns the new mutex; release it with e_mutex_destroy().
 */
EMutex *e_mutex_new(e_mutex_t type)
{
	struct _EMutex *em = g_malloc(sizeof(*em));

	em->type = type;
	em->owner = E_THREAD_NONE;
	em->depth = 0;
	em->waiters = 0;

	/* both known types need the pthread mutex ... */
	if (type == E_MUTEX_SIMPLE || type == E_MUTEX_REC)
		pthread_mutex_init(&em->mutex, NULL);

	/* ... and only the recursive one needs something to wait on */
	if (type == E_MUTEX_REC)
		pthread_cond_init(&em->cond, NULL);

	return em;
}

/*
 * Destroy @m and free its memory.
 *
 * The pthread objects belonging to the mutex type are destroyed, each
 * failure is logged, and the structure is freed regardless.  A mutex
 * whose type is neither E_MUTEX_SIMPLE nor E_MUTEX_REC is left untouched
 * and is not freed.
 *
 * Returns 0 on success, otherwise the error number of the first pthread
 * destroy call that failed.
 */
int e_mutex_destroy(EMutex *m)
{
	int ret = 0;
	int err;

	/* the pthread destroy functions return an error number on failure;
	   they never return -1 and do not set errno */
	switch (m->type) {
	case E_MUTEX_SIMPLE:
		ret = pthread_mutex_destroy(&m->mutex);
		if (ret != 0)
			g_warning("EMutex destroy failed: %s", strerror(ret));
		g_free(m);
		break;
	case E_MUTEX_REC:
		ret = pthread_mutex_destroy(&m->mutex);
		if (ret != 0)
			g_warning("EMutex destroy failed: %s", strerror(ret));
		err = pthread_cond_destroy(&m->cond);
		if (err != 0) {
			g_warning("EMutex destroy failed: %s", strerror(err));
			/* keep the first failure rather than overwriting it */
			if (ret == 0)
				ret = err;
		}
		g_free(m);
		break;
	}
	return ret;
}

/*
 * Lock @m.
 *
 * E_MUTEX_SIMPLE: a plain pthread_mutex_lock(); its result is returned
 * as-is (0 or an error number).
 *
 * E_MUTEX_REC: the calling thread becomes owner if the mutex is free,
 * bumps the depth if it already owns it, and otherwise sleeps on the
 * condition variable until the owner fully unlocks.  The internal
 * pthread mutex is only held while the bookkeeping is examined, not for
 * the duration of the logical lock.  On failure -1 is returned with
 * errno set to the pthread error number.
 *
 * Any other type: returns -1 with errno set to EINVAL.
 */
int e_mutex_lock(EMutex *m)
{
	pthread_t id;
	int err;

	switch (m->type) {
	case E_MUTEX_SIMPLE:
		return pthread_mutex_lock(&m->mutex);
	case E_MUTEX_REC:
		id = pthread_self();
		/* pthread calls return an error number, never -1 */
		err = pthread_mutex_lock(&m->mutex);
		if (err != 0) {
			errno = err;
			return -1;
		}
		while (1) {
			if (m->owner == E_THREAD_NONE) {
				m->owner = id;
				m->depth = 1;
				break;
			} else if (id == m->owner) {
				m->depth++;
				break;
			} else {
				m->waiters++;
				err = pthread_cond_wait(&m->cond, &m->mutex);
				m->waiters--;
				if (err != 0) {
					/* a failed wait leaves the internal mutex locked
					   by us; release it before bailing out */
					pthread_mutex_unlock(&m->mutex);
					errno = err;
					return -1;
				}
			}
		}
		return pthread_mutex_unlock(&m->mutex);
	}

	errno = EINVAL;
	return -1;
}

/*
 * Unlock @m.
 *
 * E_MUTEX_SIMPLE: a plain pthread_mutex_unlock(); its result is
 * returned as-is.
 *
 * E_MUTEX_REC: the caller must be the current owner (asserted).  The
 * depth is decremented; when it reaches zero the mutex becomes free and
 * one waiter, if any, is woken.  If the internal mutex cannot be taken,
 * -1 is returned with errno set to the pthread error number.
 *
 * Any other type: returns -1 with errno set to EINVAL.
 */
int e_mutex_unlock(EMutex *m)
{
	int err;

	switch (m->type) {
	case E_MUTEX_SIMPLE:
		return pthread_mutex_unlock(&m->mutex);
	case E_MUTEX_REC:
		/* pthread calls return an error number, never -1 */
		err = pthread_mutex_lock(&m->mutex);
		if (err != 0) {
			errno = err;
			return -1;
		}
		g_assert(m->owner == pthread_self());

		m->depth--;
		if (m->depth == 0) {
			m->owner = E_THREAD_NONE;
			if (m->waiters > 0)
				pthread_cond_signal(&m->cond);
		}
		return pthread_mutex_unlock(&m->mutex);
	}

	errno = EINVAL;
	return -1;
}

/*
 * Debugging aid: assert that the calling thread currently owns the
 * recursive mutex @m.
 *
 * Only meaningful for E_MUTEX_REC, which is the only type that records
 * an owner; for any other type the g_return_if_fail() check fails and
 * the function returns without asserting.
 */
void e_mutex_assert_locked(EMutex *m)
{
	g_return_if_fail (m->type == E_MUTEX_REC);
	/* owner is only read under the internal mutex */
	pthread_mutex_lock(&m->mutex);
	g_assert(m->owner == pthread_self());
	pthread_mutex_unlock(&m->mutex);
}

#ifdef STANDALONE
/* Test harness only: the port the client sends to and the server threads
   read from; created in main() before any thread is started. */
EMsgPort *server_port;


/*
 * Test server that waits for work through the port's file descriptor.
 *
 * @data carries a small integer id used only to label the output.
 * Loops forever: select() on the descriptor from e_msgport_fd(), then
 * drain every available message, pausing a second on each before
 * replying to it.
 */
void *fdserver(void *data)
{
	int id = (int)data;
	int notify_fd = e_msgport_fd(server_port);
	fd_set readable;
	EMsg *request;

	for (;;) {
		int handled = 0;

		printf("server %d: waiting on fd %d\n", id, notify_fd);
		FD_ZERO(&readable);
		FD_SET(notify_fd, &readable);
		select(notify_fd + 1, &readable, NULL, NULL, NULL);
		printf("server %d: Got async notification, checking for messages\n", id);

		/* one wake-up may cover several queued messages */
		while ((request = e_msgport_get(server_port)) != NULL) {
			printf("server %d: got message\n", id);
			sleep(1);
			printf("server %d: replying\n", id);
			e_msgport_reply(request);
			handled++;
		}
		printf("server %d: got %d messages\n", id, handled);
	}
}

/*
 * Test server that blocks in e_msgport_wait() for work.
 *
 * @data carries a small integer id used only to label the output.
 * Loops forever: wait for a message to appear, dequeue it, pause a
 * second, then reply.  The wait only peeks, so the following get can
 * still come back empty; that case is reported and the loop continues.
 */
void *server(void *data)
{
	int id = (int)data;

	for (;;) {
		EMsg *request;

		printf("server %d: waiting\n", id);
		e_msgport_wait(server_port);
		request = e_msgport_get(server_port);
		if (request == NULL) {
			printf("server %d: didn't get message\n", id);
			continue;
		}

		printf("server %d: got message\n", id);
		sleep(1);
		printf("server %d: replying\n", id);
		e_msgport_reply(request);
	}
}

/*
 * Test client.  @data is unused.
 *
 * Phase 1: ten synchronous round trips, reusing a single message: send
 * it to server_port, wait for it on our private reply port, take it
 * back.
 * Phase 2: after a pause, ten freshly allocated messages are sent
 * back-to-back, then the ten replies are collected and freed.
 *
 * Returns NULL.
 */
void *client(void *data)
{
	EMsg *msg;
	EMsgPort *replyport;
	int i;

	replyport = e_msgport_new();
	msg = g_malloc0(sizeof(*msg));
	msg->reply_port = replyport;
	for (i=0;i<10;i++) {
		/* synchronous operation */
		printf("client: sending\n");
		e_msgport_put(server_port, msg);
		printf("client: waiting for reply\n");
		e_msgport_wait(replyport);
		e_msgport_get(replyport);
		printf("client: got reply\n");
	}
	/* the round-trip message has been taken back off the reply port and
	   'msg' is about to be reused, so release it now instead of leaking it */
	g_free(msg);

	printf("client: sleeping ...\n");
	sleep(2);
	printf("client: sending multiple\n");

	for (i=0;i<10;i++) {
		msg = g_malloc0(sizeof(*msg));
		msg->reply_port = replyport;
		e_msgport_put(server_port, msg);
	}

	printf("client: receiving multiple\n");
	for (i=0;i<10;i++) {
		e_msgport_wait(replyport);
		msg = e_msgport_get(replyport);
		g_free(msg);
	}

	printf("client: done\n");

	/* a pthread start routine must return a value */
	return NULL;
}

int main(int argc, char **argv)
{
	pthread_t serverid, clientid;

	g_thread_init(NULL);

	server_port = e_msgport_new();

	/*pthread_create(&serverid, NULL, server, (void *)1);*/
	pthread_create(&serverid, NULL, fdserver, (void *)1);
	pthread_create(&clientid, NULL, client, NULL);

	sleep(60);

	return 0;
}
#endif