aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-08-09 09:05:48 +0800
committerWei-Ning Huang <aitjcize@gmail.com>2018-08-09 09:05:48 +0800
commit2cf6003621a744ae5b625443774bf885f70acd51 (patch)
tree10fa7d27aaa6d1c65d5dbc3567d1fa4eca145e38
parent295c7b5efbc36f59e3ae8d10bc3abc3a5d17e785 (diff)
downloadtangerine-consensus-2cf6003621a744ae5b625443774bf885f70acd51.tar.gz
tangerine-consensus-2cf6003621a744ae5b625443774bf885f70acd51.tar.zst
tangerine-consensus-2cf6003621a744ae5b625443774bf885f70acd51.zip
simulation: Fix k8s simulation issues. (#36)
* Refine peer server * k8s ignore * Keep peer server alive on k8s * Stop validators from accepting new blocks after peer server has shut down. * Add comment
-rw-r--r--.gitignore3
-rw-r--r--simulation/peer-server.go33
-rw-r--r--simulation/validator.go14
3 files changed, 41 insertions, 9 deletions
diff --git a/.gitignore b/.gitignore
index ef110dd..f8cb90c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,5 +20,8 @@
vendor
build
simulation/kubernetes/build
+simulation/kubernetes/*.log
+simulation/kubernetes/config.toml
+simulation/kubernetes/validator.yaml
.dep
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index bae323e..43a1c2b 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -49,6 +49,15 @@ func NewPeerServer() *PeerServer {
}
}
+// isValidator checks if vID is in p.peers. If peer server restarts but
+// validators are not, it will cause panic if validators send message.
+func (p *PeerServer) isValidator(vID types.ValidatorID) bool {
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+ _, exist := p.peers[vID]
+ return exist
+}
+
// Run starts the peer server.
func (p *PeerServer) Run(configPath string) {
cfg, err := config.Read(configPath)
@@ -84,6 +93,7 @@ func (p *PeerServer) Run(configPath string) {
if len(p.peers) == cfg.Validator.Num {
log.Println("All peers connected.")
}
+ w.WriteHeader(http.StatusOK)
}
peersHandler := func(w http.ResponseWriter, r *http.Request) {
@@ -102,6 +112,7 @@ func (p *PeerServer) Run(configPath string) {
return
}
+ w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write(jsonText)
}
@@ -131,6 +142,7 @@ func (p *PeerServer) Run(configPath string) {
return
}
+ w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write(jsonText)
}
@@ -139,8 +151,6 @@ func (p *PeerServer) Run(configPath string) {
defer r.Body.Close()
idString := r.Header.Get("ID")
-
- defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@@ -159,6 +169,11 @@ func (p *PeerServer) Run(configPath string) {
return
}
+ if !p.isValidator(id) {
+ w.WriteHeader(http.StatusForbidden)
+ return
+ }
+
w.WriteHeader(http.StatusOK)
p.peerTotalOrderMu.Lock()
@@ -187,15 +202,17 @@ func (p *PeerServer) Run(configPath string) {
stopServer := make(chan struct{})
messageHandler := func(w http.ResponseWriter, r *http.Request) {
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
defer r.Body.Close()
idString := r.Header.Get("ID")
id := types.ValidatorID{}
id.UnmarshalText([]byte(idString))
- defer r.Body.Close()
+ if !p.isValidator(id) {
+ w.WriteHeader(http.StatusForbidden)
+ return
+ }
+
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@@ -264,4 +281,10 @@ func (p *PeerServer) Run(configPath string) {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Error starting server %v\n", err)
}
+
+ // Do not exit when we are in TCP node, since k8s will restart the pod and
+ // cause confusions.
+ if cfg.Networking.Type == config.NetworkTypeTCP {
+ select {}
+ }
}
diff --git a/simulation/validator.go b/simulation/validator.go
index 69ee317..a30420e 100644
--- a/simulation/validator.go
+++ b/simulation/validator.go
@@ -78,11 +78,11 @@ func (v *Validator) GetID() types.ValidatorID {
func (v *Validator) Run() {
v.msgChannel = v.network.Join(v)
- isStopped := make(chan struct{})
+ isStopped := make(chan struct{}, 2)
isShutdown := make(chan struct{})
v.BroadcastGenesisBlock()
- go v.MsgServer()
+ go v.MsgServer(isStopped)
go v.CheckServerInfo(isShutdown)
go v.BlockProposer(isStopped, isShutdown)
@@ -116,10 +116,15 @@ func (v *Validator) CheckServerInfo(isShutdown chan struct{}) {
}
// MsgServer listen to the network channel for message and handle it.
-func (v *Validator) MsgServer() {
+func (v *Validator) MsgServer(isStopped chan struct{}) {
var pendingBlocks []*types.Block
for {
- msg := <-v.msgChannel
+ var msg interface{}
+ select {
+ case msg = <-v.msgChannel:
+ case <-isStopped:
+ return
+ }
switch val := msg.(type) {
case *types.Block:
@@ -200,6 +205,7 @@ ProposingBlockLoop:
select {
case <-isShutdown:
isStopped <- struct{}{}
+ isStopped <- struct{}{}
break ProposingBlockLoop
default:
break