package sftp

import (
	
	
	
	
)

// conn implements a bidirectional channel on which client and server
// connections are multiplexed.
type conn struct {
	io.Reader
	io.WriteCloser
	// this is the same allocator used in packet manager
	alloc      *allocator
	sync.Mutex // used to serialise writes to sendPacket
}

// the orderID is used in server mode if the allocator is enabled.
// For the client mode just pass 0.
// It returns io.EOF if the connection is closed and
// there are no more packets to read.
func ( *conn) ( uint32) (uint8, []byte, error) {
	return recvPacket(, .alloc, )
}

func ( *conn) ( encoding.BinaryMarshaler) error {
	.Lock()
	defer .Unlock()

	return sendPacket(, )
}

func ( *conn) () error {
	.Lock()
	defer .Unlock()
	return .WriteCloser.Close()
}

type clientConn struct {
	conn
	wg sync.WaitGroup

	sync.Mutex                          // protects inflight
	inflight   map[uint32]chan<- result // outstanding requests

	closed chan struct{}
	err    error
}

// Wait blocks until the conn has shut down, and return the error
// causing the shutdown. It can be called concurrently from multiple
// goroutines.
func ( *clientConn) () error {
	<-.closed
	return .err
}

// Close closes the SFTP session.
func ( *clientConn) () error {
	defer .wg.Wait()
	return .conn.Close()
}

// recv continuously reads from the server and forwards responses to the
// appropriate channel.
func ( *clientConn) () error {
	defer .conn.Close()

	for {
		, ,  := .recvPacket(0)
		if  != nil {
			return 
		}
		, ,  := unmarshalUint32Safe()
		if  != nil {
			return 
		}

		,  := .getChannel()
		if ! {
			// This is an unexpected occurrence. Send the error
			// back to all listeners so that they terminate
			// gracefully.
			return fmt.Errorf("sid not found: %d", )
		}

		 <- result{typ: , data: }
	}
}

func ( *clientConn) ( chan<- result,  uint32) bool {
	.Lock()
	defer .Unlock()

	select {
	case <-.closed:
		// already closed with broadcastErr, return error on chan.
		 <- result{err: ErrSSHFxConnectionLost}
		return false
	default:
	}

	.inflight[] = 
	return true
}

func ( *clientConn) ( uint32) (chan<- result, bool) {
	.Lock()
	defer .Unlock()

	,  := .inflight[]
	delete(.inflight, )

	return , 
}

// result captures the result of receiving the a packet from the server
type result struct {
	typ  byte
	data []byte
	err  error
}

type idmarshaler interface {
	id() uint32
	encoding.BinaryMarshaler
}

func ( *clientConn) ( chan result,  idmarshaler) (byte, []byte, error) {
	if cap() < 1 {
		 = make(chan result, 1)
	}

	.dispatchRequest(, )
	 := <-
	return .typ, .data, .err
}

// dispatchRequest should ideally only be called by race-detection tests outside of this file,
// where you have to ensure two packets are in flight sequentially after each other.
func ( *clientConn) ( chan<- result,  idmarshaler) {
	 := .id()

	if !.putChannel(, ) {
		// already closed.
		return
	}

	if  := .conn.sendPacket();  != nil {
		if ,  := .getChannel();  {
			 <- result{err: }
		}
	}
}

// broadcastErr sends an error to all goroutines waiting for a response.
func ( *clientConn) ( error) {
	.Lock()
	defer .Unlock()

	 := result{err: ErrSSHFxConnectionLost}
	for ,  := range .inflight {
		 <- 

		// Replace the chan in inflight,
		// we have hijacked this chan,
		// and this guarantees always-only-once sending.
		.inflight[] = make(chan<- result, 1)
	}

	.err = 
	close(.closed)
}

type serverConn struct {
	conn
}

func ( *serverConn) ( uint32,  error) error {
	return .sendPacket(statusFromError(, ))
}