package etcd_tools import ( "context" "errors" "fmt" "strings" "time" etcd "github.com/coreos/etcd/clientv3" ) var timeFormat = "2006-01-02.15:04:05.000000000" type Queue struct { cli *EasyClient prefix string cancel context.CancelFunc } func NewQueue(cli *EasyClient, prefix string) *Queue { return &Queue{cli: cli, prefix: prefix} } func (q *Queue) Client() *EasyClient { return q.cli } func (q *Queue) Prefix() string { return q.prefix } func (q *Queue) Put(value interface{}) (string, error) { now := time.Now().Format(timeFormat) key := fmt.Sprintf("%s/%s", q.prefix, now) _, err := q.cli.Put(key, value) return key, err } func (q *Queue) PutWithTTL(value interface{}, ttl int64) (string, error) { lease, err := q.cli.Grant(ttl) if err != nil { return "", err } now := time.Now().Format(timeFormat) key := fmt.Sprintf("%s/%s", q.prefix, now) _, err = q.cli.Put(key, value, etcd.WithLease(lease)) if err != nil { return "", err } err = q.cli.KeepAliveOnce(lease) if err != nil { return "", err } return key, nil } func sort() etcd.OpOption { return etcd.WithSort(etcd.SortByCreateRevision, etcd.SortAscend) } func (q *Queue) Poll() ([]*Pair, error) { prefix := fmt.Sprintf("%s/", q.prefix) resp, err := q.cli.GetPrefix(prefix, sort()) return resp, err } func (q *Queue) Watch(fn func(q *Queue, pair *Pair)) error { prefix := fmt.Sprintf("%s/", q.prefix) pairs, err := q.cli.GetPrefix(prefix, etcd.WithLimit(1), sort()) if err != nil { return err } var fromRev int64 = 0 if len(pairs) > 0 { fromRev = pairs[0].Modified } ctx, cancel := context.WithCancel(context.Background()) q.cancel = cancel rch := q.cli.Client.Watch(ctx, prefix, etcd.WithPrefix(), etcd.WithRev(fromRev), etcd.WithFilterDelete()) for wr := range rch { for _, ev := range wr.Events { fn(q, NewPair(ev.Kv)) } } cancel() return nil } func (q *Queue) CancelWatch() { q.cancel() } func (q *Queue) ItemName(key string) string { prefix := fmt.Sprintf("%s/", q.prefix) if !strings.HasPrefix(key, prefix) { return "" } s := strings.Split(key, "/") return s[len(s)-1] } // Side-effect: from.Key is updated to the new key path func (q *Queue) Move(from *Pair, to *Queue) error { itemName := q.ItemName(from.Key) if itemName == "" { return errors.New("Invalid fromKey.") } toKey := fmt.Sprintf("%s/%s", to.prefix, itemName) txn := q.cli.Txn() txn.If.Version(from.Key, "=", from.Version) txn.Then.Put(toKey, from.Value) txn.Then.Delete(from.Key) err := txn.Commit() if !txn.Success { return errors.New("Could not move queue item, its version changed.") } if err == nil { from.Key = toKey } return err } func (q *Queue) MoveWithTTL(from *Pair, to *Queue, ttl int64) error { lease, err := q.cli.Grant(ttl) if err != nil { return err } itemName := q.ItemName(from.Key) if itemName == "" { return errors.New("Invalid fromKey.") } toKey := fmt.Sprintf("%s/%s", to.prefix, itemName) txn := q.cli.Txn() txn.If.Version(from.Key, "=", from.Version) txn.Then.Put(toKey, from.Value, etcd.WithLease(lease)) txn.Then.Delete(from.Key) err = txn.Commit() if err != nil { return err } from.Key = toKey err = q.cli.KeepAliveOnce(lease) return err }