diff options
Diffstat (limited to 'p2p/connection.go')
-rw-r--r-- | p2p/connection.go | 275 |
1 files changed, 0 insertions, 275 deletions
diff --git a/p2p/connection.go b/p2p/connection.go deleted file mode 100644 index be366235d..000000000 --- a/p2p/connection.go +++ /dev/null @@ -1,275 +0,0 @@ -package p2p - -import ( - "bytes" - // "fmt" - "net" - "time" - - "github.com/ethereum/go-ethereum/ethutil" -) - -type Connection struct { - conn net.Conn - // conn NetworkConnection - timeout time.Duration - in chan []byte - out chan []byte - err chan *PeerError - closingIn chan chan bool - closingOut chan chan bool -} - -// const readBufferLength = 2 //for testing - -const readBufferLength = 1440 -const partialsQueueSize = 10 -const maxPendingQueueSize = 1 -const defaultTimeout = 500 - -var magicToken = []byte{34, 64, 8, 145} - -func (self *Connection) Open() { - go self.startRead() - go self.startWrite() -} - -func (self *Connection) Close() { - self.closeIn() - self.closeOut() -} - -func (self *Connection) closeIn() { - errc := make(chan bool) - self.closingIn <- errc - <-errc -} - -func (self *Connection) closeOut() { - errc := make(chan bool) - self.closingOut <- errc - <-errc -} - -func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection { - return &Connection{ - conn: conn, - timeout: defaultTimeout, - in: make(chan []byte), - out: make(chan []byte), - err: errchan, - closingIn: make(chan chan bool, 1), - closingOut: make(chan chan bool, 1), - } -} - -func (self *Connection) Read() <-chan []byte { - return self.in -} - -func (self *Connection) Write() chan<- []byte { - return self.out -} - -func (self *Connection) Error() <-chan *PeerError { - return self.err -} - -func (self *Connection) startRead() { - payloads := make(chan []byte) - done := make(chan *PeerError) - pending := [][]byte{} - var head []byte - var wait time.Duration // initally 0 (no delay) - read := time.After(wait * time.Millisecond) - - for { - // if pending empty, nil channel blocks - var in chan []byte - if len(pending) > 0 { - in = self.in // enable send case - head = pending[0] - } else { - in = nil - } - - select { - case <-read: - go self.read(payloads, done) - case err := <-done: - if err == nil { // no error but nothing to read - if len(pending) < maxPendingQueueSize { - wait = 100 - } else if wait == 0 { - wait = 100 - } else { - wait = 2 * wait - } - } else { - self.err <- err // report error - wait = 100 - } - read = time.After(wait * time.Millisecond) - case payload := <-payloads: - pending = append(pending, payload) - if len(pending) < maxPendingQueueSize { - wait = 0 - } else { - wait = 100 - } - read = time.After(wait * time.Millisecond) - case in <- head: - pending = pending[1:] - case errc := <-self.closingIn: - errc <- true - close(self.in) - return - } - - } -} - -func (self *Connection) startWrite() { - pending := [][]byte{} - done := make(chan *PeerError) - writing := false - for { - if len(pending) > 0 && !writing { - writing = true - go self.write(pending[0], done) - } - select { - case payload := <-self.out: - pending = append(pending, payload) - case err := <-done: - if err == nil { - pending = pending[1:] - writing = false - } else { - self.err <- err // report error - } - case errc := <-self.closingOut: - errc <- true - close(self.out) - return - } - } -} - -func pack(payload []byte) (packet []byte) { - length := ethutil.NumberToBytes(uint32(len(payload)), 32) - // return error if too long? - // Write magic token and payload length (first 8 bytes) - packet = append(magicToken, length...) - packet = append(packet, payload...) - return -} - -func avoidPanic(done chan *PeerError) { - if rec := recover(); rec != nil { - err := NewPeerError(MiscError, " %v", rec) - logger.Debugln(err) - done <- err - } -} - -func (self *Connection) write(payload []byte, done chan *PeerError) { - defer avoidPanic(done) - var err *PeerError - _, ok := self.conn.Write(pack(payload)) - if ok != nil { - err = NewPeerError(WriteError, " %v", ok) - logger.Debugln(err) - } - done <- err -} - -func (self *Connection) read(payloads chan []byte, done chan *PeerError) { - //defer avoidPanic(done) - - partials := make(chan []byte, partialsQueueSize) - errc := make(chan *PeerError) - go self.readPartials(partials, errc) - - packet := []byte{} - length := 8 - start := true - var err *PeerError -out: - for { - // appends partials read via connection until packet is - // - either parseable (>=8bytes) - // - or complete (payload fully consumed) - for len(packet) < length { - partial, ok := <-partials - if !ok { // partials channel is closed - err = <-errc - if err == nil && len(packet) > 0 { - if start { - err = NewPeerError(PacketTooShort, "%v", packet) - } else { - err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length) - } - } - break out - } - packet = append(packet, partial...) - } - if start { - // at least 8 bytes read, can validate packet - if bytes.Compare(magicToken, packet[:4]) != 0 { - err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4]) - break - } - length = int(ethutil.BytesToNumber(packet[4:8])) - packet = packet[8:] - - if length > 0 { - start = false // now consuming payload - } else { //penalize peer but read on - self.err <- NewPeerError(EmptyPayload, "") - length = 8 - } - } else { - // packet complete (payload fully consumed) - payloads <- packet[:length] - packet = packet[length:] // resclice packet - start = true - length = 8 - } - } - - // this stops partials read via the connection, should we? - //if err != nil { - // select { - // case errc <- err - // default: - //} - done <- err -} - -func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) { - defer close(partials) - for { - // Give buffering some time - self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond)) - buffer := make([]byte, readBufferLength) - // read partial from connection - bytesRead, err := self.conn.Read(buffer) - if err == nil || err.Error() == "EOF" { - if bytesRead > 0 { - partials <- buffer[:bytesRead] - } - if err != nil && err.Error() == "EOF" { - break - } - } else { - // unexpected error, report to errc - err := NewPeerError(ReadError, " %v", err) - logger.Debugln(err) - errc <- err - return // will close partials channel - } - } - close(errc) -} |