From e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 14:28:59 +0300 Subject: [PATCH] whisper: add known message expiration to peers, cleanup --- whisper/peer.go | 175 +++++++++++++++++++++++++++++---------------- whisper/whisper.go | 77 ++++++++++---------- 2 files changed, 151 insertions(+), 101 deletions(-) diff --git a/whisper/peer.go b/whisper/peer.go index e50c9ec37a..f077dbe70b 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -4,106 +4,155 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" ) +// peer represents a whisper protocol peer connection. type peer struct { host *Whisper peer *p2p.Peer ws p2p.MsgReadWriter - // XXX Eventually this is going to reach exceptional large space. We need an expiry here - known *set.Set + known *set.Set // Messages already known by the peer to avoid wasting bandwidth quit chan struct{} } -func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { - return &peer{host, p, ws, set.New(), make(chan struct{})} -} - -func (self *peer) init() error { - if err := self.handleStatus(); err != nil { - return err +// newPeer creates and initializes a new whisper peer connection, returning either +// the newly constructed link or a failure reason. +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { + p := &peer{ + host: host, + peer: remote, + ws: rw, + known: set.New(), + quit: make(chan struct{}), } - - return nil + if err := p.handshake(); err != nil { + return nil, err + } + return p, nil } +// start initiates the peer updater, periodically broadcasting the whisper packets +// into the network. func (self *peer) start() { go self.update() self.peer.Debugln("whisper started") } +// stop terminates the peer updater, stopping message forwarding to it. func (self *peer) stop() { - self.peer.Debugln("whisper stopped") - close(self.quit) + self.peer.Debugln("whisper stopped") } -func (self *peer) update() { - relay := time.NewTicker(300 * time.Millisecond) -out: - for { - select { - case <-relay.C: - err := self.broadcast(self.host.envelopes()) - if err != nil { - self.peer.Infoln("broadcast err:", err) - break out - } - - case <-self.quit: - break out - } - } -} - -func (self *peer) broadcast(envelopes []*Envelope) error { - envs := make([]*Envelope, 0, len(envelopes)) - for _, env := range envelopes { - if !self.known.Has(env.Hash()) { - envs = append(envs, env) - self.known.Add(env.Hash()) - } - } - if len(envs) > 0 { - if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { - return err - } - self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") - } - return nil -} - -func (self *peer) addKnown(envelope *Envelope) { - self.known.Add(envelope.Hash()) -} - -func (self *peer) handleStatus() error { - ws := self.ws - if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { +// handshake sends the protocol initiation status message to the remote peer and +// verifies the remote status too. +func (self *peer) handshake() error { + // Send own status message, fetch remote one + if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { return err } - msg, err := ws.ReadMsg() + packet, err := self.ws.ReadMsg() if err != nil { return err } - if msg.Code != statusMsg { - return fmt.Errorf("peer send %x before status msg", msg.Code) + if packet.Code != statusCode { + return fmt.Errorf("peer sent %x before status packet", packet.Code) } - s := rlp.NewStream(msg.Payload) + // Decode the rest of the status packet and verify protocol match + s := rlp.NewStream(packet.Payload) if _, err := s.List(); err != nil { return fmt.Errorf("bad status message: %v", err) } - pv, err := s.Uint() + peerVersion, err := s.Uint() if err != nil { return fmt.Errorf("bad status message: %v", err) } - if pv != protocolVersion { - return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + if peerVersion != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) } - return msg.Discard() // ignore anything after protocol version + return packet.Discard() // ignore anything after protocol version +} + +// update executes periodic operations on the peer, including message transmission +// and expiration. +func (self *peer) update() { + // Start the tickers for the updates + expire := time.NewTicker(expirationTicks) + transmit := time.NewTicker(transmissionTicks) + + // Loop and transmit until termination is requested + for { + select { + case <-expire.C: + self.expire() + + case <-transmit.C: + if err := self.broadcast(); err != nil { + self.peer.Infoln("broadcast failed:", err) + return + } + + case <-self.quit: + return + } + } +} + +// mark marks an envelope known to the peer so that it won't be sent back. +func (self *peer) mark(envelope *Envelope) { + self.known.Add(envelope.Hash()) +} + +// marked checks if an envelope is already known to the remote peer. +func (self *peer) marked(envelope *Envelope) bool { + return self.known.Has(envelope.Hash()) +} + +// expire iterates over all the known envelopes in the host and removes all +// expired (unknown) ones from the known list. +func (self *peer) expire() { + // Assemble the list of available envelopes + available := set.NewNonTS() + for _, envelope := range self.host.envelopes() { + available.Add(envelope.Hash()) + } + // Cross reference availability with known status + unmark := make(map[common.Hash]struct{}) + self.known.Each(func(v interface{}) bool { + if !available.Has(v.(common.Hash)) { + unmark[v.(common.Hash)] = struct{}{} + } + return true + }) + // Dump all known but unavailable + for hash, _ := range unmark { + self.known.Remove(hash) + } +} + +// broadcast iterates over the collection of envelopes and transmits yet unknown +// ones over the network. +func (self *peer) broadcast() error { + // Fetch the envelopes and collect the unknown ones + envelopes := self.host.envelopes() + transmit := make([]*Envelope, 0, len(envelopes)) + for _, envelope := range envelopes { + if !self.marked(envelope) { + transmit = append(transmit, envelope) + self.mark(envelope) + } + } + // Transmit the unknown batch (potentially empty) + if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { + return err + } + self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)") + + return nil } diff --git a/whisper/whisper.go b/whisper/whisper.go index f51f14a9f9..e56c457861 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -16,8 +16,8 @@ import ( ) const ( - statusMsg = 0x00 - envelopesMsg = 0x01 + statusCode = 0x00 + messagesCode = 0x01 protocolVersion uint64 = 0x02 protocolName = "shh" @@ -25,7 +25,8 @@ const ( signatureFlag = byte(1 << 7) signatureLength = 65 - expirationTicks = 800 * time.Millisecond + expirationTicks = 800 * time.Millisecond + transmissionTicks = 300 * time.Millisecond ) const ( @@ -69,7 +70,7 @@ func New() *Whisper { Name: protocolName, Version: uint(protocolVersion), Length: 2, - Run: whisper.msgHandler, + Run: whisper.handlePeer, } return whisper @@ -168,6 +169,40 @@ func (self *Whisper) Stop() { return }*/ +// handlePeer is called by the underlying P2P layer when the whisper sub-protocol +// connection is negotiated. +func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + // Create, initialize and start the whisper peer + whisperPeer, err := newPeer(self, peer, rw) + if err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + + // Read and process inbound messages directly to merge into client-global state + for { + // Fetch the next packet and decode the contained envelopes + packet, err := rw.ReadMsg() + if err != nil { + return err + } + var envelopes []*Envelope + if err := packet.Decode(&envelopes); err != nil { + peer.Infof("failed to decode enveloped: %v", err) + continue + } + // Inject all envelopes into the internal pool + for _, envelope := range envelopes { + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Debugf("failed to pool envelope: %f", err) + } + whisperPeer.mark(envelope) + } + } +} + // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. @@ -198,40 +233,6 @@ func (self *Whisper) add(envelope *Envelope) error { return nil } -// Main handler for passing whisper messages to whisper peer objects -func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { - wpeer := NewPeer(self, peer, ws) - // initialise whisper peer (handshake/status) - if err := wpeer.init(); err != nil { - return err - } - // kick of the main handler for broadcasting/managing envelopes - go wpeer.start() - defer wpeer.stop() - - // Main *read* loop. Writing is done by the peer it self. - for { - msg, err := ws.ReadMsg() - if err != nil { - return err - } - - var envelopes []*Envelope - if err := msg.Decode(&envelopes); err != nil { - peer.Infoln(err) - continue - } - - for _, envelope := range envelopes { - if err := self.add(envelope); err != nil { - // TODO Punish peer here. Invalid envelope. - peer.Debugln(err) - } - wpeer.addKnown(envelope) - } - } -} - // postEvent opens an envelope with the configured identities and delivers the // message upstream from application processing. func (self *Whisper) postEvent(envelope *Envelope) {