159 lines
3.4 KiB
Go
159 lines
3.4 KiB
Go
package etcd_tools
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
etcd "github.com/coreos/etcd/clientv3"
|
|
)
|
|
|
|
// timeFormat is the time layout used to build the last segment of queue item
// keys: date, time of day, and a fixed nine-digit fractional second.
var timeFormat = "2006-01-02.15:04:05.000000000"
|
|
|
|
type Queue struct {
|
|
cli *EasyClient
|
|
prefix string
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func NewQueue(cli *EasyClient, prefix string) *Queue {
|
|
return &Queue{cli: cli, prefix: prefix}
|
|
}
|
|
|
|
func (q *Queue) Client() *EasyClient {
|
|
return q.cli
|
|
}
|
|
|
|
func (q *Queue) Prefix() string {
|
|
return q.prefix
|
|
}
|
|
|
|
func (q *Queue) Put(value interface{}) (string, error) {
|
|
now := time.Now().Format(timeFormat)
|
|
key := fmt.Sprintf("%s/%s", q.prefix, now)
|
|
_, err := q.cli.Put(key, value)
|
|
return key, err
|
|
}
|
|
|
|
func (q *Queue) PutWithTTL(value interface{}, ttl int64) (string, error) {
|
|
lease, err := q.cli.Grant(ttl)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
now := time.Now().Format(timeFormat)
|
|
key := fmt.Sprintf("%s/%s", q.prefix, now)
|
|
_, err = q.cli.Put(key, value, etcd.WithLease(lease))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
err = q.cli.KeepAliveOnce(lease)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return key, nil
|
|
}
|
|
|
|
func sort() etcd.OpOption {
|
|
return etcd.WithSort(etcd.SortByCreateRevision, etcd.SortAscend)
|
|
}
|
|
|
|
func (q *Queue) Poll() ([]*Pair, error) {
|
|
prefix := fmt.Sprintf("%s/", q.prefix)
|
|
resp, err := q.cli.GetPrefix(prefix, sort())
|
|
return resp, err
|
|
}
|
|
|
|
func (q *Queue) Len() (int, error) {
|
|
prefix := fmt.Sprintf("%s/", q.prefix)
|
|
resp, err := q.cli.GetPrefix(prefix, etcd.WithCountOnly())
|
|
fmt.Println("response to Len query is:", resp)
|
|
return 0, err
|
|
}
|
|
|
|
func (q *Queue) Watch(fn func(q *Queue, pair *Pair)) error {
|
|
prefix := fmt.Sprintf("%s/", q.prefix)
|
|
pairs, err := q.cli.GetPrefix(prefix, etcd.WithLimit(1), sort())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var fromRev int64 = 0
|
|
if len(pairs) > 0 {
|
|
fromRev = pairs[0].Modified
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
q.cancel = cancel
|
|
rch := q.cli.Client.Watch(ctx, prefix, etcd.WithPrefix(), etcd.WithRev(fromRev), etcd.WithFilterDelete())
|
|
for wr := range rch {
|
|
for _, ev := range wr.Events {
|
|
fn(q, NewPair(ev.Kv))
|
|
}
|
|
}
|
|
cancel()
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) CancelWatch() {
|
|
q.cancel()
|
|
}
|
|
|
|
func (q *Queue) ItemName(key string) string {
|
|
prefix := fmt.Sprintf("%s/", q.prefix)
|
|
if !strings.HasPrefix(key, prefix) {
|
|
return ""
|
|
}
|
|
s := strings.Split(key, "/")
|
|
return s[len(s)-1]
|
|
}
|
|
|
|
// Side-effect: from.Key is updated to the new key path
|
|
func (q *Queue) Move(from *Pair, to *Queue) error {
|
|
itemName := q.ItemName(from.Key)
|
|
if itemName == "" {
|
|
return errors.New("Invalid fromKey.")
|
|
}
|
|
toKey := fmt.Sprintf("%s/%s", to.prefix, itemName)
|
|
txn := q.cli.Txn()
|
|
txn.If.Version(from.Key, "=", from.Version)
|
|
txn.Then.Put(toKey, from.Value)
|
|
txn.Then.Delete(from.Key)
|
|
err := txn.Commit()
|
|
if !txn.Success {
|
|
return errors.New("Could not move queue item, its version changed.")
|
|
}
|
|
if err == nil {
|
|
from.Key = toKey
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (q *Queue) MoveWithTTL(from *Pair, to *Queue, ttl int64) error {
|
|
lease, err := q.cli.Grant(ttl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
itemName := q.ItemName(from.Key)
|
|
if itemName == "" {
|
|
return errors.New("Invalid fromKey.")
|
|
}
|
|
toKey := fmt.Sprintf("%s/%s", to.prefix, itemName)
|
|
txn := q.cli.Txn()
|
|
txn.If.Version(from.Key, "=", from.Version)
|
|
txn.Then.Put(toKey, from.Value, etcd.WithLease(lease))
|
|
txn.Then.Delete(from.Key)
|
|
err = txn.Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
from.Key = toKey
|
|
err = q.cli.KeepAliveOnce(lease)
|
|
return err
|
|
}
|