diff options
author | LAN-TW <lantw44@gmail.com> | 2013-12-25 22:48:20 +0800 |
---|---|---|
committer | LAN-TW <lantw44@gmail.com> | 2013-12-25 22:48:20 +0800 |
commit | 86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02 (patch) | |
tree | 8eafae4322f29a17094d62c8cce4b6cb91350881 | |
parent | 20cc607c4e10b8b31b4c35b5bcb4984f19ef278b (diff) | |
download | cn2013-86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02.tar.gz cn2013-86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02.tar.zst cn2013-86f9aa41ca1bdebcf858b4efb7d18aca3ba77d02.zip |
HW2: 修正許多 sender 和 receiver 的問題並實作檔案操作
-rw-r--r-- | hw2/trans-loop.c | 85 | ||||
-rw-r--r-- | hw2/ump-app.h | 3 | ||||
-rw-r--r-- | hw2/ump-sched.c | 17 |
3 files changed, 97 insertions, 8 deletions
diff --git a/hw2/trans-loop.c b/hw2/trans-loop.c index d654c59..a70713b 100644 --- a/hw2/trans-loop.c +++ b/hw2/trans-loop.c @@ -2,18 +2,21 @@ # include "config.h" #endif +#include "xwrap.h" #include "l4logger.h" #include "ump-sched.h" #include "trans-loop.h" #include <errno.h> #include <fcntl.h> +#include <inttypes.h> +#include <stdlib.h> #include <string.h> #include <unistd.h> int ump_trans_sender (LbsLogger* lbs_log, char** dest, char** files, char** agents) { - + int i; int files_count; int agents_count; @@ -107,11 +110,91 @@ int ump_trans_receiver (LbsLogger* lbs_log, char** bind_ptr) { lbs_logger_string (lbs_log, "==> New connection accepted"); char buf[sizeof (UmpPkt) * UMP_SCHED_PKT_COUNT]; + char* buf_moved = buf; ssize_t buf_size; + int state = 1; + + uint16_t filename_length; + char* filename = NULL; + char* filename_pos; + int fd; while ((buf_size = ump_sched_receive (sched, buf)) > 0) { lbs_logger_format (lbs_log, "==> Get %d bytes!", buf_size); +re_eval: + switch (state) { + case 1: // filename length + if (buf_size >= 2) { + memcpy (&filename_length, buf, 2); + buf_moved += 2; + buf_size -= 2; + filename = xmalloc (filename_length + 1); + filename_pos = filename; + state++; + lbs_logger_format (lbs_log, + "!! Filename length = %" PRIu16, filename_length); + goto re_eval; + } else { + memcpy (&filename_length, buf_moved, 1); + buf_moved++; + if (buf_moved - buf >= 2) { + buf_size -= 2; + filename = xmalloc (filename_length + 1); + filename_pos = filename; + state++; + lbs_logger_format (lbs_log, + "!! Filename length = %" PRIu16, filename_length); + goto re_eval; + } + } + break; + case 2: // filename + if (filename_pos + buf_size > filename + filename_length) { + int buf_cp = filename + filename_length - filename_pos; + memcpy (filename_pos, buf_moved, buf_cp); + buf_moved += buf_cp; + buf_size -= buf_cp; + filename[filename_length] = '\0'; + state++; + lbs_logger_format (lbs_log, + "!! File name = %s", filename); + for (int i = 0; i < filename_length; i++) { + if (filename[i] == '/') { + filename[i] = '-'; + } + } + lbs_logger_format (lbs_log, + "!! File name = %s (converted)", filename); + fd = open (filename, O_CREAT | O_EXCL | O_WRONLY, 0666); + if (fd < 0) { + lbs_logger_format (lbs_log, + "Cannot create file: %s", strerror (errno)); + } + goto re_eval; + } else { + memcpy (filename_pos, buf_moved, buf_size); + filename_pos += buf_size; + buf_moved = buf; + } + break; + case 3: // data + if (fd >= 0) { + lbs_logger_format (lbs_log, + "!! Write %d bytes to file", buf_size); + write (fd, buf_moved, buf_size); + buf_moved = buf; + } + break; + } + } + + if (filename != NULL) { + free (filename); + } + + if (fd >= 0) { + close (fd); } lbs_logger_string (lbs_log, "==> Connection closed"); diff --git a/hw2/ump-app.h b/hw2/ump-app.h index 158619a..edc72ac 100644 --- a/hw2/ump-app.h +++ b/hw2/ump-app.h @@ -91,7 +91,8 @@ static inline size_t ump_app_get_real_data_len (UmpApp* app_data) { return app_data->real_data_len; } static inline size_t ump_app_get_real_data_max (UmpApp* app_data) { - return (uint8_t*)ump_app_get_real_data (app_data) - (uint8_t*)app_data; + return ((uint8_t*)(app_data->data) + UMP_APP_SIZE - 1) - + (uint8_t*)ump_app_get_real_data (app_data); } static inline void ump_app_set_real_data ( UmpApp* app_data, const void* data, size_t len) { diff --git a/hw2/ump-sched.c b/hw2/ump-sched.c index bee8b32..692ad6e 100644 --- a/hw2/ump-sched.c +++ b/hw2/ump-sched.c @@ -293,6 +293,7 @@ static ssize_t flush_packet (UmpSched* sched, void* buf) { } } + lbs_logger_format (sched->log, "flush %d packets", i - 3); if (i >= UMP_SCHED_PKT_COUNT) { sched->seq_start += i - 1; memset (&sched->pkt_valid, 0, sizeof (sched->pkt_valid)); @@ -314,6 +315,7 @@ ssize_t ump_sched_receive (UmpSched* sched, void* buf) { return 0; } + memset (sched->pkt_valid, 0, sizeof (sched->pkt_valid)); uint32_t ack_rel = 0; while (true) { UmpPkt pkt_struct; @@ -361,7 +363,7 @@ ssize_t ump_sched_receive (UmpSched* sched, void* buf) { lbs_logger_format (sched->log, "receive %s packet #%" PRIu32, ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt)); if (!no_copy && !is_fin) { - memcpy (&sched->pkt[seq_rel], pkt, UMP_PKT_SIZE); + memcpy (&(sched->pkt[seq_rel]), pkt, sizeof (UmpPkt)); sched->pkt_valid[seq_rel] = true; } @@ -416,8 +418,9 @@ ssize_t ump_sched_send (UmpSched* sched, void* buf, size_t count) { sched->conn[selected].sent++; sched->conn[selected].score++; sched->pkt_map[i] = selected; - lbs_logger_format (sched->log, "send %s packet #%" PRIu32, - ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt)); + lbs_logger_format (sched->log, "send %s packet #%" PRIu32 + " (agent[%d])", + ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt), selected); sendto (sched->conn_fd, pkt, ump_pkt_get_data_offset (pkt) + pkt->app_data_len, 0, SOCKADDR (&(sched->conn[selected].addr)), @@ -432,7 +435,7 @@ ssize_t ump_sched_send (UmpSched* sched, void* buf, size_t count) { j, sched->conn[j].sent, sched->conn[j].score); } - xalarm (1, 500000); + xalarm (0, 500000); UmpPkt pkt_new_struct; UmpPkt* pkt_new = &pkt_new_struct; @@ -447,8 +450,10 @@ ssize_t ump_sched_send (UmpSched* sched, void* buf, size_t count) { int selected = find_high_score (sched); UmpPkt* pkt = &(sched->pkt[i]); - lbs_logger_format (sched->log, "REsend %s packet #%" PRIu32, - ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt)); + lbs_logger_format (sched->log, "REsend %s packet #%" PRIu32 + " (agent[%d])", + ump_pkt_show_message (pkt), ump_pkt_get_seq_num (pkt), + selected); sendto (sched->conn_fd, pkt, ump_pkt_get_data_offset (pkt) + pkt->app_data_len, 0, SOCKADDR (&(sched->conn[selected].addr)), |