etcd_tools/easy_client.go

243 lines
5.1 KiB
Go

package etcd_tools
import (
"context"
"encoding/json"
"errors"
"time"
etcd "go.etcd.io/etcd/clientv3"
)
type EasyClient struct {
Client *etcd.Client
Timeout time.Duration
}
func NewClient(conf EasyConfig) (*EasyClient, error) {
config, err := conf.prepare()
if err != nil {
return nil, err
}
cli, err := etcd.New(config)
if err != nil {
return nil, err
}
return &EasyClient{cli, 5 * time.Second}, nil
}
func (ez *EasyClient) Close() {
ez.Client.Close()
}
func (ez *EasyClient) Get(key string, options ...etcd.OpOption) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
result, err := ez.Client.Get(ctx, key, options...)
cancel()
if err != nil {
return "", err
}
if len(result.Kvs) == 0 {
return "", nil
}
return string(result.Kvs[0].Value), nil
}
func (ez *EasyClient) GetPrefix(prefix string, options ...etcd.OpOption) ([]*Pair, error) {
options = append(options, etcd.WithPrefix())
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
resp, err := ez.Client.Get(ctx, prefix, options...)
cancel()
if err != nil {
return []*Pair{}, err
}
return Pairs(resp.Kvs), nil
}
func value(v interface{}) (string, error) {
if s, ok := v.(string); ok {
return s, nil
}
b, err := json.Marshal(v)
if err != nil {
return "", err
}
return string(b), nil
}
func (ez *EasyClient) Put(key string, v interface{}, options ...etcd.OpOption) (string, error) {
str, err := value(v)
if err != nil {
return "", err
}
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
result, err := ez.Client.Put(ctx, key, str, options...)
cancel()
if err != nil {
return "", err
}
if result.PrevKv != nil {
return string(result.PrevKv.Value), nil
}
return "", nil
}
func (ez *EasyClient) Delete(key string, options ...etcd.OpOption) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
_, err := ez.Client.Delete(ctx, key, options...)
cancel()
return "", err
}
func (ez *EasyClient) Grant(ttl int64) (etcd.LeaseID, error) {
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
resp, err := ez.Client.Grant(ctx, ttl)
cancel()
if err != nil {
return 0, err
}
return resp.ID, nil
}
func (ez *EasyClient) KeepAliveOnce(lease etcd.LeaseID) error {
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
_, err := ez.Client.KeepAliveOnce(ctx, lease)
cancel()
return err
}
func (ez *EasyClient) WatchOnce(key string, timeoutSeconds int) (*Pair, error) {
timeout := time.Duration(timeoutSeconds) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
rch := ez.Client.Watch(ctx, key)
for wresp := range rch {
for _, ev := range wresp.Events {
cancel()
return NewPair(ev.Kv), nil
}
}
return nil, errors.New("Timed out watching key.")
}
type Txn struct {
ez *EasyClient
If *Cmps
Then *Ops
Else *Ops
Pairs []*Pair
Success bool
Err error
}
type Cmps struct {
slice []etcd.Cmp
err error
}
func (cmps *Cmps) setErr(e error) {
if cmps.err != nil {
cmps.err = e
}
}
func (cmps *Cmps) AddCmp(cmp etcd.Cmp) {
if cmps.err != nil {
return
}
cmps.slice = append(cmps.slice, cmp)
}
func (cmps *Cmps) Value(key string, operator string, v interface{}) {
str, err := value(v)
cmps.setErr(err)
cmps.AddCmp(etcd.Compare(etcd.Value(key), operator, str))
}
func (cmps *Cmps) Version(key string, operator string, v int64) {
cmps.AddCmp(etcd.Compare(etcd.Version(key), operator, v))
}
type Ops struct {
slice []etcd.Op
err error
}
func (ops *Ops) setErr(e error) {
if ops.err != nil {
ops.err = e
}
}
func (ops *Ops) AddOp(op etcd.Op) {
if ops.err != nil {
return
}
ops.slice = append(ops.slice, op)
}
func (ops *Ops) Put(key string, v interface{}, options ...etcd.OpOption) {
str, err := value(v)
ops.setErr(err)
ops.AddOp(etcd.OpPut(key, str, options...))
}
func (ops *Ops) Delete(key string, options ...etcd.OpOption) {
ops.AddOp(etcd.OpDelete(key, options...))
}
func (ops *Ops) Get(key string, options ...etcd.OpOption) {
ops.AddOp(etcd.OpGet(key, options...))
}
func (ez *EasyClient) Txn() *Txn {
cmps := &Cmps{}
ops1 := &Ops{}
ops2 := &Ops{}
return &Txn{ez: ez, If: cmps, Then: ops1, Else: ops2}
}
func (txn *Txn) Commit() error {
if txn.If.err != nil {
return txn.If.err
}
if txn.Then.err != nil {
return txn.Then.err
}
if txn.Else.err != nil {
return txn.Else.err
}
ez := txn.ez
ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout)
t := ez.Client.Txn(ctx)
resp, err := t.If(txn.If.slice...).Then(txn.Then.slice...).Else(txn.Else.slice...).Commit()
cancel()
if err != nil {
txn.Err = err
return err
}
txn.Success = resp.Succeeded
result := make([]*Pair, 0, 4)
for _, r := range resp.Responses {
if rr := r.GetResponseRange(); rr != nil {
p := Pairs(rr.Kvs)
result = append(result, p...)
}
}
txn.Pairs = result
return nil
}
func (txn *Txn) Value() string {
if txn.Pairs != nil && len(txn.Pairs) > 0 {
return txn.Pairs[0].Value
}
return ""
}
func (txn *Txn) Bytes() []byte {
if txn.Pairs != nil && len(txn.Pairs) > 0 {
return txn.Pairs[0].Bytes
}
return []byte{}
}