package sftp

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/kr/fs"

	"golang.org/x/crypto/ssh"
)

var (
	// ErrInternalInconsistency indicates the packets sent and the data queued to be
	// written to the file don't match up. It is an unusual error and usually is
	// caused by bad behavior server side or connection issues. The error is
	// limited in scope to the call where it happened, the client object is still
	// OK to use as long as the connection is still open.
	ErrInternalInconsistency = errors.New("internal inconsistency")

	// InternalInconsistency alias for ErrInternalInconsistency.
	//
	// Deprecated: please use ErrInternalInconsistency
	InternalInconsistency = ErrInternalInconsistency
)

// A ClientOption is a function which applies configuration to a Client.
type ClientOption func(*Client) error

// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, ie. <= 32768 bytes.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketChecked(size int) ClientOption {
	return func(c *Client) error {
		if size < 1 {
			return errors.New("size must be greater or equal to 1")
		}
		if size > 32768 {
			return errors.New("sizes larger than 32KB might not work with all servers")
		}
		c.maxPacket = size
		return nil
	}
}

// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
// It accepts sizes larger than the 32768 bytes all servers should support.
// Only use a setting higher than 32768 if your application always connects to
// the same server or after sufficiently broad testing.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketUnchecked(size int) ClientOption {
	return func(c *Client) error {
		if size < 1 {
			return errors.New("size must be greater or equal to 1")
		}
		c.maxPacket = size
		return nil
	}
}
// MaxPacket sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, ie. <= 32768 bytes.
// This is a synonym for MaxPacketChecked that provides backward compatibility.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacket(size int) ClientOption {
	return MaxPacketChecked(size)
}

// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
//
// The default maximum concurrent requests is 64.
func MaxConcurrentRequestsPerFile(n int) ClientOption {
	return func(c *Client) error {
		if n < 1 {
			return errors.New("n must be greater or equal to 1")
		}
		c.maxConcurrentRequests = n
		return nil
	}
}

// UseConcurrentWrites allows the Client to perform concurrent Writes.
//
// Using concurrency while doing writes requires special consideration.
// A write to a later offset in a file after an error
// could end up with a file length longer than what was successfully written.
//
// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`,
// you may need to `Truncate` the target Writer to avoid “holes” in the data written.
func UseConcurrentWrites(value bool) ClientOption {
	return func(c *Client) error {
		c.useConcurrentWrites = value
		return nil
	}
}

// UseConcurrentReads allows the Client to perform concurrent Reads.
//
// Concurrent reads are generally safe to use, and not using them will degrade
// performance, so this option is enabled by default.
//
// When enabled, WriteTo will use Stat/Fstat to get the file size and determine
// how many concurrent workers to use.
// Some "read once" servers will delete the file if they receive a stat call on an
// open file, and then the download will fail.
// Disabling concurrent reads allows you to download files from these servers.
// If concurrent reads are disabled, the UseFstat option is ignored.
func UseConcurrentReads(value bool) ClientOption {
	return func(c *Client) error {
		c.disableConcurrentReads = !value
		return nil
	}
}

// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
// (usually when copying files).
// Some servers limit the amount of open files and calling Stat after opening
// the file will throw an error from the server. Setting this flag will call
// Fstat instead of Stat, which is supposed to be called on an open file handle.
//
// It has been found that with IBM Sterling SFTP servers that have the
// "extractability" level set to 1, only 1 file can be opened at any given time.
//
// If the server you are working with still has an issue with both Stat and
// Fstat calls, you can always open a file and read it until the end.
//
// Another reason to read the file until its end, when Fstat doesn't work, is
// that on some servers reading a full file will automatically delete the
// file, as some of these mainframes map the file to a message in a queue.
// Once the file has been read it will get deleted.
func UseFstat(value bool) ClientOption {
	return func(c *Client) error {
		c.useFstat = value
		return nil
	}
}
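// exampleConfigureClient is a hedged usage sketch, not part of the original
// file: it shows how the ClientOptions above compose at construction time.
// The address, the *ssh.ClientConfig value, and the chosen option values are
// illustrative assumptions, not recommendations.
func exampleConfigureClient(sshConfig *ssh.ClientConfig) (*Client, error) {
	conn, err := ssh.Dial("tcp", "example.com:22", sshConfig)
	if err != nil {
		return nil, err
	}
	return NewClient(conn,
		MaxPacket(32768),                  // the largest universally supported payload
		MaxConcurrentRequestsPerFile(128), // raise the default of 64
		UseConcurrentWrites(true),         // opt in; see the caveats above
	)
}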
// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
// Multiple Clients can be active on a single SSH connection, and a Client
// may be called concurrently from multiple Goroutines.
//
// Client implements the github.com/kr/fs.FileSystem interface.
type Client struct {
	clientConn

	ext map[string]string // Extensions (name -> data).

	maxPacket             int // max packet size read or written.
	maxConcurrentRequests int
	nextid                uint32

	// write concurrency is… error prone.
	// Default behavior should be to not use it.
	useConcurrentWrites    bool
	useFstat               bool
	disableConcurrentReads bool
}

// NewClient creates a new SFTP client on conn, using zero or more option
// functions.
func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
	s, err := conn.NewSession()
	if err != nil {
		return nil, err
	}
	if err := s.RequestSubsystem("sftp"); err != nil {
		return nil, err
	}
	pw, err := s.StdinPipe()
	if err != nil {
		return nil, err
	}
	pr, err := s.StdoutPipe()
	if err != nil {
		return nil, err
	}

	return NewClientPipe(pr, pw, opts...)
}

// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
// This can be used for connecting to an SFTP server over TCP/TLS or by using
// the system's ssh client program (e.g. via exec.Command).
func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) {
	sftp := &Client{
		clientConn: clientConn{
			conn: conn{
				Reader:      rd,
				WriteCloser: wr,
			},
			inflight: make(map[uint32]chan<- result),
			closed:   make(chan struct{}),
		},

		ext: make(map[string]string),

		maxPacket:             1 << 15,
		maxConcurrentRequests: 64,
	}

	for _, opt := range opts {
		if err := opt(sftp); err != nil {
			wr.Close()
			return nil, err
		}
	}

	if err := sftp.sendInit(); err != nil {
		wr.Close()
		return nil, fmt.Errorf("error sending init packet to server: %w", err)
	}

	if err := sftp.recvVersion(); err != nil {
		wr.Close()
		return nil, fmt.Errorf("error receiving version packet from server: %w", err)
	}

	sftp.clientConn.wg.Add(1)
	go func() {
		defer sftp.clientConn.wg.Done()

		if err := sftp.clientConn.recv(); err != nil {
			sftp.clientConn.broadcastErr(err)
		}
	}()

	return sftp, nil
}

// Create creates the named file mode 0666 (before umask), truncating it if it
// already exists. If successful, methods on the returned File can be used for
// I/O; the associated file descriptor has mode O_RDWR. If you need more
// control over the flags/mode used to open the file see client.OpenFile.
//
// Note that some SFTP servers (eg. AWS Transfer) do not support opening files
// read/write at the same time. For those services you will need to use
// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`.
func (c *Client) Create(path string) (*File, error) {
	return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
}

const sftpProtocolVersion = 3 // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt

func (c *Client) sendInit() error {
	return c.clientConn.conn.sendPacket(&sshFxInitPacket{
		Version: sftpProtocolVersion, // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt
	})
}

// returns the next value of c.nextid
func (c *Client) nextID() uint32 {
	return atomic.AddUint32(&c.nextid, 1)
}

func (c *Client) recvVersion() error {
	typ, data, err := c.recvPacket(0)
	if err != nil {
		if err == io.EOF {
			return fmt.Errorf("server unexpectedly closed connection: %w", io.ErrUnexpectedEOF)
		}

		return err
	}

	if typ != sshFxpVersion {
		return &unexpectedPacketErr{sshFxpVersion, typ}
	}

	version, data, err := unmarshalUint32Safe(data)
	if err != nil {
		return err
	}

	if version != sftpProtocolVersion {
		return &unexpectedVersionErr{sftpProtocolVersion, version}
	}

	for len(data) > 0 {
		var ext extensionPair
		ext, data, err = unmarshalExtensionPair(data)
		if err != nil {
			return err
		}
		c.ext[ext.Name] = ext.Data
	}

	return nil
}

// HasExtension checks whether the server supports a named extension.
//
// The first return value is the extension data reported by the server
// (typically a version number).
func (c *Client) HasExtension(name string) (string, bool) {
	data, ok := c.ext[name]
	return data, ok
}

// Walk returns a new Walker rooted at root.
func (c *Client) Walk(root string) *fs.Walker {
	return fs.WalkFS(root, c)
}
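// exampleWalk is a hedged usage sketch, not part of the original file: it
// collects all paths under root using the kr/fs Walker returned by Walk
// above. The Step/Err/Path iteration is the kr/fs API; root is an assumption.
func exampleWalk(c *Client, root string) ([]string, error) {
	var paths []string
	w := c.Walk(root)
	for w.Step() {
		if err := w.Err(); err != nil {
			return paths, err
		}
		paths = append(paths, w.Path())
	}
	return paths, nil
}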
// ReadDir reads the directory named by dirname and returns a list of
// directory entries.
func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
	handle, err := c.opendir(p)
	if err != nil {
		return nil, err
	}
	defer c.close(handle) // this has to defer earlier than the lock below

	var attrs []os.FileInfo
	var done = false
	for !done {
		id := c.nextID()
		typ, data, err1 := c.sendPacket(nil, &sshFxpReaddirPacket{
			ID:     id,
			Handle: handle,
		})
		if err1 != nil {
			err = err1
			done = true
			break
		}
		switch typ {
		case sshFxpName:
			sid, data := unmarshalUint32(data)
			if sid != id {
				return nil, &unexpectedIDErr{id, sid}
			}
			count, data := unmarshalUint32(data)
			for i := uint32(0); i < count; i++ {
				var filename string
				filename, data = unmarshalString(data)
				_, data = unmarshalString(data) // discard longname
				var attr *FileStat
				attr, data = unmarshalAttrs(data)
				if filename == "." || filename == ".." {
					continue
				}
				attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename)))
			}
		case sshFxpStatus:
			// TODO(dfc) scope warning!
			err = normaliseError(unmarshalStatus(id, data))
			done = true
		default:
			return nil, unimplementedPacketErr(typ)
		}
	}
	if err == io.EOF {
		err = nil
	}
	return attrs, err
}

func (c *Client) opendir(path string) (string, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpOpendirPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return "", err
	}
	switch typ {
	case sshFxpHandle:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return "", &unexpectedIDErr{id, sid}
		}
		handle, _ := unmarshalString(data)
		return handle, nil
	case sshFxpStatus:
		return "", normaliseError(unmarshalStatus(id, data))
	default:
		return "", unimplementedPacketErr(typ)
	}
}

// Stat returns a FileInfo structure describing the file specified by path 'p'.
// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
func (c *Client) Stat(p string) (os.FileInfo, error) {
	fs, err := c.stat(p)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(fs, path.Base(p)), nil
}

// Lstat returns a FileInfo structure describing the file specified by path 'p'.
// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
func (c *Client) Lstat(p string) (os.FileInfo, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpLstatPacket{
		ID:   id,
		Path: p,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpAttrs:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		attr, _ := unmarshalAttrs(data)
		return fileInfoFromStat(attr, path.Base(p)), nil
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

// ReadLink reads the target of a symbolic link.
func (c *Client) ReadLink(p string) (string, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpReadlinkPacket{
		ID:   id,
		Path: p,
	})
	if err != nil {
		return "", err
	}
	switch typ {
	case sshFxpName:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return "", &unexpectedIDErr{id, sid}
		}
		count, data := unmarshalUint32(data)
		if count != 1 {
			return "", unexpectedCount(1, count)
		}
		filename, _ := unmarshalString(data) // ignore dummy attributes
		return filename, nil
	case sshFxpStatus:
		return "", normaliseError(unmarshalStatus(id, data))
	default:
		return "", unimplementedPacketErr(typ)
	}
}

// Link creates a hard link at 'newname', pointing at the same inode as 'oldname'
func (c *Client) Link(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpHardlinkPacket{
		ID:      id,
		Oldpath: oldname,
		Newpath: newname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// Symlink creates a symbolic link at 'newname', pointing at target 'oldname'
func (c *Client) Symlink(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpSymlinkPacket{
		ID:         id,
		Linkpath:   newname,
		Targetpath: oldname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

func (c *Client) setfstat(handle string, flags uint32, attrs interface{}) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpFsetstatPacket{
		ID:     id,
		Handle: handle,
		Flags:  flags,
		Attrs:  attrs,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}
// setstat is a convenience wrapper to allow for changing of various parts of the file descriptor.
func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpSetstatPacket{
		ID:    id,
		Path:  path,
		Flags: flags,
		Attrs: attrs,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// Chtimes changes the access and modification times of the named file.
func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
	type times struct {
		Atime uint32
		Mtime uint32
	}
	attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
	return c.setstat(path, sshFileXferAttrACmodTime, attrs)
}

// Chown changes the user and group owners of the named file.
func (c *Client) Chown(path string, uid, gid int) error {
	type owner struct {
		UID uint32
		GID uint32
	}
	attrs := owner{uint32(uid), uint32(gid)}
	return c.setstat(path, sshFileXferAttrUIDGID, attrs)
}

// Chmod changes the permissions of the named file.
//
// Chmod does not apply a umask, because even retrieving the umask is not
// possible in a portable way without causing a race condition. Callers
// should mask off umask bits, if desired.
func (c *Client) Chmod(path string, mode os.FileMode) error {
	return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode))
}

// Truncate sets the size of the named file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should do when setting
// size greater than the current size.
func (c *Client) Truncate(path string, size int64) error {
	return c.setstat(path, sshFileXferAttrSize, uint64(size))
}

// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor
// has mode O_RDONLY.
func (c *Client) Open(path string) (*File, error) {
	return c.open(path, flags(os.O_RDONLY))
}

// OpenFile is the generalized open call; most users will use Open or
// Create instead. It opens the named file with specified flag (O_RDONLY
// etc.). If successful, methods on the returned File can be used for I/O.
func (c *Client) OpenFile(path string, f int) (*File, error) {
	return c.open(path, flags(f))
}

func (c *Client) open(path string, pflags uint32) (*File, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpOpenPacket{
		ID:     id,
		Path:   path,
		Pflags: pflags,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpHandle:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		handle, _ := unmarshalString(data)
		return &File{c: c, path: path, handle: handle}, nil
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}
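// exampleCreateWriteOnly is a hedged usage sketch, not part of the original
// file: per the Create doc earlier in this file, some servers (e.g. AWS
// Transfer) reject O_RDWR, so a write-only OpenFile is the portable way to
// create and truncate a file there. remotePath is an assumption.
func exampleCreateWriteOnly(c *Client, remotePath string) (*File, error) {
	return c.OpenFile(remotePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
}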
// close closes a handle previously returned in the response
// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
// immediately after this request has been sent.
func (c *Client) close(handle string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpClosePacket{
		ID:     id,
		Handle: handle,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

func (c *Client) stat(path string) (*FileStat, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpStatPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpAttrs:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		attr, _ := unmarshalAttrs(data)
		return attr, nil
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

func (c *Client) fstat(handle string) (*FileStat, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpFstatPacket{
		ID:     id,
		Handle: handle,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpAttrs:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		attr, _ := unmarshalAttrs(data)
		return attr, nil
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

// StatVFS retrieves VFS statistics from a remote host.
//
// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
func (c *Client) StatVFS(path string) (*StatVFS, error) {
	// send the StatVFS packet to the server
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpStatvfsPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return nil, err
	}

	switch typ {
	// server responded with valid data
	case sshFxpExtendedReply:
		var response StatVFS
		err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
		if err != nil {
			return nil, errors.New("can not parse reply")
		}

		return &response, nil

	// the request failed
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))

	default:
		return nil, unimplementedPacketErr(typ)
	}
}

// Join joins any number of path elements into a single path, adding a
// separating slash if necessary. The result is Cleaned; in particular, all
// empty strings are ignored.
func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
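// exampleFreeSpace is a hedged usage sketch, not part of the original file:
// it uses the statvfs@openssh.com extension via StatVFS above to report free
// space. It assumes the package's StatVFS type exposes FreeSpace()
// (Frsize * Bavail); servers lacking the extension return an error instead.
func exampleFreeSpace(c *Client, remotePath string) (uint64, error) {
	vfs, err := c.StatVFS(remotePath)
	if err != nil {
		return 0, err
	}
	return vfs.FreeSpace(), nil
}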
// Remove removes the specified file or directory. An error will be returned if no
// file or directory with the specified path exists, or if the specified directory
// is not empty.
func (c *Client) Remove(path string) error {
	err := c.removeFile(path)
	// some servers, *cough* osx *cough*, return EPERM, not ENODIR.
	// serv-u returns ssh_FX_FILE_IS_A_DIRECTORY
	// EPERM is converted to os.ErrPermission so it is not a StatusError
	if statusErr, ok := err.(*StatusError); ok {
		switch statusErr.Code {
		case sshFxFailure, sshFxFileIsADirectory:
			return c.RemoveDirectory(path)
		}
	}
	if os.IsPermission(err) {
		return c.RemoveDirectory(path)
	}
	return err
}

func (c *Client) removeFile(path string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpRemovePacket{
		ID:       id,
		Filename: path,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// RemoveDirectory removes a directory path.
func (c *Client) RemoveDirectory(path string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpRmdirPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// Rename renames a file.
func (c *Client) Rename(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpRenamePacket{
		ID:      id,
		Oldpath: oldname,
		Newpath: newname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// PosixRename renames a file using the posix-rename@openssh.com extension
// which will replace newname if it already exists.
func (c *Client) PosixRename(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpPosixRenamePacket{
		ID:      id,
		Oldpath: oldname,
		Newpath: newname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// RealPath can be used to have the server canonicalize any given path name to an absolute path.
//
// This is useful for converting path names containing ".." components,
// or relative pathnames without a leading slash into absolute paths.
func (c *Client) RealPath(path string) (string, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpRealpathPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return "", err
	}
	switch typ {
	case sshFxpName:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return "", &unexpectedIDErr{id, sid}
		}
		count, data := unmarshalUint32(data)
		if count != 1 {
			return "", unexpectedCount(1, count)
		}
		filename, _ := unmarshalString(data) // ignore attributes
		return filename, nil
	case sshFxpStatus:
		return "", normaliseError(unmarshalStatus(id, data))
	default:
		return "", unimplementedPacketErr(typ)
	}
}

// Getwd returns the current working directory of the server. Operations
// involving relative paths will be based at this location.
func (c *Client) Getwd() (string, error) {
	return c.RealPath(".")
}

// Mkdir creates the specified directory. An error will be returned if a file or
// directory with the specified path already exists, or if the directory's
// parent folder does not exist (the method cannot create complete paths).
func (c *Client) Mkdir(path string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(nil, &sshFxpMkdirPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}
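// exampleResolve is a hedged usage sketch, not part of the original file: it
// canonicalizes a relative path against the server's working directory using
// Getwd and RealPath above. The "../uploads" component is an illustrative
// assumption.
func exampleResolve(c *Client) (string, error) {
	cwd, err := c.Getwd()
	if err != nil {
		return "", err
	}
	return c.RealPath(path.Join(cwd, "../uploads"))
}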
// MkdirAll creates a directory named path, along with any necessary parents,
// and returns nil, or else returns an error.
// If path is already a directory, MkdirAll does nothing and returns nil.
// If path contains a regular file, an error is returned.
func (c *Client) MkdirAll(path string) error {
	// Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13
	// Fast path: if we can tell whether path is a directory or file, stop with success or error.
	dir, err := c.Stat(path)
	if err == nil {
		if dir.IsDir() {
			return nil
		}
		return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
	}

	// Slow path: make sure parent exists and then call Mkdir for path.
	i := len(path)
	for i > 0 && path[i-1] == '/' { // Skip trailing path separator.
		i--
	}

	j := i
	for j > 0 && path[j-1] != '/' { // Scan backward over element.
		j--
	}

	if j > 1 {
		// Create parent
		err = c.MkdirAll(path[0 : j-1])
		if err != nil {
			return err
		}
	}

	// Parent now exists; invoke Mkdir and use its result.
	err = c.Mkdir(path)
	if err != nil {
		// Handle arguments like "foo/." by
		// double-checking that directory doesn't exist.
		dir, err1 := c.Lstat(path)
		if err1 == nil && dir.IsDir() {
			return nil
		}
		return err
	}
	return nil
}

// RemoveAll deletes files recursively in the directory and recursively
// deletes its subdirectories.
// An error will be returned if no file or directory with the specified path exists.
func (c *Client) RemoveAll(path string) error {
	// Get the file/directory information
	fi, err := c.Stat(path)
	if err != nil {
		return err
	}

	if fi.IsDir() {
		// Delete files recursively in the directory
		files, err := c.ReadDir(path)
		if err != nil {
			return err
		}

		for _, file := range files {
			if file.IsDir() {
				// Recursively delete subdirectories
				err = c.RemoveAll(path + "/" + file.Name())
				if err != nil {
					return err
				}
			} else {
				// Delete individual files
				err = c.Remove(path + "/" + file.Name())
				if err != nil {
					return err
				}
			}
		}
	}

	return c.Remove(path)
}

// File represents a remote file.
type File struct {
	c      *Client
	path   string
	handle string

	mu     sync.Mutex
	offset int64 // current offset within remote file
}

// Close closes the File, rendering it unusable for I/O. It returns an
// error, if any.
func (f *File) Close() error {
	return f.c.close(f.handle)
}

// Name returns the name of the file as presented to Open or Create.
func (f *File) Name() string {
	return f.path
}

// Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
// it returns the number of bytes read.
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use WriteTo rather
// than calling Read multiple times. io.Copy will do this
// automatically.
func (f *File) Read(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	n, err := f.ReadAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}

// readChunkAt attempts to read the entire length of the buffer from the file starting at the offset.
// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
	for err == nil && n < len(b) {
		id := f.c.nextID()
		typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{
			ID:     id,
			Handle: f.handle,
			Offset: uint64(off) + uint64(n),
			Len:    uint32(len(b) - n),
		})
		if err != nil {
			return n, err
		}

		switch typ {
		case sshFxpStatus:
			return n, normaliseError(unmarshalStatus(id, data))

		case sshFxpData:
			sid, data := unmarshalUint32(data)
			if id != sid {
				return n, &unexpectedIDErr{id, sid}
			}

			l, data := unmarshalUint32(data)
			n += copy(b[n:], data[:l])

		default:
			return n, unimplementedPacketErr(typ)
		}
	}

	return
}

func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
	for read < len(b) {
		rb := b[read:]
		if len(rb) > f.c.maxPacket {
			rb = rb[:f.c.maxPacket]
		}
		n, err := f.readChunkAt(nil, rb, off+int64(read))
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}
		if n > 0 {
			read += n
		}
		if err != nil {
			return read, err
		}
	}
	return read, nil
}
// ReadAt reads up to len(b) bytes from the File at a given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with 1/2 requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows reads with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.

	cancel := make(chan struct{})

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	type work struct {
		id  uint32
		res chan result

		b   []byte
		off int64
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(chunkSize),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	type rErr struct {
		off int64
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(packet.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					errCh <- rErr{packet.off + int64(n), err}
					return
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off > our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}

// writeToSequential implements WriteTo, but works sequentially with no parallelism.
func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
	b := make([]byte, f.c.maxPacket)
	ch := make(chan result, 1) // reusable channel

	for {
		n, err := f.readChunkAt(ch, b, f.offset)
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}

		if n > 0 {
			f.offset += int64(n)

			m, err := w.Write(b[:n])
			written += int64(m)
			if err != nil {
				return written, err
			}
		}

		if err != nil {
			if err == io.EOF {
				return written, nil // return nil explicitly.
			}

			return written, err
		}
	}
}
// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don't orphan any goroutines or leave hanging references.
		wg.Wait()
	}()

	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(readWork.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				select {
				case readWork.cur <- writeWork:
				case <-cancel:
					return
				}

				if err != nil {
					return
				}
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		pool.Put(packet.b)
		cur = packet.next
	}
}

// Stat returns the FileInfo structure describing file. If there is an
// error, it will be returned with a nil FileInfo.
func (f *File) Stat() (os.FileInfo, error) {
	fs, err := f.c.fstat(f.handle)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(fs, path.Base(f.path)), nil
}

// Write writes len(b) bytes to the File. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use ReadFrom rather
// than calling Write multiple times. io.Copy will do this
// automatically.
func (f *File) Write(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	n, err := f.WriteAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}

func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
	typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{
		ID:     f.c.nextID(),
		Handle: f.handle,
		Offset: uint64(off),
		Length: uint32(len(b)),
		Data:   b,
	})
	if err != nil {
		return 0, err
	}

	switch typ {
	case sshFxpStatus:
		id, _ := unmarshalUint32(data)
		err := normaliseError(unmarshalStatus(id, data))
		if err != nil {
			return 0, err
		}

	default:
		return 0, unimplementedPacketErr(typ)
	}

	return len(b), nil
}
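// exampleDownload is a hedged usage sketch, not part of the original file: it
// copies a remote file to a local one. io.Copy discovers the WriterTo
// implementation above and so uses the concurrent read path when enabled.
// Both paths are illustrative assumptions.
func exampleDownload(c *Client, remotePath, localPath string) (int64, error) {
	src, err := c.Open(remotePath)
	if err != nil {
		return 0, err
	}
	defer src.Close()

	dst, err := os.Create(localPath)
	if err != nil {
		return 0, err
	}
	defer dst.Close()

	return io.Copy(dst, src) // delegates to src.WriteTo(dst)
}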
// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially.
func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
	// Split the write into multiple maxPacket sized concurrent writes
	// bounded by maxConcurrentRequests. This allows writes with a suitably
	// large buffer to transfer data at a much faster rate due to
	// overlapping round trip times.

	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Write into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		var read int
		chunkSize := f.c.maxPacket

		for read < len(b) {
			wb := b[read:]
			if len(wb) > chunkSize {
				wb = wb[:chunkSize]
			}

			id := f.c.nextID()
			res := pool.Get()
			off := off + int64(read)

			f.c.dispatchRequest(res, &sshFxpWritePacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Length: uint32(len(wb)),
				Data:   wb,
			})

			select {
			case workCh <- work{id, res, off}:
			case <-cancel:
				return
			}

			read += len(wb)
		}
	}()

	type wErr struct {
		off int64
		err error
	}
	errCh := make(chan wErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- wErr{work.off, err}
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := wErr{math.MaxInt64, nil}
	for wErr := range errCh {
		if wErr.off <= firstErr.off {
			firstErr = wErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off >= our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	return len(b), nil
}

// WriteAt writes up to len(b) bytes to the File at a given offset `off`. It returns
// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics,
// so the file offset is not altered during the write.
func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
	if len(b) <= f.c.maxPacket {
		// We can do this in one write.
		return f.writeChunkAt(nil, b, off)
	}

	if f.c.useConcurrentWrites {
		return f.writeAtConcurrent(b, off)
	}

	ch := make(chan result, 1) // reusable channel

	chunkSize := f.c.maxPacket

	for written < len(b) {
		wb := b[written:]
		if len(wb) > chunkSize {
			wb = wb[:chunkSize]
		}

		n, err := f.writeChunkAt(ch, wb, off+int64(written))
		if n > 0 {
			written += n
		}

		if err != nil {
			return written, err
		}
	}

	return len(b), nil
}
// ReadFromWithConcurrency implements ReaderFrom,
// but uses the given concurrency to issue multiple requests at the same time.
//
// Giving a concurrency of less than one will default to the Client’s max concurrency.
//
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	// Split the write into multiple maxPacket sized concurrent writes.
	// This allows writes with a suitably large reader
	// to transfer data at a much faster rate due to overlapping round trip times.

	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	type rwErr struct {
		off int64
		err error
	}
	errCh := make(chan rwErr)

	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := make([]byte, f.c.maxPacket)
		off := f.offset

		for {
			n, err := r.Read(b)

			if n > 0 {
				read += int64(n)

				id := f.c.nextID()
				res := pool.Get()

				f.c.dispatchRequest(res, &sshFxpWritePacket{
					ID:     id,
					Handle: f.handle,
					Offset: uint64(off),
					Length: uint32(n),
					Data:   b[:n],
				})

				select {
				case workCh <- work{id, res, off}:
				case <-cancel:
					return
				}

				off += int64(n)
			}

			if err != nil {
				if err != io.EOF {
					errCh <- rwErr{off, err}
				}
				return
			}
		}
	}()

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- rwErr{work.off, err}
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: Collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rwErr{math.MaxInt64, nil}
	for rwErr := range errCh {
		if rwErr.off <= firstErr.off {
			firstErr = rwErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed.
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off is a valid offset.
		//
		// firstErr.off will then be the lesser of:
		// * the offset of the first error from writing,
		// * the last successfully read offset.
		//
		// This could be less than the last successfully written offset,
		// which is the whole reason for the UseConcurrentWrites() ClientOption.
		//
		// Callers are responsible for truncating any SFTP files to a safe length.
		f.offset = firstErr.off

		// ReadFrom is defined to return the read bytes, regardless of any writer errors.
		return read, firstErr.err
	}

	f.offset += read
	return read, nil
}

// ReadFrom reads data from r until EOF and writes it to the file. The return
// value is the number of bytes read. Any error except io.EOF encountered
// during the read is also returned.
//
// This method is preferred over calling Write multiple times
// to maximise throughput for transferring the entire file,
// especially over high-latency links.
func (f *File) ReadFrom(r io.Reader) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.useConcurrentWrites {
		var remain int64
		switch r := r.(type) {
		case interface{ Len() int }:
			remain = int64(r.Len())

		case interface{ Size() int64 }:
			remain = r.Size()

		case *io.LimitedReader:
			remain = r.N

		case interface{ Stat() (os.FileInfo, error) }:
			info, err := r.Stat()
			if err == nil {
				remain = info.Size()
			}
		}

		if remain < 0 {
			// We can strongly assert that we want default max concurrency here.
			return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests)
		}

		if remain > int64(f.c.maxPacket) {
			// Otherwise, only use concurrency if it would be at least two packets.

			// This is the best reasonable guess we can make.
			concurrency64 := remain/int64(f.c.maxPacket) + 1

			// We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
			// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
			if concurrency64 > int64(f.c.maxConcurrentRequests) {
				concurrency64 = int64(f.c.maxConcurrentRequests)
			}

			return f.ReadFromWithConcurrency(r, int(concurrency64))
		}
	}

	ch := make(chan result, 1) // reusable channel

	b := make([]byte, f.c.maxPacket)

	var read int64
	for {
		n, err := r.Read(b)
		if n < 0 {
			panic("sftp.File: reader returned negative count from Read")
		}

		if n > 0 {
			read += int64(n)

			m, err2 := f.writeChunkAt(ch, b[:n], f.offset)
			f.offset += int64(m)

			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if err == io.EOF {
				return read, nil // return nil explicitly.
			}

			return read, err
		}
	}
}
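// exampleUpload is a hedged usage sketch, not part of the original file: it
// copies a local file up to the server. Per the UseConcurrentWrites caveat
// earlier in this file, on an error from io.Copy the remote file is truncated
// to the reported length to avoid holes. Paths are illustrative assumptions.
func exampleUpload(c *Client, localPath, remotePath string) (int64, error) {
	src, err := os.Open(localPath)
	if err != nil {
		return 0, err
	}
	defer src.Close()

	dst, err := c.OpenFile(remotePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
	if err != nil {
		return 0, err
	}
	defer dst.Close()

	n, err := io.Copy(dst, src) // delegates to dst.ReadFrom(src)
	if err != nil {
		// With concurrent writes enabled, later chunks may have landed past
		// the error; truncate to the reported length so the file has no holes.
		_ = dst.Truncate(n)
	}
	return n, err
}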
// Seek implements io.Seeker by setting the client offset for the next Read or
// Write. It returns the new offset following the seek. Seeking before or after
// the end of the file is undefined. Seeking relative to the end calls Stat.
func (f *File) Seek(offset int64, whence int) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		offset += f.offset
	case io.SeekEnd:
		fi, err := f.Stat()
		if err != nil {
			return f.offset, err
		}
		offset += fi.Size()
	default:
		return f.offset, unimplementedSeekWhence(whence)
	}

	if offset < 0 {
		return f.offset, os.ErrInvalid
	}

	f.offset = offset
	return f.offset, nil
}

// Chown changes the uid/gid of the current file.
func (f *File) Chown(uid, gid int) error {
	return f.c.Chown(f.path, uid, gid)
}

// Chmod changes the permissions of the current file.
//
// See Client.Chmod for details.
func (f *File) Chmod(mode os.FileMode) error {
	return f.c.setfstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode))
}

// Sync requests a flush of the contents of a File to stable storage.
//
// Sync requires the server to support the fsync@openssh.com extension.
func (f *File) Sync() error {
	id := f.c.nextID()
	typ, data, err := f.c.sendPacket(nil, &sshFxpFsyncPacket{
		ID:     id,
		Handle: f.handle,
	})

	switch {
	case err != nil:
		return err
	case typ == sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return &unexpectedPacketErr{want: sshFxpStatus, got: typ}
	}
}

// Truncate sets the size of the current file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should do when setting
// size greater than the current size.
// We send a SSH_FXP_FSETSTAT here since we have a file handle.
func (f *File) Truncate(size int64) error {
	return f.c.setfstat(f.handle, sshFileXferAttrSize, uint64(size))
}

// normaliseError normalises an error into a more standard form that can be
// checked against stdlib errors like io.EOF or os.ErrNotExist.
func normaliseError(err error) error {
	switch err := err.(type) {
	case *StatusError:
		switch err.Code {
		case sshFxEOF:
			return io.EOF
		case sshFxNoSuchFile:
			return os.ErrNotExist
		case sshFxPermissionDenied:
			return os.ErrPermission
		case sshFxOk:
			return nil
		default:
			return err
		}
	default:
		return err
	}
}

// flags converts the flags passed to OpenFile into ssh flags.
// Unsupported flags are ignored.
func flags(f int) uint32 {
	var out uint32
	switch f & os.O_WRONLY {
	case os.O_WRONLY:
		out |= sshFxfWrite
	case os.O_RDONLY:
		out |= sshFxfRead
	}
	if f&os.O_RDWR == os.O_RDWR {
		out |= sshFxfRead | sshFxfWrite
	}
	if f&os.O_APPEND == os.O_APPEND {
		out |= sshFxfAppend
	}
	if f&os.O_CREATE == os.O_CREATE {
		out |= sshFxfCreat
	}
	if f&os.O_TRUNC == os.O_TRUNC {
		out |= sshFxfTrunc
	}
	if f&os.O_EXCL == os.O_EXCL {
		out |= sshFxfExcl
	}
	return out
}

// toChmodPerm converts Go permission bits to POSIX permission bits.
//
// This differs from fromFileMode in that we preserve the POSIX versions of
// setuid, setgid and sticky in m, because we've historically supported those
// bits, and we mask off any non-permission bits.
func toChmodPerm(m os.FileMode) (perm uint32) {
	const mask = os.ModePerm | s_ISUID | s_ISGID | s_ISVTX
	perm = uint32(m & mask)

	if m&os.ModeSetuid != 0 {
		perm |= s_ISUID
	}
	if m&os.ModeSetgid != 0 {
		perm |= s_ISGID
	}
	if m&os.ModeSticky != 0 {
		perm |= s_ISVTX
	}

	return perm
}
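// exampleResume is a hedged usage sketch, not part of the original file: it
// appends to a partially transferred remote file by seeking both ends to the
// remote length before copying. Note that io.SeekEnd triggers a Stat (see
// Seek above); paths are illustrative assumptions.
func exampleResume(c *Client, localPath, remotePath string) (int64, error) {
	dst, err := c.OpenFile(remotePath, os.O_WRONLY|os.O_CREATE)
	if err != nil {
		return 0, err
	}
	defer dst.Close()

	// Position the remote offset at the end of what was already transferred.
	off, err := dst.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, err
	}

	src, err := os.Open(localPath)
	if err != nil {
		return 0, err
	}
	defer src.Close()

	// Skip the portion of the local file that already made it to the server.
	if _, err := src.Seek(off, io.SeekStart); err != nil {
		return 0, err
	}

	return io.Copy(dst, src) // delegates to dst.ReadFrom(src)
}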