etcd_tools/queue.go

159 lines
3.4 KiB
Go

package etcd_tools
import (
"context"
"errors"
"fmt"
"strings"
"time"
etcd "go.etcd.io/etcd/clientv3"
)
var timeFormat = "2006-01-02.15:04:05.000000000"
type Queue struct {
cli *EasyClient
prefix string
cancel context.CancelFunc
}
func NewQueue(cli *EasyClient, prefix string) *Queue {
return &Queue{cli: cli, prefix: prefix}
}
func (q *Queue) Client() *EasyClient {
return q.cli
}
func (q *Queue) Prefix() string {
return q.prefix
}
func (q *Queue) Put(value interface{}) (string, error) {
now := time.Now().Format(timeFormat)
key := fmt.Sprintf("%s/%s", q.prefix, now)
_, err := q.cli.Put(key, value)
return key, err
}
func (q *Queue) PutWithTTL(value interface{}, ttl int64) (string, error) {
lease, err := q.cli.Grant(ttl)
if err != nil {
return "", err
}
now := time.Now().Format(timeFormat)
key := fmt.Sprintf("%s/%s", q.prefix, now)
_, err = q.cli.Put(key, value, etcd.WithLease(lease))
if err != nil {
return "", err
}
err = q.cli.KeepAliveOnce(lease)
if err != nil {
return "", err
}
return key, nil
}
func sort() etcd.OpOption {
return etcd.WithSort(etcd.SortByCreateRevision, etcd.SortAscend)
}
func (q *Queue) Poll() ([]*Pair, error) {
prefix := fmt.Sprintf("%s/", q.prefix)
resp, err := q.cli.GetPrefix(prefix, sort())
return resp, err
}
func (q *Queue) Len() (int, error) {
prefix := fmt.Sprintf("%s/", q.prefix)
resp, err := q.cli.GetPrefix(prefix)
fmt.Println("response to Len query for " + prefix + " is:", resp)
return len(resp), err
}
func (q *Queue) Watch(fn func(q *Queue, pair *Pair)) error {
prefix := fmt.Sprintf("%s/", q.prefix)
pairs, err := q.cli.GetPrefix(prefix, etcd.WithLimit(1), sort())
if err != nil {
return err
}
var fromRev int64 = 0
if len(pairs) > 0 {
fromRev = pairs[0].Modified
}
ctx, cancel := context.WithCancel(context.Background())
q.cancel = cancel
rch := q.cli.Client.Watch(ctx, prefix, etcd.WithPrefix(), etcd.WithRev(fromRev), etcd.WithFilterDelete())
for wr := range rch {
for _, ev := range wr.Events {
fn(q, NewPair(ev.Kv))
}
}
cancel()
return nil
}
func (q *Queue) CancelWatch() {
q.cancel()
}
func (q *Queue) ItemName(key string) string {
prefix := fmt.Sprintf("%s/", q.prefix)
if !strings.HasPrefix(key, prefix) {
return ""
}
s := strings.Split(key, "/")
return s[len(s)-1]
}
// Side-effect: from.Key is updated to the new key path
func (q *Queue) Move(from *Pair, to *Queue) error {
itemName := q.ItemName(from.Key)
if itemName == "" {
return errors.New("Invalid fromKey.")
}
toKey := fmt.Sprintf("%s/%s", to.prefix, itemName)
txn := q.cli.Txn()
txn.If.Version(from.Key, "=", from.Version)
txn.Then.Put(toKey, from.Value)
txn.Then.Delete(from.Key)
err := txn.Commit()
if !txn.Success {
return errors.New("Could not move queue item, its version changed.")
}
if err == nil {
from.Key = toKey
}
return err
}
func (q *Queue) MoveWithTTL(from *Pair, to *Queue, ttl int64) error {
lease, err := q.cli.Grant(ttl)
if err != nil {
return err
}
itemName := q.ItemName(from.Key)
if itemName == "" {
return errors.New("Invalid fromKey.")
}
toKey := fmt.Sprintf("%s/%s", to.prefix, itemName)
txn := q.cli.Txn()
txn.If.Version(from.Key, "=", from.Version)
txn.Then.Put(toKey, from.Value, etcd.WithLease(lease))
txn.Then.Delete(from.Key)
err = txn.Commit()
if err != nil {
return err
}
from.Key = toKey
err = q.cli.KeepAliveOnce(lease)
return err
}