forked from shodanshok/blocksync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblocksync.py
executable file
·391 lines (360 loc) · 13 KB
/
blocksync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
#!/usr/bin/env python2
"""
Synchronize dev/files over the network or locally
Copyright 2006-2008 Justin Azoff <[email protected]>
Copyright 2011 Robert Coup <[email protected]>
Copyright 2018 Gionatan Danti <[email protected]>
License: GPL
Getting started:
- For network copy
* Copy blocksync.py to the home directory on the remote host
* Make sure your remote user can either sudo or is root itself.
* Make sure your local user can ssh to the remote host
* Invoke:
sudo python2 blocksync.py /dev/source user@remotehost /dev/dest
- For local copy
* Simply run ./blocksync with 'localhost' as the target host
"""
#pylint: disable=E1101
#pylint: disable=W0702,W0621,W0703
#pylint: disable=C0111,C0103,R0914,R0912,R0915
# Imports
import os
import sys
import hashlib
import subprocess
import time
from optparse import OptionParser
# Optional native helpers. Each *_AVAILABLE flag records whether the import
# succeeded; check_available_libs() turns a missing library into an error
# only when an option actually needs it. Only ImportError is caught so that
# Ctrl-C / SystemExit during import are not silently swallowed.
try:
    import fadvise
    FADVISE_AVAILABLE = True
except ImportError:
    FADVISE_AVAILABLE = False
try:
    import lzo
    LZO_AVAILABLE = True
except ImportError:
    LZO_AVAILABLE = False
try:
    import lz4
    LZ4_AVAILABLE = True
    # Newer lz4 bindings (as the FreeBSD one) require importing a sub-package.
    # Alias its functions onto the top-level module so the rest of the file
    # can always call lz4.compress / lz4.decompress.
    try:
        import lz4.block
        lz4.compress = lz4.block.compress
        lz4.decompress = lz4.block.decompress
    except (ImportError, AttributeError):
        # Presumably an older binding exposing compress/decompress directly.
        pass
except ImportError:
    LZ4_AVAILABLE = False
# Tokens the client sends to the server after comparing a block checksum:
# "same" means nothing follows, "diff" means the block payload follows.
SAME, DIFF = "same", "diff"
# Verify that the libraries required by the selected options were imported;
# abort with an installation hint otherwise.
def check_available_libs():
    """Exit with status 1 if an enabled option needs a library that is missing.

    Called on both the client (main) and the server (server()), so the hint
    names the local host (os.uname()[1]) where the package must be installed.
    """
    host = os.uname()[1]
    requirements = (
        (options.nocache and not FADVISE_AVAILABLE,
         "Missing FADVISE library.\nPlease run 'pip install fadvise' on "),
        (options.compress == "lzo" and not LZO_AVAILABLE,
         "Missing LZO library.\nPlease run 'pip install python-lzo' on "),
        (options.compress == "lz4" and not LZ4_AVAILABLE,
         "Missing LZ4 library.\nPlease run 'pip install python-lz4' on "),
    )
    for missing, hint in requirements:
        if missing:
            sys.stderr.write(hint + host + "\n\n")
            sys.exit(1)
# Open a file or device and report its size
def do_open(f, mode):
    """Open path f and return (file_object, size_in_bytes).

    During a dry run the requested mode is ignored and 'rb' is used, so the
    target cannot be written. The size is measured by seeking to the end;
    the returned file object is left positioned at offset 0.
    """
    handle = open(f, 'rb' if options.dryrun else mode)
    handle.seek(0, os.SEEK_END)
    size = handle.tell()
    handle.seek(0, os.SEEK_SET)
    return handle, size
def create_file(f):
    """Make sure destination path f exists and is options.devsize bytes long.

    Used by the server when --force is given: a missing file is created and
    a file of a different size is truncated/extended to options.devsize.
    Does nothing during a dry run, which must not alter the destination;
    do_open() will then report a missing destination as an error.
    """
    if options.dryrun:
        return
    mode = 'r+b' if os.path.exists(f) else 'w+b'
    with open(f, mode) as handle:
        # Measure through the open handle: os.path.getsize() reports 0 for
        # block devices, which would force a truncate() they cannot honour.
        handle.seek(0, os.SEEK_END)
        if handle.tell() != options.devsize:
            handle.truncate(options.devsize)
# Read and checksum blocks sequentially
def getblocks(f):
    """Yield (block, checksum) pairs read from f's current position to EOF.

    Each block is read with f.read(options.blocksize), so only the last one
    may be shorter. A full-size all-zero block gets the shortcut checksum
    "0" and is not hashed; every other block is hashed with the global
    hashfunc and its hexdigest is used.

    With options.nocache the page cache for the block just read is dropped
    (POSIX_FADV_DONTNEED). Whenever fadvise is importable, the next four
    blocks are announced with POSIX_FADV_WILLNEED.
    """
    blocksize = options.blocksize
    zeroblock = '\0' * blocksize
    while True:
        block = f.read(blocksize)
        if not block:
            break
        if block == zeroblock:
            csum = "0"
        else:
            csum = hashfunc(block).hexdigest()
        pos = f.tell()
        if options.nocache:
            # Use the real length read: the final block may be short, and
            # pos - blocksize would then point before it (or be negative
            # when the file is smaller than one block).
            fadvise.posix_fadvise(f.fileno(), pos - len(block), len(block),
                                  fadvise.POSIX_FADV_DONTNEED)
        if FADVISE_AVAILABLE:
            fadvise.posix_fadvise(f.fileno(), pos, blocksize * 4,
                                  fadvise.POSIX_FADV_WILLNEED)
        yield (block, csum)
# This is the server (remote, or write-enabled) component
def server(dstpath):
check_available_libs()
# Should dst be created?
if options.force:
create_file(dstpath)
# Open and read dst
try:
f, size = do_open(dstpath, 'r+b')
except Exception, e:
sys.stderr.write("ERROR: can not access destination path! %s\n" % e)
sys.exit(1)
# Begin comparison
f.seek(options.skip*options.blocksize)
print dstpath, options.blocksize
print size
print f.tell()
sys.stdout.flush()
block_id = 0
for (block, csum) in getblocks(f):
print csum
sys.stdout.flush()
in_line = sys.stdin.readline()
res, complen = in_line.split(":")
if res != SAME:
if options.compress:
block = decompfunc(sys.stdin.read(int(complen)))
else:
block = sys.stdin.read(options.blocksize)
# Do not write anything if dryrun
if not options.dryrun:
f.seek((block_id+options.skip)*options.blocksize, 0)
f.write(block)
f.flush()
block_id = block_id+1
# Local component. It print current options and send SAME/DIFF flags to server
def sync(srcpath, dsthost, dstpath):
# If dstpath is not specified, use the same name as srcpath
if not dstpath:
dstpath = srcpath
# Open srcpath readonly
try:
f, size = do_open(srcpath, 'rb')
except Exception, e:
sys.stderr.write("ERROR: can not access source path! %s\n" % e)
sys.exit(1)
# Print a session summary
print
print "Dry run : "+str(options.dryrun)
print "Local : "+str(local)
print "Block size : %0.1f KB" % (float(options.blocksize) / (1024))
print "Skipped : "+str(options.skip)
print "Hash alg : "+options.hashalg
print "Crypto alg : "+options.encalg
print "Compression : "+str(options.compress)
print "Read cache : "+str(not options.nocache)
print "SRC command : "+" ".join(sys.argv)
# Generate server command
cmd = ['python2', 'blocksync.py', 'server', dstpath, '-a', options.hashalg,
'-b', str(options.blocksize), '-k', str(options.skip)]
if options.sudo:
cmd = ['sudo'] + cmd
if not local:
cmd = ['ssh', '-c', options.encalg, dsthost] + cmd
# Extra options
if options.nocache:
cmd.append("-x")
if options.compress:
cmd.append("--compress="+options.compress)
if options.force:
cmd.append("-f")
cmd.append("--devsize")
cmd.append(str(size))
if options.dryrun:
cmd.append("-d")
# Run remote command
print "DST command : "+" ".join(cmd)
print
p = subprocess.Popen(cmd, bufsize=0,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
close_fds=True)
p_in, p_out = p.stdin, p.stdout
# Sanity checks
line = p_out.readline()
p.poll()
if p.returncode is not None:
sys.stderr.write("ERROR: connecting to or invoking blocksync on the remote host!\n\n")
sys.exit(1)
a, b = line.split()
if a != dstpath:
sys.stderr.write("ERROR: DST path (%s) doesn't match with the remote host (%s)!\n\n" %\
(dstpath, a))
sys.exit(1)
if int(b) != options.blocksize:
sys.stderr.write("ERROR: SRC block size (%d) doesn't match with the remote (%d)!\n\n" %\
(options.blocksize, int(b)))
sys.exit(1)
line = p_out.readline()
p.poll()
if p.returncode is not None:
sys.stderr.write("ERROR: can not access path on remote host!\n\n")
sys.exit(1)
remote_size = int(line)
if size != remote_size:
sys.stderr.write("ERROR: SRC path size (%d) doesn't match DST path size (%d)!\n\n" %\
(size, remote_size))
sys.exit(1)
line = p_out.readline()
p.poll()
if p.returncode is not None:
sys.stderr.write("ERROR: ???\n\n")
sys.exit(1)
remote_pos = int(line)
print "Current remote pos in bytes: %i" % (remote_pos)
f.seek(options.skip*options.blocksize)
print "Current local pos in bytes: %i\n" % (f.tell())
# Start sync
same_blocks = diff_blocks = 0
print "Synching..."
t0 = time.time()
t_last = t0
size_blocks = size / options.blocksize
if size_blocks * options.blocksize < size:
size_blocks = size_blocks+1
c_sum = hashfunc()
block_id = 0
for (l_block, l_sum) in getblocks(f):
if options.showsum:
c_sum.update(l_block)
r_sum = p_out.readline().strip()
if l_sum == r_sum:
p_in.write(SAME+":"+str(len(l_block))+"\n")
p_in.flush()
same_blocks += 1
else:
if options.compress:
l_block = compfunc(l_block)
p_in.write(DIFF+":"+str(len(l_block))+"\n")
p_in.flush()
p_in.write(l_block)
p_in.flush()
diff_blocks += 1
t1 = time.time()
if t1 - t_last > 1 or (same_blocks + diff_blocks) >= size_blocks:
rate = ((block_id + 1.0) * options.blocksize / (1024.0 * 1024.0) /
(t1 - t0))
show_stats(same_blocks, diff_blocks, size_blocks, rate)
t_last = t1
block_id = block_id+1
# Print final info
print "\n\nCompleted in %d seconds" % (time.time() - t0)
if options.showsum:
print "Source checksum: "+c_sum.hexdigest()
return same_blocks, diff_blocks
# Show stats
def show_stats(same_blocks, diff_blocks, size_blocks, rate):
sumstring = "\rsame: %d, diff: %d, %d/%d, %5.1f MB/s"
if not options.quiet or (same_blocks + diff_blocks) >= size_blocks:
print sumstring % (same_blocks, diff_blocks, same_blocks + diff_blocks,
size_blocks, rate),
# Map the --hashalg name to a hashlib constructor
def get_hashfunc():
    """Return the hashlib constructor selected by options.hashalg.

    "md5", "sha1" and "sha256" map to their hashlib constructors; any other
    value yields hashlib.sha512.
    """
    constructors = {
        "md5": hashlib.md5,
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
    }
    return constructors.get(options.hashalg, hashlib.sha512)
# Map the --compress name to a (compress, decompress) pair
def get_compfunc():
    """Return (compfunc, decompfunc) for options.compress.

    Both are None when compression is off or the value is neither "lz4" nor
    "lzo". A module is only referenced for the algorithm actually selected,
    so an uninstalled library is never touched.
    """
    algorithm = options.compress
    if algorithm == "lz4":
        return (lz4.compress, lz4.decompress)
    if algorithm == "lzo":
        return (lzo.compress, lzo.decompress)
    return (None, None)
# Main entry point
if __name__ == "__main__":
parser = OptionParser(
usage="%prog [options] /dev/source user@remotehost [/dev/dest]\n\
%prog [options] /dev/source localhost /dev/dest")
parser.add_option("-b", "--blocksize", dest="blocksize", action="store",
type="int", help="block size (bytes). Default: 128 KiB",
default=128 * 1024)
parser.add_option("-a", "--hashalg", dest="hashalg", action="store",
type="string", help="Hash alg (md5, sha1, sha256, sha512)\
Default: sha512", default="sha512")
parser.add_option("-e", "--encalg", dest="encalg", action="store",
type="string", help="SSH encryption alg. Default: aes128",
default="aes128-cbc")
parser.add_option("-x", "--nocache", dest="nocache", action="store_true",
help="Minimize read cache usage. Default: off. \
NOTE: it requires the fadvise extension", default=False)
parser.add_option("-c", "--showsum", dest="showsum", action="store_true",
help="Calculate and show complete source hashsum. \
Default: off", default=False)
parser.add_option("-C", "--compress", dest="compress", action="store",
help="Use lzo or lz4 compression for block transfer. \
Default: off", default=False)
parser.add_option("-s", "--sudo", dest="sudo", action="store_true",
help="Use sudo. Defaul: off", default=False)
parser.add_option("-f", "--force", dest="force", action="store_true",
help="Force transfer even if dst does not exist. \
Default: False", default=False)
parser.add_option("-d", "--dryrun", dest="dryrun", action="store_true",
help="Dry run (do not alter destination file). \
Default: False", default=False)
parser.add_option("-q", "--quiet", dest="quiet", action="store_true",
help="Quiet. Do not display progress. \
Default: False", default=False)
parser.add_option("-k", "--skip", dest="skip", action="store",
type="int", help="Skip N blocks from the beginning. \
Default: 0", default=0)
parser.add_option("--devsize", dest="devsize", action="store", type="int",
help="*INTERNAL USE ONLY* Specify dev/file size. \
Do NOT use it directly", default=False)
(options, args) = parser.parse_args()
check_available_libs()
# Basic sanity check
if len(args) < 2:
parser.print_help()
print __doc__
sys.exit(1)
# Check if right side is local or remote
local = False
if args[1] == "localhost":
local = True
if local and len(args) < 3:
parser.print_help()
print __doc__
sys.exit(1)
# Select hash function
hashfunc = get_hashfunc()
(compfunc, decompfunc) = get_compfunc()
# Detect if server side is needed
if args[0] == 'server':
dstpath = args[1]
server(dstpath)
else:
srcpath = args[0]
dsthost = args[1]
if len(args) > 2:
dstpath = args[2]
else:
dstpath = srcpath
sync(srcpath, dsthost, dstpath)