From 6ec15610443b28eabf665199f1dc5be2b3e3f7cb Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 18 Feb 2021 17:54:29 +0100 Subject: [PATCH] eth: implement eth66 (#22241) * eth/protocols/eth: split up the eth protocol handlers * eth/protocols/eth: define eth-66 protocol messages * eth/protocols/eth: poc implement getblockheaders on eth/66 * eth/protocols/eth: implement remaining eth-66 handlers * eth/protocols: define handler map for eth 66 * eth/downloader: use protocol constants from eth package * eth/protocols/eth: add ETH66 capability * eth/downloader: tests for eth66 * eth/downloader: fix error in tests * eth/protocols/eth: use eth66 for outgoing requests * eth/protocols/eth: remove unused error type * eth/protocols/eth: define protocol length * eth/protocols/eth: fix pooled tx over eth66 * protocols/eth/handlers: revert behavioural change which caused tests to fail * eth/downloader: fix failing test * eth/protocols/eth: add testcases + fix flaw with header requests * eth/protocols: change comments * eth/protocols/eth: review fixes + fixed flaw in RequestOneHeader * eth/protocols: documentation * eth/protocols/eth: review concerns about types --- eth/downloader/downloader_test.go | 185 ++++++++--- eth/downloader/peer.go | 9 +- eth/protocols/eth/handler.go | 401 ++++------------------- eth/protocols/eth/handlers.go | 510 +++++++++++++++++++++++++++++ eth/protocols/eth/peer.go | 113 ++++++- eth/protocols/eth/protocol.go | 95 +++++- eth/protocols/eth/protocol_test.go | 200 +++++++++++ p2p/message.go | 4 + 8 files changed, 1121 insertions(+), 396 deletions(-) create mode 100644 eth/protocols/eth/handlers.go diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5de1ef3f8e..2917116144 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -515,18 +515,18 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng } } -// Tests that simple synchronization against a canonical chain works correctly. -// In this test common ancestor lookup should be short circuited and not require -// binary searching. -func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) } -func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) } -func TestCanonicalSynchronisation65Full(t *testing.T) { testCanonicalSynchronisation(t, 65, FullSync) } -func TestCanonicalSynchronisation65Fast(t *testing.T) { testCanonicalSynchronisation(t, 65, FastSync) } -func TestCanonicalSynchronisation65Light(t *testing.T) { - testCanonicalSynchronisation(t, 65, LightSync) -} +func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonSync(t, 64, FullSync) } +func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonSync(t, 64, FastSync) } -func testCanonicalSynchronisation(t *testing.T, protocol uint, mode SyncMode) { +func TestCanonicalSynchronisation65Full(t *testing.T) { testCanonSync(t, 65, FullSync) } +func TestCanonicalSynchronisation65Fast(t *testing.T) { testCanonSync(t, 65, FastSync) } +func TestCanonicalSynchronisation65Light(t *testing.T) { testCanonSync(t, 65, LightSync) } + +func TestCanonicalSynchronisation66Full(t *testing.T) { testCanonSync(t, 66, FullSync) } +func TestCanonicalSynchronisation66Fast(t *testing.T) { testCanonSync(t, 66, FastSync) } +func TestCanonicalSynchronisation66Light(t *testing.T) { testCanonSync(t, 66, LightSync) } + +func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() tester := newTester() @@ -547,9 +547,13 @@ func testCanonicalSynchronisation(t *testing.T, protocol uint, mode SyncMode) { // until the cached blocks are retrieved. func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) } func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } + func TestThrottling65Full(t *testing.T) { testThrottling(t, 65, FullSync) } func TestThrottling65Fast(t *testing.T) { testThrottling(t, 65, FastSync) } +func TestThrottling66Full(t *testing.T) { testThrottling(t, 66, FullSync) } +func TestThrottling66Fast(t *testing.T) { testThrottling(t, 66, FastSync) } + func testThrottling(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() tester := newTester() @@ -629,12 +633,17 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { // Tests that simple synchronization against a forked chain works correctly. In // this test common ancestor lookup should *not* be short circuited, and a full // binary search should be executed. -func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) } -func TestForkedSync64Fast(t *testing.T) { testForkedSync(t, 64, FastSync) } +func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) } +func TestForkedSync64Fast(t *testing.T) { testForkedSync(t, 64, FastSync) } + func TestForkedSync65Full(t *testing.T) { testForkedSync(t, 65, FullSync) } func TestForkedSync65Fast(t *testing.T) { testForkedSync(t, 65, FastSync) } func TestForkedSync65Light(t *testing.T) { testForkedSync(t, 65, LightSync) } +func TestForkedSync66Full(t *testing.T) { testForkedSync(t, 66, FullSync) } +func TestForkedSync66Fast(t *testing.T) { testForkedSync(t, 66, FastSync) } +func TestForkedSync66Light(t *testing.T) { testForkedSync(t, 66, LightSync) } + func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -660,12 +669,17 @@ func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that synchronising against a much shorter but much heavyer fork works // corrently and is not dropped. -func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) } -func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) } +func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) } +func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) } + func TestHeavyForkedSync65Full(t *testing.T) { testHeavyForkedSync(t, 65, FullSync) } func TestHeavyForkedSync65Fast(t *testing.T) { testHeavyForkedSync(t, 65, FastSync) } func TestHeavyForkedSync65Light(t *testing.T) { testHeavyForkedSync(t, 65, LightSync) } +func TestHeavyForkedSync66Full(t *testing.T) { testHeavyForkedSync(t, 66, FullSync) } +func TestHeavyForkedSync66Fast(t *testing.T) { testHeavyForkedSync(t, 66, FastSync) } +func TestHeavyForkedSync66Light(t *testing.T) { testHeavyForkedSync(t, 66, LightSync) } + func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -693,12 +707,17 @@ func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that chain forks are contained within a certain interval of the current // chain head, ensuring that malicious peers cannot waste resources by feeding // long dead chains. -func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) } -func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) } +func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) } +func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) } + func TestBoundedForkedSync65Full(t *testing.T) { testBoundedForkedSync(t, 65, FullSync) } func TestBoundedForkedSync65Fast(t *testing.T) { testBoundedForkedSync(t, 65, FastSync) } func TestBoundedForkedSync65Light(t *testing.T) { testBoundedForkedSync(t, 65, LightSync) } +func TestBoundedForkedSync66Full(t *testing.T) { testBoundedForkedSync(t, 66, FullSync) } +func TestBoundedForkedSync66Fast(t *testing.T) { testBoundedForkedSync(t, 66, FastSync) } +func TestBoundedForkedSync66Light(t *testing.T) { testBoundedForkedSync(t, 66, LightSync) } + func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -725,12 +744,17 @@ func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that chain forks are contained within a certain interval of the current // chain head for short but heavy forks too. These are a bit special because they // take different ancestor lookup paths. -func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) } -func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) } +func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) } +func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) } + func TestBoundedHeavyForkedSync65Full(t *testing.T) { testBoundedHeavyForkedSync(t, 65, FullSync) } func TestBoundedHeavyForkedSync65Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 65, FastSync) } func TestBoundedHeavyForkedSync65Light(t *testing.T) { testBoundedHeavyForkedSync(t, 65, LightSync) } +func TestBoundedHeavyForkedSync66Full(t *testing.T) { testBoundedHeavyForkedSync(t, 66, FullSync) } +func TestBoundedHeavyForkedSync66Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 66, FastSync) } +func TestBoundedHeavyForkedSync66Light(t *testing.T) { testBoundedHeavyForkedSync(t, 66, LightSync) } + func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() tester := newTester() @@ -775,12 +799,17 @@ func TestInactiveDownloader63(t *testing.T) { } // Tests that a canceled download wipes all previously accumulated state. -func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) } -func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) } +func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) } +func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) } + func TestCancel65Full(t *testing.T) { testCancel(t, 65, FullSync) } func TestCancel65Fast(t *testing.T) { testCancel(t, 65, FastSync) } func TestCancel65Light(t *testing.T) { testCancel(t, 65, LightSync) } +func TestCancel66Full(t *testing.T) { testCancel(t, 66, FullSync) } +func TestCancel66Fast(t *testing.T) { testCancel(t, 66, FastSync) } +func TestCancel66Light(t *testing.T) { testCancel(t, 66, LightSync) } + func testCancel(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -806,12 +835,17 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) { } // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). -func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) } -func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) } +func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) } +func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) } + func TestMultiSynchronisation65Full(t *testing.T) { testMultiSynchronisation(t, 65, FullSync) } func TestMultiSynchronisation65Fast(t *testing.T) { testMultiSynchronisation(t, 65, FastSync) } func TestMultiSynchronisation65Light(t *testing.T) { testMultiSynchronisation(t, 65, LightSync) } +func TestMultiSynchronisation66Full(t *testing.T) { testMultiSynchronisation(t, 66, FullSync) } +func TestMultiSynchronisation66Fast(t *testing.T) { testMultiSynchronisation(t, 66, FastSync) } +func TestMultiSynchronisation66Light(t *testing.T) { testMultiSynchronisation(t, 66, LightSync) } + func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -834,12 +868,17 @@ func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { // Tests that synchronisations behave well in multi-version protocol environments // and not wreak havoc on other nodes in the network. -func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) } -func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) } +func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) } +func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) } + func TestMultiProtoSynchronisation65Full(t *testing.T) { testMultiProtoSync(t, 65, FullSync) } func TestMultiProtoSynchronisation65Fast(t *testing.T) { testMultiProtoSync(t, 65, FastSync) } func TestMultiProtoSynchronisation65Light(t *testing.T) { testMultiProtoSync(t, 65, LightSync) } +func TestMultiProtoSynchronisation66Full(t *testing.T) { testMultiProtoSync(t, 66, FullSync) } +func TestMultiProtoSynchronisation66Fast(t *testing.T) { testMultiProtoSync(t, 66, FastSync) } +func TestMultiProtoSynchronisation66Light(t *testing.T) { testMultiProtoSync(t, 66, LightSync) } + func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -850,9 +889,9 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { chain := testChainBase.shorten(blockCacheMaxItems - 15) // Create peers of every type - tester.newPeer("peer 63", 63, chain) tester.newPeer("peer 64", 64, chain) tester.newPeer("peer 65", 65, chain) + tester.newPeer("peer 66", 66, chain) // Synchronise with the requested peer and make sure all blocks were retrieved if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { @@ -861,7 +900,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { assertOwnChain(t, tester, chain.len()) // Check that no peers have been dropped off - for _, version := range []int{63, 64, 65} { + for _, version := range []int{64, 65, 66} { peer := fmt.Sprintf("peer %d", version) if _, ok := tester.peers[peer]; !ok { t.Errorf("%s dropped", peer) @@ -871,12 +910,17 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that if a block is empty (e.g. header only), no body request should be // made, and instead the header should be assembled into a whole block in itself. -func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) } -func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) } +func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) } +func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) } + func TestEmptyShortCircuit65Full(t *testing.T) { testEmptyShortCircuit(t, 65, FullSync) } func TestEmptyShortCircuit65Fast(t *testing.T) { testEmptyShortCircuit(t, 65, FastSync) } func TestEmptyShortCircuit65Light(t *testing.T) { testEmptyShortCircuit(t, 65, LightSync) } +func TestEmptyShortCircuit66Full(t *testing.T) { testEmptyShortCircuit(t, 66, FullSync) } +func TestEmptyShortCircuit66Fast(t *testing.T) { testEmptyShortCircuit(t, 66, FastSync) } +func TestEmptyShortCircuit66Light(t *testing.T) { testEmptyShortCircuit(t, 66, LightSync) } + func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -923,12 +967,17 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { // Tests that headers are enqueued continuously, preventing malicious nodes from // stalling the downloader by feeding gapped header chains. -func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) } -func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) } +func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) } +func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) } + func TestMissingHeaderAttack65Full(t *testing.T) { testMissingHeaderAttack(t, 65, FullSync) } func TestMissingHeaderAttack65Fast(t *testing.T) { testMissingHeaderAttack(t, 65, FastSync) } func TestMissingHeaderAttack65Light(t *testing.T) { testMissingHeaderAttack(t, 65, LightSync) } +func TestMissingHeaderAttack66Full(t *testing.T) { testMissingHeaderAttack(t, 66, FullSync) } +func TestMissingHeaderAttack66Fast(t *testing.T) { testMissingHeaderAttack(t, 66, FastSync) } +func TestMissingHeaderAttack66Light(t *testing.T) { testMissingHeaderAttack(t, 66, LightSync) } + func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -953,12 +1002,17 @@ func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { // Tests that if requested headers are shifted (i.e. first is missing), the queue // detects the invalid numbering. -func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) } -func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) } +func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) } +func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) } + func TestShiftedHeaderAttack65Full(t *testing.T) { testShiftedHeaderAttack(t, 65, FullSync) } func TestShiftedHeaderAttack65Fast(t *testing.T) { testShiftedHeaderAttack(t, 65, FastSync) } func TestShiftedHeaderAttack65Light(t *testing.T) { testShiftedHeaderAttack(t, 65, LightSync) } +func TestShiftedHeaderAttack66Full(t *testing.T) { testShiftedHeaderAttack(t, 66, FullSync) } +func TestShiftedHeaderAttack66Fast(t *testing.T) { testShiftedHeaderAttack(t, 66, FastSync) } +func TestShiftedHeaderAttack66Light(t *testing.T) { testShiftedHeaderAttack(t, 66, LightSync) } + func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -990,6 +1044,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { // sure no state was corrupted. func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(t, 64, FastSync) } func TestInvalidHeaderRollback65Fast(t *testing.T) { testInvalidHeaderRollback(t, 65, FastSync) } +func TestInvalidHeaderRollback66Fast(t *testing.T) { testInvalidHeaderRollback(t, 66, FastSync) } func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1079,12 +1134,17 @@ func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) { // Tests that a peer advertising a high TD doesn't get to stall the downloader // afterwards by not sending any useful hashes. -func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) } -func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) } +func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) } +func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) } + func TestHighTDStarvationAttack65Full(t *testing.T) { testHighTDStarvationAttack(t, 65, FullSync) } func TestHighTDStarvationAttack65Fast(t *testing.T) { testHighTDStarvationAttack(t, 65, FastSync) } func TestHighTDStarvationAttack65Light(t *testing.T) { testHighTDStarvationAttack(t, 65, LightSync) } +func TestHighTDStarvationAttack66Full(t *testing.T) { testHighTDStarvationAttack(t, 66, FullSync) } +func TestHighTDStarvationAttack66Fast(t *testing.T) { testHighTDStarvationAttack(t, 66, FastSync) } +func TestHighTDStarvationAttack66Light(t *testing.T) { testHighTDStarvationAttack(t, 66, LightSync) } + func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1101,6 +1161,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { // Tests that misbehaving peers are disconnected, whilst behaving ones are not. func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) } func TestBlockHeaderAttackerDropping65(t *testing.T) { testBlockHeaderAttackerDropping(t, 65) } +func TestBlockHeaderAttackerDropping66(t *testing.T) { testBlockHeaderAttackerDropping(t, 66) } func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { t.Parallel() @@ -1152,12 +1213,17 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Tests that synchronisation progress (origin block number, current block number // and highest block number) is tracked and updated correctly. -func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) } -func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) } +func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) } +func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) } + func TestSyncProgress65Full(t *testing.T) { testSyncProgress(t, 65, FullSync) } func TestSyncProgress65Fast(t *testing.T) { testSyncProgress(t, 65, FastSync) } func TestSyncProgress65Light(t *testing.T) { testSyncProgress(t, 65, LightSync) } +func TestSyncProgress66Full(t *testing.T) { testSyncProgress(t, 66, FullSync) } +func TestSyncProgress66Fast(t *testing.T) { testSyncProgress(t, 66, FastSync) } +func TestSyncProgress66Light(t *testing.T) { testSyncProgress(t, 66, LightSync) } + func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1234,12 +1300,17 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync // Tests that synchronisation progress (origin block number and highest block // number) is tracked and updated correctly in case of a fork (or manual head // revertal). -func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) } -func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) } +func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) } +func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) } + func TestForkedSyncProgress65Full(t *testing.T) { testForkedSyncProgress(t, 65, FullSync) } func TestForkedSyncProgress65Fast(t *testing.T) { testForkedSyncProgress(t, 65, FastSync) } func TestForkedSyncProgress65Light(t *testing.T) { testForkedSyncProgress(t, 65, LightSync) } +func TestForkedSyncProgress66Full(t *testing.T) { testForkedSyncProgress(t, 66, FullSync) } +func TestForkedSyncProgress66Fast(t *testing.T) { testForkedSyncProgress(t, 66, FastSync) } +func TestForkedSyncProgress66Light(t *testing.T) { testForkedSyncProgress(t, 66, LightSync) } + func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1308,12 +1379,17 @@ func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // Tests that if synchronisation is aborted due to some failure, then the progress // origin is not updated in the next sync cycle, as it should be considered the // continuation of the previous sync and not a new instance. -func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) } -func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) } +func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) } +func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) } + func TestFailedSyncProgress65Full(t *testing.T) { testFailedSyncProgress(t, 65, FullSync) } func TestFailedSyncProgress65Fast(t *testing.T) { testFailedSyncProgress(t, 65, FastSync) } func TestFailedSyncProgress65Light(t *testing.T) { testFailedSyncProgress(t, 65, LightSync) } +func TestFailedSyncProgress66Full(t *testing.T) { testFailedSyncProgress(t, 66, FullSync) } +func TestFailedSyncProgress66Fast(t *testing.T) { testFailedSyncProgress(t, 66, FastSync) } +func TestFailedSyncProgress66Light(t *testing.T) { testFailedSyncProgress(t, 66, LightSync) } + func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1379,12 +1455,17 @@ func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // Tests that if an attacker fakes a chain height, after the attack is detected, // the progress height is successfully reduced at the next sync invocation. -func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, FullSync) } -func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, FastSync) } +func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, FullSync) } +func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, FastSync) } + func TestFakedSyncProgress65Full(t *testing.T) { testFakedSyncProgress(t, 65, FullSync) } func TestFakedSyncProgress65Fast(t *testing.T) { testFakedSyncProgress(t, 65, FastSync) } func TestFakedSyncProgress65Light(t *testing.T) { testFakedSyncProgress(t, 65, LightSync) } +func TestFakedSyncProgress66Full(t *testing.T) { testFakedSyncProgress(t, 66, FullSync) } +func TestFakedSyncProgress66Fast(t *testing.T) { testFakedSyncProgress(t, 66, FastSync) } +func TestFakedSyncProgress66Light(t *testing.T) { testFakedSyncProgress(t, 66, LightSync) } + func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1454,12 +1535,17 @@ func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // This test reproduces an issue where unexpected deliveries would // block indefinitely if they arrived at the right time. -func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) } -func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } +func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) } +func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } + func TestDeliverHeadersHang65Full(t *testing.T) { testDeliverHeadersHang(t, 65, FullSync) } func TestDeliverHeadersHang65Fast(t *testing.T) { testDeliverHeadersHang(t, 65, FastSync) } func TestDeliverHeadersHang65Light(t *testing.T) { testDeliverHeadersHang(t, 65, LightSync) } +func TestDeliverHeadersHang66Full(t *testing.T) { testDeliverHeadersHang(t, 66, FullSync) } +func TestDeliverHeadersHang66Fast(t *testing.T) { testDeliverHeadersHang(t, 66, FastSync) } +func TestDeliverHeadersHang66Light(t *testing.T) { testDeliverHeadersHang(t, 66, LightSync) } + func testDeliverHeadersHang(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() @@ -1613,12 +1699,17 @@ func TestRemoteHeaderRequestSpan(t *testing.T) { // Tests that peers below a pre-configured checkpoint block are prevented from // being fast-synced from, avoiding potential cheap eclipse attacks. -func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) } -func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) } +func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) } +func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) } + func TestCheckpointEnforcement65Full(t *testing.T) { testCheckpointEnforcement(t, 65, FullSync) } func TestCheckpointEnforcement65Fast(t *testing.T) { testCheckpointEnforcement(t, 65, FastSync) } func TestCheckpointEnforcement65Light(t *testing.T) { testCheckpointEnforcement(t, 65, LightSync) } +func TestCheckpointEnforcement66Full(t *testing.T) { testCheckpointEnforcement(t, 66, FullSync) } +func TestCheckpointEnforcement66Fast(t *testing.T) { testCheckpointEnforcement(t, 66, FastSync) } +func TestCheckpointEnforcement66Light(t *testing.T) { testCheckpointEnforcement(t, 66, LightSync) } + func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) { t.Parallel() diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index ba90bf31cb..7852569d8e 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -29,6 +29,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -457,7 +458,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.headerThroughput } - return ps.idlePeers(64, 65, idle, throughput) + return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -471,7 +472,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.blockThroughput } - return ps.idlePeers(64, 65, idle, throughput) + return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -485,7 +486,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.receiptThroughput } - return ps.idlePeers(64, 65, idle, throughput) + return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -499,7 +500,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.stateThroughput } - return ps.idlePeers(64, 65, idle, throughput) + return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index e32008fb41..64648ed419 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -17,19 +17,17 @@ package eth import ( - "encoding/json" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -166,6 +164,64 @@ func Handle(backend Backend, peer *Peer) error { } } +type msgHandler func(backend Backend, msg Decoder, peer *Peer) error +type Decoder interface { + Decode(val interface{}) error + Time() time.Time +} + +var eth64 = map[uint64]msgHandler{ + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetNodeDataMsg: handleGetNodeData, + NodeDataMsg: handleNodeData, + GetReceiptsMsg: handleGetReceipts, + ReceiptsMsg: handleReceipts, + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, +} +var eth65 = map[uint64]msgHandler{ + // old 64 messages + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetNodeDataMsg: handleGetNodeData, + NodeDataMsg: handleNodeData, + GetReceiptsMsg: handleGetReceipts, + ReceiptsMsg: handleReceipts, + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + // New eth65 messages + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, +} + +var eth66 = map[uint64]msgHandler{ + // eth64 announcement messages (no id) + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + // eth65 announcement messages (no id) + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + // eth66 messages with request-id + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetNodeDataMsg: handleGetNodeData66, + NodeDataMsg: handleNodeData66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, +} + // handleMessage is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func handleMessage(backend Backend, peer *Peer) error { @@ -179,334 +235,15 @@ func handleMessage(backend Backend, peer *Peer) error { } defer msg.Discard() - // Handle the message depending on its contents - switch { - case msg.Code == StatusMsg: - // Status messages should never arrive after the handshake - return fmt.Errorf("%w: uncontrolled status message", errExtraStatusMsg) - - // Block header query, collect the requested headers and reply - case msg.Code == GetBlockHeadersMsg: - // Decode the complex header query - var query GetBlockHeadersPacket - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - hashMode := query.Origin.Hash != (common.Hash{}) - first := true - maxNonCanonical := uint64(100) - - // Gather headers until the fetch or network limits is reached - var ( - bytes common.StorageSize - headers []*types.Header - unknown bool - lookups int - ) - for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && - len(headers) < maxHeadersServe && lookups < 2*maxHeadersServe { - lookups++ - // Retrieve the next header satisfying the query - var origin *types.Header - if hashMode { - if first { - first = false - origin = backend.Chain().GetHeaderByHash(query.Origin.Hash) - if origin != nil { - query.Origin.Number = origin.Number.Uint64() - } - } else { - origin = backend.Chain().GetHeader(query.Origin.Hash, query.Origin.Number) - } - } else { - origin = backend.Chain().GetHeaderByNumber(query.Origin.Number) - } - if origin == nil { - break - } - headers = append(headers, origin) - bytes += estHeaderSize - - // Advance to the next header of the query - switch { - case hashMode && query.Reverse: - // Hash based traversal towards the genesis block - ancestor := query.Skip + 1 - if ancestor == 0 { - unknown = true - } else { - query.Origin.Hash, query.Origin.Number = backend.Chain().GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) - unknown = (query.Origin.Hash == common.Hash{}) - } - case hashMode && !query.Reverse: - // Hash based traversal towards the leaf block - var ( - current = origin.Number.Uint64() - next = current + query.Skip + 1 - ) - if next <= current { - infos, _ := json.MarshalIndent(peer.Peer.Info(), "", " ") - peer.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) - unknown = true - } else { - if header := backend.Chain().GetHeaderByNumber(next); header != nil { - nextHash := header.Hash() - expOldHash, _ := backend.Chain().GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) - if expOldHash == query.Origin.Hash { - query.Origin.Hash, query.Origin.Number = nextHash, next - } else { - unknown = true - } - } else { - unknown = true - } - } - case query.Reverse: - // Number based traversal towards the genesis block - if query.Origin.Number >= query.Skip+1 { - query.Origin.Number -= query.Skip + 1 - } else { - unknown = true - } - - case !query.Reverse: - // Number based traversal towards the leaf block - query.Origin.Number += query.Skip + 1 - } - } - return peer.SendBlockHeaders(headers) - - case msg.Code == BlockHeadersMsg: - // A batch of headers arrived to one of our previous requests - res := new(BlockHeadersPacket) - if err := msg.Decode(res); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - return backend.Handle(peer, res) - - case msg.Code == GetBlockBodiesMsg: - // Decode the block body retrieval message - var query GetBlockBodiesPacket - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Gather blocks until the fetch or network limits is reached - var ( - bytes int - bodies []rlp.RawValue - ) - for lookups, hash := range query { - if bytes >= softResponseLimit || len(bodies) >= maxBodiesServe || - lookups >= 2*maxBodiesServe { - break - } - if data := backend.Chain().GetBodyRLP(hash); len(data) != 0 { - bodies = append(bodies, data) - bytes += len(data) - } - } - return peer.SendBlockBodiesRLP(bodies) - - case msg.Code == BlockBodiesMsg: - // A batch of block bodies arrived to one of our previous requests - res := new(BlockBodiesPacket) - if err := msg.Decode(res); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - return backend.Handle(peer, res) - - case msg.Code == GetNodeDataMsg: - // Decode the trie node data retrieval message - var query GetNodeDataPacket - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - bytes int - nodes [][]byte - ) - for lookups, hash := range query { - if bytes >= softResponseLimit || len(nodes) >= maxNodeDataServe || - lookups >= 2*maxNodeDataServe { - break - } - // Retrieve the requested state entry - if bloom := backend.StateBloom(); bloom != nil && !bloom.Contains(hash[:]) { - // Only lookup the trie node if there's chance that we actually have it - continue - } - entry, err := backend.Chain().TrieNode(hash) - if len(entry) == 0 || err != nil { - // Read the contract code with prefix only to save unnecessary lookups. - entry, err = backend.Chain().ContractCodeWithPrefix(hash) - } - if err == nil && len(entry) > 0 { - nodes = append(nodes, entry) - bytes += len(entry) - } - } - return peer.SendNodeData(nodes) - - case msg.Code == NodeDataMsg: - // A batch of node state data arrived to one of our previous requests - res := new(NodeDataPacket) - if err := msg.Decode(res); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - return backend.Handle(peer, res) - - case msg.Code == GetReceiptsMsg: - // Decode the block receipts retrieval message - var query GetReceiptsPacket - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - bytes int - receipts []rlp.RawValue - ) - for lookups, hash := range query { - if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe || - lookups >= 2*maxReceiptsServe { - break - } - // Retrieve the requested block's receipts - results := backend.Chain().GetReceiptsByHash(hash) - if results == nil { - if header := backend.Chain().GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { - continue - } - } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(results); err != nil { - log.Error("Failed to encode receipt", "err", err) - } else { - receipts = append(receipts, encoded) - bytes += len(encoded) - } - } - return peer.SendReceiptsRLP(receipts) - - case msg.Code == ReceiptsMsg: - // A batch of receipts arrived to one of our previous requests - res := new(ReceiptsPacket) - if err := msg.Decode(res); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - return backend.Handle(peer, res) - - case msg.Code == NewBlockHashesMsg: - // A batch of new block announcements just arrived - ann := new(NewBlockHashesPacket) - if err := msg.Decode(ann); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Mark the hashes as present at the remote node - for _, block := range *ann { - peer.markBlock(block.Hash) - } - // Deliver them all to the backend for queuing - return backend.Handle(peer, ann) - - case msg.Code == NewBlockMsg: - // Retrieve and decode the propagated block - ann := new(NewBlockPacket) - if err := msg.Decode(ann); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() { - log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash()) - break // TODO(karalabe): return error eventually, but wait a few releases - } - if hash := types.DeriveSha(ann.Block.Transactions(), trie.NewStackTrie(nil)); hash != ann.Block.TxHash() { - log.Warn("Propagated block has invalid body", "have", hash, "exp", ann.Block.TxHash()) - break // TODO(karalabe): return error eventually, but wait a few releases - } - if err := ann.sanityCheck(); err != nil { - return err - } - ann.Block.ReceivedAt = msg.ReceivedAt - ann.Block.ReceivedFrom = peer - - // Mark the peer as owning the block - peer.markBlock(ann.Block.Hash()) - - return backend.Handle(peer, ann) - - case msg.Code == NewPooledTransactionHashesMsg && peer.version >= ETH65: - // New transaction announcement arrived, make sure we have - // a valid and fresh chain to handle them - if !backend.AcceptTxs() { - break - } - ann := new(NewPooledTransactionHashesPacket) - if err := msg.Decode(ann); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Schedule all the unknown hashes for retrieval - for _, hash := range *ann { - peer.markTransaction(hash) - } - return backend.Handle(peer, ann) - - case msg.Code == GetPooledTransactionsMsg && peer.version >= ETH65: - // Decode the pooled transactions retrieval message - var query GetPooledTransactionsPacket - if err := msg.Decode(&query); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - // Gather transactions until the fetch or network limits is reached - var ( - bytes int - hashes []common.Hash - txs []rlp.RawValue - ) - for _, hash := range query { - if bytes >= softResponseLimit { - break - } - // Retrieve the requested transaction, skipping if unknown to us - tx := backend.TxPool().Get(hash) - if tx == nil { - continue - } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(tx); err != nil { - log.Error("Failed to encode transaction", "err", err) - } else { - hashes = append(hashes, hash) - txs = append(txs, encoded) - bytes += len(encoded) - } - } - return peer.SendPooledTransactionsRLP(hashes, txs) - - case msg.Code == TransactionsMsg || (msg.Code == PooledTransactionsMsg && peer.version >= ETH65): - // Transactions arrived, make sure we have a valid and fresh chain to handle them - if !backend.AcceptTxs() { - break - } - // Transactions can be processed, parse all of them and deliver to the pool - var txs []*types.Transaction - if err := msg.Decode(&txs); err != nil { - return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) - } - for i, tx := range txs { - // Validate and mark the remote transaction - if tx == nil { - return fmt.Errorf("%w: transaction %d is nil", errDecode, i) - } - peer.markTransaction(tx.Hash()) - } - if msg.Code == PooledTransactionsMsg { - return backend.Handle(peer, (*PooledTransactionsPacket)(&txs)) - } - return backend.Handle(peer, (*TransactionsPacket)(&txs)) - - default: - return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code) + var handlers = eth64 + if peer.Version() == ETH65 { + handlers = eth65 + } else if peer.Version() >= ETH66 { + handlers = eth66 } - return nil + + if handler := handlers[msg.Code]; handler != nil { + return handler(backend, msg, peer) + } + return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code) } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go new file mode 100644 index 0000000000..8433fa343a --- /dev/null +++ b/eth/protocols/eth/handlers.go @@ -0,0 +1,510 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package eth + +import ( + "encoding/json" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +// handleGetBlockHeaders handles Block header query, collect the requested headers and reply +func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { + // Decode the complex header query + var query GetBlockHeadersPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetBlockHeadersQuery(backend, &query, peer) + return peer.SendBlockHeaders(response) +} + +// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders +func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { + // Decode the complex header query + var query GetBlockHeadersPacket66 + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetBlockHeadersQuery(backend, query.GetBlockHeadersPacket, peer) + return peer.ReplyBlockHeaders(query.RequestId, response) +} + +func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, peer *Peer) []*types.Header { + hashMode := query.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + lookups int + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && + len(headers) < maxHeadersServe && lookups < 2*maxHeadersServe { + lookups++ + // Retrieve the next header satisfying the query + var origin *types.Header + if hashMode { + if first { + first = false + origin = backend.Chain().GetHeaderByHash(query.Origin.Hash) + if origin != nil { + query.Origin.Number = origin.Number.Uint64() + } + } else { + origin = backend.Chain().GetHeader(query.Origin.Hash, query.Origin.Number) + } + } else { + origin = backend.Chain().GetHeaderByNumber(query.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin) + bytes += estHeaderSize + + // Advance to the next header of the query + switch { + case hashMode && query.Reverse: + // Hash based traversal towards the genesis block + ancestor := query.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + query.Origin.Hash, query.Origin.Number = backend.Chain().GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) + unknown = (query.Origin.Hash == common.Hash{}) + } + case hashMode && !query.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + query.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(peer.Peer.Info(), "", " ") + peer.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := backend.Chain().GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := backend.Chain().GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) + if expOldHash == query.Origin.Hash { + query.Origin.Hash, query.Origin.Number = nextHash, next + } else { + unknown = true + } + } else { + unknown = true + } + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= query.Skip + 1 + } else { + unknown = true + } + + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += query.Skip + 1 + } + } + return headers +} + +func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block body retrieval message + var query GetBlockBodiesPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetBlockBodiesQuery(backend, query, peer) + return peer.SendBlockBodiesRLP(response) +} + +func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block body retrieval message + var query GetBlockBodiesPacket66 + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetBlockBodiesQuery(backend, query.GetBlockBodiesPacket, peer) + return peer.ReplyBlockBodiesRLP(query.RequestId, response) +} + +func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer *Peer) []rlp.RawValue { + // Gather blocks until the fetch or network limits is reached + var ( + bytes int + bodies []rlp.RawValue + ) + for lookups, hash := range query { + if bytes >= softResponseLimit || len(bodies) >= maxBodiesServe || + lookups >= 2*maxBodiesServe { + break + } + if data := backend.Chain().GetBodyRLP(hash); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) + } + } + return bodies +} + +func handleGetNodeData(backend Backend, msg Decoder, peer *Peer) error { + // Decode the trie node data retrieval message + var query GetNodeDataPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetNodeDataQuery(backend, query, peer) + return peer.SendNodeData(response) +} + +func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error { + // Decode the trie node data retrieval message + var query GetNodeDataPacket66 + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetNodeDataQuery(backend, query.GetNodeDataPacket, peer) + return peer.ReplyNodeData(query.RequestId, response) +} + +func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer) [][]byte { + // Gather state data until the fetch or network limits is reached + var ( + bytes int + nodes [][]byte + ) + for lookups, hash := range query { + if bytes >= softResponseLimit || len(nodes) >= maxNodeDataServe || + lookups >= 2*maxNodeDataServe { + break + } + // Retrieve the requested state entry + if bloom := backend.StateBloom(); bloom != nil && !bloom.Contains(hash[:]) { + // Only lookup the trie node if there's chance that we actually have it + continue + } + entry, err := backend.Chain().TrieNode(hash) + if len(entry) == 0 || err != nil { + // Read the contract code with prefix only to save unnecessary lookups. + entry, err = backend.Chain().ContractCodeWithPrefix(hash) + } + if err == nil && len(entry) > 0 { + nodes = append(nodes, entry) + bytes += len(entry) + } + } + return nodes +} + +func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block receipts retrieval message + var query GetReceiptsPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetReceiptsQuery(backend, query, peer) + return peer.SendReceiptsRLP(response) +} + +func handleGetReceipts66(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block receipts retrieval message + var query GetReceiptsPacket66 + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetReceiptsQuery(backend, query.GetReceiptsPacket, peer) + return peer.ReplyReceiptsRLP(query.RequestId, response) +} + +func answerGetReceiptsQuery(backend Backend, query GetReceiptsPacket, peer *Peer) []rlp.RawValue { + // Gather state data until the fetch or network limits is reached + var ( + bytes int + receipts []rlp.RawValue + ) + for lookups, hash := range query { + if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe || + lookups >= 2*maxReceiptsServe { + break + } + // Retrieve the requested block's receipts + results := backend.Chain().GetReceiptsByHash(hash) + if results == nil { + if header := backend.Chain().GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) + } + } + return receipts +} + +func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { + // A batch of new block announcements just arrived + ann := new(NewBlockHashesPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Mark the hashes as present at the remote node + for _, block := range *ann { + peer.markBlock(block.Hash) + } + // Deliver them all to the backend for queuing + return backend.Handle(peer, ann) +} + +func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { + // Retrieve and decode the propagated block + ann := new(NewBlockPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() { + log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash()) + return nil // TODO(karalabe): return error eventually, but wait a few releases + } + if hash := types.DeriveSha(ann.Block.Transactions(), trie.NewStackTrie(nil)); hash != ann.Block.TxHash() { + log.Warn("Propagated block has invalid body", "have", hash, "exp", ann.Block.TxHash()) + return nil // TODO(karalabe): return error eventually, but wait a few releases + } + if err := ann.sanityCheck(); err != nil { + return err + } + ann.Block.ReceivedAt = msg.Time() + ann.Block.ReceivedFrom = peer + + // Mark the peer as owning the block + peer.markBlock(ann.Block.Hash()) + + return backend.Handle(peer, ann) +} + +func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { + // A batch of headers arrived to one of our previous requests + res := new(BlockHeadersPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + +func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { + // A batch of headers arrived to one of our previous requests + res := new(BlockHeadersPacket66) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, &res.BlockHeadersPacket) +} + +func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error { + // A batch of block bodies arrived to one of our previous requests + res := new(BlockBodiesPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + +func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { + // A batch of block bodies arrived to one of our previous requests + res := new(BlockBodiesPacket66) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, &res.BlockBodiesPacket) +} + +func handleNodeData(backend Backend, msg Decoder, peer *Peer) error { + // A batch of node state data arrived to one of our previous requests + res := new(NodeDataPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + +func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { + // A batch of node state data arrived to one of our previous requests + res := new(NodeDataPacket66) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, &res.NodeDataPacket) +} + +func handleReceipts(backend Backend, msg Decoder, peer *Peer) error { + // A batch of receipts arrived to one of our previous requests + res := new(ReceiptsPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + +func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { + // A batch of receipts arrived to one of our previous requests + res := new(ReceiptsPacket66) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, &res.ReceiptsPacket) +} + +func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + ann := new(NewPooledTransactionHashesPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Schedule all the unknown hashes for retrieval + for _, hash := range *ann { + peer.markTransaction(hash) + } + return backend.Handle(peer, ann) +} + +func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error { + // Decode the pooled transactions retrieval message + var query GetPooledTransactionsPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + hashes, txs := answerGetPooledTransactions(backend, query, peer) + return peer.SendPooledTransactionsRLP(hashes, txs) +} + +func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { + // Decode the pooled transactions retrieval message + var query GetPooledTransactionsPacket66 + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsPacket, peer) + return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs) +} + +func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsPacket, peer *Peer) ([]common.Hash, []rlp.RawValue) { + // Gather transactions until the fetch or network limits is reached + var ( + bytes int + hashes []common.Hash + txs []rlp.RawValue + ) + for _, hash := range query { + if bytes >= softResponseLimit { + break + } + // Retrieve the requested transaction, skipping if unknown to us + tx := backend.TxPool().Get(hash) + if tx == nil { + continue + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(tx); err != nil { + log.Error("Failed to encode transaction", "err", err) + } else { + hashes = append(hashes, hash) + txs = append(txs, encoded) + bytes += len(encoded) + } + } + return hashes, txs +} + +func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs TransactionsPacket + if err := msg.Decode(&txs); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return fmt.Errorf("%w: transaction %d is nil", errDecode, i) + } + peer.markTransaction(tx.Hash()) + } + return backend.Handle(peer, &txs) +} + +func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error { + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs PooledTransactionsPacket + if err := msg.Decode(&txs); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return fmt.Errorf("%w: transaction %d is nil", errDecode, i) + } + peer.markTransaction(tx.Hash()) + } + return backend.Handle(peer, &txs) +} + +func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs PooledTransactionsPacket66 + if err := msg.Decode(&txs); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + for i, tx := range txs.PooledTransactionsPacket { + // Validate and mark the remote transaction + if tx == nil { + return fmt.Errorf("%w: transaction %d is nil", errDecode, i) + } + peer.markTransaction(tx.Hash()) + } + return backend.Handle(peer, &txs.PooledTransactionsPacket) +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 735ef78ce7..709fca8655 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -18,6 +18,7 @@ package eth import ( "math/big" + "math/rand" "sync" mapset "github.com/deckarep/golang-set" @@ -267,6 +268,22 @@ func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValu return p2p.Send(p.rw, PooledTransactionsMsg, txs) // Not packed into PooledTransactionsPacket to avoid RLP decoding } +// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP. +func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) { + p.knownTxs.Pop() + } + for _, hash := range hashes { + p.knownTxs.Add(hash) + } + // Not packed into PooledTransactionsPacket to avoid RLP decoding + return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{ + RequestId: id, + PooledTransactionsRLPPacket: txs, + }) +} + // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { @@ -308,7 +325,10 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { p.knownBlocks.Pop() } p.knownBlocks.Add(block.Hash()) - return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{block, td}) + return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{ + Block: block, + TD: td, + }) } // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If @@ -331,9 +351,12 @@ func (p *Peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers)) } -// SendBlockBodies sends a batch of block contents to the remote peer. -func (p *Peer) SendBlockBodies(bodies []*BlockBody) error { - return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesPacket(bodies)) +// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders. +func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error { + return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{ + RequestId: id, + BlockHeadersPacket: headers, + }) } // SendBlockBodiesRLP sends a batch of block contents to the remote peer from @@ -342,52 +365,98 @@ func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding } +// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP. +func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { + // Not packed into BlockBodiesPacket to avoid RLP decoding + return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesRLPPacket66{ + RequestId: id, + BlockBodiesRLPPacket: bodies, + }) +} + // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *Peer) SendNodeData(data [][]byte) error { return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data)) } +// ReplyNodeData is the eth/66 response to GetNodeData. +func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error { + return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{ + RequestId: id, + NodeDataPacket: data, + }) +} + // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the // ones requested from an already RLP encoded format. func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error { return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding } +// ReplyReceiptsRLP is the eth/66 response to GetReceipts. +func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { + return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{ + RequestId: id, + ReceiptsRLPPacket: receipts, + }) +} + // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *Peer) RequestOneHeader(hash common.Hash) error { p.Log().Debug("Fetching single header", "hash", hash) - return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket{ + query := GetBlockHeadersPacket{ Origin: HashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false, - }) + } + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ + RequestId: rand.Uint64(), + GetBlockHeadersPacket: &query, + }) + } + return p2p.Send(p.rw, GetBlockHeadersMsg, &query) } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket{ + query := GetBlockHeadersPacket{ Origin: HashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, - }) + } + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ + RequestId: rand.Uint64(), + GetBlockHeadersPacket: &query, + }) + } + return p2p.Send(p.rw, GetBlockHeadersMsg, &query) } // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket{ + query := GetBlockHeadersPacket{ Origin: HashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, - }) + } + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ + RequestId: rand.Uint64(), + GetBlockHeadersPacket: &query, + }) + } + return p2p.Send(p.rw, GetBlockHeadersMsg, &query) } // ExpectRequestHeadersByNumber is a testing method to mirror the recipient side @@ -406,6 +475,12 @@ func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int, // specified. func (p *Peer) RequestBodies(hashes []common.Hash) error { p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{ + RequestId: rand.Uint64(), + GetBlockBodiesPacket: hashes, + }) + } return p2p.Send(p.rw, GetBlockBodiesMsg, GetBlockBodiesPacket(hashes)) } @@ -413,17 +488,35 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error { // data, corresponding to the specified hashes. func (p *Peer) RequestNodeData(hashes []common.Hash) error { p.Log().Debug("Fetching batch of state data", "count", len(hashes)) + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{ + RequestId: rand.Uint64(), + GetNodeDataPacket: hashes, + }) + } return p2p.Send(p.rw, GetNodeDataMsg, GetNodeDataPacket(hashes)) } // RequestReceipts fetches a batch of transaction receipts from a remote node. func (p *Peer) RequestReceipts(hashes []common.Hash) error { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{ + RequestId: rand.Uint64(), + GetReceiptsPacket: hashes, + }) + } return p2p.Send(p.rw, GetReceiptsMsg, GetReceiptsPacket(hashes)) } // RequestTxs fetches a batch of transactions from a remote node. func (p *Peer) RequestTxs(hashes []common.Hash) error { p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) + if p.Version() >= ETH66 { + return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ + RequestId: rand.Uint64(), + GetPooledTransactionsPacket: hashes, + }) + } return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes)) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 9fff64b72a..7f1832754f 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -32,6 +32,7 @@ import ( const ( ETH64 = 64 ETH65 = 65 + ETH66 = 66 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -40,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH65, ETH64} +var ProtocolVersions = []uint{ETH66, ETH65, ETH64} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH65: 17, ETH64: 17} +var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17, ETH64: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -79,7 +80,6 @@ var ( errNetworkIDMismatch = errors.New("network ID mismatch") errGenesisMismatch = errors.New("genesis mismatch") errForkIDRejected = errors.New("fork ID rejected") - errExtraStatusMsg = errors.New("extra status message") ) // Packet represents a p2p message in the `eth` protocol. @@ -129,6 +129,12 @@ type GetBlockHeadersPacket struct { Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) } +// GetBlockHeadersPacket represents a block header query over eth/66 +type GetBlockHeadersPacket66 struct { + RequestId uint64 + *GetBlockHeadersPacket +} + // HashOrNumber is a combined field for specifying an origin block. type HashOrNumber struct { Hash common.Hash // Block hash from which to retrieve headers (excludes Number) @@ -168,6 +174,12 @@ func (hn *HashOrNumber) DecodeRLP(s *rlp.Stream) error { // BlockHeadersPacket represents a block header response. type BlockHeadersPacket []*types.Header +// BlockHeadersPacket represents a block header response over eth/66. +type BlockHeadersPacket66 struct { + RequestId uint64 + BlockHeadersPacket +} + // NewBlockPacket is the network packet for the block propagation message. type NewBlockPacket struct { Block *types.Block @@ -190,9 +202,32 @@ func (request *NewBlockPacket) sanityCheck() error { // GetBlockBodiesPacket represents a block body query. type GetBlockBodiesPacket []common.Hash +// GetBlockBodiesPacket represents a block body query over eth/66. +type GetBlockBodiesPacket66 struct { + RequestId uint64 + GetBlockBodiesPacket +} + // BlockBodiesPacket is the network packet for block content distribution. type BlockBodiesPacket []*BlockBody +// BlockBodiesPacket is the network packet for block content distribution over eth/66. +type BlockBodiesPacket66 struct { + RequestId uint64 + BlockBodiesPacket +} + +// BlockBodiesRLPPacket is used for replying to block body requests, in cases +// where we already have them RLP-encoded, and thus can avoid the decode-encode +// roundtrip. +type BlockBodiesRLPPacket []rlp.RawValue + +// BlockBodiesRLPPacket66 is the BlockBodiesRLPPacket over eth/66 +type BlockBodiesRLPPacket66 struct { + RequestId uint64 + BlockBodiesRLPPacket +} + // BlockBody represents the data content of a single block. type BlockBody struct { Transactions []*types.Transaction // Transactions contained within a block @@ -215,24 +250,78 @@ func (p *BlockBodiesPacket) Unpack() ([][]*types.Transaction, [][]*types.Header) // GetNodeDataPacket represents a trie node data query. type GetNodeDataPacket []common.Hash +// GetNodeDataPacket represents a trie node data query over eth/66. +type GetNodeDataPacket66 struct { + RequestId uint64 + GetNodeDataPacket +} + // NodeDataPacket is the network packet for trie node data distribution. type NodeDataPacket [][]byte +// NodeDataPacket is the network packet for trie node data distribution over eth/66. +type NodeDataPacket66 struct { + RequestId uint64 + NodeDataPacket +} + // GetReceiptsPacket represents a block receipts query. type GetReceiptsPacket []common.Hash +// GetReceiptsPacket represents a block receipts query over eth/66. +type GetReceiptsPacket66 struct { + RequestId uint64 + GetReceiptsPacket +} + // ReceiptsPacket is the network packet for block receipts distribution. type ReceiptsPacket [][]*types.Receipt +// ReceiptsPacket is the network packet for block receipts distribution over eth/66. +type ReceiptsPacket66 struct { + RequestId uint64 + ReceiptsPacket +} + +// ReceiptsRLPPacket is used for receipts, when we already have it encoded +type ReceiptsRLPPacket []rlp.RawValue + +// ReceiptsPacket66 is the eth-66 version of ReceiptsRLPPacket +type ReceiptsRLPPacket66 struct { + RequestId uint64 + ReceiptsRLPPacket +} + // NewPooledTransactionHashesPacket represents a transaction announcement packet. type NewPooledTransactionHashesPacket []common.Hash // GetPooledTransactionsPacket represents a transaction query. type GetPooledTransactionsPacket []common.Hash +type GetPooledTransactionsPacket66 struct { + RequestId uint64 + GetPooledTransactionsPacket +} + // PooledTransactionsPacket is the network packet for transaction distribution. type PooledTransactionsPacket []*types.Transaction +// PooledTransactionsPacket is the network packet for transaction distribution over eth/66. +type PooledTransactionsPacket66 struct { + RequestId uint64 + PooledTransactionsPacket +} + +// PooledTransactionsPacket is the network packet for transaction distribution, used +// in the cases we already have them in rlp-encoded form +type PooledTransactionsRLPPacket []rlp.RawValue + +// PooledTransactionsRLPPacket66 is the eth/66 form of PooledTransactionsRLPPacket +type PooledTransactionsRLPPacket66 struct { + RequestId uint64 + PooledTransactionsRLPPacket +} + func (*StatusPacket) Name() string { return "Status" } func (*StatusPacket) Kind() byte { return StatusMsg } diff --git a/eth/protocols/eth/protocol_test.go b/eth/protocols/eth/protocol_test.go index 056ea56480..d92f3ea837 100644 --- a/eth/protocols/eth/protocol_test.go +++ b/eth/protocols/eth/protocol_test.go @@ -17,9 +17,12 @@ package eth import ( + "bytes" + "math/big" "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" ) @@ -66,3 +69,200 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { } } } + +// TestEth66EmptyMessages tests encoding of empty eth66 messages +func TestEth66EmptyMessages(t *testing.T) { + // All empty messages encodes to the same format + want := common.FromHex("c4820457c0") + + for i, msg := range []interface{}{ + // Headers + GetBlockHeadersPacket66{1111, nil}, + BlockHeadersPacket66{1111, nil}, + // Bodies + GetBlockBodiesPacket66{1111, nil}, + BlockBodiesPacket66{1111, nil}, + BlockBodiesRLPPacket66{1111, nil}, + // Node data + GetNodeDataPacket66{1111, nil}, + NodeDataPacket66{1111, nil}, + // Receipts + GetReceiptsPacket66{1111, nil}, + ReceiptsPacket66{1111, nil}, + // Transactions + GetPooledTransactionsPacket66{1111, nil}, + PooledTransactionsPacket66{1111, nil}, + PooledTransactionsRLPPacket66{1111, nil}, + + // Headers + BlockHeadersPacket66{1111, BlockHeadersPacket([]*types.Header{})}, + // Bodies + GetBlockBodiesPacket66{1111, GetBlockBodiesPacket([]common.Hash{})}, + BlockBodiesPacket66{1111, BlockBodiesPacket([]*BlockBody{})}, + BlockBodiesRLPPacket66{1111, BlockBodiesRLPPacket([]rlp.RawValue{})}, + // Node data + GetNodeDataPacket66{1111, GetNodeDataPacket([]common.Hash{})}, + NodeDataPacket66{1111, NodeDataPacket([][]byte{})}, + // Receipts + GetReceiptsPacket66{1111, GetReceiptsPacket([]common.Hash{})}, + ReceiptsPacket66{1111, ReceiptsPacket([][]*types.Receipt{})}, + // Transactions + GetPooledTransactionsPacket66{1111, GetPooledTransactionsPacket([]common.Hash{})}, + PooledTransactionsPacket66{1111, PooledTransactionsPacket([]*types.Transaction{})}, + PooledTransactionsRLPPacket66{1111, PooledTransactionsRLPPacket([]rlp.RawValue{})}, + } { + if have, _ := rlp.EncodeToBytes(msg); !bytes.Equal(have, want) { + t.Errorf("test %d, type %T, have\n\t%x\nwant\n\t%x", i, msg, have, want) + } + } + +} + +// TestEth66Messages tests the encoding of all redefined eth66 messages +func TestEth66Messages(t *testing.T) { + + // Some basic structs used during testing + var ( + header *types.Header + blockBody *BlockBody + blockBodyRlp rlp.RawValue + txs []*types.Transaction + txRlps []rlp.RawValue + hashes []common.Hash + receipts []*types.Receipt + receiptsRlp rlp.RawValue + + err error + ) + header = &types.Header{ + Difficulty: big.NewInt(2222), + Number: big.NewInt(3333), + GasLimit: 4444, + GasUsed: 5555, + Time: 6666, + Extra: []byte{0x77, 0x88}, + } + // Init the transactions, taken from a different test + { + for _, hexrlp := range []string{ + "f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10", + "f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afb", + } { + var tx *types.Transaction + rlpdata := common.FromHex(hexrlp) + if err := rlp.DecodeBytes(rlpdata, &tx); err != nil { + t.Fatal(err) + } + txs = append(txs, tx) + txRlps = append(txRlps, rlpdata) + } + } + // init the block body data, both object and rlp form + blockBody = &BlockBody{ + Transactions: txs, + Uncles: []*types.Header{header}, + } + blockBodyRlp, err = rlp.EncodeToBytes(blockBody) + if err != nil { + t.Fatal(err) + } + + hashes = []common.Hash{ + common.HexToHash("deadc0de"), + common.HexToHash("feedbeef"), + } + byteSlices := [][]byte{ + common.FromHex("deadc0de"), + common.FromHex("feedbeef"), + } + // init the receipts + { + receipts = []*types.Receipt{ + &types.Receipt{ + Status: types.ReceiptStatusFailed, + CumulativeGasUsed: 1, + Logs: []*types.Log{ + { + Address: common.BytesToAddress([]byte{0x11}), + Topics: []common.Hash{common.HexToHash("dead"), common.HexToHash("beef")}, + Data: []byte{0x01, 0x00, 0xff}, + }, + }, + TxHash: hashes[0], + ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}), + GasUsed: 111111, + }, + } + rlpData, err := rlp.EncodeToBytes(receipts) + if err != nil { + t.Fatal(err) + } + receiptsRlp = rlpData + } + + for i, tc := range []struct { + message interface{} + want []byte + }{ + { + GetBlockHeadersPacket66{1111, &GetBlockHeadersPacket{HashOrNumber{hashes[0], 0}, 5, 5, false}}, + common.FromHex("e8820457e4a000000000000000000000000000000000000000000000000000000000deadc0de050580"), + }, + { + GetBlockHeadersPacket66{1111, &GetBlockHeadersPacket{HashOrNumber{common.Hash{}, 9999}, 5, 5, false}}, + common.FromHex("ca820457c682270f050580"), + }, + { + BlockHeadersPacket66{1111, BlockHeadersPacket{header}}, + common.FromHex("f90202820457f901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"), + }, + { + GetBlockBodiesPacket66{1111, GetBlockBodiesPacket(hashes)}, + common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"), + }, + { + BlockBodiesPacket66{1111, BlockBodiesPacket([]*BlockBody{blockBody})}, + common.FromHex("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"), + }, + { // Identical to non-rlp-shortcut version + BlockBodiesRLPPacket66{1111, BlockBodiesRLPPacket([]rlp.RawValue{blockBodyRlp})}, + common.FromHex("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"), + }, + { + GetNodeDataPacket66{1111, GetNodeDataPacket(hashes)}, + common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"), + }, + { + NodeDataPacket66{1111, NodeDataPacket(byteSlices)}, + common.FromHex("ce820457ca84deadc0de84feedbeef"), + }, + { + GetReceiptsPacket66{1111, GetReceiptsPacket(hashes)}, + common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"), + }, + { + ReceiptsPacket66{1111, ReceiptsPacket([][]*types.Receipt{receipts})}, + common.FromHex("f90172820457f9016cf90169f901668001b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ff"), + }, + { + ReceiptsRLPPacket66{1111, ReceiptsRLPPacket([]rlp.RawValue{receiptsRlp})}, + common.FromHex("f90172820457f9016cf90169f901668001b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ff"), + }, + { + GetPooledTransactionsPacket66{1111, GetPooledTransactionsPacket(hashes)}, + common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"), + }, + { + PooledTransactionsPacket66{1111, PooledTransactionsPacket(txs)}, + common.FromHex("f8d7820457f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afb"), + }, + { + PooledTransactionsRLPPacket66{1111, PooledTransactionsRLPPacket(txRlps)}, + common.FromHex("f8d7820457f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afb"), + }, + } { + if have, _ := rlp.EncodeToBytes(tc.message); !bytes.Equal(have, tc.want) { + t.Errorf("test %d, type %T, have\n\t%x\nwant\n\t%x", i, tc.message, have, tc.want) + } + } +} diff --git a/p2p/message.go b/p2p/message.go index 10b55a939c..bd048138c3 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -70,6 +70,10 @@ func (msg Msg) Discard() error { return err } +func (msg Msg) Time() time.Time { + return msg.ReceivedAt +} + type MsgReader interface { ReadMsg() (Msg, error) }