-
Notifications
You must be signed in to change notification settings - Fork 0
/
backup.py
163 lines (133 loc) · 6.5 KB
/
backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import sys
import argparse
import shutil
import tarfile
def parse_args():
    """Build the command-line parser and return the parsed options.

    When the script is started with no arguments at all, the help text is
    printed and the process exits instead of reporting missing options.

    Returns:
        argparse.Namespace with ``destination``, ``source``, ``include``,
        ``exclude``, ``compress``, ``trees`` and ``force``.
    """
    parser = argparse.ArgumentParser(description='Backup Files')

    required_paths = (
        ('-d', '--destination', 'Backup destination directory'),
        ('-s', '--source', 'Backup source file/folder'),
    )
    prefix_lists = (
        ('-i', '--include', 'Files to transfer'),
        ('-e', '--exclude', 'Files to ignore'),
    )
    switches = (
        ('-c', '--compress', 'Should compress source'),
        ('-t', '--trees', 'Remove deleted files in destination'),
        ('-f', '--force', 'Skip checking for changes'),
    )

    # Registration order is the order shown in the help output.
    for short_flag, long_flag, description in required_paths:
        parser.add_argument(short_flag, long_flag, required=True, help=description)
    for short_flag, long_flag, description in prefix_lists:
        parser.add_argument(short_flag, long_flag, nargs='+', help=description)
    for short_flag, long_flag, description in switches:
        # store_true options default to False on their own.
        parser.add_argument(short_flag, long_flag, action='store_true', help=description)

    no_arguments_given = len(sys.argv) == 1
    if no_arguments_given:
        parser.print_help()
        sys.exit()

    return parser.parse_args()
def get_last_modified(directory, include, exclude):
    """Return the newest modification time among the filtered entries of a tree.

    Args:
        directory: Root directory to inspect.
        include: Optional list of relative-path prefixes to keep.
        exclude: Optional list of relative-path prefixes to drop.

    Returns:
        The largest ``st_mtime`` (seconds since the epoch) of any entry that
        ``get_file_tree`` yields for *directory*. When the tree yields
        nothing (empty directory, or everything filtered out) the
        modification time of *directory* itself is returned.
    """
    modification_times = (
        os.path.getmtime(os.path.join(directory, relative_path))
        for relative_path in get_file_tree(directory, '', include, exclude)
    )
    # With no entries there is no file to stat; fall back to the directory's
    # own timestamp rather than joining a placeholder value into a path.
    return max(modification_times, default=os.path.getmtime(directory))
def is_newer(source_time, destination_time):
    """Tell whether the source timestamp is meaningfully later than the destination's.

    Args:
        source_time: Modification time of the source, in seconds.
        destination_time: Modification time of the destination, in seconds.

    Returns:
        True when the source is more than two seconds ahead of the destination.
    """
    # NOTE(review): the 2-second slack presumably absorbs coarse filesystem
    # timestamp resolution -- confirm with the author.
    tolerance_seconds = 2
    lead = source_time - destination_time
    return lead > tolerance_seconds
def has_file_changed(source, destination):
    """Decide whether *source* needs to be copied over *destination*.

    Args:
        source: Path of the file being backed up.
        destination: Path of its backup copy.

    Returns:
        False when *source* does not exist, True when *destination* does not
        exist, otherwise the result of ``is_newer`` on the two modification
        times.
    """
    if os.path.exists(source):
        if os.path.exists(destination):
            source_mtime = os.path.getmtime(source)
            destination_mtime = os.path.getmtime(destination)
            return is_newer(source_mtime, destination_mtime)
        # Nothing has been backed up yet.
        return True
    # Nothing to back up.
    return False
def transfer_file(source, destination):
    """Copy one file or directory entry from *source* to *destination*.

    A directory is created at *destination* if nothing exists there yet and
    receives the source directory's metadata; its contents are not copied.
    A file is copied together with its metadata. Missing parent directories
    of *destination* are created first.

    Args:
        source: Existing file or directory to copy.
        destination: Target path of the copy.

    Raises:
        FileNotFoundError: If *source* does not exist.
    """
    # Create the parent chain up front instead of retrying after a failure:
    # a retry cannot tell "parent missing" from "source missing" and would
    # report the latter as a confusing FileExistsError from makedirs.
    parent = os.path.dirname(destination)
    if parent:
        os.makedirs(parent, exist_ok=True)

    if os.path.isdir(source):
        if not os.path.exists(destination):
            os.mkdir(destination)
        shutil.copystat(source, destination)
    else:
        shutil.copy2(source, destination)
def should_copy(file_name, include=None, exclude=None):
    """Apply the include/exclude prefix filters to a relative path.

    Args:
        file_name: Relative path to test.
        include: Optional list of prefixes; when non-empty, only paths that
            start with one of them pass and *exclude* is not consulted.
        exclude: Optional list of prefixes; paths starting with one of them
            are rejected.

    Returns:
        True if the path passes the filters, False otherwise. With no
        filters every path passes.
    """
    if include:
        return any(file_name.startswith(prefix) for prefix in include)
    if exclude:
        return not any(file_name.startswith(prefix) for prefix in exclude)
    return True
def get_file_tree(directory, base, include=None, exclude=None, update_dirs=False):
    """Lazily walk *directory* and yield the relative paths that pass the filters.

    Args:
        directory: Directory on disk whose entries are listed.
        base: Relative prefix joined in front of every entry name ('' for the
            root of a walk).
        include: Optional prefix list forwarded to ``should_copy``.
        exclude: Optional prefix list forwarded to ``should_copy``.
        update_dirs: When true, each directory is yielded a second time after
            all of its contents.

    Yields:
        Paths relative to the root of the walk. A directory is yielded before
        its contents, and its contents are visited only if the directory
        itself passed the filters.
    """
    for entry in os.listdir(directory):
        relative_path = os.path.join(base, entry)
        if not should_copy(relative_path, include, exclude):
            continue
        yield relative_path

        entry_path = os.path.join(directory, entry)
        if not os.path.isdir(entry_path):
            continue
        yield from get_file_tree(entry_path, relative_path, include, exclude, update_dirs)
        if update_dirs:
            yield relative_path
def print_progress(header, status, is_end=False):
    """Redraw the current terminal line with a left header and a right status.

    Args:
        header: Text written at the left edge of the line.
        status: Text right-aligned to the terminal width.
        is_end: When true the line is finished with a newline; otherwise the
            cursor stays on the line so a later call can overwrite it.
    """
    width, _ = shutil.get_terminal_size((80, 20))
    # Keep at least one space so header and status never run together when
    # the text is wider than the terminal.
    gap = max(1, width - len(header) - len(status))
    # flush=True: a status written without a trailing newline would otherwise
    # sit in a line-buffered stdout until the final status is printed.
    print('\r' + header + ' ' * gap + status, end='\n' if is_end else '', flush=True)
def print_backup_state(file, status, is_end=False):
    """Show the backup status of one path on the progress line.

    Args:
        file: Path being backed up; shown in single quotes.
        status: Status text handed to ``print_progress``.
        is_end: Forwarded to ``print_progress``.
    """
    quoted_path = "'" + file + "'"
    headline = 'Backing up ' + quoted_path
    print_progress(headline, status, is_end)
def _archive_directory(source, destination, include, exclude, force):
    """Pack the filtered tree under *source* into ``<destination>/<name>.tgz``."""
    # abspath drops a trailing separator so the archive is named after the
    # directory rather than ending up as a bare '.tgz'.
    archive_name = os.path.basename(os.path.abspath(source)) + '.tgz'
    archive_path = os.path.join(destination, archive_name)

    # Short-circuit order matters: the tree is only walked for timestamps
    # when neither `force` nor a missing archive already decides the outcome.
    needs_update = (
        force
        or not os.path.exists(archive_path)
        or is_newer(get_last_modified(source, include, exclude),
                    os.stat(archive_path).st_mtime)
    )
    if not needs_update:
        print_backup_state(source, 'UP TO DATE', True)
        return

    # 'w:gz' so the .tgz really is gzip-compressed; plain 'w' writes an
    # uncompressed tar.
    with tarfile.open(archive_path, 'w:gz') as tar:
        for file_item in get_file_tree(source, '', include, exclude):
            # Entries are added one by one so the include/exclude filters
            # apply; recursive=False keeps tar from re-adding directory contents.
            tar.add(os.path.join(source, file_item), arcname=file_item, recursive=False)
    # NOTE: the archive keeps its creation time as mtime. Stamping it with the
    # tree's last-modified time was disabled by the original author with the
    # remark "Windows doesn't parse time data correctly".
    print_backup_state(source, 'DONE', True)


def _prune_destination(source, destination, include, exclude):
    """Delete destination entries that are gone from source or filtered out."""
    for file_item in get_file_tree(destination, '', None, None):
        still_wanted = (
            os.path.exists(os.path.join(source, file_item))
            and should_copy(file_item, include, exclude)
        )
        if still_wanted:
            continue
        stale_path = os.path.join(destination, file_item)
        # rmtree refuses symlinks, so a link to a directory is unlinked instead.
        if os.path.isdir(stale_path) and not os.path.islink(stale_path):
            shutil.rmtree(stale_path)
        else:
            os.remove(stale_path)


def _mirror_directory(source, destination, include, exclude, compare_trees, force):
    """Copy the filtered contents of *source* directly into *destination*."""
    if compare_trees:
        _prune_destination(source, destination, include, exclude)

    # update_dirs=True yields each directory again after its contents, so its
    # metadata is re-applied once writing into it has finished.
    for file_item in get_file_tree(source, '', include, exclude, True):
        source_path = os.path.join(source, file_item)
        target_path = os.path.join(destination, file_item)
        if os.path.isdir(source_path):
            if not os.path.exists(target_path):
                os.makedirs(target_path)
            shutil.copystat(source_path, target_path)
        elif force or has_file_changed(source_path, target_path):
            transfer_file(source_path, target_path)

    shutil.copystat(source, destination)
    print_backup_state(source, 'DONE', True)


def _backup_file(source, destination, compress, force):
    """Copy a single file into *destination*, or archive it as ``.tgz``."""
    stem, extension = os.path.splitext(source)
    # A file that already is a .tgz is copied as-is rather than re-archived.
    make_archive = compress and extension != '.tgz'
    if make_archive:
        target_path = os.path.join(destination, os.path.basename(stem) + '.tgz')
    else:
        target_path = os.path.join(destination, os.path.basename(source))

    if not (force or has_file_changed(source, target_path)):
        print_backup_state(source, 'UP TO DATE', True)
        return

    if make_archive:
        with tarfile.open(target_path, 'w:gz') as tar:
            tar.add(source, arcname=os.path.basename(source))
        # Give the archive the source's timestamps so the next run's
        # change check compares like with like.
        shutil.copystat(source, target_path)
    else:
        transfer_file(source, target_path)
    print_backup_state(source, 'DONE', True)


def backup(source, destination, include=None, exclude=None, compress=False, compare_trees=False, force=False):
    """Back up a file or directory into *destination*, reporting progress.

    A directory is either mirrored entry by entry into *destination* or, with
    *compress*, packed into ``<destination>/<dirname>.tgz``. A single file is
    copied, or with *compress* packed into ``<destination>/<stem>.tgz``.
    Unchanged items are skipped unless *force* is set.

    Args:
        source: File or directory to back up. A missing source is reported as
            'NOT FOUND' and nothing else happens.
        destination: Directory receiving the backup; created if missing.
        include: Optional list of relative-path prefixes to back up
            (directory sources only).
        exclude: Optional list of relative-path prefixes to skip
            (directory sources only).
        compress: Produce a gzip-compressed tar archive instead of plain copies.
        compare_trees: When mirroring a directory, first delete destination
            entries that no longer exist in the source or are filtered out.
        force: Skip the modification-time checks and always write.
    """
    print_backup_state(source, 'WORKING')
    if not os.path.exists(source):
        print_backup_state(source, 'NOT FOUND', True)
        return

    os.makedirs(destination, exist_ok=True)

    if not os.path.isdir(source):
        _backup_file(source, destination, compress, force)
    elif compress:
        _archive_directory(source, destination, include, exclude, force)
    else:
        _mirror_directory(source, destination, include, exclude, compare_trees, force)
def main() -> None:
    """Parse the command line and run a single backup with those options."""
    options = parse_args()
    backup(
        source=options.source,
        destination=options.destination,
        include=options.include,
        exclude=options.exclude,
        compress=options.compress,
        compare_trees=options.trees,
        force=options.force,
    )
# Run the backup only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()