Source code for ftpsync.synchronizers

"""
(c) 2012-2024 Martin Wendt; see https://github.com/mar10/pyftpsync
Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""

import fnmatch
import sys
import time

from ftpsync.ftp_target import FTPTarget
from ftpsync.metadata import DirMetadata
from ftpsync.resources import DirectoryEntry, EntryPair, FileEntry, operation_map
from ftpsync.util import (
    DRY_RUN_PREFIX,
    IS_REDIRECTED,
    VT_ERASE_LINE,
    ansi_code,
    byte_compare,
    colorama,
    eps_compare,
    pretty_stamp,
    write,
    write_error,
)

CONFIG_FILE_NAME = "pyftpsync.yaml"

#: Default for --exclude CLI option
#: Note: DirMetadata.META_FILE_NAME and LOCK_FILE_NAME are always ignored
DEFAULT_OMIT = [".DS_Store", ".git", ".hg", ".svn", "#recycle"]
ALWAYS_OMIT = (CONFIG_FILE_NAME, DirMetadata.META_FILE_NAME, DirMetadata.LOCK_FILE_NAME)

# ===============================================================================
# Helpers
# ===============================================================================

_ts = pretty_stamp


[docs] def process_options(opts): """Check and prepare options dict.""" # Convert match and exclude args into pattern lists match = opts.get("match") if match and isinstance(match, str): opts["match"] = [pat.strip() for pat in match.split(",")] elif match: assert isinstance(match, list), match else: opts["match"] = [] exclude = opts.get("exclude") if exclude and isinstance(exclude, str): opts["exclude"] = [pat.strip() for pat in exclude.split(",")] elif exclude: assert isinstance(exclude, list), exclude else: # opts["exclude"] = DEFAULT_OMIT opts["exclude"] = []
# print(match, exclude, opts)
[docs] def match_path(entry, opts): """Return True if `path` matches `match` and `exclude` options.""" if entry.name in ALWAYS_OMIT: return False # TODO: currently we use fnmatch syntax and match against names. # We also might allow glob syntax and match against the whole relative path instead # path = entry.get_rel_path() path = entry.name ok = True match = opts.get("match") exclude = opts.get("exclude") if entry.is_file() and match: assert isinstance(match, list), match ok = False for pat in match: if fnmatch.fnmatch(path, pat): ok = True break if ok and exclude: assert isinstance(exclude, list), exclude for pat in exclude: if fnmatch.fnmatch(path, pat): ok = False break # write("match", ok, entry) return ok
# =============================================================================== # BaseSynchronizer # ===============================================================================
[docs] class BaseSynchronizer: """Synchronizes two target instances in dry_run mode (also base class for other synchronizers).""" _resolve_shortcuts = {"l": "local", "r": "remote", "s": "skip"} def __init__(self, local, remote, options): self.local = local self.remote = remote # TODO: check for self-including paths self.options = options or {} process_options(self.options) self.verbose = self.options.get("verbose", 3) self.dry_run = self.options.get("dry_run", False) self.ignore_copy_errors = True #: bool: True if this synchronizer is used by a command line script (e.g. pyftpsync.exe) self.is_script = None #: str: Conflict resolution strategy self.resolve_all = None self._stats = { "bytes_written": 0, "conflict_files": 0, "conflict_files_skipped": 0, "copy_errors": 0, "dirs_created": 0, "dirs_deleted": 0, "errors": 0, "download_bytes_written": 0, "download_files_written": 0, "elap_secs": None, "elap_str": None, "entries_seen": 0, "entries_touched": 0, "files_created": 0, "files_deleted": 0, "files_written": 0, "interactive_ask": 0, "local_dirs": 0, "local_files": 0, "meta_bytes_read": 0, "meta_bytes_written": 0, "remote_dirs": 0, "remote_files": 0, "result_code": None, "upload_bytes_written": 0, "upload_files_written": 0, } def __del__(self): self.close()
[docs] def get_info_strings(self): raise NotImplementedError
[docs] def close(self): if self.local.connected: self.local.close() if self.remote.connected: self.remote.close()
[docs] def get_stats(self): return self._stats
[docs] def error_count(self) -> int: return self._stats["errors"]
[docs] def problem_count(self) -> int: n = self._stats["conflict_files_skipped"] + self._stats["copy_errors"] return n
[docs] def _inc_stat(self, name, ofs=1): self._stats[name] = self._stats.get(name, 0) + ofs
[docs] def _match(self, entry): return match_path(entry, self.options)
[docs] def run(self): start = time.time() info_strings = self.get_info_strings() if self.verbose >= 3: write( "{} {}\n{:>20} {}".format( info_strings[0].capitalize(), self.local.get_base_name(), info_strings[1], self.remote.get_base_name(), ) ) write( "Encoding local: {}, remote: {}".format( self.local.encoding, self.remote.encoding ) ) try: self.local.synchronizer = self.remote.synchronizer = self self.local.peer = self.remote self.remote.peer = self.local if self.dry_run: self.local.readonly = True self.local.dry_run = True self.remote.readonly = True self.remote.dry_run = True if not self.local.connected: self.local.open() if not self.remote.connected: self.remote.open() res = self._sync_dir() finally: self.local.synchronizer = self.remote.synchronizer = None self.local.peer = self.remote.peer = None self.close() stats = self._stats stats["elap_secs"] = time.time() - start stats["elap_str"] = "{:0.2f} sec".format(stats["elap_secs"]) def _add(rate, size, time): if stats.get(time) and stats.get(size): stats[rate] = f"{0.001 * stats[size] / stats[time]:0.2f} kB/sec" _add("upload_rate_str", "upload_bytes_written", "upload_write_time") _add("download_rate_str", "download_bytes_written", "download_write_time") return res
[docs] def _compare_file(self, local, remote): """Byte compare two files (early out on first difference).""" assert isinstance(local, FileEntry) and isinstance(remote, FileEntry) if not local or not remote: write(f" Files cannot be compared ({local} != {remote}).") return False elif local.size != remote.size: write( " Files are different (size {:,d} != {:,d}).".format( local.size, remote.size ) ) return False with local.target.open_readable( local.name ) as fp_src, remote.target.open_readable(remote.name) as fp_dest: res, ofs = byte_compare(fp_src, fp_dest) if not res: write(f" Files are different at offset {ofs:,d}.") else: write(" Files are equal.") return res
[docs] def _copy_file(self, src, dest, file_entry): # TODO: safe replace: # 1. remove temp file # 2. copy to target.temp # 3. use loggingFile for feedback # 4. rename target.temp # write("_copy_file(%s, %s --> %s)" % (file_entry, src, dest)) assert isinstance(file_entry, FileEntry) self._inc_stat("files_written") self._inc_stat("entries_touched") is_upload = dest is self.remote if is_upload: self._inc_stat("upload_files_written") else: self._inc_stat("download_files_written") self._tick() if self.dry_run: return self._dry_run_action(f"copy file ({file_entry}, {src} --> {dest})") elif dest.readonly: raise RuntimeError(f"target is read-only: {dest}") start = time.time() def _show_error(msg, exc): write_error( f"{ansi_code('Fore.LIGHTRED_EX')}{msg}: {exc} {ansi_code('Style.RESET_ALL')}" ) def __block_written(data): # write("__block_written() {} bytes".format(len(data))) self._inc_stat("bytes_written", len(data)) if is_upload: self._inc_stat("upload_bytes_written", len(data)) else: self._inc_stat("download_bytes_written", len(data)) if isinstance(src, FTPTarget) and not isinstance(dest, FTPTarget): # Copy FTP to File: # FTPTarget.open_readable() would read everything into a temporary buffer # before we can start writing. # It is more efficient to let FTPTarget write in the retrbinary() callbacks. # (Note that copying FTP to FTP would require a temp buffer anyway, # so we handle this in the default branch below.) try: writer = dest.open_writable(file_entry.name) except Exception as e: self._inc_stat("errors") self._inc_stat("copy_errors") if self.ignore_copy_errors: _show_error(f"Could not copy {file_entry.name}", e) return raise with writer as fp_dest: src.copy_to_file(file_entry.name, fp_dest, callback=__block_written) else: try: reader = src.open_readable(file_entry.name) except Exception as e: self._inc_stat("errors") self._inc_stat("copy_errors") if self.ignore_copy_errors: _show_error(f"Could not copy {file_entry.name}", e) return raise with reader as fp_src: dest.write_file(file_entry.name, fp_src, callback=__block_written) dest.set_mtime(file_entry.name, file_entry.mtime, file_entry.size) dest.set_sync_info(file_entry.name, file_entry.mtime, file_entry.size) elap = time.time() - start self._inc_stat("write_time", elap) if is_upload: self._inc_stat("upload_write_time", elap) else: self._inc_stat("download_write_time", elap) return
[docs] def _copy_recursive(self, src, dest, dir_entry): # write("_copy_recursive(%s, %s --> %s)" % (dir_entry, src, dest)) assert isinstance(dir_entry, DirectoryEntry) self._inc_stat("entries_touched") self._inc_stat("dirs_created") self._tick() if self.dry_run: return self._dry_run_action( f"copy directory ({dir_entry}, {src} --> {dest})" ) elif dest.readonly: raise RuntimeError(f"target is read-only: {dest}") dest.set_sync_info(dir_entry.name, None, None) src.push_meta() dest.push_meta() src.cwd(dir_entry.name) dest.mkdir(dir_entry.name) dest.cwd(dir_entry.name) dest.cur_dir_meta = DirMetadata(dest) for entry in src.get_dir(): # the outer call was already accompanied by an increment, but not recursions self._inc_stat("entries_seen") if entry.is_dir(): self._copy_recursive(src, dest, entry) else: self._copy_file(src, dest, entry) src.flush_meta() dest.flush_meta() src.cwd("..") dest.cwd("..") src.pop_meta() dest.pop_meta() return
[docs] def _remove_file(self, file_entry): # TODO: honor backup # write("_remove_file(%s)" % (file_entry, )) assert isinstance(file_entry, FileEntry) self._inc_stat("entries_touched") self._inc_stat("files_deleted") if self.dry_run: return self._dry_run_action(f"delete file ({file_entry})") elif file_entry.target.readonly: raise RuntimeError(f"target is read-only: {file_entry.target}") file_entry.target.remove_file(file_entry.name) file_entry.target.remove_sync_info(file_entry.name)
[docs] def _remove_dir(self, dir_entry): # TODO: honor backup assert isinstance(dir_entry, DirectoryEntry) self._inc_stat("entries_touched") self._inc_stat("dirs_deleted") if self.dry_run: return self._dry_run_action(f"delete directory ({dir_entry})") elif dir_entry.target.readonly: raise RuntimeError(f"target is read-only: {dir_entry.target}") dir_entry.target.rmdir(dir_entry.name) dir_entry.target.remove_sync_info(dir_entry.name)
[docs] def _log_action(self, action, status, symbol, entry, min_level=3): if self.verbose < min_level: return if len(symbol) > 1 and symbol[0] in (">", "<"): symbol = ( " " + symbol ) # make sure direction characters are aligned at 2nd column color = "" final = "" if not self.options.get("no_color"): # CM = self.COLOR_MAP # color = CM.get((action, status), # CM.get(("*", status), # CM.get((action, "*"), # ""))) if action in ("copy", "restore"): if "<" in symbol: if status == "new": color = ansi_code("Fore.GREEN") + ansi_code("Style.BRIGHT") else: color = ansi_code("Fore.GREEN") else: if status == "new": color = ansi_code("Fore.CYAN") + ansi_code("Style.BRIGHT") else: color = ansi_code("Fore.CYAN") elif action == "delete": color = ansi_code("Fore.RED") elif status == "conflict": color = ansi_code("Fore.LIGHTRED_EX") elif action == "skip" or status == "equal" or status == "visit": color = ansi_code("Fore.LIGHTBLACK_EX") final = ansi_code("Style.RESET_ALL") if colorama: # Clear line"ESC [ mode K" mode 0:to-right, 2:all final += colorama.ansi.clear_line(0) else: final += " " * 10 prefix = "" if self.dry_run: prefix = DRY_RUN_PREFIX if action and status: tag = (f"{action} {status}").upper() else: assert status tag = (f"{status}").upper() name = entry.get_rel_path() if entry.is_dir(): name = f"[{name}]" write(f"{prefix}{color}{tag:<16} {symbol:^3} {name}{final}")
[docs] def _tick(self): """Write progress info and move cursor to beginning of line.""" if (self.verbose >= 3 and not IS_REDIRECTED) or self.options.get("progress"): stats = self.get_stats() prefix = DRY_RUN_PREFIX if self.dry_run else "" sys.stdout.write( "{}Touched {}/{} entries in {} directories...\r".format( prefix, stats["entries_touched"], stats["entries_seen"], stats["local_dirs"], ) ) sys.stdout.flush() return
[docs] def _dry_run_action(self, action): """Called in dry-run mode after call to _log_action() and before exiting function.""" # write("dry-run", action) return
[docs] def _test_match_or_print(self, entry): """Return True if entry matches filter. Otherwise print 'skip' and return False.""" if not self._match(entry): self._log_action("skip", "unmatched", "-", entry, min_level=4) return False return True
[docs] def _before_sync(self, entry): """Called by the synchronizer for each entry. Return False to prevent the synchronizer's default action. """ self._inc_stat("entries_seen") self._tick() return True
[docs] def _sync_dir(self): """Traverse the local folder structure and remote peers. This is the core algorithm that generates calls to self.sync_XXX() handler methods. _sync_dir() is called by self.run(). """ # --case may be 'local', 'remote', 'strict', or None case_mode = self.options.get("case") # Convert into a dict {name: FileEntry, ...} local_entries = self.local.get_dir() if case_mode == "strict": local_entry_map = {e.name: e for e in local_entries} # local_entry_map = dict(map(lambda e: (e.name, e), local_entries)) else: local_entry_map = {e.name.lower(): e for e in local_entries} # local_entry_map = dict(map(lambda e: (e.name.lower(), e), local_entries)) if len(local_entry_map) != len(local_entries): raise RuntimeError( "Local target contains file names that only differ in case: " "pass `--case strict`" ) # Convert into a dict {name: FileEntry, ...} remote_entries = self.remote.get_dir() if case_mode == "strict": remote_entry_map = {e.name: e for e in remote_entries} # remote_entry_map = dict(map(lambda e: (e.name, e), remote_entries)) else: remote_entry_map = {e.name.lower(): e for e in remote_entries} # remote_entry_map = dict(map(lambda e: (e.name.lower(), e), remote_entries)) if len(remote_entry_map) != len(remote_entries): raise RuntimeError( "Remote target contains file names that only differ in case: " "pass `--case strict`" ) # print(sorted([(k, v.name) for k, v in local_entry_map.items()])) # print(sorted([(k, v.name) for k, v in remote_entry_map.items()])) entry_pair_list = [] # 1. Loop over all local files and classify the relationship to the # peer entries. for local_entry in local_entries: if isinstance(local_entry, DirectoryEntry): self._inc_stat("local_dirs") else: self._inc_stat("local_files") if not self._before_sync(local_entry): # TODO: currently, if a file is skipped, it will not be # considered for deletion on the peer target continue # Unless `--case strict` was set, we lookup by lowercase name. # If file names differ, we adjust one side: if case_mode == "strict": remote_entry = remote_entry_map.get(local_entry.name) else: remote_entry = remote_entry_map.get(local_entry.name.lower()) if remote_entry and remote_entry.name != local_entry.name: if case_mode == "local": remote_entry.name = local_entry.name elif case_mode == "remote": local_entry.name = remote_entry.name else: raise RuntimeError( "Found ambigiuos name ({} != {}): " "`--case` argument is required.".format( local_entry, remote_entry ) ) entry_pair = EntryPair(local_entry, remote_entry) entry_pair_list.append(entry_pair) # TODO: renaming could be triggered, if we find an existing # entry.unique with a different entry.name # 2. Collect all remote entries that do NOT exist on the local target. for remote_entry in remote_entries: if isinstance(remote_entry, DirectoryEntry): self._inc_stat("remote_dirs") else: self._inc_stat("remote_files") if not self._before_sync(remote_entry): continue if case_mode == "strict": if remote_entry.name not in local_entry_map: entry_pair = EntryPair(None, remote_entry) entry_pair_list.append(entry_pair) else: if remote_entry.name.lower() not in local_entry_map: entry_pair = EntryPair(None, remote_entry) entry_pair_list.append(entry_pair) # 3. Classify all entries and pairs. # We pass the additional meta data here peer_dir_meta = self.local.cur_dir_meta.peer_sync.get(self.remote.get_id()) for pair in entry_pair_list: pair.classify(peer_dir_meta) # 4. Perform (or schedule) resulting file operations for pair in entry_pair_list: # print(pair) # Let synchronizer modify the default operation (e.g. apply `--force` option) hook_result = self.re_classify_pair(pair) # Let synchronizer implement special handling of unmatched entries # (e.g. `--delete_unmatched`) if not self._match(pair.any_entry): self.on_mismatch(pair) # ... do not call operation handler... elif hook_result is not False: handler = getattr(self, "on_" + pair.operation, None) # print(handler) if handler: try: handler(pair) except Exception as e: if self.on_error(e, pair) is not True: raise else: # write("NO HANDLER") raise NotImplementedError(f"No handler for {pair}") if pair.is_conflict(): self._inc_stat("conflict_files") # 5. Let the target provider write its meta data for the files in the # current directory. self.local.flush_meta() self.remote.flush_meta() # 6. Finally visit all local sub-directories recursively that also # exist on the remote target. for local_dir in local_entries: # write("local_dir(%s, %s)" % (local_dir, local_dir)) if not local_dir.is_dir(): continue elif not self._before_sync(local_dir): continue remote_dir = remote_entry_map.get(local_dir.name) if remote_dir: if local_dir.was_deleted or remote_dir.was_deleted: pass # self.on_mismatch() removed an entry else: self.local.cwd(local_dir.name) self.remote.cwd(local_dir.name) self._sync_dir() self.local.cwd("..") self.remote.cwd("..") return True
[docs] def re_classify_pair(self, pair): """Allow derrived classes to override default classification and operation. Returns: False to prevent default operation. """ return True
[docs] def on_error(self, exc, pair): """Called for pairs that don't match `match` and `exclude` filters.""" # any_entry = pair.any_entry msg = "{red}ERROR: {exc}\n {pair}{reset}".format( exc=exc, pair=pair, red=ansi_code("Fore.LIGHTRED_EX"), reset=ansi_code("Style.RESET_ALL"), ) write(msg) # Return True to ignore this error (instead of raising and terminating the app) # if "[Errno 92] Illegal byte sequence" in "{}".format(e) and compat.PY2: # write(RED + "This _may_ be solved by using Python 3." + R) # # return True return False
[docs] def on_mismatch(self, pair): """Called for pairs that don't match `match` and `exclude` filters. A synchronizer may decide to implement `--delete-unmatched` and set `pair.entry.was_deleted` accordingly. """ self._log_action("skip", "mismatch", "?", pair.any_entry, min_level=4)
[docs] def on_equal(self, pair): """Called for (unmodified, unmodified) pairs.""" self._log_action("", "equal", "=", pair.local, min_level=4)
[docs] def on_copy_local(self, pair): """Called when the local resource should be copied to remote.""" status = pair.remote_classification self._log_action("copy", status, ">", pair.local)
[docs] def on_copy_remote(self, pair): """Called when the remote resource should be copied to local.""" status = pair.local_classification self._log_action("copy", status, "<", pair.remote)
[docs] def on_delete_local(self, pair): """Called when the local resource should be deleted.""" self._log_action("", "modified", "X< ", pair.local)
[docs] def on_delete_remote(self, pair): """Called when the remote resource should be deleted.""" self._log_action("", "modified", " >X", pair.remote)
[docs] def on_need_compare(self, pair): """Re-classify pair based on file attributes and options.""" self._log_action("", "different", "?", pair.local, min_level=2)
[docs] def on_conflict(self, pair): """Called when resources have been modified on local *and* remote. Returns: False to prevent visiting of children (if pair is a directory) """ self._log_action("skip", "conflict", "!", pair.local, min_level=2)
# =============================================================================== # BiDirSynchronizer # ===============================================================================
[docs] class BiDirSynchronizer(BaseSynchronizer): """Synchronizer that performs up- and download operations as required. - Newer files override unmodified older files - When both files are newer than last sync -> conflict! Conflicts may be resolved by these options:: --resolve=old: use the older file --resolve=new: use the newer file --resolve=local: use the local file --resolve=remote: use the remote file --resolve=ask: prompt user for decision - When a file is missing: check if it existed in the past. If so, delete it. Otherwise copy it. In order to know if a file was modified, deleted, or created since last sync, we store a snapshot of the directory in the local directory. """ def __init__(self, local, remote, options): super().__init__(local, remote, options)
[docs] def get_info_strings(self): return ("synchronize", "with")
[docs] def _print_pair_diff(self, pair): any_entry = pair.any_entry has_meta = any_entry.get_sync_info("m") is not None # write("pair", pair) # print("pair.local", pair.local) # print("pair.remote", pair.remote) write( ( VT_ERASE_LINE + ansi_code("Fore.LIGHTRED_EX") + "CONFLICT: {!r} was modified on both targets since last sync ({})." + ansi_code("Style.RESET_ALL") ).format(any_entry.get_rel_path(), _ts(any_entry.get_sync_info("u"))) ) if has_meta: write( " Original modification time: {}, size: {:,d} bytes.".format( _ts(any_entry.get_sync_info("m")), any_entry.get_sync_info("s") ) ) else: write(" (No meta data available.)") write(" Local: {}".format(pair.local.as_string() if pair.local else "n.a.")) write( " Remote: {}".format( pair.remote.as_string(pair.local) if pair.remote else "n.a." ) )
[docs] def _interactive_resolve(self, pair): """Return 'local', 'remote', or 'skip' to use local, remote resource or skip.""" if self.resolve_all: # A resolution strategy was selected using Shift+MODE resolve = self.resolve_all else: # A resolution strategy was configured resolve = self.options.get("resolve", "skip") if resolve in ("new", "old") and pair.is_same_time(): # We cannot apply this resolution: force an alternative print(f"Cannot resolve using '{resolve}' strategy: {pair}") resolve = "ask" if self.is_script else "skip" if resolve == "ask" or self.verbose >= 5: self._print_pair_diff(pair) if resolve in ("local", "remote", "old", "new", "skip"): # self.resolve_all = resolve return resolve self._inc_stat("interactive_ask") prompt = ( "Use {m}L{r}ocal, {m}R{r}emote, {m}O{r}lder, {m}N{r}ewer, " + "{m}S{r}kip, {m}B{r}inary compare, {m}H{r}elp ? " ).format( m=ansi_code("Style.BRIGHT") + ansi_code("Style.UNDERLINE"), r=ansi_code("Style.RESET_ALL"), ) while True: r = input(prompt).strip() if r in ("h", "H", "?"): print("The following keys are supported:") print(" 'b': Binary compare") print(" 'n': Use newer file") print(" 'o': Use older file") print(" 'l': Use local file") print(" 'r': Use remote file") print(" 's': Skip this file (leave both targets unchanged)") print( "Hold Shift (upper case letters) to apply choice for all " "remaining conflicts." ) print("Hit Ctrl+C to abort.") self._print_pair_diff(pair) continue elif r in ("b", "B"): # TODO: we could (offer to) set both mtimes to the same value # if files are identical self._compare_file(pair.local, pair.remote) # self._print_pair_diff(pair) continue elif r in ("o", "O", "n", "N") and pair.is_same_time(): # Ignore 'old' or 'new' selection if times are the same print("Files have identical modification times.") continue elif r in ("L", "R", "O", "N", "S"): r = self._resolve_shortcuts[r.lower()] self.resolve_all = r break elif r in ("l", "r", "o", "n", "s"): r = self._resolve_shortcuts[r] break return r
[docs] def run(self): # Don't override setting by derived up/downloader res = super().run() return res
[docs] def on_mismatch(self, pair): """Called for pairs that don't match `match` and `exclude` filters.""" self._log_action("skip", "mismatch", "?", pair.any_entry, min_level=4)
[docs] def on_equal(self, pair): self._log_action("", "equal", "=", pair.local, min_level=4)
[docs] def on_copy_local(self, pair): local_entry = pair.local if self._test_match_or_print(local_entry): self._log_action("copy", pair.local_classification, ">", local_entry) if pair.is_dir: self._copy_recursive(self.local, self.remote, local_entry) else: self._copy_file(self.local, self.remote, local_entry)
[docs] def on_copy_remote(self, pair): remote_entry = pair.remote if self._test_match_or_print(remote_entry): self._log_action("copy", pair.remote_classification, "<", remote_entry) if pair.is_dir: self._copy_recursive(self.remote, self.local, remote_entry) else: self._copy_file(self.remote, self.local, remote_entry)
[docs] def on_delete_local(self, pair): self._log_action("delete", "missing", "X< ", pair.local) # self._log_action("delete", pair.local_classification, "X< ", pair.local) if pair.is_dir: self._remove_dir(pair.local) else: self._remove_file(pair.local)
[docs] def on_delete_remote(self, pair): self._log_action("delete", "missing", " >X", pair.remote) # self._log_action("delete", pair.remote_classification, " >X", pair.remote) if pair.is_dir: self._remove_dir(pair.remote) else: self._remove_file(pair.remote)
[docs] def on_need_compare(self, pair): """Re-classify pair based on file attributes and options.""" # print("on_need_compare", pair) # If no metadata is available, we could only classify file entries as # 'existing'. # Now we use peer information to improve this classification. c_pair = (pair.local_classification, pair.remote_classification) org_pair = c_pair org_operation = pair.operation # print("need_compare", pair) if pair.is_dir: # For directores, we cannot compare existing peer entries. # Instead, we simply log (and traverse the children later). pair.local_classification = pair.remote_classification = "existing" pair.operation = "equal" self._log_action("", "visit", "?", pair.local, min_level=4) # self._log_action("", "equal", "=", pair.local, min_level=4) return elif c_pair == ("existing", "existing"): # Naive classification derived from file time and size time_cmp = eps_compare( pair.local.mtime, pair.remote.mtime, FileEntry.EPS_TIME ) if time_cmp < 0: c_pair = ("unmodified", "modified") # remote is newer elif time_cmp > 0: c_pair = ("modified", "unmodified") # local is newer elif pair.local.size == pair.remote.size: c_pair = ("unmodified", "unmodified") # equal else: c_pair = ("modified", "modified") # conflict! elif c_pair == ("new", "new"): # Naive classification derived from file time and size time_cmp = eps_compare( pair.local.mtime, pair.remote.mtime, FileEntry.EPS_TIME ) if time_cmp == 0 and pair.local.size == pair.remote.size: c_pair = ("unmodified", "unmodified") # equal else: c_pair = ("modified", "modified") # conflict! # elif c_pair == ("unmodified", "unmodified"): pair.local_classification = c_pair[0] pair.remote_classification = c_pair[1] pair.operation = operation_map.get(c_pair) # print("on_need_compare {} => {}".format(org_pair, pair)) if not pair.operation: raise RuntimeError(f"Undefined operation for pair classification {c_pair}") elif pair.operation == org_operation: raise RuntimeError(f"Could not re-classify {org_pair}") handler = getattr(self, "on_" + pair.operation, None) res = handler(pair) # self._log_action("", "different", "?", pair.local, min_level=2) return res
[docs] def on_conflict(self, pair): """Return False to prevent visiting of children.""" # self._log_action("skip", "conflict", "!", pair.local, min_level=2) # print("on_conflict", pair) any_entry = pair.any_entry if not self._test_match_or_print(any_entry): return resolve = self._interactive_resolve(pair) if resolve == "skip": self._log_action("skip", "conflict", "*?*", any_entry) self._inc_stat("conflict_files_skipped") return if pair.local and pair.remote: assert pair.local.is_file() is_newer = pair.local > pair.remote if ( resolve == "local" or (is_newer and resolve == "new") or (not is_newer and resolve == "old") ): self._log_action("copy", "conflict", "*>*", pair.local) self._copy_file(self.local, self.remote, pair.local) elif ( resolve == "remote" or (is_newer and resolve == "old") or (not is_newer and resolve == "new") ): self._log_action("copy", "conflict", "*<*", pair.local) self._copy_file(self.remote, self.local, pair.remote) else: raise NotImplementedError elif pair.local: assert pair.local.is_file() if resolve == "local": self._log_action("restore", "conflict", "*>x", pair.local) self._copy_file(self.local, self.remote, pair.local) elif resolve == "remote": self._log_action("delete", "conflict", "*<x", pair.local) self._remove_file(pair.local) else: raise NotImplementedError else: assert pair.remote.is_file() if resolve == "local": self._log_action("delete", "conflict", "x>*", pair.remote) self._remove_file(pair.remote) elif resolve == "remote": self._log_action("restore", "conflict", "x<*", pair.remote) self._copy_file(self.remote, self.local, pair.remote) else: raise NotImplementedError return
# =============================================================================== # UploadSynchronizer # ===============================================================================
[docs] class UploadSynchronizer(BiDirSynchronizer): def __init__(self, local, remote, options): super().__init__(local, remote, options) # local.readonly = True
[docs] def get_info_strings(self): return ("upload", "to")
[docs] def re_classify_pair(self, pair): force = self.options.get("force") # delete = self.options.get("delete") is_file = not pair.is_dir classification = (pair.local_classification, pair.remote_classification) # Upload never modifies `local`, so we suggest to delete missing files # on `remote` instead. However `delete_remote` will also check for a # `--delete` flag if classification == ("missing", "new"): assert pair.operation == "copy_remote" pair.override_operation("delete_remote", "restore") elif classification == ("missing", "existing"): assert pair.operation == "copy_remote" pair.override_operation("delete_remote", "restore") if force: if is_file and classification == ("new", "new"): pair.override_operation("copy_local", "force") elif is_file and classification == ("modified", "modified"): pair.override_operation("copy_local", "force") elif is_file and classification == ("unmodified", "modified"): pair.override_operation("copy_local", "restore") elif is_file and classification == ("existing", "existing"): pair.override_operation("copy_local", "force") elif classification == ("unmodified", "deleted"): pair.override_operation("copy_local", "restore") return True
[docs] def _interactive_resolve(self, pair): """Return 'local', 'remote', or 'skip' to use local, remote resource or skip.""" resolve = self.options.get("resolve", "skip") assert resolve in ("local", "ask", "skip") if self.resolve_all: if self.verbose >= 5: self._print_pair_diff(pair) return self.resolve_all if resolve == "ask" or self.verbose >= 5: self._print_pair_diff(pair) if resolve in ("local", "skip"): # self.resolve_all = resolve return resolve self._inc_stat("interactive_ask") prompt = ( "Use {m}L{r}ocal, {m}S{r}kip, {m}B{r}inary compare, {m}H{r}elp ? " ).format( m=ansi_code("Style.BRIGHT") + ansi_code("Style.UNDERLINE"), r=ansi_code("Style.RESET_ALL"), ) while True: r = input(prompt).strip() if r in ("h", "H", "?"): print("The following keys are supported:") print(" 'b': Binary compare") print(" 'l': Upload local file") print(" 's': Skip this file (leave both targets unchanged)") print( "Hold Shift (upper case letters) to apply choice for all " "remaining conflicts." ) print("Hit Ctrl+C to abort.") continue elif r in ("B", "b"): self._compare_file(pair.local, pair.remote) continue elif r in ("L", "S"): r = self._resolve_shortcuts[r.lower()] self.resolve_all = r break elif r in ("l", "s"): r = self._resolve_shortcuts[r] break return r
[docs] def run(self): self.local.readonly = True self.remote.readonly = False res = super().run() return res
[docs] def on_mismatch(self, pair): """Called for pairs that don't match `match` and `exclude` filters. If --delete-unmatched is on, remove the remote resource. """ remote_entry = pair.remote if self.options.get("delete_unmatched") and remote_entry: self._log_action("delete", "unmatched", ">", remote_entry) if remote_entry.is_dir(): self._remove_dir(remote_entry) else: self._remove_file(remote_entry) remote_entry.was_deleted = True # add a hint for the synchronizer else: self._log_action("skip", "unmatched", "-", pair.any_entry, min_level=4) return
# def on_equal(self, pair): # self._log_action("", "equal", "=", pair.local, min_level=4)
[docs] def on_copy_remote(self, pair): # Uploads does not modify local target # status = pair.local.classification if pair.local else "missing" self._log_action("skip", "download", "<", pair.remote)
[docs] def on_delete_local(self, pair): # Uploads does not modify local target self._log_action("skip", "local del.", "X< ", pair.local)
[docs] def on_delete_remote(self, pair): # Upload does not delete unless --delete was given if not self.options.get("delete"): self._log_action("skip", "remote del.", " >X", pair.remote) return return super().on_delete_remote(pair)
# def on_need_compare(self, pair): # self._log_action("", "different", "?", pair.local, min_level=2) # def on_conflict(self, pair): # """Return False to prevent visiting of children""" # self._log_action("skip", "conflict", "!", pair.local, min_level=2) # =============================================================================== # DownloadSynchronizer # ===============================================================================
[docs] class DownloadSynchronizer(BiDirSynchronizer): """""" def __init__(self, local, remote, options): super().__init__(local, remote, options) # remote.readonly = True
[docs] def get_info_strings(self): return ("download to", "from")
[docs] def re_classify_pair(self, pair): force = self.options.get("force") # delete = self.options.get("delete") is_file = not pair.is_dir classification = (pair.local_classification, pair.remote_classification) # Download never modifies `remote`, so we suggest to delete missing files # on `local` instead. However `delete_local` will also check for a # `--delete` flag if classification == ("new", "missing"): assert pair.operation == "copy_local" pair.override_operation("delete_local", "restore") elif classification == ("existing", "missing"): assert pair.operation == "copy_local" pair.override_operation("delete_local", "restore") if force: if is_file and classification == ("new", "new"): pair.override_operation("copy_remote", "forced") elif is_file and classification == ("modified", "unmodified"): pair.override_operation("copy_remote", "restore") elif is_file and classification == ("modified", "modified"): pair.override_operation("copy_remote", "force") elif is_file and classification == ("existing", "existing"): pair.override_operation("copy_remote", "force") elif classification == ("deleted", "unmodified"): pair.override_operation("copy_remote", "restore") return True
[docs] def _interactive_resolve(self, pair): """Return 'local', 'remote', or 'skip' to use local, remote resource or skip.""" if self.resolve_all: if self.verbose >= 5: self._print_pair_diff(pair) return self.resolve_all resolve = self.options.get("resolve", "skip") assert resolve in ("remote", "ask", "skip") if resolve == "ask" or self.verbose >= 5: self._print_pair_diff(pair) if resolve in ("remote", "skip"): # self.resolve_all = resolve return resolve # self._print_pair_diff(pair) self._inc_stat("interactive_ask") prompt = ( "Use {m}R{r}emote, {m}S{r}kip, {m}B{r}inary compare, {m}H{r}elp ? " ).format( m=ansi_code("Style.BRIGHT") + ansi_code("Style.UNDERLINE"), r=ansi_code("Style.RESET_ALL"), ) while True: r = input(prompt).strip() if r in ("h", "H", "?"): print("The following keys are supported:") print(" 'b': Binary compare") print(" 'r': Download remote file") print(" 's': Skip this file (leave both targets unchanged)") print( "Hold Shift (upper case letters) to apply choice for all " "remaining conflicts." ) print("Hit Ctrl+C to abort.") continue elif r in ("B", "b"): self._compare_file(pair.local, pair.remote) continue elif r in ("R", "S"): r = self._resolve_shortcuts[r.lower()] self.resolve_all = r break elif r in ("r", "s"): r = self._resolve_shortcuts[r] break return r
[docs] def run(self): self.local.readonly = False self.remote.readonly = True res = super().run() return res
[docs] def on_mismatch(self, pair): """Called for pairs that don't match `match` and `exclude` filters. If --delete-unmatched is on, remove the local resource. """ local_entry = pair.local if self.options.get("delete_unmatched") and local_entry: self._log_action("delete", "unmatched", "<", local_entry) if local_entry.is_dir(): self._remove_dir(local_entry) else: self._remove_file(local_entry) local_entry.was_deleted = True # add a hint for the synchronizer else: self._log_action("skip", "unmatched", "-", pair.any_entry, min_level=4) return
# def on_equal(self, pair): # self._log_action("", "equal", "=", pair.local, min_level=4) # # self._check_del_unmatched(local_file)
[docs] def on_copy_local(self, pair): # Download does not modify remote target # status = pair.remote.classification if pair.remote else "missing" self._log_action("skip", "upload", ">", pair.local)
[docs] def on_delete_local(self, pair): # Download does not delete unless --delete was given if not self.options.get("delete"): self._log_action("skip", "local del.", "X< ", pair.local) return return super().on_delete_local(pair)
[docs] def on_delete_remote(self, pair): # Download does not modify remote target self._log_action("skip", "remote del.", " >X", pair.remote)
# def on_need_compare(self, pair): # self._log_action("", "different", "?", pair.local, min_level=2) # # def on_conflict(self, pair): # """Return False to prevent visiting of children""" # self._log_action("skip", "conflict", "!", pair.local, min_level=2)