diff options
author | obscuren <geffobscura@gmail.com> | 2014-01-13 06:46:03 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-01-13 06:46:03 +0800 |
commit | 7ade1778fba0fd1f6e0bccc7647cd8fb3185528d (patch) | |
tree | 6d1ecb63d854403d6fc3bec56266f47f87fe485e | |
parent | 52fb3b412cde2c5afb0e3364a1da23f3c1d7b171 (diff) | |
download | go-tangerine-7ade1778fba0fd1f6e0bccc7647cd8fb3185528d.tar.gz go-tangerine-7ade1778fba0fd1f6e0bccc7647cd8fb3185528d.tar.zst go-tangerine-7ade1778fba0fd1f6e0bccc7647cd8fb3185528d.zip |
Peer reaping and fake network
-rw-r--r-- | ethereum.go | 8 | ||||
-rw-r--r-- | peer.go | 4 | ||||
-rw-r--r-- | server.go | 46 |
3 files changed, 45 insertions, 13 deletions
diff --git a/ethereum.go b/ethereum.go index f38de1872..055c1a5ad 100644 --- a/ethereum.go +++ b/ethereum.go @@ -72,14 +72,6 @@ func main() { server.Start() - err = server.ConnectToPeer("localhost:12345") - if err != nil { - log.Println("Error starting server", err) - - server.Stop() - - return - } // Wait for shutdown server.WaitForShutdown() @@ -28,7 +28,11 @@ type Peer struct { // Flag for checking the peer's connectivity state connected int32 disconnect int32 + // Last known message send lastSend time.Time + // Indicated whether a verack has been send or not + // This flag is used by writeMessage to check if messages are allowed + // to be send or not. If no version is known all messages are ignored. versionKnown bool } @@ -8,13 +8,14 @@ import ( "log" "net" "time" + "sync/atomic" ) -func eachPeer(peers *list.List, callback func(*Peer)) { +func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { // Loop thru the peers and close them (if we had them) for e := peers.Front(); e != nil; e = e.Next() { if peer, ok := e.Value.(*Peer); ok { - callback(peer) + callback(peer, e) } } } @@ -75,19 +76,54 @@ func (s *Server) ConnectToPeer(addr string) error { } func (s *Server) Broadcast(msgType string, data []byte) { - eachPeer(s.peers, func(p *Peer) { + eachPeer(s.peers, func(p *Peer, e *list.Element) { p.QueueMessage(ethwire.NewMessage(msgType, 0, data)) }) } +const ( + processReapingTimeout = 10 // TODO increase +) + +func (s *Server) ReapDeadPeers() { + for { + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if atomic.LoadInt32(&p.disconnect) == 1 { + log.Println("Dead peer found .. reaping") + + s.peers.Remove(e) + } + }) + + time.Sleep(processReapingTimeout * time.Second) + } +} + // Start the server func (s *Server) Start() { // For now this function just blocks the main thread ln, err := net.Listen("tcp", ":12345") if err != nil { - log.Fatal(err) + // This is mainly for testing to create a "network" + if Debug { + log.Println("Connection listening disabled. Acting as client") + + err = s.ConnectToPeer("localhost:12345") + if err != nil { + log.Println("Error starting server", err) + + s.Stop() + } + + return + } else { + log.Fatal(err) + } } + // Start the reaping processes + go s.ReapDeadPeers() + go func() { for { conn, err := ln.Accept() @@ -117,7 +153,7 @@ func (s *Server) Stop() { // Close the database defer s.db.Close() - eachPeer(s.peers, func(p *Peer) { + eachPeer(s.peers, func(p *Peer, e *list.Element) { p.Stop() }) |