From 86ec742f975d825f42dd69ebf17b0adaa66542c0 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 31 Jan 2019 11:48:54 +0100 Subject: [PATCH] p2p/discover: improve table addition code (#18974) This change clears up confusion around the two ways in which nodes can be added to the table. When a neighbors packet is received as a reply to findnode, the nodes contained in the reply are added as 'seen' entries if sufficient space is available. When a ping is received and the endpoint verification has taken place, the remote node is added as a 'verified' entry or moved to the front of the bucket if present. This also updates the node's IP address and port if they have changed. --- p2p/discover/table.go | 117 ++++++++++++++++++++++---------- p2p/discover/table_test.go | 96 ++++++++++++++++++++++++-- p2p/discover/table_util_test.go | 20 +----- p2p/discover/udp.go | 4 +- 4 files changed, 175 insertions(+), 62 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index ba4c063271..ef0c08afce 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -328,7 +328,7 @@ func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) { // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll // just remove those again during revalidation. for _, n := range r { - tab.add(n) + tab.addSeenNode(n) } reply <- r } @@ -443,7 +443,7 @@ func (tab *Table) loadSeedNodes() { seed := seeds[i] age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }} log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) - tab.add(seed) + tab.addSeenNode(seed) } } @@ -468,7 +468,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { // The node responded, move it to the front. last.livenessChecks++ log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) - b.bump(last) + tab.bumpInBucket(b, last) return } // No reply received, pick a replacement or delete the node if there aren't @@ -551,12 +551,12 @@ func (tab *Table) bucket(id enode.ID) *bucket { return tab.buckets[d-bucketMinDistance-1] } -// add attempts to add the given node to its corresponding bucket. If the bucket has space -// available, adding the node succeeds immediately. Otherwise, the node is added if the -// least recently active node in the bucket does not respond to a ping packet. +// addSeenNode adds a node which may or may not be live to the end of a bucket. If the +// bucket has space available, adding the node succeeds immediately. Otherwise, the node is +// added to the replacements list. // // The caller must not hold tab.mutex. -func (tab *Table) add(n *node) { +func (tab *Table) addSeenNode(n *node) { if n.ID() == tab.self().ID() { return } @@ -564,23 +564,68 @@ func (tab *Table) add(n *node) { tab.mutex.Lock() defer tab.mutex.Unlock() b := tab.bucket(n.ID()) - if !tab.bumpOrAdd(b, n) { - // Node is not in table. Add it to the replacement list. + if contains(b.entries, n.ID()) { + // Already in bucket, don't add. + return + } + if len(b.entries) >= bucketSize { + // Bucket full, maybe add as replacement. tab.addReplacement(b, n) + return + } + if !tab.addIP(b, n.IP()) { + // Can't add: IP limit reached. + return + } + // Add to end of bucket: + b.entries = append(b.entries, n) + b.replacements = deleteNode(b.replacements, n) + n.addedAt = time.Now() + if tab.nodeAddedHook != nil { + tab.nodeAddedHook(n) } } -// addThroughPing adds the given node to the table. Compared to plain -// 'add' there is an additional safety measure: if the table is still -// initializing the node is not added. This prevents an attack where the -// table could be filled by just sending ping repeatedly. +// addVerifiedNode adds a node whose existence has been verified recently to the front of a +// bucket. If the node is already in the bucket, it is moved to the front. If the bucket +// has no space, the node is added to the replacements list. +// +// There is an additional safety measure: if the table is still initializing the node +// is not added. This prevents an attack where the table could be filled by just sending +// ping repeatedly. // // The caller must not hold tab.mutex. -func (tab *Table) addThroughPing(n *node) { +func (tab *Table) addVerifiedNode(n *node) { if !tab.isInitDone() { return } - tab.add(n) + if n.ID() == tab.self().ID() { + return + } + + tab.mutex.Lock() + defer tab.mutex.Unlock() + b := tab.bucket(n.ID()) + if tab.bumpInBucket(b, n) { + // Already in bucket, moved to front. + return + } + if len(b.entries) >= bucketSize { + // Bucket full, maybe add as replacement. + tab.addReplacement(b, n) + return + } + if !tab.addIP(b, n.IP()) { + // Can't add: IP limit reached. + return + } + // Add to front of bucket. + b.entries, _ = pushNode(b.entries, n, bucketSize) + b.replacements = deleteNode(b.replacements, n) + n.addedAt = time.Now() + if tab.nodeAddedHook != nil { + tab.nodeAddedHook(n) + } } // delete removes an entry from the node table. It is used to evacuate dead nodes. @@ -651,12 +696,21 @@ func (tab *Table) replace(b *bucket, last *node) *node { return r } -// bump moves the given node to the front of the bucket entry list +// bumpInBucket moves the given node to the front of the bucket entry list // if it is contained in that list. -func (b *bucket) bump(n *node) bool { +func (tab *Table) bumpInBucket(b *bucket, n *node) bool { for i := range b.entries { if b.entries[i].ID() == n.ID() { - // move it to the front + if !n.IP().Equal(b.entries[i].IP()) { + // Endpoint has changed, ensure that the new IP fits into table limits. + tab.removeIP(b, b.entries[i].IP()) + if !tab.addIP(b, n.IP()) { + // It doesn't, put the previous one back. + tab.addIP(b, b.entries[i].IP()) + return false + } + } + // Move it to the front. copy(b.entries[1:], b.entries[:i]) b.entries[0] = n return true @@ -665,29 +719,20 @@ func (b *bucket) bump(n *node) bool { return false } -// bumpOrAdd moves n to the front of the bucket entry list or adds it if the list isn't -// full. The return value is true if n is in the bucket. -func (tab *Table) bumpOrAdd(b *bucket, n *node) bool { - if b.bump(n) { - return true - } - if len(b.entries) >= bucketSize || !tab.addIP(b, n.IP()) { - return false - } - b.entries, _ = pushNode(b.entries, n, bucketSize) - b.replacements = deleteNode(b.replacements, n) - n.addedAt = time.Now() - if tab.nodeAddedHook != nil { - tab.nodeAddedHook(n) - } - return true -} - func (tab *Table) deleteInBucket(b *bucket, n *node) { b.entries = deleteNode(b.entries, n) tab.removeIP(b, n.IP()) } +func contains(ns []*node, id enode.ID) bool { + for _, n := range ns { + if n.ID() == id { + return true + } + } + return false +} + // pushNode adds n to the front of list, keeping at most max items. func pushNode(list []*node, n *node, max int) ([]*node, *node) { if len(list) < max { diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index b00a932118..b5622e3a27 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/p2p/netutil" ) func TestTable_pingReplace(t *testing.T) { @@ -64,7 +65,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding // its bucket if it is unresponsive. Revalidate again to ensure that transport.dead[last.ID()] = !lastInBucketIsResponding transport.dead[pingSender.ID()] = !newNodeIsResponding - tab.add(pingSender) + tab.addSeenNode(pingSender) tab.doRevalidate(make(chan struct{}, 1)) tab.doRevalidate(make(chan struct{}, 1)) @@ -114,10 +115,14 @@ func TestBucket_bumpNoDuplicates(t *testing.T) { } prop := func(nodes []*node, bumps []int) (ok bool) { + tab, db := newTestTable(newPingRecorder()) + defer db.Close() + defer tab.Close() + b := &bucket{entries: make([]*node, len(nodes))} copy(b.entries, nodes) for i, pos := range bumps { - b.bump(b.entries[pos]) + tab.bumpInBucket(b, b.entries[pos]) if hasDuplicates(b.entries) { t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps)) for _, n := range b.entries { @@ -126,6 +131,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) { return false } } + checkIPLimitInvariant(t, tab) return true } if err := quick.Check(prop, cfg); err != nil { @@ -142,11 +148,12 @@ func TestTable_IPLimit(t *testing.T) { for i := 0; i < tableIPLimit+1; i++ { n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)}) - tab.add(n) + tab.addSeenNode(n) } if tab.len() > tableIPLimit { t.Errorf("too many nodes in table") } + checkIPLimitInvariant(t, tab) } // This checks that the per-bucket IP limit is applied correctly. @@ -159,11 +166,28 @@ func TestTable_BucketIPLimit(t *testing.T) { d := 3 for i := 0; i < bucketIPLimit+1; i++ { n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)}) - tab.add(n) + tab.addSeenNode(n) } if tab.len() > bucketIPLimit { t.Errorf("too many nodes in table") } + checkIPLimitInvariant(t, tab) +} + +// checkIPLimitInvariant checks that ip limit sets contain an entry for every +// node in the table and no extra entries. +func checkIPLimitInvariant(t *testing.T, tab *Table) { + t.Helper() + + tabset := netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit} + for _, b := range tab.buckets { + for _, n := range b.entries { + tabset.Add(n.IP()) + } + } + if tabset.String() != tab.ips.String() { + t.Errorf("table IP set is incorrect:\nhave: %v\nwant: %v", tab.ips, tabset) + } } func TestTable_closest(t *testing.T) { @@ -281,6 +305,69 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value { return reflect.ValueOf(t) } +func TestTable_addVerifiedNode(t *testing.T) { + tab, db := newTestTable(newPingRecorder()) + <-tab.initDone + defer db.Close() + defer tab.Close() + + // Insert two nodes. + n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + tab.addSeenNode(n1) + tab.addSeenNode(n2) + + // Verify bucket content: + bcontent := []*node{n1, n2} + if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { + t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries) + } + + // Add a changed version of n2. + newrec := n2.Record() + newrec.Set(enr.IP{99, 99, 99, 99}) + newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) + tab.addVerifiedNode(newn2) + + // Check that bucket is updated correctly. + newBcontent := []*node{newn2, n1} + if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, newBcontent) { + t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries) + } + checkIPLimitInvariant(t, tab) +} + +func TestTable_addSeenNode(t *testing.T) { + tab, db := newTestTable(newPingRecorder()) + <-tab.initDone + defer db.Close() + defer tab.Close() + + // Insert two nodes. + n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + tab.addSeenNode(n1) + tab.addSeenNode(n2) + + // Verify bucket content: + bcontent := []*node{n1, n2} + if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { + t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries) + } + + // Add a changed version of n2. + newrec := n2.Record() + newrec.Set(enr.IP{99, 99, 99, 99}) + newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) + tab.addSeenNode(newn2) + + // Check that bucket content is unchanged. + if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { + t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries) + } + checkIPLimitInvariant(t, tab) +} + func TestTable_Lookup(t *testing.T) { tab, db := newTestTable(lookupTestnet) defer db.Close() @@ -535,7 +622,6 @@ func (tn *preminedTestnet) findnode(toid enode.ID, toaddr *net.UDPAddr, target e } func (*preminedTestnet) close() {} -func (*preminedTestnet) waitping(from enode.ID) error { return nil } func (*preminedTestnet) ping(toid enode.ID, toaddr *net.UDPAddr) error { return nil } // mine generates a testnet struct literal with nodes at diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 3ce582b994..e61c9e6fc5 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -86,17 +86,8 @@ func fillBucket(tab *Table, n *node) (last *node) { // fillTable adds nodes the table to the end of their corresponding bucket // if the bucket is not full. The caller must not hold tab.mutex. func fillTable(tab *Table, nodes []*node) { - tab.mutex.Lock() - defer tab.mutex.Unlock() - for _, n := range nodes { - if n.ID() == tab.self().ID() { - continue // don't add self - } - b := tab.bucket(n.ID()) - if len(b.entries) < bucketSize { - tab.bumpOrAdd(b, n) - } + tab.addSeenNode(n) } } @@ -154,15 +145,6 @@ func hasDuplicates(slice []*node) bool { return false } -func contains(ns []*node, id enode.ID) bool { - for _, n := range ns { - if n.ID() == id { - return true - } - } - return false -} - func sortedByDistanceTo(distbase enode.ID, slice []*node) bool { var last enode.ID for i, e := range slice { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 5ce4c43dcf..df9a3065fa 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -661,10 +661,10 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) n := wrapNode(enode.NewV4(req.senderKey, from.IP, int(req.From.TCP), from.Port)) if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration { t.sendPing(fromID, from, func() { - t.tab.addThroughPing(n) + t.tab.addVerifiedNode(n) }) } else { - t.tab.addThroughPing(n) + t.tab.addVerifiedNode(n) } // Update node database and endpoint predictor.