diff options
-rw-r--r-- | cmd/geth/chaincmd.go | 73 | ||||
-rw-r--r-- | cmd/utils/flags.go | 14 | ||||
-rw-r--r-- | consensus/clique/clique.go | 2 | ||||
-rw-r--r-- | rpc/client.go | 35 | ||||
-rw-r--r-- | rpc/json.go | 36 | ||||
-rw-r--r-- | rpc/server.go | 17 | ||||
-rw-r--r-- | rpc/subscription.go | 14 | ||||
-rw-r--r-- | rpc/subscription_test.go | 160 | ||||
-rw-r--r-- | rpc/types.go | 10 |
9 files changed, 263 insertions, 98 deletions
diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 66516b409..d4a263d60 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -58,10 +58,10 @@ participating. ArgsUsage: "<filename> (<filename 2> ... <filename N>) ", Category: "BLOCKCHAIN COMMANDS", Description: ` -The import command imports blocks from an RLP-encoded form. The form can be one file -with several RLP-encoded blocks, or several files can be used. -If only one file is used, import error will result in failure. If several files are used, -processing will proceed even if an individual RLP-file import failure occurs. +The import command imports blocks from an RLP-encoded form. The form can be one file +with several RLP-encoded blocks, or several files can be used. +If only one file is used, import error will result in failure. If several files are used, +processing will proceed even if an individual RLP-file import failure occurs. `, } exportCommand = cli.Command{ @@ -103,17 +103,14 @@ Use "ethereum dump 0" to dump the genesis block. // initGenesis will initialise the given JSON format genesis file and writes it as // the zero'd block (i.e. genesis) or will fail hard if it can't succeed. func initGenesis(ctx *cli.Context) error { + // Make sure we have a valid genesis JSON genesisPath := ctx.Args().First() if len(genesisPath) == 0 { - utils.Fatalf("must supply path to genesis JSON file") + utils.Fatalf("Must supply path to genesis JSON file") } - - stack := makeFullNode(ctx) - chaindb := utils.MakeChainDatabase(ctx, stack) - file, err := os.Open(genesisPath) if err != nil { - utils.Fatalf("failed to read genesis file: %v", err) + utils.Fatalf("Failed to read genesis file: %v", err) } defer file.Close() @@ -121,12 +118,19 @@ func initGenesis(ctx *cli.Context) error { if err := json.NewDecoder(file).Decode(genesis); err != nil { utils.Fatalf("invalid genesis file: %v", err) } - - _, hash, err := core.SetupGenesisBlock(chaindb, genesis) - if err != nil { - utils.Fatalf("failed to write genesis block: %v", err) + // Open an initialise both full and light databases + stack := makeFullNode(ctx) + for _, name := range []string{"chaindata", "lightchaindata"} { + chaindb, err := stack.OpenDatabase(name, 0, 0) + if err != nil { + utils.Fatalf("Failed to open database: %v", err) + } + _, hash, err := core.SetupGenesisBlock(chaindb, genesis) + if err != nil { + utils.Fatalf("Failed to write genesis block: %v", err) + } + log.Info("Successfully wrote genesis state", "database", name, "hash", hash) } - log.Info("Successfully wrote genesis state", "hash", hash) return nil } @@ -245,24 +249,29 @@ func exportChain(ctx *cli.Context) error { func removeDB(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) - dbdir := stack.ResolvePath(utils.ChainDbName(ctx)) - if !common.FileExist(dbdir) { - fmt.Println(dbdir, "does not exist") - return nil - } - fmt.Println(dbdir) - confirm, err := console.Stdin.PromptConfirm("Remove this database?") - switch { - case err != nil: - utils.Fatalf("%v", err) - case !confirm: - fmt.Println("Operation aborted") - default: - fmt.Println("Removing...") - start := time.Now() - os.RemoveAll(dbdir) - fmt.Printf("Removed in %v\n", time.Since(start)) + for _, name := range []string{"chaindata", "lightchaindata"} { + // Ensure the database exists in the first place + logger := log.New("database", name) + + dbdir := stack.ResolvePath(name) + if !common.FileExist(dbdir) { + logger.Info("Database doesn't exist, skipping", "path", dbdir) + continue + } + // Confirm removal and execute + fmt.Println(dbdir) + confirm, err := console.Stdin.PromptConfirm("Remove this database?") + switch { + case err != nil: + utils.Fatalf("%v", err) + case !confirm: + logger.Warn("Database deletion aborted") + default: + start := time.Now() + os.RemoveAll(dbdir) + logger.Info("Database successfully deleted", "elapsed", common.PrettyDuration(time.Since(start))) + } } return nil } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9e80bba60..c47301dfb 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -908,22 +908,16 @@ func SetupNetwork(ctx *cli.Context) { params.TargetGasLimit = new(big.Int).SetUint64(ctx.GlobalUint64(TargetGasLimitFlag.Name)) } -func ChainDbName(ctx *cli.Context) string { - if ctx.GlobalBool(LightModeFlag.Name) { - return "lightchaindata" - } else { - return "chaindata" - } -} - // MakeChainDatabase open an LevelDB using the flags passed to the client and will hard crash if it fails. func MakeChainDatabase(ctx *cli.Context, stack *node.Node) ethdb.Database { var ( cache = ctx.GlobalInt(CacheFlag.Name) handles = makeDatabaseHandles() - name = ChainDbName(ctx) ) - + name := "chaindata" + if ctx.GlobalBool(LightModeFlag.Name) { + name = "lightchaindata" + } chainDb, err := stack.OpenDatabase(name, cache, handles) if err != nil { Fatalf("Could not open database: %v", err) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 8619bd1d8..675333bcc 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -599,7 +599,7 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch for seen, recent := range snap.Recents { if recent == signer { // Signer is among recents, only wait if the current block doens't shift it out - if limit := uint64(len(snap.Signers)/2 + 1); seen > number-limit { + if limit := uint64(len(snap.Signers)/2 + 1); number < limit || seen > number-limit { log.Info("Signed recently, must wait for others") <-stop return nil, nil diff --git a/rpc/client.go b/rpc/client.go index 2c35ba54a..591986987 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -27,6 +27,7 @@ import ( "net/url" "reflect" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -373,14 +374,14 @@ func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ... return nil, ErrNotificationsUnsupported } - msg, err := c.newMessage(subscribeMethod, args...) + msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...) if err != nil { return nil, err } op := &requestOp{ ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, chanVal), + sub: newClientSubscription(c, "eth", chanVal), } // Send the subscription request. @@ -575,7 +576,7 @@ func (c *Client) closeRequestOps(err error) { } func (c *Client) handleNotification(msg *jsonrpcMessage) { - if msg.Method != notificationMethod { + if !strings.HasSuffix(msg.Method, notificationMethodSuffix) { log.Debug(fmt.Sprint("dropping non-subscription message: ", msg)) return } @@ -653,11 +654,12 @@ func (c *Client) read(conn net.Conn) error { // A ClientSubscription represents a subscription established through EthSubscribe. type ClientSubscription struct { - client *Client - etype reflect.Type - channel reflect.Value - subid string - in chan json.RawMessage + client *Client + etype reflect.Type + channel reflect.Value + namespace string + subid string + in chan json.RawMessage quitOnce sync.Once // ensures quit is closed once quit chan struct{} // quit is closed when the subscription exits @@ -665,14 +667,15 @@ type ClientSubscription struct { err chan error } -func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription { +func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { sub := &ClientSubscription{ - client: c, - etype: channel.Type().Elem(), - channel: channel, - quit: make(chan struct{}), - err: make(chan error, 1), - in: make(chan json.RawMessage), + client: c, + namespace: namespace, + etype: channel.Type().Elem(), + channel: channel, + quit: make(chan struct{}), + err: make(chan error, 1), + in: make(chan json.RawMessage), } return sub } @@ -774,5 +777,5 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e func (sub *ClientSubscription) requestUnsubscribe() error { var result interface{} - return sub.client.Call(&result, unsubscribeMethod, sub.subid) + return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) } diff --git a/rpc/json.go b/rpc/json.go index c777fab6e..2e7fd599e 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -30,11 +30,11 @@ import ( ) const ( - jsonrpcVersion = "2.0" - serviceMethodSeparator = "_" - subscribeMethod = "eth_subscribe" - unsubscribeMethod = "eth_unsubscribe" - notificationMethod = "eth_subscription" + jsonrpcVersion = "2.0" + serviceMethodSeparator = "_" + subscribeMethodSuffix = "_subscribe" + unsubscribeMethodSuffix = "_unsubscribe" + notificationMethodSuffix = "_subscription" ) type jsonRequest struct { @@ -164,7 +164,7 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) { } // subscribe are special, they will always use `subscribeMethod` as first param in the payload - if in.Method == subscribeMethod { + if strings.HasSuffix(in.Method, subscribeMethodSuffix) { reqs := []rpcRequest{{id: &in.Id, isPubSub: true}} if len(in.Payload) > 0 { // first param must be subscription name @@ -174,17 +174,16 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) { return nil, false, &invalidRequestError{"Unable to parse subscription request"} } - // all subscriptions are made on the eth service - reqs[0].service, reqs[0].method = "eth", subscribeMethod[0] + reqs[0].service, reqs[0].method = strings.TrimSuffix(in.Method, subscribeMethodSuffix), subscribeMethod[0] reqs[0].params = in.Payload return reqs, false, nil } return nil, false, &invalidRequestError{"Unable to parse subscription request"} } - if in.Method == unsubscribeMethod { + if strings.HasSuffix(in.Method, unsubscribeMethodSuffix) { return []rpcRequest{{id: &in.Id, isPubSub: true, - method: unsubscribeMethod, params: in.Payload}}, false, nil + method: in.Method, params: in.Payload}}, false, nil } elems := strings.Split(in.Method, serviceMethodSeparator) @@ -216,8 +215,8 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) id := &in[i].Id - // subscribe are special, they will always use `subscribeMethod` as first param in the payload - if r.Method == subscribeMethod { + // subscribe are special, they will always use `subscriptionMethod` as first param in the payload + if strings.HasSuffix(r.Method, subscribeMethodSuffix) { requests[i] = rpcRequest{id: id, isPubSub: true} if len(r.Payload) > 0 { // first param must be subscription name @@ -227,8 +226,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) return nil, false, &invalidRequestError{"Unable to parse subscription request"} } - // all subscriptions are made on the eth service - requests[i].service, requests[i].method = "eth", subscribeMethod[0] + requests[i].service, requests[i].method = strings.TrimSuffix(r.Method, subscribeMethodSuffix), subscribeMethod[0] requests[i].params = r.Payload continue } @@ -236,8 +234,8 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) return nil, true, &invalidRequestError{"Unable to parse (un)subscribe request arguments"} } - if r.Method == unsubscribeMethod { - requests[i] = rpcRequest{id: id, isPubSub: true, method: unsubscribeMethod, params: r.Payload} + if strings.HasSuffix(r.Method, unsubscribeMethodSuffix) { + requests[i] = rpcRequest{id: id, isPubSub: true, method: r.Method, params: r.Payload} continue } @@ -325,13 +323,13 @@ func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info } // CreateNotification will create a JSON-RPC notification with the given subscription id and event as params. -func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} { +func (c *jsonCodec) CreateNotification(subid, namespace string, event interface{}) interface{} { if isHexNum(reflect.TypeOf(event)) { - return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, + return &jsonNotification{Version: jsonrpcVersion, Method: namespace + notificationMethodSuffix, Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}} } - return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, + return &jsonNotification{Version: jsonrpcVersion, Method: namespace + notificationMethodSuffix, Params: jsonSubscription{Subscription: subid, Result: event}} } diff --git a/rpc/server.go b/rpc/server.go index 78df37e52..62b84af34 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "runtime" + "strings" "sync" "sync/atomic" @@ -96,32 +97,30 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name()) } + methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) + // already a previous service register under given sname, merge methods/subscriptions if regsvc, present := s.services[name]; present { - methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) if len(methods) == 0 && len(subscriptions) == 0 { return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } - for _, m := range methods { regsvc.callbacks[formatName(m.method.Name)] = m } for _, s := range subscriptions { regsvc.subscriptions[formatName(s.method.Name)] = s } - return nil } svc.name = name - svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) + svc.callbacks, svc.subscriptions = methods, subscriptions if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } s.services[svc.name] = svc - return nil } @@ -303,7 +302,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.activate(subid) + notifier.activate(subid, req.svcname) } return codec.CreateResponse(req.id, subid), activateSub @@ -383,7 +382,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s codec.Close() } - // when request holds one of more subscribe requests this allows these subscriptions to be actived + // when request holds one of more subscribe requests this allows these subscriptions to be activated for _, c := range callbacks { c() } @@ -410,7 +409,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) continue } - if r.isPubSub && r.method == unsubscribeMethod { + if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { @@ -439,7 +438,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) } } } else { - requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}} + requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}} } continue } diff --git a/rpc/subscription.go b/rpc/subscription.go index 9ab6af9e1..720e4dd06 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -35,8 +35,9 @@ type ID string // a Subscription is created by a notifier and tight to that notifier. The client can use // this subscription to wait for an unsubscribe request for the client, see Err(). type Subscription struct { - ID ID - err chan error // closed on unsubscribe + ID ID + namespace string + err chan error // closed on unsubscribe } // Err returns a channel that is closed when the client send an unsubscribe request. @@ -78,7 +79,7 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) { // are dropped until the subscription is marked as active. This is done // by the RPC server after the subscription ID is send to the client. func (n *Notifier) CreateSubscription() *Subscription { - s := &Subscription{NewID(), make(chan error)} + s := &Subscription{ID: NewID(), err: make(chan error)} n.subMu.Lock() n.inactive[s.ID] = s n.subMu.Unlock() @@ -91,9 +92,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error { n.subMu.RLock() defer n.subMu.RUnlock() - _, active := n.active[id] + sub, active := n.active[id] if active { - notification := n.codec.CreateNotification(string(id), data) + notification := n.codec.CreateNotification(string(id), sub.namespace, data) if err := n.codec.Write(notification); err != nil { n.codec.Close() return err @@ -124,10 +125,11 @@ func (n *Notifier) unsubscribe(id ID) error { // notifications are dropped. This method is called by the RPC server after // the subscription ID was sent to client. This prevents notifications being // send to the client before the subscription ID is send to the client. -func (n *Notifier) activate(id ID) { +func (n *Notifier) activate(id ID, namespace string) { n.subMu.Lock() defer n.subMu.Unlock() if sub, found := n.inactive[id]; found { + sub.namespace = namespace n.active[id] = sub delete(n.inactive, id) } diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index 345b4e5f2..0ed15ddfe 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -19,6 +19,7 @@ package rpc import ( "context" "encoding/json" + "fmt" "net" "sync" "testing" @@ -162,3 +163,162 @@ func TestNotifications(t *testing.T) { t.Error("unsubscribe callback not called after closing connection") } } + +func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse, + failures chan<- jsonErrResponse, notifications chan<- jsonNotification) { + + // read and parse server messages + for { + var rmsg json.RawMessage + if err := in.Decode(&rmsg); err != nil { + return + } + + var responses []map[string]interface{} + if rmsg[0] == '[' { + if err := json.Unmarshal(rmsg, &responses); err != nil { + t.Fatalf("Received invalid message: %s", rmsg) + } + } else { + var msg map[string]interface{} + if err := json.Unmarshal(rmsg, &msg); err != nil { + t.Fatalf("Received invalid message: %s", rmsg) + } + responses = append(responses, msg) + } + + for _, msg := range responses { + // determine what kind of msg was received and broadcast + // it to over the corresponding channel + if _, found := msg["result"]; found { + successes <- jsonSuccessResponse{ + Version: msg["jsonrpc"].(string), + Id: msg["id"], + Result: msg["result"], + } + continue + } + if _, found := msg["error"]; found { + params := msg["params"].(map[string]interface{}) + failures <- jsonErrResponse{ + Version: msg["jsonrpc"].(string), + Id: msg["id"], + Error: jsonError{int(params["subscription"].(float64)), params["message"].(string), params["data"]}, + } + continue + } + if _, found := msg["params"]; found { + params := msg["params"].(map[string]interface{}) + notifications <- jsonNotification{ + Version: msg["jsonrpc"].(string), + Method: msg["method"].(string), + Params: jsonSubscription{params["subscription"].(string), params["result"]}, + } + continue + } + t.Fatalf("Received invalid message: %s", msg) + } + } +} + +// TestSubscriptionMultipleNamespaces ensures that subscriptions can exists +// for multiple different namespaces. +func TestSubscriptionMultipleNamespaces(t *testing.T) { + var ( + namespaces = []string{"eth", "shh", "bzz"} + server = NewServer() + service = NotificationTestService{} + clientConn, serverConn = net.Pipe() + + out = json.NewEncoder(clientConn) + in = json.NewDecoder(clientConn) + successes = make(chan jsonSuccessResponse) + failures = make(chan jsonErrResponse) + notifications = make(chan jsonNotification) + ) + + // setup and start server + for _, namespace := range namespaces { + if err := server.RegisterName(namespace, &service); err != nil { + t.Fatalf("unable to register test service %v", err) + } + } + + go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) + defer server.Stop() + + // wait for message and write them to the given channels + go waitForMessages(t, in, successes, failures, notifications) + + // create subscriptions one by one + n := 3 + for i, namespace := range namespaces { + request := map[string]interface{}{ + "id": i, + "method": fmt.Sprintf("%s_subscribe", namespace), + "version": "2.0", + "params": []interface{}{"someSubscription", n, i}, + } + + if err := out.Encode(&request); err != nil { + t.Fatalf("Could not create subscription: %v", err) + } + } + + // create all subscriptions in 1 batch + var requests []interface{} + for i, namespace := range namespaces { + requests = append(requests, map[string]interface{}{ + "id": i, + "method": fmt.Sprintf("%s_subscribe", namespace), + "version": "2.0", + "params": []interface{}{"someSubscription", n, i}, + }) + } + + if err := out.Encode(&requests); err != nil { + t.Fatalf("Could not create subscription in batch form: %v", err) + } + + timeout := time.After(30 * time.Second) + subids := make(map[string]string, 2*len(namespaces)) + count := make(map[string]int, 2*len(namespaces)) + + for { + done := true + for id, _ := range count { + if count, found := count[id]; !found || count < (2*n) { + done = false + } + } + + if done && len(count) == len(namespaces) { + break + } + + select { + case suc := <-successes: // subscription created + subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string) + case failure := <-failures: + t.Errorf("received error: %v", failure.Error) + case notification := <-notifications: + if cnt, found := count[notification.Params.Subscription]; found { + count[notification.Params.Subscription] = cnt + 1 + } else { + count[notification.Params.Subscription] = 1 + } + case <-timeout: + for _, namespace := range namespaces { + subid, found := subids[namespace] + if !found { + t.Errorf("Subscription for '%s' not created", namespace) + continue + } + if count, found := count[subid]; !found || count < n { + t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace) + } + } + return + } + } +} diff --git a/rpc/types.go b/rpc/types.go index d29281a4a..a7b8c9788 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -104,17 +104,17 @@ type ServerCodec interface { // Read next request ReadRequestHeaders() ([]rpcRequest, bool, Error) // Parse request argument to the given types - ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error) + ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) // Assemble success response, expects response id and payload - CreateResponse(interface{}, interface{}) interface{} + CreateResponse(id interface{}, reply interface{}) interface{} // Assemble error response, expects response id and error - CreateErrorResponse(interface{}, Error) interface{} + CreateErrorResponse(id interface{}, err Error) interface{} // Assemble error response with extra information about the error through info CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} // Create notification response - CreateNotification(string, interface{}) interface{} + CreateNotification(id, namespace string, event interface{}) interface{} // Write msg to client. - Write(interface{}) error + Write(msg interface{}) error // Close underlying data stream Close() // Closed when underlying connection is closed |