aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/connection.go')
-rw-r--r--p2p/connection.go275
1 files changed, 275 insertions, 0 deletions
diff --git a/p2p/connection.go b/p2p/connection.go
new file mode 100644
index 000000000..e999cbe55
--- /dev/null
+++ b/p2p/connection.go
@@ -0,0 +1,275 @@
+package p2p
+
+import (
+ "bytes"
+ // "fmt"
+ "net"
+ "time"
+
+ "github.com/ethereum/eth-go/ethutil"
+)
+
+type Connection struct {
+ conn net.Conn
+ // conn NetworkConnection
+ timeout time.Duration
+ in chan []byte
+ out chan []byte
+ err chan *PeerError
+ closingIn chan chan bool
+ closingOut chan chan bool
+}
+
+// const readBufferLength = 2 //for testing
+
+const readBufferLength = 1440
+const partialsQueueSize = 10
+const maxPendingQueueSize = 1
+const defaultTimeout = 500
+
+var magicToken = []byte{34, 64, 8, 145}
+
+func (self *Connection) Open() {
+ go self.startRead()
+ go self.startWrite()
+}
+
+func (self *Connection) Close() {
+ self.closeIn()
+ self.closeOut()
+}
+
+func (self *Connection) closeIn() {
+ errc := make(chan bool)
+ self.closingIn <- errc
+ <-errc
+}
+
+func (self *Connection) closeOut() {
+ errc := make(chan bool)
+ self.closingOut <- errc
+ <-errc
+}
+
+func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
+ return &Connection{
+ conn: conn,
+ timeout: defaultTimeout,
+ in: make(chan []byte),
+ out: make(chan []byte),
+ err: errchan,
+ closingIn: make(chan chan bool, 1),
+ closingOut: make(chan chan bool, 1),
+ }
+}
+
+func (self *Connection) Read() <-chan []byte {
+ return self.in
+}
+
+func (self *Connection) Write() chan<- []byte {
+ return self.out
+}
+
+func (self *Connection) Error() <-chan *PeerError {
+ return self.err
+}
+
+func (self *Connection) startRead() {
+ payloads := make(chan []byte)
+ done := make(chan *PeerError)
+ pending := [][]byte{}
+ var head []byte
+ var wait time.Duration // initally 0 (no delay)
+ read := time.After(wait * time.Millisecond)
+
+ for {
+ // if pending empty, nil channel blocks
+ var in chan []byte
+ if len(pending) > 0 {
+ in = self.in // enable send case
+ head = pending[0]
+ } else {
+ in = nil
+ }
+
+ select {
+ case <-read:
+ go self.read(payloads, done)
+ case err := <-done:
+ if err == nil { // no error but nothing to read
+ if len(pending) < maxPendingQueueSize {
+ wait = 100
+ } else if wait == 0 {
+ wait = 100
+ } else {
+ wait = 2 * wait
+ }
+ } else {
+ self.err <- err // report error
+ wait = 100
+ }
+ read = time.After(wait * time.Millisecond)
+ case payload := <-payloads:
+ pending = append(pending, payload)
+ if len(pending) < maxPendingQueueSize {
+ wait = 0
+ } else {
+ wait = 100
+ }
+ read = time.After(wait * time.Millisecond)
+ case in <- head:
+ pending = pending[1:]
+ case errc := <-self.closingIn:
+ errc <- true
+ close(self.in)
+ return
+ }
+
+ }
+}
+
+func (self *Connection) startWrite() {
+ pending := [][]byte{}
+ done := make(chan *PeerError)
+ writing := false
+ for {
+ if len(pending) > 0 && !writing {
+ writing = true
+ go self.write(pending[0], done)
+ }
+ select {
+ case payload := <-self.out:
+ pending = append(pending, payload)
+ case err := <-done:
+ if err == nil {
+ pending = pending[1:]
+ writing = false
+ } else {
+ self.err <- err // report error
+ }
+ case errc := <-self.closingOut:
+ errc <- true
+ close(self.out)
+ return
+ }
+ }
+}
+
+func pack(payload []byte) (packet []byte) {
+ length := ethutil.NumberToBytes(uint32(len(payload)), 32)
+ // return error if too long?
+ // Write magic token and payload length (first 8 bytes)
+ packet = append(magicToken, length...)
+ packet = append(packet, payload...)
+ return
+}
+
+func avoidPanic(done chan *PeerError) {
+ if rec := recover(); rec != nil {
+ err := NewPeerError(MiscError, " %v", rec)
+ logger.Debugln(err)
+ done <- err
+ }
+}
+
+func (self *Connection) write(payload []byte, done chan *PeerError) {
+ defer avoidPanic(done)
+ var err *PeerError
+ _, ok := self.conn.Write(pack(payload))
+ if ok != nil {
+ err = NewPeerError(WriteError, " %v", ok)
+ logger.Debugln(err)
+ }
+ done <- err
+}
+
+func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
+ //defer avoidPanic(done)
+
+ partials := make(chan []byte, partialsQueueSize)
+ errc := make(chan *PeerError)
+ go self.readPartials(partials, errc)
+
+ packet := []byte{}
+ length := 8
+ start := true
+ var err *PeerError
+out:
+ for {
+ // appends partials read via connection until packet is
+ // - either parseable (>=8bytes)
+ // - or complete (payload fully consumed)
+ for len(packet) < length {
+ partial, ok := <-partials
+ if !ok { // partials channel is closed
+ err = <-errc
+ if err == nil && len(packet) > 0 {
+ if start {
+ err = NewPeerError(PacketTooShort, "%v", packet)
+ } else {
+ err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
+ }
+ }
+ break out
+ }
+ packet = append(packet, partial...)
+ }
+ if start {
+ // at least 8 bytes read, can validate packet
+ if bytes.Compare(magicToken, packet[:4]) != 0 {
+ err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
+ break
+ }
+ length = int(ethutil.BytesToNumber(packet[4:8]))
+ packet = packet[8:]
+
+ if length > 0 {
+ start = false // now consuming payload
+ } else { //penalize peer but read on
+ self.err <- NewPeerError(EmptyPayload, "")
+ length = 8
+ }
+ } else {
+ // packet complete (payload fully consumed)
+ payloads <- packet[:length]
+ packet = packet[length:] // resclice packet
+ start = true
+ length = 8
+ }
+ }
+
+ // this stops partials read via the connection, should we?
+ //if err != nil {
+ // select {
+ // case errc <- err
+ // default:
+ //}
+ done <- err
+}
+
+func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
+ defer close(partials)
+ for {
+ // Give buffering some time
+ self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
+ buffer := make([]byte, readBufferLength)
+ // read partial from connection
+ bytesRead, err := self.conn.Read(buffer)
+ if err == nil || err.Error() == "EOF" {
+ if bytesRead > 0 {
+ partials <- buffer[:bytesRead]
+ }
+ if err != nil && err.Error() == "EOF" {
+ break
+ }
+ } else {
+ // unexpected error, report to errc
+ err := NewPeerError(ReadError, " %v", err)
+ logger.Debugln(err)
+ errc <- err
+ return // will close partials channel
+ }
+ }
+ close(errc)
+}