package fasthttp
import (
"sync"
"sync/atomic"
"time"
)
// BalancingClient is the interface LBClient requires of the clients it
// distributes requests among (see LBClient.Clients).
type BalancingClient interface {
// DoDeadline is invoked with the request, response and deadline that were
// handed to the load balancer. The returned error is passed to the health
// check and then returned to the caller unchanged.
DoDeadline (req *Request , resp *Response , deadline time .Time ) error
// PendingRequests is read when choosing a client: its result, plus the
// client's current penalty, is the load figure that get compares.
// NOTE(review): presumably the number of in-flight requests — confirm
// against the implementations.
PendingRequests () int
}
// LBClient balances requests among a set of BalancingClient values.
//
// Each request goes to the wrapped client reporting the fewest pending
// requests (penalties included); ties are broken in favour of the client
// with the smaller request counter.
type LBClient struct {
// noCopy is declared outside this view.
// NOTE(review): presumably a marker that lets vet flag copies — confirm.
noCopy noCopy
// Clients is the initial set of clients. It is wrapped once, on the first
// request; init panics if it is empty at that point.
Clients []BalancingClient
// HealthCheck is copied into every wrapped client by init and AddClient.
// When it is nil, a response counts as healthy iff err == nil.
HealthCheck func (req *Request , resp *Response , err error ) bool
// Timeout is the timeout used by Do. Values <= 0 fall back to
// DefaultLBClientTimeout.
Timeout time .Duration
// cs holds the wrapped clients that requests are balanced across.
cs []*lbClient
// once runs init on the first call to get.
once sync .Once
// mu is held while cs is modified (init, AddClient, RemoveClients) and
// read-locked while get scans it.
mu sync .RWMutex
}
// DefaultLBClientTimeout is the timeout LBClient.Do applies when
// LBClient.Timeout is not positive.
const DefaultLBClientTimeout = time .Second
// DoDeadline picks a client with get and forwards req, resp and deadline to
// it, returning whatever error that client reports.
func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	target := cc.get()
	return target.DoDeadline(req, resp, deadline)
}
// DoTimeout converts timeout into an absolute deadline measured from now,
// picks a client with get and forwards the request to it.
//
// The deadline is fixed before the client is selected, so time spent inside
// get counts against the timeout.
func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	expiry := time.Now().Add(timeout)
	picked := cc.get()
	return picked.DoDeadline(req, resp, expiry)
}
// Do issues the request through DoTimeout using cc.Timeout, or
// DefaultLBClientTimeout when cc.Timeout is zero or negative.
func (cc *LBClient) Do(req *Request, resp *Response) error {
	configured := cc.Timeout
	if configured > 0 {
		return cc.DoTimeout(req, resp, configured)
	}
	return cc.DoTimeout(req, resp, DefaultLBClientTimeout)
}
// init wraps every entry of cc.Clients in an lbClient carrying the current
// HealthCheck and appends it to cc.cs. It runs with cc.mu write-locked.
//
// It panics when cc.Clients is empty.
func (cc *LBClient) init() {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if len(cc.Clients) == 0 {
		panic("BUG: LBClient.Clients cannot be empty")
	}

	for i := range cc.Clients {
		wrapped := &lbClient{
			c:           cc.Clients[i],
			healthCheck: cc.HealthCheck,
		}
		cc.cs = append(cc.cs, wrapped)
	}
}
// AddClient wraps c in an lbClient carrying the current HealthCheck, appends
// it to the balanced set and returns the number of clients in the set
// afterwards.
func (cc *LBClient) AddClient(c BalancingClient) int {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	cc.cs = append(cc.cs, &lbClient{
		c:           c,
		healthCheck: cc.HealthCheck,
	})
	// The length is read while the lock is still held: reading cc.cs after
	// unlocking races with concurrent AddClient/RemoveClients calls and can
	// report a count that already includes somebody else's change.
	return len(cc.cs)
}
// RemoveClients drops every client for which rc returns true and returns the
// number of clients that remain.
//
// rc is called once per client, in order, while the write lock is held, so it
// must not call back into cc.
func (cc *LBClient) RemoveClients(rc func(BalancingClient) bool) int {
	cc.mu.Lock()
	// Deferred so the lock is released even if rc panics.
	defer cc.mu.Unlock()

	all := cc.cs
	// Filter in place, reusing the backing array. Entries are only written
	// after rc has returned, so a panicking rc never leaves a nil hole in
	// cc.cs.
	kept := all[:0]
	for _, cs := range all {
		if !rc(cs.c) {
			kept = append(kept, cs)
		}
	}
	// Clear the unused tail so removed clients can be garbage collected.
	for i := len(kept); i < len(all); i++ {
		all[i] = nil
	}
	cc.cs = kept

	// The count is taken under the lock: reading cc.cs after unlocking races
	// with concurrent AddClient/RemoveClients calls.
	return len(kept)
}
// get returns the wrapped client that should serve the next request: the one
// with the fewest pending requests (penalties included), with ties broken by
// the smaller request counter. When several clients are fully tied, the
// earliest in the list wins.
//
// The first call runs init. get panics (index out of range) when the client
// list is empty, e.g. after RemoveClients has removed every client.
func (cc *LBClient) get() *lbClient {
	cc.once.Do(cc.init)

	cc.mu.RLock()
	// Deferred so that a panic below (empty list, or a panicking
	// PendingRequests implementation) cannot leak the read lock and block
	// every later AddClient/RemoveClients call.
	defer cc.mu.RUnlock()

	cs := cc.cs
	minC := cs[0]
	minN := minC.PendingRequests()
	minT := atomic.LoadUint64(&minC.total)
	for _, c := range cs[1:] {
		n := c.PendingRequests()
		t := atomic.LoadUint64(&c.total)
		if n < minN || (n == minN && t < minT) {
			minC = c
			minN = n
			minT = t
		}
	}
	return minC
}
// lbClient wraps one BalancingClient together with the bookkeeping that
// LBClient.get uses to choose between clients.
type lbClient struct {
// c is the wrapped client that actually performs requests.
c BalancingClient
// healthCheck is the owner's HealthCheck at the time the wrapper was
// created; nil means "healthy iff err == nil" (see isHealthy).
healthCheck func (req *Request , resp *Response , err error ) bool
// penalty is accessed atomically. It is added to the wrapped client's
// pending-request count in PendingRequests, raised by incPenalty after an
// unhealthy response and lowered again by decPenalty.
penalty uint32
// total is accessed atomically. DoDeadline increments it whenever no
// penalty was applied; get uses it to break ties between clients.
total uint64
}
// DoDeadline forwards the request to the wrapped client and updates the
// balancing bookkeeping from the outcome.
//
// An unhealthy response for which a penalty could be recorded schedules that
// penalty's removal after penaltyDuration. In every other case (healthy, or
// the penalty is already capped) the request counter is incremented. The
// wrapped client's error is returned unchanged.
func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	err := c.c.DoDeadline(req, resp, deadline)

	// incPenalty is only consulted for unhealthy responses (short-circuit).
	penalized := !c.isHealthy(req, resp, err) && c.incPenalty()
	if !penalized {
		atomic.AddUint64(&c.total, 1)
		return err
	}

	// Undo the penalty later so the client gets traffic again.
	time.AfterFunc(penaltyDuration, c.decPenalty)
	return err
}
// PendingRequests reports the wrapped client's pending-request count plus
// the current penalty, so penalized clients look busier than they are.
func (c *lbClient) PendingRequests() int {
	load := c.c.PendingRequests()
	load += int(atomic.LoadUint32(&c.penalty))
	return load
}
// isHealthy reports whether the outcome of a request counts as healthy. It
// defers to the configured health check when there is one; otherwise any
// nil error is healthy.
func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
	if check := c.healthCheck; check != nil {
		return check(req, resp, err)
	}
	return err == nil
}
// incPenalty atomically raises the penalty by one and reports whether the
// increment was kept. If the new value exceeds maxPenalty the increment is
// undone and false is returned.
func (c *lbClient) incPenalty() bool {
	if atomic.AddUint32(&c.penalty, 1) <= maxPenalty {
		return true
	}
	// Over the cap: roll the counter back.
	c.decPenalty()
	return false
}
// decPenalty atomically lowers the penalty by one.
func (c *lbClient) decPenalty() {
	// Adding the all-ones value wraps around, which subtracts one from an
	// unsigned counter.
	const minusOne = ^uint32(0)
	atomic.AddUint32(&c.penalty, minusOne)
}
const (
// maxPenalty caps lbClient.penalty: incPenalty undoes its increment and
// reports false once the counter would exceed this value.
maxPenalty = 300
// penaltyDuration is the delay after which lbClient.DoDeadline schedules
// decPenalty to remove a penalty it has just recorded.
penaltyDuration = 3 * time .Second
)
The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.