
Commit 1553266

Remove arcwrapper and associated logic.
1 parent 2f70f9c commit 1553266

File tree

14 files changed (+3, -2653 lines)


mig/cgi-bin/arcresources.py

Lines changed: 0 additions & 36 deletions
This file was deleted.

mig/server/grid_script.py

Lines changed: 1 addition & 149 deletions
@@ -51,7 +51,7 @@
 from mig.shared.gridscript import clean_grid_stdin, \
     remove_jobrequest_pending_files, check_mrsl_files, requeue_job, \
     server_cleanup, load_queue, save_queue, load_schedule_cache, \
-    save_schedule_cache, arc_job_status, clean_arc_job
+    save_schedule_cache
 from mig.shared.notification import notify_user_thread
 from mig.shared.resadm import atomic_resource_exe_restart, put_exe_pgid
 from mig.shared.vgrid import job_fits_res_vgrid, validated_vgrid_list
@@ -167,38 +167,6 @@ def time_out_jobs(stop_event):
                     send_message_to_grid_script(grid_script_msg,
                                                 logger, configuration)
 
-                elif job['UNIQUE_RESOURCE_NAME'] == 'ARC':
-                    if not configuration.arc_clusters:
-                        logger.error('ARC backend disabled - ignore %s' %
-                                     job)
-                        continue
-                    jobstatus = arc_job_status(job, configuration, logger)
-
-                    # take action if the job is failed or killed.
-                    # No action for a finished job, since other
-                    # machinery will be at work to update it
-
-                    if jobstatus in ['FINISHED', 'FAILED', 'KILLED']:
-                        logger.debug(
-                            'discovered %s job %s, clean it on the server'
-                            % (jobstatus, job['JOB_ID']))
-                        if jobstatus in ['FAILED', 'KILLED']:
-                            msg = '(failed inside ARC)'
-                        else:
-                            msg = None
-                        exec_job = executing_queue.dequeue_job_by_id(
-                            job['JOB_ID'])
-                        if exec_job:
-                            # job was still there, clean up here
-                            # (otherwise, someone else picked it up in
-                            # the meantime)
-                            clean_arc_job(exec_job, jobstatus, msg,
-                                          configuration, logger, False)
-                    else:
-                        logger.debug(
-                            'Status %s for ARC job %s, no action required'
-                            % (jobstatus, job['JOB_ID']))
-
     except Exception as err:
         logger.error('time_out_jobs: unexpected exception: %s' % err)
     logger.info('time_out_jobs: time out thread terminating')
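For readers auditing the removal, the deleted call sites in this file pin down the rough shape of the two helpers dropped from mig.shared.gridscript. The stubs below are a sketch inferred purely from those call sites; parameter names and docstrings are guesses, not the removed implementation.

```python
# Sketch inferred from the call sites deleted in this commit; the real
# implementations lived in mig.shared.gridscript and are removed here.


def arc_job_status(job, configuration, logger):
    """Poll the ARC middleware for the state of one job.

    The removed caller compares the result against 'FINISHED', 'FAILED'
    and 'KILLED', so the return value is a plain status string.
    """
    raise NotImplementedError('removed in commit 1553266')


def clean_arc_job(job, status, msg, configuration, logger, kill):
    """Clean up server-side remains of an ARC job (session links and
    queue entry) and, judging from the call-site comments, also kill or
    clean the job on the ARC side when kill is True, e.g.:

        clean_arc_job(job_dict, 'CANCELED', None, configuration, logger, True)
    """
    raise NotImplementedError('removed in commit 1553266')
```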
@@ -446,66 +414,6 @@ def graceful_shutdown():
         dict_userjob['OWNER'] = user_id
         dict_userjob['MIGRATE_COUNT'] = "0"
 
-        # ARC jobs: directly submit, and put in executing_queue
-        if dict_userjob['JOBTYPE'] == 'arc':
-            if not configuration.arc_clusters:
-                logger.error('ARC backend disabled - ignore %s' %
-                             dict_userjob)
-                continue
-            logger.debug('ARC Job')
-            (arc_job, msg) = jobscriptgenerator.create_arc_job(
-                dict_userjob, configuration, logger)
-            if not arc_job:
-                # something has gone wrong
-                logger.error('Job NOT submitted (%s)' % msg)
-                # discard this job (as FAILED, including message)
-                # see gridscript::requeue_job for how to do this...
-
-                dict_userjob['STATUS'] = 'FAILED'
-                dict_userjob['FAILED_TIMESTAMP'] = time.gmtime()
-                # and create an execution history (basically empty)
-                hist = (
-                    {'QUEUED_TIMESTAMP': dict_userjob['QUEUED_TIMESTAMP'],
-                     'EXECUTING_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'],
-                     'FAILED_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'],
-                     'FAILED_MESSAGE': ('ARC Submission failed: %s' % msg),
-                     'UNIQUE_RESOURCE_NAME': 'ARC', })
-                dict_userjob['EXECUTION_HISTORY'] = [hist]
-
-                # should also notify the user (if requested)
-                # not implented for this branch.
-
-            else:
-                # all fine, job is now in some ARC queue
-                logger.debug('Job submitted (%s,%s)' %
-                             (arc_job['SESSIONID'], arc_job['ARCID']))
-                # set some job fields for job status retrieval, and
-                # put in exec.queue for job status queries and timeout
-                dict_userjob['SESSIONID'] = arc_job['SESSIONID']
-                # abuse these two fields,
-                # expected by timeout thread to be there anyway
-                dict_userjob['UNIQUE_RESOURCE_NAME'] = 'ARC'
-                dict_userjob['EXE'] = arc_job['ARCID']
-
-                # this one is used by the timeout thread as well
-                # We put in a wild guess, 10 minutes. Perhaps not enough
-                dict_userjob['EXECUTION_DELAY'] = 600
-
-                # set to executing even though it is kind-of wrong...
-                dict_userjob['STATUS'] = 'EXECUTING'
-                dict_userjob['EXECUTING_TIMESTAMP'] = time.gmtime()
-                executing_queue.enqueue_job(dict_userjob,
-                                            executing_queue.queue_length())
-
-            # Either way, save the job mrsl.
-            # Status is EXECUTING or FAILED
-            pickle(dict_userjob, file_userjob, logger)
-
-            # go on with scheduling loop (do not use scheduler magic below)
-            continue
-
-        # following: non-ARC code
-
         # put job in queue
 
         job_queue.enqueue_job(dict_userjob, job_queue.queue_length())
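The removed branch leans on four queue operations: enqueue_job, dequeue_job_by_id, get_job_by_id and queue_length. A minimal in-memory stand-in for that interface, reconstructed from the call sites in this diff rather than from the real MiG queue class, could look like this:

```python
# Minimal stand-in for the job queue interface used by the removed ARC
# branch; reconstructed from the call sites in this diff, not from the
# real MiG queue implementation.


class SimpleJobQueue:
    def __init__(self):
        self._jobs = []

    def queue_length(self):
        """Number of queued jobs; the removed code enqueues at this
        index, i.e. at the tail of the queue."""
        return len(self._jobs)

    def enqueue_job(self, job, position):
        """Insert a job dictionary at the given queue position."""
        self._jobs.insert(position, job)

    def get_job_by_id(self, job_id):
        """Return the job dict with matching JOB_ID, or None."""
        for job in self._jobs:
            if job['JOB_ID'] == job_id:
                return job
        return None

    def dequeue_job_by_id(self, job_id):
        """Remove and return the job with matching JOB_ID, or None if
        someone else already dequeued it - the race the removed
        time_out_jobs code explicitly guards against."""
        job = self.get_job_by_id(job_id)
        if job is not None:
            self._jobs.remove(job)
        return job
```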
@@ -1296,22 +1204,6 @@ def graceful_shutdown():
                 msg += \
                     ', but job is being executed by %s:%s, ignoring result.'\
                     % (job_dict['UNIQUE_RESOURCE_NAME'], job_dict['EXE'])
-            elif job_dict['UNIQUE_RESOURCE_NAME'] == 'ARC':
-                if not configuration.arc_clusters:
-                    logger.error('ARC backend disabled - ignore %s' %
-                                 job_dict)
-                    continue
-                msg += (', which is an ARC job (ID %s).' % job_dict['EXE'])
-
-                # remove from the executing queue
-                executing_queue.dequeue_job_by_id(job_id)
-
-                # job status has been checked by put script already
-                # we need to clean up the job remainder (links, queue, and ARC
-                # side)
-                clean_arc_job(job_dict, 'FINISHED', None,
-                              configuration, logger, False)
-                msg += 'ARC job completed'
 
             else:
 
@@ -1455,26 +1347,6 @@ def graceful_shutdown():
                     'Cancel job: Could not get job_dict for executing job')
                 continue
 
-            # special treatment of ARC jobs: delete two links and cancel job
-            # in ARC
-            if unique_resource_name == 'ARC':
-                if not configuration.arc_clusters:
-                    logger.error('ARC backend disabled - ignore %s' %
-                                 job_dict)
-                    continue
-
-                # remove from the executing queue
-                executing_queue.dequeue_job_by_id(job_id)
-
-                # job status has been set by the cancel request already, but
-                # we need to kill the ARC job, or clean it (if already
-                # finished), and clean up the job remainder links
-                clean_arc_job(job_dict, 'CANCELED', None,
-                              configuration, logger, True)
-
-                logger.debug('ARC job completed')
-                continue
-
             if not server_cleanup(
                     job_dict['SESSIONID'],
                     job_dict['IOSESSIONID'],
@@ -1539,26 +1411,6 @@ def graceful_shutdown():
 
             job_dict = executing_queue.get_job_by_id(jobid)
 
-            # special treatment of ARC jobs: delete two links and
-            # clean job in ARC system, do not retry.
-            if job_dict and unique_resource_name == 'ARC':
-                if not configuration.arc_clusters:
-                    logger.error('ARC backend disabled - ignore %s' %
-                                 job_dict)
-                    continue
-
-                # remove from the executing queue
-                executing_queue.dequeue_job_by_id(jobid)
-
-                # job status has been set by the cancel request already, but
-                # we need to kill the ARC job, or clean it (if already finished),
-                # and clean up the job remainder links
-                clean_arc_job(job_dict, 'FAILED', 'Job timed out',
-                              configuration, logger, True)
-
-                logger.debug('ARC job timed out, removed')
-                continue
-
             # Execution information is removed from job_dict in
             # requeue_job - save here
 

mig/server/jobscriptgenerator.py

Lines changed: 0 additions & 125 deletions
@@ -47,7 +47,6 @@
 
 try:
     from mig.shared import mrsltoxrsl
-    from mig.shared import arcwrapper
 except Exception as exc:
     # Ignore errors and let it crash if ARC is enabled without the lib
     pass
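The surviving try/except keeps the optional-import idiom: the translator module may be absent on systems built without ARC, and the failure is deliberately swallowed so that only ARC-enabled configurations ever crash. A small illustration of the pattern follows; the None fallback and the translate_job helper are hypothetical, not part of this file:

```python
# Hypothetical illustration of the optional-import idiom used above;
# mrsltoxrsl is real, the fallback and helper are made up for this sketch.
try:
    from mig.shared import mrsltoxrsl
except Exception:
    mrsltoxrsl = None  # feature library missing on this system


def translate_job(job_dict, sessionid):
    """Fail only when the optional feature is actually exercised."""
    if mrsltoxrsl is None:
        raise RuntimeError('xRSL translation requested but mrsltoxrsl '
                           'is not installed')
    return mrsltoxrsl.translate(job_dict, sessionid)
```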
@@ -441,130 +440,6 @@ def create_job_script(
     return (job_dict, 'OK')
 
 
-def create_arc_job(
-    job,
-    configuration,
-    logger,
-):
-    """Analog to create_job_script for ARC jobs:
-    Creates symLinks for receiving result files, translates job dict to ARC
-    xrsl, and stores resulting job script (xrsl + sh script) for submitting.
-
-    We do _not_ create a separate job_dict with copies and SESSIONID inside,
-    as opposed to create_job_script, all we need is the link from
-    webserver_home / sessionID into the user's home directory
-    ("job_output/job['JOB_ID']" is added to the result upload URLs in the
-    translation).
-
-    Returns message (ARC job ID if no error) and sessionid (None if error)
-    """
-
-    if not configuration.arc_clusters:
-        return (None, 'No ARC support!')
-    if not job['JOBTYPE'] == 'arc':
-        return (None, 'Error. This is not an ARC job')
-
-    # Deep copy job for local changes
-    job_dict = deepcopy(job)
-    # Finally expand reserved job variables like +JOBID+ and +JOBNAME+
-    job_dict = expand_variables(job_dict)
-    # ... no more changes to job_dict from here on
-    client_id = "%(USER_CERT)s" % job_dict
-
-    # we do not want to see empty jobs here. Test as done in create_job_script.
-    if client_id == configuration.empty_job_name:
-        return (None, 'Error. empty job for ARC?')
-
-    # generate random session ID:
-    sessionid = hexlify(os.urandom(session_id_bytes))
-    logger.debug('session ID (for creating links): %s' % sessionid)
-
-    client_dir = client_id_dir(client_id)
-
-    # make symbolic links inside webserver_home:
-    #
-    # we need: link to owner's dir. to receive results,
-    #          job mRSL inside sessid_to_mrsl_link_home
-    linklist = [(configuration.user_home + client_dir,
-                 configuration.webserver_home + sessionid),
-                (configuration.mrsl_files_dir + client_dir + '/' +
-                 "%(JOB_ID)s" % job_dict + '.mRSL',
-                 configuration.sessid_to_mrsl_link_home + sessionid + '.mRSL')
-                ]
-
-    for (dest, loc) in linklist:
-        make_symlink(dest, loc, logger)
-
-    # the translation generates an xRSL object which specifies to execute
-    # a shell script with script_name. If sessionid != None, results will
-    # be uploaded to sid_redirect/sessionid/job_output/job_id
-
-    try:
-        (xrsl, script, script_name) = mrsltoxrsl.translate(job_dict, sessionid)
-        logger.debug('translated to xRSL: %s' % xrsl)
-        logger.debug('script:\n %s' % script)
-
-    except Exception as err:
-        # error during translation, pass a message
-        logger.error('Error during xRSL translation: %s' % err.__str__())
-        return (None, err.__str__())
-
-    # we submit directly from here (the other version above does
-    # copyFileToResource and gen_job_script generates all files)
-
-    # we have to put the generated script somewhere..., and submit from there.
-    # inputfiles are given by the user as relative paths from his home,
-    # so we should use that location (and clean up afterwards).
-
-    # write script (to user home)
-    user_home = os.path.join(configuration.user_home, client_dir)
-    script_path = os.path.abspath(os.path.join(user_home, script_name))
-    write_file(script, script_path, logger)
-
-    os.chdir(user_home)
-
-    try:
-        logger.debug('submitting job to ARC')
-        session = arcwrapper.Ui(user_home)
-        arc_job_ids = session.submit(xrsl)
-
-        # if no exception occurred, we are done:
-
-        job_dict['ARCID'] = arc_job_ids[0]
-        job_dict['SESSIONID'] = sessionid
-
-        msg = 'OK'
-        result = job_dict
-
-    # when errors occurred, pass a message to the caller.
-    except arcwrapper.ARCWrapperError as err:
-        msg = err.what()
-        result = None  # unsuccessful
-    except arcwrapper.NoProxyError as err:
-        msg = 'No Proxy found: %s' % err.what()
-        result = None  # unsuccessful
-    except Exception as err:
-        msg = err.__str__()
-        result = None  # unsuccessful
-
-    # always remove the generated script
-    os.remove(script_name)
-    # and remove the created links immediately if failed
-    if not result:
-        for (_, link) in linklist:
-            os.remove(link)
-        logger.error('Unsuccessful ARC job submission: %s' % msg)
-    else:
-        logger.debug('submitted to ARC as job %s' % msg)
-    return (result, msg)
-
-    # errors are handled inside grid_script. For ARC jobs, set status = FAILED
-    # on errors, and include the message
-    # One potential error is that the proxy is invalid,
-    # which should be checked inside the parser, before informing
-    # grid_script about the new job.
-
-
 def gen_job_script(
     job_dictionary,
     resource_config,
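For reference, the matching caller removed from mig/server/grid_script.py earlier in this commit exercised the (result, msg) contract of create_arc_job: result is the updated job dictionary on success and None on failure, while msg carries 'OK' or the error text.

```python
# Call pattern quoted from the deletion in mig/server/grid_script.py;
# both sides of this call are removed by this commit.
(arc_job, msg) = jobscriptgenerator.create_arc_job(
    dict_userjob, configuration, logger)
if not arc_job:
    logger.error('Job NOT submitted (%s)' % msg)  # msg explains the failure
else:
    logger.debug('Job submitted (%s,%s)' %
                 (arc_job['SESSIONID'], arc_job['ARCID']))
```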
