package fasthttputil

import (
	"errors"
	"io"
	"net"
	"sync"
	"time"
)

// NewPipeConns returns new bi-directional connection pipe.
//
// PipeConns is NOT safe for concurrent use by multiple goroutines!
func () *PipeConns {
	 := make(chan *byteBuffer, 4)
	 := make(chan *byteBuffer, 4)

	 := &PipeConns{
		stopCh: make(chan struct{}),
	}
	.c1.rCh = 
	.c1.wCh = 
	.c2.rCh = 
	.c2.wCh = 
	.c1.pc = 
	.c2.pc = 
	return 
}

// PipeConns is an in-memory, bi-directional connection pipe: it owns two
// connected ends and moves data between them without leaving the process.
//
// A PipeConns must be obtained from NewPipeConns.
//
// Compared to the connections returned from net.Pipe(), PipeConns:
//
//   - is faster;
//   - buffers Write calls, so a Write does not need a concurrent goroutine
//     calling Read in order to make progress;
//   - supports read and write deadlines.
//
// PipeConns is NOT safe for concurrent use by multiple goroutines!
type PipeConns struct {
	// c1 and c2 are the two ends of the pipe.
	c1 pipeConn
	c2 pipeConn

	// stopCh is closed once the pipe is closed; stopChLock serializes
	// the check-and-close sequence.
	stopChLock sync.Mutex
	stopCh     chan struct{}
}

// SetAddresses sets the local and remote addresses for the connection.
func ( *PipeConns) (, , ,  net.Addr) {
	.c1.addrLock.Lock()
	defer .c1.addrLock.Unlock()

	.c2.addrLock.Lock()
	defer .c2.addrLock.Unlock()

	.c1.localAddr = 
	.c1.remoteAddr = 

	.c2.localAddr = 
	.c2.remoteAddr = 
}

// Conn1 returns the first end of bi-directional pipe.
//
// Data written to Conn1 may be read from Conn2.
// Data written to Conn2 may be read from Conn1.
func ( *PipeConns) () net.Conn {
	return &.c1
}

// Conn2 returns the second end of bi-directional pipe.
//
// Data written to Conn2 may be read from Conn1.
// Data written to Conn1 may be read from Conn2.
func ( *PipeConns) () net.Conn {
	return &.c2
}

// Close closes pipe connections.
func ( *PipeConns) () error {
	.stopChLock.Lock()
	select {
	case <-.stopCh:
	default:
		close(.stopCh)
	}
	.stopChLock.Unlock()

	return nil
}

type pipeConn struct {
	b  *byteBuffer
	bb []byte

	rCh chan *byteBuffer
	wCh chan *byteBuffer
	pc  *PipeConns

	readDeadlineTimer  *time.Timer
	writeDeadlineTimer *time.Timer

	readDeadlineCh  <-chan time.Time
	writeDeadlineCh <-chan time.Time

	readDeadlineChLock sync.Mutex

	localAddr  net.Addr
	remoteAddr net.Addr
	addrLock   sync.RWMutex
}

func ( *pipeConn) ( []byte) (int, error) {
	 := acquireByteBuffer()
	.b = append(.b[:0], ...)

	select {
	case <-.pc.stopCh:
		releaseByteBuffer()
		return 0, errConnectionClosed
	default:
	}

	select {
	case .wCh <- :
	default:
		select {
		case .wCh <- :
		case <-.writeDeadlineCh:
			.writeDeadlineCh = closedDeadlineCh
			return 0, ErrTimeout
		case <-.pc.stopCh:
			releaseByteBuffer()
			return 0, errConnectionClosed
		}
	}

	return len(), nil
}

func ( *pipeConn) ( []byte) (int, error) {
	 := true
	 := 0
	for len() > 0 {
		,  := .read(, )
		 += 
		if  != nil {
			if ! &&  == errWouldBlock {
				 = nil
			}
			return , 
		}
		 = [:]
		 = false
	}

	return , nil
}

func ( *pipeConn) ( []byte,  bool) (int, error) {
	if len(.bb) == 0 {
		if  := .readNextByteBuffer();  != nil {
			return 0, 
		}
	}
	 := copy(, .bb)
	.bb = .bb[:]

	return , nil
}

func ( *pipeConn) ( bool) error {
	releaseByteBuffer(.b)
	.b = nil

	select {
	case .b = <-.rCh:
	default:
		if ! {
			return errWouldBlock
		}
		.readDeadlineChLock.Lock()
		 := .readDeadlineCh
		.readDeadlineChLock.Unlock()
		select {
		case .b = <-.rCh:
		case <-:
			.readDeadlineChLock.Lock()
			.readDeadlineCh = closedDeadlineCh
			.readDeadlineChLock.Unlock()
			// rCh may contain data when deadline is reached.
			// Read the data before returning ErrTimeout.
			select {
			case .b = <-.rCh:
			default:
				return ErrTimeout
			}
		case <-.pc.stopCh:
			// rCh may contain data when stopCh is closed.
			// Read the data before returning EOF.
			select {
			case .b = <-.rCh:
			default:
				return io.EOF
			}
		}
	}

	.bb = .b.b
	return nil
}

// Sentinel errors used by the pipe connection implementation.
var (
	// errConnectionClosed reports a write attempted on a closed pipe.
	errConnectionClosed = errors.New("connection closed")

	// errWouldBlock reports that a non-blocking read found no queued data.
	errWouldBlock = errors.New("would block")
)

// timeoutError is the concrete type behind ErrTimeout.
type timeoutError struct{}

// Error implements the error interface; the message is always "timeout".
func (e *timeoutError) Error() string {
	return "timeout"
}

// Timeout always reports true.
//
// Only implement the Timeout() function of the net.Error interface.
// This allows for checks like:
//
//	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
func (e *timeoutError) Timeout() bool {
	return true
}

// ErrTimeout is returned from Read() or Write() on timeout.
var ErrTimeout = &timeoutError{}

func ( *pipeConn) () error {
	return .pc.Close()
}

func ( *pipeConn) () net.Addr {
	.addrLock.RLock()
	defer .addrLock.RUnlock()

	if .localAddr != nil {
		return .localAddr
	}

	return pipeAddr(0)
}

func ( *pipeConn) () net.Addr {
	.addrLock.RLock()
	defer .addrLock.RUnlock()

	if .remoteAddr != nil {
		return .remoteAddr
	}

	return pipeAddr(0)
}

func ( *pipeConn) ( time.Time) error {
	.SetReadDeadline()  //nolint:errcheck
	.SetWriteDeadline() //nolint:errcheck
	return nil
}

func ( *pipeConn) ( time.Time) error {
	if .readDeadlineTimer == nil {
		.readDeadlineTimer = time.NewTimer(time.Hour)
	}
	 := updateTimer(.readDeadlineTimer, )
	.readDeadlineChLock.Lock()
	.readDeadlineCh = 
	.readDeadlineChLock.Unlock()
	return nil
}

func ( *pipeConn) ( time.Time) error {
	if .writeDeadlineTimer == nil {
		.writeDeadlineTimer = time.NewTimer(time.Hour)
	}
	.writeDeadlineCh = updateTimer(.writeDeadlineTimer, )
	return nil
}

// updateTimer re-arms t for deadline and returns the channel to wait on:
//
//   - nil for a zero deadline, so a select on it never fires;
//   - closedDeadlineCh for a deadline that is not in the future, so a select
//     on it fires immediately;
//   - t.C otherwise, after resetting t to fire at deadline.
func updateTimer(t *time.Timer, deadline time.Time) <-chan time.Time {
	if !t.Stop() {
		// The timer may already have fired; drain a pending value so it
		// is not mistaken for the new deadline.
		select {
		case <-t.C:
		default:
		}
	}
	if deadline.IsZero() {
		return nil
	}
	d := time.Until(deadline)
	if d <= 0 {
		return closedDeadlineCh
	}
	t.Reset(d)
	return t.C
}

// closedDeadlineCh is an already-closed channel; receiving from it never
// blocks, which makes it stand in for a deadline that has already expired.
var closedDeadlineCh = func() <-chan time.Time {
	ch := make(chan time.Time)
	close(ch)
	return ch
}()

// pipeAddr is the placeholder address type returned when no explicit
// address has been set on a pipe end.
type pipeAddr int

// Network returns the network name, which is always "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the address in string form, which is always "pipe".
func (pipeAddr) String() string {
	return "pipe"
}

// byteBuffer is a pooled container for one chunk of data travelling through
// the pipe.
type byteBuffer struct {
	b []byte
}

// acquireByteBuffer takes a buffer from byteBufferPool. The contents and
// length of b are unspecified for recycled buffers; callers are expected to
// overwrite them.
func acquireByteBuffer() *byteBuffer {
	return byteBufferPool.Get().(*byteBuffer)
}

// releaseByteBuffer returns b to byteBufferPool. A nil b is ignored.
func releaseByteBuffer(b *byteBuffer) {
	if b != nil {
		byteBufferPool.Put(b)
	}
}

// byteBufferPool recycles byteBuffer values; fresh ones start with a
// 1024-byte slice.
var byteBufferPool = &sync.Pool{
	New: func() interface{} {
		return &byteBuffer{
			b: make([]byte, 1024),
		}
	},
}