This repository was archived by the owner on Jan 8, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcaroniad
executable file
·1167 lines (1001 loc) · 47.3 KB
/
caroniad
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
# Copyright 2008 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import pickle
import sys
import os
import time
import logging
import re
import threading
import signal
import getopt
import tarfile
import base64
from subprocess import *
from condorutils import SUCCESS, FAILURE
from condorutils.log import *
from condorutils.osutil import *
from condorutils.readconfig import *
from condorutils.socketutil import *
from condorutils.workfetch import *
from condorec2e.sqs import *
from condorec2e.region import *
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.utils import get_instance_userdata
class MainException(Exception):
    """Exception carrying a logging level plus one or more messages to report."""

    def __init__(self, level, *msgs):
        """Store the logging level and the tuple of messages."""
        self.level = level
        self.msgs = msgs
class MsgException(Exception):
    """Raised when a work/SQS message cannot be processed as expected."""

    def __init__(self, message):
        """Store the human-readable error text on self.msg.

        The parameter was renamed from 'str', which shadowed the builtin;
        all call sites pass it positionally, so callers are unaffected."""
        self.msg = message
class ExitSignal(Exception):
    """Raised by the signal handler to unwind the main loop on termination.

    The constructor parameter was renamed from 'str', which shadowed the
    builtin; call sites pass it positionally, so callers are unaffected."""

    def __init__(self, message):
        """Store the human-readable description on self.msg."""
        self.msg = message
class work_data(object):
    """Pairs a fetched SQS message with the slot that is processing it.

    Records the time of last access (consulted elsewhere for lease-expiry
    decisions) and owns a lock that serializes access to the stored data.
    """

    def __init__(self, msg, slot_num):
        self.__access_lock__ = threading.Lock()
        self.__access_time__ = time.time()
        self.__SQS_msg__ = msg
        self.__slot__ = slot_num

    def __touch__(self):
        # Refresh the last-access timestamp.
        self.__access_time__ = time.time()

    def lock(self, wait=True):
        """Acquires the lock controlling access to the data in the object"""
        if wait == True:
            self.__touch__()
        return self.__access_lock__.acquire(wait)

    def unlock(self, wait=True):
        """Releases the lock controlling access to the data in the object"""
        if wait == True:
            self.__touch__()
        self.__access_lock__.release()

    def __set_SQS_msg__(self, msg):
        self.__touch__()
        self.__SQS_msg__ = msg

    def __get_SQS_msg__(self):
        self.__touch__()
        return self.__SQS_msg__

    # Reading or writing the message refreshes the access timestamp.
    SQS_msg = property(__get_SQS_msg__, __set_SQS_msg__)

    def __set_slot__(self, slot_num):
        self.__touch__()
        self.__slot__ = slot_num

    def __get_slot__(self):
        return self.__slot__

    slot = property(__get_slot__, __set_slot__)

    def __get_access_time__(self):
        return self.__access_time__

    access_time = property(__get_access_time__)
class global_data(object):
    """Thread-safe registry of in-flight work items, keyed by SQS message id.

    add_work enforces that only one job is tracked at a time.  Callers must
    bracket compound operations with lock()/unlock().
    """

    def __init__(self):
        self.__work_list__ = {}          # message id -> work_data
        self.__access_lock__ = threading.Lock()
        self.__total_jobs_running__ = 0

    def lock(self, wait=True):
        """Acquires the lock controlling access to the stored data"""
        return(self.__access_lock__.acquire(wait))

    def unlock(self):
        """Releases the lock controlling access to the stored data"""
        self.__access_lock__.release()

    def add_work(self, key, SQS_msg, slot):
        """Add work information to the list of known work items.

        Raises MsgException if the key already exists, or if a job is
        already being tracked (only one job may run at a time)."""
        work = work_data(SQS_msg, slot)
        if self.__find_work__(key) == False:
            if self.__total_jobs_running__ == 0:
                self.__work_list__.update({key:work})
                self.__total_jobs_running__ = self.__total_jobs_running__ + 1
            else:
                # Fixed NameError: this previously raised the misspelled
                # 'MsgExecption', which itself crashed at runtime
                raise MsgException('Already running %s jobs' % self.__total_jobs_running__)
        else:
            raise MsgException('Key %s already exists.' % key)

    def remove_work(self, key):
        """Remove work information from the list of known work items and
        return the removed work_data object, or None if no work with the
        given key exists.

        Note: the returned item is NOT locked (the old docstring claimed
        otherwise); callers needing exclusive access must call its lock()
        method themselves."""
        if self.__find_work__(key) == True:
            work = self.__work_list__[key]
            del self.__work_list__[key]
            self.__total_jobs_running__ = self.__total_jobs_running__ - 1
            return(work)
        else:
            return(None)

    def get_work(self, key):
        """Return the work_data stored under key, or None if it doesn't exist.

        Note: the returned item is NOT locked; callers needing exclusive
        access must call its lock() method themselves."""
        if self.__find_work__(key) == True:
            work = self.__work_list__[key]
            return(work)
        else:
            return(None)

    def slot_in_use(self, slot_num):
        """Returns True if the given slot is currently processing work,
        False otherwise"""
        result = False
        for work in self.__work_list__.values():
            if work.slot == slot_num:
                result = True
                break
        return(result)

    def __find_work__(self, key):
        """Returns True if the desired key exists, False otherwise"""
        # 'in' replaces dict.has_key(), which was removed in Python 3 and
        # has long been deprecated; behavior is identical on Python 2
        value = key in self.__work_list__
        return(value)

    def values(self):
        """Returns a list of work_data objects which contains all known
        work information

        Warning: This function will not prevent access to the list of
                 stored data due to its nature of returning a list of
                 all work_data objects currently stored. To ensure data
                 integrity, the caller must call the lock() and unlock()
                 methods itself"""
        return(self.__work_list__.values())

    def get_total_jobs_running(self):
        """Returns the total number of jobs currently running"""
        return(self.__total_jobs_running__)
def shutdown_sockets(sock1, sock2):
    """Shut down and close up to two sockets; None arguments are skipped.

    Shutdown errors (e.g. a socket that was never connected) are ignored,
    but close() is always attempted on every non-None socket.
    """
    for sock in (sock1, sock2):
        if sock != None:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except:
                pass
            sock.close()
def time_monitor(msg_list, sock, mins):
    """Monitors how long the system has been running. If it is idle at
    around 'mins'-minute intervals of uptime, it will shut down the system.

    msg_list - global_data registry; a machine with zero running jobs is
               considered idle
    sock     - listening socket to close before powering off
    mins     - length of the check interval, in minutes

    Runs forever; intended to be the target of a daemon thread.
    """
    interval = mins * 60
    # Read /proc/uptime directly instead of spawning 'cat' in a subprocess
    uptime_file = open('/proc/uptime', 'r')
    try:
        uptime = int(round(float(uptime_file.read().strip().split()[0])))
    finally:
        uptime_file.close()
    # Minutes of uptime elapsed past the most recent interval boundary
    time_from_next_hour = int(round(float((uptime % interval)/60)))
    # The first sleep is shortened so subsequent wakeups land just before
    # interval boundaries of uptime
    sleep_time = int(interval - ((time_from_next_hour+1)*60))
    while True:
        time.sleep(int(sleep_time))
        if msg_list.get_total_jobs_running() < 1:
            # No jobs are being processed, so close the socket and shutdown
            shutdown_sockets(sock, None)
            call(['shutdown', '-h', '-P', 'now'])
        sleep_time = interval
def lease_monitor(msg_list, max_lease_time, interval, sqs_con, queue, log_name):
    """Monitor all work for lease expiration. If a lease expired, the work
    is released.

    msg_list       - global_data registry of in-flight work items
    max_lease_time - seconds an item may go untouched before it expires
    interval       - seconds to sleep between sweeps
    sqs_con        - SQS connection used to make expired messages visible
    queue          - SQS queue the work messages belong to
    log_name       - logger name

    Runs forever; intended to be the target of a daemon thread.
    """
    func_name = "lease_monitor"
    while True:
        current_time = float(time.time())
        log(logging.DEBUG, log_name, '%s: max_lease_time = %s' % (func_name, str(max_lease_time)))
        log(logging.DEBUG, log_name, '%s: Lease check started' % func_name)
        log(logging.DEBUG, log_name, '%s: acquiring list lock' % func_name)
        # Hold the registry lock for the whole sweep so the item list
        # cannot change underneath the loop
        msg_list.lock()
        log(logging.DEBUG, log_name, '%s: acquired list lock' % func_name)
        for item in msg_list.values():
            log(logging.DEBUG, log_name, '%s: access time = %s' % (func_name, str(item.access_time)))
            log(logging.DEBUG, log_name, '%s: current time = %s' % (func_name, str(current_time)))
            # Non-blocking acquire: items currently being worked on by
            # another thread are simply skipped this sweep
            if item.lock(False) == True:
                if (float(item.access_time) + float(max_lease_time)) < current_time:
                    # No other thread is accessing this item and the lease has
                    # expired, so delete it from the list of known messages and
                    # release the lock on the SQS message
                    msg_id = item.SQS_msg.id
                    log(logging.DEBUG, log_name, '%s: Expiring %s' % (func_name, str(msg_id)))
                    msg_list.remove_work(msg_id)
                    # Release the message so it can be consumed by another process
                    sqs_con.change_message_visibility(queue, item.SQS_msg.receipt_handle, 5)
                # wait=False skips refreshing the access timestamp on release
                item.unlock(False)
        log(logging.DEBUG, log_name, '%s: releasing list lock' % func_name)
        msg_list.unlock()
        log(logging.DEBUG, log_name, '%s: released list lock' % func_name)
        time.sleep(int(interval))
def exit_signal_handler(signum, frame):
    """Signal handler: convert a termination signal into an ExitSignal."""
    message = 'Exit signal %s received' % signum
    raise ExitSignal(message)
def handle_get_work(req_socket, reply, sqs_con, queue, known_items, log_name):
"""Retrieve a message from an SQS queue and send it back to the
requesting client"""
func_name = "handle_get_work"
log(logging.INFO, log_name, '%s called' % (func_name))
remove_attribs = ['ec2origowner', 'iwd', 'userlog', 'sqsmessageid',
'wf_req_slot', 'isfeatched']
file_attribs = ['err', 'out', 'transferoutput']
# If a job is already being processed, do not attempt to process another
# request
if known_items.get_total_jobs_running() > 0:
log(logging.DEBUG, log_name, '%s: Already processing %s job(s). Ignoring request for more work' % (func_name, known_items.get_total_jobs_running()))
reply.data = ''
req_socket.send(pickle.dumps(reply, 2))
close_socket(req_socket)
return(SUCCESS)
# Figure out the SlotID that is requesting work, and don't get any
# more work if it is still processing work from a previous call
slots = grep('^SlotID\s*=\s*(.+)$', reply.data)
if slots == None:
log(logging.ERROR, log_name, 'Unable to determine SlotID for request.')
else:
slot = slots[0].strip()
log(logging.DEBUG, log_name, '%s: Checking if slot %s is doing work' % (func_name, str(slot)))
log(logging.DEBUG, log_name, '%s: Acquiring global message lock for slot check %s' % (func_name, str(slot)))
known_items.lock()
log(logging.DEBUG, log_name, '%s: Acquired global message lock for slot check %s' % (func_name, str(slot)))
if known_items.slot_in_use(slot) == True:
log(logging.DEBUG, log_name, '%s: slot %s is already doing work' % (func_name, str(slot)))
reply.data = ''
req_socket.send(pickle.dumps(reply, 2))
close_socket(req_socket)
known_items.unlock()
log(logging.DEBUG, log_name, '%s: Released global message lock for slot check %s' % (func_name, str(slot)))
return(SUCCESS)
known_items.unlock()
log(logging.DEBUG, log_name, '%s: slot %s is not doing work' % (func_name, str(slot)))
log(logging.DEBUG, log_name, '%s: Released global message lock for slot check %s' % (func_name, str(slot)))
# Get the work off the SQS work queue if it exists
valid_message = 0
while valid_message == 0:
try:
q_msg = queue.read(10)
except:
log(logging.ERROR, log_name, 'Unable to get a job from from SQS')
return(FAILURE)
try:
msg = pickle.loads(q_msg.get_body())
valid_message = 1
except:
# This is an invalid message, so ignore it and look for others.
pass
if q_msg != None:
reply.data = msg.class_ad
job_run_time = grep('^JobLeaseDuration\s*=\s*(.+)$', reply.data)
if job_run_time != None:
sqs_con.change_message_visibility(queue, q_msg.receipt_handle, job_run_time[0])
else:
reply.data = ''
req_socket.send(pickle.dumps(reply, 2))
close_socket(req_socket)
return(SUCCESS)
# Remove any attributes we don't want and modify any entries that have
# file paths that don't exist on the system so that the files will be
# created in the execute directory
ad = ''
for line in reply.data.split('\n'):
match = grep('^([^=]*)\s*=\s*(.*)$', line)
if match != None and match[0] != None:
attr = match[0].strip().lower()
if attr in remove_attribs:
continue
if attr == 'owner' and match[1] != None:
ad += '%s = "condor"\n' % match[0].strip()
ad += 'EC2OrigOwner = %s\n' % match[1].strip()
continue
# Check the file paths
if match[1] != None and attr in file_attribs:
paths = grep ('^"(.+)"$', match[1])
if paths != None and paths[0] != None:
# We have a quoted string, so split on commas since there
# could be multiple files listed
add_line = match[0] + ' = "'
split = paths[0].split(',')
add_comma = 0
for file in split:
dir = os.path.dirname(file)
file_name = os.path.basename(file)
if add_comma == 0:
add_comma = 1
else:
add_line += ','
if dir == '' or os.path.exists(dir) == False:
# This file has no path or the path doesn't exist
add_line += file_name
else:
# File has a full path that exists on the system
add_line += file
ad += add_line + '"\n'
continue
ad += line + '\n'
reply.data = ad
# Add attributes to the ClassAd that is sent to the requesting client
msg_num = str(q_msg.id)
reply.data += 'SQSMessageId = "' + msg_num + '"\n'
reply.data += 'WF_REQ_SLOT = "' + slot + '"\n'
reply.data += 'IsFeatched = TRUE\n'
reply.data += 'HookKeyword = "EC2ENHANCED_JOB"\n'
# Preserve the work data that was processed so it can be
# deleted or released as needed
log(logging.DEBUG, log_name, '%s: Adding msg id %s to known items' % (func_name, msg_num))
known_items.lock()
try:
known_items.add_work(msg_num, q_msg, slot)
except MsgException, error:
# Already processing a job, so adding the job to the list of
# known work was rejected. Release the message and make sure the
# job won't be processed
log(logging.DEBUG, log_name, '%s: Adding job to list of work was rejected. Already processing %s job(s)' % (func_name, known_items.get_total_jobs_running()))
known_items.unlock()
sqs_con.change_message_visibility(queue, q_msg.receipt_handle, 5)
log(logging.ERROR, log_name, '%s: %s' % (func_name, error.msg))
return(FAILURE)
known_items.unlock()
log(logging.DEBUG, '%s: Released global message lock for %s' % (func_name, msg_num))
# Send the work to the requesting client
req_socket.send(pickle.dumps(reply, 2))
close_socket(req_socket)
return(SUCCESS)
def handle_reply_fetch(msg, sqs_con, work_q, status_q, known_items, log_name):
    """Send the data from a reply claim hook to a results SQS queue. Release
    the lock on the receiving SQS queue in the case of a reject.

    msg         - hook message; msg.data holds the job ClassAd followed by
                  the slot ClassAd, separated by a run of dashes
    sqs_con     - SQS connection
    work_q      - SQS queue the work message came from
    status_q    - SQS queue results messages are written to
    known_items - global_data registry of in-flight work
    log_name    - logger name

    Returns SUCCESS or FAILURE.
    """
    func_name = "handle_reply_fetch"
    log(logging.INFO, log_name, '%s called' % func_name)
    # Attributes that must not be echoed back to the submitter
    remove_attribs = ['iwd', 'owner']
    # Find the SQSMessageId in the message received
    message_ids = grep('^SQSMessageId\s*=\s*"(.+)"$', msg.data)
    if message_ids == None:
        log(logging.ERROR, log_name, msg.data, '%s: Unable to find SQS Message ID in exit message' % func_name)
        return(FAILURE)
    message_id = message_ids[0].strip()
    log(logging.DEBUG, log_name, '%s: Acquiring global message lock for %s' % (func_name, str(message_id)))
    known_items.lock()
    log(logging.DEBUG, log_name, '%s: Acquired global message lock for %s' % (func_name, str(message_id)))
    # A reject means this node will not run the job, so drop the work from
    # the registry; an accept keeps it registered until the job exits
    if msg.type == condor_wf_types.reply_claim_reject:
        saved_work = known_items.remove_work(message_id)
    else:
        saved_work = known_items.get_work(message_id)
    if saved_work == None:
        # Couldn't find the SQS message that corresponds to the SQSMessageId
        # in the exit message. This is bad and shouldn't happen.
        known_items.unlock()
        log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
        log(logging.ERROR, log_name, '%s: Unable to find stored SQS message with SQSMessageId %s.' % (func_name, str(message_id)))
        return(FAILURE)
    log(logging.DEBUG, log_name, '%s: Acquiring lock for %s' % (func_name, str(message_id)))
    saved_work.lock()
    log(logging.DEBUG, log_name, '%s: Acquired lock for %s' % (func_name, str(message_id)))
    known_items.unlock()
    log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
    # Only want the job ClassAd and not the Slot Class Ad. Since
    # the job classad is listed first followed by the Slot Class Ad
    # and separated by a series of dashes (-), cycle through the data
    # and look for a number of dashes and then quit. Also want
    # to remove some attributes that shouldn't have updated data
    # sent to the submitter.
    result_ad = ''
    orig_owner = ''
    for line in msg.data.split('\n'):
        match = grep('^([^=]*)\s*=\s*(.+)$', line)
        if re.match('---', line) != None:
            break
        elif match != None and match[0] != None:
            attr = match[0].strip().lower()
            if attr in remove_attribs:
                continue
            elif attr == 'jobstatus':
                if msg.type == condor_wf_types.reply_claim_reject:
                    result_ad += 'JobStatus = 1\n'
                    result_ad += 'EC2LastFailureReason = "The job was rejected"\n'
                else:
                    result_ad += 'JobStatus = 2\n'
                continue
            elif attr == 'ec2origowner' and match[1] != None:
                orig_owner = match[1].strip()
                continue
        if line != '':
            result_ad += line + "\n"
    result_ad += 'Owner = %s\n' % orig_owner
    result_ad += 'EC2JobAttempted = True\n'
    result_ad += 'EC2HookArg = %d\n' % msg.type
    try:
        response = pickle.loads(saved_work.SQS_msg.get_body())
    except:
        # Can't rebuild the original message; release it for another node
        sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
        saved_work.unlock()
        log(logging.ERROR, log_name, '%s: Failed to create response message for id "%s"' % (func_name, message_id))
        return(FAILURE)
    response.class_ad = result_ad
    # Send the results to the appropriate SQS queue
    try:
        status_q.write(Message(body=pickle.dumps(response)))
    except:
        sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
        saved_work.unlock()
        log(logging.ERROR, log_name, '%s: Error: Unable to write reply message to SQS' % func_name)
        # Fixed: previously fell through after unlocking, which released
        # saved_work's lock a second time at the bottom of this function
        return(FAILURE)
    if msg.type == condor_wf_types.reply_claim_reject:
        try:
            # Reset the visibility timer so it can be read again quickly.
            sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
        except:
            # Fixed: the format string was missing the '%' operator, which
            # raised TypeError here instead of logging the error
            log(logging.ERROR, log_name, '%s: Unable to reset visibility timer on "%s"' % (func_name, str(message_id)))
        log(logging.INFO, log_name, '%s: Work rejected %s' % (func_name, str(message_id)))
    else:
        log(logging.INFO, log_name, '%s: Work accepted %s' % (func_name, str(message_id)))
    log(logging.DEBUG, log_name, '%s: Releasing lock on %s' % (func_name, str(message_id)))
    saved_work.unlock()
    return(SUCCESS)
def handle_prepare_job(req_socket, reply, s3_storage, sqs_con, queue, known_items, log_name):
    """Prepare the environment for the job. This includes accessing S3
    for any data specific to the job and providing it to condor's
    temporary execute directory.

    req_socket  - connected socket to the requesting client
    reply       - reply object; reply.data holds the request ClassAd on
                  entry and the archive filename (or '') when sent back
    s3_storage  - S3 connection used to fetch job data
    sqs_con     - SQS connection used to release the message on failure
    queue       - SQS queue the work message came from
    known_items - global_data registry of in-flight work
    log_name    - logger name

    Returns SUCCESS or FAILURE.
    """
    func_name = "handle_prepare_job"
    log(logging.INFO, log_name, '%s called ' % func_name)
    # Find the SQSMessageId in the message received
    message_ids = grep('^SQSMessageId\s*=\s*"(.+)"$', reply.data)
    if message_ids == None:
        log(logging.ERROR, log_name, reply.data, '%s: Unable to find SQSMessageId in prepare job message' % func_name)
        return(FAILURE)
    message_id = message_ids[0].strip()
    # Find the Current Working Directory of the originating process
    # in the message received
    cwd_match = grep('^OriginatingCWD\s*=\s*"(.+)"$', reply.data)
    if cwd_match == None:
        # Fixed: indexing a failed grep raised TypeError; fail cleanly instead
        log(logging.ERROR, log_name, '%s: Unable to find OriginatingCWD in prepare job message' % func_name)
        return(FAILURE)
    work_cwd = cwd_match[0]
    log(logging.DEBUG, log_name, '%s: Acquiring global message lock for %s' % (func_name, str(message_id)))
    known_items.lock()
    log(logging.DEBUG, log_name, '%s: Acquired global message lock for %s' % (func_name, str(message_id)))
    saved_work = known_items.get_work(message_id)
    if saved_work == None:
        # Couldn't find the SQS message that corresponds to the SQSMessageId
        # in the exit message. This is bad and shouldn't happen.
        known_items.unlock()
        log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
        log(logging.ERROR, log_name, '%s: Unable to find stored SQS message with SQSMessageId %s' % (func_name, str(message_id)))
        return(FAILURE)
    log(logging.DEBUG, log_name, '%s: Acquiring lock for %s' % (func_name, str(message_id)))
    saved_work.lock()
    log(logging.DEBUG, log_name, '%s: Acquired lock for %s' % (func_name, str(message_id)))
    known_items.unlock()
    log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
    # If the S3 parameters are not None, then there is data to pull
    # from S3 for this job
    try:
        msg = pickle.loads(saved_work.SQS_msg.get_body())
    except:
        sqs_con.change_message_visibility(queue, saved_work.SQS_msg.receipt_handle, 5)
        known_items.remove_work(message_id)
        saved_work.unlock()
        log(logging.ERROR, log_name, '%s: Failed to retrieve job details for id "%s"' % (func_name, message_id))
        return(FAILURE)
    if msg.s3_bucket != None and msg.s3_key != None:
        # Retrieve the S3 key from the message
        try:
            s3_bucket = s3_storage.get_bucket(msg.s3_bucket)
            s3_key = Key(s3_bucket)
        except:
            sqs_con.change_message_visibility(queue, saved_work.SQS_msg.receipt_handle, 5)
            known_items.remove_work(message_id)
            saved_work.unlock()
            log(logging.ERROR, log_name, 'Error: Unable to access S3 to prepare job for execution')
            return(FAILURE)
        s3_key.key = msg.s3_key
        # Retrieve the archived file from S3 and put it into the
        # directory for the job
        input_filename = work_cwd + '/data.tar.gz'
        try:
            s3_key.get_contents_to_filename(input_filename)
        except:
            sqs_con.change_message_visibility(queue, saved_work.SQS_msg.receipt_handle, 5)
            known_items.remove_work(message_id)
            saved_work.unlock()
            log(logging.ERROR, log_name, 'Error: Unable to get job data from S3')
            return(FAILURE)
        reply.data = input_filename
    else:
        # No job-specific data stored in S3 for this job
        reply.data = ''
    # Send the information about the archive file to the requester
    req_socket.send(pickle.dumps(reply, 2))
    close_socket(req_socket)
    log(logging.DEBUG, log_name, '%s: Releasing lock on %s' % (func_name, str(message_id)))
    saved_work.unlock()
    return(SUCCESS)
def handle_update_job_status(msg, queue, known_items, log_name):
    """Send the job status update information to a results SQS queue.

    msg         - hook message; msg.data holds the job ClassAd
    queue       - SQS queue status messages are written to
    known_items - global_data registry of in-flight work
    log_name    - logger name

    Returns SUCCESS or FAILURE.
    """
    func_name = "handle_update_job_status"
    log(logging.INFO, log_name, '%s called at %s' % (func_name, str(time.localtime())))
    # Attributes that must not be echoed back to the submitter
    remove_attribs = ['iwd', 'owner']
    # Find the SQSMessageId in the message received
    message_ids = grep('^SQSMessageId\s*=\s*"(.+)"$', msg.data)
    if message_ids == None:
        log(logging.ERROR, log_name, msg.data, '%s: Unable to find SQSMessageId in exit message' % func_name)
        return(FAILURE)
    message_id = message_ids[0].strip()
    log(logging.DEBUG, log_name, '%s: Acquiring global message lock for %s' % (func_name, str(message_id)))
    known_items.lock()
    log(logging.DEBUG, log_name, '%s: Acquired global message lock for %s' % (func_name, str(message_id)))
    saved_work = known_items.get_work(message_id)
    if saved_work == None:
        # Couldn't find the SQS message that corresponds to the SQSMessageId
        # in the exit message. This is bad and shouldn't happen.
        known_items.unlock()
        log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
        # Fixed: the first format argument was message_id instead of func_name
        log(logging.ERROR, log_name, '%s: Unable to find stored SQS message with SQSMessageId %s' % (func_name, str(message_id)))
        return(FAILURE)
    log(logging.DEBUG, log_name, '%s: Acquiring lock for %s' % (func_name, str(message_id)))
    saved_work.lock()
    log(logging.DEBUG, log_name, '%s: Acquired lock for %s' % (func_name, str(message_id)))
    known_items.unlock()
    log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
    # Remove some attributes that shouldn't have updated data
    # sent to the submitter. The Class Ad doesn't reflect the
    # appropriate state, so change it to say the job is running (2).
    result_ad = ''
    # Fixed: orig_owner was referenced unbound below when the ad had no
    # EC2OrigOwner attribute; initialize it like handle_reply_fetch does
    orig_owner = ''
    for line in msg.data.split('\n'):
        match = grep('^([^=]*)\s*=\s*(.+)$', line)
        if match != None and match[0] != None:
            attr = match[0].strip().lower()
            if attr in remove_attribs:
                continue
            elif attr == 'jobstatus':
                result_ad += 'JobStatus = 2\n'
                continue
            elif attr == 'ec2origowner' and match[1] != None:
                orig_owner = match[1].strip()
                continue
        if line != '':
            result_ad += line + "\n"
    result_ad += 'Owner = %s\n' % orig_owner
    result_ad += 'EC2HookArg = %d\n' % msg.type
    try:
        response = pickle.loads(saved_work.SQS_msg.get_body())
    except:
        saved_work.unlock()
        log(logging.ERROR, log_name, '%s: Failed to retrieve job details for id "%s"' % (func_name, message_id))
        return(FAILURE)
    response.class_ad = result_ad
    # Send the results to the appropriate SQS queue
    try:
        queue.write(Message(body=pickle.dumps(response)))
    except:
        saved_work.unlock()
        log(logging.WARNING, log_name, '%s: Warning: Unable to write status message to SQS' % func_name)
        # Fixed: previously fell through after unlocking, which released
        # saved_work's lock a second time below
        return(FAILURE)
    log(logging.DEBUG, log_name, '%s: Releasing lock on %s' % (func_name, str(message_id)))
    saved_work.unlock()
    return(SUCCESS)
def handle_exit(req_socket, msg, s3_storage, sqs_con, work_q, results_q, known_items, log_name):
    """Handle a job-exit hook message.

    If the job exited normally (condor_wf_types.exit_exit), archive the
    job sandbox, upload it to S3 and delete the work message from the
    SQS work queue; for any other exit type, shorten the work message's
    visibility timeout so the job can be picked up again.  In both cases
    a status message carrying the updated class ad is written to the SQS
    results queue and an acknowledgement is sent back on req_socket.

    Parameters:
        req_socket  -- socket to the originating hook; receives 'Completed'
        msg         -- hook message; msg.data is the raw class-ad text,
                       msg.type is the exit type
        s3_storage  -- boto S3 connection
        sqs_con     -- boto SQS connection
        work_q      -- SQS queue the work message came from
        results_q   -- SQS queue for result/status messages
        known_items -- shared registry of in-flight work (provides
                       lock()/unlock(), remove_work(), slot_in_use())
        log_name    -- logger name to log under

    Returns SUCCESS, or FAILURE when the message cannot be correlated or
    the results cannot be stored/sent.
    """
    func_name = "handle_exit"
    log(logging.INFO, log_name, '%s called' % func_name)
    file_list = []
    # Attributes whose execute-side values must not be echoed back to the
    # submitter (they are recomputed/replaced below).
    remove_attribs = ['iwd', 'owner', 'jobfinishedhookdone', 'jobstatus']
    global_id = ''
    # Determine the slot that is reporting results
    slots = grep('^WF_REQ_SLOT\s*=\s*"(.+)"$', msg.data)
    if slots == None:
        log(logging.WARNING, log_name, '%s: Unable to determine SlotID for results.' % func_name)
    else:
        # Verify the slot sending results is known to be in use.  If not,
        # somehow results have been sent from an unknown slot.
        slot = slots[0].strip()
        known_items.lock()
        if known_items.slot_in_use(slot) == False:
            log(logging.WARNING, log_name, '%s: Received exit message from unknown slot %s' % (func_name, slot))
        known_items.unlock()
    # Find the SQSMessageId in the message we received
    message_ids = grep('^SQSMessageId\s*=\s*"(.+)"$', msg.data)
    if message_ids == None:
        # NOTE: previously this log() call passed msg.data as an extra
        # positional argument, which does not match the 3-argument
        # log(level, name, message) form used everywhere else.
        log(logging.ERROR, log_name, '%s: Unable to find SQSMessageId in exit message' % func_name)
        return(FAILURE)
    else:
        message_id = message_ids[0].strip()
    # Find the Current Working Directory of the originating process
    # in the message received
    work_cwd = grep('^OriginatingCWD\s*=\s*"(.+)"$', msg.data)[0]
    # Retrieve the SQS message from the list of known messages so it
    # can be acknowledged or released
    log(logging.DEBUG, log_name, '%s: Acquiring global message lock for %s' % (func_name, str(message_id)))
    known_items.lock()
    log(logging.DEBUG, log_name, '%s: Acquired global message lock for %s' % (func_name, str(message_id)))
    saved_work = known_items.remove_work(message_id)
    if saved_work == None:
        # Couldn't find the SQS message that corresponds to the SQSMessageId
        # in the exit message. This is bad and shouldn't happen.
        known_items.unlock()
        log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
        log(logging.ERROR, log_name, '%s: Unable to find stored SQS message with SQSMessageId %s. Message cannot be acknowledged nor results sent!' % (func_name, str(message_id)))
        return(FAILURE)
    else:
        log(logging.DEBUG, log_name, '%s: Acquiring lock for %s' % (func_name, str(message_id)))
        saved_work.lock()
        log(logging.DEBUG, log_name, '%s: Acquired lock for %s' % (func_name, str(message_id)))
        known_items.unlock()
        log(logging.DEBUG, log_name, '%s: Released global message lock for %s' % (func_name, str(message_id)))
        try:
            # Retrieve the saved classad information for the
            # results message.
            results = pickle.loads(saved_work.SQS_msg.get_body())
        except:
            # Make the work visible again quickly, then bail out: the
            # rest of this function cannot run without 'results'.
            # (Previously execution fell through here and crashed later
            # with a NameError on 'results'.)
            sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
            saved_work.unlock()
            log(logging.WARNING, log_name, '%s: Unable to retrive job information for id %s' % (func_name, message_id))
            return(FAILURE)
        # The Class Ad we have doesn't reflect the appropriate state, so
        # change it to correct job state and remove attributes that
        # shouldn't have updated data sent to the originater.
        result_ad = ''
        start_time = 0
        run_time = 0
        orig_owner = ''
        for line in msg.data.split('\n'):
            match = grep('^([^=]*)\s*=\s*(.+)$', line)
            if match != None and match[0] != None:
                attr = match[0].strip().lower()
                if attr in remove_attribs:
                    continue
                elif attr == 'ec2origowner' and match[1] != None:
                    # Remember the submit-side owner; re-added below.
                    orig_owner = match[1].strip()
                    continue
                elif attr == 'jobstartdate' and match[1] != None:
                    start_time = int(match[1])
                elif attr == 'jobduration' and match[1] != None:
                    run_time = int(round(float(match[1])))
            if line != '':
                result_ad += line + "\n"
        result_ad += 'EC2HookArg = %d\n' % msg.type
        result_ad += 'Owner = %s\n' % orig_owner
        if msg.type == condor_wf_types.exit_exit:
            # Job exited normally, so upload the sandbox to S3,
            # and remove the message on the SQS work queue
            log(logging.DEBUG, log_name, '%s: Normal exit' % func_name)
            # Set job completed
            result_ad += 'JobStatus = 4\n'
            # Set the job completion time by adding together the start time and
            # job duration from the class ad.
            result_ad += 'JobFinishedHookDone = %s\n' % str(start_time + run_time)
            try:
                # If the S3 key is None, then there wasn't any input data for this
                # job so we'll need to create a new key to store the data in.
                # Otherwise, reuse the values from the message
                s3_bucket = s3_storage.get_bucket(results.s3_bucket)
                if results.s3_key != None:
                    s3_key = s3_bucket.get_key(results.s3_key)
                else:
                    match = grep('^GlobalJobId\s*=\s*"(.*)"$', msg.data)
                    if match != None and match[0] != None:
                        global_id = match[0].replace('#', '').replace('@', '').replace('.', '')
                    aws_access_key = s3_storage.aws_access_key_id
                    s3_key = Key(s3_bucket)
                    s3_key.key = str(aws_access_key) + '-' + str(global_id)
                    results.s3_key = s3_key.key
                    result_ad += 'S3KeyID = "%s"\n' % s3_key.key
            except:
                # Release the work and bail out: without 's3_key' the
                # upload below would raise a NameError.  (Previously
                # execution fell through here; the mirrored status
                # handler returns FAILURE in the same situation.)
                sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                saved_work.unlock()
                log(logging.WARNING, log_name, '%s: Unable to create storage bucket in S3 for id %s' % (func_name, message_id))
                return(FAILURE)
            # Create the list of files to put into the results archive
            if os.path.exists(work_cwd) == False:
                sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                saved_work.unlock()
                log(logging.ERROR, log_name, '%s: Unable to change to exe dir "%s" for id "%s"' % (func_name, work_cwd, str(message_id)))
                return(FAILURE)
            try:
                os.chdir(work_cwd)
                transfer_files = grep('^TransferOutput\s*=\s*"(.*)"$', msg.data)
                if transfer_files != None and transfer_files[0] != None:
                    # Prefer condor's redirected stderr/stdout names; fall
                    # back to the Err/Out attributes from the class ad.
                    if os.path.exists('_condor_stderr') == True:
                        file_list.append('_condor_stderr')
                    else:
                        err_file = grep('^Err\s+=\s+"(.*)"$', msg.data)
                        if err_file != None and err_file[0] != None:
                            file_list.append(os.path.basename(err_file[0].strip()))
                    if os.path.exists('_condor_stdout') == True:
                        file_list.append('_condor_stdout')
                    else:
                        out_file = grep('^Out\s+=\s+"(.*)"$', msg.data)
                        if out_file != None and out_file[0] != None:
                            file_list.append(os.path.basename(out_file[0].strip()))
                    # Renamed loop variable: 'file' shadowed the builtin.
                    for xfer_file in transfer_files[0].split(','):
                        if xfer_file not in file_list:
                            file_list.append(os.path.basename(xfer_file.strip()))
                else:
                    # No explicit TransferOutput: ship the whole sandbox.
                    file_list = os.listdir(".")
            except:
                saved_work.unlock()
                sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                log(logging.ERROR, log_name, '%s: Failed to create list of files for results archive for id "%s"' % (func_name, str(message_id)))
                return(FAILURE)
            # Archive all the important files
            results_filename = work_cwd + '/results.tar.gz'
            try:
                results_file = tarfile.open(results_filename, 'w:gz')
            except:
                saved_work.unlock()
                sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                log(logging.ERROR, log_name, '%s: Failed to open archive file for writing for id "%s"' % (func_name, str(message_id)))
                return(FAILURE)
            for fname in file_list:
                f = fname.strip()
                if os.path.exists(f):
                    # Only parse the dir/file path if it actually is a dir/file
                    # format (ie doesn't end in /)
                    if os.path.dirname(f) != '' and \
                       os.path.basename(f) != '' and \
                       os.path.exists(f) == True:
                        # Enter the file's directory so the archive entry is
                        # stored under its basename, then return to the cwd.
                        os.chdir(os.path.dirname(f))
                        try:
                            results_file.add(os.path.basename(f))
                        except:
                            saved_work.unlock()
                            sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                            log(logging.ERROR, log_name, '%s: Failed to write file "%s" to archive for id "%s"' % (func_name, f, str(message_id)))
                            return(FAILURE)
                        os.chdir(work_cwd)
                    else:
                        results_file.add(f)
            try:
                results_file.close()
            except:
                saved_work.unlock()
                sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                log(logging.ERROR, log_name, '%s: Failed to write archive to disk for id "%s"' % (func_name, str(message_id)))
                return(FAILURE)
            # Upload the sandbox to S3
            try:
                s3_key.set_contents_from_filename(results_filename)
            except:
                # Reset the visibility timer so it can be read again quickly.
                # (Removed a redundant known_items.remove_work() call here:
                # the entry was already removed under the global lock above,
                # and calling it again here accessed known_items unlocked.)
                sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
                saved_work.unlock()
                log(logging.ERROR, log_name, 'Error: Unable to write results to S3')
                return(FAILURE)
            # Remove the message from the SQS queue
            try:
                work_q.delete_message(saved_work.SQS_msg)
            except:
                # Don't raise an exception here because the job has completed
                # processing, but SQS is having issues. The system shutdown
                # will prevent it from being run again
                saved_work.unlock()
                log(logging.ERROR, log_name, 'Error: Unable to delete SQS message')
        else:
            # Job didn't exit normally
            log(logging.DEBUG, log_name, '%s: Not normal exit: %s' % (func_name, str(msg.type)))
            # Set job idle
            result_ad += 'JobStatus = 1\n'
            # Add a FailureReson
            result_ad += 'EC2LastFailureReason = "HOOK_JOB_EXIT was called with a non-successful exit type (%d)"\n' % msg.type
            # Reset the visibility timer so it can be read again quickly.
            sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
        results.class_ad = result_ad
        # Send the results to the appropriate SQS queue
        try:
            results_q.write(Message(body=pickle.dumps(results)))
        except:
            # Reset the visibility timer so it can be read again quickly.
            sqs_con.change_message_visibility(work_q, saved_work.SQS_msg.receipt_handle, 5)
            saved_work.unlock()
            log(logging.ERROR, log_name, 'Error: Unable to write job results message to SQS')
            return(FAILURE)
        saved_work.unlock()
        if msg.type == condor_wf_types.exit_exit:
            # Check if a shutdown delay was defined
            # NOTE(review): this grep is case-sensitive and matches a
            # lowercase attribute name — confirm grep()'s flags or the
            # attribute casing written into result_ad.
            delay = grep('^amazonamishutdowndelay\s*=\s*(.*)$', result_ad)
            if delay != None and delay[0] != None:
                time.sleep(int(delay[0]) * 60)
            # Shutdown now that the job has finished
            call(['shutdown', '-h', '-P', 'now'])
    # Send acknowledgement to the originator that exit work is complete
    req_socket.send('Completed')
    close_socket(req_socket)
    return(SUCCESS)
def main(argv=None):
if argv is None:
argv = sys.argv
listen_socket = None
sock = None
private_key_file = '/root/.ec2/rsa_key'
pidfile = ''
daemon_mode = 0
share_data = None
conf_file = '/etc/condor/caroniad.conf'
log_name = os.path.basename(argv[0])
debug_logging = False
try:
# Configure the logging system
try:
file = read_condor_config('CARONIAD', ['LOG'], permit_param_only = False)
except ConfigError, error:
try:
file = read_condor_config('EC2E_DAEMON', ['LOG'], permit_param_only = False)
except:
print 'Error: %s. Exiting' % error.msg
return(FAILURE)
try:
size = int(read_condor_config('', ['MAX_CARONIAD_LOG'])['max_caroniad_log'])
except:
try:
size = int(read_condor_config('', ['MAX_EC2E_DAEMON_LOG'])['max_ec2e_daemon_log'])
except:
size = 1000000
try:
opts, args = getopt.getopt(argv[1:], 'dlhp:', ['daemon', 'logdebug', 'help', 'pidfile='])
except getopt.GetoptError, error:
print str(error)
return(FAILURE)
for option, arg in opts:
if option in ('-d', '--daemon'):
daemon_mode = 1
if option in ('-l', '--logdebug'):
debug_logging = True
if option in ('-h', '--help'):
print 'usage: ' + os.path.basename(argv[0]) + ' [-d|--daemon] [-l|--logdebug] [-h|--help] [-p|--pidfile= <pidfile>]'
return(SUCCESS)
if option in ('-p', '--pidfile'):
pidfile = arg
if debug_logging == True:
base_logger = create_file_logger(log_name, file['log'], logging.DEBUG, size=size)
add_debug_console(base_logger)
else:
base_logger = create_file_logger(log_name, file['log'], logging.INFO, size=size)
# Read the user data
user_data = get_instance_userdata()
if user_data == None:
# Send message about startup failure
raise MainException(logging.ERROR, "No User Data")
else: