forked from commandprompt/PITRTools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcmd_archiver
executable file
·362 lines (326 loc) · 14.9 KB
/
cmd_archiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
#!/usr/bin/env python
""" LICENSE
Copyright Command Prompt, Inc.
Permission to use, copy, modify, and distribute this software and its
documentation for any purpose, without fee, and without a written agreement
is hereby granted, provided that the above copyright notice and this
paragraph and the following two paragraphs appear in all copies.
IN NO EVENT SHALL THE COMMAND PROMPT, INC. BE LIABLE TO ANY PARTY FOR
DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
EVEN IF THE COMMAND PROMPT, INC. HAS BEEN ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
THE COMMAND PROMPT, INC. SPECIFICALLY DISCLAIMS ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN
"AS IS" BASIS, AND THE COMMAND PROMPT, INC. HAS NO OBLIGATIONS TO
PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
# $Id$
import os
import re
import sys
from cmd_worker import CMDWorker
# Command-line options handed to CMDArchiver.parse_commandline_arguments().
# Each entry is (short flag, long flag, keyword arguments for the option);
# the keyword names look like optparse add_option() arguments -- TODO confirm
# against cmd_worker.
argslist = (
    # Path of the archive file to ship.
    ('-F', '--file', {
        'dest': "archivefilename",
        'action': "store",
        'help': "Archive file",
        'metavar': "FILE",
    }),
    # Archiver configuration file.
    ("-C", "--config", {
        'dest': "configfilename",
        'action': "store",
        'help': "the name of the archiver config file",
        'metavar': "FILE",
        'default': 'cmd_archiver.ini',
    }),
    # Flush mode switch.
    ("-f", "--flush", {
        'dest': "flush",
        'action': "store_true",
        'help': "Flush all remaining archives to slave",
    }),
    # Initialization mode switch.
    ("-I", "--init", {
        'dest': "init",
        'action': "store_true",
        'help': "Initialize master environment",
    }),
)
# Configuration settings passed to the CMDArchiver constructor.
# Each entry is (setting name, type code, default value); the type codes
# presumably mean 's' = string, 'i' = integer, 'b' = boolean -- TODO confirm
# against cmd_worker.
classdict = (
    # general state
    ('state', 's', None),
    # rsync invocation
    ('rsync_bin', 's', None),
    ('rsync_flags', 's', ""),
    # remote side
    ('slaves', 's', None),
    ('user', 's', None),
    ('r_archivedir', 's', None),
    # local queue directory
    ('l_archivedir', 's', None),
    ('ssh_timeout', 'i', None),
    # external notification settings
    ('notify_ok', 's', None),
    ('notify_warning', 's', None),
    ('notify_critical', 's', None),
    ('debug', 'b', False),
    # PostgreSQL locations
    ('pgdata', 's', None),
    ('pgcontroldata', 's', ""),
    ('rsync_version', 'i', None),
    ('ssh_debug', 'b', False),
)
class ArchiveFailure(Exception):
    """Exception used to propagate an archiving failure to the caller."""
class CMDArchiver(CMDWorker):
    """
    Ships an archive file to every configured slave with rsync over ssh.

    When a slave cannot be reached, the file is copied into a per-slave queue
    directory under l_archivedir; queued files are re-sent at the start of
    the next archive_func() run.

    The configuration attributes read here (slaves, rsync_bin, l_archivedir,
    ssh_flags, ...) and the helpers notify_external() and check_pgpid_func()
    are not defined in this class; presumably they are provided by
    CMDWorker -- verify against cmd_worker.
    """

    def generate_slave_list_func(self):
        """
        We now support multiple slaves (see the README); in order to do that
        properly we have to break up the string and turn it into a list.

        Single quotes are removed and whitespace around each name is
        stripped, so "a, b" and "a,b" give the same result.

        Returns the list of slave host names.
        """
        raw_names = str(self.slaves).replace("'", "").split(",")
        slaves = [name.strip() for name in raw_names]
        if self.debug:
            print("NOTICE: generate_slave_list_func()")
            print("NOTICE: Your slaves are: " + str(slaves))
        return slaves

    def init_env_func(self):
        """
        Initialize the local queues: create one queue directory per slave
        under l_archivedir.

        Returns False when l_archivedir exists but is not accessible or not
        empty, or when a queue directory can not be created; True otherwise.
        """
        if self.debug:
            print("NOTICE: init_env_func()")
        l_archivedir = self.l_archivedir
        # bail out if archivedir exists and not empty or inaccessible.
        # The access-mode constants live in the os module; only "import os"
        # is in scope, so they have to be qualified.
        if os.access(l_archivedir, os.F_OK):
            if not os.access(l_archivedir, os.R_OK | os.W_OK | os.X_OK):
                print("ERROR: l_archivedir %s must allow have r/w/x bits set for the current user" % l_archivedir)
                return False
            elif os.listdir(l_archivedir):
                print("ERROR: l_archivedir %s must be be empty" % l_archivedir)
                return False
            else:
                print("WARNING: l_archivedir %s already exists" % l_archivedir)
        queues = self.generate_slave_list_func()
        try:
            for host in queues:
                queue = l_archivedir + "/" + host
                os.makedirs("%s" % (queue))
        except OSError as e:
            print("ERROR: Can not make queue directories")
            print("EXCEPTION: %s" % (str(e)))
            return False
        return True

    def check_config_func(self):
        """
        Let's make sure that our directories and executables exist.

        Returns True when rsync_bin, pgdata and l_archivedir can all be
        stat()ed, False (after printing the error) on the first one that
        can not.
        """
        if self.debug:
            print("NOTICE: check_config_func()")
        for element in (self.rsync_bin, self.pgdata, self.l_archivedir):
            try:
                os.stat("%s" % (str(element)))
            except OSError as e:
                print("Config %s: %s" % (str(element), str(e)))
                return False
        return True

    def get_pgcontroldata_func(self):
        """
        get_pgcontroldata_func doesn't actually do anything yet. This is more
        for archival purposes so we can remember the regex.

        Runs pg_controldata on pgdata and prints the match object for every
        output line matching the "Prior checkpoint location" pattern.
        Exits the process with status 1 on OSError.
        """
        if not self.pgcontroldata:
            print('WARNING: path to pg_controldata utility is not set, assuming it\'s in PATH')
            pgcontroldata = 'pg_controldata'
        else:
            pgcontroldata = self.pgcontroldata
        try:
            cmd = os.popen("%s %s" % (str(pgcontroldata), str(self.pgdata)))
            try:
                for row in cmd:
                    match = re.search('^Prior checkpoint location: *.{1,}', '%s' % (str(row)))
                    if match is not None:
                        print(match)
            finally:
                # Do not leak the pipe to the child process.
                cmd.close()
        except OSError as e:
            print("")
            print("EXCEPTION: %s" % (str(e)))
            sys.exit(1)

    def flush_check_func(self):
        """
        Simple function to make sure we require input before flushing a
        system.

        Reads one line from standard input and returns True only when it is
        exactly "Yes"; any other answer returns False.
        """
        if self.debug:
            print("NOTICE: flush_check_func()")
        print("\n\n")
        print("Warning! Flushing all logs will cause your slave to exit")
        print("Standby and start up. Please verify that this is exactly what you desire.\n\n")
        print("I wish to force my slave into production: No/Yes\n\n")
        # raw_input only exists on Python 2; input is its Python 3 equivalent.
        try:
            read_line = raw_input
        except NameError:
            read_line = input
        line = str(read_line())
        if line == "Yes":
            print("Flushing all xlogs")
            return True
        elif line == "No":
            print("Exiting!")
        else:
            print("Your options are Yes and No")
        return False

    def list_queue_func(self):
        """
        We only want to process archives for queues that have files, so we
        check and only return a queue/slave that has files to be shipped.

        Returns the list of slaves whose queue directory is not empty.
        os.listdir() errors (e.g. a missing queue directory) propagate to
        the caller.
        """
        if self.debug:
            print("NOTICE: list_queue_func()")
        hosts = []
        # Loop through the list of slaves
        for host in self.generate_slave_list_func():
            queuedir = self.l_archivedir + "/" + str(host)
            list_archives = os.listdir(queuedir)
            # If an archive directory is not empty, then we're good.
            if list_archives:
                hosts.append(host)
            if self.debug:
                # Report each slave together with its own queue contents.
                print("NOTICE: SLAVE: " + host + " " + str(list_archives))
        return hosts

    def send_queue_func(self):
        """
        We are called before normal archive process in order to send queue
        files that have not been shipped yet.

        Returns the list of slaves whose queue transfer failed (empty when
        every non-empty queue was shipped).
        """
        rtn = []
        if self.debug:
            print("NOTICE: send_queue_func()")
        for host in self.list_queue_func():
            if self.debug:
                print("NOTICE: Host = " + host)
            queue_dir = self.l_archivedir + "/" + str(host)
            if self.debug:
                print("NOTICE: queue_dir = " + queue_dir)
            # To deal with old versions of rsync
            if self.rsync_version == 2:
                if self.debug:
                    print("NOTICE: rsync_version = " + str(self.rsync_version))
                source_or_sent = "--remove-sent-files"
            else:
                source_or_sent = "--remove-source-files"
            queue_transfer = """%s %s -aq %s -e "ssh %s" %s/ %s@%s:%s/""" % (
                str(self.rsync_bin), str(self.rsync_flags), str(source_or_sent),
                str(self.ssh_flags), str(queue_dir), str(self.user), str(host),
                str(self.r_archivedir))
            retval = os.system(queue_transfer)
            if self.debug:
                print("NOTICE: Transfering queue = " + queue_transfer)
                print("NOTICE: Transfer retval = " + str(retval))
            if retval:
                # If we failed to send data to this host - append it to the
                # list of hosts to retry the sending attempt on
                rtn.append(host)
        return rtn

    def _queue_archive(self, host):
        """
        Copy the current archive file into the local queue directory of host.

        Returns the status reported by os.system() (0 on success).
        """
        queue_dir = self.l_archivedir + "/" + str(host)
        queue_transfer = """%s %s %s""" % (str(self.rsync_bin), str(self.archivefile), str(queue_dir))
        return os.system(queue_transfer)

    def archive_func(self):
        """
        The main archive function.

        First we check the queue. If there are files in the queue we try to
        send them.
        If we can't send the files from the queue, we determine which slaves
        can not receive files. The archiver then automatically queues the
        archive file for those slaves until they can be reached again.

        Returns True when the archive file was shipped or queued for every
        slave; False when we are not online, when the flush confirmation is
        declined, or when any error occurs.
        """
        if self.debug:
            print("NOTICE: Performing standard archive")
        # If we are not online, exit immediately
        if self.state != 'online':
            print("We are offline, queuing archives")
            self.notify_external(warning=True, message="ARCHIVER: We are offline, queuing archives")
            return False
        try:
            slaves = self.generate_slave_list_func()
            # First we send the queue files (if any); "queue" holds the
            # slaves whose queued files could not be delivered.
            queue = self.send_queue_func()
            if queue:
                if self.debug:
                    print("NOTICE: queue = " + str(queue))
                print("ERROR: Unable to send queued archived files, queueing")
                if self.debug:
                    print("NOTICE: slaves = generate_slave_list_func() " + str(slaves))
                self.notify_external(warning=True, message="Unable to send queued archived files, queueing")
                for host in slaves:
                    if self.debug:
                        print("NOTICE: " + host + " in " + str(slaves))
                    # If the host returned is in the list, we automatically
                    # archive to the queue.
                    if host in queue:
                        if self.debug:
                            print("NOTICE: Saving archives to queue")
                        retval = self._queue_archive(host)
                        if retval:
                            self.notify_external(critical=True, message=str(retval))
                            raise ArchiveFailure
                        if self.debug:
                            print("NOTICE: Sending OK alert")
                        self.notify_external(ok=True, message=str(retval))
            # We now attempt to archive normally to every slave. If we can
            # not, we archive to the queue. If we can not archive to the
            # queue, we exit critical.
            # You may end up with files out of order on the slave if the
            # slave comes online after the queue check but before the current
            # transfer. This is not a problem because pg_standby will only
            # restore files in order, so on the next queue check the slave
            # will receive the missing files and pg_standby will correctly
            # restore them.
            if self.debug:
                print("NOTICE: Entering single file archive transfer")
            for host in slaves:
                if self.debug:
                    print("NOTICE: Archiving for: " + str(host))
                if self.flush:
                    rsync_transfer = """%s %s %s/pg_xlog/* -e "ssh %s" %s@%s:%s""" % (
                        str(self.rsync_bin), str(self.rsync_flags), str(self.pgdata),
                        str(self.ssh_flags), str(self.user), str(host),
                        str(self.r_archivedir))
                    # NOTE(review): the confirmation prompt and the PG check
                    # run once per slave because they sit inside this loop.
                    if not self.flush_check_func():
                        return False
                    check = self.check_pgpid_func()
                    if check == 0:
                        print("ERROR: Can not enter flush mode if PG is already running")
                        raise ArchiveFailure
                else:
                    rsync_transfer = """%s %s -q -e "ssh %s" %s %s@%s:%s""" % (
                        str(self.rsync_bin), str(self.rsync_flags), str(self.ssh_flags),
                        str(self.archivefile), str(self.user), str(host),
                        str(self.r_archivedir))
                if self.debug:
                    print("NOTICE: Shipping archive to: " + str(host))
                    print("NOTICE: Using: " + rsync_transfer)
                retval = os.system("%s" % (rsync_transfer))
                if not retval:
                    if self.debug:
                        print("NOTICE: Sending OK alert")
                    self.notify_external(ok=True, message=str(retval))
                    continue
                if self.flush:
                    # don't try to enqueue the segment archived if this is a
                    # flush
                    print("FATAL: unable to flush pg_xlog to host %s" % (str(host),))
                    raise ArchiveFailure
                print("NOTICE: no luck shipping archive")
                retval = self._queue_archive(host)
                if retval:
                    print("FATAL: Unable to rsync_transfer or queue_transfer")
                    self.notify_external(critical=True, message=str(retval))
                    raise ArchiveFailure
                self.notify_external(warning=True, message=str(retval))
        except ArchiveFailure:
            # archiver process was unable to archive the wal segment
            return False
        except Exception as e:
            # generic exception
            print("ERROR: %s" % (e,))
            return False
        else:
            # WAL segment successfully archived or queued
            return True
if __name__ == '__main__':
    # Before we do anything, check who we are: refuse to run as root.
    if os.geteuid() == 0:
        sys.exit("\nBad Mojo... no root access for this script\n")

    archiver = CMDArchiver(classdict)
    (options, args) = archiver.parse_commandline_arguments(argslist)
    archiver.archivefile = options.archivefilename
    archiver.flush = options.flush

    # A configuration that can not be loaded ends the run with status 2.
    try:
        archiver.load_configuration_file(options.configfilename)
    except Exception as e:
        print("ERROR: %s" % (e,))
        sys.exit(2)
    archiver.set_ssh_flags()

    # Actually run: either initialize the queues, or validate the
    # configuration and archive.
    if options.init:
        print("We are initializing queues, one moment.\n")
        success = archiver.init_env_func()
    else:
        success = archiver.check_config_func() and archiver.archive_func()

    # Exit status 0 on success, 1 on failure.
    sys.exit(0 if success else 1)