summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLAN-TW <lantw44@gmail.com>2013-12-25 22:48:20 +0800
committerLAN-TW <lantw44@gmail.com>2013-12-25 22:48:20 +0800
commit86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02 (patch)
tree8eafae4322f29a17094d62c8cce4b6cb91350881
parent20cc607c4e10b8b31b4c35b5bcb4984f19ef278b (diff)
downloadcn2013-86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02.tar.gz
cn2013-86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02.tar.zst
cn2013-86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02.zip
HW2: 修正許多 sender 和 receiver 的問題並實作檔案操作
-rw-r--r--hw2/trans-loop.c85
-rw-r--r--hw2/ump-app.h3
-rw-r--r--hw2/ump-sched.c17
3 files changed, 97 insertions, 8 deletions
diff --git a/hw2/trans-loop.c b/hw2/trans-loop.c
index d654c59..a70713b 100644
--- a/hw2/trans-loop.c
+++ b/hw2/trans-loop.c
@@ -2,18 +2,21 @@
# include "config.h"
#endif
+#include "xwrap.h"
#include "l4logger.h"
#include "ump-sched.h"
#include "trans-loop.h"
#include <errno.h>
#include <fcntl.h>
+#include <inttypes.h>
+#include <stdlib.h>
#include <string.h>
#include <unistd.h>
int ump_trans_sender (LbsLogger* lbs_log,
char** dest, char** files, char** agents) {
-
+
int i;
int files_count;
int agents_count;
@@ -107,11 +110,91 @@ int ump_trans_receiver (LbsLogger* lbs_log, char** bind_ptr) {
lbs_logger_string (lbs_log, "==> New connection accepted");
char buf[sizeof (UmpPkt) * UMP_SCHED_PKT_COUNT];
+ char* buf_moved = buf;
ssize_t buf_size;
+ int state = 1;
+
+ uint16_t filename_length;
+ char* filename = NULL;
+ char* filename_pos;
+ int fd;
while ((buf_size = ump_sched_receive (sched, buf)) > 0) {
lbs_logger_format (lbs_log, "==> Get %d bytes!", buf_size);
+re_eval:
+ switch (state) {
+ case 1: // filename length
+ if (buf_size >= 2) {
+ memcpy (&filename_length, buf, 2);
+ buf_moved += 2;
+ buf_size -= 2;
+ filename = xmalloc (filename_length + 1);
+ filename_pos = filename;
+ state++;
+ lbs_logger_format (lbs_log,
+ "!! Filename length = %" PRIu16, filename_length);
+ goto re_eval;
+ } else {
+ memcpy (&filename_length, buf_moved, 1);
+ buf_moved++;
+ if (buf_moved - buf >= 2) {
+ buf_size -= 2;
+ filename = xmalloc (filename_length + 1);
+ filename_pos = filename;
+ state++;
+ lbs_logger_format (lbs_log,
+ "!! Filename length = %" PRIu16, filename_length);
+ goto re_eval;
+ }
+ }
+ break;
+ case 2: // filename
+ if (filename_pos + buf_size > filename + filename_length) {
+ int buf_cp = filename + filename_length - filename_pos;
+ memcpy (filename_pos, buf_moved, buf_cp);
+ buf_moved += buf_cp;
+ buf_size -= buf_cp;
+ filename[filename_length] = '\0';
+ state++;
+ lbs_logger_format (lbs_log,
+ "!! File name = %s", filename);
+ for (int i = 0; i < filename_length; i++) {
+ if (filename[i] == '/') {
+ filename[i] = '-';
+ }
+ }
+ lbs_logger_format (lbs_log,
+ "!! File name = %s (converted)", filename);
+ fd = open (filename, O_CREAT | O_EXCL | O_WRONLY, 0666);
+ if (fd < 0) {
+ lbs_logger_format (lbs_log,
+ "Cannot create file: %s", strerror (errno));
+ }
+ goto re_eval;
+ } else {
+ memcpy (filename_pos, buf_moved, buf_size);
+ filename_pos += buf_size;
+ buf_moved = buf;
+ }
+ break;
+ case 3: // data
+ if (fd >= 0) {
+ lbs_logger_format (lbs_log,
+ "!! Write %d bytes to file", buf_size);
+ write (fd, buf_moved, buf_size);
+ buf_moved = buf;
+ }
+ break;
+ }
+ }
+
+ if (filename != NULL) {
+ free (filename);
+ }
+
+ if (fd >= 0) {
+ close (fd);
}
lbs_logger_string (lbs_log, "==> Connection closed");
diff --git a/hw2/ump-app.h b/hw2/ump-app.h
index 158619a..edc72ac 100644
--- a/hw2/ump-app.h
+++ b/hw2/ump-app.h
@@ -91,7 +91,8 @@ static inline size_t ump_app_get_real_data_len (UmpApp* app_data) {
return app_data->real_data_len;
}
static inline size_t ump_app_get_real_data_max (UmpApp* app_data) {
- return (uint8_t*)ump_app_get_real_data (app_data) - (uint8_t*)app_data;
+ return ((uint8_t*)(app_data->data) + UMP_APP_SIZE - 1) -
+ (uint8_t*)ump_app_get_real_data (app_data);
}
static inline void ump_app_set_real_data (
UmpApp* app_data, const void* data, size_t len) {
diff --git a/hw2/ump-sched.c b/hw2/ump-sched.c
index bee8b32..692ad6e 100644
--- a/hw2/ump-sched.c
+++ b/hw2/ump-sched.c
@@ -293,6 +293,7 @@ static ssize_t flush_packet (UmpSched* sched, void* buf) {
}
}
+ lbs_logger_format (sched->log, "flush %d packets", i - 3);
if (i >= UMP_SCHED_PKT_COUNT) {
sched->seq_start += i - 1;
memset (&sched->pkt_valid, 0, sizeof (sched->pkt_valid));
@@ -314,6 +315,7 @@ ssize_t ump_sched_receive (UmpSched* sched, void* buf) {
return 0;
}
+ memset (sched->pkt_valid, 0, sizeof (sched->pkt_valid));
uint32_t ack_rel = 0;
while (true) {
UmpPkt pkt_struct;
@@ -361,7 +363,7 @@ ssize_t ump_sched_receive (UmpSched* sched, void* buf) {
lbs_logger_format (sched->log, "receive %s packet #%" PRIu32,
ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt));
if (!no_copy && !is_fin) {
- memcpy (&sched->pkt[seq_rel], pkt, UMP_PKT_SIZE);
+ memcpy (&(sched->pkt[seq_rel]), pkt, sizeof (UmpPkt));
sched->pkt_valid[seq_rel] = true;
}
@@ -416,8 +418,9 @@ ssize_t ump_sched_send (UmpSched* sched, void* buf, size_t count) {
sched->conn[selected].sent++;
sched->conn[selected].score++;
sched->pkt_map[i] = selected;
- lbs_logger_format (sched->log, "send %s packet #%" PRIu32,
- ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt));
+ lbs_logger_format (sched->log, "send %s packet #%" PRIu32
+ " (agent[%d])",
+ ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt), selected);
sendto (sched->conn_fd, pkt,
ump_pkt_get_data_offset (pkt) + pkt->app_data_len, 0,
SOCKADDR (&(sched->conn[selected].addr)),
@@ -432,7 +435,7 @@ ssize_t ump_sched_send (UmpSched* sched, void* buf, size_t count) {
j, sched->conn[j].sent, sched->conn[j].score);
}
- xalarm (1, 500000);
+ xalarm (0, 500000);
UmpPkt pkt_new_struct;
UmpPkt* pkt_new = &pkt_new_struct;
@@ -447,8 +450,10 @@ ssize_t ump_sched_send (UmpSched* sched, void* buf, size_t count) {
int selected = find_high_score (sched);
UmpPkt* pkt = &(sched->pkt[i]);
- lbs_logger_format (sched->log, "REsend %s packet #%" PRIu32,
- ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt));
+ lbs_logger_format (sched->log, "REsend %s packet #%" PRIu32
+ " (agent[%d])",
+ ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt),
+ selected);
sendto (sched->conn_fd, pkt,
ump_pkt_get_data_offset (pkt) + pkt->app_data_len, 0,
SOCKADDR (&(sched->conn[selected].addr)),