"""
(c) 2012-2024 Martin Wendt; see https://github.com/mar10/pyftpsync
Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
import os
from datetime import datetime
from posixpath import join as join_url
from posixpath import normpath as normpath_url
from posixpath import relpath as relpath_url
from ftpsync.util import DEBUG_FLAGS, eps_compare, write
# Valid classification values for a single file or directory entry.
ENTRY_CLASSIFICATIONS = frozenset(
    (
        "existing",
        "unmodified",
        "modified",
        "new",
        "deleted",
    )
)
# PAIR_CLASSIFICATIONS = frozenset([
# "conflict", "equal", "other"
# ])
# Valid operation names that may be assigned to an entry pair.
PAIR_OPERATIONS = frozenset(
    {
        "conflict",
        "copy_local",
        "copy_remote",
        "delete_local",
        "delete_remote",
        "equal",
        "need_compare",
    }
)
# Lookup table used by EntryPair.classify(): maps the tuple
# (local_classification, remote_classification) to the resulting operation.
# EntryPair.classify() raises RuntimeError for pairs that are not listed or
# that map to a falsy value (None).
# NOTE(review): the `True` values are not members of PAIR_OPERATIONS, so the
# final assert in EntryPair.classify() would reject them if they were ever
# looked up -- confirm that these combinations are unreachable.
operation_map = {
    # (local, remote) => operation
    ("missing", "missing"): None,  # Not allowed
    ("missing", "new"): "copy_remote",
    ("missing", "unmodified"): "copy_remote",
    ("missing", "modified"): "copy_remote",
    ("missing", "deleted"): True,  # Nothing to do (only update metadata)
    ("new", "missing"): "copy_local",
    ("new", "new"): "need_compare",
    ("new", "unmodified"): "need_compare",
    ("new", "modified"): "need_compare",
    ("new", "deleted"): "conflict",
    ("unmodified", "missing"): "copy_local",
    ("unmodified", "new"): "need_compare",
    ("unmodified", "unmodified"): "equal",
    ("unmodified", "modified"): "copy_remote",
    ("unmodified", "deleted"): "delete_local",
    ("modified", "missing"): "copy_local",
    ("modified", "new"): "need_compare",
    ("modified", "unmodified"): "copy_local",
    ("modified", "modified"): "conflict",
    ("modified", "deleted"): "conflict",
    ("deleted", "missing"): True,  # Nothing to do (only update metadata)
    ("deleted", "new"): "conflict",
    ("deleted", "unmodified"): "delete_remote",
    ("deleted", "modified"): "conflict",
    ("deleted", "deleted"): True,  # Nothing to do (only update metadata)
    # No metadata available: entries are classified as 'existing'
    # (treat as 'unmodified' in general):
    ("existing", "missing"): "copy_local",
    ("missing", "existing"): "copy_remote",
    ("existing", "existing"): "need_compare",
}
# ===============================================================================
# EntryPair
# ===============================================================================
class EntryPair:
    """A local and a remote entry that refer to the same name and path.

    Either side may be None (entry does not exist on that side), but not both.
    The pair starts unclassified; :meth:`classify` assigns the local and remote
    classification and the resulting operation.
    """

    def __init__(self, local, remote):
        """
        Args:
            local: local entry or None
            remote: remote entry or None

        At least one of `local` and `remote` must be given. If both are
        given, they must agree in name, relative path and directory flag
        (checked with assertions).
        """
        self.local = local
        self.remote = remote

        any_entry = local or remote
        assert any_entry

        if local and remote:
            assert local.name == remote.name
            assert local.get_rel_path() == remote.get_rel_path()
            assert local.is_dir() == remote.is_dir()

        #: str:
        self.name = any_entry.name
        #: str:
        self.rel_path = any_entry.get_rel_path()
        #: bool:
        self.is_dir = any_entry.is_dir()
        #: str:
        self.local_classification = None
        #: str:
        self.remote_classification = None
        #: str:
        self.operation = None
        #: str:
        self.re_class_reason = None

    def __str__(self):
        # Directories are displayed in square brackets.
        s = "<EntryPair({})>: ({}, {}) => {}".format(
            f"[{self.rel_path}]" if self.is_dir else self.rel_path,
            self.local_classification,
            self.remote_classification,
            self.operation,
        )
        return s

    @property
    def any_entry(self):
        """Return the local entry (or the remote entry if it is None)."""
        return self.local or self.remote

    def is_conflict(self):
        """Return True if the pair's operation is 'conflict'.

        The pair must already have an operation (asserted).
        """
        assert self.operation
        return self.operation == "conflict"

    def is_same_time(self):
        """Return True if local.mtime == remote.mtime.

        The comparison is delegated to ``FileEntry._eps_compare()``.
        Returns False (always a real bool) if either side is missing.
        """
        if not (self.local and self.remote):
            # Nothing to compare against; previously this leaked None.
            return False
        return FileEntry._eps_compare(self.local.mtime, self.remote.mtime) == 0

    def override_operation(self, operation, reason):
        """Re-Classify entry pair.

        Args:
            operation (str): new operation; must be a member of
                PAIR_OPERATIONS and differ from the current one (asserted)
            reason (str): stored in :attr:`re_class_reason`
        """
        prev_op = self.operation
        assert operation != prev_op
        assert operation in PAIR_OPERATIONS

        if "classify" in DEBUG_FLAGS:
            write(
                "override_operation {} -> {} (reason: '{}')".format(
                    self, operation, reason
                ),
                debug=True,
            )
        self.operation = operation
        self.re_class_reason = reason

    def classify(self, peer_dir_meta):
        """Classify entry pair.

        Classifies both sides, then looks up the operation for the
        (local, remote) classification tuple in `operation_map`.

        Args:
            peer_dir_meta (dict): metadata of the directory, keyed by entry
                name; a falsy value means that no metadata is available
        Returns:
            str: the operation that was assigned to :attr:`operation`
        Raises:
            RuntimeError: if `operation_map` defines no operation for the
                classification tuple
        """
        assert self.operation is None

        # Note: We pass False if the entry is not listed in the metadata.
        #       We pass None if we don't have metadata at all.
        peer_entry_meta = peer_dir_meta.get(self.name, False) if peer_dir_meta else None

        # A side without an entry is 'deleted' if the metadata still lists the
        # name, otherwise 'missing'.
        if self.local:
            self.local.classify(peer_dir_meta)
            self.local_classification = self.local.classification
        elif peer_entry_meta:
            self.local_classification = "deleted"
        else:
            self.local_classification = "missing"

        if self.remote:
            self.remote.classify(peer_dir_meta)
            self.remote_classification = self.remote.classification
        elif peer_entry_meta:
            self.remote_classification = "deleted"
        else:
            self.remote_classification = "missing"

        c_pair = (self.local_classification, self.remote_classification)
        self.operation = operation_map.get(c_pair)
        if not self.operation:
            raise RuntimeError(f"Undefined operation for pair classification {c_pair}")

        if "classify" in DEBUG_FLAGS:
            write(
                f"Classified pair {self}, meta={peer_entry_meta}",
                debug=True,
            )
        assert self.operation in PAIR_OPERATIONS
        return self.operation
# ===============================================================================
# _Resource
# ===============================================================================
class _Resource:
"""Common base class for files and directories."""
def __init__(self, target, rel_path, name, size, mtime, unique):
"""
Args:
target:
rel_path (str):
name (str): base name
size (int): file size in bytes
mtime (float): modification time as UTC stamp
uniqe (str): string
"""
#: :class:`_Target`: Parent target object.
self.target = target
#: str: Path relative to :attr:`target`
self.rel_path = rel_path
#: str: File name.
self.name = name
#: int: Current file size
self.size = size
#: float: Current file modification time stamp
#: (for FTP targets adjusted using metadata information).
self.mtime = mtime
# #: datetime: Converted version of :attr:`mtime`.
# self.dt_modified = datetime.fromtimestamp(self.mtime)
#: float: Modification time stamp (as reported by source FTP server).
self.mtime_org = mtime
# #: datetime: Converted version of :attr:`mtime_org`.
# self.dt_modified_org = self.mtime_org
#: str: Unique id of file/directory.
self.unique = unique
# #: dict: Additional metadata (set by target.get_dir()).
# self.meta = None
#: int: File size at the time of last sync operation
self.ps_size = None
#: float: File modification time stamp at the time of last sync operation
self.ps_mtime = None
#: float: Time stamp of last sync operation
self.ps_utime = None
#: str: (set by synchronizer._classify_entry()).
self.classification = None
#: bool: May be set to true by synchronizer
self.was_deleted = None
def __str__(self):
dt_modified = datetime.fromtimestamp(self.mtime)
path = os.path.join(self.rel_path, self.name)
if self.is_dir():
res = f"{self.__class__.__name__}([{path}])"
else:
res = "{}('{}', size:{}, modified:{})".format(
self.__class__.__name__,
path,
f"{self.size:,}" if self.size is not None else self.size,
dt_modified,
)
# + " ## %s, %s" % (self.mtime, time.asctime(time.gmtime(self.mtime)))
if self.classification:
res += f" => {self.classification}"
return res
[docs]
def as_string(self, other_resource=None):
# dt = datetime.fromtimestamp(self.get_adjusted_mtime())
dt = datetime.fromtimestamp(self.mtime)
res = "{}, {:>8,} bytes".format(dt.strftime("%Y-%m-%d %H:%M:%S"), self.size)
if other_resource:
comp = []
if self.mtime < other_resource.mtime:
comp.append("older")
elif self.mtime > other_resource.mtime:
comp.append("newer")
if self.size < other_resource.size:
comp.append("smaller")
elif self.size > other_resource.size:
comp.append("larger")
if comp:
res += " ({})".format(", ".join(comp))
return res
def __eq__(self, other):
raise NotImplementedError
[docs]
def get_rel_path(self):
path = relpath_url(self.target.cur_dir, self.target.root_dir)
return normpath_url(join_url(path, self.name))
[docs]
def is_file(self):
return False
[docs]
def is_dir(self):
return False
[docs]
def is_local(self):
return self.target.is_local()
[docs]
def get_sync_info(self, key=None):
return None
[docs]
def set_sync_info(self, local_file):
raise NotImplementedError
[docs]
def classify(self, peer_dir_meta):
"""Classify this entry as 'new', 'unmodified', or 'modified'."""
assert self.classification is None, f"{self}, {peer_dir_meta}"
peer_entry_meta = None
if peer_dir_meta:
# Metadata is generally available, so we can detect 'new' or 'modified'
peer_entry_meta = peer_dir_meta.get(self.name, False)
if self.is_dir():
# Directories are considered 'unmodified' (would require deep
# traversal to check otherwise)
if peer_entry_meta:
self.classification = "unmodified"
else:
self.classification = "new"
elif peer_entry_meta:
# File entries can be classified as modified/unmodified
self.ps_size = peer_entry_meta.get("s")
self.ps_mtime = peer_entry_meta.get("m")
self.ps_utime = peer_entry_meta.get("u")
if (
self.size == self.ps_size
and FileEntry._eps_compare(self.mtime, self.ps_mtime) == 0
):
self.classification = "unmodified"
else:
self.classification = "modified"
else:
# A new file entry
self.classification = "new"
else:
# No metadata available:
if self.is_dir():
# Directories are considered 'unmodified' (would require deep
# traversal to check otherwise)
self.classification = "unmodified"
else:
# That's all we know, but EntryPair.classify() may adjust this
self.classification = "existing"
if "classify" in DEBUG_FLAGS:
write(f"Classified {self}, meta={peer_entry_meta}", debug=True)
assert self.classification in ENTRY_CLASSIFICATIONS
return self.classification
# ===============================================================================
# FileEntry
# ===============================================================================
class FileEntry(_Resource):
    """A file resource.

    Supports `==` and `>` against other :class:`FileEntry` objects; time
    stamps are compared with a tolerance of :attr:`EPS_TIME` seconds.
    """

    # 2 seconds difference is considered equal.
    # mtime stamp resolution depends on filesystem: FAT32: 2 seconds,
    # NTFS: ms, OSX: 1 sec.
    EPS_TIME = 2.01

    def __init__(self, target, rel_path, name, size, mtime, unique):
        super().__init__(target, rel_path, name, size, mtime, unique)

    @staticmethod
    def _eps_compare(date_1, date_2):
        """Compare two time stamps using `eps_compare` and :attr:`EPS_TIME`."""
        return eps_compare(date_1, date_2, FileEntry.EPS_TIME)

    def is_file(self):
        """Return True."""
        return True

    def __eq__(self, other):
        """Return True if `other` has the same class, name, size and mtime.

        Returns False for None or objects of a different class (instead of
        raising AttributeError while reading `other.mtime`).
        """
        # Guard first: the original evaluated `other.mtime` before this check.
        if other is None or other.__class__ != self.__class__:
            return False
        return (
            other.name == self.name
            and other.size == self.size
            and self._eps_compare(self.mtime, other.mtime) == 0
        )

    def __gt__(self, other):
        """Return True if `other` has the same class and name, but is older.

        Returns False for None or objects of a different class (instead of
        raising AttributeError while reading `other.mtime`).
        """
        if other is None or other.__class__ != self.__class__:
            return False
        return (
            other.name == self.name
            and self._eps_compare(self.mtime, other.mtime) > 0
        )

    def get_sync_info(self, key=None):
        """Get mtime/size when this resource was last synchronized with remote."""
        return self.target.get_sync_info(self.name, key)

    def was_modified_since_last_sync(self):
        """Return True if this resource was modified since last sync.

        None is returned if we don't know (because of missing meta data).
        """
        info = self.get_sync_info()
        if not info:
            return None
        # 's' and 'm' are compared against the current size and mtime.
        if self.size != info["s"]:
            return True
        if self.mtime > info["m"]:
            return True
        return False
# ===============================================================================
# DirectoryEntry
# ===============================================================================
class DirectoryEntry(_Resource):
    """A directory resource; its size is always stored as 0."""

    def __init__(self, target, rel_path, name, size, mtime, unique):
        super().__init__(
            target,
            rel_path,
            name,
            size=size,
            mtime=mtime,
            unique=unique,
        )
        # A directory has no size that could reasonably be used for
        # classification, so the passed value is discarded.
        self.size = 0

    def is_dir(self):
        """Return True."""
        return True