250 lines
5.3 KiB
Go
250 lines
5.3 KiB
Go
|
package etcd_tools
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
etcd "github.com/coreos/etcd/clientv3"
|
||
|
)
|
||
|
|
||
|
type EasyClient struct {
|
||
|
Client *etcd.Client
|
||
|
Timeout time.Duration
|
||
|
}
|
||
|
|
||
|
func NewLocalClient() (*EasyClient, error) {
|
||
|
return NewClient([]string{"localhost"}, 2379)
|
||
|
}
|
||
|
|
||
|
func NewClient(hosts []string, port int) (*EasyClient, error) {
|
||
|
hp := make([]string, 0, len(hosts))
|
||
|
for _, h := range hosts {
|
||
|
hp = append(hp, fmt.Sprintf("%s:%d", h, port))
|
||
|
}
|
||
|
config := etcd.Config{Endpoints: hp, DialTimeout: 5 * time.Second}
|
||
|
cli, err := etcd.New(config)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &EasyClient{cli, 5 * time.Second}, nil
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) Close() {
|
||
|
ez.Client.Close()
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) Get(key string, options ...etcd.OpOption) (string, error) {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
result, err := ez.Client.Get(ctx, key, options...)
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
if len(result.Kvs) == 0 {
|
||
|
return "", nil
|
||
|
}
|
||
|
return string(result.Kvs[0].Value), nil
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) GetPrefix(prefix string, options ...etcd.OpOption) ([]*Pair, error) {
|
||
|
options = append(options, etcd.WithPrefix())
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
resp, err := ez.Client.Get(ctx, prefix, options...)
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
return []*Pair{}, err
|
||
|
}
|
||
|
return Pairs(resp.Kvs), nil
|
||
|
}
|
||
|
|
||
|
// value converts v into the string form that is written to etcd: a string is
// returned unchanged, and any other value is encoded with json.Marshal. The
// marshalling error, if any, is returned with an empty string.
func value(v interface{}) (string, error) {
	switch val := v.(type) {
	case string:
		return val, nil
	default:
		encoded, err := json.Marshal(val)
		if err != nil {
			return "", err
		}
		return string(encoded), nil
	}
}
|
||
|
|
||
|
func (ez *EasyClient) Put(key string, v interface{}, options ...etcd.OpOption) (string, error) {
|
||
|
str, err := value(v)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
result, err := ez.Client.Put(ctx, key, str, options...)
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
if result.PrevKv != nil {
|
||
|
return string(result.PrevKv.Value), nil
|
||
|
}
|
||
|
return "", nil
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) Delete(key string, options ...etcd.OpOption) (string, error) {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
_, err := ez.Client.Delete(ctx, key, options...)
|
||
|
cancel()
|
||
|
return "", err
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) Grant(ttl int64) (etcd.LeaseID, error) {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
resp, err := ez.Client.Grant(ctx, ttl)
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
return resp.ID, nil
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) KeepAliveOnce(lease etcd.LeaseID) error {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
_, err := ez.Client.KeepAliveOnce(ctx, lease)
|
||
|
cancel()
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) WatchOnce(key string, timeoutSeconds int) (*Pair, error) {
|
||
|
timeout := time.Duration(timeoutSeconds) * time.Second
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||
|
rch := ez.Client.Watch(ctx, key)
|
||
|
for wresp := range rch {
|
||
|
for _, ev := range wresp.Events {
|
||
|
cancel()
|
||
|
return NewPair(ev.Kv), nil
|
||
|
}
|
||
|
}
|
||
|
return nil, errors.New("Timed out watching key.")
|
||
|
}
|
||
|
|
||
|
|
||
|
type Txn struct {
|
||
|
ez *EasyClient
|
||
|
If *Cmps
|
||
|
Then *Ops
|
||
|
Else *Ops
|
||
|
Pairs []*Pair
|
||
|
Success bool
|
||
|
Err error
|
||
|
}
|
||
|
|
||
|
type Cmps struct {
|
||
|
slice []etcd.Cmp
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
func (cmps *Cmps) setErr(e error) {
|
||
|
if cmps.err != nil {
|
||
|
cmps.err = e
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cmps *Cmps) AddCmp(cmp etcd.Cmp) {
|
||
|
if cmps.err != nil {
|
||
|
return
|
||
|
}
|
||
|
cmps.slice = append(cmps.slice, cmp)
|
||
|
}
|
||
|
|
||
|
func (cmps *Cmps) Value(key string, operator string, v interface{}) {
|
||
|
str, err := value(v)
|
||
|
cmps.setErr(err)
|
||
|
cmps.AddCmp(etcd.Compare(etcd.Value(key), operator, str))
|
||
|
}
|
||
|
|
||
|
func (cmps *Cmps) Version(key string, operator string, v int64) {
|
||
|
cmps.AddCmp(etcd.Compare(etcd.Version(key), operator, v))
|
||
|
}
|
||
|
|
||
|
type Ops struct {
|
||
|
slice []etcd.Op
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
func (ops *Ops) setErr(e error) {
|
||
|
if ops.err != nil {
|
||
|
ops.err = e
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (ops *Ops) AddOp(op etcd.Op) {
|
||
|
if ops.err != nil {
|
||
|
return
|
||
|
}
|
||
|
ops.slice = append(ops.slice, op)
|
||
|
}
|
||
|
|
||
|
func (ops *Ops) Put(key string, v interface{}, options ...etcd.OpOption) {
|
||
|
str, err := value(v)
|
||
|
ops.setErr(err)
|
||
|
ops.AddOp(etcd.OpPut(key, str, options...))
|
||
|
}
|
||
|
|
||
|
func (ops *Ops) Delete(key string, options ...etcd.OpOption) {
|
||
|
ops.AddOp(etcd.OpDelete(key, options...))
|
||
|
}
|
||
|
|
||
|
func (ops *Ops) Get(key string, options ...etcd.OpOption) {
|
||
|
ops.AddOp(etcd.OpGet(key, options...))
|
||
|
}
|
||
|
|
||
|
func (ez *EasyClient) Txn() *Txn {
|
||
|
cmps := &Cmps{}
|
||
|
ops1 := &Ops{}
|
||
|
ops2 := &Ops{}
|
||
|
return &Txn{ez: ez, If: cmps, Then: ops1, Else: ops2}
|
||
|
}
|
||
|
|
||
|
func (txn *Txn) Commit() error {
|
||
|
if txn.If.err != nil {
|
||
|
return txn.If.err
|
||
|
}
|
||
|
if txn.Then.err != nil {
|
||
|
return txn.Then.err
|
||
|
}
|
||
|
if txn.Else.err != nil {
|
||
|
return txn.Else.err
|
||
|
}
|
||
|
ez := txn.ez
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
|
||
|
t := ez.Client.Txn(ctx)
|
||
|
resp, err := t.If(txn.If.slice...).Then(txn.Then.slice...).Else(txn.Else.slice...).Commit()
|
||
|
cancel()
|
||
|
txn.Success = resp.Succeeded
|
||
|
if err != nil {
|
||
|
txn.Err = err
|
||
|
return err
|
||
|
}
|
||
|
result := make([]*Pair, 0, 4)
|
||
|
for _, r := range resp.Responses {
|
||
|
if rr := r.GetResponseRange(); rr != nil {
|
||
|
p := Pairs(rr.Kvs)
|
||
|
result = append(result, p...)
|
||
|
}
|
||
|
}
|
||
|
txn.Pairs = result
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (txn *Txn) Value() string {
|
||
|
if txn.Pairs != nil && len(txn.Pairs) > 0 {
|
||
|
return txn.Pairs[0].Value
|
||
|
}
|
||
|
return ""
|
||
|
}
|
||
|
|
||
|
func (txn *Txn) Bytes() []byte {
|
||
|
if txn.Pairs != nil && len(txn.Pairs) > 0 {
|
||
|
return txn.Pairs[0].Bytes
|
||
|
}
|
||
|
return []byte{}
|
||
|
}
|