
Commit 182c8ea

albu-diku authored and jonasbardino committed
Remove arcwrapper and associated logic.
1 parent d5b2230 · commit 182c8ea

File tree

14 files changed: +3 -2659 lines changed

mig/cgi-bin/arcresources.py

Lines changed: 0 additions & 36 deletions
This file was deleted.

mig/server/grid_script.py

Lines changed: 1 addition & 149 deletions
@@ -52,7 +52,7 @@
 from mig.shared.gridscript import clean_grid_stdin, \
     remove_jobrequest_pending_files, check_mrsl_files, requeue_job, \
     server_cleanup, load_queue, save_queue, load_schedule_cache, \
-    save_schedule_cache, arc_job_status, clean_arc_job
+    save_schedule_cache
 from mig.shared.notification import notify_user_thread
 from mig.shared.resadm import atomic_resource_exe_restart, put_exe_pgid
 from mig.shared.vgrid import job_fits_res_vgrid, validated_vgrid_list

@@ -168,38 +168,6 @@ def time_out_jobs(stop_event):
                     send_message_to_grid_script(grid_script_msg,
                                                 logger, configuration)

-                elif job['UNIQUE_RESOURCE_NAME'] == 'ARC':
-                    if not configuration.arc_clusters:
-                        logger.error('ARC backend disabled - ignore %s' %
-                                     job)
-                        continue
-                    jobstatus = arc_job_status(job, configuration, logger)
-
-                    # take action if the job is failed or killed.
-                    # No action for a finished job, since other
-                    # machinery will be at work to update it
-
-                    if jobstatus in ['FINISHED', 'FAILED', 'KILLED']:
-                        logger.debug(
-                            'discovered %s job %s, clean it on the server'
-                            % (jobstatus, job['JOB_ID']))
-                        if jobstatus in ['FAILED', 'KILLED']:
-                            msg = '(failed inside ARC)'
-                        else:
-                            msg = None
-                        exec_job = executing_queue.dequeue_job_by_id(
-                            job['JOB_ID'])
-                        if exec_job:
-                            # job was still there, clean up here
-                            # (otherwise, someone else picked it up in
-                            # the meantime)
-                            clean_arc_job(exec_job, jobstatus, msg,
-                                          configuration, logger, False)
-                    else:
-                        logger.debug(
-                            'Status %s for ARC job %s, no action required'
-                            % (jobstatus, job['JOB_ID']))
-
     except Exception as err:
         logger.error('time_out_jobs: unexpected exception: %s' % err)
     logger.info('time_out_jobs: time out thread terminating')

@@ -447,66 +415,6 @@ def graceful_shutdown():
         dict_userjob['OWNER'] = user_id
         dict_userjob['MIGRATE_COUNT'] = "0"

-        # ARC jobs: directly submit, and put in executing_queue
-        if dict_userjob['JOBTYPE'] == 'arc':
-            if not configuration.arc_clusters:
-                logger.error('ARC backend disabled - ignore %s' %
-                             dict_userjob)
-                continue
-            logger.debug('ARC Job')
-            (arc_job, msg) = jobscriptgenerator.create_arc_job(
-                dict_userjob, configuration, logger)
-            if not arc_job:
-                # something has gone wrong
-                logger.error('Job NOT submitted (%s)' % msg)
-                # discard this job (as FAILED, including message)
-                # see gridscript::requeue_job for how to do this...
-
-                dict_userjob['STATUS'] = 'FAILED'
-                dict_userjob['FAILED_TIMESTAMP'] = time.gmtime()
-                # and create an execution history (basically empty)
-                hist = (
-                    {'QUEUED_TIMESTAMP': dict_userjob['QUEUED_TIMESTAMP'],
-                     'EXECUTING_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'],
-                     'FAILED_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'],
-                     'FAILED_MESSAGE': ('ARC Submission failed: %s' % msg),
-                     'UNIQUE_RESOURCE_NAME': 'ARC', })
-                dict_userjob['EXECUTION_HISTORY'] = [hist]
-
-                # should also notify the user (if requested)
-                # not implented for this branch.
-
-            else:
-                # all fine, job is now in some ARC queue
-                logger.debug('Job submitted (%s,%s)' %
-                             (arc_job['SESSIONID'], arc_job['ARCID']))
-                # set some job fields for job status retrieval, and
-                # put in exec.queue for job status queries and timeout
-                dict_userjob['SESSIONID'] = arc_job['SESSIONID']
-                # abuse these two fields,
-                # expected by timeout thread to be there anyway
-                dict_userjob['UNIQUE_RESOURCE_NAME'] = 'ARC'
-                dict_userjob['EXE'] = arc_job['ARCID']
-
-                # this one is used by the timeout thread as well
-                # We put in a wild guess, 10 minutes. Perhaps not enough
-                dict_userjob['EXECUTION_DELAY'] = 600
-
-                # set to executing even though it is kind-of wrong...
-                dict_userjob['STATUS'] = 'EXECUTING'
-                dict_userjob['EXECUTING_TIMESTAMP'] = time.gmtime()
-                executing_queue.enqueue_job(dict_userjob,
-                                            executing_queue.queue_length())
-
-            # Either way, save the job mrsl.
-            # Status is EXECUTING or FAILED
-            pickle(dict_userjob, file_userjob, logger)
-
-            # go on with scheduling loop (do not use scheduler magic below)
-            continue
-
-        # following: non-ARC code
-
         # put job in queue

         job_queue.enqueue_job(dict_userjob, job_queue.queue_length())

@@ -1296,22 +1204,6 @@ def graceful_shutdown():
             msg += \
                 ', but job is being executed by %s:%s, ignoring result.'\
                 % (job_dict['UNIQUE_RESOURCE_NAME'], job_dict['EXE'])
-        elif job_dict['UNIQUE_RESOURCE_NAME'] == 'ARC':
-            if not configuration.arc_clusters:
-                logger.error('ARC backend disabled - ignore %s' %
-                             job_dict)
-                continue
-            msg += (', which is an ARC job (ID %s).' % job_dict['EXE'])
-
-            # remove from the executing queue
-            executing_queue.dequeue_job_by_id(job_id)
-
-            # job status has been checked by put script already
-            # we need to clean up the job remainder (links, queue, and ARC
-            # side)
-            clean_arc_job(job_dict, 'FINISHED', None,
-                          configuration, logger, False)
-            msg += 'ARC job completed'

         else:

@@ -1455,26 +1347,6 @@ def graceful_shutdown():
                 'Cancel job: Could not get job_dict for executing job')
             continue

-        # special treatment of ARC jobs: delete two links and cancel job
-        # in ARC
-        if unique_resource_name == 'ARC':
-            if not configuration.arc_clusters:
-                logger.error('ARC backend disabled - ignore %s' %
-                             job_dict)
-                continue
-
-            # remove from the executing queue
-            executing_queue.dequeue_job_by_id(job_id)
-
-            # job status has been set by the cancel request already, but
-            # we need to kill the ARC job, or clean it (if already
-            # finished), and clean up the job remainder links
-            clean_arc_job(job_dict, 'CANCELED', None,
-                          configuration, logger, True)
-
-            logger.debug('ARC job completed')
-            continue

         if not server_cleanup(
                 job_dict['SESSIONID'],
                 job_dict['IOSESSIONID'],

@@ -1539,26 +1411,6 @@ def graceful_shutdown():

             job_dict = executing_queue.get_job_by_id(jobid)

-            # special treatment of ARC jobs: delete two links and
-            # clean job in ARC system, do not retry.
-            if job_dict and unique_resource_name == 'ARC':
-                if not configuration.arc_clusters:
-                    logger.error('ARC backend disabled - ignore %s' %
-                                 job_dict)
-                    continue
-
-                # remove from the executing queue
-                executing_queue.dequeue_job_by_id(jobid)
-
-                # job status has been set by the cancel request already, but
-                # we need to kill the ARC job, or clean it (if already
-                # finished), and clean up the job remainder links
-                clean_arc_job(job_dict, 'FAILED', 'Job timed out',
-                              configuration, logger, True)
-
-                logger.debug('ARC job timed out, removed')
-                continue

             # Execution information is removed from job_dict in
             # requeue_job - save here

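For anyone still carrying ARC support in a fork: the branches deleted above depend on just the two helpers dropped from the mig.shared.gridscript import in the first hunk. A minimal interface sketch follows, with names and argument order taken from the deleted call sites; the bodies are illustrative stubs, not the removed implementation.

    # Illustrative stubs only: signatures inferred from the call sites
    # deleted above; the real implementations lived in mig.shared.gridscript.

    def arc_job_status(job, configuration, logger):
        """Look up the ARC-side state of job and return a status string
        such as 'FINISHED', 'FAILED' or 'KILLED'."""
        raise NotImplementedError("ARC support was removed by this commit")

    def clean_arc_job(job, status, msg, configuration, logger, kill):
        """Remove server-side remainders of an ARC job (links and queue
        entry); the deleted code passed kill=True where the job still had
        to be cancelled on the ARC side."""
        raise NotImplementedError("ARC support was removed by this commit")
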
mig/server/jobscriptgenerator.py

Lines changed: 0 additions & 131 deletions
@@ -45,13 +45,6 @@
 from mig.shared.mrslparser import expand_variables
 from mig.shared.ssh import copy_file_to_resource, generate_ssh_rsa_key_pair

-try:
-    from mig.shared import mrsltoxrsl
-    from mig.shared import arcwrapper
-except Exception as exc:
-    # Ignore errors and let it crash if ARC is enabled without the lib
-    pass
-

 def create_empty_job(
     unique_resource_name,

@@ -441,130 +434,6 @@ def create_job_script(
     return (job_dict, 'OK')


-def create_arc_job(
-    job,
-    configuration,
-    logger,
-):
-    """Analog to create_job_script for ARC jobs:
-    Creates symLinks for receiving result files, translates job dict to ARC
-    xrsl, and stores resulting job script (xrsl + sh script) for submitting.
-
-    We do _not_ create a separate job_dict with copies and SESSIONID inside,
-    as opposed to create_job_script, all we need is the link from
-    webserver_home / sessionID into the user's home directory
-    ("job_output/job['JOB_ID']" is added to the result upload URLs in the
-    translation).
-
-    Returns message (ARC job ID if no error) and sessionid (None if error)
-    """
-
-    if not configuration.arc_clusters:
-        return (None, 'No ARC support!')
-    if not job['JOBTYPE'] == 'arc':
-        return (None, 'Error. This is not an ARC job')
-
-    # Deep copy job for local changes
-    job_dict = deepcopy(job)
-    # Finally expand reserved job variables like +JOBID+ and +JOBNAME+
-    job_dict = expand_variables(job_dict)
-    # ... no more changes to job_dict from here on
-    client_id = "%(USER_CERT)s" % job_dict
-
-    # we do not want to see empty jobs here. Test as done in create_job_script.
-    if client_id == configuration.empty_job_name:
-        return (None, 'Error. empty job for ARC?')
-
-    # generate random session ID:
-    sessionid = hexlify(os.urandom(session_id_bytes))
-    logger.debug('session ID (for creating links): %s' % sessionid)
-
-    client_dir = client_id_dir(client_id)
-
-    # make symbolic links inside webserver_home:
-    #
-    # we need: link to owner's dir. to receive results,
-    #          job mRSL inside sessid_to_mrsl_link_home
-    linklist = [(configuration.user_home + client_dir,
-                 configuration.webserver_home + sessionid),
-                (configuration.mrsl_files_dir + client_dir + '/' +
-                 "%(JOB_ID)s" % job_dict + '.mRSL',
-                 configuration.sessid_to_mrsl_link_home + sessionid + '.mRSL')
-                ]
-
-    for (dest, loc) in linklist:
-        make_symlink(dest, loc, logger)
-
-    # the translation generates an xRSL object which specifies to execute
-    # a shell script with script_name. If sessionid != None, results will
-    # be uploaded to sid_redirect/sessionid/job_output/job_id
-
-    try:
-        (xrsl, script, script_name) = mrsltoxrsl.translate(job_dict, sessionid)
-        logger.debug('translated to xRSL: %s' % xrsl)
-        logger.debug('script:\n %s' % script)
-
-    except Exception as err:
-        # error during translation, pass a message
-        logger.error('Error during xRSL translation: %s' % err.__str__())
-        return (None, err.__str__())
-
-    # we submit directly from here (the other version above does
-    # copyFileToResource and gen_job_script generates all files)
-
-    # we have to put the generated script somewhere..., and submit from there.
-    # inputfiles are given by the user as relative paths from his home,
-    # so we should use that location (and clean up afterwards).
-
-    # write script (to user home)
-    user_home = os.path.join(configuration.user_home, client_dir)
-    script_path = os.path.abspath(os.path.join(user_home, script_name))
-    write_file(script, script_path, logger)
-
-    os.chdir(user_home)
-
-    try:
-        logger.debug('submitting job to ARC')
-        session = arcwrapper.Ui(user_home)
-        arc_job_ids = session.submit(xrsl)
-
-        # if no exception occurred, we are done:
-
-        job_dict['ARCID'] = arc_job_ids[0]
-        job_dict['SESSIONID'] = sessionid
-
-        msg = 'OK'
-        result = job_dict
-
-    # when errors occurred, pass a message to the caller.
-    except arcwrapper.ARCWrapperError as err:
-        msg = err.what()
-        result = None  # unsuccessful
-    except arcwrapper.NoProxyError as err:
-        msg = 'No Proxy found: %s' % err.what()
-        result = None  # unsuccessful
-    except Exception as err:
-        msg = err.__str__()
-        result = None  # unsuccessful
-
-    # always remove the generated script
-    os.remove(script_name)
-    # and remove the created links immediately if failed
-    if not result:
-        for (_, link) in linklist:
-            os.remove(link)
-        logger.error('Unsuccessful ARC job submission: %s' % msg)
-    else:
-        logger.debug('submitted to ARC as job %s' % msg)
-    return (result, msg)
-
-# errors are handled inside grid_script. For ARC jobs, set status = FAILED
-# on errors, and include the message
-# One potential error is that the proxy is invalid,
-# which should be checked inside the parser, before informing
-# grid_script about the new job.


 def gen_job_script(
     job_dictionary,
     resource_config,

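For reference, the calling convention of the removed create_arc_job, reconstructed from the grid_script.py call site deleted above. This is a sketch, not a drop-in snippet: dict_userjob, configuration and logger are assumed to be in scope as in the old code.

    # Old call pattern from grid_script.py: create_arc_job returned
    # (job_dict, 'OK') on success, with ARCID and SESSIONID filled in,
    # or (None, error_message) on failure.
    (arc_job, msg) = jobscriptgenerator.create_arc_job(
        dict_userjob, configuration, logger)
    if not arc_job:
        # submission failed: the old code marked the job FAILED with msg
        logger.error('Job NOT submitted (%s)' % msg)
    else:
        # submission succeeded: the job is now queued in ARC
        logger.debug('Job submitted (%s,%s)' %
                     (arc_job['SESSIONID'], arc_job['ARCID']))
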