Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions mig/cgi-bin/arcresources.py

This file was deleted.

150 changes: 1 addition & 149 deletions mig/server/grid_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from mig.shared.gridscript import clean_grid_stdin, \
remove_jobrequest_pending_files, check_mrsl_files, requeue_job, \
server_cleanup, load_queue, save_queue, load_schedule_cache, \
save_schedule_cache, arc_job_status, clean_arc_job
save_schedule_cache
from mig.shared.notification import notify_user_thread
from mig.shared.resadm import atomic_resource_exe_restart, put_exe_pgid
from mig.shared.vgrid import job_fits_res_vgrid, validated_vgrid_list
Expand Down Expand Up @@ -164,42 +164,10 @@
% (job['JOB_ID'], total_cputime, delay))
grid_script_msg = 'JOBTIMEOUT %s %s %s\n'\
% (job['UNIQUE_RESOURCE_NAME'], job['EXE'
], job['JOB_ID'])

Check warning on line 167 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (81 > 80 characters)
send_message_to_grid_script(grid_script_msg,
logger, configuration)

elif job['UNIQUE_RESOURCE_NAME'] == 'ARC':
if not configuration.arc_clusters:
logger.error('ARC backend disabled - ignore %s' %
job)
continue
jobstatus = arc_job_status(job, configuration, logger)

# take action if the job is failed or killed.
# No action for a finished job, since other
# machinery will be at work to update it

if jobstatus in ['FINISHED', 'FAILED', 'KILLED']:
logger.debug(
'discovered %s job %s, clean it on the server'
% (jobstatus, job['JOB_ID']))
if jobstatus in ['FAILED', 'KILLED']:
msg = '(failed inside ARC)'
else:
msg = None
exec_job = executing_queue.dequeue_job_by_id(
job['JOB_ID'])
if exec_job:
# job was still there, clean up here
# (otherwise, someone else picked it up in
# the meantime)
clean_arc_job(exec_job, jobstatus, msg,
configuration, logger, False)
else:
logger.debug(
'Status %s for ARC job %s, no action required'
% (jobstatus, job['JOB_ID']))

except Exception as err:
logger.error('time_out_jobs: unexpected exception: %s' % err)
logger.info('time_out_jobs: time out thread terminating')
Expand Down Expand Up @@ -311,29 +279,29 @@
scheduler = FirstFitScheduler(logger, configuration)
elif configuration.sched_alg == 'BestFit':
from mig.server.bestfitscheduler import BestFitScheduler
scheduler = BestFitScheduler(logger, configuration)

Check failure on line 282 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Incompatible types in assignment (expression has type "BestFitScheduler", variable has type "FirstFitScheduler | None") [assignment]
elif configuration.sched_alg == 'FairFit':
from mig.server.fairfitscheduler import FairFitScheduler
scheduler = FairFitScheduler(logger, configuration)

Check failure on line 285 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Incompatible types in assignment (expression has type "FairFitScheduler", variable has type "FirstFitScheduler | None") [assignment]
elif configuration.sched_alg == 'MaxThroughput':
from mig.server.maxthroughputscheduler import MaxThroughputScheduler
scheduler = MaxThroughputScheduler(logger, configuration)

Check failure on line 288 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Incompatible types in assignment (expression has type "MaxThroughputScheduler", variable has type "FirstFitScheduler | None") [assignment]
elif configuration.sched_alg == 'Random':
from mig.server.randomscheduler import RandomScheduler
scheduler = RandomScheduler(logger, configuration)

Check failure on line 291 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Incompatible types in assignment (expression has type "RandomScheduler", variable has type "FirstFitScheduler | None") [assignment]
elif configuration.sched_alg == 'FIFO':
from mig.server.fifoscheduler import FIFOScheduler
scheduler = FIFOScheduler(logger, configuration)

Check failure on line 294 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Incompatible types in assignment (expression has type "FIFOScheduler", variable has type "FirstFitScheduler | None") [assignment]
else:
from mig.server.firstfitscheduler import FirstFitScheduler
print('Unknown sched_alg %s - using FirstFit scheduler'
% configuration.sched_alg)
scheduler = FirstFitScheduler(logger, configuration)

scheduler.attach_job_queue(job_queue)

Check failure on line 301 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Item "None" of "FirstFitScheduler | None" has no attribute "attach_job_queue" [union-attr]
scheduler.attach_done_queue(done_queue)

Check failure on line 302 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Item "None" of "FirstFitScheduler | None" has no attribute "attach_done_queue" [union-attr]
if schedule_cache:
scheduler.set_cache(schedule_cache)

Check failure on line 304 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Item "None" of "FirstFitScheduler | None" has no attribute "set_cache" [union-attr]

# redirect grid_stdin to sys.stdin

Expand Down Expand Up @@ -447,66 +415,6 @@
dict_userjob['OWNER'] = user_id
dict_userjob['MIGRATE_COUNT'] = "0"

# ARC jobs: directly submit, and put in executing_queue
if dict_userjob['JOBTYPE'] == 'arc':
if not configuration.arc_clusters:
logger.error('ARC backend disabled - ignore %s' %
dict_userjob)
continue
logger.debug('ARC Job')
(arc_job, msg) = jobscriptgenerator.create_arc_job(
dict_userjob, configuration, logger)
if not arc_job:
# something has gone wrong
logger.error('Job NOT submitted (%s)' % msg)
# discard this job (as FAILED, including message)
# see gridscript::requeue_job for how to do this...

dict_userjob['STATUS'] = 'FAILED'
dict_userjob['FAILED_TIMESTAMP'] = time.gmtime()
# and create an execution history (basically empty)
hist = (
{'QUEUED_TIMESTAMP': dict_userjob['QUEUED_TIMESTAMP'],
'EXECUTING_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'],
'FAILED_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'],
'FAILED_MESSAGE': ('ARC Submission failed: %s' % msg),
'UNIQUE_RESOURCE_NAME': 'ARC', })
dict_userjob['EXECUTION_HISTORY'] = [hist]

# should also notify the user (if requested)
# not implemented for this branch.

else:
# all fine, job is now in some ARC queue
logger.debug('Job submitted (%s,%s)' %
(arc_job['SESSIONID'], arc_job['ARCID']))
# set some job fields for job status retrieval, and
# put in exec.queue for job status queries and timeout
dict_userjob['SESSIONID'] = arc_job['SESSIONID']
# abuse these two fields,
# expected by timeout thread to be there anyway
dict_userjob['UNIQUE_RESOURCE_NAME'] = 'ARC'
dict_userjob['EXE'] = arc_job['ARCID']

# this one is used by the timeout thread as well
# We put in a wild guess, 10 minutes. Perhaps not enough
dict_userjob['EXECUTION_DELAY'] = 600

# set to executing even though it is kind-of wrong...
dict_userjob['STATUS'] = 'EXECUTING'
dict_userjob['EXECUTING_TIMESTAMP'] = time.gmtime()
executing_queue.enqueue_job(dict_userjob,
executing_queue.queue_length())

# Either way, save the job mrsl.
# Status is EXECUTING or FAILED
pickle(dict_userjob, file_userjob, logger)

# go on with scheduling loop (do not use scheduler magic below)
continue

# following: non-ARC code

# put job in queue

job_queue.enqueue_job(dict_userjob, job_queue.queue_length())
Expand All @@ -516,8 +424,8 @@

# Update list of users - create user if new

scheduler.update_users(user_dict)

Check failure on line 427 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Item "None" of "FirstFitScheduler | None" has no attribute "update_users" [union-attr]
user_dict = scheduler.find_user(user_dict)

Check failure on line 428 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Item "None" of "FirstFitScheduler | None" has no attribute "find_user" [union-attr]
user_dict['QUEUE_HIST'].pop(0)
user_dict['QUEUE_HIST'].append(dict_userjob)
scheduler.update_seen(user_dict)
Expand Down Expand Up @@ -1060,8 +968,8 @@
mrsl_dict['LOCALJOBNAME'] = localjobname
mrsl_dict['SESSIONID'] = new_job['SESSIONID']
mrsl_dict['IOSESSIONID'] = new_job['IOSESSIONID']
mrsl_dict['MOUNTSSHPUBLICKEY'] = new_job['MOUNTSSHPUBLICKEY']

Check warning on line 971 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (85 > 80 characters)
mrsl_dict['MOUNTSSHPRIVATEKEY'] = new_job['MOUNTSSHPRIVATEKEY']

Check warning on line 972 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (87 > 80 characters)

# pickle the new version

Expand Down Expand Up @@ -1131,7 +1039,7 @@
active_job[name] = new_job[name]

executing_queue.enqueue_job(active_job,
executing_queue.queue_length())

Check warning on line 1042 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (83 > 80 characters)

print('executing_queue length %d'
% executing_queue.queue_length())
Expand Down Expand Up @@ -1221,7 +1129,7 @@
pickle(last_request_dict, monitor_last_request_file,
logger)
logger.info('vgrid_name: %s status: %s' % (vgrid_name,
last_request_dict['STATUS']))

Check warning on line 1132 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (88 > 80 characters)
elif vgrid_name in executing_in_other_vgrids:

# create modified last_request_dict and save
Expand All @@ -1230,7 +1138,7 @@
new_last_request_dict['STATUS'] = \
'Executing job for another vgrid'
logger.info('vgrid_name: %s status: %s' % (vgrid_name,
new_last_request_dict['STATUS']))

Check warning on line 1141 in mig/server/grid_script.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (92 > 80 characters)
pickle(new_last_request_dict,
monitor_last_request_file, logger)
else:
Expand Down Expand Up @@ -1296,22 +1204,6 @@
msg += \
', but job is being executed by %s:%s, ignoring result.'\
% (job_dict['UNIQUE_RESOURCE_NAME'], job_dict['EXE'])
elif job_dict['UNIQUE_RESOURCE_NAME'] == 'ARC':
if not configuration.arc_clusters:
logger.error('ARC backend disabled - ignore %s' %
job_dict)
continue
msg += (', which is an ARC job (ID %s).' % job_dict['EXE'])

# remove from the executing queue
executing_queue.dequeue_job_by_id(job_id)

# job status has been checked by put script already
# we need to clean up the job remainder (links, queue, and ARC
# side)
clean_arc_job(job_dict, 'FINISHED', None,
configuration, logger, False)
msg += 'ARC job completed'

else:

Expand Down Expand Up @@ -1455,26 +1347,6 @@
'Cancel job: Could not get job_dict for executing job')
continue

# special treatment of ARC jobs: delete two links and cancel job
# in ARC
if unique_resource_name == 'ARC':
if not configuration.arc_clusters:
logger.error('ARC backend disabled - ignore %s' %
job_dict)
continue

# remove from the executing queue
executing_queue.dequeue_job_by_id(job_id)

# job status has been set by the cancel request already, but
# we need to kill the ARC job, or clean it (if already
# finished), and clean up the job remainder links
clean_arc_job(job_dict, 'CANCELED', None,
configuration, logger, True)

logger.debug('ARC job completed')
continue

if not server_cleanup(
job_dict['SESSIONID'],
job_dict['IOSESSIONID'],
Expand Down Expand Up @@ -1539,26 +1411,6 @@

job_dict = executing_queue.get_job_by_id(jobid)

# special treatment of ARC jobs: delete two links and
# clean job in ARC system, do not retry.
if job_dict and unique_resource_name == 'ARC':
if not configuration.arc_clusters:
logger.error('ARC backend disabled - ignore %s' %
job_dict)
continue

# remove from the executing queue
executing_queue.dequeue_job_by_id(jobid)

# job status has been set by the cancel request already, but
# we need to kill the ARC job, or clean it (if already finished),
# and clean up the job remainder links
clean_arc_job(job_dict, 'FAILED', 'Job timed out',
configuration, logger, True)

logger.debug('ARC job timed out, removed')
continue

# Execution information is removed from job_dict in
# requeue_job - save here

Expand Down
131 changes: 0 additions & 131 deletions mig/server/jobscriptgenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

Check warning on line 23 in mig/server/jobscriptgenerator.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (81 > 80 characters)
#
# -- END_HEADER ---
#
Expand All @@ -45,13 +45,6 @@
from mig.shared.mrslparser import expand_variables
from mig.shared.ssh import copy_file_to_resource, generate_ssh_rsa_key_pair

try:
from mig.shared import mrsltoxrsl
from mig.shared import arcwrapper
except Exception as exc:
# Ignore errors and let it crash if ARC is enabled without the lib
pass


def create_empty_job(
unique_resource_name,
Expand Down Expand Up @@ -405,7 +398,7 @@

try:
os.remove(linkloc)
except:

Check warning on line 401 in mig/server/jobscriptgenerator.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

do not use bare 'except'
pass

# Make link sandboxkey.oneclick -> sessionid.jvm
Expand Down Expand Up @@ -436,135 +429,11 @@

try:
os.remove(inputfiles_path)
except:

Check warning on line 432 in mig/server/jobscriptgenerator.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

do not use bare 'except'
logger.error('could not remove ' + inputfiles_path)
return (job_dict, 'OK')


def _remove_arc_file(path, logger):
    """Best-effort removal of path: log the error instead of raising."""
    try:
        os.remove(path)
    except OSError as err:
        logger.error('could not remove %s: %s' % (path, err))


def create_arc_job(
    job,
    configuration,
    logger,
):
    """Analog to create_job_script for ARC jobs:
    Creates symlinks for receiving result files, translates the job dict to
    ARC xRSL, writes the generated shell script to the user home and submits
    the job from there.

    We do _not_ create a separate job_dict with copies and SESSIONID inside,
    as opposed to create_job_script, all we need is the link from
    webserver_home / sessionID into the user's home directory
    ("job_output/job['JOB_ID']" is added to the result upload URLs in the
    translation).

    Returns a (result, msg) tuple. On success result is the variable-expanded
    deep copy of job with ARCID and SESSIONID set and msg is 'OK'. On any
    error result is None and msg describes the problem. Errors are handled
    by the caller, which is expected to mark the job FAILED with the message.

    NOTE: changes the working directory of the process to the user home
    before submitting and does not restore it.
    """

    if not configuration.arc_clusters:
        return (None, 'No ARC support!')
    if not job['JOBTYPE'] == 'arc':
        return (None, 'Error. This is not an ARC job')

    # Deep copy job for local changes
    job_dict = deepcopy(job)
    # Finally expand reserved job variables like +JOBID+ and +JOBNAME+
    job_dict = expand_variables(job_dict)
    client_id = "%(USER_CERT)s" % job_dict

    # we do not want to see empty jobs here. Test as done in create_job_script.
    if client_id == configuration.empty_job_name:
        return (None, 'Error. empty job for ARC?')

    # generate random session ID:
    sessionid = hexlify(os.urandom(session_id_bytes))
    if isinstance(sessionid, bytes) and not isinstance(sessionid, str):
        # Python 3 hexlify yields bytes, which cannot be concatenated with
        # the str paths below. On Python 2 bytes is str so this is a no-op.
        sessionid = sessionid.decode('ascii')
    logger.debug('session ID (for creating links): %s' % sessionid)

    client_dir = client_id_dir(client_id)

    # make symbolic links inside webserver_home:
    #
    # we need: link to owner's dir. to receive results,
    #          job mRSL inside sessid_to_mrsl_link_home
    linklist = [(configuration.user_home + client_dir,
                 configuration.webserver_home + sessionid),
                (configuration.mrsl_files_dir + client_dir + '/' +
                 "%(JOB_ID)s" % job_dict + '.mRSL',
                 configuration.sessid_to_mrsl_link_home + sessionid + '.mRSL')
                ]

    for (dest, loc) in linklist:
        make_symlink(dest, loc, logger)

    # the translation generates an xRSL object which specifies to execute
    # a shell script with script_name. If sessionid != None, results will
    # be uploaded to sid_redirect/sessionid/job_output/job_id

    try:
        (xrsl, script, script_name) = mrsltoxrsl.translate(job_dict, sessionid)
        logger.debug('translated to xRSL: %s' % xrsl)
        logger.debug('script:\n %s' % script)
    except Exception as err:
        # error during translation, pass a message
        msg = err.__str__()
        logger.error('Error during xRSL translation: %s' % msg)
        # the session links were already created above: do not leave them
        # behind for a job that never gets submitted
        for (_, link) in linklist:
            _remove_arc_file(link, logger)
        return (None, msg)

    # we submit directly from here (the other version above does
    # copyFileToResource and gen_job_script generates all files)

    # we have to put the generated script somewhere..., and submit from there.
    # inputfiles are given by the user as relative paths from his home,
    # so we should use that location (and clean up afterwards).

    # write script (to user home)
    user_home = os.path.join(configuration.user_home, client_dir)
    # resolved before chdir so that it stays valid whatever the cwd is later
    script_path = os.path.abspath(os.path.join(user_home, script_name))
    write_file(script, script_path, logger)

    os.chdir(user_home)

    try:
        logger.debug('submitting job to ARC')
        session = arcwrapper.Ui(user_home)
        arc_job_ids = session.submit(xrsl)

        # if no exception occurred, we are done:

        job_dict['ARCID'] = arc_job_ids[0]
        job_dict['SESSIONID'] = sessionid

        msg = 'OK'
        result = job_dict

    # when errors occurred, pass a message to the caller.
    # NoProxyError is listed first so that its specific message is still
    # produced in case it derives from ARCWrapperError - TODO confirm the
    # exception hierarchy in arcwrapper.
    except arcwrapper.NoProxyError as err:
        msg = 'No Proxy found: %s' % err.what()
        result = None  # unsuccessful
    except arcwrapper.ARCWrapperError as err:
        msg = err.what()
        result = None  # unsuccessful
    except Exception as err:
        msg = err.__str__()
        result = None  # unsuccessful

    # always remove the generated script; use the absolute path rather than
    # relying on the working directory, and never let a cleanup problem mask
    # the submission result
    _remove_arc_file(script_path, logger)
    # and remove the created links immediately if failed
    if not result:
        for (_, link) in linklist:
            _remove_arc_file(link, logger)
        logger.error('Unsuccessful ARC job submission: %s' % msg)
    else:
        logger.debug('submitted to ARC as job %s' % result['ARCID'])
    return (result, msg)

# errors are handled inside grid_script. For ARC jobs, set status = FAILED
# on errors, and include the message
# One potential error is that the proxy is invalid,
# which should be checked inside the parser, before informing
# grid_script about the new job.


def gen_job_script(
job_dictionary,
resource_config,
Expand All @@ -578,7 +447,7 @@
"""Generate job script from job_dictionary before handout to resource"""

script_language = resource_config['SCRIPTLANGUAGE']
if not script_language in configuration.scriptlanguages:

Check warning on line 450 in mig/server/jobscriptgenerator.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

test for membership should be 'not in'
print('Unknown script language! (conflict with scriptlanguages in ' +
'configuration?) %s not in %s' % (script_language,
configuration.scriptlanguages))
Expand Down
Loading