forked from salilab/SOAP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobSchedule.py
1040 lines (973 loc) · 41.1 KB
/
jobSchedule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
SOAP job control (local parallel or SGE parallel).
"""
from env import *
class task(object):
    """
    A single SGE task.

    :param str dir: local run directory; used as a plain string prefix
        (``dir + 'inputlist'``), so it should end with a path separator
    :param str rdirname: name of the run directory on the job server
    :param object afterprocessing: object whose ``afterprocessing()`` method processes
        the run output; called once the SGE run has finished
    :param object preparing: object whose ``prepare_task_input()`` method prepares the
        run input; called when the task is started by :meth:`monitor`
    :param str targetfile: file whose existence means the result is already available
    """
    def __init__(self, dir='', rdirname='', afterprocessing=None, preparing=None, targetfile=''):
        # Inputs. rdirname is pasted into shell command lines, so '#' is escaped there.
        self.dir = dir
        self.rdirname = rdirname.replace('#', '\\#')
        self.afterprocessing = afterprocessing
        self.preparing = preparing
        self.targetfile = targetfile
        # SGE does not accept '#' in file names, so such runs log to a separate directory.
        if '#' in rdirname:
            self.logpath = os.path.join(runenv.serverLogPath, self.rdirname.replace('#', ''))
        else:
            self.logpath = os.path.join(runenv.serverUserPath, rdirname, 'output')
        self.queues = runenv.queues
        # Submission script name; must stay 'runme.sh' until get_tasksn() renames it.
        self.runname = 'runme.sh'
        # Bookkeeping.
        self.rsn = -9999          # run serial number; -9999 means "not assigned yet"
        self.k = 0                # number of monitor() polls so far
        self.rtrys = 0            # number of sub-run resubmissions
        self.runstatusdict = {}
        self.crashjobdict = {}
        self.runduration = -9999  # seconds from submission until the run finished
        self.waitinglist = []
        self.justwait = 0
        # State flags.
        self.started = False
        self.finished = False
        self.serverdiskfull = False
        self.unnoticederror = False
def get_runf(self):
    """Read the submission script and record how many array sub-runs it declares.

    Sets ``self.rfs`` to the script text and ``self.numofruns`` to N, taken from the
    first ``1-N`` range in the script (presumably the SGE ``-t 1-N`` array range --
    TODO confirm).

    :raises ValueError: if the script contains no ``1-N`` range.
    """
    path = self.dir + self.runname
    with open(path) as fh:
        rfs = fh.read()
    self.rfs = rfs
    match = re.search('1-([0-9]+)', rfs)
    if match is None:
        raise ValueError('no "1-N" task range found in ' + path)
    self.numofruns = int(match.group(1))
def get_inputlist(self):
    """Load the comma-separated list of sub-run inputs from ``<dir>inputlist``.

    The whole file is split on ',' with no whitespace stripping; the result is
    stored in ``self.inputlist``. The file is now closed after reading.
    """
    with open(self.dir + 'inputlist') as fh:
        self.inputlist = fh.read().split(',')
def get_sn(self):
    """Assign this task a unique run serial number (``self.rsn``) if it has none yet.

    The counter is the integer stored in ``<runenv.basedir>rsn``. The file
    ``<runenv.basedir>rsnlock`` guards the read-increment-write. The lock is created
    atomically (O_CREAT|O_EXCL), so two processes can never both believe they hold it.
    It is always removed, even if reading or writing the counter fails. While another
    process holds the lock, this polls once per second.
    """
    import errno
    if self.rsn != -9999:
        return
    lockpath = runenv.basedir + 'rsnlock'
    counterpath = runenv.basedir + 'rsn'
    while True:
        try:
            lockfd = os.open(lockpath, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except OSError as err:
            if err.errno != errno.EEXIST:
                raise
            time.sleep(1)
    try:
        os.close(lockfd)
        with open(counterpath, 'r') as fh:
            rsn = int(fh.read()) + 1
        with open(counterpath, 'w') as fh:
            fh.write(str(rsn))
    finally:
        try:
            os.remove(lockpath)
        except OSError:
            pass  # same tolerance as the former `rm -f`
    self.rsn = rsn
def get_tasksn(self):
    """Give the submission script a name unique to this task.

    If ``self.rsn`` is still negative, a serial number is obtained via :meth:`get_sn`.
    ``<dir>runme.sh`` is then renamed (with ``mv``) to ``<dir>r<rsn>.sh`` and the new
    name is stored in ``self.runname``.
    """
    if self.rsn < 0:
        self.get_sn()
    self.runname = 'r%s.sh' % (self.rsn,)
    source = self.dir + 'runme.sh'
    target = self.dir + self.runname
    os.system('mv ' + source + ' ' + target)
def run_task_cluster(self):
    """Submit this task to the cluster and block until it has finished.

    Steps: rename the script uniquely, read it, read the input list, submit, mark
    the task started, then poll with :meth:`monitor_task`.
    """
    for step in (self.get_tasksn, self.get_runf, self.get_inputlist, self.submit_task):
        step()
    self.started = True
    self.monitor_task()
def reload_existingruns(self):
    """Re-attach to a run directory left behind by an earlier SOAP session.

    Finds the gzipped submission script ``r<rsn>.sh.gz`` in ``self.dir`` and
    un-gzips it just long enough for :meth:`get_runf` to re-read the sub-run count.
    It then re-derives which sub-runs already finished (:meth:`recheck_runstatus`)
    and passes the sub-run count to ``afterprocessing.nors``. Changes the working
    directory to ``self.dir``.

    :raises IOError: if no ``r*.sh.gz`` script is present.
    """
    self.started = True
    self.justwait = 0
    # This used to assign a misspelled ``waittinglist``, so a stale waitinglist survived.
    self.waitinglist = []
    os.chdir(self.dir)
    scripts = [f for f in os.listdir('./') if f.endswith('.sh.gz') and f.startswith('r')]
    if not scripts:
        raise IOError('no r<rsn>.sh.gz submission script found in ' + self.dir)
    gzname = scripts[0]
    self.runname = gzname[:-3]
    print(os.system('gunzip ' + gzname))
    self.rsn = int(self.runname[1:-3])
    self.get_runf()
    print(os.system('gzip ' + self.runname))
    self.recheck_runstatus()
    self.afterprocessing.nors = self.numofruns
def start_task_cluster(self):
    """Start this task on the SGE cluster, or re-attach to it.

    Returns 1 if ``targetfile`` already exists (nothing to do), otherwise 0.
    The cases are checked in this order:

    * ``self.dir`` exists and another run owns it (``runenv.otherrun`` or listed in
      ``runenv.currentrundirlist``): only wait for ``targetfile`` (``justwait = 1``).
    * ``self.dir`` exists and nobody owns it: reload the earlier run.
    * Otherwise ``preparing.prepare_task_input()`` is called. A non-empty list result
      means "wait for these files first" (``justwait = 2``). Any other result causes
      the task to be renamed, read and submitted.
    """
    self.started = True
    if os.path.isfile(self.targetfile):
        print('Already executed by some other instances :)')
        return 1
    if os.path.isdir(self.dir):
        owned_elsewhere = runenv.otherrun or (self.dir in runenv.currentrundirlist)
        if owned_elsewhere:
            print(self.dir + ' exist, meaning some other guys are generating what you want, just be patient and wait:)')
            self.waitinglist = [self.targetfile]
            self.justwait = 1
        else:
            print("reloading existing runs")
            self.reload_existingruns()
            runenv.currentrundirlist.append(self.dir)
        return 0
    self.waitinglist = self.preparing.prepare_task_input()
    runenv.currentrundirlist.append(self.dir)
    pending = self.waitinglist
    if isinstance(pending, list) and len(pending) > 0:
        print('waiting for others to calc this:')
        print(pending)
        self.justwait = 2
        return 0
    for step in (self.get_tasksn, self.get_runf, self.get_inputlist, self.submit_task):
        step()
    # SGE does not accept '#' in file names, so such runs log to a separate directory.
    if '#' in self.rdirname:
        self.logpath = os.path.join(runenv.serverLogPath, self.rdirname.replace('#', ''))
    else:
        self.logpath = os.path.join(runenv.serverUserPath, self.rdirname, 'output')
    return 0
def run_task_local(self, command, inputlist=None):
    """Run ``command`` serially, in this process, on each sub-run input.

    The script and ``<dir>inputlist`` are always read first. A truthy ``inputlist``
    argument then replaces the list read from disk.

    :param command: callable invoked once per input item
    :param inputlist: optional explicit list of inputs
    """
    self.get_runf()
    self.get_inputlist()
    chosen = inputlist if inputlist else self.inputlist
    self.inputlist = chosen
    for entry in chosen:
        command(entry)
def submit_task(self):
    """Upload this task's run directory to the job server and submit it with qsub.

    Steps, all performed through shell commands:

    1. recreate ``<rdirname>`` and ``<rdirname>/output`` on ``runenv.jobserver``;
    2. gzip everything in ``self.dir`` and ``scp`` it to the server, repeating until
       ``scp`` succeeds. An ``*hdf5`` file larger than 1e9 bytes is instead uploaded
       once to ``runenv.jobserverhdf5path`` and symlinked into the run directory;
    3. gunzip on the server and ``qsub`` the array script ``self.runname``;
    4. reset the bookkeeping: ``runstatus`` (all 1), ``runtime``, ``runduration``
       (the submission timestamp) and ``runstatusdict``.

    Any exception is printed and followed by a 60 s sleep; it is not re-raised.
    NOTE(review): the qsub return code ``rc2`` is never checked.
    """
    try:
        print os.system('ssh '+runenv.jobserver+' \' rm -rf '+self.rdirname+' ;mkdir '+self.rdirname+'; mkdir '+self.rdirname+'/output''\'')
        os.chdir(self.dir)
        print os.system('gzip -r -1 *')
        rc1=1
        # Repeat the upload until scp reports success.
        while rc1:
            fl=os.listdir('./')
            hfl=[f for f in fl if f.endswith('hdf5')]
            if len(hfl)>0 and os.path.getsize(hfl[0])>1000000000:
                # Large hdf5 file: keep a single copy in the shared hdf5 directory and
                # symlink it, instead of copying it into every run directory.
                hf=hfl[0]
                ssh = subprocess.Popen(["ssh", "%s" % runenv.jobserver, 'ls '+runenv.jobserverhdf5path],
                                       shell=False,
                                       stdout=subprocess.PIPE)
                result = ssh.communicate()#ssh.stdout.readlines()
                if not hf in result[0]:
                    print os.system('scp '+hf+' '+runenv.jobserver+':\''+runenv.jobserverhdf5path+'\'')
                # Move the big file aside so the `scp -r *` below skips it.
                print os.system('mv '+hf+' ../temp')
                rc1=os.system('scp -r * '+runenv.jobserver+':\'~/'+self.rdirname+'/\'')
                print os.system('mv ../temp'+' ./'+hf)
                print os.system('ssh '+runenv.jobserver+' \' ln -s '+runenv.jobserverhdf5path+hf+' ~/'+self.rdirname+'/'+hf+'\'')
            else:
                rc1=os.system('scp -r * '+runenv.jobserver+':\'~/'+self.rdirname+'/\'')
            time.sleep(0.1)
        rc2=os.system('ssh '+runenv.jobserver+' \'cd '+self.rdirname+';gunzip *gz;qsub '+self.queues+' ./'+self.runname+'\'')#
        self.runstatus=np.ones(self.numofruns,dtype=np.int8) # 0 finished, 1 running, 2 failed, 3 marked as finished
        self.runtime=np.zeros(self.numofruns)
        # Submission timestamp; process_result() turns it into the elapsed time.
        self.runduration=time.time()
        for jn in range(0,self.numofruns):
            self.runstatusdict[jn]={'errorcodelist':[],'starttime':0}
        if runtimetest:
            self.runtime=np.ones(self.numofruns,dtype=[('runtime','i4'),('cn','f4'),('r2n','f4'),('host','a10')])
    except Exception,e:
        traceback.print_exc()
        time.sleep(60)
def start_sr(self, jnl):
    """Resubmit individual sub-runs as single SGE jobs.

    For each 0-based index in ``jnl``, a one-off script ``r<rsn><n>.sh`` is written.
    It is the array script with the ``1-<numofruns>`` range replaced by the 1-based
    run number n. The script is copied to the job server, deleted locally, and
    submitted with ``qsub -p 0``.

    :param jnl: iterable of 0-based sub-run indices
    :raises NetworkError: if copying the script or ``qsub`` fails
    """
    os.chdir(self.dir)
    arrayrange = str(1) + '-' + str(self.numofruns)
    for idx in jnl:
        self.runstatus[idx] = 1
        jn = str(idx + 1)
        scriptname = self.runname[:-3] + jn + '.sh'
        # Close the file before scp reads it (it used to be only flushed, and leaked).
        with open(self.dir + scriptname, 'w') as fh:
            fh.write(self.rfs.replace(arrayrange, jn))
        rc1 = os.system('scp ' + scriptname + ' ' + runenv.jobserver + ':\'~/' + self.rdirname + '/\'')
        print(os.system('rm -f ' + scriptname))
        if rc1:
            raise NetworkError('Can not copy restart script ' + scriptname + ', possibly due to network problems')
        rc3 = os.system('ssh ' + runenv.jobserver + ' ' + '\'qsub -p 0 ' + self.queues + ' ./' + self.rdirname + '/' + scriptname + '\'')
        if rc3:
            print(rc3)
            raise NetworkError('Can not restart runs, possibly due to network problems')
        self.runstatusdict[int(jn) - 1]['starttime'] = 0
        self.rtrys = self.rtrys + 1
def check_start_failed_runs(self, jnl):
    """Update run counters, sanity-check the failed sub-runs in ``jnl`` and resubmit them.

    ``finishedrn`` / ``activern`` / ``waitingrn`` are recomputed from
    ``self.runstatus`` (0 finished, 1 running, 4 queued). Returns 0 immediately if
    ``jnl`` is empty. Otherwise the log of each failed run is classified into its
    error history, :meth:`check_failed_runs` is consulted, and the runs are
    restarted with :meth:`start_sr`.

    :param jnl: 0-based indices of the failed sub-runs
    :raises Bugs: when nothing finished or runs and more than min(20, 10%) failed
    :raises Exception: when more than half of the non-queued runs failed
    """
    status = self.runstatus
    self.finishedrn = (status == 0).sum()
    self.activern = (status == 1).sum()
    self.waitingrn = (status == 4).sum()
    if not len(jnl):
        return 0
    nothing_progressing = self.finishedrn == 0 and self.activern == 0
    if nothing_progressing and len(jnl) > min(20, 0.1 * self.numofruns):
        raise Bugs('Bugs in code, all runs failed.')
    for idx in jnl:
        self.runstatusdict[idx]['errorcodelist'].append(self.check_runlog(idx))
    self.check_failed_runs(jnl)
    if len(jnl) > 0.5 * (self.numofruns - self.waitingrn):
        print("50% of the runs failed with error codes (): " + str(jnl))
        raise Exception('50% of the runs failed with error codes')
    self.errorjnl = jnl
    print("starting runs")
    self.start_sr(jnl)
# Repeated errors for a single run should be noticed...
def check_failed_runs(self, jnl):
    """Stop the task when the failures look systematic rather than incidental.

    A run counts as failed when it has a non-empty error history and a status > 0.

    * If the last log check reported "Disk quota exceeded", pause in the debugger,
      then return 0.
    * Raise ``Bugs`` when more than max(20, 10% of numofruns) runs, counted
      without interruption from the start of ``runstatusdict``, have failed.
    * Raise ``Bugs`` when ``squeeze_list`` of the failed runs' error histories has
      exactly one element (presumably: every failing run has the same history),
      that history is longer than one entry, and len(history)**2 * failed_runs > 50.

    :param jnl: failed run indices (only used in the error message)
    """
    histories = []
    leading_failures = 0
    still_leading = True
    for key in self.runstatusdict:
        failed = len(self.runstatusdict[key]['errorcodelist']) > 0 and self.runstatus[key] > 0
        if not failed:
            still_leading = False
            continue
        histories.append(self.runstatusdict[key]['errorcodelist'])
        if still_leading:
            leading_failures += 1
    if self.serverdiskfull:
        print("Disk quota exceeded, please clean disk before continue!!!!!!!!!!!!!!")
        pdb.set_trace()
        return 0
    if leading_failures > max(20, 0.1 * self.numofruns):
        raise Bugs('First 20 or 10% has all failed, probably bugs in code')
    if len(squeeze_list(histories)) == 1 and len(histories[0]) > 1 and (len(histories[0]) ** 2 * len(histories) > 50):
        raise Bugs('Bugs for some of the runs ' + str(jnl))
def check_runlog(self, jn):
    """Fetch the SGE log of one sub-run from the job server and classify the failure.

    :param jn: 0-based sub-run index. The log is the first entry of ``ls -c`` in
        ``self.logpath`` whose name ends with ``.<jn+1>``.
    :returns: an error-code string: 'No output log', 'memory problem with the run',
        'Bugs', 'EOFError', 'No space', 'Killed', 'Segmentation fault',
        'MemoryError', 'Disk quota exceeded', 'Errors', 'Aborted' or 'Unknown'.

    When a log is found, ``self.serverdiskfull`` is set to True for a
    "Disk quota exceeded" log and reset to False otherwise. A log containing 'Bugs'
    opens the debugger.
    """
    jn = str(jn + 1)
    print('checking run number ' + jn)
    proc1 = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\'cd ' + self.logpath + '; ls -c\'', shell=True, stdout=subprocess.PIPE)
    names = [item for item in proc1.communicate()[0].split('\n') if item.endswith('.' + jn)]
    if len(names) == 0:
        print('no output log')
        return 'No output log'
    proc1 = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\'cd ' + self.logpath + '; cat ' + names[0] + ' \'', shell=True, stdout=subprocess.PIPE)
    fc = proc1.communicate()[0]
    self.serverdiskfull = False
    # The first matching test wins. The specific 'MemoryError' and 'Disk quota exceeded'
    # tests must come before the generic 'Error' test, which also matches them (e.g.
    # "IOError: [Errno 122] Disk quota exceeded"). Previously they came after it and
    # were effectively unreachable.
    # Not yet classified: "_mdt.MDTError: unable to open file" (hdf5 file missing or corrupt).
    if 'fork failure' in fc or 'Dynamic memory allocation failed' in fc:
        print("memory problem with the run")
        ecode = "memory problem with the run"
    elif 'Bugs' in fc:
        print("Bugs")
        ecode = 'Bugs'
        pdb.set_trace()
    elif 'EOFError' in fc:
        print("EOFError when reading " + fc)
        ecode = 'EOFError'
    elif 'No space left on device' in fc:
        print("The node or server has no space left")
        ecode = 'No space'
    elif 'Killed' in fc:
        print("Killed?????")
        ecode = 'Killed'
    elif 'Segmentation fault' in fc:
        print("Segmentation fault - code problem")
        ecode = 'Segmentation fault'
    elif 'MemoryError' in fc:
        print("MemoryError")
        ecode = 'MemoryError'
    elif 'Disk quota exceeded' in fc:
        print("Disk quota exceeded, please clean disk before continue!!!!!!!!!!!!!!")
        ecode = 'Disk quota exceeded'
        self.serverdiskfull = True
    elif 'Error' in fc:
        print("general Error")
        ecode = 'Errors'
    elif 'Aborted' in fc:
        print("Aborted")
        ecode = 'Aborted'
    else:
        print("Unknown problem")
        ecode = 'Unknown'
    return ecode
def copy_result(self, djl):
    """Copy finished sub-run results from the job server in batches of 400.

    Batching keeps each generated ``scp`` command line below the shell's
    command-length limit (noted as 131072 characters).

    :param djl: list of ``(run_number, file_name)`` pairs
    """
    batch = 400
    for start in range(0, len(djl), batch):
        self.copy_result_400(djl[start:start + batch])
def copy_result_400(self,djl):
    """Copy one batch of finished sub-run results from the job server and unpack them.

    :param djl: list of ``(run_number_str, file_name)`` pairs (at most 400, see
        copy_result); pairs whose run is already finished (runstatus 0) are skipped.

    All files are fetched with a single ``scp`` into ``self.dir``. The later
    ``*.tar.gz`` extraction runs in the current working directory, which presumably
    is ``self.dir`` (monitor() chdirs there) -- TODO confirm for other callers.
    If the copy fails, files missing on the server mark their runs failed (2) and
    the copy is retried up to ~10 times. Every ``*.tar.gz`` is then extracted and
    deleted. Finally the copied runs are marked finished (0) and their remote logs
    and result files are removed.

    Returns 0 early when there is nothing to copy, otherwise None.
    :raises Exception: when the retried copy or the extraction keeps failing.
    """
    #might worth check whether the file sizes agress after copy... in the future
    if len(djl)==0:
        return 0
    filelist=''
    rfl=[]
    rd='~/'+self.rdirname+'/'
    for (fjn, fjf) in djl:
        if self.runstatus[int(fjn)-1]>0:
            filelist=filelist+rd+fjf+' '
            rfl.append(fjf)
    if filelist=='':
        return 0
    mt=10  # NOTE(review): unused
    if os.system('scp -r '+runenv.jobserver+':\"'+filelist+'\" '+self.dir):
        print 'file copy failure'
        # efl / nefl: which of the requested files do / do not exist on the server.
        efl,nefl=check_remote_file_exist(runenv.jobserver,rd,rfl)
        if len(nefl)>0:
            # NOTE(review): every pair not in efl is set to 2 here, including runs that were
            # already finished (status 0, hence never in rfl/efl); they would then be
            # resubmitted. TODO confirm whether this is intended.
            ndjl=[]
            for item in djl:
                if item[1] in efl:
                    ndjl.append(item)
                else:
                    self.runstatus[int(item[0])-1]=2
            djl=ndjl
            filelist=''
            for (fjn, fjf) in djl:
                if self.runstatus[int(fjn)-1]>0:
                    filelist=filelist+rd+fjf+' '
        if len(efl)>0:
            # NOTE(review): this makes up to 11 attempts and raises when ntry reaches 0,
            # even if the final attempt succeeded.
            ntry=10
            while os.system('scp -r '+runenv.jobserver+':\"'+filelist+'\" '+self.dir) and ntry>0:
                ntry=ntry-1
            if ntry==0:
                raise Exception('Can not copy files, disk full?'+filelist)
    # Extract every result archive (one retry), then delete the archives.
    tr=os.system('for i in *.tar.gz; do tar -xvzf $i; done')
    if tr:
        tr=os.system('for i in *.tar.gz; do tar -xvzf $i; done')
        if not tr:
            print os.system('rm -f *.tar.gz')
        else:
            raise Exception('can not untar file ')
    else:
        print os.system('rm -f *.tar.gz')
    # Mark the copied runs finished and clean up their remote logs and result files.
    logfilelist=[]
    for (fjn, fjf) in djl:
        if self.runstatus[int(fjn)-1]>0:
            self.runstatus[int(fjn)-1]=0
            logfilelist.append('*.'+fjn)
    print os.system('ssh '+runenv.jobserver+' '+'\'cd '+os.path.join(self.logpath)+'; rm -f '+' '.join(logfilelist)+'\'')
    print os.system('ssh '+runenv.jobserver+' '+'\'cd '+self.rdirname+'; rm -f '+filelist+'\'')
def get_runtimestats(self):
    """Fill ``self.runtime`` with per-sub-run statistics taken from the server log listing.

    Entries of ``self.logpath`` that do not start with 'r' and have at least three
    dot-separated fields are read as ``<run>.<seconds>.<host>``. For each one, the
    structured array ``self.runtime`` (created by submit_task when ``runtimetest``
    is set) receives the runtime, the host, and the ``count_pir()`` /
    ``count_residue2()`` of ``pir(path=<dir>pdbs<run>)``.
    """
    listing = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\'cd ' + self.logpath + '; ls \'', shell=True, stdout=subprocess.PIPE).communicate()[0]
    for entry in listing.split('\n'):
        if entry.startswith('r'):
            continue
        parts = entry.split('.')
        print(parts)
        if len(parts) < 3:
            continue
        row = int(parts[0]) - 1
        pdbpath = self.dir + 'pdbs' + parts[0]
        self.runtime[row]['runtime'] = int(parts[1])
        self.runtime[row]['cn'] = pir(path=pdbpath).count_pir()
        self.runtime[row]['r2n'] = pir(path=pdbpath).count_residue2()
        self.runtime[row]['host'] = parts[2]
def get_wait_time(self):
    """Return the polling interval in seconds and store it in ``self.wait_time``.

    This is a placeholder: the value is always 40.
    """
    seconds = 40
    self.wait_time = seconds
    return seconds
def process_result(self):
    """Finish the task once every sub-run is done.

    ``self.runduration`` is turned from the submission timestamp into the elapsed
    seconds. A task that only waited for another instance (``justwait == 1``) is
    simply marked finished and returns 1. Otherwise the remote run directory is
    deleted, and so is the remote log directory unless it is the shared
    ``<serverUserPath>output``. The method then returns the result of
    ``afterprocessing.afterprocessing()``, or 1 when there is no afterprocessing
    object.
    """
    self.runduration = time.time() - self.runduration
    if self.justwait == 1:
        self.finished = True
        return 1
    print('run finished:)')
    print(os.system('ssh ' + runenv.jobserver + ' \'rm -rf ' + self.rdirname + ' \''))
    shared_output = (runenv.serverUserPath + 'output', runenv.serverUserPath + 'output/')
    if self.logpath not in shared_output:
        print(os.system('ssh ' + runenv.jobserver + ' \'rm -rf ' + self.logpath + ' \''))
    rv = self.afterprocessing.afterprocessing() if self.afterprocessing else 1
    self.finished = True
    return rv
def monitor_task(self, interval=60):
    """Block until monitor() reports completion with a non-zero return value.

    :param interval: seconds to sleep before the first poll and between polls
        (default 60, the previously hard-coded value)
    """
    time.sleep(interval)
    while not self.monitor():
        time.sleep(interval)
def monitor(self):
    """
    Poll the task once; it runs on a remote SGE cluster until all sub-runs finish, after which `afterprocessing` is called.
    Some errors are handled automatically, but others pause the program, see :ref:`taskmonitor`.

    Returns non-zero once the task is finished (or already was), 0 while it is still in progress.

    .. note::
        It is better to wrap the task in a tasklist object and call :meth:`tasklist.monitor2end`.
    """
    try:
        if self.finished:
            return 1
        print("##########monitoring:########## " + self.rdirname)
        if not self.started:
            return self.start_task_cluster()
        if self.justwait:
            # Only waiting for files produced by someone else.
            self.waitinglist = [f for f in self.waitinglist if not os.path.isfile(f)]
            if not self.waitinglist:
                return self.process_result()
            print('waiting for others')
            return 0
        if self.unnoticederror:
            self.process_keyboard_interrupt(self.error)
            print('continue looping')
            return 0
        os.chdir(self.dir)
        proc1 = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\' qstat | grep ' + self.runname[:-3] + '\'', shell=True, stdout=subprocess.PIPE)
        runstatus = proc1.communicate()[0]
        # ssh exits with 255 when the connection itself failed.
        if proc1.returncode == 255:
            raise NetworkError('Network problem')
        proc2 = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\' ls ~/' + self.rdirname + '/job*; rm -f ~/' + self.rdirname + '/jobfailed* \'', shell=True, stdout=subprocess.PIPE)
        jobstatlist = proc2.communicate()[0]
        # This check used to test proc1 again, so a failed second ssh went unnoticed.
        if proc2.returncode == 255:
            raise NetworkError('Network problem')
        print("obtaining runstats from server finished")
        print("***Don't interrupt***")
        self.runstatusstr = runstatus
        # Every unfinished run is presumed failed unless qstat still lists it.
        self.runstatus[self.runstatus > 0] = 2
        if runstatus:
            self.analyze_runstatus(runstatus)
        fjl = re.findall(r'jobfailed\.([0-9]+)', jobstatlist)
        self.runstatus[[int(item) - 1 for item in fjl if self.runstatus[int(item) - 1] > 0]] = 2
        djl = re.findall(r'jobfinished\.([0-9]+)\.(.+)', jobstatlist)
        self.copy_result(djl)
        self.check_start_failed_runs(np.nonzero(self.runstatus == 2)[0])
        self.k = self.k + 1
        print("*********************")
        print('Num of finished runs: ' + str(self.finishedrn) + '/' + str(self.numofruns) + ' active:' + str(self.activern) + ' @iter#' + str(self.k))
        if (self.runstatus == 0).sum() == self.numofruns:
            return self.process_result()
        elif self.rtrys >= 2 * self.numofruns:
            raise Exception('jobfailed because of too many trys, check input files or scripts for errors')
    except NetworkError:
        traceback.print_exc()
        time.sleep(60)
    except Bugs as e:
        traceback.print_exc()
        self.process_keyboard_interrupt(e)
    except KeyboardInterrupt as e:
        self.process_keyboard_interrupt(e)
    except Exception as e:
        print(e)
        traceback.print_exc()
        print(self.dir)
        self.process_keyboard_interrupt(e)
    return 0
def recheck_k2cv_runstatus(self):
    """Rebuild ``self.runstatus`` for a k2cv run from the files in ``self.dir``.

    Sub-run n is marked finished (0) when a file named ``<n>.<...>optimizer.pickle``
    exists; every other sub-run is marked 1. Changes the working directory to
    ``self.dir``.
    """
    os.chdir(self.dir)
    done = [int(name.split('.')[0]) - 1
            for name in os.listdir('./')
            if name.endswith('optimizer.pickle')]
    self.runstatus = np.ones(self.numofruns)
    self.runstatus[done] = 0
def recheck_runstatus(self):
    """Rebuild ``self.runstatus`` and ``self.runstatusdict`` from the files in ``self.dir``.

    Entries ending in 'gz' or 'sh' (gzipped inputs and scripts) are ignored. Any
    other entry whose first dot-separated field is an integer n is taken as output
    of sub-run n, which is marked finished (0). All remaining sub-runs are marked 1,
    and every sub-run gets a fresh, empty error record. Entries without a leading
    run number are now skipped; they used to raise ValueError. Changes the working
    directory to ``self.dir``.
    """
    os.chdir(self.dir)
    done = []
    for name in os.listdir('./'):
        if name.endswith('gz') or name.endswith('sh'):
            continue
        prefix = name.split('.')[0]
        if prefix.isdigit():
            done.append(int(prefix) - 1)
    self.runstatus = np.ones(self.numofruns)
    self.runstatus[done] = 0
    for jn in range(self.numofruns):
        self.runstatusdict[jn] = {'errorcodelist': [], 'starttime': 0}
def reload_k2cvruns(self, rsn, numofruns=200):
    """Re-attach to the existing k2cv run directory ``<runenv.runs><rsn>/`` and poll it once.

    :param rsn: serial number of the run (its script is ``r<rsn>.sh``)
    :param numofruns: number of sub-runs in the array job (default 200)
    """
    self.rsn = rsn
    self.dir = runenv.runs + str(rsn) + '/'
    self.rdirname = str(rsn)
    self.numofruns = numofruns
    self.runname = 'r' + str(rsn) + '.sh'
    with open(self.dir + self.runname) as fh:
        self.rfs = fh.read()
    self.started = True
    self.unnoticederror = False
    for jn in range(numofruns):
        self.runstatusdict[jn] = {'errorcodelist': [], 'starttime': 0}
    # One entry per sub-run; this used to be hard-coded to 200 whatever numofruns was.
    self.inputlist = ['runme.py'] * numofruns
    self.recheck_k2cv_runstatus()
    self.monitor()
def check_long_runs(self, runstatus):  # will not be used
    """Collect the queues of running sub-runs that take unusually long.

    The limit is min(mean + 5*std, 2*mean) over the positive entries of
    ``self.runtime['runtime']``. For each running line of ``runstatus`` (qstat
    output, via decode_runstatus) whose recorded start time is older than the limit,
    the queue column (``item[-3]``) is collected. Afterwards
    delete_runs_on_crashed_nodes() is called.

    :returns: list of the collected queue names (previously the method always crashed)
    """
    runtime = self.runtime['runtime']
    runtime = runtime[runtime > 0]
    runtimemean = runtime.mean()
    runtimestd = runtime.std()
    runtimelimit = min(runtimemean + 5 * runtimestd, runtimemean * 2)
    rsl = self.decode_runstatus(runstatus)
    currenttime = time.time()
    nodechecklist = []
    for item in rsl:
        if item[4] in ['r', 'Rr', 't']:
            # Was misspelled 'rumtimelimit', a guaranteed NameError.
            if currenttime - self.runstatusdict[int(item[-1]) - 1]['starttime'] > runtimelimit:
                nodechecklist.append(item[-3])
    # delete_runs_on_crashed_nodes() takes no node list (passing one raised TypeError);
    # it finds stuck jobs itself via `qstat -qs u`.
    self.delete_runs_on_crashed_nodes()
    return nodechecklist
def delete_runs_on_crashed_nodes(self):
    """Force-delete jobs that keep showing up as running on unavailable queues.

    Each call lists ``qstat -qs u`` on the job server (queues in state 'u'; per SGE
    this presumably means unknown/unreachable). It counts, per
    ``<job-id> -t <task-id>``, how many calls have seen that task there in state
    'r', 'Rr', 't' or 'dr'. Tasks seen more than 5 times are removed from the
    counter and deleted with ``qdel -f``.

    This is best effort: errors are printed and otherwise ignored. They used to be
    swallowed silently, together with KeyboardInterrupt.
    """
    try:
        proc1 = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\' qstat -qs u \'', shell=True, stdout=subprocess.PIPE)
        runstatus = proc1.communicate()[0]
        for item in self.decode_runstatus(runstatus):
            if item[4] in ['r', 'Rr', 't', 'dr']:
                crashtask = item[0] + ' -t ' + item[-1]
                self.crashjobdict[crashtask] = self.crashjobdict.get(crashtask, 0) + 1
        # Snapshot the keys first: deleting from a dict while iterating its live
        # view raises RuntimeError on Python 3.
        autl = [key for key in list(self.crashjobdict) if self.crashjobdict[key] > 5]
        for key in autl:
            del self.crashjobdict[key]
        print('The following node crashed >5: ' + str(autl))
        if len(autl) > 0:
            print(os.system('ssh ' + runenv.jobserver + ' ' + '\' qdel -f ' + ', '.join(autl) + ' \''))
    except Exception:
        import traceback
        traceback.print_exc()
    return None
def get_single_node_status(self, node):
    """Return the raw output of ``qstat -F -q <node>`` run on the job server.

    :param node: queue/node name passed to ``qstat -q``
    :returns: qstat's stdout (the output was previously computed and then discarded)
    """
    proc1 = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + '\' qstat -F -q ' + node + '\'', shell=True, stdout=subprocess.PIPE)
    nodestatus = proc1.communicate()[0]
    return nodestatus
def decode_runstatus(self, runstatus):
    """Split qstat output into lists of whitespace-separated fields, keeping job lines only.

    Text after the final newline is dropped. Only lines containing '@' (a
    ``queue@host`` column) are kept.
    """
    lines = runstatus.split('\n')[:-1]
    return [line.split() for line in lines if '@' in line]
def analyze_runstatus(self, runstatus):
    """Update ``self.runstatus`` from the qstat lines of this task's jobs.

    For each line, the state (field 4) and the task-ID list (last field, e.g. ``3``,
    ``1-200:1`` or ``1,5-9:2``) are read. Each listed sub-run whose status is still
    > 0 is updated as follows:

    * running states ('r', 'Rr', 't', 'Rt', 'hr') -> 1, recording the first-seen
      start time;
    * queued states ('qw', 'hqw') -> 4;
    * states starting with 'd' (deletion) -> 2;
    * anything else opens the debugger.

    Text after the final newline is ignored. The ``:step`` of a range is now honoured
    (it used to be dropped), and ranges are built with list() so this also runs on
    Python 3.
    """
    for line in runstatus.split('\n')[:-1]:
        fields = line.split()
        rstatus = fields[4].strip()
        rnl = []
        for sr in fields[-1].split(','):
            spec, _, step = sr.partition(':')
            rer = re.search('([0-9]+)-([0-9]+)', spec)
            if rer:
                rnl.extend(list(range(int(rer.group(1)) - 1, int(rer.group(2)), int(step) if step else 1)))
            else:
                rnl.append(int(spec) - 1)
        for trn in rnl:
            if self.runstatus[trn] > 0:
                if rstatus in ['r', 'Rr', 't', 'Rt', 'hr']:
                    self.runstatus[trn] = 1
                    if not self.runstatusdict[trn]['starttime']:
                        self.runstatusdict[trn]['starttime'] = time.time()
                elif rstatus in ['qw', 'hqw']:
                    self.runstatus[trn] = 4  # waiting in the queue
                elif rstatus.startswith('d'):
                    self.runstatus[trn] = 2
                else:
                    print('can not decide the status of the run')
                    pdb.set_trace()
def qdel_jobs(self, delete=False, qw=False):
    """Force-delete (``qdel -f``) all SGE jobs of this task, in the background.

    Job IDs are the first column of the qstat lines that mention this task's script
    name; values of 5 characters or fewer (e.g. empty strings) are skipped.

    :param delete: also remove the remote and the local run directory
    :param qw: accepted for compatibility; it selects exactly the same action
    """
    listing = subprocess.Popen('ssh ' + runenv.jobserver + ' ' + 'qstat | grep ' + self.runname[:-3] + ' | awk \'{split($0,c,\" \");print c[1]}\' ', shell=True, stdout=subprocess.PIPE).communicate()[0]
    for jobid in set(listing.split('\n')):
        if len(jobid) > 5:
            print(os.system('ssh ' + runenv.jobserver + ' ' + 'qdel -f ' + jobid + ' &'))
    if delete:
        print(os.system('ssh ' + runenv.jobserver + ' \'rm -rf ' + self.rdirname + ' \''))
        print(os.system('rm -rf ' + self.dir))
def process_keyboard_interrupt(self, e, pos='task-monitor'):
    """Ask the operator, with a 120 s timeout, how to handle error ``e``.

    Choices: 0 quit and delete, 1 quit this job, 2 re-raise ``e``, 3 quit the whole
    script (sys.exit when ``pos == 'tasklist-monitorend'``, otherwise FatalError),
    4 debugger, 5 continue, 6 resubmit, 7 re-prepare and restart, 8 ignore the
    errors and restart the failed runs. Any other input sets ``unnoticederror``.

    :returns: 0 on timeout, unreadable input or choice 4; otherwise None (or raises)

    The SIGALRM timer is now always cancelled and the previous handler restored.
    Previously, an input error such as EOF returned with the alarm still armed, so
    some unrelated statement ~120 s later got interrupted by Exception('timeout').
    """
    print('@@@@@@@@ ' + pos)
    print("")
    self.error = e
    TIMEOUT = 120  # seconds to wait for an answer

    def interrupted(signum, frame):
        print("waiting more than 120s, continue looping......")
        raise Exception('timeout')

    previous_handler = signal.signal(signal.SIGALRM, interrupted)
    signal.alarm(TIMEOUT)
    try:
        print('You have 120s to interrupt the code to handle the error, otherwise code will continue looping \n 0-quit and delete;\n 1-quit this job;\n 2-re-raise;\n 3-quit whole script and delete;\n 4-enter debug mode;\n 5-continue\n 6-restart runs with updated SOAP code\n 7-re-parepare and restart job \n 8-ignore erros \n ')
        s = raw_input()
    except (Exception, KeyboardInterrupt):
        return 0
    finally:
        signal.alarm(0)
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    self.unnoticederror = False
    if s == '0':
        self.qdel_jobs(delete=True)
        raise Bugs('Keyboard interruption')
    elif s == '1':
        self.qdel_jobs()
        raise Bugs('Keyboard interruption')
    elif s == '2':
        raise e
    elif s == '3':
        if pos == 'tasklist-monitorend':
            self.qdel_jobs(delete=True)
            sys.exit(str(e))
        else:
            raise FatalError(str(e))
    elif s == '4':
        pdb.set_trace()
        self.started = True
        return 0
    elif s == '5':
        pass
    elif s == '6':
        self.submit_task()
    elif s == '7':
        self.start_task_cluster()
    elif s == '8':
        self.error = None
        self.start_sr(self.errorjnl)
    else:
        self.unnoticederror = True
def parallel_local(self, command, inputlist=None, nors=8):
    """Run ``command(item)`` for every item of ``inputlist`` in ``nors`` forked child processes.

    Items are dealt round-robin (item k goes to child k % nors). Each child runs its
    items one after another, printing and otherwise ignoring any exception an item
    raises. The parent returns once every child has exited; return values are
    discarded.

    Children now leave via ``os._exit``. The former ``sys.exit()`` raised
    SystemExit, which an enclosing handler (e.g. a bare ``except:``) could catch,
    letting the child carry on running the parent's program. The parent now waits
    for its own children with ``os.waitpid``, instead of ``os.wait``, which could
    reap unrelated children.
    """
    if inputlist is None:
        inputlist = []
    buckets = [[] for _ in range(nors)]
    for k, item in enumerate(inputlist):
        buckets[k % nors].append(item)
    pids = []
    for i, bucket in enumerate(buckets):
        time.sleep(0.1)
        sys.stdout.flush()  # keep the child from re-emitting the parent's buffered output
        pid = os.fork()
        if pid == 0:
            try:
                print('running child#' + str(i))
                for item in bucket:
                    try:
                        print('child#' + str(i) + 'start running')
                        command(item)
                    except Exception as e:
                        print(e)
            finally:
                sys.stdout.flush()
                os._exit(0)
        pids.append(pid)
    for j, pid in enumerate(pids):
        print('waiting ' + str(j))
        os.waitpid(pid, 0)
def parallel_local_withreturn(self, command, inputlist=None, nors=8):
    """Like parallel_local, but return the list of ``command(item)`` results in input order.

    Falls back to serial_local when ``localtest`` is set or there is only one input.
    Otherwise every item gets its own pipe; its child pickles the result into it.

    Fixes: the parent used to wait for all children before reading any pipe, and
    every child inherited every pipe's write end. A result larger than the pipe
    buffer (64 KiB on Linux) therefore blocked its child forever, a deadlock. Now
    each child closes the descriptors it does not write to, and the parent closes
    its write ends and drains all pipes before reaping the children. A child whose
    command raises prints the traceback and exits, instead of unwinding into the
    caller's code.

    :raises RuntimeError: if some input produced no result (its child failed)
    """
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
    import traceback
    if inputlist is None:
        inputlist = []
    if localtest or len(inputlist) == 1:
        return self.serial_local(command, inputlist)
    pipes = []  # one (read_fd, write_fd) per item, in input order
    buckets = [[] for _ in range(nors)]
    for k, item in enumerate(inputlist):
        r, w = os.pipe()
        pipes.append((r, w))
        buckets[k % nors].append((item, w))
    pids = []
    for i, bucket in enumerate(buckets):
        time.sleep(0.1)
        sys.stdout.flush()
        pid = os.fork()
        if pid == 0:
            status = 0
            try:
                own = set(w for _, w in bucket)
                # The only remaining writer of each pipe is now its own child, so the
                # parent sees EOF as soon as that child has written the result.
                for r, w in pipes:
                    os.close(r)
                    if w not in own:
                        os.close(w)
                print('running child#' + str(i))
                for item, w in bucket:
                    print('child#' + str(i) + 'start running')
                    result = command(item)
                    with os.fdopen(w, 'wb') as fh:
                        fh.write(pickle.dumps(result))
            except BaseException:
                traceback.print_exc()
                status = 1
            finally:
                sys.stdout.flush()
                os._exit(status)
        pids.append(pid)
    # Parent: drop our copies of the write ends, or no read would ever reach EOF.
    for _, w in pipes:
        os.close(w)
    rl = []
    missing = []
    # Read in input order; this is consistent with each child's write order, so it
    # cannot deadlock.
    for k, (r, _) in enumerate(pipes):
        with os.fdopen(r, 'rb') as fh:
            data = fh.read()
        if data:
            rl.append(pickle.loads(data))
        else:
            missing.append(k)
    for j, pid in enumerate(pids):
        print('waiting ' + str(j))
        os.waitpid(pid, 0)
    if missing:
        raise RuntimeError('parallel_local_withreturn: no result for input index(es) ' + str(missing))
    return rl
def serial_local(self, command, inputlist=[]):
    """Apply ``command`` to each item of ``inputlist`` in this process and return the results in order."""
    return [command(item) for item in inputlist]
class localtask(task):
    """
    A single local task: ``func(*inputlist)`` runs synchronously on the first :meth:`monitor` call.
    """
    def __init__(self, func=None, inputlist=None):
        self.func = func
        # None instead of a mutable [] default, which would be shared by all instances.
        self.inputlist = [] if inputlist is None else inputlist
        self.finished = False

    def monitor(self):
        """Run ``func`` once and return its result; every later call returns 1."""
        if self.finished:
            return 1
        tr = self.func(*self.inputlist)
        self.finished = True
        return tr

    def qdel_jobs(self, delete=False):
        """Nothing to cancel for a local task."""
        pass
class taskchain(task):
    """Chain a list of tasks (local or cluster) and execute them one by one.

    Each element must provide ``monitor()``, returning 0 while it is not finished,
    and ``qdel_jobs(delete)``. :meth:`monitor` moves on to the next element as soon
    as the current one returns non-zero, and returns the result of the last element
    once that is reached.
    """
    def __init__(self, chains=None):  # , sharedinput=[], initialinput=[]
        # None instead of a shared [] default: monitor() modifies the list in place.
        self.taskchains = [] if chains is None else chains
        self.currentpos = 0
        self.started = False

    def monitor(self):
        """Poll the current task, advancing through the chain while tasks report completion.

        Finished tasks are replaced by 0 in ``taskchains``. Returns 0 while the
        chain is in progress, otherwise the result of the last task.
        """
        self.currentresult = self.taskchains[self.currentpos].monitor()
        while self.currentresult != 0 and self.currentpos < (len(self.taskchains) - 1):
            self.taskchains[self.currentpos] = 0
            self.currentpos += 1
            self.currentresult = self.taskchains[self.currentpos].monitor()
        return self.currentresult

    def qdel_jobs(self, delete=False):
        """Cancel the jobs of the task currently being run."""
        if self.currentpos < len(self.taskchains):
            self.taskchains[self.currentpos].qdel_jobs(delete)
class tasklist(task):
    """
    List of tasks; each task can itself be a :class:`jobSchedule.tasklist`, :class:`jobSchedule.task`, :class:`jobSchedule.localtask` or :class:`jobSchedule.taskchain`.
    """
    def __init__(self, tasklist=None, afterprocessing=None, other=None, reload_rundir='', reload_dirlist=None):
        """
        :param tasklist: the tasks; a 0 entry is a placeholder that counts as already done
        :param afterprocessing: callable ``(resultlist, tasklist, other)`` run when all tasks end
        :param other: extra object passed through to ``afterprocessing``
        :param reload_rundir: directory whose sub-directories are reloaded as tasks
        :param reload_dirlist: explicit list of run directories to reload as tasks
        """
        # None defaults: the task list is mutated later (tasklist.monitor deletes entries),
        # so a shared [] default must not be used.
        if tasklist is None:
            tasklist = []
        if reload_rundir:
            reload_dirlist = [os.path.join(reload_rundir, f) for f in os.listdir(reload_rundir)
                              if os.path.isdir(os.path.join(reload_rundir, f))]
        if reload_dirlist:
            tasklist = self.reload_tasks(reload_dirlist)
        self.tasklist = tasklist
        self.afterprocessing = afterprocessing
        self.other = other
        # 0 = still running; 123456789 marks a placeholder entry as done.
        self.resultlist = [123456789 if item == 0 else 0 for item in tasklist]
        self.bugs = False
        self.buginfo = ''
        self.unnoticederror = False
        self.crashjobdict = {}
def reload_tasks(self, dl):
    """Rebuild tasks from run directories saved by an earlier session.

    Each directory must contain ``input.pickle.gz``, holding an object whose ``task``
    attribute is the task. ``reload_existingruns()`` is called on each task before
    it is returned. Only tasks saved this way can be reloaded.

    :param dl: list of run directories
    :returns: the reloaded tasks, in the order of ``dl``
    """
    import gzip
    import pickle
    tl = []
    for d in dl:
        # NOTE: unpickling can execute arbitrary code; only use on trusted run directories.
        with gzip.open(os.path.join(d, 'input.pickle.gz')) as fh:
            ro = pickle.load(fh)
        ro.task.reload_existingruns()
        tl.append(ro.task)
    return tl
def monitor(self):
    """Poll every unfinished task once, removing the ones that finish.

    Tasks are visited from last to first so that deletions do not shift indices
    still to be visited. A task raising ``Bugs`` is recorded as 987654321 (and
    removed). ``FatalError`` propagates. Any other exception prints a traceback and
    opens the debugger.

    Returns 0 while any polled task is unfinished. After that it reports bugs
    (opening the debugger if there were any) and returns
    ``afterprocessing(resultlist, tasklist, other)``, or the ``resultlist`` itself.

    NOTE(review): entries are deleted as they finish, so the lists handed on at the
    end only hold entries that were already non-zero when polling started (e.g.
    placeholders).
    """
    for k in reversed(range(len(self.tasklist))):
        if self.resultlist[k]:
            continue
        try:
            self.resultlist[k] = self.tasklist[k].monitor()
        except Bugs as e:
            self.resultlist[k] = 987654321
            self.bugs = True
            self.buginfo = e
        except FatalError as e:
            raise e
        except BaseException:
            try:
                traceback.print_exc()
            except BaseException:
                pass
            pdb.set_trace()
        if self.resultlist[k] != 0:
            del self.tasklist[k]
            del self.resultlist[k]
    if 0 in self.resultlist:
        return 0
    print("All task finished or failed")
    if self.bugs:
        print("bugs during the run " + str(self.buginfo))
        pdb.set_trace()
    if self.afterprocessing:
        print("processing tasklist result")
        return self.afterprocessing(self.resultlist, self.tasklist, self.other)
    return self.resultlist
def monitor2end(self):
    """
    Start all tasks of the list at the same time (locally or on the remote SGE cluster) and monitor them until every task finishes; `afterprocessing` is then called.

    .. note::

        A task, tasklist or taskchain can be interrupted with CTRL+c at runtime to perform job-control actions::

            @@@@@@@@ task-monitor
            You have 120s to interrupt the code to handle the error, otherwise code will continue looping
            0-quit and delete;
            1-quit this job;
            2-re-raise;
            3-quit whole script and delete;
            4-enter debug mode;
            5-continue
            6-restart runs with updated SOAP code
            7-re-parepare and restart job
            8-ignore erros

        The task continues if no input is entered.
        *The "task-monitor" prompt is also triggered by some fatal errors.*

    Unlike :meth:`monitor`, finished tasks stay in ``tasklist`` (their results stay in
    ``resultlist``). Between rounds it sleeps 60 s and clears jobs stuck on crashed nodes.
    """
    try:
        while 0 in self.resultlist:
            try:
                for k in range(len(self.tasklist)):
                    if not self.resultlist[k]:
                        try:
                            self.resultlist[k] = self.tasklist[k].monitor()
                        except Bugs as e:
                            self.bugs = True
                            self.buginfo = e
                            self.resultlist[k] = 987654321
                    # (A no-op comparison `self.tasklist[k]==0` used to stand here.)
                if 0 in self.resultlist:
                    print('waiting')
                    time.sleep(60)
                    self.delete_runs_on_crashed_nodes()
            except KeyboardInterrupt as e:
                task.process_keyboard_interrupt(self, e, pos='tasklist-monitorend')
        print("All all task finished or failed")
        if self.bugs:
            print("bugs during the run " + str(self.buginfo))
            raw_input('Input anything to enter debug mode(others finished): ')
            pdb.set_trace()
        if self.afterprocessing:
            print("processing this group of tasks")
            return self.afterprocessing(self.resultlist, self.tasklist, self.other)
        return self.resultlist
    except FatalError as e:
        self.qdel_jobs(delete=True)
        sys.exit(str(e))
    except KeyboardInterrupt as e:
        task.process_keyboard_interrupt(self, e, pos='tasklist-monitorend')
def qdel_jobs(self, delete=False):
    """Cancel the cluster jobs of every task whose ``resultlist`` entry is still 0 (unfinished).

    :param delete: passed on to each task's ``qdel_jobs`` (also remove run directories)
    """
    for k, member in enumerate(self.tasklist):
        if not self.resultlist[k]:
            member.qdel_jobs(delete)
def report_job_runstatus(runpath, runsuccess, runnumber, outputname,inputname='runme.py',temppath=''):
"""
Utility function for report the job run status when the runs finished.
"""
if runenv.hostn:
if temppath:
os.chdir(temppath)
else:
temppath=runpath
if runsuccess:
fjf=str(runnumber)+outputname
fl=os.listdir('./')
nfl=[f for f in fl if f.startswith(fjf)]
if len(nfl)==0:
print 'Bugs in code, output file does not exist '
print fl
runsuccess=False
#if temppath!=runpath:
# print os.system('rm -rff '+temppath)
else:
tr=os.system('tar cvzf '+fjf+'.tar.gz '+fjf+'* --remove-files')
if tr:
print os.system('rm -f '+fjf+'.tar.gz ')
tr=os.system('tar cvzf '+fjf+'.tar.gz '+fjf+'* --remove-files')
if tr:
print os.system('rm -f '+fjf+'*')
print 'FatalError: can not tar the result file, disk full?'
runsuccess=False
else:
if temppath!=runpath:
cr=os.system('mv '+fjf+'.tar.gz '+runpath)
if cr:
cr=os.system('mv '+fjf+'.tar.gz '+runpath)
if cr:
cr=os.system('mv '+fjf+'.tar.gz'+runpath)