etcd_tools/discovery.go

126 lines
2.4 KiB
Go
Raw Normal View History

2018-04-12 12:10:17 -05:00
package etcd_tools
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"os"
"time"
etcd "github.com/coreos/etcd/clientv3"
)
// Discovery announces the local host under a key prefix and keeps that
// announcement alive through a lease keep-alive.
//
// NOTE: NewDiscovery fills this struct with a positional literal, so the
// field order below must not be changed without updating it.
type Discovery struct {
// cli is the client used by Announce to grant the lease, write the key and
// start the keep-alive.
cli *EasyClient
// prefix is the key prefix; Announce writes to "<prefix>/<hostname>".
prefix string
// Timeout is stored by NewDiscovery; nothing in this file reads it.
Timeout time.Duration
// ctx and cancel belong to the keep-alive started by Announce. Both are nil
// until Announce has reached the KeepAlive call.
ctx context.Context
cancel context.CancelFunc
// channel is the keep-alive response channel returned by the client.
channel <-chan *etcd.LeaseKeepAliveResponse
}
// NewDiscovery returns a Discovery that will announce under prefix using cli.
//
// timeout is stored in the Timeout field. The keep-alive state (context,
// cancel function and response channel) starts out nil and is populated by
// Announce.
func NewDiscovery(cli *EasyClient, prefix string, timeout time.Duration) *Discovery {
	d := &Discovery{
		cli:     cli,
		prefix:  prefix,
		Timeout: timeout,
	}
	return d
}
// Announce publishes this host under the discovery prefix and keeps the
// entry alive in the background until Cancel is called.
//
// The key is "<prefix>/<hostname>" and the value is a JSON object with the
// fields "host" (os.Hostname), "port" (the port argument) and "ip_addrs"
// (the addresses of the interface named iface). The key is written with a
// lease obtained from the client's Grant(ttl); ttl is presumably in seconds,
// as in etcd's lease API (confirm against EasyClient.Grant).
//
// Calling Announce again stops the keep-alive of the previous announcement
// before starting a new one, so at most one keep-alive runs per Discovery.
//
// It returns the first error from resolving the hostname, listing the
// interface addresses, encoding the value, granting the lease, writing the
// key or starting the keep-alive. No background goroutine is left running
// when an error is returned.
func (d *Discovery) Announce(iface string, port int, ttl int64) error {
	host, err := os.Hostname()
	if err != nil {
		return err
	}
	ips, err := listIPs(iface)
	if err != nil {
		return err
	}

	key := fmt.Sprintf("%s/%s", d.prefix, host)
	value, err := json.Marshal(map[string]interface{}{
		"host":     host,
		"port":     port,
		"ip_addrs": ips,
	})
	if err != nil {
		return err
	}

	lease, err := d.cli.Grant(ttl)
	if err != nil {
		return err
	}
	if _, err = d.cli.Put(key, string(value), etcd.WithLease(lease)); err != nil {
		return err
	}

	// The key now hangs off the new lease. Stop any keep-alive left over from
	// an earlier Announce; otherwise its cancel function would be overwritten
	// below and the old keep-alive could never be stopped.
	if d.cancel != nil {
		d.cancel()
	}

	// The context is deliberately NOT cancelled on the success path:
	// cancelling it cancels the KeepAlive. It lives until Cancel is called.
	ctx, cancel := context.WithCancel(context.Background())
	ch, err := d.cli.Client.KeepAlive(ctx, lease)
	d.ctx = ctx
	d.cancel = cancel
	d.channel = ch
	if err != nil {
		// No keep-alive is running, so release the context right away and do
		// not start a goroutine on a channel that will never deliver.
		cancel()
		return err
	}

	// Drain keep-alive responses so the client never blocks on a full
	// channel. Exit when the context is cancelled or when the channel is
	// closed; without the closed-channel check the loop would spin on
	// zero-value receives.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case _, ok := <-ch:
				if !ok {
					return
				}
			}
		}
	}()
	return nil
}
// Cancel stops the keep-alive started by Announce by cancelling the context
// that was passed to the client's KeepAlive call.
//
// It is a no-op when no keep-alive context has been created yet (Announce was
// never called, or failed before reaching the KeepAlive step). Calling it
// more than once is harmless.
func (d *Discovery) Cancel() {
	if d.cancel == nil {
		return
	}
	d.cancel()
}
// Host is one discovery record as decoded by Discover. The JSON tags match
// the keys Announce writes ("host", "ip_addrs", "port").
type Host struct {
// Hostname is the announcing machine's hostname.
Hostname string `json:"host"`
// IPs are the addresses of the announced network interface, as strings.
IPs []string `json:"ip_addrs"`
// Port is the announced port number.
Port int `json:"port"`
}
// Discover fetches every entry stored under prefix and decodes each value
// as a Host.
//
// Entries whose value is not valid JSON for a Host are skipped with a warning
// in the log rather than failing the whole call. The returned slice is
// non-nil (possibly empty) on success; an error is returned only when the
// prefix lookup itself fails.
func Discover(cli *EasyClient, prefix string) ([]Host, error) {
	entries, err := cli.GetPrefix(prefix)
	if err != nil {
		return nil, err
	}

	hosts := []Host{}
	for _, entry := range entries {
		var h Host
		if decodeErr := json.Unmarshal([]byte(entry.Value), &h); decodeErr != nil {
			log.Printf("WARNING: Ignoring invalid json in %s\n", string(entry.Key))
			continue
		}
		hosts = append(hosts, h)
	}
	return hosts, nil
}
// listIPs returns the string form of every IP address assigned to the
// network interface named ifaceName.
//
// Only addresses of type *net.IPNet and *net.IPAddr are reported; any other
// address type is ignored. The returned slice is never nil: it is empty when
// the interface has no addresses, and also when an error is returned (the
// interface cannot be found or its addresses cannot be read).
func listIPs(ifaceName string) ([]string, error) {
	ips := []string{}

	nic, err := net.InterfaceByName(ifaceName)
	if err != nil {
		return ips, err
	}
	addrs, err := nic.Addrs()
	if err != nil {
		return ips, err
	}

	for _, a := range addrs {
		var ip net.IP
		if ipNet, ok := a.(*net.IPNet); ok {
			ip = ipNet.IP
		} else if ipAddr, ok := a.(*net.IPAddr); ok {
			ip = ipAddr.IP
		} else {
			continue
		}
		ips = append(ips, ip.String())
	}
	return ips, nil
}