aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpc/server.go29
-rw-r--r--rpc/subscription.go1
-rw-r--r--rpc/subscription_test.go16
-rw-r--r--rpc/types.go13
-rw-r--r--rpc/utils.go27
5 files changed, 24 insertions, 62 deletions
diff --git a/rpc/server.go b/rpc/server.go
index 62b84af34..30c288349 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -29,11 +29,7 @@ import (
"gopkg.in/fatih/set.v0"
)
-const (
- notificationBufferSize = 10000 // max buffered notifications before codec is closed
-
- MetadataApi = "rpc"
-)
+const MetadataApi = "rpc"
// CodecOption specifies which type of messages this codec supports
type CodecOption int
@@ -49,10 +45,9 @@ const (
// NewServer will create a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{
- services: make(serviceRegistry),
- subscriptions: make(subscriptionRegistry),
- codecs: set.New(),
- run: 1,
+ services: make(serviceRegistry),
+ codecs: set.New(),
+ run: 1,
}
// register a default service which will provide meta information about the RPC service such as the services and
@@ -124,16 +119,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil
}
-// hasOption returns true if option is included in options, otherwise false
-func hasOption(option CodecOption, options []CodecOption) bool {
- for _, o := range options {
- if option == o {
- return true
- }
- }
- return false
-}
-
// serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec.
//
@@ -148,13 +133,11 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
- log.Error(fmt.Sprint(string(buf)))
+ log.Error(string(buf))
}
s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
-
- return
}()
ctx, cancel := context.WithCancel(context.Background())
@@ -246,7 +229,7 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
// close all codecs which will cancel pending requests/subscriptions.
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
- log.Debug(fmt.Sprint("RPC Server shutdown initiatied"))
+ log.Debug("RPC Server shutdown initiatied")
s.codecsMu.Lock()
defer s.codecsMu.Unlock()
s.codecs.Each(func(c interface{}) bool {
diff --git a/rpc/subscription.go b/rpc/subscription.go
index 720e4dd06..6ce7befa1 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -53,7 +53,6 @@ type notifierKey struct{}
type Notifier struct {
codec ServerCodec
subMu sync.RWMutex // guards active and inactive maps
- stopped bool
active map[ID]*Subscription
inactive map[ID]*Subscription
}
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go
index 0ed15ddfe..39f759692 100644
--- a/rpc/subscription_test.go
+++ b/rpc/subscription_test.go
@@ -165,7 +165,7 @@ func TestNotifications(t *testing.T) {
}
func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
- failures chan<- jsonErrResponse, notifications chan<- jsonNotification) {
+ failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) {
// read and parse server messages
for {
@@ -177,12 +177,14 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
var responses []map[string]interface{}
if rmsg[0] == '[' {
if err := json.Unmarshal(rmsg, &responses); err != nil {
- t.Fatalf("Received invalid message: %s", rmsg)
+ errors <- fmt.Errorf("Received invalid message: %s", rmsg)
+ return
}
} else {
var msg map[string]interface{}
if err := json.Unmarshal(rmsg, &msg); err != nil {
- t.Fatalf("Received invalid message: %s", rmsg)
+ errors <- fmt.Errorf("Received invalid message: %s", rmsg)
+ return
}
responses = append(responses, msg)
}
@@ -216,7 +218,7 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
}
continue
}
- t.Fatalf("Received invalid message: %s", msg)
+ errors <- fmt.Errorf("Received invalid message: %s", msg)
}
}
}
@@ -235,6 +237,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
successes = make(chan jsonSuccessResponse)
failures = make(chan jsonErrResponse)
notifications = make(chan jsonNotification)
+
+ errors = make(chan error, 10)
)
// setup and start server
@@ -248,7 +252,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
defer server.Stop()
// wait for message and write them to the given channels
- go waitForMessages(t, in, successes, failures, notifications)
+ go waitForMessages(t, in, successes, failures, notifications, errors)
// create subscriptions one by one
n := 3
@@ -297,6 +301,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
}
select {
+ case err := <-errors:
+ t.Fatal(err)
case suc := <-successes: // subscription created
subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
case failure := <-failures:
diff --git a/rpc/types.go b/rpc/types.go
index a7b8c9788..f2375604e 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -48,7 +48,6 @@ type callback struct {
// service represents a registered object
type service struct {
name string // name for service
- rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // receiver type
callbacks callbacks // registered handlers
subscriptions subscriptions // available subscriptions/notifications
@@ -58,23 +57,19 @@ type service struct {
type serverRequest struct {
id interface{}
svcname string
- rcvr reflect.Value
callb *callback
args []reflect.Value
isUnsubscribe bool
err Error
}
-type serviceRegistry map[string]*service // collection of services
-type callbacks map[string]*callback // collection of RPC callbacks
-type subscriptions map[string]*callback // collection of subscription callbacks
-type subscriptionRegistry map[string]*callback // collection of subscription callbacks
+type serviceRegistry map[string]*service // collection of services
+type callbacks map[string]*callback // collection of RPC callbacks
+type subscriptions map[string]*callback // collection of subscription callbacks
// Server represents a RPC server
type Server struct {
- services serviceRegistry
- muSubcriptions sync.Mutex // protects subscriptions
- subscriptions subscriptionRegistry
+ services serviceRegistry
run int32
codecsMu sync.Mutex
diff --git a/rpc/utils.go b/rpc/utils.go
index 2506c4833..9315cab59 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -119,21 +119,6 @@ func isHexNum(t reflect.Type) bool {
return t == bigIntType
}
-var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem()
-
-// Indication if the given block is a BlockNumber
-func isBlockNumber(t reflect.Type) bool {
- if t == nil {
- return false
- }
-
- for t.Kind() == reflect.Ptr {
- t = t.Elem()
- }
-
- return t == blockNumberType
-}
-
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
@@ -210,18 +195,12 @@ METHODS:
}
switch mtype.NumOut() {
- case 0, 1:
- break
- case 2:
- if h.errPos == -1 { // method must one return value and 1 error
+ case 0, 1, 2:
+ if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
continue METHODS
}
- break
- default:
- continue METHODS
+ callbacks[mname] = &h
}
-
- callbacks[mname] = &h
}
return callbacks, subscriptions