diff options
Diffstat (limited to 'rpc/v2/server.go')
-rw-r--r-- | rpc/v2/server.go | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/rpc/v2/server.go b/rpc/v2/server.go index ff6b69015..4c04f04d2 100644 --- a/rpc/v2/server.go +++ b/rpc/v2/server.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" ) // NewServer will create a new server instance with no registered handlers. @@ -120,6 +121,9 @@ func (s *Server) ServeCodec(codec ServerCodec) { codec.Close() }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { reqs, batch, err := s.readRequest(codec) if err != nil { @@ -129,9 +133,9 @@ func (s *Server) ServeCodec(codec ServerCodec) { } if batch { - go s.execBatch(codec, reqs) + go s.execBatch(ctx, codec, reqs) } else { - go s.exec(codec, reqs[0]) + go s.exec(ctx, codec, reqs[0]) } } } @@ -220,7 +224,7 @@ func (s *Server) unsubscribe(subid string) bool { } // handle executes a request and returns the response from the callback. -func (s *Server) handle(codec ServerCodec, req *serverRequest) interface{} { +func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) interface{} { if req.err != nil { return codec.CreateErrorResponse(&req.id, req.err) } @@ -255,6 +259,9 @@ func (s *Server) handle(codec ServerCodec, req *serverRequest) interface{} { } arguments := []reflect.Value{req.callb.rcvr} + if req.callb.hasCtx { + arguments = append(arguments, reflect.ValueOf(ctx)) + } if len(req.args) > 0 { arguments = append(arguments, req.args...) } @@ -277,12 +284,12 @@ func (s *Server) handle(codec ServerCodec, req *serverRequest) interface{} { } // exec executes the given request and writes the result back using the codec. -func (s *Server) exec(codec ServerCodec, req *serverRequest) { +func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) { var response interface{} if req.err != nil { response = codec.CreateErrorResponse(&req.id, req.err) } else { - response = s.handle(codec, req) + response = s.handle(ctx, codec, req) } if err := codec.Write(response); err != nil { @@ -293,13 +300,13 @@ func (s *Server) exec(codec ServerCodec, req *serverRequest) { // execBatch executes the given requests and writes the result back using the codec. It will only write the response // back when the last request is processed. -func (s *Server) execBatch(codec ServerCodec, requests []*serverRequest) { +func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) { responses := make([]interface{}, len(requests)) for i, req := range requests { if req.err != nil { responses[i] = codec.CreateErrorResponse(&req.id, req.err) } else { - responses[i] = s.handle(codec, req) + responses[i] = s.handle(ctx, codec, req) } } |