diff options
Diffstat (limited to 'swarm/network/syncer.go')
-rw-r--r-- | swarm/network/syncer.go | 69 |
1 files changed, 34 insertions, 35 deletions
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go index b6b1ea3b6..20129c2a8 100644 --- a/swarm/network/syncer.go +++ b/swarm/network/syncer.go @@ -22,8 +22,7 @@ import ( "fmt" "path/filepath" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -209,7 +208,7 @@ func newSyncer( // initialise a syncdb instance for each priority queue self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i))) } - glog.V(logger.Info).Infof("syncer started: %v", state) + log.Info(fmt.Sprintf("syncer started: %v", state)) // launch chunk delivery service go self.syncDeliveries() // launch sync task manager @@ -270,14 +269,14 @@ func (self *syncer) sync() { // 0. first replay stale requests from request db if state.SessionAt == 0 { - glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log()) + log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log())) return } - glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log()) + log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log())) for p := priorities - 1; p >= 0; p-- { self.queues[p].dbRead(false, 0, self.replay()) } - glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log()) + log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log())) // unless peer is synced sync unfinished history beginning on if !state.Synced { @@ -286,7 +285,7 @@ func (self *syncer) sync() { if !storage.IsZeroKey(state.Latest) { // 1. there is unfinished earlier sync state.Start = state.Latest - glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state) + log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)) // blocks while the entire history upto state is synced self.syncState(state) if state.Last < state.SessionAt { @@ -298,7 +297,7 @@ func (self *syncer) sync() { // 2. sync up to last disconnect1 if state.First < state.LastSeenAt { state.Last = state.LastSeenAt - glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state) + log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)) self.syncState(state) state.First = state.LastSeenAt } @@ -313,11 +312,11 @@ func (self *syncer) sync() { // if there have been new chunks since last session if state.LastSeenAt < state.SessionAt { state.Last = state.SessionAt - glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state) + log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)) // blocks until state syncing is finished self.syncState(state) } - glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log()) + log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log())) } @@ -333,7 +332,7 @@ func (self *syncer) syncState(state *syncState) { // stop quits both request processor and saves the request cache to disk func (self *syncer) stop() { close(self.quit) - glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log())) for _, db := range self.queues { db.stop() } @@ -366,7 +365,7 @@ func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) func (self *syncer) syncHistory(state *syncState) chan interface{} { var n uint history := make(chan interface{}) - glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop) + log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)) it := self.dbAccess.iterator(state) if it != nil { go func() { @@ -382,13 +381,13 @@ func (self *syncer) syncHistory(state *syncState) chan interface{} { // blocking until history channel is read from case history <- storage.Key(key): n++ - glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n) + log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)) state.Latest = key case <-self.quit: return } } - glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n) + log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)) }() } return history @@ -438,14 +437,14 @@ LOOP: for priority = High; priority >= 0; priority-- { // the first priority channel that is non-empty will be assigned to keys if len(self.keys[priority]) > 0 { - glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) + log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority)) keys = self.keys[priority] break PRIORITIES } - glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])) + log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))) // if the input queue is empty on this level, resort to history if there is any if uint(priority) == histPrior && history != nil { - glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key) + log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key)) keys = history break PRIORITIES } @@ -455,7 +454,7 @@ LOOP: // if peer ready to receive but nothing to send if keys == nil && deliveryRequest == nil { // if no items left and switch to waiting mode - glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log())) newUnsyncedKeys = self.newUnsyncedKeys } @@ -476,15 +475,15 @@ LOOP: // (all nonhistorical outgoing traffic sheduled and persisted state.LastSeenAt = self.dbAccess.counter() state.Latest = storage.ZeroKey - glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced) + log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced)) // send the unsynced keys stateCopy := *state err := self.unsyncedKeys(unsynced, &stateCopy) if err != nil { - glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err) + log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", self.key.Log(), err)) } self.state = state - glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy) + log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)) unsynced = nil keys = nil } @@ -495,7 +494,7 @@ LOOP: break LOOP case req, more = <-keys: if keys == history && !more { - glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log())) // history channel is closed, waiting for new state (called from sync()) syncStates = self.syncStates state.Synced = true // this signals that the current segment is complete @@ -508,7 +507,7 @@ LOOP: history = nil } case <-deliveryRequest: - glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log())) // this 1 cap channel can wake up the loop // signaling that peer is ready to receive unsynced Keys @@ -516,7 +515,7 @@ LOOP: deliveryRequest = nil case <-newUnsyncedKeys: - glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log()) + log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log())) // this 1 cap channel can wake up the loop // signals that data is available to send if peer is ready to receive newUnsyncedKeys = nil @@ -526,11 +525,11 @@ LOOP: // this resets the state if !more { state = self.state - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)) state.Synced = true syncStates = nil } else { - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)) state.Synced = false history = self.syncHistory(state) // only one history at a time, only allow another one once the @@ -542,19 +541,19 @@ LOOP: continue LOOP } - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)) keyCounts[priority]++ keyCount++ if keys == history { - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)) historyCnt++ } if sreq, err := self.newSyncRequest(req, priority); err == nil { // extract key from req - glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)) unsynced = append(unsynced, sreq) } else { - glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) + log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, err)) } } @@ -601,18 +600,18 @@ func (self *syncer) syncDeliveries() { total++ msg, err = self.newStoreRequestMsgData(req) if err != nil { - glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err) + log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)) } else { err = self.store(msg) if err != nil { - glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err) + log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)) } else { success++ - glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req) + log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req)) } } if total%self.SyncBatchSize == 0 { - glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]) + log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])) } } } @@ -679,7 +678,7 @@ func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool { msgdata, err := self.newStoreRequestMsgData(req) if err != nil { - glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err) + log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err)) return false } select { |