diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-08-09 09:05:48 +0800 |
---|---|---|
committer | Wei-Ning Huang <aitjcize@gmail.com> | 2018-08-09 09:05:48 +0800 |
commit | 2cf6003621a744ae5b625443774bf885f70acd51 (patch) | |
tree | 10fa7d27aaa6d1c65d5dbc3567d1fa4eca145e38 | |
parent | 295c7b5efbc36f59e3ae8d10bc3abc3a5d17e785 (diff) | |
download | tangerine-consensus-2cf6003621a744ae5b625443774bf885f70acd51.tar.gz tangerine-consensus-2cf6003621a744ae5b625443774bf885f70acd51.tar.zst tangerine-consensus-2cf6003621a744ae5b625443774bf885f70acd51.zip |
simulation: Fix k8s simulation issues. (#36)
* Refine peer server
* k8s ignore
* Keep peer server alive on k8s
* Stop validators from accepting new blocks after peer server has shut down.
* Add comment
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | simulation/peer-server.go | 33 | ||||
-rw-r--r-- | simulation/validator.go | 14 |
3 files changed, 41 insertions, 9 deletions
@@ -20,5 +20,8 @@ vendor build simulation/kubernetes/build +simulation/kubernetes/*.log +simulation/kubernetes/config.toml +simulation/kubernetes/validator.yaml .dep diff --git a/simulation/peer-server.go b/simulation/peer-server.go index bae323e..43a1c2b 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -49,6 +49,15 @@ func NewPeerServer() *PeerServer { } } +// isValidator checks if vID is in p.peers. If peer server restarts but +// validators are not, it will cause panic if validators send message. +func (p *PeerServer) isValidator(vID types.ValidatorID) bool { + p.peersMu.Lock() + defer p.peersMu.Unlock() + _, exist := p.peers[vID] + return exist +} + // Run starts the peer server. func (p *PeerServer) Run(configPath string) { cfg, err := config.Read(configPath) @@ -84,6 +93,7 @@ func (p *PeerServer) Run(configPath string) { if len(p.peers) == cfg.Validator.Num { log.Println("All peers connected.") } + w.WriteHeader(http.StatusOK) } peersHandler := func(w http.ResponseWriter, r *http.Request) { @@ -102,6 +112,7 @@ func (p *PeerServer) Run(configPath string) { return } + w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") w.Write(jsonText) } @@ -131,6 +142,7 @@ func (p *PeerServer) Run(configPath string) { return } + w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") w.Write(jsonText) } @@ -139,8 +151,6 @@ func (p *PeerServer) Run(configPath string) { defer r.Body.Close() idString := r.Header.Get("ID") - - defer r.Body.Close() body, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -159,6 +169,11 @@ func (p *PeerServer) Run(configPath string) { return } + if !p.isValidator(id) { + w.WriteHeader(http.StatusForbidden) + return + } + w.WriteHeader(http.StatusOK) p.peerTotalOrderMu.Lock() @@ -187,15 +202,17 @@ func (p *PeerServer) Run(configPath string) { stopServer := make(chan struct{}) messageHandler := func(w http.ResponseWriter, r *http.Request) { - p.peersMu.Lock() - defer p.peersMu.Unlock() defer r.Body.Close() idString := r.Header.Get("ID") id := types.ValidatorID{} id.UnmarshalText([]byte(idString)) - defer r.Body.Close() + if !p.isValidator(id) { + w.WriteHeader(http.StatusForbidden) + return + } + body, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -264,4 +281,10 @@ func (p *PeerServer) Run(configPath string) { if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("Error starting server %v\n", err) } + + // Do not exit when we are in TCP node, since k8s will restart the pod and + // cause confusions. + if cfg.Networking.Type == config.NetworkTypeTCP { + select {} + } } diff --git a/simulation/validator.go b/simulation/validator.go index 69ee317..a30420e 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -78,11 +78,11 @@ func (v *Validator) GetID() types.ValidatorID { func (v *Validator) Run() { v.msgChannel = v.network.Join(v) - isStopped := make(chan struct{}) + isStopped := make(chan struct{}, 2) isShutdown := make(chan struct{}) v.BroadcastGenesisBlock() - go v.MsgServer() + go v.MsgServer(isStopped) go v.CheckServerInfo(isShutdown) go v.BlockProposer(isStopped, isShutdown) @@ -116,10 +116,15 @@ func (v *Validator) CheckServerInfo(isShutdown chan struct{}) { } // MsgServer listen to the network channel for message and handle it. -func (v *Validator) MsgServer() { +func (v *Validator) MsgServer(isStopped chan struct{}) { var pendingBlocks []*types.Block for { - msg := <-v.msgChannel + var msg interface{} + select { + case msg = <-v.msgChannel: + case <-isStopped: + return + } switch val := msg.(type) { case *types.Block: @@ -200,6 +205,7 @@ ProposingBlockLoop: select { case <-isShutdown: isStopped <- struct{}{} + isStopped <- struct{}{} break ProposingBlockLoop default: break |