package fasthttp
import (
"errors"
"net"
"runtime"
"strings"
"sync"
"time"
)
type workerPool struct {
WorkerFunc ServeHandler
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time .Duration
Logger Logger
lock sync .Mutex
workersCount int
mustStop bool
ready []*workerChan
stopCh chan struct {}
workerChanPool sync .Pool
connState func (net .Conn , ConnState )
}
type workerChan struct {
lastUseTime time .Time
ch chan net .Conn
}
func (wp *workerPool ) Start () {
if wp .stopCh != nil {
return
}
wp .stopCh = make (chan struct {})
stopCh := wp .stopCh
wp .workerChanPool .New = func () interface {} {
return &workerChan {
ch : make (chan net .Conn , workerChanCap ),
}
}
go func () {
var scratch []*workerChan
for {
wp .clean (&scratch )
select {
case <- stopCh :
return
default :
time .Sleep (wp .getMaxIdleWorkerDuration ())
}
}
}()
}
func (wp *workerPool ) Stop () {
if wp .stopCh == nil {
return
}
close (wp .stopCh )
wp .stopCh = nil
wp .lock .Lock ()
ready := wp .ready
for i := range ready {
ready [i ].ch <- nil
ready [i ] = nil
}
wp .ready = ready [:0 ]
wp .mustStop = true
wp .lock .Unlock ()
}
func (wp *workerPool ) getMaxIdleWorkerDuration () time .Duration {
if wp .MaxIdleWorkerDuration <= 0 {
return 10 * time .Second
}
return wp .MaxIdleWorkerDuration
}
func (wp *workerPool ) clean (scratch *[]*workerChan ) {
maxIdleWorkerDuration := wp .getMaxIdleWorkerDuration ()
criticalTime := time .Now ().Add (-maxIdleWorkerDuration )
wp .lock .Lock ()
ready := wp .ready
n := len (ready )
l , r , mid := 0 , n -1 , 0
for l <= r {
mid = (l + r ) / 2
if criticalTime .After (wp .ready [mid ].lastUseTime ) {
l = mid + 1
} else {
r = mid - 1
}
}
i := r
if i == -1 {
wp .lock .Unlock ()
return
}
*scratch = append ((*scratch )[:0 ], ready [:i +1 ]...)
m := copy (ready , ready [i +1 :])
for i = m ; i < n ; i ++ {
ready [i ] = nil
}
wp .ready = ready [:m ]
wp .lock .Unlock ()
tmp := *scratch
for i := range tmp {
tmp [i ].ch <- nil
tmp [i ] = nil
}
}
func (wp *workerPool ) Serve (c net .Conn ) bool {
ch := wp .getCh ()
if ch == nil {
return false
}
ch .ch <- c
return true
}
var workerChanCap = func () int {
if runtime .GOMAXPROCS (0 ) == 1 {
return 0
}
return 1
}()
func (wp *workerPool ) getCh () *workerChan {
var ch *workerChan
createWorker := false
wp .lock .Lock ()
ready := wp .ready
n := len (ready ) - 1
if n < 0 {
if wp .workersCount < wp .MaxWorkersCount {
createWorker = true
wp .workersCount ++
}
} else {
ch = ready [n ]
ready [n ] = nil
wp .ready = ready [:n ]
}
wp .lock .Unlock ()
if ch == nil {
if !createWorker {
return nil
}
vch := wp .workerChanPool .Get ()
ch = vch .(*workerChan )
go func () {
wp .workerFunc (ch )
wp .workerChanPool .Put (vch )
}()
}
return ch
}
func (wp *workerPool ) release (ch *workerChan ) bool {
ch .lastUseTime = time .Now ()
wp .lock .Lock ()
if wp .mustStop {
wp .lock .Unlock ()
return false
}
wp .ready = append (wp .ready , ch )
wp .lock .Unlock ()
return true
}
func (wp *workerPool ) workerFunc (ch *workerChan ) {
var c net .Conn
var err error
for c = range ch .ch {
if c == nil {
break
}
if err = wp .WorkerFunc (c ); err != nil && err != errHijacked {
errStr := err .Error()
if wp .LogAllErrors || !(strings .Contains (errStr , "broken pipe" ) ||
strings .Contains (errStr , "reset by peer" ) ||
strings .Contains (errStr , "request headers: small read buffer" ) ||
strings .Contains (errStr , "unexpected EOF" ) ||
strings .Contains (errStr , "i/o timeout" ) ||
errors .Is (err , ErrBadTrailer )) {
wp .Logger .Printf ("error when serving connection %q<->%q: %v" , c .LocalAddr (), c .RemoteAddr (), err )
}
}
if err == errHijacked {
wp .connState (c , StateHijacked )
} else {
_ = c .Close ()
wp .connState (c , StateClosed )
}
c = nil
if !wp .release (ch ) {
break
}
}
wp .lock .Lock ()
wp .workersCount --
wp .lock .Unlock ()
}
The pages are generated with Golds v0.6.7 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds .