Source code for ftpsync.ftp_target

"""
(c) 2012-2024 Martin Wendt; see https://github.com/mar10/pyftpsync
Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""

import calendar
import codecs
import ftplib
import json
import os
import time
from posixpath import join as join_url
from posixpath import normpath as normpath_url
from posixpath import relpath as relpath_url
from tempfile import SpooledTemporaryFile

from ftpsync.metadata import DirMetadata, IncompatibleMetadataVersionError
from ftpsync.resources import DirectoryEntry, FileEntry
from ftpsync.targets import _get_encoding_opt, _Target
from ftpsync.util import (
    CliSilentRuntimeError,
    get_credentials_for_url,
    is_native,
    prompt_for_password,
    save_password,
    write,
    write_error,
)


# ===============================================================================
# FTPTarget
# ===============================================================================
[docs] class FTPTarget(_Target): """Represents a synchronization target on an FTP server. Attributes: path (str): Current working directory on FTP server. ftp (FTP): Instance of ftplib.FTP. host (str): hostname of FTP server port (int): FTP port (defaults to 21) username (str): password (str): """ DEFAULT_BLOCKSIZE = 8 * 1024 # ftplib uses 8k chunks by default MAX_SPOOL_MEM = ( 100 * 1024 ) # keep open_readable() buffer in memory if smaller than 100kB def __init__( self, path, host, port=0, username=None, password=None, tls=False, timeout=None, extra_opts=None, ): """Create FTP target with host, initial path, optional credentials and options. Args: path (str): root path on FTP server, relative to *host* host (str): hostname of FTP server port (int): FTP port (defaults to 21) username (str): password (str): tls (bool): encrypt the connection using TLS (Python 2.7/3.2+) timeout (int): the timeout to set against the ftp socket (seconds) extra_opts (dict): """ self.encoding = _get_encoding_opt(None, extra_opts, "utf-8") # path = self.to_unicode(path) path = path or "/" assert is_native(path) super().__init__(path, extra_opts) if tls: try: self.ftp = ftplib.FTP_TLS() except AttributeError: write("Python 2.7/3.2+ required for FTPS (TLS).") raise else: self.ftp = ftplib.FTP() self.ftp.set_debuglevel(self.get_option("ftp_debug", 0)) self.host = host self.port = port or 0 self.username = username self.password = password self.tls = tls self.timeout = timeout #: dict: written to ftp target root folder before synchronization starts. #: set to False, if write failed. Default: None self.lock_data = None self.lock_write_time = None self.feat_response = None self.syst_response = None self.is_unix = None #: True if server reports FEAT UTF8 self.support_utf8 = None #: Time difference between <local upload time> and the mtime that the server reports afterwards. #: The value is added to the 'u' time stored in meta data. #: (This is only a rough estimation, derived from the lock-file.) self.server_time_ofs = None self.ftp_socket_connected = False self.support_set_time = False # #: Optionally define an encoding for this server # encoding = self.get_option("encoding", "utf-8") # self.encoding = codecs.lookup(encoding).name # return def __str__(self): return "<{} + {}>".format( self.get_base_name(), relpath_url(self.cur_dir or "/", self.root_dir) )
[docs] def get_base_name(self): scheme = "ftps" if self.tls else "ftp" return f"{scheme}://{self.host}{self.root_dir}"
[docs] def open(self): assert not self.ftp_socket_connected super().open() options = self.get_options_dict() no_prompt = self.get_option("no_prompt", True) store_password = self.get_option("store_password", False) verbose = self.get_option("verbose", 3) self.ftp.set_debuglevel(self.get_option("ftp_debug", 0)) # Optionally use FTP active mode (default: PASV) (issue #21) force_active = self.get_option("ftp_active", False) self.ftp.set_pasv(not force_active) self.ftp.connect(self.host, self.port, self.timeout) # if self.timeout: # self.ftp.connect(self.host, self.port, self.timeout) # else: # # Py2.7 uses -999 as default for `timeout`, Py3 uses None # self.ftp.connect(self.host, self.port) self.ftp_socket_connected = True if self.username is None or self.password is None: creds = get_credentials_for_url( self.host, options, force_user=self.username ) if creds: self.username, self.password = creds while True: try: # Login (as 'anonymous' if self.username is undefined): self.ftp.login(self.username, self.password) if verbose >= 4: write( "Login as '{}'.".format( self.username if self.username else "anonymous" ) ) break except ftplib.error_perm as e: # If credentials were passed, but authentication fails, prompt # for new password if not e.args[0].startswith("530"): raise # error other then '530 Login incorrect' write_error(f"Could not login to {self.username}@{self.host}: {e}") if no_prompt or not self.username: raise creds = prompt_for_password(self.host, self.username) self.username, self.password = creds # Continue while-loop if self.tls: # Upgrade data connection to TLS. self.ftp.prot_p() try: self.syst_response = self.ftp.sendcmd("SYST") if verbose >= 5: write("SYST: '{}'.".format(self.syst_response.replace("\n", " "))) # self.is_unix = "unix" in resp.lower() # not necessarily true, better check with r/w tests # TODO: case sensitivity? except Exception as e: write(f"SYST command failed: '{e}'") try: self.feat_response = self.ftp.sendcmd("FEAT") self.support_utf8 = "UTF8" in self.feat_response if verbose >= 5: write("FEAT: '{}'.".format(self.feat_response.replace("\n", " "))) except Exception as e: write(f"FEAT command failed: '{e}'") if self.encoding == "utf-8": if not self.support_utf8 and verbose >= 4: write( "Server does not list utf-8 as supported feature (using it anyway).", warning=True, ) try: # Announce our wish to use UTF-8 to the server as proposed here: # See https://tools.ietf.org/html/draft-ietf-ftpext-utf-8-option-00 # Note: this RFC is inactive, expired, and failed on Strato self.ftp.sendcmd("OPTS UTF-8") if verbose >= 4: write("Sent 'OPTS UTF-8'.") except Exception as e: if verbose >= 4: write(f"Could not send 'OPTS UTF-8': '{e}'", warning=True) try: # Announce our wish to use UTF-8 to the server as proposed here: # See https://tools.ietf.org/html/rfc2389 # https://www.cerberusftp.com/phpBB3/viewtopic.php?t=2608 # Note: this was accepted on Strato self.ftp.sendcmd("OPTS UTF8 ON") if verbose >= 4: write("Sent 'OPTS UTF8 ON'.") except Exception as e: write(f"Could not send 'OPTS UTF8 ON': '{e}'", warning=True) if hasattr(self.ftp, "encoding"): # Python 3 encodes using latin-1 by default(!) # (In Python 2 ftp.encoding does not exist, but ascii is used) if self.encoding != codecs.lookup(self.ftp.encoding).name: write( "Setting FTP encoding to {} (was {}).".format( self.encoding, self.ftp.encoding ) ) self.ftp.encoding = self.encoding try: self.ftp.cwd(self.root_dir) except ftplib.error_perm as e: if not e.args[0].startswith("550"): raise # error other then 550 No such directory' # Implement --create-folder option for remote targets: if self.is_unbound(): # E.g. 'tree' command write_error( f"Could not change directory to {self.root_dir} ({e}): missing permissions?" ) elif self.is_local(): write_error( f"Could not change local directory to {self.root_dir} ({e}): missing permissions?" ) else: parent = os.path.dirname(self.root_dir) subfolder = os.path.basename(self.root_dir) if not self.get_option("create_folder", False): msg = ( f"Could not change remote directory to {self.root_dir!r} ({e!r}). " "This may be due to missing permissions or because the folder does not exist. " f"Pass `--create-folder` if you want to create {subfolder!r} within {parent!r}." ) raise CliSilentRuntimeError(msg, min_verbosity=4) write_error( f"Could not change remote directory to {self.root_dir!r} ({e!r}). " f"`--create-folder` was passed: creating {subfolder!r} within {parent!r}..." ) self.ftp.cwd(parent) self.mkdir(subfolder) # Must work now: self.ftp.cwd(self.root_dir) pwd = self.pwd() if pwd != self.root_dir: raise RuntimeError( "Unable to navigate to working directory {!r} (now at {!r})".format( self.root_dir, pwd ) ) self.cur_dir = pwd # Successfully authenticated: store password if store_password: save_password(self.host, self.username, self.password) self._lock() return
[docs] def close(self): if self.lock_data: self._unlock(closing=True) if self.ftp_socket_connected: try: self.ftp.quit() except (ConnectionError, EOFError) as e: write_error(f"ftp.quit() failed: {e}") self.ftp_socket_connected = False super().close()
[docs] def _lock(self, break_existing=False): """Write a special file to the target root folder.""" # write("_lock") data = {"lock_time": time.time(), "lock_holder": None} try: assert self.cur_dir == self.root_dir self.write_text(DirMetadata.LOCK_FILE_NAME, json.dumps(data)) self.lock_data = data self.lock_write_time = time.time() except Exception as e: errmsg = f"{e}" write_error(f"Could not write lock file: {errmsg}") if errmsg.startswith("550") and self.ftp.passiveserver: try: self.ftp.makepasv() except Exception: write_error( "The server probably requires FTP Active mode. " "Try passing the --ftp-active option." ) # Set to False, so we don't try to remove later self.lock_data = False
[docs] def _unlock(self, closing=False): """Remove lock file to the target root folder.""" # write("_unlock", closing) try: if self.cur_dir != self.root_dir: if closing: write( "Changing to ftp root folder to remove lock file: {}".format( self.root_dir ) ) self.cwd(self.root_dir) else: write_error( "Could not remove lock file, because CWD != ftp root: {}".format( self.cur_dir ) ) return if self.lock_data is False: if self.get_option("verbose", 3) >= 4: write("Skip remove lock file (was not written).") else: # direct delete, without updating metadata or checking for target access: try: self.ftp.delete(DirMetadata.LOCK_FILE_NAME) # self.remove_file(DirMetadata.LOCK_FILE_NAME) except Exception as e: # I have seen '226 Closing data connection' responses here, # probably when a previous command threw another error. # However here, 2xx response should be Ok(?): # A 226 reply code is sent by the server before closing the # data connection after successfully processing the previous client command if e.args[0][:3] == "226": write_error("Ignoring 226 response for ftp.delete() lockfile") else: raise self.lock_data = None except Exception as e: write_error(f"Could not remove lock file: {e}") raise
[docs] def _probe_lock_file(self, reported_mtime): """Called by get_dir""" delta = reported_mtime - self.lock_data["lock_time"] # delta2 = reported_mtime - self.lock_write_time self.server_time_ofs = delta if self.get_option("verbose", 3) >= 4: write(f"Server time offset: {delta:.2f} seconds.")
# write("Server time offset2: {:.2f} seconds.".format(delta2))
[docs] def get_id(self): return self.host + self.root_dir
[docs] def cwd(self, dir_name): assert is_native(dir_name) path = normpath_url(join_url(self.cur_dir, dir_name)) if not path.startswith(self.root_dir): # paranoic check to prevent that our sync tool goes berserk raise RuntimeError( f"Tried to navigate outside root {self.root_dir!r}: {path!r}" ) self.ftp.cwd(dir_name) self.cur_dir = path self.cur_dir_meta = None return self.cur_dir
[docs] def pwd(self): """Return current working dir as native `str` (uses fallback-encoding).""" pwd = self._ftp_pwd() if pwd != "/": # #38 pwd = pwd.rstrip("/") return pwd
[docs] def mkdir(self, dir_name): assert is_native(dir_name) self.check_write(dir_name) self.ftp.mkd(dir_name)
[docs] def _rmdir_impl(self, dir_name, keep_root_folder=False, predicate=None): # FTP does not support deletion of non-empty directories. assert is_native(dir_name) self.check_write(dir_name) names = [] nlst_res = self._ftp_nlst(dir_name) # nlst_res = self.ftp.nlst(dir_name) # write("rmdir(%s): %s" % (dir_name, nlst_res)) for name in nlst_res: # name = self.re_encode_to_native(name) if "/" in name: name = os.path.basename(name) if name in (".", ".."): continue if predicate and not predicate(name): continue names.append(name) if len(names) > 0: self.ftp.cwd(dir_name) try: for name in names: try: # try to delete this as a file self.ftp.delete(name) except ftplib.all_errors as _e: write( " ftp.delete({}) failed: {}, trying rmdir()...".format( name, _e ) ) # assume <name> is a folder self.rmdir(name) finally: if dir_name != ".": self.ftp.cwd("..") # write("ftp.rmd(%s)..." % (dir_name, )) if not keep_root_folder: self.ftp.rmd(dir_name) return
[docs] def rmdir(self, dir_name): return self._rmdir_impl(dir_name)
[docs] def get_dir(self): entry_list = [] entry_map = {} local_var = {"has_meta": False} # pass local variables outside func scope encoding = self.encoding def _addline(status, line): # _ftp_retrlines_native() made sure that we always get `str` type lines assert status in (0, 1, 2) assert is_native(line) data, _, name = line.partition("; ") # print(status, name, u_name) if status == 1: write( "WARNING: File name seems not to be {}; re-encoded from CP-1252:".format( encoding ), name, ) elif status == 2: write_error("File name is neither UTF-8 nor CP-1252 encoded:", name) res_type = size = mtime = unique = None fields = data.split(";") # https://tools.ietf.org/html/rfc3659#page-23 # "Size" / "Modify" / "Create" / "Type" / "Unique" / "Perm" / "Lang" # / "Media-Type" / "CharSet" / os-depend-fact / local-fact for field in fields: field_name, _, field_value = field.partition("=") field_name = field_name.lower() if field_name == "type": res_type = field_value elif field_name in ("sizd", "size"): size = int(field_value) elif field_name == "modify": # Use calendar.timegm() instead of time.mktime(), because # the date was returned as UTC if "." in field_value: mtime = calendar.timegm( time.strptime(field_value, "%Y%m%d%H%M%S.%f") ) else: mtime = calendar.timegm( time.strptime(field_value, "%Y%m%d%H%M%S") ) elif field_name == "unique": unique = field_value entry = None if res_type == "dir": entry = DirectoryEntry(self, self.cur_dir, name, size, mtime, unique) elif res_type == "file": if name == DirMetadata.META_FILE_NAME: # the meta-data file is silently ignored local_var["has_meta"] = True elif ( name == DirMetadata.LOCK_FILE_NAME and self.cur_dir == self.root_dir ): # this is the root lock file. compare reported mtime with # local upload time self._probe_lock_file(mtime) else: entry = FileEntry(self, self.cur_dir, name, size, mtime, unique) elif res_type in ("cdir", "pdir"): pass else: write_error(f"Could not parse '{line}'") raise NotImplementedError( f"MLSD returned unsupported type: {res_type!r}" ) if entry: entry_map[name] = entry entry_list.append(entry) try: # We use a custom wrapper here, so we can implement a codding fall back: self._ftp_retrlines_native("MLSD", _addline, encoding) # self.ftp.retrlines("MLSD", _addline) except ftplib.error_perm as e: # write_error("The FTP server responded with {}".format(e)) # raises error_perm "500 Unknown command" if command is not supported if "500" in str(e.args): raise RuntimeError( "The FTP server does not support the 'MLSD' command." ) raise # load stored meta data if present self.cur_dir_meta = DirMetadata(self) if local_var["has_meta"]: try: self.cur_dir_meta.read() except IncompatibleMetadataVersionError: raise # this should end the script (user should pass --migrate) except Exception as e: write_error(f"Could not read meta info {self.cur_dir_meta}: {e}") meta_files = self.cur_dir_meta.list # Adjust file mtime from meta-data if present missing = [] for n in meta_files: meta = meta_files[n] if n in entry_map: # We have a meta-data entry for this resource upload_time = meta.get("u", 0) # Discard stored meta-data if # 1. the reported files size is different than the # size we stored in the meta-data # or # 2. the the mtime reported by the FTP server is later # than the stored upload time (which indicates # that the file was modified directly on the server) if entry_map[n].size != meta.get("s"): if self.get_option("verbose", 3) >= 5: write( "Removing meta entry {} (size changed from {} to {}).".format( n, entry_map[n].size, meta.get("s") ) ) missing.append(n) elif (entry_map[n].mtime - upload_time) > self.mtime_compare_eps: if self.get_option("verbose", 3) >= 5: write( "Removing meta entry {} (modified {} > {}).".format( n, time.ctime(entry_map[n].mtime), time.ctime(upload_time), ) ) missing.append(n) else: # Use meta-data mtime instead of the one reported by FTP server entry_map[n].meta = meta entry_map[n].mtime = meta["m"] else: # File is stored in meta-data, but no longer exists on FTP server # write("META: Removing missing meta entry %s" % n) missing.append(n) # Remove missing or invalid files from cur_dir_meta for n in missing: self.cur_dir_meta.remove(n) return entry_list
[docs] def open_readable(self, name): """Open cur_dir/name for reading. Note: we read everything into a buffer that supports .read(). Args: name (str): file name, located in self.curdir Returns: file-like (must support read() method) """ # print("FTP open_readable({})".format(name)) assert is_native(name) out = SpooledTemporaryFile(max_size=self.MAX_SPOOL_MEM, mode="w+b") self.ftp.retrbinary(f"RETR {name}", out.write, FTPTarget.DEFAULT_BLOCKSIZE) out.seek(0) return out
[docs] def write_file(self, name, fp_src, blocksize=DEFAULT_BLOCKSIZE, callback=None): """Write file-like `fp_src` to cur_dir/name. Args: name (str): file name, located in self.curdir fp_src (file-like): must support read() method blocksize (int, optional): callback (function, optional): Called like `func(buf)` for every written chunk """ # print("FTP write_file({})".format(name), blocksize) assert is_native(name) self.check_write(name) self.ftp.storbinary(f"STOR {name}", fp_src, blocksize, callback)
# TODO: check result
[docs] def copy_to_file(self, name, fp_dest, callback=None): """Write cur_dir/name to file-like `fp_dest`. Args: name (str): file name, located in self.curdir fp_dest (file-like): must support write() method callback (function, optional): Called like `func(buf)` for every written chunk """ assert is_native(name) def _write_to_file(data): # print("_write_to_file() {} bytes.".format(len(data))) fp_dest.write(data) if callback: callback(data) self.ftp.retrbinary(f"RETR {name}", _write_to_file, FTPTarget.DEFAULT_BLOCKSIZE)
[docs] def remove_file(self, name): """Remove cur_dir/name.""" assert is_native(name) self.check_write(name) # self.cur_dir_meta.remove(name) self.ftp.delete(name) self.remove_sync_info(name)
[docs] def set_mtime(self, name, mtime, size): assert is_native(name) self.check_write(name) # write("META set_mtime(%s): %s" % (name, time.ctime(mtime))) # We cannot set the mtime on FTP servers, so we store this as additional # meta data in the same directory # TODO: try "SITE UTIME", "MDTM (set version)", or "SRFT" command self.cur_dir_meta.set_mtime(name, mtime, size)
[docs] def _ftp_pwd(self): """Variant of `self.ftp.pwd()` that supports encoding-fallback. Returns: Current working directory as native string. """ try: return self.ftp.pwd() except UnicodeEncodeError: if self.ftp.encoding != "utf-8": raise # should not happen, since Py2 does not try to encode # TODO: this is NOT THREAD-SAFE! prev_encoding = self.ftp.encoding try: write("ftp.pwd() failed with utf-8: trying Cp1252...", warning=True) return self.ftp.pwd() finally: self.ftp.encoding = prev_encoding
[docs] def _ftp_nlst(self, dir_name): """Variant of `self.ftp.nlst()` that supports encoding-fallback.""" assert is_native(dir_name) lines = [] def _add_line(status, line): lines.append(line) cmd = "NLST " + dir_name self._ftp_retrlines_native(cmd, _add_line, self.encoding) # print(cmd, lines) return lines
[docs] def _ftp_retrlines_native(self, command, callback, encoding): """A re-implementation of ftp.retrlines that returns lines as native `str`. This is needed on Python 3, where `ftp.retrlines()` returns unicode `str` by decoding the incoming command response using `ftp.encoding`. This would fail for the whole request if a single line of the MLSD listing cannot be decoded. FTPTarget wants to fall back to Cp1252 if UTF-8 fails for a single line, so we need to process the raw original binary input lines. On Python 2, the response is already bytes, but we try to decode in order to check validity and optionally re-encode from Cp1252. Args: command (str): A valid FTP command like 'NLST', 'MLSD', ... callback (function): Called for every line with these args: status (int): 0:ok 1:fallback used, 2:decode failed line (str): result line decoded using `encoding`. If `encoding` is 'utf-8', a fallback to cp1252 is accepted. encoding (str): Coding that is used to convert the FTP response to `str`. Returns: None """ LF = b"\n" # noqa N806 buffer = b"" # needed to access buffer accross function scope local_var = {"buffer": buffer} fallback_enc = "cp1252" if encoding == "utf-8" else None def _on_read_line(line): # Line is a byte string # print(" line ", line) status = 2 # fault line_decoded = None try: line_decoded = line.decode(encoding) status = 0 # successfully decoded except UnicodeDecodeError: if fallback_enc: try: line_decoded = line.decode(fallback_enc) status = 1 # used fallback encoding except UnicodeDecodeError: raise # if compat.PY2: # # line is a native binary `str`. # if status == 1: # # We used a fallback: re-encode # callback(status, line_decoded.encode(encoding)) # else: # callback(status, line) # else: # line_decoded is a native text `str`. callback(status, line_decoded) # on_read_line = _on_read_line_py2 if compat.PY2 else _on_read_line_py3 def _on_read_chunk(chunk): buffer = local_var["buffer"] # Normalize line endings chunk = chunk.replace(b"\r\n", LF) chunk = chunk.replace(b"\r", LF) chunk = buffer + chunk try: # print("Add chunk ", chunk, "to buffer", buffer) while True: item, chunk = chunk.split(LF, 1) _on_read_line(item) # + LF) except ValueError: pass # print("Rest chunk", chunk) local_var["buffer"] = chunk self.ftp.retrbinary(command, _on_read_chunk) if buffer: _on_read_line(buffer) return