p2p/discover: watch find failures, evacuate on too many, rebond if failed

This commit is contained in:
Péter Szilágyi 2015-05-25 15:04:40 +03:00 committed by Felix Lange
parent 64174f196f
commit 6078aa08eb
1 changed files with 47 additions and 8 deletions

View File

@ -27,6 +27,7 @@ const (
nBuckets = hashBits + 1 // Number of buckets nBuckets = hashBits + 1 // Number of buckets
maxBondingPingPongs = 16 maxBondingPingPongs = 16
maxFindnodeFailures = 5
) )
type Table struct { type Table struct {
@ -198,7 +199,19 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
asked[n.ID] = true asked[n.ID] = true
pendingQueries++ pendingQueries++
go func() { go func() {
r, _ := tab.net.findnode(n.ID, n.addr(), targetID) // Find potential neighbors to bond with
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
glog.V(logger.Detail).Infof("Bumping failures for %x: %d", n.ID[:8], fails)
if fails > maxFindnodeFailures {
glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
tab.del(n)
}
}
reply <- tab.bondall(r) reply <- tab.bondall(r)
}() }()
} }
@ -305,8 +318,15 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
// If pinged is true, the remote node has just pinged us and one half // If pinged is true, the remote node has just pinged us and one half
// of the process can be skipped. // of the process can be skipped.
func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
var n *Node // Retrieve a previously known node and any recent findnode failures
if n = tab.db.node(id); n == nil { node, fails := tab.db.node(id), 0
if node != nil {
fails = tab.db.findFails(id)
}
// If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
if node == nil || fails > 0 {
glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails)
tab.bondmu.Lock() tab.bondmu.Lock()
w := tab.bonding[id] w := tab.bonding[id]
if w != nil { if w != nil {
@ -325,18 +345,22 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
delete(tab.bonding, id) delete(tab.bonding, id)
tab.bondmu.Unlock() tab.bondmu.Unlock()
} }
n = w.n node = w.n
if w.err != nil { if w.err != nil {
return nil, w.err return nil, w.err
} }
} }
// Bonding succeeded, add to the table and reset previous findnode failures
tab.mutex.Lock() tab.mutex.Lock()
defer tab.mutex.Unlock() defer tab.mutex.Unlock()
b := tab.buckets[logdist(tab.self.sha, n.sha)]
if !b.bump(n) { b := tab.buckets[logdist(tab.self.sha, node.sha)]
tab.pingreplace(n, b) if !b.bump(node) {
tab.pingreplace(node, b)
} }
return n, nil tab.db.updateFindFails(id, 0)
return node, nil
} }
func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) { func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
@ -414,6 +438,21 @@ outer:
} }
} }
// del removes an entry from the node table (used to evacuate failed/non-bonded
// discovery peers).
func (tab *Table) del(node *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == node.ID {
bucket.entries = append(bucket.entries[:i], bucket.entries[i+1:]...)
return
}
}
}
func (b *bucket) bump(n *Node) bool { func (b *bucket) bump(n *Node) bool {
for i := range b.entries { for i := range b.entries {
if b.entries[i].ID == n.ID { if b.entries[i].ID == n.ID {