package etcd_tools import ( "context" "encoding/json" "errors" "time" etcd "go.etcd.io/etcd/clientv3" ) type EasyClient struct { Client *etcd.Client Timeout time.Duration } func NewClient(conf EasyConfig) (*EasyClient, error) { config, err := conf.prepare() if err != nil { return nil, err } cli, err := etcd.New(config) if err != nil { return nil, err } return &EasyClient{cli, 5 * time.Second}, nil } func (ez *EasyClient) Close() { ez.Client.Close() } func (ez *EasyClient) Get(key string, options ...etcd.OpOption) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) result, err := ez.Client.Get(ctx, key, options...) cancel() if err != nil { return "", err } if len(result.Kvs) == 0 { return "", nil } return string(result.Kvs[0].Value), nil } func (ez *EasyClient) GetPrefix(prefix string, options ...etcd.OpOption) ([]*Pair, error) { options = append(options, etcd.WithPrefix()) ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) resp, err := ez.Client.Get(ctx, prefix, options...) cancel() if err != nil { return []*Pair{}, err } return Pairs(resp.Kvs), nil } func value(v interface{}) (string, error) { if s, ok := v.(string); ok { return s, nil } b, err := json.Marshal(v) if err != nil { return "", err } return string(b), nil } func (ez *EasyClient) Put(key string, v interface{}, options ...etcd.OpOption) (string, error) { str, err := value(v) if err != nil { return "", err } ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) result, err := ez.Client.Put(ctx, key, str, options...) cancel() if err != nil { return "", err } if result.PrevKv != nil { return string(result.PrevKv.Value), nil } return "", nil } func (ez *EasyClient) Delete(key string, options ...etcd.OpOption) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) _, err := ez.Client.Delete(ctx, key, options...) cancel() return "", err } func (ez *EasyClient) Grant(ttl int64) (etcd.LeaseID, error) { ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) resp, err := ez.Client.Grant(ctx, ttl) cancel() if err != nil { return 0, err } return resp.ID, nil } func (ez *EasyClient) KeepAliveOnce(lease etcd.LeaseID) error { ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) _, err := ez.Client.KeepAliveOnce(ctx, lease) cancel() return err } func (ez *EasyClient) WatchOnce(key string, timeoutSeconds int) (*Pair, error) { timeout := time.Duration(timeoutSeconds) * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) rch := ez.Client.Watch(ctx, key) for wresp := range rch { for _, ev := range wresp.Events { cancel() return NewPair(ev.Kv), nil } } return nil, errors.New("Timed out watching key.") } type Txn struct { ez *EasyClient If *Cmps Then *Ops Else *Ops Pairs []*Pair Success bool Err error } type Cmps struct { slice []etcd.Cmp err error } func (cmps *Cmps) setErr(e error) { if cmps.err != nil { cmps.err = e } } func (cmps *Cmps) AddCmp(cmp etcd.Cmp) { if cmps.err != nil { return } cmps.slice = append(cmps.slice, cmp) } func (cmps *Cmps) Value(key string, operator string, v interface{}) { str, err := value(v) cmps.setErr(err) cmps.AddCmp(etcd.Compare(etcd.Value(key), operator, str)) } func (cmps *Cmps) Version(key string, operator string, v int64) { cmps.AddCmp(etcd.Compare(etcd.Version(key), operator, v)) } type Ops struct { slice []etcd.Op err error } func (ops *Ops) setErr(e error) { if ops.err != nil { ops.err = e } } func (ops *Ops) AddOp(op etcd.Op) { if ops.err != nil { return } ops.slice = append(ops.slice, op) } func (ops *Ops) Put(key string, v interface{}, options ...etcd.OpOption) { str, err := value(v) ops.setErr(err) ops.AddOp(etcd.OpPut(key, str, options...)) } func (ops *Ops) Delete(key string, options ...etcd.OpOption) { ops.AddOp(etcd.OpDelete(key, options...)) } func (ops *Ops) Get(key string, options ...etcd.OpOption) { ops.AddOp(etcd.OpGet(key, options...)) } func (ez *EasyClient) Txn() *Txn { cmps := &Cmps{} ops1 := &Ops{} ops2 := &Ops{} return &Txn{ez: ez, If: cmps, Then: ops1, Else: ops2} } func (txn *Txn) Commit() error { if txn.If.err != nil { return txn.If.err } if txn.Then.err != nil { return txn.Then.err } if txn.Else.err != nil { return txn.Else.err } ez := txn.ez ctx, cancel := context.WithTimeout(context.Background(), ez.Timeout) t := ez.Client.Txn(ctx) resp, err := t.If(txn.If.slice...).Then(txn.Then.slice...).Else(txn.Else.slice...).Commit() cancel() if err != nil { txn.Err = err return err } txn.Success = resp.Succeeded result := make([]*Pair, 0, 4) for _, r := range resp.Responses { if rr := r.GetResponseRange(); rr != nil { p := Pairs(rr.Kvs) result = append(result, p...) } } txn.Pairs = result return nil } func (txn *Txn) Value() string { if txn.Pairs != nil && len(txn.Pairs) > 0 { return txn.Pairs[0].Value } return "" } func (txn *Txn) Bytes() []byte { if txn.Pairs != nil && len(txn.Pairs) > 0 { return txn.Pairs[0].Bytes } return []byte{} }