diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-02-22 20:10:07 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2017-02-23 18:16:44 +0800 |
commit | d4fd06c3dc6cd6d2dbd2bfebfee5bcb46a504851 (patch) | |
tree | 17c93170551d3eeabe2935de1765f157007f0dc2 /swarm/network | |
parent | 47af53f9aaf9aa7b12cd976eb150ccf3d64da6fd (diff) | |
download | dexon-d4fd06c3dc6cd6d2dbd2bfebfee5bcb46a504851.tar.gz dexon-d4fd06c3dc6cd6d2dbd2bfebfee5bcb46a504851.tar.zst dexon-d4fd06c3dc6cd6d2dbd2bfebfee5bcb46a504851.zip |
all: blidly swap out glog to our log15, logs need rework
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/depo.go | 32 | ||||
-rw-r--r-- | swarm/network/forwarding.go | 18 | ||||
-rw-r--r-- | swarm/network/hive.go | 41 | ||||
-rw-r--r-- | swarm/network/kademlia/kaddb.go | 27 | ||||
-rw-r--r-- | swarm/network/kademlia/kademlia.go | 27 | ||||
-rw-r--r-- | swarm/network/protocol.go | 43 | ||||
-rw-r--r-- | swarm/network/syncdb.go | 41 | ||||
-rw-r--r-- | swarm/network/syncdb_test.go | 11 | ||||
-rw-r--r-- | swarm/network/syncer.go | 69 |
9 files changed, 151 insertions, 158 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go index cd0a17ffa..340128aa8 100644 --- a/swarm/network/depo.go +++ b/swarm/network/depo.go @@ -19,10 +19,10 @@ package network import ( "bytes" "encoding/binary" + "fmt" "time" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -60,8 +60,8 @@ func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error missing = append(missing, req) } } - glog.V(logger.Debug).Infof("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State) - glog.V(logger.Detail).Infof("Depo.HandleUnsyncedKeysMsg: received %v", unsynced) + log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State)) + log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced)) // send delivery request with missing keys err = p.deliveryRequest(missing) if err != nil { @@ -81,7 +81,7 @@ func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error { deliver := req.Deliver // queue the actual delivery of a chunk () - glog.V(logger.Detail).Infof("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver) + log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver)) for _, sreq := range deliver { // TODO: look up in cache here or in deliveries // priorities are taken from the message so the remote party can @@ -104,19 +104,19 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { chunk, err := self.localStore.Get(req.Key) switch { case err != nil: - glog.V(logger.Detail).Infof("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key) + log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key)) // not found in memory cache, ie., a genuine store request // create chunk chunk = storage.NewChunk(req.Key, nil) case chunk.SData == nil: // found chunk in memory store, needs the data, validate now - glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req) + log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req)) default: // data is found, store request ignored // this should update access count? - glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req) + log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req)) islocal = true //return } @@ -126,7 +126,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { if !bytes.Equal(hasher.Sum(nil), req.Key) { // data does not validate, ignore // TODO: peer should be penalised/dropped? - glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req) + log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)) return } @@ -136,7 +136,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { // update chunk with size and data chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data) chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8])) - glog.V(logger.Detail).Infof("delivery of %v from %v", chunk, p) + log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p)) chunk.Source = p self.netStore.Put(chunk) } @@ -152,7 +152,7 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) err = p.swap.Add(1) } if err != nil { - glog.V(logger.Warn).Infof("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err) + log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err)) return } @@ -163,7 +163,7 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) req = self.strategyUpdateRequest(chunk.Req, req) // check if we can immediately deliver if chunk.SData != nil { - glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()) + log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log())) if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size { sreq := &storeRequestMsgData{ @@ -174,16 +174,16 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) } p.syncer.addRequest(sreq, DeliverReq) } else { - glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()) + log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log())) } } else { - glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()) + log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log())) } } // add peer request the chunk and decides the timeout for the response if still searching func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) { - glog.V(logger.Detail).Infof("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()) + log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log())) // we do not create an alternative one req = origReq if rs != nil { @@ -211,7 +211,7 @@ only add if less than requesterCount peers forwarded the same request id so far note this is done irrespective of status (searching or found) */ func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) { - glog.V(logger.Detail).Infof("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.from, req.Id) + log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.from, req.Id)) list := rs.Requesters[req.Id] rs.Requesters[req.Id] = append(list, req) } diff --git a/swarm/network/forwarding.go b/swarm/network/forwarding.go index fef79c70b..88a82a678 100644 --- a/swarm/network/forwarding.go +++ b/swarm/network/forwarding.go @@ -17,11 +17,11 @@ package network import ( + "fmt" "math/rand" "time" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -56,10 +56,10 @@ var searchTimeout = 3 * time.Second // logic propagating retrieve requests to peers given by the kademlia hive func (self *forwarder) Retrieve(chunk *storage.Chunk) { peers := self.hive.getPeers(chunk.Key, 0) - glog.V(logger.Detail).Infof("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers)) + log.Trace(fmt.Sprintf("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers))) OUT: for _, p := range peers { - glog.V(logger.Detail).Infof("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p) + log.Trace(fmt.Sprintf("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p)) for _, recipients := range chunk.Req.Requesters { for _, recipient := range recipients { req := recipient.(*retrieveRequestMsgData) @@ -80,7 +80,7 @@ OUT: p.retrieve(req) break OUT } - glog.V(logger.Warn).Infof("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err) + log.Warn(fmt.Sprintf("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err)) } } @@ -98,14 +98,14 @@ func (self *forwarder) Store(chunk *storage.Chunk) { source = chunk.Source.(*peer) } for _, p := range self.hive.getPeers(chunk.Key, 0) { - glog.V(logger.Detail).Infof("forwarder.Store: %v %v", p, chunk) + log.Trace(fmt.Sprintf("forwarder.Store: %v %v", p, chunk)) if p.syncer != nil && (source == nil || p.Addr() != source.Addr()) { n++ Deliver(p, msg, PropagateReq) } } - glog.V(logger.Detail).Infof("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk) + log.Trace(fmt.Sprintf("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk)) } // once a chunk is found deliver it to its requesters unless timed out @@ -123,7 +123,7 @@ func (self *forwarder) Deliver(chunk *storage.Chunk) { for id, r := range requesters { req = r.(*retrieveRequestMsgData) if req.timeout == nil || req.timeout.After(time.Now()) { - glog.V(logger.Detail).Infof("forwarder.Deliver: %v -> %v", req.Id, req.from) + log.Trace(fmt.Sprintf("forwarder.Deliver: %v -> %v", req.Id, req.from)) msg.Id = uint64(id) Deliver(req.from, msg, DeliverReq) n++ @@ -133,7 +133,7 @@ func (self *forwarder) Deliver(chunk *storage.Chunk) { } } } - glog.V(logger.Detail).Infof("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n) + log.Trace(fmt.Sprintf("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n)) } } diff --git a/swarm/network/hive.go b/swarm/network/hive.go index f81761b97..70652c450 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -23,8 +23,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/swarm/network/kademlia" @@ -129,7 +128,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee self.listenAddr = listenAddr err = self.kad.Load(self.path, nil) if err != nil { - glog.V(logger.Warn).Infof("Warning: error reading kaddb '%s' (skipping): %v", self.path, err) + log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err)) err = nil } // this loop is doing bootstrapping and maintains a healthy table @@ -145,7 +144,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee node, need, proxLimit := self.kad.Suggest() if node != nil && len(node.Url) > 0 { - glog.V(logger.Detail).Infof("call known bee %v", node.Url) + log.Trace(fmt.Sprintf("call known bee %v", node.Url)) // enode or any lower level connection address is unnecessary in future // discovery table is used to look it up. connectPeer(node.Url) @@ -159,21 +158,21 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee req := &retrieveRequestMsgData{ Key: storage.Key(randAddr[:]), } - glog.V(logger.Detail).Infof("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]) + log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0])) peers[0].(*peer).retrieve(req) } else { - glog.V(logger.Warn).Infof("no peer") + log.Warn(fmt.Sprintf("no peer")) } - glog.V(logger.Detail).Infof("buzz kept alive") + log.Trace(fmt.Sprintf("buzz kept alive")) } else { - glog.V(logger.Info).Infof("no need for more bees") + log.Info(fmt.Sprintf("no need for more bees")) } select { case self.toggle <- need: case <-self.quit: return } - glog.V(logger.Debug).Infof("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()) + log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount())) } }() return @@ -192,7 +191,7 @@ func (self *Hive) keepAlive() { if self.kad.DBCount() > 0 { select { case self.more <- true: - glog.V(logger.Debug).Infof("buzz wakeup") + log.Debug(fmt.Sprintf("buzz wakeup")) default: } } @@ -224,7 +223,7 @@ func (self *Hive) addPeer(p *peer) error { default: } }() - glog.V(logger.Detail).Infof("hi new bee %v", p) + log.Trace(fmt.Sprintf("hi new bee %v", p)) err := self.kad.On(p, loadSync) if err != nil { return err @@ -235,21 +234,21 @@ func (self *Hive) addPeer(p *peer) error { // to send the 6 byte self lookup // we do not record as request or forward it, just reply with peers p.retrieve(&retrieveRequestMsgData{}) - glog.V(logger.Detail).Infof("'whatsup wheresdaparty' sent to %v", p) + log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p)) return nil } // called after peer disconnected func (self *Hive) removePeer(p *peer) { - glog.V(logger.Debug).Infof("bee %v removed", p) + log.Debug(fmt.Sprintf("bee %v removed", p)) self.kad.Off(p, saveSync) select { case self.more <- true: default: } if self.kad.Count() == 0 { - glog.V(logger.Debug).Infof("empty, all bees gone") + log.Debug(fmt.Sprintf("empty, all bees gone")) } } @@ -265,7 +264,7 @@ func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) { // disconnects all the peers func (self *Hive) DropAll() { - glog.V(logger.Info).Infof("dropping all bees") + log.Info(fmt.Sprintf("dropping all bees")) for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) { node.Drop() } @@ -290,7 +289,7 @@ func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) { var nrs []*kademlia.NodeRecord for _, p := range req.Peers { if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil { - glog.V(logger.Detail).Infof("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err) + log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err)) continue } nrs = append(nrs, newNodeRecord(p)) @@ -326,7 +325,7 @@ func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error { return fmt.Errorf("invalid type") } if record.Meta == nil { - glog.V(logger.Debug).Infof("no sync state for node record %v setting default", record) + log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record)) p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}} return nil } @@ -334,7 +333,7 @@ func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error { if err != nil { return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err) } - glog.V(logger.Detail).Infof("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))) + log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta)))) p.syncState = state return err } @@ -344,10 +343,10 @@ func saveSync(record *kademlia.NodeRecord, node kademlia.Node) { if p, ok := node.(*peer); ok { meta, err := encodeSync(p.syncState) if err != nil { - glog.V(logger.Warn).Infof("error saving sync state for %v: %v", node, err) + log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err)) return } - glog.V(logger.Detail).Infof("saved sync state for %v: %s", node, string(*meta)) + log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta))) record.Meta = meta } } @@ -370,7 +369,7 @@ func (self *Hive) peers(req *retrieveRequestMsgData) { for _, peer := range self.getPeers(key, int(req.MaxPeers)) { addrs = append(addrs, peer.remoteAddr) } - glog.V(logger.Debug).Infof("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()) + log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log())) peersData := &peersMsgData{ Peers: addrs, diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go index 53a7db451..f4279917e 100644 --- a/swarm/network/kademlia/kaddb.go +++ b/swarm/network/kademlia/kaddb.go @@ -24,8 +24,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" ) type NodeData interface { @@ -88,12 +87,12 @@ func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord { Addr: a, Url: url, } - glog.V(logger.Info).Infof("add new record %v to kaddb", record) + log.Info(fmt.Sprintf("add new record %v to kaddb", record)) // insert in kaddb self.index[a] = record self.Nodes[index] = append(self.Nodes[index], record) } else { - glog.V(logger.Info).Infof("found record %v in kaddb", record) + log.Info(fmt.Sprintf("found record %v in kaddb", record)) } // update last seen time record.setSeen() @@ -121,13 +120,13 @@ func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) { copy(newnodes[:], nodes[:dbcursor]) newnodes[dbcursor] = node copy(newnodes[dbcursor+1:], nodes[dbcursor:]) - glog.V(logger.Detail).Infof("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes) + log.Trace(fmt.Sprintf("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes)) self.Nodes[index] = newnodes n++ } } if n > 0 { - glog.V(logger.Debug).Infof("%d/%d node records (new/known)", n, len(nrs)) + log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs))) } } @@ -207,13 +206,13 @@ func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRe // skip already connected nodes if node.node != nil { - glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)) + log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow))) continue ROW } // if node is scheduled to connect if time.Time(node.After).After(time.Now()) { - glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After) + log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)) continue ROW } @@ -224,17 +223,17 @@ func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRe if delta > self.purgeInterval { // remove node purge[cursor] = true - glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen) + log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen)) continue ROW } - glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After) + log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)) // scheduling next check interval = time.Duration(delta * time.Duration(self.connRetryExp)) after = time.Now().Add(interval) - glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval) + log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval)) node.After = after found = true } // ROW @@ -295,9 +294,9 @@ func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error { } err = ioutil.WriteFile(path, data, os.ModePerm) if err != nil { - glog.V(logger.Warn).Infof("unable to save kaddb with %v nodes to %v: err", n, path, err) + log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: err", n, path, err)) } else { - glog.V(logger.Info).Infof("saved kaddb with %v nodes to %v", n, path) + log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path)) } return err } @@ -338,7 +337,7 @@ func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err erro } self.delete(po, purge) } - glog.V(logger.Info).Infof("loaded kaddb with %v nodes from %v", n, path) + log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path)) return } diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go index 87c57cefe..8d731c038 100644 --- a/swarm/network/kademlia/kademlia.go +++ b/swarm/network/kademlia/kademlia.go @@ -23,8 +23,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -117,7 +116,7 @@ func (self *Kademlia) DBCount() int { // On is the entry point called when a new nodes is added // unsafe in that node is not checked to be already active node (to be called once) func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) { - glog.V(logger.Warn).Infof("%v", self) + log.Warn(fmt.Sprintf("%v", self)) defer self.lock.Unlock() self.lock.Lock() @@ -126,11 +125,11 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error if cb != nil { err = cb(record, node) - glog.V(logger.Detail).Infof("cb(%v, %v) ->%v", record, node, err) + log.Trace(fmt.Sprintf("cb(%v, %v) ->%v", record, node, err)) if err != nil { return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err) } - glog.V(logger.Debug).Infof("add node record %v with node %v", record, node) + log.Debug(fmt.Sprintf("add node record %v with node %v", record, node)) } // insert in kademlia table of active nodes @@ -139,7 +138,7 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error // TODO: give priority to peers with active traffic if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation self.buckets[index] = append(bucket, node) - glog.V(logger.Debug).Infof("add node %v to table", node) + log.Debug(fmt.Sprintf("add node %v to table", node)) self.setProxLimit(index, true) record.node = node self.count++ @@ -159,10 +158,10 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error } } if replaced == nil { - glog.V(logger.Debug).Infof("all peers wanted, PO%03d bucket full", index) + log.Debug(fmt.Sprintf("all peers wanted, PO%03d bucket full", index)) return fmt.Errorf("bucket full") } - glog.V(logger.Debug).Infof("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval) + log.Debug(fmt.Sprintf("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval)) replaced.Drop() // actually replace in the row. When off(node) is called, the peer is no longer in the row bucket[pos] = node @@ -195,7 +194,7 @@ func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) { } record.node = nil self.count-- - glog.V(logger.Debug).Infof("remove node %v from table, population now is %v", node, self.count) + log.Debug(fmt.Sprintf("remove node %v from table, population now is %v", node, self.count)) return } @@ -223,7 +222,7 @@ func (self *Kademlia) setProxLimit(r int, on bool) { self.proxLimit++ curr = len(self.buckets[self.proxLimit]) - glog.V(logger.Detail).Infof("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r) + log.Trace(fmt.Sprintf("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)) } return } @@ -237,7 +236,7 @@ func (self *Kademlia) setProxLimit(r int, on bool) { // self.proxLimit-- self.proxSize += len(self.buckets[self.proxLimit]) - glog.V(logger.Detail).Infof("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r) + log.Trace(fmt.Sprintf("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)) } } @@ -257,7 +256,7 @@ func (self *Kademlia) FindClosest(target Address, max int) []Node { po := self.proximityBin(target) index := po step := 1 - glog.V(logger.Detail).Infof("serving %v nodes at %v (PO%02d)", max, index, po) + log.Trace(fmt.Sprintf("serving %v nodes at %v (PO%02d)", max, index, po)) // if max is set to 0, just want a full bucket, dynamic number min := max @@ -276,7 +275,7 @@ func (self *Kademlia) FindClosest(target Address, max int) []Node { n++ } // terminate if index reached the bottom or enough peers > min - glog.V(logger.Detail).Infof("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po) + log.Trace(fmt.Sprintf("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po)) if n >= min && (step < 0 || max == 0) { break } @@ -287,7 +286,7 @@ func (self *Kademlia) FindClosest(target Address, max int) []Node { } index += step } - glog.V(logger.Detail).Infof("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po) + log.Trace(fmt.Sprintf("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po)) return r.nodes } diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 763fb0b8e..44787947c 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -38,8 +38,7 @@ import ( "github.com/ethereum/go-ethereum/contracts/chequebook" "github.com/ethereum/go-ethereum/errs" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap" @@ -201,7 +200,7 @@ func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook // the main forever loop that handles incoming requests for { if self.hive.blockRead { - glog.V(logger.Warn).Infof("Cannot read network") + log.Warn(fmt.Sprintf("Cannot read network")) time.Sleep(100 * time.Millisecond) continue } @@ -221,7 +220,7 @@ func (self *bzz) Drop() { // one cycle of the main forever loop that handles and dispatches incoming messages func (self *bzz) handle() error { msg, err := self.rw.ReadMsg() - glog.V(logger.Debug).Infof("<- %v", msg) + log.Debug(fmt.Sprintf("<- %v", msg)) if err != nil { return err } @@ -236,7 +235,7 @@ func (self *bzz) handle() error { case statusMsg: // no extra status message allowed. The one needed already handled by // handleStatus - glog.V(logger.Debug).Infof("Status message: %v", msg) + log.Debug(fmt.Sprintf("Status message: %v", msg)) return self.protoError(ErrExtraStatusMsg, "") case storeRequestMsg: @@ -250,7 +249,7 @@ func (self *bzz) handle() error { } // last Active time is set only when receiving chunks self.lastActive = time.Now() - glog.V(logger.Detail).Infof("incoming store request: %s", req.String()) + log.Trace(fmt.Sprintf("incoming store request: %s", req.String())) // swap accounting is done within forwarding self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self}) @@ -263,7 +262,7 @@ func (self *bzz) handle() error { req.from = &peer{bzz: self} // if request is lookup and not to be delivered if req.isLookup() { - glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from) + log.Trace(fmt.Sprintf("self lookup for %v: responding with peers only...", req.from)) } else if req.Key == nil { return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil") } else { @@ -281,7 +280,7 @@ func (self *bzz) handle() error { return self.protoError(ErrDecode, "<- %v: %v", msg, err) } req.from = &peer{bzz: self} - glog.V(logger.Detail).Infof("<- peer addresses: %v", req) + log.Trace(fmt.Sprintf("<- peer addresses: %v", req)) self.hive.HandlePeersMsg(&req, &peer{bzz: self}) case syncRequestMsg: @@ -289,7 +288,7 @@ func (self *bzz) handle() error { if err := msg.Decode(&req); err != nil { return self.protoError(ErrDecode, "<- %v: %v", msg, err) } - glog.V(logger.Debug).Infof("<- sync request: %v", req) + log.Debug(fmt.Sprintf("<- sync request: %v", req)) self.lastActive = time.Now() self.sync(req.SyncState) @@ -299,7 +298,7 @@ func (self *bzz) handle() error { if err := msg.Decode(&req); err != nil { return self.protoError(ErrDecode, "<- %v: %v", msg, err) } - glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String()) + log.Debug(fmt.Sprintf("<- unsynced keys : %s", req.String())) err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self}) self.lastActive = time.Now() if err != nil { @@ -313,7 +312,7 @@ func (self *bzz) handle() error { if err := msg.Decode(&req); err != nil { return self.protoError(ErrDecode, "<-msg %v: %v", msg, err) } - glog.V(logger.Debug).Infof("<- delivery request: %s", req.String()) + log.Debug(fmt.Sprintf("<- delivery request: %s", req.String())) err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self}) self.lastActive = time.Now() if err != nil { @@ -327,7 +326,7 @@ func (self *bzz) handle() error { if err := msg.Decode(&req); err != nil { return self.protoError(ErrDecode, "<- %v: %v", msg, err) } - glog.V(logger.Debug).Infof("<- payment: %s", req.String()) + log.Debug(fmt.Sprintf("<- payment: %s", req.String())) self.swap.Receive(int(req.Units), req.Promise) } @@ -385,7 +384,7 @@ func (self *bzz) handleStatus() (err error) { } self.remoteAddr = self.peerAddr(status.Addr) - glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()) + log.Trace(fmt.Sprintf("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())) if self.swapEnabled { // set remote profile for accounting @@ -395,14 +394,14 @@ func (self *bzz) handleStatus() (err error) { } } - glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId) + log.Info(fmt.Sprintf("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)) err = self.hive.addPeer(&peer{bzz: self}) if err != nil { return self.protoError(ErrUnwanted, "%v", err) } // hive sets syncstate so sync should start after node added - glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState) + log.Info(fmt.Sprintf("syncronisation request sent with %v", self.syncState)) self.syncRequest() return nil @@ -421,7 +420,7 @@ func (self *bzz) sync(state *syncState) error { // an explicitly received nil syncstate disables syncronisation if state == nil { self.syncEnabled = false - glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self) + log.Warn(fmt.Sprintf("syncronisation disabled for peer %v", self)) state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true} } else { state.synced = make(chan bool) @@ -430,7 +429,7 @@ func (self *bzz) sync(state *syncState) error { state.Start = storage.Key(start[:]) state.Stop = storage.Key(stop[:]) } - glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state) + log.Debug(fmt.Sprintf("syncronisation requested by peer %v at state %v", self, state)) } var err error self.syncer, err = newSyncer( @@ -443,7 +442,7 @@ func (self *bzz) sync(state *syncState) error { if err != nil { return self.protoError(ErrSync, "%v", err) } - glog.V(logger.Detail).Infof("syncer set for peer %v", self) + log.Trace(fmt.Sprintf("syncer set for peer %v", self)) return nil } @@ -490,11 +489,11 @@ func (self *bzz) store(req *storeRequestMsgData) error { func (self *bzz) syncRequest() error { req := &syncRequestMsgData{} if self.hive.syncEnabled { - glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState) + log.Debug(fmt.Sprintf("syncronisation request to peer %v at state %v", self, self.syncState)) req.SyncState = self.syncState } if self.syncState == nil { - glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState) + log.Warn(fmt.Sprintf("syncronisation disabled for peer %v at state %v", self, self.syncState)) } return self.send(syncRequestMsg, req) } @@ -534,7 +533,7 @@ func (self *bzz) peers(req *peersMsgData) error { func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) { err = self.errors.New(code, format, params...) - err.Log(glog.V(logger.Info)) + log.Info(err.Error()) return } @@ -542,7 +541,7 @@ func (self *bzz) send(msg uint64, data interface{}) error { if self.hive.blockWrite { return fmt.Errorf("network write blocked") } - glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self) + log.Trace(fmt.Sprintf("-> %v: %v (%T) to %v", msg, data, data, self)) err := p2p.Send(self.rw, msg, data) if err != nil { self.Drop() diff --git a/swarm/network/syncdb.go b/swarm/network/syncdb.go index cef32610f..7216da525 100644 --- a/swarm/network/syncdb.go +++ b/swarm/network/syncdb.go @@ -20,8 +20,7 @@ import ( "encoding/binary" "fmt" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -80,7 +79,7 @@ func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSi batch: make(chan chan int), dbBatchSize: dbBatchSize, } - glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority) + log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)) // starts the main forever loop reading from buffer go syncdb.bufferRead(deliver) @@ -126,9 +125,9 @@ func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) { var counter uint64 if err == nil { counter = binary.BigEndian.Uint64(data) - glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter) + log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)) } else { - glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter) + log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)) } LOOP: @@ -142,7 +141,7 @@ LOOP: // if syncdb is stopped. In this case we need to save the item to the db more = deliver(req, self.quit) if !more { - glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) + log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)) // received quit signal, save request currently waiting delivery // by switching to db mode and closing the buffer buffer = nil @@ -152,12 +151,12 @@ LOOP: break // break from select, this item will be written to the db } self.total++ - glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) + log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)) // by the time deliver returns, there were new writes to the buffer // if buffer contention is detected, switch to db mode which drains // the buffer so no process will block on pushing store requests if len(buffer) == cap(buffer) { - glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total) + log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)) buffer = nil db = self.buffer } @@ -170,7 +169,7 @@ LOOP: binary.BigEndian.PutUint64(counterValue, counter) batch.Put(self.counterKey, counterValue) // persist counter in batch self.writeSyncBatch(batch) // save batch - glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority) + log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)) break LOOP } self.dbTotal++ @@ -181,7 +180,7 @@ LOOP: if inBatch == 0 && quit != nil { // there was no writes since the last batch so db depleted // switch to buffer mode - glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority) + log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)) db = nil buffer = self.buffer dbSize <- 0 // indicates to 'caller' that batch has been written @@ -190,7 +189,7 @@ LOOP: } binary.BigEndian.PutUint64(counterValue, counter) batch.Put(self.counterKey, counterValue) - glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue) + log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)) batch = self.writeSyncBatch(batch) dbSize <- inBatch // indicates to 'caller' that batch has been written inBatch = 0 @@ -202,7 +201,7 @@ LOOP: db = self.buffer buffer = nil quit = nil - glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority) + log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)) close(db) continue LOOP } @@ -210,15 +209,15 @@ LOOP: // only get here if we put req into db entry, err = self.newSyncDbEntry(req, counter) if err != nil { - glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err) + log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)) continue LOOP } batch.Put(entry.key, entry.val) - glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter) + log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)) // if just switched to db mode and not quitting, then launch dbRead // in a parallel go routine to send deliveries from db if inDb == 0 && quit != nil { - glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead") + log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead")) go self.dbRead(true, counter, deliver) } inDb++ @@ -229,7 +228,7 @@ LOOP: batch = self.writeSyncBatch(batch) } } - glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter) + log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)) close(self.done) } @@ -237,7 +236,7 @@ LOOP: func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch { err := self.db.Write(batch) if err != nil { - glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err) + log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)) return batch } return new(leveldb.Batch) @@ -311,7 +310,7 @@ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{} continue } del = new(leveldb.Batch) - glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt) + log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)) for n = 0; !useBatches || n < cnt; it.Next() { copy(key, it.Key()) @@ -323,11 +322,11 @@ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{} val := make([]byte, 40) copy(val, it.Value()) entry = &syncDbEntry{key, val} - // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total) + // log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)) more = fun(entry, self.quit) if !more { // quit received when waiting to deliver entry, the entry will not be deleted - glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt) + log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)) break } // since subsequent batches of the same db session are indexed incrementally @@ -337,7 +336,7 @@ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{} n++ total++ } - glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total) + log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)) self.db.Write(del) // this could be async called only when db is idle it.Release() } diff --git a/swarm/network/syncdb_test.go b/swarm/network/syncdb_test.go index 21453a110..a9417e1d4 100644 --- a/swarm/network/syncdb_test.go +++ b/swarm/network/syncdb_test.go @@ -18,6 +18,7 @@ package network import ( "bytes" + "fmt" "io/ioutil" "os" "path/filepath" @@ -25,14 +26,12 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage" ) func init() { - glog.SetV(0) - glog.SetToStderr(true) + log.Root().SetHandler(log.LvlFilterHandler(log.LvlCrit, log.StreamHandler(os.Stderr, log.TerminalFormat()))) } type testSyncDb struct { @@ -83,7 +82,7 @@ func (self *testSyncDb) push(n int) { self.sent = append(self.sent, self.c) self.c++ } - glog.V(logger.Debug).Infof("pushed %v requests", n) + log.Debug(fmt.Sprintf("pushed %v requests", n)) } func (self *testSyncDb) draindb() { @@ -128,7 +127,7 @@ func (self *testSyncDb) expect(n int, db bool) { } if len(self.sent) > self.at && !bytes.Equal(crypto.Keccak256([]byte{byte(self.sent[self.at])}), self.delivered[self.at]) { self.t.Fatalf("expected delivery %v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db) - glog.V(logger.Debug).Infof("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db) + log.Debug(fmt.Sprintf("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)) } if !ok && db { self.t.Fatalf("expected delivery %v/%v/%v from db", i, n, self.at) diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go index b6b1ea3b6..eb932e927 100644 --- a/swarm/network/syncer.go +++ b/swarm/network/syncer.go @@ -22,8 +22,7 @@ import ( "fmt" "path/filepath" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -209,7 +208,7 @@ func newSyncer( // initialise a syncdb instance for each priority queue self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i))) } - glog.V(logger.Info).Infof("syncer started: %v", state) + log.Info(fmt.Sprintf("syncer started: %v", state)) // launch chunk delivery service go self.syncDeliveries() // launch sync task manager @@ -270,14 +269,14 @@ func (self *syncer) sync() { // 0. first replay stale requests from request db if state.SessionAt == 0 { - glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log()) + log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log())) return } - glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log()) + log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log())) for p := priorities - 1; p >= 0; p-- { self.queues[p].dbRead(false, 0, self.replay()) } - glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log()) + log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log())) // unless peer is synced sync unfinished history beginning on if !state.Synced { @@ -286,7 +285,7 @@ func (self *syncer) sync() { if !storage.IsZeroKey(state.Latest) { // 1. there is unfinished earlier sync state.Start = state.Latest - glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state) + log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)) // blocks while the entire history upto state is synced self.syncState(state) if state.Last < state.SessionAt { @@ -298,7 +297,7 @@ func (self *syncer) sync() { // 2. sync up to last disconnect1 if state.First < state.LastSeenAt { state.Last = state.LastSeenAt - glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state) + log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)) self.syncState(state) state.First = state.LastSeenAt } @@ -313,11 +312,11 @@ func (self *syncer) sync() { // if there have been new chunks since last session if state.LastSeenAt < state.SessionAt { state.Last = state.SessionAt - glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state) + log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)) // blocks until state syncing is finished self.syncState(state) } - glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log()) + log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log())) } @@ -333,7 +332,7 @@ func (self *syncer) syncState(state *syncState) { // stop quits both request processor and saves the request cache to disk func (self *syncer) stop() { close(self.quit) - glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log())) for _, db := range self.queues { db.stop() } @@ -366,7 +365,7 @@ func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) func (self *syncer) syncHistory(state *syncState) chan interface{} { var n uint history := make(chan interface{}) - glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop) + log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)) it := self.dbAccess.iterator(state) if it != nil { go func() { @@ -382,13 +381,13 @@ func (self *syncer) syncHistory(state *syncState) chan interface{} { // blocking until history channel is read from case history <- storage.Key(key): n++ - glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n) + log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)) state.Latest = key case <-self.quit: return } } - glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n) + log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)) }() } return history @@ -438,14 +437,14 @@ LOOP: for priority = High; priority >= 0; priority-- { // the first priority channel that is non-empty will be assigned to keys if len(self.keys[priority]) > 0 { - glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) + log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority)) keys = self.keys[priority] break PRIORITIES } - glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])) + log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))) // if the input queue is empty on this level, resort to history if there is any if uint(priority) == histPrior && history != nil { - glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key) + log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key)) keys = history break PRIORITIES } @@ -455,7 +454,7 @@ LOOP: // if peer ready to receive but nothing to send if keys == nil && deliveryRequest == nil { // if no items left and switch to waiting mode - glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log())) newUnsyncedKeys = self.newUnsyncedKeys } @@ -476,15 +475,15 @@ LOOP: // (all nonhistorical outgoing traffic sheduled and persisted state.LastSeenAt = self.dbAccess.counter() state.Latest = storage.ZeroKey - glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced) + log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced)) // send the unsynced keys stateCopy := *state err := self.unsyncedKeys(unsynced, &stateCopy) if err != nil { - glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err) + log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", err)) } self.state = state - glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy) + log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)) unsynced = nil keys = nil } @@ -495,7 +494,7 @@ LOOP: break LOOP case req, more = <-keys: if keys == history && !more { - glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log())) // history channel is closed, waiting for new state (called from sync()) syncStates = self.syncStates state.Synced = true // this signals that the current segment is complete @@ -508,7 +507,7 @@ LOOP: history = nil } case <-deliveryRequest: - glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log())) // this 1 cap channel can wake up the loop // signaling that peer is ready to receive unsynced Keys @@ -516,7 +515,7 @@ LOOP: deliveryRequest = nil case <-newUnsyncedKeys: - glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log())) // this 1 cap channel can wake up the loop // signals that data is available to send if peer is ready to receive newUnsyncedKeys = nil @@ -526,11 +525,11 @@ LOOP: // this resets the state if !more { state = self.state - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)) state.Synced = true syncStates = nil } else { - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)) state.Synced = false history = self.syncHistory(state) // only one history at a time, only allow another one once the @@ -542,19 +541,19 @@ LOOP: continue LOOP } - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)) keyCounts[priority]++ keyCount++ if keys == history { - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)) historyCnt++ } if sreq, err := self.newSyncRequest(req, priority); err == nil { // extract key from req - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)) unsynced = append(unsynced, sreq) } else { - glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) + log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)) } } @@ -601,18 +600,18 @@ func (self *syncer) syncDeliveries() { total++ msg, err = self.newStoreRequestMsgData(req) if err != nil { - glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err) + log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)) } else { err = self.store(msg) if err != nil { - glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err) + log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)) } else { success++ - glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req) + log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req)) } } if total%self.SyncBatchSize == 0 { - glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]) + log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])) } } } @@ -679,7 +678,7 @@ func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool { msgdata, err := self.newStoreRequestMsgData(req) if err != nil { - glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err) + log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err)) return false } select { |