author     ethersphere <thesw@rm.eth>    2018-06-20 20:06:27 +0800
committer  ethersphere <thesw@rm.eth>    2018-06-22 03:10:31 +0800
commit     e187711c6545487d4cac3701f0f506bb536234e2
tree       d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/swarm.go
parent     574378edb50c907b532946a1d4654dbd6701b20a
swarm: network rewrite merge
Diffstat (limited to 'swarm/swarm.go')
-rw-r--r--  swarm/swarm.go | 340
1 file changed, 207 insertions(+), 133 deletions(-)
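The headline API change in this patch is the NewSwarm constructor: it no longer takes a node.ServiceContext or a chequebook.Backend. The SWAP backend is now dialed internally from config.SwapAPI when SWAP is enabled, and the only new parameter is an optional *mock.NodeStore used for tests. A minimal call-site sketch (illustrative only, not part of the patch; package and function names are made up):

    package swarmsetup // illustrative package name

    import (
        "github.com/ethereum/go-ethereum/swarm"
        "github.com/ethereum/go-ethereum/swarm/api"
    )

    // newSwarmService contrasts the old and new constructor call sites.
    func newSwarmService(cfg *api.Config) (*swarm.Swarm, error) {
        // Before: NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.Config)
        // After:  NewSwarm(config *api.Config, mockStore *mock.NodeStore)
        // The chequebook backend is now dialed inside NewSwarm from cfg.SwapAPI,
        // and mockStore is nil outside of tests.
        return swarm.NewSwarm(cfg, nil)
    }
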
diff --git a/swarm/swarm.go b/swarm/swarm.go
index 0a120db1f..90360264e 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -23,6 +23,7 @@ import (
"fmt"
"math/big"
"net"
+ "path/filepath"
"strings"
"time"
"unicode"
@@ -31,20 +32,24 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/contracts/chequebook"
"github.com/ethereum/go-ethereum/contracts/ens"
- "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
- "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/api"
httpapi "github.com/ethereum/go-ethereum/swarm/api/http"
"github.com/ethereum/go-ethereum/swarm/fuse"
+ "github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/stream"
+ "github.com/ethereum/go-ethereum/swarm/pss"
+ "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
+ "github.com/ethereum/go-ethereum/swarm/storage/mru"
)
var (
@@ -53,31 +58,28 @@ var (
startCounter = metrics.NewRegisteredCounter("stack,start", nil)
stopCounter = metrics.NewRegisteredCounter("stack,stop", nil)
uptimeGauge = metrics.NewRegisteredGauge("stack.uptime", nil)
- dbSizeGauge = metrics.NewRegisteredGauge("storage.db.chunks.size", nil)
- cacheSizeGauge = metrics.NewRegisteredGauge("storage.db.cache.size", nil)
+ requestsCacheGauge = metrics.NewRegisteredGauge("storage.cache.requests.size", nil)
)
// the swarm stack
type Swarm struct {
- config *api.Config // swarm configuration
- api *api.Api // high level api layer (fs/manifest)
- dns api.Resolver // DNS registrar
- dbAccess *network.DbAccess // access to local chunk db iterator and storage counter
- storage storage.ChunkStore // internal access to storage, common interface to cloud storage backends
- dpa *storage.DPA // distributed preimage archive, the local API to the storage with document level storage/retrieval support
- depo network.StorageHandler // remote request handler, interface between bzz protocol and the storage
- cloud storage.CloudStore // procurement, cloud storage backend (can multi-cloud)
- hive *network.Hive // the logistic manager
- backend chequebook.Backend // simple blockchain Backend
+ config *api.Config // swarm configuration
+ api *api.API // high level api layer (fs/manifest)
+ dns api.Resolver // DNS registrar
+ fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support
+ streamer *stream.Registry
+ bzz *network.Bzz // the logistic manager
+ backend chequebook.Backend // simple blockchain Backend
privateKey *ecdsa.PrivateKey
corsString string
swapEnabled bool
lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped
sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
+ ps *pss.Pss
}
type SwarmAPI struct {
- Api *api.Api
+ Api *api.API
Backend chequebook.Backend
PrvKey *ecdsa.PrivateKey
}
@@ -92,77 +94,147 @@ func (self *Swarm) API() *SwarmAPI {
// creates a new swarm service instance
// implements node.Service
-func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.Config) (self *Swarm, err error) {
- if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroKey) {
+// If mockStore is not nil, it will be used as the storage for chunk data.
+// MockStore should be used only for testing.
+func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) {
+
+ if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) {
return nil, fmt.Errorf("empty public key")
}
- if bytes.Equal(common.FromHex(config.BzzKey), storage.ZeroKey) {
+ if bytes.Equal(common.FromHex(config.BzzKey), storage.ZeroAddr) {
return nil, fmt.Errorf("empty bzz key")
}
- self = &Swarm{
- config: config,
- swapEnabled: config.SwapEnabled,
- backend: backend,
- privateKey: config.Swap.PrivateKey(),
- corsString: config.Cors,
+ var backend chequebook.Backend
+ if config.SwapAPI != "" && config.SwapEnabled {
+ log.Info("connecting to SWAP API", "url", config.SwapAPI)
+ backend, err = ethclient.Dial(config.SwapAPI)
+ if err != nil {
+ return nil, fmt.Errorf("error connecting to SWAP API %s: %s", config.SwapAPI, err)
+ }
}
- log.Debug(fmt.Sprintf("Setting up Swarm service components"))
- hash := storage.MakeHashFunc(config.ChunkerParams.Hash)
- self.lstore, err = storage.NewLocalStore(hash, config.StoreParams)
- if err != nil {
- return
+ self = &Swarm{
+ config: config,
+ backend: backend,
+ privateKey: config.ShiftPrivateKey(),
}
+ log.Debug(fmt.Sprintf("Setting up Swarm service components"))
- // setup local store
- log.Debug(fmt.Sprintf("Set up local storage"))
-
- self.dbAccess = network.NewDbAccess(self.lstore)
- log.Debug(fmt.Sprintf("Set up local db access (iterator/counter)"))
-
- // set up the kademlia hive
- self.hive = network.NewHive(
- common.HexToHash(self.config.BzzKey), // key to hive (kademlia base address)
- config.HiveParams, // configuration parameters
- config.SwapEnabled, // SWAP enabled
- config.SyncEnabled, // syncronisation enabled
- )
- log.Debug(fmt.Sprintf("Set up swarm network with Kademlia hive"))
-
- // setup cloud storage backend
- self.cloud = network.NewForwarder(self.hive)
- log.Debug(fmt.Sprintf("-> set swarm forwarder as cloud storage backend"))
+ config.HiveParams.Discovery = true
- // setup cloud storage internal access layer
- self.storage = storage.NewNetStore(hash, self.lstore, self.cloud, config.StoreParams)
log.Debug(fmt.Sprintf("-> swarm net store shared access layer to Swarm Chunk Store"))
- // set up Depo (storage handler = cloud storage access layer for incoming remote requests)
- self.depo = network.NewDepo(hash, self.lstore, self.storage)
- log.Debug(fmt.Sprintf("-> REmote Access to CHunks"))
+ nodeID, err := discover.HexID(config.NodeID)
+ if err != nil {
+ return nil, err
+ }
+ addr := &network.BzzAddr{
+ OAddr: common.FromHex(config.BzzKey),
+ UAddr: []byte(discover.NewNode(nodeID, net.IP{127, 0, 0, 1}, 30303, 30303).String()),
+ }
+
+ bzzconfig := &network.BzzConfig{
+ NetworkID: config.NetworkID,
+ OverlayAddr: addr.OAddr,
+ UnderlayAddr: addr.UAddr,
+ HiveParams: config.HiveParams,
+ }
- // set up DPA, the cloud storage local access layer
- dpaChunkStore := storage.NewDpaChunkStore(self.lstore, self.storage)
- log.Debug(fmt.Sprintf("-> Local Access to Swarm"))
- // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
- self.dpa = storage.NewDPA(dpaChunkStore, self.config.ChunkerParams)
- log.Debug(fmt.Sprintf("-> Content Store API"))
+ stateStore, err := state.NewDBStore(filepath.Join(config.Path, "state-store.db"))
+ if err != nil {
+ return
+ }
+ // set up high level api
+ var resolver *api.MultiResolver
if len(config.EnsAPIs) > 0 {
opts := []api.MultiResolverOption{}
for _, c := range config.EnsAPIs {
tld, endpoint, addr := parseEnsAPIAddress(c)
- r, err := newEnsClient(endpoint, addr, config)
+ r, err := newEnsClient(endpoint, addr, config, self.privateKey)
if err != nil {
return nil, err
}
opts = append(opts, api.MultiResolverOptionWithResolver(r, tld))
+
}
- self.dns = api.NewMultiResolver(opts...)
+ resolver = api.NewMultiResolver(opts...)
+ self.dns = resolver
+ }
+
+ self.lstore, err = storage.NewLocalStore(config.LocalStoreParams, mockStore)
+ if err != nil {
+ return
+ }
+
+ db := storage.NewDBAPI(self.lstore)
+ to := network.NewKademlia(
+ common.FromHex(config.BzzKey),
+ network.NewKadParams(),
+ )
+ delivery := stream.NewDelivery(to, db)
+
+ self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{
+ SkipCheck: config.DeliverySkipCheck,
+ DoSync: config.SyncEnabled,
+ DoRetrieve: true,
+ SyncUpdateDelay: config.SyncUpdateDelay,
+ })
+
+ // set up NetStore, the cloud storage local access layer
+ netStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve)
+ // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
+ self.fileStore = storage.NewFileStore(netStore, self.config.FileStoreParams)
+
+ var resourceHandler *mru.Handler
+ rhparams := &mru.HandlerParams{
+ // TODO: config parameter to set limits
+ QueryMaxPeriods: &mru.LookupParams{
+ Limit: false,
+ },
+ Signer: &mru.GenericSigner{
+ PrivKey: self.privateKey,
+ },
+ }
+ if resolver != nil {
+ resolver.SetNameHash(ens.EnsNode)
+ // Set HeaderGetter and OwnerValidator interfaces to resolver only if it is not nil.
+ rhparams.HeaderGetter = resolver
+ rhparams.OwnerValidator = resolver
+ } else {
+ log.Warn("No ETH API specified, resource updates will use block height approximation")
+ // TODO: blockestimator should use saved values derived from last time ethclient was connected
+ rhparams.HeaderGetter = mru.NewBlockEstimator()
+ }
+ resourceHandler, err = mru.NewHandler(rhparams)
+ if err != nil {
+ return nil, err
+ }
+ resourceHandler.SetStore(netStore)
+
+ var validators []storage.ChunkValidator
+ validators = append(validators, storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash)))
+ if resourceHandler != nil {
+ validators = append(validators, resourceHandler)
+ }
+ self.lstore.Validators = validators
+
+ // setup local store
+ log.Debug(fmt.Sprintf("Set up local storage"))
+
+ self.bzz = network.NewBzz(bzzconfig, to, stateStore, stream.Spec, self.streamer.Run)
+
+ // Pss = postal service over swarm (devp2p over bzz)
+ self.ps, err = pss.NewPss(to, config.Pss)
+ if err != nil {
+ return nil, err
+ }
+ if pss.IsActiveHandshake {
+ pss.SetHandshakeController(self.ps, pss.NewHandshakeParams())
}
- self.api = api.NewApi(self.dpa, self.dns)
+ self.api = api.NewAPI(self.fileStore, self.dns, resourceHandler)
// Manifests for Smart Hosting
log.Debug(fmt.Sprintf("-> Web3 virtual server API"))
@@ -198,16 +270,22 @@ func parseEnsAPIAddress(s string) (tld, endpoint string, addr common.Address) {
return
}
+// ensClient provides functionality for api.ResolveValidator
+type ensClient struct {
+ *ens.ENS
+ *ethclient.Client
+}
+
// newEnsClient creates a new ENS client for that is a consumer of
// a ENS API on a specific endpoint. It is used as a helper function
// for creating multiple resolvers in NewSwarm function.
-func newEnsClient(endpoint string, addr common.Address, config *api.Config) (*ens.ENS, error) {
+func newEnsClient(endpoint string, addr common.Address, config *api.Config, privkey *ecdsa.PrivateKey) (*ensClient, error) {
log.Info("connecting to ENS API", "url", endpoint)
client, err := rpc.Dial(endpoint)
if err != nil {
return nil, fmt.Errorf("error connecting to ENS API %s: %s", endpoint, err)
}
- ensClient := ethclient.NewClient(client)
+ ethClient := ethclient.NewClient(client)
ensRoot := config.EnsRoot
if addr != (common.Address{}) {
@@ -220,13 +298,16 @@ func newEnsClient(endpoint string, addr common.Address, config *api.Config) (*en
log.Warn(fmt.Sprintf("could not determine ENS contract address, using default %s", ensRoot), "err", err)
}
}
- transactOpts := bind.NewKeyedTransactor(config.Swap.PrivateKey())
- dns, err := ens.NewENS(transactOpts, ensRoot, ensClient)
+ transactOpts := bind.NewKeyedTransactor(privkey)
+ dns, err := ens.NewENS(transactOpts, ensRoot, ethClient)
if err != nil {
return nil, err
}
log.Debug(fmt.Sprintf("-> Swarm Domain Name Registrar %v @ address %v", endpoint, ensRoot.Hex()))
- return dns, err
+ return &ensClient{
+ ENS: dns,
+ Client: ethClient,
+ }, err
}
// detectEnsAddr determines the ENS contract address by getting both the
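The new ensClient type embeds both *ens.ENS and *ethclient.Client, so the single value returned by newEnsClient exposes name resolution as well as chain access (in NewSwarm it is later handed to the mutable-resource handler as HeaderGetter and OwnerValidator). A sketch of the embedding pattern, with an illustrative interface standing in for the actual swarm/api definition:

    package swarmens // illustrative package name

    import (
        "context"
        "math/big"

        "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/contracts/ens"
        "github.com/ethereum/go-ethereum/core/types"
        "github.com/ethereum/go-ethereum/ethclient"
    )

    // ensClient mirrors the composite added in the hunk above: the method
    // sets of both embedded types are promoted onto the one value.
    type ensClient struct {
        *ens.ENS
        *ethclient.Client
    }

    // resolveValidator is an illustrative stand-in for the interface the
    // resolver is consumed through; the exact method set in swarm/api may differ.
    type resolveValidator interface {
        Resolve(name string) (common.Hash, error)                              // promoted from *ens.ENS
        HeaderByNumber(ctx context.Context, n *big.Int) (*types.Header, error) // promoted from *ethclient.Client
    }

    // Compile-time check that the composite satisfies the combined interface.
    var _ resolveValidator = (*ensClient)(nil)
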
@@ -274,16 +355,12 @@ Start is called when the stack is started
// implements the node.Service interface
func (self *Swarm) Start(srv *p2p.Server) error {
startTime = time.Now()
- connectPeer := func(url string) error {
- node, err := discover.ParseNode(url)
- if err != nil {
- return fmt.Errorf("invalid node URL: %v", err)
- }
- srv.AddPeer(node)
- return nil
- }
+
+ // update uaddr to correct enode
+ newaddr := self.bzz.UpdateLocalAddr([]byte(srv.Self().String()))
+ log.Warn("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr))
// set chequebook
- if self.swapEnabled {
+ if self.config.SwapEnabled {
ctx := context.Background() // The initial setup has no deadline.
err := self.SetChequebook(ctx)
if err != nil {
@@ -295,33 +372,38 @@ func (self *Swarm) Start(srv *p2p.Server) error {
}
log.Warn(fmt.Sprintf("Starting Swarm service"))
- self.hive.Start(
- discover.PubkeyID(&srv.PrivateKey.PublicKey),
- func() string { return srv.ListenAddr },
- connectPeer,
- )
- log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.hive.Addr()))
- self.dpa.Start()
- log.Debug(fmt.Sprintf("Swarm DPA started"))
+ err := self.bzz.Start(srv)
+ if err != nil {
+ log.Error("bzz failed", "err", err)
+ return err
+ }
+ log.Info(fmt.Sprintf("Swarm network started on bzz address: %x", self.bzz.Hive.Overlay.BaseAddr()))
+
+ if self.ps != nil {
+ self.ps.Start(srv)
+ log.Info("Pss started")
+ }
// start swarm http proxy server
if self.config.Port != "" {
addr := net.JoinHostPort(self.config.ListenAddr, self.config.Port)
- go httpapi.StartHttpServer(self.api, &httpapi.ServerConfig{
+ go httpapi.StartHTTPServer(self.api, &httpapi.ServerConfig{
Addr: addr,
- CorsString: self.corsString,
+ CorsString: self.config.Cors,
})
- log.Info(fmt.Sprintf("Swarm http proxy started on %v", addr))
+ }
- if self.corsString != "" {
- log.Debug(fmt.Sprintf("Swarm http proxy started with corsdomain: %v", self.corsString))
- }
+ log.Debug(fmt.Sprintf("Swarm http proxy started on port: %v", self.config.Port))
+
+ if self.config.Cors != "" {
+ log.Debug(fmt.Sprintf("Swarm http proxy started with corsdomain: %v", self.config.Cors))
}
self.periodicallyUpdateGauges()
startCounter.Inc(1)
+ self.streamer.Start(srv)
return nil
}
@@ -336,16 +418,16 @@ func (self *Swarm) periodicallyUpdateGauges() {
}
func (self *Swarm) updateGauges() {
- dbSizeGauge.Update(int64(self.lstore.DbCounter()))
- cacheSizeGauge.Update(int64(self.lstore.CacheCounter()))
uptimeGauge.Update(time.Since(startTime).Nanoseconds())
+ requestsCacheGauge.Update(int64(self.lstore.RequestsCacheLen()))
}
// implements the node.Service interface
// stops all component services.
func (self *Swarm) Stop() error {
- self.dpa.Stop()
- err := self.hive.Stop()
+ if self.ps != nil {
+ self.ps.Stop()
+ }
if ch := self.config.Swap.Chequebook(); ch != nil {
ch.Stop()
ch.Save()
@@ -356,34 +438,45 @@ func (self *Swarm) Stop() error {
}
self.sfs.Stop()
stopCounter.Inc(1)
- return err
+ self.streamer.Stop()
+ return self.bzz.Stop()
}
// implements the node.Service interface
-func (self *Swarm) Protocols() []p2p.Protocol {
- proto, err := network.Bzz(self.depo, self.backend, self.hive, self.dbAccess, self.config.Swap, self.config.SyncParams, self.config.NetworkId)
- if err != nil {
- return nil
+func (self *Swarm) Protocols() (protos []p2p.Protocol) {
+ protos = append(protos, self.bzz.Protocols()...)
+
+ if self.ps != nil {
+ protos = append(protos, self.ps.Protocols()...)
+ }
+ return
+}
+
+func (self *Swarm) RegisterPssProtocol(spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) {
+ if !pss.IsActiveProtocol {
+ return nil, fmt.Errorf("Pss protocols not available (built with !nopssprotocol tag)")
}
- return []p2p.Protocol{proto}
+ topic := pss.ProtocolTopic(spec)
+ return pss.RegisterProtocol(self.ps, &topic, spec, targetprotocol, options)
}
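RegisterPssProtocol wraps pss.RegisterProtocol so that an ordinary devp2p protocol can be carried over pss messaging instead of a direct connection. A heavily hedged usage sketch: the protocols.Spec and p2p.Protocol literals are standard go-ethereum declarations, while the pss.ProtocolParams fields shown are assumptions about this revision's pss API:

    package psssetup // illustrative package name

    import (
        "github.com/ethereum/go-ethereum/p2p"
        "github.com/ethereum/go-ethereum/p2p/protocols"
        "github.com/ethereum/go-ethereum/swarm"
        "github.com/ethereum/go-ethereum/swarm/pss"
    )

    // demoMsg is a hypothetical message type used only for this sketch.
    type demoMsg struct{ Payload []byte }

    // runOverPss registers a toy protocol to be carried over pss.
    func runOverPss(sw *swarm.Swarm) (*pss.Protocol, error) {
        spec := &protocols.Spec{
            Name:       "demo",
            Version:    1,
            MaxMsgSize: 1024,
            Messages:   []interface{}{demoMsg{}},
        }
        proto := &p2p.Protocol{
            Name:    spec.Name,
            Version: spec.Version,
            Length:  uint64(len(spec.Messages)),
            Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
                return nil // handler body elided
            },
        }
        // Assumed ProtocolParams fields: enable asymmetric (public-key addressed) messaging.
        return sw.RegisterPssProtocol(spec, proto, &pss.ProtocolParams{Asymmetric: true})
    }
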
// implements node.Service
-// Apis returns the RPC Api descriptors the Swarm implementation offers
+// APIs returns the RPC API descriptors the Swarm implementation offers
func (self *Swarm) APIs() []rpc.API {
- return []rpc.API{
+
+ apis := []rpc.API{
// public APIs
{
Namespace: "bzz",
- Version: "0.1",
+ Version: "3.0",
Service: &Info{self.config, chequebook.ContractParams},
Public: true,
},
// admin APIs
{
Namespace: "bzz",
- Version: "0.1",
- Service: api.NewControl(self.api, self.hive),
+ Version: "3.0",
+ Service: api.NewControl(self.api, self.bzz.Hive),
Public: false,
},
{
@@ -414,9 +507,17 @@ func (self *Swarm) APIs() []rpc.API {
},
// {Namespace, Version, api.NewAdmin(self), false},
}
+
+ apis = append(apis, self.bzz.APIs()...)
+
+ if self.ps != nil {
+ apis = append(apis, self.ps.APIs()...)
+ }
+
+ return apis
}
-func (self *Swarm) Api() *api.Api {
+func (self *Swarm) Api() *api.API {
return self.api
}
@@ -427,36 +528,9 @@ func (self *Swarm) SetChequebook(ctx context.Context) error {
return err
}
log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex()))
- self.hive.DropAll()
return nil
}
-// Local swarm without netStore
-func NewLocalSwarm(datadir, port string) (self *Swarm, err error) {
-
- prvKey, err := crypto.GenerateKey()
- if err != nil {
- return
- }
-
- config := api.NewDefaultConfig()
- config.Path = datadir
- config.Init(prvKey)
- config.Port = port
-
- dpa, err := storage.NewLocalDPA(datadir)
- if err != nil {
- return
- }
-
- self = &Swarm{
- api: api.NewApi(dpa, nil),
- config: config,
- }
-
- return
-}
-
// serialisable info about swarm
type Info struct {
*api.Config