diff --git a/p2p/message.go b/p2p/message.go index d61faad13a..2ef84f99dd 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -1,15 +1,11 @@ package p2p import ( - "bufio" "bytes" - "encoding/binary" "errors" "fmt" "io" "io/ioutil" - "math/big" - "net" "sync" "sync/atomic" "time" @@ -138,140 +134,6 @@ func (rw *lockedRW) WriteMsg(msg Msg) error { return rw.wrapped.WriteMsg(msg) } -// frameRW is a MsgReadWriter that reads and writes devp2p message frames. -// As required by the interface, ReadMsg and WriteMsg can be called from -// multiple goroutines. -type frameRW struct { - net.Conn // make Conn methods available. be careful. - bufconn *bufio.ReadWriter - - // this channel is used to 'lend' bufconn to a caller of ReadMsg - // until the message payload has been consumed. the channel - // receives a value when EOF is reached on the payload, unblocking - // a pending call to ReadMsg. - rsync chan struct{} - - // this mutex guards writes to bufconn. - writeMu sync.Mutex -} - -func newFrameRW(conn net.Conn, timeout time.Duration) *frameRW { - rsync := make(chan struct{}, 1) - rsync <- struct{}{} - return &frameRW{ - Conn: conn, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - rsync: rsync, - } -} - -var magicToken = []byte{34, 64, 8, 145} - -func (rw *frameRW) WriteMsg(msg Msg) error { - rw.writeMu.Lock() - defer rw.writeMu.Unlock() - rw.SetWriteDeadline(time.Now().Add(msgWriteTimeout)) - if err := writeMsg(rw.bufconn, msg); err != nil { - return err - } - return rw.bufconn.Flush() -} - -func writeMsg(w io.Writer, msg Msg) error { - // TODO: handle case when Size + len(code) + len(listhdr) overflows uint32 - code := ethutil.Encode(uint32(msg.Code)) - listhdr := makeListHeader(msg.Size + uint32(len(code))) - payloadLen := uint32(len(listhdr)) + uint32(len(code)) + msg.Size - - start := make([]byte, 8) - copy(start, magicToken) - binary.BigEndian.PutUint32(start[4:], payloadLen) - - for _, b := range [][]byte{start, listhdr, code} { - if _, err := w.Write(b); err != nil { - return err - } - } - _, err := io.CopyN(w, msg.Payload, int64(msg.Size)) - return err -} - -func makeListHeader(length uint32) []byte { - if length < 56 { - return []byte{byte(length + 0xc0)} - } - enc := big.NewInt(int64(length)).Bytes() - lenb := byte(len(enc)) + 0xf7 - return append([]byte{lenb}, enc...) -} - -func (rw *frameRW) ReadMsg() (msg Msg, err error) { - <-rw.rsync // wait until bufconn is ours - - rw.SetReadDeadline(time.Now().Add(frameReadTimeout)) - - // read magic and payload size - start := make([]byte, 8) - if _, err = io.ReadFull(rw.bufconn, start); err != nil { - return msg, err - } - if !bytes.HasPrefix(start, magicToken) { - return msg, fmt.Errorf("bad magic token %x", start[:4]) - } - size := binary.BigEndian.Uint32(start[4:]) - - // decode start of RLP message to get the message code - posr := &postrack{rw.bufconn, 0} - s := rlp.NewStream(posr) - if _, err := s.List(); err != nil { - return msg, err - } - msg.Code, err = s.Uint() - if err != nil { - return msg, err - } - msg.Size = size - posr.p - - rw.SetReadDeadline(time.Now().Add(payloadReadTimeout)) - - if msg.Size <= wholePayloadSize { - // msg is small, read all of it and move on to the next message. - pbuf := make([]byte, msg.Size) - if _, err := io.ReadFull(rw.bufconn, pbuf); err != nil { - return msg, err - } - rw.rsync <- struct{}{} // bufconn is available again - msg.Payload = bytes.NewReader(pbuf) - } else { - // lend bufconn to the caller until it has - // consumed the payload. eofSignal will send a value - // on rw.rsync when EOF is reached. - pr := &eofSignal{rw.bufconn, msg.Size, rw.rsync} - msg.Payload = pr - } - return msg, nil -} - -// postrack wraps an rlp.ByteReader with a position counter. -type postrack struct { - r rlp.ByteReader - p uint32 -} - -func (r *postrack) Read(buf []byte) (int, error) { - n, err := r.r.Read(buf) - r.p += uint32(n) - return n, err -} - -func (r *postrack) ReadByte() (byte, error) { - b, err := r.r.ReadByte() - if err == nil { - r.p++ - } - return b, err -} - // eofSignal wraps a reader with eof signaling. the eof channel is // closed when the wrapped reader returns an error or when count bytes // have been read. diff --git a/p2p/message_test.go b/p2p/message_test.go index 4b94ebb5f2..1757cbe7ae 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -25,52 +25,6 @@ func TestNewMsg(t *testing.T) { } } -// func TestEncodeDecodeMsg(t *testing.T) { -// msg := NewMsg(3, 1, "000") -// buf := new(bytes.Buffer) -// if err := writeMsg(buf, msg); err != nil { -// t.Fatalf("encodeMsg error: %v", err) -// } -// // t.Logf("encoded: %x", buf.Bytes()) - -// decmsg, err := readMsg(buf) -// if err != nil { -// t.Fatalf("readMsg error: %v", err) -// } -// if decmsg.Code != 3 { -// t.Errorf("incorrect code %d, want %d", decmsg.Code, 3) -// } -// if decmsg.Size != 5 { -// t.Errorf("incorrect size %d, want %d", decmsg.Size, 5) -// } - -// var data struct { -// I uint -// S string -// } -// if err := decmsg.Decode(&data); err != nil { -// t.Fatalf("Decode error: %v", err) -// } -// if data.I != 1 { -// t.Errorf("incorrect data.I: got %v, expected %d", data.I, 1) -// } -// if data.S != "000" { -// t.Errorf("incorrect data.S: got %q, expected %q", data.S, "000") -// } -// } - -// func TestDecodeRealMsg(t *testing.T) { -// data := ethutil.Hex2Bytes("2240089100000080f87e8002b5457468657265756d282b2b292f5065657220536572766572204f6e652f76302e372e382f52656c656173652f4c696e75782f672b2bc082765fb84086dd80b7aefd6a6d2e3b93f4f300a86bfb6ef7bdc97cb03f793db6bb") -// msg, err := readMsg(bytes.NewReader(data)) -// if err != nil { -// t.Fatalf("unexpected error: %v", err) -// } - -// if msg.Code != 0 { -// t.Errorf("incorrect code %d, want %d", msg.Code, 0) -// } -// } - func ExampleMsgPipe() { rw1, rw2 := MsgPipe() go func() {