"""
(c) 2012-2024 Martin Wendt; see https://github.com/mar10/pyftpsync
Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
import codecs
import contextlib
import io
import os
import shutil
import sys
import threading
from posixpath import join as join_url
from posixpath import normpath as normpath_url
from urllib.parse import unquote, urlparse
from ftpsync.metadata import DirMetadata
from ftpsync.resources import DirectoryEntry, FileEntry
from ftpsync.util import is_native, to_bytes, to_native, to_unicode, write
# ===============================================================================
# make_target
# ===============================================================================
def make_target(url, extra_opts=None):
    """Factory that creates `_Target` objects from URLs.

    FTP targets must begin with the scheme ``ftp://``, ``ftps://`` for TLS,
    or ``sftp://`` for SFTP. Any other value is treated as a local folder
    and passed to :class:`FsTarget`.

    Credentials embedded in the URL (``scheme://user:password@host/path``)
    are percent-decoded, so reserved characters can be passed encoded in
    both the user name and the password (e.g. ``joe%40example.com``).

    Args:
        url (str): Target URL or local folder path.
        extra_opts (dict, optional): Passed to Target constructor. Default: None.
    Returns:
        :class:`_Target`
    """
    parts = urlparse(url, allow_fragments=False)

    # urlparse() returns the userinfo part still percent-encoded, so decode
    # user name and password alike (the user name previously stayed encoded).
    username = parts.username
    if username:
        username = unquote(username)
    password = unquote(parts.password) if parts.password else None

    # scheme is case-insensitive according to https://tools.ietf.org/html/rfc3986
    scheme = parts.scheme.lower()

    if scheme in ("ftp", "ftps"):
        # Imported lazily, so local-only use does not load FTP support
        from ftpsync.ftp_target import FTPTarget

        return FTPTarget(
            parts.path,
            parts.hostname,
            parts.port,
            username=username,
            password=password,
            tls=(scheme == "ftps"),
            timeout=None,
            extra_opts=extra_opts,
        )

    if scheme == "sftp":
        # Imported lazily, so local-only use does not load SFTP support
        from ftpsync.sftp_target import SFTPTarget

        return SFTPTarget(
            parts.path,
            parts.hostname,
            parts.port,
            username=username,
            password=password,
            timeout=None,
            extra_opts=extra_opts,
        )

    return FsTarget(url, extra_opts)
def _get_encoding_opt(synchronizer, extra_opts, default):
    """Resolve the encoding setting for a target constructor.

    Args:
        synchronizer: Currently not evaluated (reserved for reading the
            synchronizer's options).
        extra_opts (dict | None): Target options. If it contains an
            ``"encoding"`` key, that value wins over `default`, even when
            it is empty.
        default (str | None): Encoding used when `extra_opts` defines none.
    Returns:
        str | None: The normalized codec name (e.g. 'UTF8' => 'utf-8'), or
        None if the resolved value is empty.
    Raises:
        LookupError: if the encoding name is unknown (from codecs.lookup()).
    """
    overridden = bool(extra_opts) and "encoding" in extra_opts
    chosen = extra_opts.get("encoding") if overridden else default
    if not chosen:
        return None
    # Normalize the name via the codec registry
    return codecs.lookup(chosen).name
# ===============================================================================
# _Target
# ===============================================================================
class _Target:
    """Base class for :class:`FsTarget`, :class:`FTPTarget`, etc.

    A target represents the top-level folder (``root_dir``) of one side of
    a synchronization and tracks a current working directory (``cur_dir``).
    Subclasses implement the primitives that raise NotImplementedError here
    (:meth:`cwd`, :meth:`get_dir`, :meth:`open_readable`, ...).
    """

    DEFAULT_BLOCKSIZE = 16 * 1024  # shutil.copyobj() uses 16k blocks by default

    def __init__(self, root_dir, extra_opts):
        # All internal paths should use unicode.
        # (We cannot convert here, since we don't know the target encoding.)
        assert is_native(root_dir)
        if root_dir != "/":
            root_dir = root_dir.rstrip("/")
        # This target is not thread safe: open() acquires this lock without
        # blocking and close() releases it.
        self._rlock = threading.RLock()
        #: The target's top-level folder
        self.root_dir = root_dir
        self.extra_opts = extra_opts or {}
        self.readonly = False
        self.dry_run = False
        self.host = None
        #: Set by BaseSynchronizer.__init__(). May be None for tree command, etc.
        self.synchronizer = None
        self.peer = None
        self.cur_dir = None
        self.connected = False
        self.save_mode = True
        self.case_sensitive = None  # TODO: don't know yet
        #: Time difference between <local upload time> and the mtime that the server reports afterwards.
        #: The value is added to the 'u' time stored in meta data.
        #: (This is only a rough estimation, derived from the lock-file.)
        self.server_time_ofs = None
        #: Maximum allowed difference between a reported mtime and the last known update time,
        #: before we classify the entry as 'modified externally'
        self.mtime_compare_eps = FileEntry.EPS_TIME
        self.cur_dir_meta = DirMetadata(self)
        self.meta_stack = []
        # Optionally define an encoding for this target, but don't override
        # derived class's setting
        if not hasattr(self, "encoding"):
            #: Assumed encoding for this target. Used to decode binary paths.
            self.encoding = _get_encoding_opt(None, extra_opts, None)

    def __del__(self):
        # TODO: http://pydev.blogspot.de/2015/01/creating-safe-cyclic-reference.html
        # getattr(): __del__ also runs if __init__ failed before `connected`
        # was assigned (e.g. a subclass raising on an unknown encoding).
        if getattr(self, "connected", False):
            self.close()

    def get_base_name(self):
        """Return the root folder as string."""
        return f"{self.root_dir}"

    def is_local(self):
        """Return True if this is the synchronizer's local target.

        Raises:
            RuntimeError: if the target is not bound to a synchronizer.
        """
        if not self.synchronizer:
            raise RuntimeError(
                "Unbound target: Consider calling target.is_remote(or_unbound=True)"
            )
        return self.synchronizer.local is self

    def is_remote(self, or_unbound=False):
        """Return True if this is not the local target.

        Args:
            or_unbound (bool): Return True (instead of raising) if the target
                is not bound to a synchronizer.
        """
        if or_unbound and not self.synchronizer:
            return True
        return not self.is_local()

    def is_unbound(self):
        """Return True if no synchronizer was assigned."""
        return self.synchronizer is None

    def get_options_dict(self):
        """Return options from synchronizer (possibly overridden by own extra_opts).

        A new dict is returned; ``synchronizer.options`` is never modified.
        (Previously the synchronizer's dict was updated in place, leaking this
        target's extra_opts into the options seen by the peer target.)
        """
        d = dict(self.synchronizer.options) if self.synchronizer else {}
        d.update(self.extra_opts)
        return d

    def get_option(self, key, default=None):
        """Return option from synchronizer (possibly overridden by target extra_opts)."""
        if self.synchronizer:
            return self.extra_opts.get(key, self.synchronizer.options.get(key, default))
        return self.extra_opts.get(key, default)

    def open(self):
        """Mark the target as connected.

        Raises:
            RuntimeError: if already open, or if the target lock is held.
        """
        if self.connected:
            raise RuntimeError(f"Target already open: {self}. ")
        # Not thread safe (issue #20)
        if not self._rlock.acquire(False):
            raise RuntimeError("Could not acquire _Target lock on open")
        self.connected = True

    def close(self):
        """Mark the target as disconnected and release the lock (no-op if closed)."""
        if not self.connected:
            return
        if self.get_option("verbose", 3) >= 5:
            write(f"Closing target {self}.")
        self.connected = False
        self.readonly = False  # issue #20
        self._rlock.release()

    def check_write(self, name):
        """Raise exception if writing cur_dir/name is not allowed.

        Meta and lock files may be written even if the target is read-only.

        Raises:
            RuntimeError: if the target is read-only.
        """
        assert is_native(name)
        if self.readonly and name not in (
            DirMetadata.META_FILE_NAME,
            DirMetadata.LOCK_FILE_NAME,
        ):
            raise RuntimeError(f"Target is read-only: {self} + {name} / ")

    def get_id(self):
        """Return the root folder, used as key into the peer's sync info."""
        return self.root_dir

    def get_sync_info(self, name, key=None):
        """Get mtime/size when this target's current dir was last synchronized with remote.

        The info is always read from the local target's ``cur_dir_meta``,
        keyed by the remote target's id.

        Args:
            name (str | None): Entry name, or None for the whole folder info.
            key (str, optional): Return only this field of the entry info.
        Returns:
            The (partial) info, or None if unknown.
        """
        peer_target = self.peer
        if self.is_local():
            info = self.cur_dir_meta.dir["peer_sync"].get(peer_target.get_id())
        else:
            info = peer_target.cur_dir_meta.dir["peer_sync"].get(self.get_id())
        if name is not None:
            info = info.get(name) if info else None
        if info and key:
            info = info.get(key)
        return info

    def cwd(self, dir_name):
        """Change cur_dir (implemented by subclasses)."""
        raise NotImplementedError

    @contextlib.contextmanager
    def enter_subdir(self, name):
        """Temporarily changes the working directory to `name`.

        ``cwd("..")`` is called when the block is left, also if the block
        raises or a generator suspended inside the block is closed early.

        Examples::

            with target.enter_subdir(folder):
                ...
        """
        self.cwd(name)
        try:
            yield
        finally:
            self.cwd("..")

    def pwd(self, dir_name=None):
        """Return cur_dir (implemented by subclasses).

        `dir_name` is optional and unused, for compatibility with subclass
        implementations such as ``FsTarget.pwd()`` that take no argument.
        """
        raise NotImplementedError

    def mkdir(self, dir_name):
        """Create cur_dir/dir_name (implemented by subclasses)."""
        raise NotImplementedError

    def rmdir(self, dir_name):
        """Remove cur_dir/name."""
        raise NotImplementedError

    def get_dir(self):
        """Return a list of _Resource entries."""
        raise NotImplementedError

    def walk(self, pred=None, recursive=True):
        """Iterate over all target entries recursively.

        cur_dir is restored after each sub folder, also if the consumer stops
        iterating early.

        Args:
            pred (function, optional):
                Callback(:class:`ftpsync.resources._Resource`) should return `False` to
                ignore entry. Default: `None`.
            recursive (bool, optional):
                Pass `False` to generate top level entries only. Default: `True`.
        Yields:
            :class:`ftpsync.resources._Resource`
        """
        for entry in self.get_dir():
            if pred and pred(entry) is False:
                continue
            yield entry
            if recursive and entry.is_dir():
                with self.enter_subdir(entry.name):
                    yield from self.walk(pred, recursive)

    def walk_tree(self, sort=True, files=False, pred=None, _prefixes=None):
        """Iterate over target hierarchy, depth-first, adding a connector prefix.

        This iterator walks the tree nodes, but slightly delays the output, in
        order to add information if a node is the *last* sibling.
        This information is then used to create pretty tree connector prefixes.

        Args:
            sort (bool): Sort the entries of each folder: files first, then
                by case-insensitive name.
            files (bool): Also yield files (by default only folders).
            pred (function, optional):
                Callback(:class:`ftpsync.resources._Resource`) should return `False` to
                ignore entry. Default: `None`.
            _prefixes (list, internal): `is_last` flags of the parent entries.
        Yields:
            2-tuple (prefix, :class:`ftpsync.resources._Resource`)

        Printing ``prefix + entry.name`` for every item renders e.g.::

             +- a
             |  `- x
             `- b
                `- y
        """
        # List of parent's `is_last` flags:
        if _prefixes is None:
            _prefixes = []

        def _yield_entry(entry, is_last):
            # Each ancestor level adds a 3-column segment, so a child's
            # connector lines up beneath its parent's name (which follows the
            # 4-column ' +- ' connector). Both segments must have equal width.
            path = "".join(["   " if last else " | " for last in _prefixes])
            path += " `- " if is_last else " +- "
            yield path, entry
            if entry.is_dir():
                with self.enter_subdir(entry.name):
                    _prefixes.append(is_last)
                    yield from self.walk_tree(sort, files, pred, _prefixes)
                    _prefixes.pop()

        dir_list = self.get_dir()
        if not files:
            dir_list = [entry for entry in dir_list if entry.is_dir()]
        if sort:
            # Sort by name, files first
            dir_list.sort(key=lambda entry: (entry.is_dir(), entry.name.lower()))

        prev_entry = None
        for next_entry in dir_list:
            if pred and pred(next_entry) is False:
                continue
            # Skip first entry
            if prev_entry is None:
                prev_entry = next_entry
                continue
            # Yield entry (this is never the last sibling)
            yield from _yield_entry(prev_entry, False)
            prev_entry = next_entry

        # Finally yield the last sibling
        if prev_entry:
            yield from _yield_entry(prev_entry, True)

    def open_readable(self, name):
        """Return file-like object opened in binary mode for cur_dir/name."""
        raise NotImplementedError

    def open_writable(self, name):
        """Return file-like object opened in binary mode for cur_dir/name."""
        raise NotImplementedError

    def read_text(self, name):
        """Read text string from cur_dir/name using open_readable().

        The content is decoded as UTF-8.
        """
        with self.open_readable(name) as fp:
            res = fp.read()  # StringIO or file object
        return res.decode("utf-8")

    def copy_to_file(self, name, fp_dest, callback=None):
        """Write cur_dir/name to file-like `fp_dest`.

        Args:
            name (str): file name, located in self.curdir
            fp_dest (file-like): must support write() method
            callback (function, optional):
                Called like `func(buf)` for every written chunk
        """
        raise NotImplementedError

    def write_file(self, name, fp_src, blocksize=DEFAULT_BLOCKSIZE, callback=None):
        """Write binary data from file-like to cur_dir/name."""
        raise NotImplementedError

    def write_text(self, name, s):
        """Write string data to cur_dir/name using write_file()."""
        buf = io.BytesIO(to_bytes(s))
        self.write_file(name, buf)

    def remove_file(self, name):
        """Remove cur_dir/name."""
        raise NotImplementedError

    def set_mtime(self, name, mtime, size):
        """Set modification time of cur_dir/name (implemented by subclasses)."""
        raise NotImplementedError

    def set_sync_info(self, name, mtime, size):
        """Store mtime/size when this resource was last synchronized with remote.

        Sync info is kept by the local target only, so a remote target
        delegates to its peer.
        """
        if not self.is_local():
            return self.peer.set_sync_info(name, mtime, size)
        return self.cur_dir_meta.set_sync_info(name, mtime, size)

    def remove_sync_info(self, name):
        """Remove the sync info for `name` (delegated to the peer if remote)."""
        if not self.is_local():
            return self.peer.remove_sync_info(name)
        if self.cur_dir_meta:
            return self.cur_dir_meta.remove(name)
        return None
# ===============================================================================
# FsTarget
# ===============================================================================
class FsTarget(_Target):
    """Target that represents a folder on the local file system.

    Args:
        root_dir (str): Folder path. ``~`` is expanded and the path is made
            absolute.
        extra_opts (dict, optional): Target options (e.g. ``encoding``).
    Raises:
        ValueError: if `root_dir` is not an existing directory.
    """

    DEFAULT_BLOCKSIZE = 16 * 1024  # shutil.copyobj() uses 16k blocks by default

    def __init__(self, root_dir, extra_opts=None):
        # Set the encoding *before* calling the base constructor, which only
        # assigns a default if no `encoding` attribute exists yet.
        def_enc = sys.getfilesystemencoding() or "utf-8"
        self.encoding = _get_encoding_opt(None, extra_opts, def_enc)
        root_dir = os.path.abspath(os.path.expanduser(root_dir))
        super().__init__(root_dir, extra_opts)
        if not os.path.isdir(root_dir):
            raise ValueError(f"{root_dir} is not a directory.")
        self.support_set_time = True

    def __str__(self):
        if self.cur_dir is None:
            # Not opened yet: os.path.relpath(None, ...) would raise TypeError
            return f"<FS:{self.root_dir}>"
        return "<FS:{} + {}>".format(
            self.root_dir, os.path.relpath(self.cur_dir, self.root_dir)
        )

    def open(self):
        """Open the target and make root_dir the current directory."""
        super().open()
        self.cur_dir = self.root_dir

    def close(self):
        """Close the target (see :meth:`_Target.close`)."""
        super().close()

    def _is_within_root(self, path):
        """Return True if `path` is root_dir itself or located below it."""
        root = self.root_dir
        # A plain startswith(root) would also accept sibling folders sharing
        # the prefix (root '/data/foo' would allow '/data/foobar').
        return path == root or path.startswith(root.rstrip("/") + "/")

    def cwd(self, dir_name):
        """Change cur_dir to `dir_name` (relative to cur_dir, may be '..').

        Resets ``cur_dir_meta``; :meth:`get_dir` creates a new one.

        Returns:
            str: the new cur_dir
        Raises:
            RuntimeError: if the resulting path is outside root_dir.
        """
        path = normpath_url(join_url(self.cur_dir, dir_name))
        if not self._is_within_root(path):
            raise RuntimeError(
                f"Tried to navigate outside root {self.root_dir!r}: {path!r}"
            )
        self.cur_dir_meta = None
        self.cur_dir = path
        return self.cur_dir

    def pwd(self):
        """Return cur_dir."""
        return self.cur_dir

    def mkdir(self, dir_name):
        """Create folder cur_dir/dir_name."""
        self.check_write(dir_name)
        path = normpath_url(join_url(self.cur_dir, dir_name))
        os.mkdir(path)

    def rmdir(self, dir_name):
        """Remove cur_dir/name, including all of its content (shutil.rmtree)."""
        self.check_write(dir_name)
        path = normpath_url(join_url(self.cur_dir, dir_name))
        shutil.rmtree(path)

    def get_dir(self):
        """Return a list of _Resource entries for cur_dir.

        Also creates a fresh ``cur_dir_meta`` and reads it if the folder
        contains the metadata file; that file itself is not returned.
        Entries that are neither folder nor regular file are skipped.
        """
        res = []
        self.cur_dir_meta = DirMetadata(self)
        # List directory using a unicode path, so names are returned as text
        unicode_cur_dir = to_unicode(self.cur_dir)
        for name in os.listdir(unicode_cur_dir):
            name = to_native(name)
            path = os.path.join(self.cur_dir, name)
            # lstat() describes the entry itself (not a symlink's target),
            # while isdir()/isfile() below follow symlinks.
            stat = os.lstat(path)
            # stat.st_mtime is returned as UTC
            mtime = stat.st_mtime
            if os.path.isdir(path):
                res.append(
                    DirectoryEntry(
                        self, self.cur_dir, name, stat.st_size, mtime, str(stat.st_ino)
                    )
                )
            elif os.path.isfile(path):
                if name == DirMetadata.META_FILE_NAME:
                    self.cur_dir_meta.read()
                else:
                    res.append(
                        FileEntry(
                            self,
                            self.cur_dir,
                            name,
                            stat.st_size,
                            mtime,
                            str(stat.st_ino),
                        )
                    )
        return res

    def open_readable(self, name):
        """Return cur_dir/name opened for binary reading (caller must close it)."""
        return open(os.path.join(self.cur_dir, name), "rb")

    def open_writable(self, name):
        """Return cur_dir/name opened for binary writing (caller must close it)."""
        return open(os.path.join(self.cur_dir, name), "wb")

    def write_file(self, name, fp_src, blocksize=DEFAULT_BLOCKSIZE, callback=None):
        """Copy binary data from file-like `fp_src` to cur_dir/name.

        Args:
            name (str): target file name in cur_dir
            fp_src (file-like): source, read in chunks of `blocksize` bytes
            blocksize (int): chunk size
            callback (function, optional): called like `func(data)` after
                every written chunk
        """
        self.check_write(name)
        with open(os.path.join(self.cur_dir, name), "wb") as fp_dst:
            while True:
                data = fp_src.read(blocksize)
                # Stop on EOF (empty chunk); read() may also return None
                # for raw streams without data.
                if data is None or not len(data):
                    break
                fp_dst.write(data)
                if callback:
                    callback(data)

    def remove_file(self, name):
        """Remove cur_dir/name."""
        self.check_write(name)
        path = os.path.join(self.cur_dir, name)
        os.remove(path)

    def set_mtime(self, name, mtime, size):
        """Set modification time on file cur_dir/name, preserving its access time.

        `size` is not used by this target.
        """
        self.check_write(name)
        path = os.path.join(self.cur_dir, name)
        # os.utime() takes (atime, mtime) in seconds. Passing -1 as atime
        # (as before) set the access time to 1969-12-31T23:59:59Z, so keep
        # the current value instead.
        atime = os.stat(path).st_atime
        os.utime(path, (atime, mtime))