aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/syncer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/syncer.go')
-rw-r--r--swarm/network/syncer.go69
1 files changed, 34 insertions, 35 deletions
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go
index b6b1ea3b6..eb932e927 100644
--- a/swarm/network/syncer.go
+++ b/swarm/network/syncer.go
@@ -22,8 +22,7 @@ import (
"fmt"
"path/filepath"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -209,7 +208,7 @@ func newSyncer(
// initialise a syncdb instance for each priority queue
self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
}
- glog.V(logger.Info).Infof("syncer started: %v", state)
+ log.Info(fmt.Sprintf("syncer started: %v", state))
// launch chunk delivery service
go self.syncDeliveries()
// launch sync task manager
@@ -270,14 +269,14 @@ func (self *syncer) sync() {
// 0. first replay stale requests from request db
if state.SessionAt == 0 {
- glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log())
+ log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log()))
return
}
- glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log())
+ log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log()))
for p := priorities - 1; p >= 0; p-- {
self.queues[p].dbRead(false, 0, self.replay())
}
- glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log())
+ log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log()))
// unless peer is synced sync unfinished history beginning on
if !state.Synced {
@@ -286,7 +285,7 @@ func (self *syncer) sync() {
if !storage.IsZeroKey(state.Latest) {
// 1. there is unfinished earlier sync
state.Start = state.Latest
- glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)
+ log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state))
// blocks while the entire history upto state is synced
self.syncState(state)
if state.Last < state.SessionAt {
@@ -298,7 +297,7 @@ func (self *syncer) sync() {
// 2. sync up to last disconnect1
if state.First < state.LastSeenAt {
state.Last = state.LastSeenAt
- glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)
+ log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state))
self.syncState(state)
state.First = state.LastSeenAt
}
@@ -313,11 +312,11 @@ func (self *syncer) sync() {
// if there have been new chunks since last session
if state.LastSeenAt < state.SessionAt {
state.Last = state.SessionAt
- glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)
+ log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state))
// blocks until state syncing is finished
self.syncState(state)
}
- glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log())
+ log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log()))
}
@@ -333,7 +332,7 @@ func (self *syncer) syncState(state *syncState) {
// stop quits both request processor and saves the request cache to disk
func (self *syncer) stop() {
close(self.quit)
- glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log())
+ log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log()))
for _, db := range self.queues {
db.stop()
}
@@ -366,7 +365,7 @@ func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error)
func (self *syncer) syncHistory(state *syncState) chan interface{} {
var n uint
history := make(chan interface{})
- glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)
+ log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop))
it := self.dbAccess.iterator(state)
if it != nil {
go func() {
@@ -382,13 +381,13 @@ func (self *syncer) syncHistory(state *syncState) chan interface{} {
// blocking until history channel is read from
case history <- storage.Key(key):
n++
- glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)
+ log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n))
state.Latest = key
case <-self.quit:
return
}
}
- glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)
+ log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n))
}()
}
return history
@@ -438,14 +437,14 @@ LOOP:
for priority = High; priority >= 0; priority-- {
// the first priority channel that is non-empty will be assigned to keys
if len(self.keys[priority]) > 0 {
- glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
+ log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority))
keys = self.keys[priority]
break PRIORITIES
}
- glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))
+ log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])))
// if the input queue is empty on this level, resort to history if there is any
if uint(priority) == histPrior && history != nil {
- glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key)
+ log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key))
keys = history
break PRIORITIES
}
@@ -455,7 +454,7 @@ LOOP:
// if peer ready to receive but nothing to send
if keys == nil && deliveryRequest == nil {
// if no items left and switch to waiting mode
- glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log())
+ log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log()))
newUnsyncedKeys = self.newUnsyncedKeys
}
@@ -476,15 +475,15 @@ LOOP:
// (all nonhistorical outgoing traffic sheduled and persisted
state.LastSeenAt = self.dbAccess.counter()
state.Latest = storage.ZeroKey
- glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced)
+ log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced))
// send the unsynced keys
stateCopy := *state
err := self.unsyncedKeys(unsynced, &stateCopy)
if err != nil {
- glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err)
+ log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", err))
}
self.state = state
- glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)
+ log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy))
unsynced = nil
keys = nil
}
@@ -495,7 +494,7 @@ LOOP:
break LOOP
case req, more = <-keys:
if keys == history && !more {
- glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log())
+ log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log()))
// history channel is closed, waiting for new state (called from sync())
syncStates = self.syncStates
state.Synced = true // this signals that the current segment is complete
@@ -508,7 +507,7 @@ LOOP:
history = nil
}
case <-deliveryRequest:
- glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log())
+ log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log()))
// this 1 cap channel can wake up the loop
// signaling that peer is ready to receive unsynced Keys
@@ -516,7 +515,7 @@ LOOP:
deliveryRequest = nil
case <-newUnsyncedKeys:
- glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log())
+ log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log()))
// this 1 cap channel can wake up the loop
// signals that data is available to send if peer is ready to receive
newUnsyncedKeys = nil
@@ -526,11 +525,11 @@ LOOP:
// this resets the state
if !more {
state = self.state
- glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)
+ log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state))
state.Synced = true
syncStates = nil
} else {
- glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)
+ log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior))
state.Synced = false
history = self.syncHistory(state)
// only one history at a time, only allow another one once the
@@ -542,19 +541,19 @@ LOOP:
continue LOOP
}
- glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)
+ log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req))
keyCounts[priority]++
keyCount++
if keys == history {
- glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
+ log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
historyCnt++
}
if sreq, err := self.newSyncRequest(req, priority); err == nil {
// extract key from req
- glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
+ log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
unsynced = append(unsynced, sreq)
} else {
- glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
+ log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err))
}
}
@@ -601,18 +600,18 @@ func (self *syncer) syncDeliveries() {
total++
msg, err = self.newStoreRequestMsgData(req)
if err != nil {
- glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)
+ log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err))
} else {
err = self.store(msg)
if err != nil {
- glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)
+ log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err))
} else {
success++
- glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req)
+ log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req))
}
}
if total%self.SyncBatchSize == 0 {
- glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])
+ log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]))
}
}
}
@@ -679,7 +678,7 @@ func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool)
func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
msgdata, err := self.newStoreRequestMsgData(req)
if err != nil {
- glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err)
+ log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err))
return false
}
select {