217 lines
5.6 KiB
Go
217 lines
5.6 KiB
Go
// Copyright (c) 2018 The Jaeger Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"fmt"
|
|
"net/url"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/uber/jaeger-client-go"
|
|
"github.com/uber/jaeger-client-go/utils"
|
|
)
|
|
|
|
const (
|
|
// minimumCredits is the minimum amount of credits necessary to not be throttled.
|
|
// i.e. if currentCredits > minimumCredits, then the operation will not be throttled.
|
|
minimumCredits = 1.0
|
|
)
|
|
|
|
var (
|
|
errorUUIDNotSet = errors.New("Throttler UUID must be set")
|
|
)
|
|
|
|
type operationBalance struct {
|
|
Operation string `json:"operation"`
|
|
Balance float64 `json:"balance"`
|
|
}
|
|
|
|
type creditResponse struct {
|
|
Balances []operationBalance `json:"balances"`
|
|
}
|
|
|
|
type httpCreditManagerProxy struct {
|
|
hostPort string
|
|
}
|
|
|
|
func newHTTPCreditManagerProxy(hostPort string) *httpCreditManagerProxy {
|
|
return &httpCreditManagerProxy{
|
|
hostPort: hostPort,
|
|
}
|
|
}
|
|
|
|
// N.B. Operations list must not be empty.
|
|
func (m *httpCreditManagerProxy) FetchCredits(uuid, serviceName string, operations []string) (*creditResponse, error) {
|
|
params := url.Values{}
|
|
params.Set("service", serviceName)
|
|
params.Set("uuid", uuid)
|
|
for _, op := range operations {
|
|
params.Add("operations", op)
|
|
}
|
|
var resp creditResponse
|
|
if err := utils.GetJSON(fmt.Sprintf("http://%s/credits?%s", m.hostPort, params.Encode()), &resp); err != nil {
|
|
return nil, errors.Wrap(err, "Failed to receive credits from agent")
|
|
}
|
|
return &resp, nil
|
|
}
|
|
|
|
// Throttler retrieves credits from agent and uses it to throttle operations.
|
|
type Throttler struct {
|
|
options
|
|
|
|
mux sync.RWMutex
|
|
service string
|
|
uuid atomic.Value
|
|
creditManager *httpCreditManagerProxy
|
|
credits map[string]float64 // map of operation->credits
|
|
close chan struct{}
|
|
stopped sync.WaitGroup
|
|
}
|
|
|
|
// NewThrottler returns a Throttler that polls agent for credits and uses them to throttle
|
|
// the service.
|
|
func NewThrottler(service string, options ...Option) *Throttler {
|
|
opts := applyOptions(options...)
|
|
creditManager := newHTTPCreditManagerProxy(opts.hostPort)
|
|
t := &Throttler{
|
|
options: opts,
|
|
creditManager: creditManager,
|
|
service: service,
|
|
credits: make(map[string]float64),
|
|
close: make(chan struct{}),
|
|
}
|
|
t.stopped.Add(1)
|
|
go t.pollManager()
|
|
return t
|
|
}
|
|
|
|
// IsAllowed implements Throttler#IsAllowed.
|
|
func (t *Throttler) IsAllowed(operation string) bool {
|
|
t.mux.Lock()
|
|
defer t.mux.Unlock()
|
|
value, ok := t.credits[operation]
|
|
if !ok || value == 0 {
|
|
if !ok {
|
|
// NOTE: This appears to be a no-op at first glance, but it stores
|
|
// the operation key in the map. Necessary for functionality of
|
|
// Throttler#operations method.
|
|
t.credits[operation] = 0
|
|
}
|
|
if !t.synchronousInitialization {
|
|
t.metrics.ThrottledDebugSpans.Inc(1)
|
|
return false
|
|
}
|
|
// If it is the first time this operation is being checked, synchronously fetch
|
|
// the credits.
|
|
credits, err := t.fetchCredits([]string{operation})
|
|
if err != nil {
|
|
// Failed to receive credits from agent, try again next time
|
|
t.logger.Error("Failed to fetch credits: " + err.Error())
|
|
return false
|
|
}
|
|
if len(credits.Balances) == 0 {
|
|
// This shouldn't happen but just in case
|
|
return false
|
|
}
|
|
for _, opBalance := range credits.Balances {
|
|
t.credits[opBalance.Operation] += opBalance.Balance
|
|
}
|
|
}
|
|
return t.isAllowed(operation)
|
|
}
|
|
|
|
// Close stops the throttler from fetching credits from remote.
|
|
func (t *Throttler) Close() error {
|
|
close(t.close)
|
|
t.stopped.Wait()
|
|
return nil
|
|
}
|
|
|
|
// SetProcess implements ProcessSetter#SetProcess. It's imperative that the UUID is set before any remote
|
|
// requests are made.
|
|
func (t *Throttler) SetProcess(process jaeger.Process) {
|
|
if process.UUID != "" {
|
|
t.uuid.Store(process.UUID)
|
|
}
|
|
}
|
|
|
|
// N.B. This function must be called with the Write Lock
|
|
func (t *Throttler) isAllowed(operation string) bool {
|
|
credits := t.credits[operation]
|
|
if credits < minimumCredits {
|
|
t.metrics.ThrottledDebugSpans.Inc(1)
|
|
return false
|
|
}
|
|
t.credits[operation] = credits - minimumCredits
|
|
return true
|
|
}
|
|
|
|
func (t *Throttler) pollManager() {
|
|
defer t.stopped.Done()
|
|
ticker := time.NewTicker(t.refreshInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
t.refreshCredits()
|
|
case <-t.close:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *Throttler) operations() []string {
|
|
t.mux.RLock()
|
|
defer t.mux.RUnlock()
|
|
operations := make([]string, 0, len(t.credits))
|
|
for op := range t.credits {
|
|
operations = append(operations, op)
|
|
}
|
|
return operations
|
|
}
|
|
|
|
func (t *Throttler) refreshCredits() {
|
|
operations := t.operations()
|
|
if len(operations) == 0 {
|
|
return
|
|
}
|
|
newCredits, err := t.fetchCredits(operations)
|
|
if err != nil {
|
|
t.metrics.ThrottlerUpdateFailure.Inc(1)
|
|
t.logger.Error("Failed to fetch credits: " + err.Error())
|
|
return
|
|
}
|
|
t.metrics.ThrottlerUpdateSuccess.Inc(1)
|
|
|
|
t.mux.Lock()
|
|
defer t.mux.Unlock()
|
|
for _, opBalance := range newCredits.Balances {
|
|
t.credits[opBalance.Operation] += opBalance.Balance
|
|
}
|
|
}
|
|
|
|
func (t *Throttler) fetchCredits(operations []string) (*creditResponse, error) {
|
|
uuid := t.uuid.Load()
|
|
uuidStr, _ := uuid.(string)
|
|
if uuid == nil || uuidStr == "" {
|
|
return nil, errorUUIDNotSet
|
|
}
|
|
return t.creditManager.FetchCredits(uuidStr, t.service, operations)
|
|
}
|