Fixed whisper messages
* Whisper protocol wasn't properly suppling envelope slices * Message history wasn't properly propagated * Added 'Messages' method, filtering any current envelope with the supplied filter.
This commit is contained in:
parent
1e5353824a
commit
e3cad04dec
|
@ -357,7 +357,9 @@ Rectangle {
|
||||||
case "shh_getMessages":
|
case "shh_getMessages":
|
||||||
require(1);
|
require(1);
|
||||||
|
|
||||||
shh.trigger(data.args[0]);
|
var m = shh.messages(data.args[0]);
|
||||||
|
var messages = JSON.parse(JSON.parse(JSON.stringify(m)));
|
||||||
|
postData(data._id, messages);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,9 +8,9 @@ import (
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
ref *whisper.Message
|
ref *whisper.Message
|
||||||
Flags int32
|
Flags int32 `json:"flags"`
|
||||||
Payload string
|
Payload string `json:"payload"`
|
||||||
From string
|
From string `json:"from"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func ToQMessage(msg *whisper.Message) *Message {
|
func ToQMessage(msg *whisper.Message) *Message {
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (self *Whisper) Post(payload []string, to, from string, topics []string, pr
|
||||||
|
|
||||||
msg := whisper.NewMessage(data)
|
msg := whisper.NewMessage(data)
|
||||||
envelope, err := msg.Seal(time.Duration(priority*100000), whisper.Opts{
|
envelope, err := msg.Seal(time.Duration(priority*100000), whisper.Opts{
|
||||||
Ttl: time.Duration(ttl),
|
Ttl: time.Duration(ttl) * time.Second,
|
||||||
To: crypto.ToECDSAPub(fromHex(to)),
|
To: crypto.ToECDSAPub(fromHex(to)),
|
||||||
From: crypto.ToECDSA(fromHex(from)),
|
From: crypto.ToECDSA(fromHex(from)),
|
||||||
Topics: whisper.TopicsFromString(topics...),
|
Topics: whisper.TopicsFromString(topics...),
|
||||||
|
@ -84,8 +84,14 @@ func (self *Whisper) Watch(opts map[string]interface{}, view *qml.Common) int {
|
||||||
return i
|
return i
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Whisper) Trigger(id int) {
|
func (self *Whisper) Messages(id int) (messages *ethutil.List) {
|
||||||
go self.Whisper.Trigger(id)
|
msgs := self.Whisper.Messages(id)
|
||||||
|
messages = ethutil.EmptyList()
|
||||||
|
for _, message := range msgs {
|
||||||
|
messages.Append(ToQMessage(message))
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
|
func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
|
||||||
|
|
|
@ -1,11 +1,9 @@
|
||||||
package whisper
|
package whisper
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
@ -28,22 +26,6 @@ type Envelope struct {
|
||||||
hash Hash
|
hash Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEnvelopeFromReader(reader io.Reader) (*Envelope, error) {
|
|
||||||
var envelope Envelope
|
|
||||||
|
|
||||||
buf := new(bytes.Buffer)
|
|
||||||
buf.ReadFrom(reader)
|
|
||||||
|
|
||||||
h := H(crypto.Sha3(buf.Bytes()))
|
|
||||||
if err := rlp.Decode(buf, &envelope); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
envelope.hash = h
|
|
||||||
|
|
||||||
return &envelope, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *Envelope) Hash() Hash {
|
func (self *Envelope) Hash() Hash {
|
||||||
if self.hash == EmptyHash {
|
if self.hash == EmptyHash {
|
||||||
self.hash = H(crypto.Sha3(ethutil.Encode(self)))
|
self.hash = H(crypto.Sha3(ethutil.Encode(self)))
|
||||||
|
@ -126,3 +108,27 @@ func (self *Envelope) withoutNonce() interface{} {
|
||||||
func (self *Envelope) RlpData() interface{} {
|
func (self *Envelope) RlpData() interface{} {
|
||||||
return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce}
|
return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *Envelope) DecodeRLP(s *rlp.Stream) error {
|
||||||
|
var extenv struct {
|
||||||
|
Expiry uint32
|
||||||
|
Ttl uint32
|
||||||
|
Topics [][]byte
|
||||||
|
Data []byte
|
||||||
|
Nonce uint32
|
||||||
|
}
|
||||||
|
if err := s.Decode(&extenv); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
self.Expiry = extenv.Expiry
|
||||||
|
self.Ttl = extenv.Ttl
|
||||||
|
self.Topics = extenv.Topics
|
||||||
|
self.Data = extenv.Data
|
||||||
|
self.Nonce = extenv.Nonce
|
||||||
|
|
||||||
|
// TODO We should use the stream directly here.
|
||||||
|
self.hash = H(crypto.Sha3(ethutil.Encode(self)))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -67,7 +67,11 @@ func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
envelope := NewEnvelope(DefaultTtl, opts.Topics, self)
|
if opts.Ttl == 0 {
|
||||||
|
opts.Ttl = DefaultTtl
|
||||||
|
}
|
||||||
|
|
||||||
|
envelope := NewEnvelope(opts.Ttl, opts.Topics, self)
|
||||||
envelope.Seal(pow)
|
envelope.Seal(pow)
|
||||||
|
|
||||||
return envelope, nil
|
return envelope, nil
|
||||||
|
|
|
@ -126,18 +126,20 @@ func (self *Whisper) Watch(opts Filter) int {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Whisper) Trigger(id int) {
|
func (self *Whisper) Messages(id int) (messages []*Message) {
|
||||||
filter := self.filters.Get(id)
|
filter := self.filters.Get(id)
|
||||||
if filter != nil {
|
if filter != nil {
|
||||||
for _, e := range self.messages {
|
for _, e := range self.messages {
|
||||||
if msg, key := self.open(e); msg != nil {
|
if msg, key := self.open(e); msg != nil {
|
||||||
f := createFilter(msg, e.Topics, key)
|
f := createFilter(msg, e.Topics, key)
|
||||||
if self.filters.Match(filter, f) {
|
if self.filters.Match(filter, f) {
|
||||||
self.filters.Notify(f, msg)
|
messages = append(messages, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Main handler for passing whisper messages to whisper peer objects
|
// Main handler for passing whisper messages to whisper peer objects
|
||||||
|
@ -158,12 +160,13 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
envelope, err := NewEnvelopeFromReader(msg.Payload)
|
var envelopes []*Envelope
|
||||||
if err != nil {
|
if err := msg.Decode(&envelopes); err != nil {
|
||||||
peer.Infoln(err)
|
peer.Infoln(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, envelope := range envelopes {
|
||||||
if err := self.add(envelope); err != nil {
|
if err := self.add(envelope); err != nil {
|
||||||
// TODO Punish peer here. Invalid envelope.
|
// TODO Punish peer here. Invalid envelope.
|
||||||
peer.Infoln(err)
|
peer.Infoln(err)
|
||||||
|
@ -171,6 +174,7 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
|
||||||
wpeer.addKnown(envelope)
|
wpeer.addKnown(envelope)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
|
// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
|
||||||
func (self *Whisper) add(envelope *Envelope) error {
|
func (self *Whisper) add(envelope *Envelope) error {
|
||||||
|
@ -192,6 +196,8 @@ func (self *Whisper) add(envelope *Envelope) error {
|
||||||
go self.postEvent(envelope)
|
go self.postEvent(envelope)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wlogger.DebugDetailln("added whisper message")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue