package fasthttpimport ()// Do performs the given http request and fills the given http response.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Request, *Response) error {returndefaultClient.Do(, )}// DoTimeout performs the given request and waits for response during// the given timeout duration.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned during// the given timeout.//// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Request, *Response, time.Duration) error {returndefaultClient.DoTimeout(, , )}// DoDeadline performs the given request and waits for response until// the given deadline.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned until// the given deadline.//// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Request, *Response, time.Time) error {returndefaultClient.DoDeadline(, , )}// DoRedirects performs the given http request and fills the given http response,// following up to maxRedirectsCount redirects. When the redirect count exceeds// maxRedirectsCount, ErrTooManyRedirects is returned.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// Response is ignored if resp is nil.//// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Request, *Response, int) error { , , := doRequestFollowRedirects(, , .URI().String(), , &defaultClient)return}// Get returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.func ( []byte, string) ( int, []byte, error) {returndefaultClient.Get(, )}// GetTimeout returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// ErrTimeout error is returned if url contents couldn't be fetched// during the given timeout.func ( []byte, string, time.Duration) ( int, []byte, error) {returndefaultClient.GetTimeout(, , )}// GetDeadline returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// ErrTimeout error is returned if url contents couldn't be fetched// until the given deadline.func ( []byte, string, time.Time) ( int, []byte, error) {returndefaultClient.GetDeadline(, , )}// Post sends POST request to the given url with the given POST arguments.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// Empty POST body is sent if postArgs is nil.func ( []byte, string, *Args) ( int, []byte, error) {returndefaultClient.Post(, , )}var defaultClient Client// Client implements http client.//// Copying Client by value is prohibited. Create new instance instead.//// It is safe calling Client methods from concurrently running goroutines.//// The fields of a Client should not be changed while it is in use.typeClientstruct { noCopy noCopy// Client name. Used in User-Agent request header. // // Default client name is used if not set. Name string// NoDefaultUserAgentHeader when set to true, causes the default // User-Agent header to be excluded from the Request. NoDefaultUserAgentHeader bool// Callback for establishing new connections to hosts. // // Default Dial is used if not set. Dial DialFunc// Attempt to connect to both ipv4 and ipv6 addresses if set to true. // // This option is used only if default TCP dialer is used, // i.e. if Dial is blank. // // By default client connects only to ipv4 addresses, // since unfortunately ipv6 remains broken in many networks worldwide :) DialDualStack bool// TLS config for https connections. // // Default TLS config is used if not set. TLSConfig *tls.Config// Maximum number of connections per each host which may be established. // // DefaultMaxConnsPerHost is used if not set. MaxConnsPerHost int// Idle keep-alive connections are closed after this duration. // // By default idle connections are closed // after DefaultMaxIdleConnDuration. MaxIdleConnDuration time.Duration// Keep-alive connections are closed after this duration. // // By default connection duration is unlimited. MaxConnDuration time.Duration// Maximum number of attempts for idempotent calls // // DefaultMaxIdemponentCallAttempts is used if not set. MaxIdemponentCallAttempts int// Per-connection buffer size for responses' reading. // This also limits the maximum header size. // // Default buffer size is used if 0. ReadBufferSize int// Per-connection buffer size for requests' writing. // // Default buffer size is used if 0. WriteBufferSize int// Maximum duration for full response reading (including body). // // By default response read timeout is unlimited. ReadTimeout time.Duration// Maximum duration for full request writing (including body). // // By default request write timeout is unlimited. WriteTimeout time.Duration// Maximum response body size. // // The client returns ErrBodyTooLarge if this limit is greater than 0 // and response body is greater than the limit. // // By default response body size is unlimited. MaxResponseBodySize int// Header names are passed as-is without normalization // if this option is set. // // Disabled header names' normalization may be useful only for proxying // responses to other clients expecting case-sensitive // header names. See https://github.com/valyala/fasthttp/issues/57 // for details. // // By default request and response header names are normalized, i.e. // The first letter and the first letters following dashes // are uppercased, while all the other letters are lowercased. // Examples: // // * HOST -> Host // * content-type -> Content-Type // * cONTENT-lenGTH -> Content-Length DisableHeaderNamesNormalizing bool// Path values are sent as-is without normalization // // Disabled path normalization may be useful for proxying incoming requests // to servers that are expecting paths to be forwarded as-is. // // By default path values are normalized, i.e. // extra slashes are removed, special characters are encoded. DisablePathNormalizing bool// Maximum duration for waiting for a free connection. // // By default will not waiting, return ErrNoFreeConns immediately MaxConnWaitTimeout time.Duration// RetryIf controls whether a retry should be attempted after an error. // // By default will use isIdempotent function RetryIf RetryIfFunc// Connection pool strategy. Can be either LIFO or FIFO (default). ConnPoolStrategy ConnPoolStrategyType// StreamResponseBody enables response body streaming StreamResponseBody bool// ConfigureClient configures the fasthttp.HostClient. ConfigureClient func(hc *HostClient) error mLock sync.RWMutex mOnce sync.Once m map[string]*HostClient ms map[string]*HostClient readerPool sync.Pool writerPool sync.Pool}// Get returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.func ( *Client) ( []byte, string) ( int, []byte, error) {returnclientGetURL(, , )}// GetTimeout returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// ErrTimeout error is returned if url contents couldn't be fetched// during the given timeout.func ( *Client) ( []byte, string, time.Duration) ( int, []byte, error) {returnclientGetURLTimeout(, , , )}// GetDeadline returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// ErrTimeout error is returned if url contents couldn't be fetched// until the given deadline.func ( *Client) ( []byte, string, time.Time) ( int, []byte, error) {returnclientGetURLDeadline(, , , )}// Post sends POST request to the given url with the given POST arguments.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// Empty POST body is sent if postArgs is nil.func ( *Client) ( []byte, string, *Args) ( int, []byte, error) {returnclientPostURL(, , , )}// DoTimeout performs the given request and waits for response during// the given timeout duration.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned during// the given timeout.// Immediately returns ErrTimeout if timeout value is negative.//// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Client) ( *Request, *Response, time.Duration) error { .timeout = if .timeout <= 0 {returnErrTimeout }return .Do(, )}// DoDeadline performs the given request and waits for response until// the given deadline.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned until// the given deadline.// Immediately returns ErrTimeout if the deadline has already been reached.//// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Client) ( *Request, *Response, time.Time) error { .timeout = time.Until()if .timeout <= 0 {returnErrTimeout }return .Do(, )}// DoRedirects performs the given http request and fills the given http response,// following up to maxRedirectsCount redirects. When the redirect count exceeds// maxRedirectsCount, ErrTooManyRedirects is returned.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// Response is ignored if resp is nil.//// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Client) ( *Request, *Response, int) error { , , := doRequestFollowRedirects(, , .URI().String(), , )return}// Do performs the given http request and fills the given http response.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// Response is ignored if resp is nil.//// The function doesn't follow redirects. Use Get* for following redirects.//// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *Client) ( *Request, *Response) error { := .URI()if == nil {returnErrorInvalidURI } := .Host() := falseif .isHTTPS() { = true } elseif !.isHTTP() {returnfmt.Errorf("unsupported protocol %q. http and https are supported", .Scheme()) } .mOnce.Do(func() { .mLock.Lock() .m = make(map[string]*HostClient) .ms = make(map[string]*HostClient) .mLock.Unlock() }) := false .mLock.RLock() := .mif { = .ms } := [string()]if != nil {atomic.AddInt32(&.pendingClientRequests, 1)deferatomic.AddInt32(&.pendingClientRequests, -1) } .mLock.RUnlock()if == nil { .mLock.Lock() = [string()]if == nil { = &HostClient{Addr: AddMissingPort(string(), ),Name: .Name,NoDefaultUserAgentHeader: .NoDefaultUserAgentHeader,Dial: .Dial,DialDualStack: .DialDualStack,IsTLS: ,TLSConfig: .TLSConfig,MaxConns: .MaxConnsPerHost,MaxIdleConnDuration: .MaxIdleConnDuration,MaxConnDuration: .MaxConnDuration,MaxIdemponentCallAttempts: .MaxIdemponentCallAttempts,ReadBufferSize: .ReadBufferSize,WriteBufferSize: .WriteBufferSize,ReadTimeout: .ReadTimeout,WriteTimeout: .WriteTimeout,MaxResponseBodySize: .MaxResponseBodySize,DisableHeaderNamesNormalizing: .DisableHeaderNamesNormalizing,DisablePathNormalizing: .DisablePathNormalizing,MaxConnWaitTimeout: .MaxConnWaitTimeout,RetryIf: .RetryIf,ConnPoolStrategy: .ConnPoolStrategy,StreamResponseBody: .StreamResponseBody,clientReaderPool: &.readerPool,clientWriterPool: &.writerPool, }if .ConfigureClient != nil {if := .ConfigureClient(); != nil { .mLock.Unlock()return } } [string()] = iflen() == 1 { = true } }atomic.AddInt32(&.pendingClientRequests, 1)deferatomic.AddInt32(&.pendingClientRequests, -1) .mLock.Unlock() }if {go .mCleaner() }return .Do(, )}// CloseIdleConnections closes any connections which were previously// connected from previous requests but are now sitting idle in a// "keep-alive" state. It does not interrupt any connections currently// in use.func ( *Client) () { .mLock.RLock()for , := range .m { .CloseIdleConnections() }for , := range .ms { .CloseIdleConnections() } .mLock.RUnlock()}func ( *Client) ( map[string]*HostClient) { := false := .MaxIdleConnDurationif < time.Second { = time.Second } elseif > 10*time.Second { = 10 * time.Second }for {time.Sleep() .mLock.Lock()for , := range { .connsLock.Lock()/* #nosec G601 */if .connsCount == 0 && atomic.LoadInt32(&.pendingClientRequests) == 0 {delete(, ) } .connsLock.Unlock() }iflen() == 0 { = true } .mLock.Unlock()if {break } }}// DefaultMaxConnsPerHost is the maximum number of concurrent connections// http client may establish per host by default (i.e. if// Client.MaxConnsPerHost isn't set).constDefaultMaxConnsPerHost = 512// DefaultMaxIdleConnDuration is the default duration before idle keep-alive// connection is closed.constDefaultMaxIdleConnDuration = 10 * time.Second// DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.constDefaultMaxIdemponentCallAttempts = 5// DialFunc must establish connection to addr.//// There is no need in establishing TLS (SSL) connection for https.// The client automatically converts connection to TLS// if HostClient.IsTLS is set.//// TCP address passed to DialFunc always contains host and port.// Example TCP addr values://// - foobar.com:80// - foobar.com:443// - foobar.com:8080typeDialFuncfunc(addr string) (net.Conn, error)// RetryIfFunc signature of retry if function//// Request argument passed to RetryIfFunc, if there are any request errors.typeRetryIfFuncfunc(request *Request) bool// RoundTripper wraps every request/response.typeRoundTripperinterface {RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)}// ConnPoolStrategyType define strategy of connection pool enqueue/dequeuetypeConnPoolStrategyTypeintconst (FIFOConnPoolStrategyType = iotaLIFO)// HostClient balances http requests among hosts listed in Addr.//// HostClient may be used for balancing load among multiple upstream hosts.// While multiple addresses passed to HostClient.Addr may be used for balancing// load among them, it would be better using LBClient instead, since HostClient// may unevenly balance load among upstream hosts.//// It is forbidden copying HostClient instances. Create new instances instead.//// It is safe calling HostClient methods from concurrently running goroutines.typeHostClientstruct { noCopy noCopy// Comma-separated list of upstream HTTP server host addresses, // which are passed to Dial in a round-robin manner. // // Each address may contain port if default dialer is used. // For example, // // - foobar.com:80 // - foobar.com:443 // - foobar.com:8080 Addr string// Client name. Used in User-Agent request header. Name string// NoDefaultUserAgentHeader when set to true, causes the default // User-Agent header to be excluded from the Request. NoDefaultUserAgentHeader bool// Callback for establishing new connection to the host. // // Default Dial is used if not set. Dial DialFunc// Attempt to connect to both ipv4 and ipv6 host addresses // if set to true. // // This option is used only if default TCP dialer is used, // i.e. if Dial is blank. // // By default client connects only to ipv4 addresses, // since unfortunately ipv6 remains broken in many networks worldwide :) DialDualStack bool// Whether to use TLS (aka SSL or HTTPS) for host connections. IsTLS bool// Optional TLS config. TLSConfig *tls.Config// Maximum number of connections which may be established to all hosts // listed in Addr. // // You can change this value while the HostClient is being used // with HostClient.SetMaxConns(value) // // DefaultMaxConnsPerHost is used if not set. MaxConns int// Keep-alive connections are closed after this duration. // // By default connection duration is unlimited. MaxConnDuration time.Duration// Idle keep-alive connections are closed after this duration. // // By default idle connections are closed // after DefaultMaxIdleConnDuration. MaxIdleConnDuration time.Duration// Maximum number of attempts for idempotent calls // // DefaultMaxIdemponentCallAttempts is used if not set. MaxIdemponentCallAttempts int// Per-connection buffer size for responses' reading. // This also limits the maximum header size. // // Default buffer size is used if 0. ReadBufferSize int// Per-connection buffer size for requests' writing. // // Default buffer size is used if 0. WriteBufferSize int// Maximum duration for full response reading (including body). // // By default response read timeout is unlimited. ReadTimeout time.Duration// Maximum duration for full request writing (including body). // // By default request write timeout is unlimited. WriteTimeout time.Duration// Maximum response body size. // // The client returns ErrBodyTooLarge if this limit is greater than 0 // and response body is greater than the limit. // // By default response body size is unlimited. MaxResponseBodySize int// Header names are passed as-is without normalization // if this option is set. // // Disabled header names' normalization may be useful only for proxying // responses to other clients expecting case-sensitive // header names. See https://github.com/valyala/fasthttp/issues/57 // for details. // // By default request and response header names are normalized, i.e. // The first letter and the first letters following dashes // are uppercased, while all the other letters are lowercased. // Examples: // // * HOST -> Host // * content-type -> Content-Type // * cONTENT-lenGTH -> Content-Length DisableHeaderNamesNormalizing bool// Path values are sent as-is without normalization // // Disabled path normalization may be useful for proxying incoming requests // to servers that are expecting paths to be forwarded as-is. // // By default path values are normalized, i.e. // extra slashes are removed, special characters are encoded. DisablePathNormalizing bool// Will not log potentially sensitive content in error logs // // This option is useful for servers that handle sensitive data // in the request/response. // // Client logs full errors by default. SecureErrorLogMessage bool// Maximum duration for waiting for a free connection. // // By default will not waiting, return ErrNoFreeConns immediately MaxConnWaitTimeout time.Duration// RetryIf controls whether a retry should be attempted after an error. // // By default will use isIdempotent function RetryIf RetryIfFunc// Transport defines a transport-like mechanism that wraps every request/response. Transport RoundTripper// Connection pool strategy. Can be either LIFO or FIFO (default). ConnPoolStrategy ConnPoolStrategyType// StreamResponseBody enables response body streaming StreamResponseBody bool lastUseTime uint32 connsLock sync.Mutex connsCount int conns []*clientConn connsWait *wantConnQueue addrsLock sync.Mutex addrs []string addrIdx uint32 tlsConfigMap map[string]*tls.Config tlsConfigMapLock sync.Mutex readerPool sync.Pool writerPool sync.Pool clientReaderPool *sync.Pool clientWriterPool *sync.Pool pendingRequests int32// pendingClientRequests counts the number of requests that a Client is currently running using this HostClient. // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use. pendingClientRequests int32 connsCleanerRun bool}type clientConn struct { c net.Conn createdTime time.Time lastUseTime time.Time}var startTimeUnix = time.Now().Unix()// LastUseTime returns time the client was last usedfunc ( *HostClient) () time.Time { := atomic.LoadUint32(&.lastUseTime)returntime.Unix(startTimeUnix+int64(), 0)}// Get returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.func ( *HostClient) ( []byte, string) ( int, []byte, error) {returnclientGetURL(, , )}// GetTimeout returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// ErrTimeout error is returned if url contents couldn't be fetched// during the given timeout.func ( *HostClient) ( []byte, string, time.Duration) ( int, []byte, error) {returnclientGetURLTimeout(, , , )}// GetDeadline returns the status code and body of url.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// ErrTimeout error is returned if url contents couldn't be fetched// until the given deadline.func ( *HostClient) ( []byte, string, time.Time) ( int, []byte, error) {returnclientGetURLDeadline(, , , )}// Post sends POST request to the given url with the given POST arguments.//// The contents of dst will be replaced by the body and returned, if the dst// is too small a new slice will be allocated.//// The function follows redirects. Use Do* for manually handling redirects.//// Empty POST body is sent if postArgs is nil.func ( *HostClient) ( []byte, string, *Args) ( int, []byte, error) {returnclientPostURL(, , , )}type clientDoer interface { Do(req *Request, resp *Response) error}func clientGetURL( []byte, string, clientDoer) ( int, []byte, error) { := AcquireRequest() , , = doRequestFollowRedirectsBuffer(, , , )ReleaseRequest()return , , }func clientGetURLTimeout( []byte, string, time.Duration, clientDoer) ( int, []byte, error) { := time.Now().Add()returnclientGetURLDeadline(, , , )}type clientURLResponse struct { statusCode int body []byte err error}func clientGetURLDeadline( []byte, string, time.Time, clientDoer) ( int, []byte, error) { := time.Until()if <= 0 {return0, , ErrTimeout }varchanclientURLResponse := clientURLResponseChPool.Get()if == nil { = make(chanclientURLResponse, 1) } = .(chanclientURLResponse)// Note that the request continues execution on ErrTimeout until // client-specific ReadTimeout exceeds. This helps limiting load // on slow hosts by MaxConns* concurrent requests. // // Without this 'hack' the load on slow host could exceed MaxConns* // concurrent requests, since timed out requests on client side // usually continue execution on the host.varsync.Mutexvar , boolgofunc() { := AcquireRequest() , , := doRequestFollowRedirectsBuffer(, , , ) .Lock()if ! { <- clientURLResponse{statusCode: ,body: ,err: , } = true } .Unlock()ReleaseRequest() }() := AcquireTimer()select {case := <-: = .statusCode = .body = .errcase<-.C: .Lock()if { := <- = .statusCode = .body = .err } else { = true = ErrTimeout = } .Unlock() }ReleaseTimer()clientURLResponseChPool.Put()return , , }var clientURLResponseChPool sync.Poolfunc clientPostURL( []byte, string, *Args, clientDoer) ( int, []byte, error) { := AcquireRequest()deferReleaseRequest() .Header.SetMethod(MethodPost) .Header.SetContentTypeBytes(strPostArgsContentType)if != nil {if , := .WriteTo(.BodyWriter()); != nil {return0, nil, } } , , = doRequestFollowRedirectsBuffer(, , , )return , , }var (// ErrMissingLocation is returned by clients when the Location header is missing on // an HTTP response with a redirect status code.ErrMissingLocation = errors.New("missing Location header for http redirect")// ErrTooManyRedirects is returned by clients when the number of redirects followed // exceed the max count.ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")// HostClients are only able to follow redirects to the same protocol.ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead"))const defaultMaxRedirectsCount = 16func doRequestFollowRedirectsBuffer( *Request, []byte, string, clientDoer) ( int, []byte, error) { := AcquireResponse() := .bodyBuffer() .keepBodyBuffer = true := .B .B = , _, = doRequestFollowRedirects(, , , defaultMaxRedirectsCount, ) = .B .B = .keepBodyBuffer = falseReleaseResponse()return , , }func doRequestFollowRedirects( *Request, *Response, string, int, clientDoer) ( int, []byte, error) { := 0for { .SetRequestURI()if := .parseURI(); != nil {return0, nil, }if = .Do(, ); != nil {break } = .Header.StatusCode()if !StatusCodeIsRedirect() {break } ++if > { = ErrTooManyRedirectsbreak } := .Header.peek(strLocation)iflen() == 0 { = ErrMissingLocationbreak } = getRedirectURL(, ) }return , , }func getRedirectURL( string, []byte) string { := AcquireURI() .Update() .UpdateBytes() := .String()ReleaseURI()return}// StatusCodeIsRedirect returns true if the status code indicates a redirect.func ( int) bool {return == StatusMovedPermanently || == StatusFound || == StatusSeeOther || == StatusTemporaryRedirect || == StatusPermanentRedirect}var ( requestPool sync.Pool responsePool sync.Pool)// AcquireRequest returns an empty Request instance from request pool.//// The returned Request instance may be passed to ReleaseRequest when it is// no longer needed. This allows Request recycling, reduces GC pressure// and usually improves performance.func () *Request { := requestPool.Get()if == nil {return &Request{} }return .(*Request)}// ReleaseRequest returns req acquired via AcquireRequest to request pool.//// It is forbidden accessing req and/or its' members after returning// it to request pool.func ( *Request) { .Reset()requestPool.Put()}// AcquireResponse returns an empty Response instance from response pool.//// The returned Response instance may be passed to ReleaseResponse when it is// no longer needed. This allows Response recycling, reduces GC pressure// and usually improves performance.func () *Response { := responsePool.Get()if == nil {return &Response{} }return .(*Response)}// ReleaseResponse return resp acquired via AcquireResponse to response pool.//// It is forbidden accessing resp and/or its' members after returning// it to response pool.func ( *Response) { .Reset()responsePool.Put()}// DoTimeout performs the given request and waits for response during// the given timeout duration.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned during// the given timeout.// Immediately returns ErrTimeout if timeout value is negative.//// ErrNoFreeConns is returned if all HostClient.MaxConns connections// to the host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *HostClient) ( *Request, *Response, time.Duration) error { .timeout = if .timeout <= 0 {returnErrTimeout }return .Do(, )}// DoDeadline performs the given request and waits for response until// the given deadline.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned until// the given deadline.// Immediately returns ErrTimeout if the deadline has already been reached.//// ErrNoFreeConns is returned if all HostClient.MaxConns connections// to the host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *HostClient) ( *Request, *Response, time.Time) error { .timeout = time.Until()if .timeout <= 0 {returnErrTimeout }return .Do(, )}// DoRedirects performs the given http request and fills the given http response,// following up to maxRedirectsCount redirects. When the redirect count exceeds// maxRedirectsCount, ErrTooManyRedirects is returned.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// Client determines the server to be requested in the following order://// - from RequestURI if it contains full url with scheme and host;// - from Host header otherwise.//// Response is ignored if resp is nil.//// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections// to the requested host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *HostClient) ( *Request, *Response, int) error { , , := doRequestFollowRedirects(, , .URI().String(), , )return}// Do performs the given http request and sets the corresponding response.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// ErrNoFreeConns is returned if all HostClient.MaxConns connections// to the host are busy.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *HostClient) ( *Request, *Response) error {varerrorvarbool := .MaxIdemponentCallAttemptsif <= 0 { = DefaultMaxIdemponentCallAttempts } := isIdempotentif .RetryIf != nil { = .RetryIf } := 0 := .IsBodyStream()// If a request has a timeout we store the timeout // and calculate a deadline so we can keep updating the // timeout on each retry. := time.Time{} := .timeoutif > 0 { = time.Now().Add() }atomic.AddInt32(&.pendingRequests, 1)for {// If the original timeout was set, we need to update // the one set on the request to reflect the remaining time.if > 0 { .timeout = time.Until()if .timeout <= 0 { = ErrTimeoutbreak } } , = .do(, )if == nil || ! {break }if {break }if !() {// Retry non-idempotent requests if the server closes // the connection before sending the response. // // This case is possible if the server closes the idle // keep-alive connection on timeout. // // Apache and nginx usually do this.if != io.EOF {break } } ++if >= {break } }atomic.AddInt32(&.pendingRequests, -1)// Restore the original timeout. .timeout = if == io.EOF { = ErrConnectionClosed }return}// PendingRequests returns the current number of requests the client// is executing.//// This function may be used for balancing load among multiple HostClient// instances.func ( *HostClient) () int {returnint(atomic.LoadInt32(&.pendingRequests))}func isIdempotent( *Request) bool {return .Header.IsGet() || .Header.IsHead() || .Header.IsPut()}func ( *HostClient) ( *Request, *Response) (bool, error) {if == nil { = AcquireResponse()deferReleaseResponse() } , := .doNonNilReqResp(, )return , }func ( *HostClient) ( *Request, *Response) (bool, error) {if == nil {// for debugging purposespanic("BUG: req cannot be nil") }if == nil {// for debugging purposespanic("BUG: resp cannot be nil") }// Secure header error logs configuration .secureErrorLogMessage = .SecureErrorLogMessage .Header.secureErrorLogMessage = .SecureErrorLogMessage .secureErrorLogMessage = .SecureErrorLogMessage .Header.secureErrorLogMessage = .SecureErrorLogMessageif .IsTLS != .URI().isHTTPS() {returnfalse, ErrHostClientRedirectToDifferentScheme }atomic.StoreUint32(&.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))// Free up resources occupied by response before sending the request, // so the GC may reclaim these resources (e.g. response body).// backing up SkipBody in case it was set explicitly := .SkipBody := .StreamBody || .StreamResponseBody .Reset() .SkipBody = .StreamBody = .URI().DisablePathNormalizing = .DisablePathNormalizing := .Header.UserAgent()iflen() == 0 { := .Nameif == "" && !.NoDefaultUserAgentHeader { = defaultUserAgent }if != "" { .Header.userAgent = append(.Header.userAgent[:0], ...) } }return .transport().RoundTrip(, , )}func ( *HostClient) () RoundTripper {if .Transport == nil {returnDefaultTransport }return .Transport}var (// ErrNoFreeConns is returned when no free connections available // to the given host. // // Increase the allowed number of connections per host if you // see this error.ErrNoFreeConns = errors.New("no free connections available to host")// ErrConnectionClosed may be returned from client methods if the server // closes connection before returning the first response byte. // // If you see this error, then either fix the server by returning // 'Connection: close' response header before closing the connection // or add 'Connection: close' request header before sending requests // to broken server.ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +"Make sure the server returns 'Connection: close' response header before closing the connection")// ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet. // If you see this error, then you need to check your HostClient configuration.ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement"))type timeoutError struct{}func ( *timeoutError) () string {return"timeout"}// Only implement the Timeout() function of the net.Error interface.// This allows for checks like://// if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {func ( *timeoutError) () bool {returntrue}// ErrTimeout is returned from timed out calls.varErrTimeout = &timeoutError{}// SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.func ( *HostClient) ( int) { .connsLock.Lock() .MaxConns = .connsLock.Unlock()}func ( *HostClient) ( time.Duration, bool) ( *clientConn, error) { := false := falsevarint .connsLock.Lock() = len(.conns)if == 0 { := .MaxConnsif <= 0 { = DefaultMaxConnsPerHost }if .connsCount < { .connsCount++ = trueif !.connsCleanerRun && ! { = true .connsCleanerRun = true } } } else {switch .ConnPoolStrategy {caseLIFO: -- = .conns[] .conns[] = nil .conns = .conns[:]caseFIFO: = .conns[0]copy(.conns, .conns[1:]) .conns[-1] = nil .conns = .conns[:-1]default: .connsLock.Unlock()returnnil, ErrConnPoolStrategyNotImpl } } .connsLock.Unlock()if != nil {return , nil }if ! {if .MaxConnWaitTimeout <= 0 {returnnil, ErrNoFreeConns }// reqTimeout c.MaxConnWaitTimeout wait duration // d1 d2 min(d1, d2) // 0(not set) d2 d2 // d1 0(don't wait) 0(don't wait) // 0(not set) d2 d2 := .MaxConnWaitTimeout := false// reqTimeout == 0 means not setif > 0 && < { = = true }// wait for a free connection := AcquireTimer()deferReleaseTimer() := &wantConn{ready: make(chanstruct{}, 1), }deferfunc() {if != nil { .cancel(, ) } }() .queueForIdle()select {case<-.ready:return .conn, .errcase<-.C:if {returnnil, ErrTimeout }returnnil, ErrNoFreeConns } }if {go .connsCleaner() } , := .dialHostHard()if != nil { .decConnsCount()returnnil, } = acquireClientConn()return , nil}func ( *HostClient) ( *wantConn) { .connsLock.Lock()defer .connsLock.Unlock()if .connsWait == nil { .connsWait = &wantConnQueue{} } .connsWait.clearFront() .connsWait.pushBack()}func ( *HostClient) ( *wantConn) { , := .dialHostHard(0)if != nil { .tryDeliver(nil, ) .decConnsCount()return } := acquireClientConn()if !.tryDeliver(, nil) {// not delivered, return idle connection .releaseConn() }}// CloseIdleConnections closes any connections which were previously// connected from previous requests but are now sitting idle in a// "keep-alive" state. It does not interrupt any connections currently// in use.func ( *HostClient) () { .connsLock.Lock() := append([]*clientConn{}, .conns...)for := range .conns { .conns[] = nil } .conns = .conns[:0] .connsLock.Unlock()for , := range { .closeConn() }}func ( *HostClient) () {var ( []*clientConn = .MaxIdleConnDuration )if <= 0 { = DefaultMaxIdleConnDuration }for { := time.Now()// Determine idle connections to be closed. .connsLock.Lock() := .conns := len() := 0for < && .Sub([].lastUseTime) > { ++ } := if < {// + 1 so we actually sleep past the expiration time and not up to it. // Otherwise the > check above would still fail. = - .Sub([].lastUseTime) + 1 } = append([:0], [:]...)if > 0 { := copy(, [:])for = ; < ; ++ { [] = nil } .conns = [:] } .connsLock.Unlock()// Close idle connections.for , := range { .closeConn() [] = nil }// Determine whether to stop the connsCleaner. .connsLock.Lock() := .connsCount == 0if { .connsCleanerRun = false } .connsLock.Unlock()if {break }time.Sleep() }}func ( *HostClient) ( *clientConn) { .decConnsCount() .c.Close()releaseClientConn()}func ( *HostClient) () {if .MaxConnWaitTimeout <= 0 { .connsLock.Lock() .connsCount-- .connsLock.Unlock()return } .connsLock.Lock()defer .connsLock.Unlock() := falseif := .connsWait; != nil && .len() > 0 {for .len() > 0 { := .popFront()if .waiting() {go .dialConnFor() = truebreak } } }if ! { .connsCount-- }}// ConnsCount returns connection count of HostClientfunc ( *HostClient) () int { .connsLock.Lock()defer .connsLock.Unlock()return .connsCount}func acquireClientConn( net.Conn) *clientConn { := clientConnPool.Get()if == nil { = &clientConn{} } := .(*clientConn) .c = .createdTime = time.Now()return}func releaseClientConn( *clientConn) {// Reset all fields. * = clientConn{}clientConnPool.Put()}var clientConnPool sync.Poolfunc ( *HostClient) ( *clientConn) { .lastUseTime = time.Now()if .MaxConnWaitTimeout <= 0 { .connsLock.Lock() .conns = append(.conns, ) .connsLock.Unlock()return }// try to deliver an idle connection to a *wantConn .connsLock.Lock()defer .connsLock.Unlock() := falseif := .connsWait; != nil && .len() > 0 {for .len() > 0 { := .popFront()if .waiting() { = .tryDeliver(, nil)break } } }if ! { .conns = append(.conns, ) }}func ( *HostClient) ( net.Conn) *bufio.Writer {varinterface{}if .clientWriterPool != nil { = .clientWriterPool.Get()if == nil { := .WriteBufferSizeif <= 0 { = defaultWriteBufferSize }returnbufio.NewWriterSize(, ) } } else { = .writerPool.Get()if == nil { := .WriteBufferSizeif <= 0 { = defaultWriteBufferSize }returnbufio.NewWriterSize(, ) } } := .(*bufio.Writer) .Reset()return}func ( *HostClient) ( *bufio.Writer) {if .clientWriterPool != nil { .clientWriterPool.Put() } else { .writerPool.Put() }}func ( *HostClient) ( net.Conn) *bufio.Reader {varinterface{}if .clientReaderPool != nil { = .clientReaderPool.Get()if == nil { := .ReadBufferSizeif <= 0 { = defaultReadBufferSize }returnbufio.NewReaderSize(, ) } } else { = .readerPool.Get()if == nil { := .ReadBufferSizeif <= 0 { = defaultReadBufferSize }returnbufio.NewReaderSize(, ) } } := .(*bufio.Reader) .Reset()return}func ( *HostClient) ( *bufio.Reader) {if .clientReaderPool != nil { .clientReaderPool.Put() } else { .readerPool.Put() }}func newClientTLSConfig( *tls.Config, string) *tls.Config {if == nil { = &tls.Config{} } else { = .Clone() }iflen(.ServerName) == 0 { := tlsServerName()if == "*" { .InsecureSkipVerify = true } else { .ServerName = } }return}func tlsServerName( string) string {if !strings.Contains(, ":") {return } , , := net.SplitHostPort()if != nil {return"*" }return}func ( *HostClient) () string { .addrsLock.Lock()if .addrs == nil { .addrs = strings.Split(.Addr, ",") } := .addrs[0]iflen(.addrs) > 1 { = .addrs[.addrIdx%uint32(len(.addrs))] .addrIdx++ } .addrsLock.Unlock()return}func ( *HostClient) ( time.Duration) ( net.Conn, error) {// use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or dial has been set. // attempt to dial all the available hosts before giving up. .addrsLock.Lock() := len(.addrs) .addrsLock.Unlock()if == 0 {// It looks like c.addrs isn't initialized yet. = 1 } := .Dialif != 0 && == nil { = func( string) (net.Conn, error) {returnDialTimeout(, ) } } := .ReadTimeout + .WriteTimeoutif <= 0 { = DefaultDialTimeout } := time.Now().Add()for > 0 { := .nextAddr() := .cachedTLSConfig() , = dialAddr(, , .DialDualStack, .IsTLS, , .WriteTimeout)if == nil {return , nil }iftime.Since() >= 0 {break } -- }returnnil, }func ( *HostClient) ( string) *tls.Config {if !.IsTLS {returnnil } .tlsConfigMapLock.Lock()if .tlsConfigMap == nil { .tlsConfigMap = make(map[string]*tls.Config) } := .tlsConfigMap[]if == nil { = newClientTLSConfig(.TLSConfig, ) .tlsConfigMap[] = } .tlsConfigMapLock.Unlock()return}// ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.varErrTLSHandshakeTimeout = errors.New("tls handshake timed out")func tlsClientHandshake( net.Conn, *tls.Config, time.Time) ( net.Conn, error) {deferfunc() {if != nil { .Close() } }() := tls.Client(, ) := .SetDeadline()if != nil {returnnil, } = .Handshake()if , := .(net.Error); && .Timeout() {returnnil, ErrTLSHandshakeTimeout }if != nil {returnnil, } = .SetDeadline(time.Time{})if != nil {returnnil, }return , nil}func dialAddr( string, DialFunc, , bool, *tls.Config, time.Duration) (net.Conn, error) { := time.Now().Add()if == nil {if { = DialDualStack } else { = Dial } = AddMissingPort(, ) } , := ()if != nil {returnnil, }if == nil {returnnil, errors.New("dialling unsuccessful. Please report this bug!") }// We assume that any conn that has the Handshake() method is a TLS conn already. // This doesn't cover just tls.Conn but also other TLS implementations. , := .(interface{ () error })if && ! {if == 0 {returntls.Client(, ), nil }returntlsClientHandshake(, , ) }return , nil}// AddMissingPort adds a port to a host if it is missing.// A literal IPv6 address in hostport must be enclosed in square// brackets, as in "[::1]:80", "[::1%lo0]:80".func ( string, bool) string { := len()if == 0 {return } := [0] == '['if {// if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port := [-1] == ']'if ! {return } } else { // IPv4 := strings.LastIndexByte(, ':')if > 0 {return } } := ":80"if { = ":443" }return + }// A wantConn records state about a wanted connection// (that is, an active call to getConn).// The conn may be gotten by dialing or by finding an idle connection,// or a cancellation may make the conn no longer wanted.// These three options are racing against each other and use// wantConn to coordinate and agree about the winning outcome.//// inspired by net/http/transport.gotype wantConn struct { ready chanstruct{} mu sync.Mutex// protects conn, err, close(ready) conn *clientConn err error}// waiting reports whether w is still waiting for an answer (connection or error).func ( *wantConn) () bool {select {case<-.ready:returnfalsedefault:returntrue }}// tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.func ( *wantConn) ( *clientConn, error) bool { .mu.Lock()defer .mu.Unlock()if .conn != nil || .err != nil {returnfalse } .conn = .err = if .conn == nil && .err == nil {panic("fasthttp: internal error: misuse of tryDeliver") }close(.ready)returntrue}// cancel marks w as no longer wanting a result (for example, due to cancellation).// If a connection has been delivered already, cancel returns it with c.releaseConn.func ( *wantConn) ( *HostClient, error) { .mu.Lock()if .conn == nil && .err == nil {close(.ready) // catch misbehavior in future delivery } := .conn .conn = nil .err = .mu.Unlock()if != nil { .releaseConn() }}// A wantConnQueue is a queue of wantConns.//// inspired by net/http/transport.gotype wantConnQueue struct {// This is a queue, not a dequeue. // It is split into two stages - head[headPos:] and tail. // popFront is trivial (headPos++) on the first stage, and // pushBack is trivial (append) on the second stage. // If the first stage is empty, popFront can swap the // first and second stages to remedy the situation. // // This two-stage split is analogous to the use of two lists // in Okasaki's purely functional queue but without the // overhead of reversing the list when swapping stages. head []*wantConn headPos int tail []*wantConn}// len returns the number of items in the queue.func ( *wantConnQueue) () int {returnlen(.head) - .headPos + len(.tail)}// pushBack adds w to the back of the queue.func ( *wantConnQueue) ( *wantConn) { .tail = append(.tail, )}// popFront removes and returns the wantConn at the front of the queue.func ( *wantConnQueue) () *wantConn {if .headPos >= len(.head) {iflen(.tail) == 0 {returnnil }// Pick up tail as new head, clear tail. .head, .headPos, .tail = .tail, 0, .head[:0] } := .head[.headPos] .head[.headPos] = nil .headPos++return}// peekFront returns the wantConn at the front of the queue without removing it.func ( *wantConnQueue) () *wantConn {if .headPos < len(.head) {return .head[.headPos] }iflen(.tail) > 0 {return .tail[0] }returnnil}// clearFront pops any wantConns that are no longer waiting from the head of the// queue, reporting whether any were popped.func ( *wantConnQueue) () ( bool) {for { := .peekFront()if == nil || .waiting() {return } .popFront() = true }}// PipelineClient pipelines requests over a limited set of concurrent// connections to the given Addr.//// This client may be used in highly loaded HTTP-based RPC systems for reducing// context switches and network level overhead.// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.//// It is forbidden copying PipelineClient instances. Create new instances// instead.//// It is safe calling PipelineClient methods from concurrently running// goroutines.typePipelineClientstruct { noCopy noCopy// Address of the host to connect to. Addr string// PipelineClient name. Used in User-Agent request header. Name string// NoDefaultUserAgentHeader when set to true, causes the default // User-Agent header to be excluded from the Request. NoDefaultUserAgentHeader bool// The maximum number of concurrent connections to the Addr. // // A single connection is used by default. MaxConns int// The maximum number of pending pipelined requests over // a single connection to Addr. // // DefaultMaxPendingRequests is used by default. MaxPendingRequests int// The maximum delay before sending pipelined requests as a batch // to the server. // // By default requests are sent immediately to the server. MaxBatchDelay time.Duration// Callback for connection establishing to the host. // // Default Dial is used if not set. Dial DialFunc// Attempt to connect to both ipv4 and ipv6 host addresses // if set to true. // // This option is used only if default TCP dialer is used, // i.e. if Dial is blank. // // By default client connects only to ipv4 addresses, // since unfortunately ipv6 remains broken in many networks worldwide :) DialDualStack bool// Response header names are passed as-is without normalization // if this option is set. // // Disabled header names' normalization may be useful only for proxying // responses to other clients expecting case-sensitive // header names. See https://github.com/valyala/fasthttp/issues/57 // for details. // // By default request and response header names are normalized, i.e. // The first letter and the first letters following dashes // are uppercased, while all the other letters are lowercased. // Examples: // // * HOST -> Host // * content-type -> Content-Type // * cONTENT-lenGTH -> Content-Length DisableHeaderNamesNormalizing bool// Path values are sent as-is without normalization // // Disabled path normalization may be useful for proxying incoming requests // to servers that are expecting paths to be forwarded as-is. // // By default path values are normalized, i.e. // extra slashes are removed, special characters are encoded. DisablePathNormalizing bool// Whether to use TLS (aka SSL or HTTPS) for host connections. IsTLS bool// Optional TLS config. TLSConfig *tls.Config// Idle connection to the host is closed after this duration. // // By default idle connection is closed after // DefaultMaxIdleConnDuration. MaxIdleConnDuration time.Duration// Buffer size for responses' reading. // This also limits the maximum header size. // // Default buffer size is used if 0. ReadBufferSize int// Buffer size for requests' writing. // // Default buffer size is used if 0. WriteBufferSize int// Maximum duration for full response reading (including body). // // By default response read timeout is unlimited. ReadTimeout time.Duration// Maximum duration for full request writing (including body). // // By default request write timeout is unlimited. WriteTimeout time.Duration// Logger for logging client errors. // // By default standard logger from log package is used. Logger Logger connClients []*pipelineConnClient connClientsLock sync.Mutex}type pipelineConnClient struct { noCopy noCopy Addr string Name string NoDefaultUserAgentHeader bool MaxPendingRequests int MaxBatchDelay time.Duration Dial DialFunc DialDualStack bool DisableHeaderNamesNormalizing bool DisablePathNormalizing bool IsTLS bool TLSConfig *tls.Config MaxIdleConnDuration time.Duration ReadBufferSize int WriteBufferSize int ReadTimeout time.Duration WriteTimeout time.Duration Logger Logger workPool sync.Pool chLock sync.Mutex chW chan *pipelineWork chR chan *pipelineWork tlsConfigLock sync.Mutex tlsConfig *tls.Config}type pipelineWork struct { reqCopy Request respCopy Response req *Request resp *Response t *time.Timer deadline time.Time err error done chanstruct{}}// DoTimeout performs the given request and waits for response during// the given timeout duration.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// The function doesn't follow redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned during// the given timeout.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *PipelineClient) ( *Request, *Response, time.Duration) error {return .DoDeadline(, , time.Now().Add())}// DoDeadline performs the given request and waits for response until// the given deadline.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// The function doesn't follow redirects.//// Response is ignored if resp is nil.//// ErrTimeout is returned if the response wasn't returned until// the given deadline.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *PipelineClient) ( *Request, *Response, time.Time) error {return .getConnClient().DoDeadline(, , )}func ( *pipelineConnClient) ( *Request, *Response, time.Time) error { .init() := time.Until()if <= 0 {returnErrTimeout }if .DisablePathNormalizing { .URI().DisablePathNormalizing = true } := .Header.UserAgent()iflen() == 0 { := .Nameif == "" && !.NoDefaultUserAgentHeader { = defaultUserAgent }if != "" { .Header.userAgent = append(.Header.userAgent[:0], ...) } } := .acquirePipelineWork() .respCopy.Header.disableNormalizing = .DisableHeaderNamesNormalizing .req = &.reqCopy .resp = &.respCopy// Make a copy of the request in order to avoid data races on timeouts .copyToSkipBody(&.reqCopy)swapRequestBody(, &.reqCopy)// Put the request to outgoing queueselect {case .chW<- :// Fast path: len(c.ch) < cap(c.ch)default:// Slow pathselect {case .chW<- :case<-.t.C: .releasePipelineWork()returnErrTimeout } }// Wait for the responsevarerrorselect {case<-.done:if != nil { .respCopy.copyToSkipBody()swapResponseBody(, &.respCopy) } = .err .releasePipelineWork()case<-.t.C: = ErrTimeout }return}func ( *pipelineConnClient) ( time.Duration) ( *pipelineWork) { := .workPool.Get()if != nil { = .(*pipelineWork) } else { = &pipelineWork{done: make(chanstruct{}, 1), } }if > 0 {if .t == nil { .t = time.NewTimer() } else { .t.Reset() } .deadline = time.Now().Add() } else { .deadline = zeroTime }return}func ( *pipelineConnClient) ( *pipelineWork) {if .t != nil { .t.Stop() } .reqCopy.Reset() .respCopy.Reset() .req = nil .resp = nil .err = nil .workPool.Put()}// Do performs the given http request and sets the corresponding response.//// Request must contain at least non-zero RequestURI with full url (including// scheme and host) or non-zero Host header + RequestURI.//// The function doesn't follow redirects. Use Get* for following redirects.//// Response is ignored if resp is nil.//// It is recommended obtaining req and resp via AcquireRequest// and AcquireResponse in performance-critical code.func ( *PipelineClient) ( *Request, *Response) error {return .getConnClient().Do(, )}func ( *pipelineConnClient) ( *Request, *Response) error { .init()if .DisablePathNormalizing { .URI().DisablePathNormalizing = true } := .Header.UserAgent()iflen() == 0 { := .Nameif == "" && !.NoDefaultUserAgentHeader { = defaultUserAgent }if != "" { .Header.userAgent = append(.Header.userAgent[:0], ...) } } := .acquirePipelineWork(0) .req = if != nil { .Header.disableNormalizing = .DisableHeaderNamesNormalizing .resp = } else { .resp = &.respCopy }// Put the request to outgoing queueselect {case .chW<- :default:// Try substituting the oldest w with the current one.select {case := <-.chW: .err = ErrPipelineOverflow .done <- struct{}{}default: }select {case .chW<- :default: .releasePipelineWork()returnErrPipelineOverflow } }// Wait for the response <-.done := .err .releasePipelineWork()return}func ( *PipelineClient) () *pipelineConnClient { .connClientsLock.Lock() := .getConnClientUnlocked() .connClientsLock.Unlock()return}func ( *PipelineClient) () *pipelineConnClient {iflen(.connClients) == 0 {return .newConnClient() }// Return the client with the minimum number of pending requests. := .connClients[0] := .PendingRequests()if == 0 {return }for := 1; < len(.connClients); ++ { := .connClients[] := .PendingRequests()if == 0 {return }if < { = = } } := .MaxConnsif <= 0 { = 1 }iflen(.connClients) < {return .newConnClient() }return}func ( *PipelineClient) () *pipelineConnClient { := &pipelineConnClient{Addr: .Addr,Name: .Name,NoDefaultUserAgentHeader: .NoDefaultUserAgentHeader,MaxPendingRequests: .MaxPendingRequests,MaxBatchDelay: .MaxBatchDelay,Dial: .Dial,DialDualStack: .DialDualStack,DisableHeaderNamesNormalizing: .DisableHeaderNamesNormalizing,DisablePathNormalizing: .DisablePathNormalizing,IsTLS: .IsTLS,TLSConfig: .TLSConfig,MaxIdleConnDuration: .MaxIdleConnDuration,ReadBufferSize: .ReadBufferSize,WriteBufferSize: .WriteBufferSize,ReadTimeout: .ReadTimeout,WriteTimeout: .WriteTimeout,Logger: .Logger, } .connClients = append(.connClients, )return}// ErrPipelineOverflow may be returned from PipelineClient.Do*// if the requests' queue is overflown.varErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests")// DefaultMaxPendingRequests is the default value// for PipelineClient.MaxPendingRequests.constDefaultMaxPendingRequests = 1024func ( *pipelineConnClient) () { .chLock.Lock()if .chR == nil { := .MaxPendingRequestsif <= 0 { = DefaultMaxPendingRequests } .chR = make(chan *pipelineWork, )if .chW == nil { .chW = make(chan *pipelineWork, ) }gofunc() {// Keep restarting the worker if it fails (connection errors for example).for {if := .worker(); != nil { .logger().Printf("error in PipelineClient(%q): %v", .Addr, )if , := .(net.Error); && .Timeout() {// Throttle client reconnections on timeout errorstime.Sleep(time.Second) } } else { .chLock.Lock() := len(.chR) == 0 && len(.chW) == 0if ! { .chR = nil .chW = nil } .chLock.Unlock()if {break } } } }() } .chLock.Unlock()}func ( *pipelineConnClient) () error { := .cachedTLSConfig() , := dialAddr(.Addr, .Dial, .DialDualStack, .IsTLS, , .WriteTimeout)if != nil {return }// Start reader and writer := make(chanstruct{}) := make(chanerror)gofunc() { <- .writer(, ) }() := make(chanstruct{}) := make(chanerror)gofunc() { <- .reader(, ) }()// Wait until reader and writer are stoppedselect {case = <-: .Close()close() <-case = <-: .Close()close() <- }// Notify pending readersforlen(.chR) > 0 { := <-.chR .err = errPipelineConnStopped .done <- struct{}{} }return}func ( *pipelineConnClient) () *tls.Config {if !.IsTLS {returnnil } .tlsConfigLock.Lock() := .tlsConfigif == nil { = newClientTLSConfig(.TLSConfig, .Addr) .tlsConfig = } .tlsConfigLock.Unlock()return}func ( *pipelineConnClient) ( net.Conn, <-chanstruct{}) error { := .WriteBufferSizeif <= 0 { = defaultWriteBufferSize } := bufio.NewWriterSize(, )defer .Flush() := .chR := .chW := .WriteTimeout := .MaxIdleConnDurationif <= 0 { = DefaultMaxIdleConnDuration } := .MaxBatchDelayvar ( = time.NewTimer(time.Hour) = time.NewTimer(time.Hour) <-chantime.Time = make(chantime.Time) *pipelineWorkerror )close()for { :select {case = <-:// Fast path: len(chW) > 0default:// Slow path .Reset()select {case = <-:case<-.C:returnnilcase<-:returnnilcase<-:if = .Flush(); != nil {return } = nilgoto } }if !.deadline.IsZero() && time.Since(.deadline) >= 0 { .err = ErrTimeout .done <- struct{}{}continue } .resp.parseNetConn()if > 0 {// Set Deadline every time, since golang has fixed the performance issue // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details := time.Now()if = .SetWriteDeadline(.Add()); != nil { .err = .done <- struct{}{}return } }if = .req.Write(); != nil { .err = .done <- struct{}{}return }if == nil && (len() == 0 || len() == cap()) {if > 0 { .Reset() = .C } else { = } } :select {case<- :// Fast path: len(chR) < cap(chR)default:// Slow pathselect {case<- :case<-: .err = errPipelineConnStopped .done <- struct{}{}returnnilcase<-:if = .Flush(); != nil { .err = .done <- struct{}{}return } = nilgoto } } }}func ( *pipelineConnClient) ( net.Conn, <-chanstruct{}) error { := .ReadBufferSizeif <= 0 { = defaultReadBufferSize } := bufio.NewReaderSize(, ) := .chR := .ReadTimeoutvar ( *pipelineWorkerror )for {select {case = <-:// Fast path: len(chR) > 0default:// Slow pathselect {case = <-:case<-:returnnil } }if > 0 {// Set Deadline every time, since golang has fixed the performance issue // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details := time.Now()if = .SetReadDeadline(.Add()); != nil { .err = .done <- struct{}{}return } }if = .resp.Read(); != nil { .err = .done <- struct{}{}return } .done <- struct{}{} }}func ( *pipelineConnClient) () Logger {if .Logger != nil {return .Logger }returndefaultLogger}// PendingRequests returns the current number of pending requests pipelined// to the server.//// This number may exceed MaxPendingRequests*MaxConns by up to two times, since// each connection to the server may keep up to MaxPendingRequests requests// in the queue before sending them to the server.//// This function may be used for balancing load among multiple PipelineClient// instances.func ( *PipelineClient) () int { .connClientsLock.Lock() := 0for , := range .connClients { += .PendingRequests() } .connClientsLock.Unlock()return}func ( *pipelineConnClient) () int { .init() .chLock.Lock() := len(.chR) + len(.chW) .chLock.Unlock()return}var errPipelineConnStopped = errors.New("pipeline connection has been stopped")varDefaultTransportRoundTripper = &transport{}type transport struct{}func ( *transport) ( *HostClient, *Request, *Response) ( bool, error) { := .SkipBody := .StreamBodyvartime.Timeif .timeout > 0 { = time.Now().Add(.timeout) } , := .acquireConn(.timeout, .ConnectionClose())if != nil {returnfalse, } := .c .parseNetConn() := if .WriteTimeout > 0 { := time.Now().Add(.WriteTimeout)if .IsZero() || .Before() { = } }if = .SetWriteDeadline(); != nil { .closeConn()returntrue, } := falseif .MaxConnDuration > 0 && time.Since(.createdTime) > .MaxConnDuration && !.ConnectionClose() { .SetConnectionClose() = true } := .acquireWriter() = .Write()if { .Header.ResetConnectionClose() }if == nil { = .Flush() } .releaseWriter()// Return ErrTimeout on any timeout.if , := .(interface{ () bool }); && .() { = ErrTimeout } := isConnectionReset()if != nil && ! { .closeConn()returntrue, } := if .ReadTimeout > 0 { := time.Now().Add(.ReadTimeout)if .IsZero() || .Before() { = } }if = .SetReadDeadline(); != nil { .closeConn()returntrue, }if || .Header.IsHead() { .SkipBody = true }if .DisableHeaderNamesNormalizing { .Header.DisableNormalizing() } := .acquireReader() = .ReadLimitBody(, .MaxResponseBodySize)if != nil { .releaseReader() .closeConn()// Don't retry in case of ErrBodyTooLarge since we will just get the same again. := != ErrBodyTooLargereturn , } := || .ConnectionClose() || .ConnectionClose() || if && .bodyStream != nil { := .bodyStream .bodyStream = newCloseReader(, func() error { .releaseReader()if , := .(*requestStream); {releaseRequestStream() }if || .ConnectionClose() { .closeConn() } else { .releaseConn() }returnnil })returnfalse, nil } else { .releaseReader() }if { .closeConn() } else { .releaseConn() }returnfalse, nil}
The pages are generated with Goldsv0.6.7. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.