From 182c8eab82171f57b0ed3d06d2e5faf885be480f Mon Sep 17 00:00:00 2001 From: Alex Burke Date: Thu, 9 Oct 2025 22:05:25 +0200 Subject: [PATCH] Remove arcwrapper and associated logic. --- mig/cgi-bin/arcresources.py | 36 - mig/server/grid_script.py | 150 +-- mig/server/jobscriptgenerator.py | 131 --- mig/shared/arcdummy.py | 356 ------ mig/shared/arcwrapper.py | 1010 ----------------- mig/shared/configuration.py | 12 - mig/shared/functionality/arcresources.py | 254 ----- mig/shared/functionality/autocreate.py | 70 -- mig/shared/functionality/jobstatus.py | 31 - mig/shared/functionality/settings.py | 36 - mig/shared/gridscript.py | 123 -- mig/shared/mrslparser.py | 37 +- mig/shared/mrsltoxrsl.py | 415 ------- .../mig_shared_configuration--new.json | 1 - 14 files changed, 3 insertions(+), 2659 deletions(-) delete mode 100755 mig/cgi-bin/arcresources.py delete mode 100644 mig/shared/arcdummy.py delete mode 100644 mig/shared/arcwrapper.py delete mode 100644 mig/shared/functionality/arcresources.py delete mode 100644 mig/shared/mrsltoxrsl.py diff --git a/mig/cgi-bin/arcresources.py b/mig/cgi-bin/arcresources.py deleted file mode 100755 index d3468cff7..000000000 --- a/mig/cgi-bin/arcresources.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# --- BEGIN_HEADER --- -# -# resadmin - Resource view for ARC resources -# Copyright (C) 2003-2009 The MiG Project lead by Brian Vinter -# -# This file is part of MiG. -# -# MiG is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# MiG is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -# -# -- END_HEADER --- -# - -from __future__ import absolute_import -import cgi -import cgitb -# cgitb.enable() - -from mig.shared.functionality.arcresources import main -from mig.shared.cgiscriptstub import run_cgi_script - -run_cgi_script(main) diff --git a/mig/server/grid_script.py b/mig/server/grid_script.py index 739d0fdb7..fb403f7f7 100755 --- a/mig/server/grid_script.py +++ b/mig/server/grid_script.py @@ -52,7 +52,7 @@ from mig.shared.gridscript import clean_grid_stdin, \ remove_jobrequest_pending_files, check_mrsl_files, requeue_job, \ server_cleanup, load_queue, save_queue, load_schedule_cache, \ - save_schedule_cache, arc_job_status, clean_arc_job + save_schedule_cache from mig.shared.notification import notify_user_thread from mig.shared.resadm import atomic_resource_exe_restart, put_exe_pgid from mig.shared.vgrid import job_fits_res_vgrid, validated_vgrid_list @@ -168,38 +168,6 @@ def time_out_jobs(stop_event): send_message_to_grid_script(grid_script_msg, logger, configuration) - elif job['UNIQUE_RESOURCE_NAME'] == 'ARC': - if not configuration.arc_clusters: - logger.error('ARC backend disabled - ignore %s' % - job) - continue - jobstatus = arc_job_status(job, configuration, logger) - - # take action if the job is failed or killed. 
- # No action for a finished job, since other - # machinery will be at work to update it - - if jobstatus in ['FINISHED', 'FAILED', 'KILLED']: - logger.debug( - 'discovered %s job %s, clean it on the server' - % (jobstatus, job['JOB_ID'])) - if jobstatus in ['FAILED', 'KILLED']: - msg = '(failed inside ARC)' - else: - msg = None - exec_job = executing_queue.dequeue_job_by_id( - job['JOB_ID']) - if exec_job: - # job was still there, clean up here - # (otherwise, someone else picked it up in - # the meantime) - clean_arc_job(exec_job, jobstatus, msg, - configuration, logger, False) - else: - logger.debug( - 'Status %s for ARC job %s, no action required' - % (jobstatus, job['JOB_ID'])) - except Exception as err: logger.error('time_out_jobs: unexpected exception: %s' % err) logger.info('time_out_jobs: time out thread terminating') @@ -447,66 +415,6 @@ def graceful_shutdown(): dict_userjob['OWNER'] = user_id dict_userjob['MIGRATE_COUNT'] = "0" - # ARC jobs: directly submit, and put in executing_queue - if dict_userjob['JOBTYPE'] == 'arc': - if not configuration.arc_clusters: - logger.error('ARC backend disabled - ignore %s' % - dict_userjob) - continue - logger.debug('ARC Job') - (arc_job, msg) = jobscriptgenerator.create_arc_job( - dict_userjob, configuration, logger) - if not arc_job: - # something has gone wrong - logger.error('Job NOT submitted (%s)' % msg) - # discard this job (as FAILED, including message) - # see gridscript::requeue_job for how to do this... - - dict_userjob['STATUS'] = 'FAILED' - dict_userjob['FAILED_TIMESTAMP'] = time.gmtime() - # and create an execution history (basically empty) - hist = ( - {'QUEUED_TIMESTAMP': dict_userjob['QUEUED_TIMESTAMP'], - 'EXECUTING_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'], - 'FAILED_TIMESTAMP': dict_userjob['FAILED_TIMESTAMP'], - 'FAILED_MESSAGE': ('ARC Submission failed: %s' % msg), - 'UNIQUE_RESOURCE_NAME': 'ARC', }) - dict_userjob['EXECUTION_HISTORY'] = [hist] - - # should also notify the user (if requested) - # not implented for this branch. - - else: - # all fine, job is now in some ARC queue - logger.debug('Job submitted (%s,%s)' % - (arc_job['SESSIONID'], arc_job['ARCID'])) - # set some job fields for job status retrieval, and - # put in exec.queue for job status queries and timeout - dict_userjob['SESSIONID'] = arc_job['SESSIONID'] - # abuse these two fields, - # expected by timeout thread to be there anyway - dict_userjob['UNIQUE_RESOURCE_NAME'] = 'ARC' - dict_userjob['EXE'] = arc_job['ARCID'] - - # this one is used by the timeout thread as well - # We put in a wild guess, 10 minutes. Perhaps not enough - dict_userjob['EXECUTION_DELAY'] = 600 - - # set to executing even though it is kind-of wrong... - dict_userjob['STATUS'] = 'EXECUTING' - dict_userjob['EXECUTING_TIMESTAMP'] = time.gmtime() - executing_queue.enqueue_job(dict_userjob, - executing_queue.queue_length()) - - # Either way, save the job mrsl. 
- # Status is EXECUTING or FAILED - pickle(dict_userjob, file_userjob, logger) - - # go on with scheduling loop (do not use scheduler magic below) - continue - - # following: non-ARC code - # put job in queue job_queue.enqueue_job(dict_userjob, job_queue.queue_length()) @@ -1296,22 +1204,6 @@ def graceful_shutdown(): msg += \ ', but job is being executed by %s:%s, ignoring result.'\ % (job_dict['UNIQUE_RESOURCE_NAME'], job_dict['EXE']) - elif job_dict['UNIQUE_RESOURCE_NAME'] == 'ARC': - if not configuration.arc_clusters: - logger.error('ARC backend disabled - ignore %s' % - job_dict) - continue - msg += (', which is an ARC job (ID %s).' % job_dict['EXE']) - - # remove from the executing queue - executing_queue.dequeue_job_by_id(job_id) - - # job status has been checked by put script already - # we need to clean up the job remainder (links, queue, and ARC - # side) - clean_arc_job(job_dict, 'FINISHED', None, - configuration, logger, False) - msg += 'ARC job completed' else: @@ -1455,26 +1347,6 @@ def graceful_shutdown(): 'Cancel job: Could not get job_dict for executing job') continue - # special treatment of ARC jobs: delete two links and cancel job - # in ARC - if unique_resource_name == 'ARC': - if not configuration.arc_clusters: - logger.error('ARC backend disabled - ignore %s' % - job_dict) - continue - - # remove from the executing queue - executing_queue.dequeue_job_by_id(job_id) - - # job status has been set by the cancel request already, but - # we need to kill the ARC job, or clean it (if already - # finished), and clean up the job remainder links - clean_arc_job(job_dict, 'CANCELED', None, - configuration, logger, True) - - logger.debug('ARC job completed') - continue - if not server_cleanup( job_dict['SESSIONID'], job_dict['IOSESSIONID'], @@ -1539,26 +1411,6 @@ def graceful_shutdown(): job_dict = executing_queue.get_job_by_id(jobid) - # special treatment of ARC jobs: delete two links and - # clean job in ARC system, do not retry. - if job_dict and unique_resource_name == 'ARC': - if not configuration.arc_clusters: - logger.error('ARC backend disabled - ignore %s' % - job_dict) - continue - - # remove from the executing queue - executing_queue.dequeue_job_by_id(jobid) - - # job status has been set by the cancel request already, but - # we need to kill the ARC job, or clean it (if already finished), - # and clean up the job remainder links - clean_arc_job(job_dict, 'FAILED', 'Job timed out', - configuration, logger, True) - - logger.debug('ARC job timed out, removed') - continue - # Execution information is removed from job_dict in # requeue_job - save here diff --git a/mig/server/jobscriptgenerator.py b/mig/server/jobscriptgenerator.py index b2c7ca1cb..bb14916a6 100644 --- a/mig/server/jobscriptgenerator.py +++ b/mig/server/jobscriptgenerator.py @@ -45,13 +45,6 @@ from mig.shared.mrslparser import expand_variables from mig.shared.ssh import copy_file_to_resource, generate_ssh_rsa_key_pair -try: - from mig.shared import mrsltoxrsl - from mig.shared import arcwrapper -except Exception as exc: - # Ignore errors and let it crash if ARC is enabled without the lib - pass - def create_empty_job( unique_resource_name, @@ -441,130 +434,6 @@ def create_job_script( return (job_dict, 'OK') -def create_arc_job( - job, - configuration, - logger, -): - """Analog to create_job_script for ARC jobs: - Creates symLinks for receiving result files, translates job dict to ARC - xrsl, and stores resulting job script (xrsl + sh script) for submitting. 
- - We do _not_ create a separate job_dict with copies and SESSIONID inside, - as opposed to create_job_script, all we need is the link from - webserver_home / sessionID into the user's home directory - ("job_output/job['JOB_ID']" is added to the result upload URLs in the - translation). - - Returns message (ARC job ID if no error) and sessionid (None if error) - """ - - if not configuration.arc_clusters: - return (None, 'No ARC support!') - if not job['JOBTYPE'] == 'arc': - return (None, 'Error. This is not an ARC job') - - # Deep copy job for local changes - job_dict = deepcopy(job) - # Finally expand reserved job variables like +JOBID+ and +JOBNAME+ - job_dict = expand_variables(job_dict) - # ... no more changes to job_dict from here on - client_id = "%(USER_CERT)s" % job_dict - - # we do not want to see empty jobs here. Test as done in create_job_script. - if client_id == configuration.empty_job_name: - return (None, 'Error. empty job for ARC?') - - # generate random session ID: - sessionid = hexlify(os.urandom(session_id_bytes)) - logger.debug('session ID (for creating links): %s' % sessionid) - - client_dir = client_id_dir(client_id) - - # make symbolic links inside webserver_home: - # - # we need: link to owner's dir. to receive results, - # job mRSL inside sessid_to_mrsl_link_home - linklist = [(configuration.user_home + client_dir, - configuration.webserver_home + sessionid), - (configuration.mrsl_files_dir + client_dir + '/' + - "%(JOB_ID)s" % job_dict + '.mRSL', - configuration.sessid_to_mrsl_link_home + sessionid + '.mRSL') - ] - - for (dest, loc) in linklist: - make_symlink(dest, loc, logger) - - # the translation generates an xRSL object which specifies to execute - # a shell script with script_name. If sessionid != None, results will - # be uploaded to sid_redirect/sessionid/job_output/job_id - - try: - (xrsl, script, script_name) = mrsltoxrsl.translate(job_dict, sessionid) - logger.debug('translated to xRSL: %s' % xrsl) - logger.debug('script:\n %s' % script) - - except Exception as err: - # error during translation, pass a message - logger.error('Error during xRSL translation: %s' % err.__str__()) - return (None, err.__str__()) - - # we submit directly from here (the other version above does - # copyFileToResource and gen_job_script generates all files) - - # we have to put the generated script somewhere..., and submit from there. - # inputfiles are given by the user as relative paths from his home, - # so we should use that location (and clean up afterwards). - - # write script (to user home) - user_home = os.path.join(configuration.user_home, client_dir) - script_path = os.path.abspath(os.path.join(user_home, script_name)) - write_file(script, script_path, logger) - - os.chdir(user_home) - - try: - logger.debug('submitting job to ARC') - session = arcwrapper.Ui(user_home) - arc_job_ids = session.submit(xrsl) - - # if no exception occurred, we are done: - - job_dict['ARCID'] = arc_job_ids[0] - job_dict['SESSIONID'] = sessionid - - msg = 'OK' - result = job_dict - - # when errors occurred, pass a message to the caller. 
- except arcwrapper.ARCWrapperError as err: - msg = err.what() - result = None # unsuccessful - except arcwrapper.NoProxyError as err: - msg = 'No Proxy found: %s' % err.what() - result = None # unsuccessful - except Exception as err: - msg = err.__str__() - result = None # unsuccessful - - # always remove the generated script - os.remove(script_name) - # and remove the created links immediately if failed - if not result: - for (_, link) in linklist: - os.remove(link) - logger.error('Unsuccessful ARC job submission: %s' % msg) - else: - logger.debug('submitted to ARC as job %s' % msg) - return (result, msg) - - # errors are handled inside grid_script. For ARC jobs, set status = FAILED - # on errors, and include the message - # One potential error is that the proxy is invalid, - # which should be checked inside the parser, before informing - # grid_script about the new job. - - def gen_job_script( job_dictionary, resource_config, diff --git a/mig/shared/arcdummy.py b/mig/shared/arcdummy.py deleted file mode 100644 index dce23fd56..000000000 --- a/mig/shared/arcdummy.py +++ /dev/null @@ -1,356 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# -# --- BEGIN_HEADER --- -# -# -# arcdummy - [optionally add short module description on this line] -# Copyright (C) 2003-2017 The MiG Project lead by Brian Vinter -# -# This file is part of MiG. -# -# MiG is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# MiG is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# -# --- END_HEADER --- -# - -# -# arcdummy: debug module providing arcwrapper interface -# -# (C) 2009 Jost Berthold, grid.dk -# adapted to usage inside a MiG framework -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. 
-# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -# - -"""ARC dummy interface module.""" - -from __future__ import absolute_import - -from builtins import object -import os - -# MiG utilities: -from mig.shared.conf import get_configuration_object -config = get_configuration_object() -logger = config.logger - - -class ARCLibError(Exception): - - def __init__(self, msg): - self.msg = msg - - def what(self): - return self.msg - -# (trivially inheriting) exception class of our own - - -class ARCWrapperError(ARCLibError): - - def __init__(self, msg): - ARCLibError.__init__(self, msg) - - -class NoProxyError(ARCLibError): - - """ A special error which can occur in this setting: - The user did not provide a valid proxy certificate, or the one - she provided is expired. We need to treat this error case - specially (advise to update, or create a new proxy).""" - - def __init(self, msg): - ARCLibError.__init__(self, ('No proxy available: %s' % msg)) - - -class DummyProxy(object): - - """Proxy management class. - - This class handles a X509 proxy certificate.""" - - def __init__(self, filename): - """Class constructor. - - @type filename: string - @param filename: Proxy filename""" - self.__filename = filename - - logger.debug('Dummy Proxy Certificate from %s' - % filename) - - def getFilename(self): - """Return the proxy filename.""" - logger.debug('proxy filename') - return self.__filename - - def getTimeleft(self): - """Return the amount of time left on the proxy certificate (int).""" - logger.debug('proxy timeleft') - return 0 - -# helper dummies: - -# splitting up an ARC job ID - - -def splitJobId(jobId): - """ Splits off the last part of the path from an ARC Job ID. - Reason: The job ID is a valid URL to the job directory on the - ARC resource, and all jobs have a common URL prefix. In addition, - job information on the ARC resource is usually obtained by - inspecting files at URL /info/ - (see ARC/arclib/jobinfo.cpp).""" - - if jobId.endswith('/'): - jobId = jobId[:-1] - return os.path.split(jobId) - -# asking the user for a proxy. This will be called from many places, -# thus centralised here (though too specific ). - - -def askProxy(): - output_objects = [] - output_objects.append({'object_type': 'sectionheader', - 'text': 'Proxy upload'}) - output_objects.append({'object_type': 'html_form', - 'text': """ -
-<form method="post" action="upload.py"
-enctype="multipart/form-data">
-<p>
-Please specify a proxy file to upload:<br>
-Such a proxy file can be created using the command-line tool
-voms-proxy-init, and can be found in /tmp/x509up_u<your UID>.<br>
-<input type="file" name="fileupload" size="40">
-' + '"' +
- """>
-&nbsp;
-<input type="submit" value="Send file">
-</form>
-</p>
- """}) - return output_objects - - -class Ui(object): - - """ARC middleware user interface class.""" - - def __init__(self, userdir): - """Class constructor""" - - try: - if not os.path.isdir(userdir): - raise ARCWrapperError('Given user directory ' + userdir - + ' does not exist.') - self._userdir = userdir - self._proxy = DummyProxy(userdir + '/dummyproxy') - - except ARCLibError as err: - logger.error('Cannot initialise: %s' % err.what()) - raise err - except Exception as other: - logger.error( - 'Unexpected error during initialisation.\n %s' % other) - raise ARCWrapperError(other.__str__()) - - def getProxy(self): - """ returns the proxy interface used""" - return self._proxy - - def getQueues(self): - """ returns the queues we discovered for the clusters.""" - return [] - - def submitFile(self, xrslFilename, jobName=''): - """Submit xrsl file as job to available ARC resources. - - @type xrslFilename: string - @param xrslFilename: Filename containing a job description in XRSL. - @rtype list: - @return: list containing [resultVal, jobIds] resultVal is the return - code of the ARC command, jobIds is a list of jobID strings.""" - - logger.debug('Submitting a job from file %s...' % xrslFilename) - - # Convert XRSL file into a string - - f = open(xrslFilename, 'rb') - xrslString = f.read() - f.close() - logger.debug('XRSL-file: %s', xrslString) - return (-1, []) - - def submit(self, xrslAll, jobName=''): - """Submit xrsl object as job to available ARC resources. - The method expects an arclib.Xrsl object and its current - working directory to contain the referenced files (rel. paths). - - @type xrslAll: arclib.Xrsl - @param xrslAll: job description in XRSL (arclib object). - @rtype list: - @return: (resultVal, list of jobIds) resultVal is a return - code (0 for success), jobIds is a list of jobID strings. - - Exceptions are forwarded to the caller.""" - - logger.debug('Ui: Submitting job .') - raise Exception("Dummy module") - - def AllJobStatus(self): - """Query status of jobs in joblist. - - The command returns a dictionary of jobIDs. Each item - in the dictionary consists of an additional dictionary with the - attributes: - - name = Job name - status = ARC job states, ACCPTED, SUBMIT, INLRMS etc - error = Error status - sub_time = string(submission_time) - completion = string(completion_time) - cpu_time = string(used_cpu_time) - wall_time = string(used_wall_time) - - If there was an error, an empty dictionary is returned. - - Example: - - jobList = ui.jobStatus() - - print jobList['gsiftp://...3217']['name'] - print jobList['gsiftp://...3217']['status'] - - @rtype: dict - @return: job status dictionary.""" - - logger.debug('Requesting job status for all jobs.') - - jobList = {} - return jobList - - def jobStatus(self, jobId): - """Retrieve status of a particular job. - - returns: dictionary containing keys name, status, error... - (see allJobStatus).""" - - logger.debug('Requesting job status for %s.' % jobId) - - jobInfo = {'name': 'UNKNOWN', 'status': 'NOT FOUND', 'error': -1} - - return jobInfo - - def cancel(self, jobID): - """Kill a (running?) job. - - If this fails, complain, and retrieve the job status. - @type jobID: string - @param jobID: jobId URL identifier.""" - - logger.debug('Trying to stop job %s' % jobID) - success = False - - return success - - def clean(self, jobId): - """Removes a (finished?) job from a remote cluster. - - If this fails, just remove it from our list (forget it). 
- @type jobID: string - @param jobID: jobId URL identifier.""" - - logger.debug('Cleaning up job %s' % jobId) - - def getResults(self, jobId, downloadDir=''): - """Download results from grid job. - - @type jobId: string - @param jobID: jobId URL identifier. - @type downloadDir: string - @param downloadDir: Download results to specified directory. - @rtype: list - @return: list of downloaded files (strings)""" - - logger.debug('Downloading files from job %s' % jobId) - # return - raise Exception("dummy module") - - def lsJobDir(self, jobId): - """List files at a specific URL. - - @type jobId: string - @param jobId: jobId, which is URL location of job dir. - @rtype: list - @return: list of FileInfo - """ - - # the jobID is a valid URL to the job directory. We can use it to - # inspect its contents. - # - # For other directories (gmlog or other), using FTPControl, we do - # not get accurate file sizes, only for the real output - # and for scripts/files in the proper job directory. - - logger.debug('ls in JobDir for job %s' % jobId) - return [] - - -# stdout of a job can be found directly in its job directory, but might have -# a different name (user can give the name). For a "live output request", -# we download the xrsl description from the info directory and look for -# the respective names. -# For jobs with "joined" stdout and stderr, we get an error when retrieving -# the latter, and fall back to retrieving stdout instead. - - - def getStandardOutput(self, jobId): - """Get the standard output of a running job. - - @type jobID: string - @param jobID: jobId URL identifier. - @rtype: string - @return: output from the job""" - - logger.debug('get std. output for %s' % jobId) - return 'DUMMY' - - def getStandardError(self, jobId): - """Get the standard error of a running job. - - @type jobID: string - @param jobID: jobId URL identifier. - @rtype: list - @return: list of return value from ARC and output from job.""" - - logger.debug('get stderr output for %s' % jobId) - return 'DUMMY' diff --git a/mig/shared/arcwrapper.py b/mig/shared/arcwrapper.py deleted file mode 100644 index cdc580fad..000000000 --- a/mig/shared/arcwrapper.py +++ /dev/null @@ -1,1010 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -# --- BEGIN_HEADER --- -# -# arcwrapper: main ARC middleware wrapper module -# Copyright (C) 2009-2021 The MiG Project lead by Brian Vinter -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. 
-# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -# -# -- END_HEADER --- -# - -# Original copyright notice follows: -# Copyright (C) 2006-2009 Jonas Lindemann -# -# this version: -# (C) 2009 Jost Berthold, grid.dk -# adapted to usage inside a MiG framework -# - - -"""ARC middleware interface module.""" - -from __future__ import absolute_import - -from future import standard_library -standard_library.install_aliases() -from builtins import next -from builtins import filter -from builtins import object -import subprocess -import os -import sys -import tempfile -import threading - -from mig.shared.safeeval import subprocess_popen, subprocess_pipe - -# MiG utilities: -from mig.shared.conf import get_configuration_object -config = get_configuration_object() -logger = config.logger - -# Avoid massive log spam when unconditionally importing arcwrapper in other -# modules like jobstatus and jobscriptgenerator -if not config.arc_clusters: - raise Exception('ignoring arcwrapper import without ARC enabled!') - -# to make this succeed: -# install nordugrid-arc-client and nordugrid-arc-python -# set LD_LIBRARY_PATH="$NORDUGRID_LOCATION/lib:$GLOBUS_LOCATION/lib -# PYTHONPATH="$NORDUGRID_LOCATION/lib/python2.4/site-packages" -try: - import arclib -except: - logger.error('problems importing arclib... trying workaround') - try: - logger.debug('Current sys.path is %s' % sys.path) - sys.path.append(os.environ['NORDUGRID_LOCATION'] - + '/lib/python2.4/site-packages') - import arclib - except: - raise Exception('arclib not found - no problem unless using ARC') - -# (trivially inheriting) exception class of our own - - -class ARCWrapperError(arclib.ARCLibError): - - def __init__(self, msg): - arclib.ARCLibError.__init__(self, msg) - - -class NoProxyError(arclib.ARCLibError): - - """ A special error which can occur in this setting: - The user did not provide a valid proxy certificate, or the one - she provided is expired. We need to treat this error case - specially (advise to update, or create a new proxy).""" - - def __init(self, msg): - arclib.ARCLibError.__init__(self, ('No proxy available: %s' % msg)) - - -class Proxy(arclib.Certificate): - - """Proxy management class. - - This class handles a X509 proxy certificate.""" - - def __init__(self, filename): - """Class constructor. - - @type filename: string - @param filename: Proxy filename""" - - self.__filename = os.path.abspath(filename) - if not os.path.isfile(self.__filename): - raise NoProxyError('Proxy file ' + filename + ' does not exist.') - - try: - arclib.Certificate.__init__(self, arclib.PROXY, self.__filename) - except arclib.CertificateError as err: - raise NoProxyError(err.what()) - # just testing... 
- logger.debug('Proxy Certificate %s from %s' - % (self.GetSN(), self.getFilename())) - logger.debug('time left in seconds: %d' % self.getTimeleft()) - - def getFilename(self): - """Return the proxy filename.""" - return self.__filename - - def getTimeleft(self): - """Return the amount of time left on the proxy certificate (int).""" - - timeleft = 0 - if not self.IsExpired(): - timeLeftStr = self.ValidFor() - factor = {'days': 24 * 60 * 60, 'day': 24 * 60 * 60, 'hours': 60 * 60, 'hour': 60 * - 60, 'minutes': 60, 'minute': 60, 'seconds': 1, 'second': 1} - timeLeftParts = timeLeftStr.split(',') - for part in timeLeftParts: - [val, item] = part.split() - f = factor[item] - if f: - timeleft = timeleft + int(val) * f - return timeleft - -# small helpers: - -# splitting up an ARC job ID - - -def splitJobId(jobId): - """ Splits off the last part of the path from an ARC Job ID. - Reason: The job ID is a valid URL to the job directory on the - ARC resource, and all jobs have a common URL prefix. In addition, - job information on the ARC resource is usually obtained by - inspecting files at URL /info/ - (see ARC/arclib/jobinfo.cpp). - - This function can trigger an arclib.URLError exception. - """ - if not jobId.endswith('/'): - jobId = jobId + '/' - jobURL = arclib.URL(jobId) - path = os.path.split(jobURL.Path())[0] - return ('%s://%s:%d%s/%s' % (jobURL.Protocol(), jobURL.Host(), - jobURL.Port(), os.path.dirname(path), - os.path.basename(path))) - -# hack: issue a command line, return output and exit code - - -def getstatusoutput(cmd, env=None, startDir=""): - - variableDefs = "" - - if env: - for variableName in env: - variableDefs = variableDefs + "%s=%s " % \ - (variableName, env[variableName]) - - execCmd = variableDefs + cmd - - if startDir == "": - resultVal, result = subprocess.getstatusoutput(execCmd) - else: - resultVal, result = subprocess.getstatusoutput( - 'cd "%s";set;%s' % (startDir, execCmd)) - - resultLines = result.split('\n') - - logger.debug("Executing: %s, result = %d" % (execCmd, resultVal)) - - if logger.getLogLevel() == 'DEBUG': - if len(resultLines) < 200: - i = 0 - for line in resultLines: - logger.debug("\t%s: %s" % (i, line.strip())) - i += 1 - - return resultVal, resultLines - - -# asking the user for a proxy. This will be called from many places, -# thus centralised here (though too specific ). -def askProxy(): - output_objects = [] - output_objects.append({'object_type': 'sectionheader', - 'text': 'Proxy upload'}) - output_objects.append({'object_type': 'html_form', - 'text': """ -
-<form method="post" action="upload.py"
-enctype="multipart/form-data">
-<p>
-Please specify a proxy file to upload:<br>
-Such a proxy file can be created using the command-line tool
-voms-proxy-init, and can be found in /tmp/x509up_u<your UID>.<br>
-<input type="file" name="fileupload" size="40">
-&nbsp;
-<input type="submit" value="Send file">
-</form>
-</p>
- """}) - return output_objects - - -def create_grid_proxy(cert_path, key_path, proxy_path): - """ - Create a default proxy cert. Uses grid-proxy-init. - In this way no additional voms information is added. - - Returns the absolute path of the generated proxy. By standard placed in - the /tmp/ folder. - """ - try: - command_list = ["../java-bin/generate_proxy", cert_path, key_path, - proxy_path] - # NOTE: we use command list to avoid the need for shell - proc = subprocess_popen(command_list, stdout=subprocess_pipe, - stderr=subprocess_pipe) - (out, _) = proc.communicate() - logger.info(out.replace("\n", ".")) - except Exception as exc: - logger.error("Could not generate a proxy certificate: \n%s" % exc) - raise - - -class Ui(object): - - """ARC middleware user interface class.""" - - # static information: - # service URL (Danish resources) - giis = arclib.URL('ldap://gridsrv4.nbi.dk:2135/O=Grid/Mds-Vo-Name=Denmark') - # and benedict cluster URL... for first tests - benedict =\ - arclib.URL('ldap://benedict.grid.aau.dk:2135/o=grid/mds-vo-name=local') - fyrgrid =\ - arclib.URL('ldap://fyrgrid.grid.aau.dk:2135/o=grid/mds-vo-name=local') - # hard-wired: expected proxy name - proxy_name = '.proxy.pem' - - def __init__(self, userdir, require_user_proxy=False): - """Class constructor""" - - # would be nice to hold the Ui instance and have the resources - # set up on instantiation. but several problems arise: - # - A stateless web interface cannot carry over the Ui object - # between several calls. We cannot pickle this information if - # it contains SWIG proxy objects. userdir, proxy and lock can - # be pickled, but _clusters and _queues are the interesting ones. - # - Different users should not share the same Ui! So running the - # whole server with just one Ui will not work either. - # Allowed _clusters and _queues might depend on the user's - # permissions, but we can work with a superset and rely on - # ConstructTargets to filter out the allowed ones. 
- self._clusters = None # SWIG - self._queues = None # SWIG - - self._userdir = None # holds user config, job cache, and proxy file - self._proxy = None # determines user permissions - - self._arclibLock = threading.Lock() - proxy_path = os.path.join(userdir, self.proxy_name) - - try: - - if not os.path.isdir(userdir): - raise ARCWrapperError('Given user directory ' + userdir - + ' does not exist.') - self._userdir = userdir - - # if a proxy is not explicitly required and the user does not have a valid one - # then use the shared default proxy cert - if not require_user_proxy and \ - (not os.path.exists(proxy_path) or Proxy(proxy_path).IsExpired()): - - logger.info("Using default proxy certificate.") - - # Check if there is already a default proxy certificate and get - # its location - proxy_path = config.nordugrid_proxy - - # it there is no default proxy or it is expired - if not os.path.exists(proxy_path) or Proxy(proxy_path).IsExpired(): - cert_path = config.nordugrid_cert - key_path = config.nordugrid_key - # generate a new one - create_grid_proxy(cert_path, key_path, proxy_path) - else: - logger.info("Using personal proxy certificate.") - - # proxy constructor might raise an exception as well - self._proxy = Proxy(proxy_path) - if self._proxy.IsExpired(): # should not happen - raise NoProxyError('Expired.') - - except NoProxyError as err: - logger.error('Proxy error: %s' % err.what()) - raise err - except arclib.ARCLibError as err: - logger.error('Cannot initialise: %s' % err.what()) - raise ARCWrapperError(err.what()) - except Exception as other: - logger.error( - 'Unexpected error during initialisation.\n %s' % other) - raise ARCWrapperError(other.__str__()) - - def __initQueues(self): - """ Initialises possible queues for a job submission.""" - - logger.debug('init queues (for job submission/resource display)') - - try: - # init data: cluster information (obtained per user) - self.__lockArclib() - - # this takes ages: - # self._clusters = arclib.GetResources(Ui.giis) - self._clusters = [] - logger.debug(config.arc_clusters) - for url_str in config.arc_clusters: - if url_str.startswith('ldap://'): - self._clusters.append(arclib.URL(url_str)) - elif url_str in ['benedict', 'fyrgrid']: - self._clusters.append(eval('Ui.' + url_str)) - logger.debug('clusters: ') - for c in self._clusters: - logger.debug('\t %s' % c) - - self._queues = [] - for cl in self._clusters: - qs = arclib.GetQueueInfo(cl) - self._queues = self._queues + list(qs) - self.__unlockArclib() - logger.debug('ARC Init, discovered queues are') - for q in self._queues: - logger.debug('\t %s' % q) - - except NoProxyError as err: - self.__unlockArclib() - logger.error('Proxy error during queue initialisation: %s' % err) - raise err - except Exception as err: - self.__unlockArclib() - logger.error('ARC queue initialisation error: %s' % err) - self._clusters = [] - self._queues = [] - raise ARCWrapperError(err.__str__()) - - def __lockArclib(self): - """ ensures exclusive access to the interface and sets the environment - so that the user's proxy and home are used. - Locking is perhaps not needed in our setup, where anyway users - cannot share the same Ui (needed if _arclib.so not thread-safe, - though).""" - - self._arclibLock.acquire() - self.__setupEnviron() - return - - def __unlockArclib(self): - """ Releases the mutex lock of the interface. - Perhaps not needed.""" - - self._arclibLock.release() - return - - def __setupEnviron(self): - """Make sure the API acts on behalf of the calling user. - Called by __lockArclib. 
- """ - os.environ['X509_USER_PROXY'] = self._proxy.getFilename() - os.environ['HOME'] = self._userdir - return - - def getProxy(self): - """ returns the proxy interface used""" - return self._proxy - - def getQueues(self): - """ returns the queues we discovered for the clusters. - TODO: should only return _allowed_ queues - (__initQueues to change).""" - self.__initQueues() - return self._queues - - def submitFile(self, xrslFilename, jobName=''): - """Submit xrsl file as job to available ARC resources. - - @type xrslFilename: string - @param xrslFilename: Filename containing a job description in XRSL. - @rtype list: - @return: list containing ARC jobIds (strings). - Throws an ARCWrapperError if unsuccessful.""" - - logger.debug('Submitting a job from file %s...' % xrslFilename) - currDir = os.getcwd() - try: - - # Convert XRSL file into a string - - f = open(xrslFilename, 'rb') - xrslString = f.read() - f.close() - xrslAll = arclib.Xrsl(xrslString) - - [jobDir, filename] = os.path.split(xrslFilename) - os.chdir(jobDir) - - result = (self.submit(xrslAll, jobName)) - os.chdir(currDir) - return result - except arclib.XrslError as err: - logger.error('Ui: XrslError: ' + err.what()) - os.chdir(currDir) - raise ARCWrapperError('XrslError: ' + err.what()) - - def submit(self, xrslAll, jobName=''): - """Submit xrsl object as job to available ARC resources. - The method expects an arclib.Xrsl object and its current - working directory to contain the referenced files (rel. paths). - - @type xrslAll: arclib.Xrsl - @param xrslAll: job description in XRSL (arclib object). - @rtype list: - @return: list of jobIds(strings). - - Any error is raised as an exception to the caller, as - ARCWrapperError or NoProxyError.""" - - try: - # Check for multiple xrsl - xrslSplit = xrslAll.SplitMulti() - - # retrieve clusters and their queues - # might throw a NoProxyError, leading us to the end - self.__initQueues() - - # Construct submission targets - - logger.debug('Ui: Constructing targets:') - allTargets = arclib.ConstructTargets(self._queues, xrslAll) - targets = arclib.PerformStandardBrokering(allTargets) - for t in targets: - logger.debug('\t %s' % t) - - # Submit job - - jobIds = [] - - logger.debug('Ui: Submitting job .') - if len(targets) > 0: - self.__lockArclib() - for xrsl in xrslSplit: - jobId = arclib.SubmitJob(xrsl, targets) - jobIds.append(jobId) - logger.debug('Ui:' + jobId + 'submitted.') - - jobName = xrsl.GetRelation('jobName' - ).GetSingleValue() - - arclib.AddJobID(jobId, jobName) - self.__unlockArclib() - return jobIds - else: - # len(targets) == 0, thus: - raise ARCWrapperError( - "No matching resource for submission.") - - except NoProxyError as err: - logger.error('Proxy error during job submission: ' + err.what()) - if self._arclibLock.locked(): - # should not happen! - # we come here from initQueues - logger.error('submit: still locked???') - self.__unlockArclib() - raise err - except arclib.XrslError as message: - logger.error('Ui,XRSL' + message.what()) - if self._arclibLock.locked(): # should not happen! - self.__unlockArclib() - raise ARCWrapperError('XrslError: ' + message.what()) - except arclib.JobSubmissionError as message: - logger.error('Ui,Submit: ' + message.what()) - self.__unlockArclib() - raise ARCWrapperError('JobSubmissionError: ' + message.what()) - except arclib.TargetError as message: - logger.error('Ui,Target: %s' % message) - if self._arclibLock.locked(): # should not be... 
- self.__unlockArclib() - raise ARCWrapperError('TargetError: %s' % message) - except Exception as err: - if self._arclibLock.locked(): # ... - self.__unlockArclib() - logger.error('Unexpected error: %s' % err) - raise ARCWrapperError(err.__str__()) - - def AllJobStatus(self): - """Query status of jobs in joblist. - - The command returns a dictionary of jobIDs. Each item - in the dictionary consists of an additional dictionary with the - attributes: - - name = Job name - status = ARC job states, ACCPTED, SUBMIT, INLRMS etc - error = Error status - sub_time = "%s" % submission_time - completion = "%s" % completion_time - cpu_time = "%s" % used_cpu_time - wall_time = "%s" % used_wall_time - - If there was an error, an empty dictionary is returned. - - Example: - - jobList = ui.jobStatus() - - print jobList['gsiftp://...3217']['name'] - print jobList['gsiftp://...3217']['status'] - - @rtype: dict - @return: job status dictionary.""" - - logger.debug('Requesting job status for all jobs.') - - jobList = {} - -# GetJobIDs returns a multimap, mapping job names to JobIDs... - self.__lockArclib() - try: - # ATTENTION: GetJobIDs does not throw an exception - # if the .ngjobs file is not found. Instead, it - # only complains on stderr and returns {}. - if not os.path.isfile( - os.path.join(self._userdir, '.ngjobs')): - logger.debug('No Job file found, skipping') - return jobList - else: - jobIds = arclib.GetJobIDs() - except Exception as err: - logger.error('could not get job IDs: %s', err) - self.__unlockArclib() - return jobList - - self.__unlockArclib() - - # use an iterator over the multimap elements - # do not call iter.next() at the end (segfaults!) - iter = jobIds.begin() - i = 0 - while i < jobIds.size(): - i = i + 1 - (jobName, jobId) = next(iter) -# this is what GetJobIDs really does when called with no arguments -# jobListFile = open(os.path.join(self._userdir, -# '.ngjobs'), 'rb') -# lines = jobListFile.readlines() -# jobListFile.close() -# for line in lines: -# (jobId, jobName) = line.strip().split('#') - logger.debug('Querying job %s (%s)' % (jobId, jobName)) - jobList[jobId] = {} - jobList[jobId]['name'] = jobName - status = None - exitCode = None - sub_time = None - - self.__lockArclib() - try: - # jobInfo = arclib.GetJobInfoDirect(jobId) - jobInfo = arclib.GetJobInfo(jobId) - status = jobInfo.status - exitCode = jobInfo.exitcode - sub_time = jobInfo.submission_time.__str__() - completed = jobInfo.completion_time.__str__() - # cpu_time = jobInfo.used_cpu_time.__str__() - # wall_time= jobInfo.used_wall_time.__str__() - - except arclib.FTPControlError: - logger.error('Failed to query job %s' % jobName) - status = 'REMOVED' - exitCode = -1 - completed = None - cpu_time = None - wall_time = None - self.__unlockArclib() - - jobList[jobId]['status'] = status - jobList[jobId]['error'] = exitCode - jobList[jobId]['submitted'] = sub_time - jobList[jobId]['completed'] = completed - # jobList[jobId]['cpu_time' ] = sub_time - # jobList[jobId]['wall_time'] = sub_time - logger.debug(' %s: %s' % (jobId, jobList[jobId])) - - return jobList - - def jobStatus(self, jobId): - """Retrieve status of a particular job. - - returns: dictionary containing keys name, status, error... - (see allJobStatus).""" - - logger.debug('Requesting job status for %s.' % jobId) - - jobInfo = {'name': 'UNKNOWN', 'status': 'NOT FOUND', 'error': -1} - - # check if we know this job at all: - self.__lockArclib() - job_ = arclib.GetJobIDs([jobId]) - self.__unlockArclib() - - # ugly! GetJobIDs return some crap if not found... 
- jobName = [j for j in job_][0] - if jobName == '': # job not found - logger.debug('Job %s was not found.' % jobId) - else: - jobInfo['name'] = jobName - # ASSERT(jobId = jobs[jobName]) - - self.__lockArclib() - try: - logger.debug('Querying job %s (%s)' % (jobId, jobName)) - info = arclib.GetJobInfo(jobId) - jobInfo['status'] = info.status - jobInfo['error'] = info.exitcode - jobInfo['submitted'] = info.submission_time.__str__() - jobInfo['completed'] = info.completion_time.__str__() - # jobInfo['cpu_time' ] = info.used_cpu_time.__str__() - # jobInfo['wall_time'] = info.used_wall_time.__str__() - - except arclib.ARCLibError as err: - logger.error('Could not query: %s' % err.what()) - jobInfo['status'] = 'UNABLE TO RETRIEVE: ' + err.what(), - jobInfo['error'] = 255 - jobInfo['submitted'] = 'unknown' - self.__unlockArclib() - logger.debug(' Returned %s' % jobInfo) - return jobInfo - - def cancel(self, jobID): - """Kill a (running?) job. - - If this fails, complain, and retrieve the job status. - @type jobID: string - @param jobID: jobId URL identifier.""" - - logger.debug('Trying to stop job %s' % jobID) - success = False - - self.__lockArclib() - try: - arclib.CancelJob(jobID) - success = True - except arclib.FTPControlError as err: - logger.error('Error canceling job %s: %s' % (jobID, err.what())) - if logger.getLogLevel == 'DEBUG': - try: - info = arclib.GetJobInfoDirect(jobID) - logger.debug('Job status: %s' % info.status) - except arclib.ARCLibError as err: - logger.debug('No job status known') - self.__unlockArclib() - return success - - def clean(self, jobId): - """Removes a (finished?) job from a remote cluster. - - If this fails, just remove it from our list (forget it). - @type jobID: string - @param jobID: jobId URL identifier.""" - - logger.debug('Cleaning up job %s' % jobId) - self.__lockArclib() - try: - arclib.CleanJob(jobId) - except arclib.FTPControlError as err: - logger.error( - 'Failed to clean job %s: %s' % (jobId, err.what())) - arclib.RemoveJobID(jobId) - self.__unlockArclib() - - def getResults(self, jobId, downloadDir=''): - """Download results from grid job. - - @type jobId: string - @param jobID: jobId URL identifier. - @type downloadDir: string - @param downloadDir: Download results to specified directory. - @rtype: list - @return: list of downloaded files (strings)""" - - logger.debug('Downloading files from job %s' % jobId) - complete = [] - currDir = os.getcwd() - - # jobID is a valid URL for the job directory. - # we chop off the final number (should be unique enough) - # and use it as a directory name to download (emulates behaviour - # of ngget: downloaddir _prefixes_ the dir to which we download). - - try: - (jobPath, jobBasename) = splitJobId(jobId) - jobInfoDir = jobPath + '/info/' + jobBasename - jobDir = jobPath + '/' + jobBasename - - os.chdir(self._userdir) - if not downloadDir == '': - if not os.path.exists(downloadDir): - os.mkdir(downloadDir) - elif not os.path.isdir(downloadDir): - raise ARCWrapperError(downloadDir - + ' exists, not a directory.') - os.chdir(downloadDir) - if not os.path.exists(jobBasename): - os.mkdir(jobBasename) - else: - if not os.path.isdir(jobBasename): - raise ARCWrapperError('Cannot create job directory,' - + ' existing file %s in the way.' 
- % jobBasename) - os.chdir(jobBasename) - except Exception as err: - logger.error('Error creating job directory: %s' % err) - os.chdir(currDir) - raise ARCWrapperError(err.__str__()) - - logger.debug('downloading output summary file') - self.__lockArclib() - try: - ftp = arclib.FTPControl() - - # We could just download the whole directory. - # But better use the contents of "output" in - # the info-directory... (specified by user) - # to avoid downloading large input files. - # ftp.DownloadDirectory(jobURL, jobBasename) - # - # We use a temp file to get this information first - - (tmp, tmpname) = tempfile.mkstemp(prefix='output', text=True) - os.close(tmp) - ftp.Download(arclib.URL(jobInfoDir + '/output'), tmpname) - lines = open(tmpname, 'rb').readlines() - os.remove(tmpname) - files = [cur.strip().strip('/') for cur in lines] - - # also get the entire directory listing from the server - dir = ftp.ListDir(arclib.URL(jobDir), True) - basenames = [os.path.basename(x.filename) for x in dir] - - if '' in files: - logger.debug('downloading _all_ files') - # TODO for files which are already there? - ftp.DownloadDirectory(arclib.URL(jobDir), '.') - complete = basenames - else: - for f in files: - if f in basenames: - # we should download this one - try: - if f.isdir: - logger.debug('DownloadDir %s' % f) - ftp.DownloadDirectory( - arclib.URL(jobDir + '/' + f), f) - # ... which operates recursively - complete.append(f + '/ (dir)') - else: - logger.debug('Download %s' % f) - ftp.Download(arclib.URL(jobDir + '/' + f), f) - complete.append(f) - except arclib.ARCLibError as err: - logger.error('Error downloading %s: %s' - % (f, err.what())) - except arclib.ARCLibError as err: - logger.error('ARCLib error while downloading: %s' % err.what()) - self.__unlockArclib() - os.chdir(currDir) - raise ARCWrapperError(err.what()) - except Exception as err: - logger.error('Error while downloading.\n %s' % err) - self.__unlockArclib() - os.chdir(currDir) - raise ARCWrapperError(err.__str__()) - - # return - logger.debug(' '.join(['downloaded:'] + complete)) - os.chdir(currDir) - return complete - - def lsJobDir(self, jobId): - """List files at a specific URL. - - @type jobId: string - @param jobId: jobId, which is URL location of job dir. - @rtype: list - @return: list of FileInfo - """ - - # the jobID is a valid URL to the job directory. We can use it to - # inspect its contents. - # - # For other directories (gmlog or other), using FTPControl, we do - # not get accurate file sizes, only for the real output - # and for scripts/files in the proper job directory. - - logger.debug('ls in JobDir for job %s' % jobId) - ftp = arclib.FTPControl() - url = arclib.URL(jobId) - - self.__lockArclib() - try: - files = ftp.ListDir(url) - except arclib.ARCLibError as err: - logger.debug('Error during file listing: %s' % err.what()) - errmsg = arclib.FileInfo() - errmsg.filename = err.what - errmsg.size = 0 - errmsg.isDir = False - files = [errmsg] - - self.__unlockArclib() - - # filter out the gmlog if present - def notGmlog(file): - return ((not file.isDir) or (file.filename != 'gmlog')) - - return (list(filter(notGmlog, files))) - - -# stdout of a job can be found directly in its job directory, but might have -# a different name (user can give the name). For a "live output request", -# we download the xrsl description from the info directory and look for -# the respective names. -# For jobs with "joined" stdout and stderr, we get an error when retrieving -# the latter, and fall back to retrieving stdout instead. 
- - def recoverXrsl(self, jobId): - """ retrieves the xrsl for a job (from the server), if possible""" - - logger.debug('Trying to obtain xRSL for job %s' % jobId) - xrsl = arclib.Xrsl('') - self.__lockArclib() - try: - (jobPath, jobBasename) = splitJobId(jobId) - xrslURL = arclib.URL(jobPath + '/info/' - + jobBasename + '/description') - ftp = arclib.FTPControl() - ftp.Download(xrslURL, 'tmp') - content = open('tmp', 'rb').read() - xrsl = arclib.Xrsl(content) - os.remove('tmp') - except arclib.ARCLibError as err: - logger.error('Failed to get Xrsl: %s' % err.what()) - self.__unlockArclib() - logger.debug('Obtained %s' % xrsl) - return xrsl - - def getStandardOutput(self, jobId): - """Get the standard output of a running job. - - @type jobID: string - @param jobID: jobId URL identifier. - @rtype: string - @return: output from the job""" - - logger.debug('get std. output for %s' % jobId) - try: - xrsl = self.recoverXrsl(jobId) - try: - outname = xrsl.GetRelation('stdout').GetSingleValue() - except arclib.XrslError as err: - outname = 'stdout' # try default if name not found - logger.debug('output file name: %s' % outname) - try: - self.__lockArclib() - ftp = arclib.FTPControl() - ftp.Download(arclib.URL(jobId + '/' + outname)) - except Exception as err: - self.__unlockArclib() - raise ARCWrapperError(err.__str__()) - self.__unlockArclib() - logger.debug('output downloaded') - result = open(outname, 'rb').read() - os.remove(outname) - except arclib.ARCLibError as err: - result = 'failed to retrieve job output stdout: %s' % err.what() - logger.error('%s' % result) - logger.debug('output retrieved') - return result - - # (resultVal, result) = utils.getstatusoutput('ngcat -o %s' - # % jobId, self._env) - # - # return result - - def getStandardError(self, jobId): - """Get the standard error of a running job. - - @type jobID: string - @param jobID: jobId URL identifier. - @rtype: list - @return: list of return value from ARC and output from job.""" - - logger.debug('get stderr output for %s' % jobId) - try: - xrsl = self.recoverXrsl(jobId) - try: - outname = xrsl.GetRelation('stderr').GetSingleValue() - except arclib.XrslError as err: - outname = 'stderr' # try default if name not found - logger.debug('output file name: %s' % outname) - try: - self.__lockArclib() - ftp = arclib.FTPControl() - ftp.Download(arclib.URL(jobId + '/' + outname)) - except Exception as err: - self.__unlockArclib() - raise ARCWrapperError(err.__str__()) - self.__unlockArclib() - logger.debug('output downloaded') - result = open(outname, 'rb').read() - os.remove(outname) - except arclib.ARCLibError as err: - result = 'failed to retrieve job output stderr: %s' % err.what() - logger.error('%s' % result) - logger.debug('output retrieved') - return result - -# (resultVal, result) = utils.getstatusoutput('ngcat -e %s' -# % jobId, self._env) -# -# return result - -# old code: - - def getGridLog(self, jobId): - """Get the grid log of a running job. - - @type jobID: string - @param jobID: jobId URL identifier. - @rtype: list - @return: list of return value from ARC and output from job.""" - - (resultVal, result) = getstatusoutput('ngcat -l %s' - % jobId, self._env) - - return result - - def copy(self, source, dest=''): - """Copy file from source URL to dest URL. - - @type source: string - @param source: URL of file to copy from. 
- @type dest: string - @param dest: destination file name on server.""" - - (resultVal, result) = getstatusoutput('ngcp %s %s' - % (source, dest), self._env) - - return resultVal - - def pcopy(self, source): - """Open the ngcp command as a popen process, redirecting output - to stdout and return process file handle. - - @type source: string - @param source: URL to open""" - - # NOTE: I replaced a possibly unsafe call - # f = popen('ngcp %s /dev/stdout' % source, self._env) - # and haven't tested afterwards - # -Jonas - - command_list = ['ngcp', source, '/dev/stdout'] - # NOTE: we use command list to avoid the need for shell - return subprocess_popen(command_list, stdout=subprocess_pipe, - env=self._env).stdout - - def sync(self): - """Query grid for jobs and update job list. - - @rtype: list - @return: list of [resultVal, result], where resultVal is the return value - from the ARC command and result is a list of command output.""" - - (resultVal, result) = \ - getstatusoutput('ngsync -f -d %d' - % self._debugLevel, self._env) diff --git a/mig/shared/configuration.py b/mig/shared/configuration.py index 62b04c237..2f872694a 100644 --- a/mig/shared/configuration.py +++ b/mig/shared/configuration.py @@ -741,12 +741,6 @@ def get(self, *args, **kwargs): 'auto_add_user_with_peer': [('distinguished_name', '.*')], 'auto_add_filter_method': '', 'auto_add_filter_fields': [], - - # ARC resource configuration (list) - # wired-in shorthands in arcwrapper: - # fyrgrid, benedict. Otherwise, ldap://bla.bla:2135/... - - 'arc_clusters': [], } @@ -2806,12 +2800,6 @@ def reload_config(self, verbose, skip_log=False, disable_auth_log=False, else: self.generic_valid_days = generic_valid_days - # if arc cluster URLs configured, read them in: - - if config.has_option('ARC', 'arc_clusters'): - self.arc_clusters = config.get('ARC', - 'arc_clusters').split() - # Warn about missing IP check if in GDP mode if self.site_enable_gdp \ and not self.site_twofactor_strict_address: diff --git a/mig/shared/functionality/arcresources.py b/mig/shared/functionality/arcresources.py deleted file mode 100644 index 8b9df6873..000000000 --- a/mig/shared/functionality/arcresources.py +++ /dev/null @@ -1,254 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -# --- BEGIN_HEADER --- -# -# arcresources - list arc resources and queues -# Copyright (C) 2003-2023 The MiG Project lead by Brian Vinter -# -# This file is part of MiG. -# -# MiG is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# MiG is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
-# -# -- END_HEADER --- -# - -"""Display the ARC queues accessible for submission by this server""" - -from __future__ import absolute_import - -import os -import time - -from mig.shared import returnvalues -from mig.shared.base import client_id_dir -from mig.shared.functional import validate_input_and_cert -from mig.shared.init import initialize_main_variables, find_entry -try: - from mig.shared import arcwrapper -except Exception as exc: - # Ignore errors and let it crash if ARC is enabled without the lib - pass - - -def signature(): - """Signature of the main function""" - - defaults = {'benchmark': 'false'} - return ['html_form', defaults] - -# shared functions to name things: - - -def q_anchor(q): - return ('__'.join([q.name] + q.cluster.hostname.split("."))) - - -def q_displayname(q): - return ('%s on %s' % (q.name, q.cluster.alias)) - - -# HARDCODED STRING to name the zero-install ARC runtime environment -# We already use a hardcoded string in jobscriptgenerator. Merge/configure? -zero_install_arc = 'ENV/ZERO-INSTALL' - - -def display_arc_queue(queue): - """Format and print detailed information about an ARC queue. - """ - - html = '
-        {'n': q_anchor(queue)}
-    html += \
-        '<tr><td colspan="3">%s</td></tr>\n' % q_displayname(queue)
-    html += '<tr><td colspan="3">Status: %s</td></tr>\n' % queue.status
-
-    # figure out the real CPU characteristics...
-
-    # The information "cores per node" is provided per-cluster in ARC.
-    # through the field cpu_distribution (which is a mapping of
-    # number_of_cores -> number_of_nodes. There we only use the first
-    # of possibly several values
-
-    d = dict(queue.cluster.cpu_distribution)
-    if d:
-        cores = list(d)[0]
-    else:
-        cores = 1
-
-    def row(col1, col2=None, col3=None):
-        if col2 and col3:
-            return ('<tr><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (col1, col2, col3))
-        elif col2:
-            return ('<tr><td>%s</td><td colspan="2">%s</td></tr>\n' % (col1, col2))
-        else:
-            return ('<tr><td colspan="3">%s</td></tr>\n' % (col1))
-
-    html += \
-        row('Architecture: %s' % queue.cluster.architecture,
-            'Max. runnable jobs: %s' % queue.max_running,
-            'Running Jobs: %s' % queue.grid_running)
-
-    if (queue.total_cpus == -1):
-        cpus = queue.cluster.total_cpus
-    else:
-        cpus = queue.total_cpus
-    html += \
-        row('Total Cores: %s (%s cores/node)' % (cpus, cores),
-            'Max. time per job: %s sec.' % queue.max_wall_time,
-            'Queued Jobs: %s' % queue.grid_queued)
-    html += \
-        row('%s' % queue.node_cpu,
-            ' ', '(%s)' % queue.mds_validfrom)
-
-    if zero_install_arc in ["%s" % rte for rte in queue.cluster.runtime_environments]:
-        html += row('Node Memory: %s' % queue.node_memory,
-                    'Provides Zero-Install runtime environment')
-    else:
-        html += row('Node Memory: %s' % queue.node_memory)
-
-    html += '</table>' + \
-        '</div>'
-    return html
-
-
-def queue_resource(queue):
-    """Return a 'resource' dictionary for an ARC queue.
-
-    Information mapping is straightforward, and most of it is
-    independent of other parts. Exception: the name has to follow the
-    format <queue name>:<cluster hostname> to match submit page
-    and mrsltoxrsl translation"""
-
-    resource = {'object_type': 'resource',
-                'name': queue.name + ':' + queue.cluster.hostname,
-                'PUBLICNAME': 'ARC: ' +
-                queue.name + ' on ' + queue.cluster.alias,
-
-                'MEMORY': queue.node_memory,
-
-                # information not available for queues, and
-                # queue.cluster.session_dir_total is astronomic!
-                # '%.3f' % (float(queue.cluster.session_dir_total)/2**30),
-                'DISK': '',
-
-                # this would actually need a precise mapping between
-                # ARC and MiG, as done for the translation
-                'ARCHITECTURE': queue.cluster.architecture,
-                # indicating whether the queue active/inactive
-
-                'STATUS': queue.status
-                }
-    # instead of a view link, we indicate "ARC"
-    resource['viewreslink'] = {'object_type': 'link',
-                               'destination': '#%s' % q_anchor(queue),
-                               'class': 'infolink arclink iconspace '
-                               + queue.cluster.alias,  # HACK for sorting
-                               'title': 'Show queue details',
-                               'text': '(details)'}
-
-    # 'NODECOUNT' : queue.total_cpus is sometimes -1.
-    # ... we use another provided value, queue.cluster.total_cpus,
-    # even though this one is not always correct either (not all CPUs
-    # might be assigned to the queue)
-
-    if (queue.total_cpus == -1):
-        resource['NODECOUNT'] = queue.cluster.total_cpus
-    else:
-        resource['NODECOUNT'] = queue.total_cpus
-
-    # ARC does not provide this readily, only through cpu_distribution
-    # (which is a mapping of number_of_cores -> number_of_nodes. There
-    # is no way to reserve cores on the same node, we set it to 1
-
-    resource['CPUCOUNT'] = 1
-
-    resource['RUNTIMEENVIRONMENT'] = []
-    z_i = 'ENV/ZERO-INSTALL'  # hard-wired name, same as in jobscriptgenerator
-    if z_i in ["%s" % rte for rte in queue.cluster.runtime_environments]:
-        resource['RUNTIMEENVIRONMENT'] = ['ZERO-INSTALL (ARC)']
-
-    return resource
-
-
-def main(client_id, user_arguments_dict):
-    """Main function used by front end"""
-
-    (configuration, logger, output_objects, op_name) = \
-        initialize_main_variables(client_id, op_header=False)
-    defaults = signature()[1]
-    (validate_status, accepted) = validate_input_and_cert(
-        user_arguments_dict,
-        defaults,
-        output_objects,
-        client_id,
-        configuration,
-        allow_rejects=False,
-    )
-    if not validate_status:
-        return (accepted, returnvalues.CLIENT_ERROR)
-
-    user_dir = os.path.join(configuration.user_home,
-                            client_id_dir(client_id))
-
-    title_entry = find_entry(output_objects, 'title')
-    title_entry['text'] = 'ARC Queues'
-    output_objects.append(
-        {'object_type': 'header', 'text': 'Available ARC queues'})
-
-    if not configuration.site_enable_griddk:
-        output_objects.append({'object_type': 'text', 'text':
-                               """Grid.dk features are disabled on this site.
-Please contact the %s site support (%s) if you think it should be enabled.
-""" % (configuration.short_title, configuration.support_email)}) - return (output_objects, returnvalues.OK) - - # could factor out from here, to be usable from outside - if not configuration.arc_clusters: - output_objects.append({'object_type': 'error_text', 'text': - 'No ARC support!'}) - return (output_objects, returnvalues.ERROR) - try: - session = arcwrapper.Ui(user_dir) - queues = session.getQueues() - - except arcwrapper.NoProxyError as err: - output_objects.append({'object_type': 'error_text', 'text': - 'Error while retrieving: %s' % err.what() - }) - output_objects += arcwrapper.askProxy() - return (output_objects, returnvalues.ERROR) - except Exception as err: - logger.error('Exception while retrieving ARC resources\n%s' % err) - output_objects.append({'object_type': 'warning', 'text': - 'Could not retrieve information: %s' % err}) - return(output_objects, returnvalues.ERROR) - - res_list = {'object_type': 'resource_list', 'resources': []} - for q in queues: - res_list['resources'].append(queue_resource(q)) - - output_objects.append(res_list) - - output_objects.append( - {'object_type': 'sectionheader', 'text': 'Queue details'}) - - # queue details (current usage and some machine information) - for q in queues: - output_objects.append( - {'object_type': 'html_form', 'text': display_arc_queue(q)}) - # end of "factoring out" - - return (output_objects, returnvalues.OK) diff --git a/mig/shared/functionality/autocreate.py b/mig/shared/functionality/autocreate.py index 3e9e4504a..9d704e81c 100644 --- a/mig/shared/functionality/autocreate.py +++ b/mig/shared/functionality/autocreate.py @@ -69,14 +69,6 @@ from mig.shared.userdb import default_db_path from mig.shared.validstring import is_valid_email_address -try: - from mig.shared import arcwrapper -except Exception as exc: - - # Ignore errors and let it crash if ARC is enabled without the lib - - pass - def signature(configuration, auth_type): """Signature of the main function""" @@ -171,60 +163,6 @@ def signature(configuration, auth_type): return ['text', defaults] -def handle_proxy(proxy_string, client_id, config): - """If ARC-enabled server: store a proxy certificate. - Arguments: proxy_string - text extracted from given upload - client_id - DN for user just being created - config - global configuration - """ - - output = [] - client_dir = client_id_dir(client_id) - proxy_dir = os.path.join(config.user_home, client_dir) - proxy_path = os.path.join(config.user_home, client_dir, - arcwrapper.Ui.proxy_name) - - if not config.arc_clusters: - output.append({'object_type': 'error_text', - 'text': 'No ARC support!'}) - return output - - # store the file - - try: - write_file(proxy_string, proxy_path, config.logger) - os.chmod(proxy_path, 0o600) - except Exception as exc: - output.append({'object_type': 'error_text', - 'text': 'Proxy file could not be written (%s)!' 
- % ("%s" % exc).replace(proxy_dir, '')}) - return output - - # provide information about the uploaded proxy - - try: - session_ui = arcwrapper.Ui(proxy_dir) - proxy = session_ui.getProxy() - if proxy.IsExpired(): - - # can rarely happen, constructor will throw exception - - output.append({'object_type': 'warning', - 'text': 'Proxy certificate is expired.'}) - else: - output.append({'object_type': 'text', 'text': 'Proxy for %s' - % proxy.GetIdentitySN()}) - output.append({'object_type': 'text', - 'text': 'Proxy certificate will expire on %s (in %s sec.)' - % (proxy.Expires(), proxy.getTimeleft())}) - except arcwrapper.NoProxyError as err: - - output.append({'object_type': 'warning', - 'text': 'No proxy certificate to load: %s' - % err.what()}) - return output - - def split_comma_concat(value_list, sep=','): """Take a list of values and adjust it so that any values with given sep inside is expanded to the individual values without the separator. @@ -768,14 +706,6 @@ def main(client_id, user_arguments_dict, environ=None): create_user(user_dict, configuration.config_file, db_path, ask_renew=False, default_renew=True, verify_peer=peer_pattern) - if configuration.site_enable_griddk \ - and accepted['proxy_upload'] != ['']: - - # save the file, display expiration date - - proxy_out = handle_proxy(proxy_content, user_id, - configuration) - output_objects.extend(proxy_out) except Exception as err: logger.error('create failed for %s: %s' % (user_id, err)) output_objects.append({'object_type': 'error_text', 'text': diff --git a/mig/shared/functionality/jobstatus.py b/mig/shared/functionality/jobstatus.py index b0e1d6efb..b853e10f0 100755 --- a/mig/shared/functionality/jobstatus.py +++ b/mig/shared/functionality/jobstatus.py @@ -47,12 +47,6 @@ from mig.shared.resource import anon_resource_id from mig.shared.validstring import valid_user_path -try: - from mig.shared import arcwrapper -except Exception as exc: - # Ignore errors and let it crash if ARC is enabled without the lib - pass - def signature(): """Signature of the main function""" @@ -251,31 +245,6 @@ def main(client_id, user_arguments_dict): job_obj[name.lower()] = job_dict[name] - ########################################### - # ARC job status retrieval on demand: - # But we should _not_ update the status in the mRSL files, since - # other MiG code might rely on finding only valid "MiG" states. 
- - if configuration.arc_clusters and \ - job_dict.get('UNIQUE_RESOURCE_NAME', 'unset') == 'ARC' \ - and job_dict['STATUS'] == 'EXECUTING': - try: - home = os.path.join(configuration.user_home, client_dir) - arcsession = arcwrapper.Ui(home) - arcstatus = arcsession.jobStatus(job_dict['EXE']) - job_obj['status'] = arcstatus['status'] - except arcwrapper.ARCWrapperError as err: - logger.error('Error retrieving ARC job status: %s' % - err.what()) - job_obj['status'] += '(Error: ' + err.what() + ')' - except arcwrapper.NoProxyError as err: - logger.error('While retrieving ARC job status: %s' % - err.what()) - job_obj['status'] += '(Error: ' + err.what() + ')' - except Exception as err: - logger.error('Error retrieving ARC job status: %s' % err) - job_obj['status'] += '(Error during retrieval)' - exec_histories = [] if verbose(flags): if 'EXECUTE' in job_dict: diff --git a/mig/shared/functionality/settings.py b/mig/shared/functionality/settings.py index fde56cb8a..85fd08175 100755 --- a/mig/shared/functionality/settings.py +++ b/mig/shared/functionality/settings.py @@ -65,12 +65,6 @@ get_default_css from mig.shared.vgridaccess import get_vgrid_map_vgrids -try: - from mig.shared import arcwrapper -except Exception as exc: - # Ignore errors and let it crash if ARC is enabled without the lib - pass - general_edit = cm_options.copy() ssh_edit = cm_options.copy() @@ -200,8 +194,6 @@ def main(client_id, user_arguments_dict): valid_topics.append('profile') if configuration.site_enable_widgets and configuration.site_script_deps: valid_topics.append('widgets') - if configuration.arc_clusters: - valid_topics.append('arc') if 'setup' not in active_menu: if configuration.site_enable_sftp or configuration.site_enable_sftp_subsys: valid_topics.append('sftp') @@ -2222,34 +2214,6 @@ def main(client_id, user_arguments_dict): output_objects.append({'object_type': 'html_form', 'text': html % fill_helpers}) - # if ARC-enabled server: - if 'arc' in topic_list: - # provide information about the available proxy, offer upload - try: - home_dir = os.path.normpath(base_dir) - session_Ui = arcwrapper.Ui(home_dir, require_user_proxy=True) - proxy = session_Ui.getProxy() - if proxy.IsExpired(): - # can rarely happen, constructor will throw exception - output_objects.append({'object_type': 'text', - 'text': - 'Proxy certificate is expired.'}) - else: - output_objects.append({'object_type': 'text', - 'text': 'Proxy for %s' - % proxy.GetIdentitySN()}) - output_objects.append( - {'object_type': 'text', - 'text': 'Proxy certificate will expire on %s (in %s sec.)' - % (proxy.Expires(), proxy.getTimeleft()) - }) - except arcwrapper.NoProxyError as err: - output_objects.append({'object_type': 'warning', - 'text': 'No proxy certificate to load: %s' - % err.what()}) - - output_objects = output_objects + arcwrapper.askProxy() - output_objects.append({'object_type': 'html_form', 'text': '
'}) return (output_objects, returnvalues.OK) diff --git a/mig/shared/gridscript.py b/mig/shared/gridscript.py index 13b0907b6..7a167e1ac 100644 --- a/mig/shared/gridscript.py +++ b/mig/shared/gridscript.py @@ -38,11 +38,6 @@ from mig.shared.fileio import send_message_to_grid_script, pickle, unpickle, \ delete_file, touch, walk, slow_walk from mig.shared.notification import notify_user_thread -try: - from mig.shared import arcwrapper -except Exception as exc: - # Ignore errors and let it crash if ARC is enabled without the lib - pass def clean_grid_stdin(stdin): @@ -534,121 +529,3 @@ def requeue_job( False, configuration, ) - - -def arc_job_status( - job_dict, - configuration, - logger -): - """Retrieve status information for a job submitted to ARC. - Status is returned as a string. In case of failure, returns - 'UNKNOWN' and logs the error.""" - - logger.debug('Checking ARC job status for %s' % job_dict['JOB_ID']) - - userdir = os.path.join(configuration.user_home, - client_id_dir(job_dict['USER_CERT'])) - try: - jobinfo = {'status': 'UNKNOWN(TO FINISH)'} - session = arcwrapper.Ui(userdir) - jobinfo = session.jobStatus(job_dict['EXE']) - except arcwrapper.ARCWrapperError as err: - logger.error('Error during ARC status retrieval: %s' - % err.what()) - except arcwrapper.NoProxyError as err: - logger.error('Error during ARC status retrieval: %s' - % err.what()) - except Exception as err: - logger.error('Error during ARC status retrieval: %s' - % err.__str__()) - return jobinfo['status'] - - -def clean_arc_job( - job_dict, - status, - msg, - configuration, - logger, - kill=True, - timestamp=None -): - """Cleaning remainder of an executed ARC job: - - delete from ARC (and possibly kill the job, parameter) - - delete two symbolic links (user dir and mrsl file) - - write status and timestamp into mrsl - """ - - logger.debug('Cleanup for ARC job %s, status %s' % - (job_dict['JOB_ID'], status)) - - if not status in ['FINISHED', 'CANCELED', 'FAILED']: - logger.error('inconsistent cleanup request: %s for job %s' % - (status, job_dict)) - return - - # done by the caller... 
- # executing_queue.dequeue_job_by_id(job_dict['JOB_ID']) - - if not timestamp: - timestamp = time.gmtime() - client_dir = client_id_dir(job_dict['USER_CERT']) - - # clean up in ARC - try: - userdir = os.path.join(configuration.user_home, client_dir) - arcsession = arcwrapper.Ui(userdir) - except Exception as err: - logger.error('Error cleaning up ARC job: %s' % err) - logger.debug('Job was: %s' % job_dict) - else: - # cancel catches, clean always succeeds - if kill: - killed = arcsession.cancel(job_dict['EXE']) - if not killed: - arcsession.clean(job_dict['EXE']) - else: - arcsession.clean(job_dict['EXE']) - - # Clean up associated server files of the job - - if 'SESSIONID' in job_dict: - sessionid = job_dict['SESSIONID'] - symlinks = [os.path.join(configuration.webserver_home, - sessionid), - os.path.join(configuration.sessid_to_mrsl_link_home, - sessionid + '.mRSL')] - for link in symlinks: - try: - os.remove(link) - except Exception as err: - logger.error('Could not remove link %s: %s' % (link, err)) - - job_dict['STATUS'] = status - job_dict[status + '_TIMESTAMP'] = timestamp - - if not status == 'FINISHED': - # Generate execution history - - if 'EXECUTION_HISTORY' not in job_dict: - job_dict['EXECUTION_HISTORY'] = [] - - history_dict = { - 'QUEUED_TIMESTAMP': job_dict['QUEUED_TIMESTAMP'], - 'EXECUTING_TIMESTAMP': job_dict['EXECUTING_TIMESTAMP'], - status + '_TIMESTAMP': timestamp, - status + '_MESSAGE': msg, - 'UNIQUE_RESOURCE_NAME': job_dict['UNIQUE_RESOURCE_NAME'], - } - - job_dict['EXECUTION_HISTORY'].append(history_dict) - - # save into mrsl - - mrsl_file = os.path.join(configuration.mrsl_files_dir, - client_dir, - job_dict['JOB_ID'] + '.mRSL') - pickle(job_dict, mrsl_file, logger) - - return diff --git a/mig/shared/mrslparser.py b/mig/shared/mrslparser.py index e7d26f08e..7ee144887 100644 --- a/mig/shared/mrslparser.py +++ b/mig/shared/mrslparser.py @@ -45,12 +45,6 @@ from mig.shared.safeinput import html_escape, valid_path from mig.shared.vgridaccess import user_vgrid_access -try: - from mig.shared import arcwrapper -except: - # Ignore errors and let it crash if ARC is enabled without the lib - pass - def replace_variables(text, replace_list): """Replace all occurrences of variables from replace_list keys in text @@ -187,10 +181,8 @@ def parse( # convert specified runtime environments to upper-case and verify they # actually exist - # do not check runtime envs if the job is for ARC (submission will - # fail later) - if global_dict.get('JOBTYPE', 'unset') != 'arc' \ - and 'RUNTIMEENVIRONMENT' in global_dict: + # check runtime envs + if 'RUNTIMEENVIRONMENT' in global_dict: re_entries_uppercase = [] for specified_re in global_dict['RUNTIMEENVIRONMENT']: specified_re = specified_re.upper() @@ -298,31 +290,6 @@ def parse( normalized_field.append(' '.join(normalized_parts)) global_dict[field] = normalized_field - # if this is an ARC job (indicated by a flag), check proxy existence - # and lifetime. grid_script will submit the job directly. 
- - if global_dict.get('JOBTYPE', 'unset') == 'arc': - if not configuration.arc_clusters: - return (False, 'No ARC support!') - - logger.debug('Received job for ARC.') - user_home = os.path.join(configuration.user_home, client_dir) - try: - session = arcwrapper.Ui(user_home) - timeleft = session.getProxy().getTimeleft() - req_time = int(global_dict.get('CPUTIME', '0')) - logger.debug('CPU time (%s), proxy lifetime (%s)' - % (req_time, timeleft)) - if timeleft < req_time: - return (False, 'Proxy time shorter than requested CPU time') - - except arcwrapper.ARCWrapperError as err: - return (False, err.what()) - except arcwrapper.NoProxyError as err: - return (False, 'No Proxy found: %s' % err.what()) - except Exception as err: - return (False, err.__str__()) - # save file if outfile == 'AUTOMATIC': filename = \ diff --git a/mig/shared/mrsltoxrsl.py b/mig/shared/mrsltoxrsl.py deleted file mode 100644 index cd36f9e33..000000000 --- a/mig/shared/mrsltoxrsl.py +++ /dev/null @@ -1,415 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -# --- BEGIN_HEADER --- -# -# -# mrsltoxrsl - [optionally add short module description on this line] -# Copyright (C) 2003-2020 The MiG Project lead by Brian Vinter -# -# This file is part of MiG. -# -# MiG is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# MiG is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. -# -# --- END_HEADER --- -# - -# -# mrsltoxrsl: translate MiG job to ARC job -# -# (C) 2009 Jost Berthold, grid.dk -# adapted to usage inside a MiG framework -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -# - -"""translate a 'job' from MiG format to ARC format""" - -from __future__ import print_function -from __future__ import absolute_import -from __future__ import division - -from builtins import range -from string import ascii_letters -import math -import random -import os -import sys - -# MiG utilities: -from mig.shared.conf import get_configuration_object -config = get_configuration_object() -logger = config.logger - -# to make this succeed: -# install nordugrid-arc-client and nordugrid-arc-python -# set LD_LIBRARY_PATH="$NORDUGRID_LOCATION/lib:$GLOBUS_LOCATION/lib -# PYTHONPATH="$NORDUGRID_LOCATION/lib/python2.4/site-packages" -try: - import arclib -except: - logger.error('problems importing arclib... 
trying workaround') - try: - logger.debug('Current sys.path is %s' % sys.path) - sys.path.append(os.environ['NORDUGRID_LOCATION'] - + '/lib/python2.4/site-packages') - import arclib - except: - raise Exception('arclib not found - no problem unless using ARC') - - -def format_xrsl(xrsl): - """An indenter for xrsl files. - Rules: - 0) remove original indentation (strip lines, remove \\n) - 1) indent every line by 2 spaces for every open bracket '(' - 2) insert \\n before the opening bracket (unless there is one) - 3) insert \\n after a closing bracket ')' (unless there is one) - """ - - # raw string, without old indentation and newlines - raw = ''.join([i.strip() for i in ("%s" % xrsl).split('\n')]) - - def indent(acc, n, s): - if not s: - return acc - if s[0] == '(': - start = '\n' + ' ' * n - return(indent(acc + start + '(', n + 2, s[1:])) - elif s[0] == ')': - return(indent(acc + ')', n - 2, s[1:])) - - return(indent(acc + s[0], n, s[1:])) - - return(indent('', 0, raw)) - - -# translate :: (checked Dictionary, session_ID) -# -> (Xrsl,Job Script,name for Job script) -def translate(mrsl_dict, session_id=None): - """Translate an (already checked) mRSL dictionary into xRSL, - suitable for submitting to an ARC resource. - - Returns arclib.Xrsl object. - Throws exception if errors in the xRSL generation occur.""" - - logger.debug('to translate:\n%s\n using session ID %s' - % (mrsl_dict, session_id)) - - try: - # every xrsl-specified job is a conjunction at the top level - - xrsl = arclib.Xrsl(arclib.operator_and) - - # First action: include inner OR of ANDs for targetting - # specific ARC queues. - - # queues can be given in the 'RESOURCE' field, in the format - # : (separated by ":", - # see arcresources.py and safeinput.py). Example: - # ['daughter:benedict.grid.aau.dk','other:fyrgrid.grid.aau.dk'] - # Each entry leads to a cluster/queue combination, and all - # entries will be disjoint for the resulting xrsl. - - # Why we do it first: the arclib Xrsl API does not allow to - # construct relations with inner AND and OR, only relations - # like =,<,<= to specify fields are supported (globus was more - # clever here... XrslRelation being the same as Xrsl). - - if 'RESOURCE' in mrsl_dict: - - # we build a string containing Xrsl, which will replace - # the "xrsl" above (if it contains anything) - - tmp_str = '' - - # this is a list. iterate through all entries (if any) - for targetstring in mrsl_dict['RESOURCE']: - cur = targetstring.rsplit(':', 1) - if len(cur) == 2: - tmp_str += '(&(cluster=%s)(queue=%s))' % (cur[1], cur[0]) - - logger.debug("added to targets: %s" % tmp_str) - else: - logger.debug("ignoring malformed target %s" % cur) - - # did we add something at all? (might be all malformed) - if tmp_str != '': - - xrsl = arclib.Xrsl('&(|%s)' % tmp_str) - - logger.debug('starting with this Xrsl:\n%s' % xrsl) - - if 'JOB_ID' in mrsl_dict: - j_name = mrsl_dict['JOB_ID'] - else: - # random string. should not happen anyway... - j_name = ''.join(random.choice(ascii_letters) - for i in range(12)) -# j_name = mrsl_dict.get('JOB_ID', -# ''.join(random.choice(ascii_letters) \ -# for i in xrange(12))) - - # use JOBID as ARC jobname to avoid presenting only ARC IDs - addRel(xrsl, 'jobname', - ''.join([mrsl_dict.get('JOBNAME', ''), '(', j_name, ')'])) - - # inputfiles + executables, outputfiles - if session_id: - # we have been given a destination to put output files. 
Insert - # MiG server URL (automatic output download, will use PUT) - destination = '/'.join([config.migserver_https_sid_url, - 'sid_redirect', session_id, '']) - else: - destination = '' - - # make double lists, 2nd part perhaps empty - # output files, always including stdout - tmpoutfiles = [file_mapping(i) - for i in mrsl_dict.get('OUTPUTFILES', [])] - outfiles = [] - for [f, target] in tmpoutfiles: - if target == '': - target = f # same file name if none given - if -1 == target.find('://'): # not remote target, should copy - # (ARC does not allow local file names as target) - target = ''.join([destination, target]) - # means: automatic upload to jobdir on MiG server. - outfiles.append([f, target]) - - # job output, maybe transfer automatically to MiG server - destination = destination + '/'.join(['job_output', j_name, '']) - stdout = '.'.join([j_name, 'stdout']) - stderr = '.'.join([j_name, 'stderr']) - - # do not merge stdout and stderr - addRel(xrsl, 'join', 'no') - - addRel(xrsl, 'stdout', stdout) - outfiles.append([stdout, ''.join([destination, stdout])]) - addRel(xrsl, 'stderr', stderr) - outfiles.append([stderr, ''.join([destination, stderr])]) - - addRel(xrsl, 'outputfiles', outfiles) - - # what we want to do: EXECUTE (should be there) - scriptlines = mrsl_dict['EXECUTE'] - script = '\n'.join(['# generated script from mRSL EXECUTE'] - + scriptlines) - # the script is expected to be present as an input file, - # and to have a certain name which we return. - addRel(xrsl, 'executable', '/bin/sh') - # HEADS UP: this is the script name we wire in. - script_name = '.'.join([j_name, 'sh']) - addRel(xrsl, 'arguments', script_name) - - # executable input files, always including the execute script - execfiles = [file_mapping(i) for i in mrsl_dict.get('EXECUTABLES', [])] - - # HEADS UP: the script name again! - execfiles.append([script_name, '']) - - # (non-executable) input files - infiles = [file_mapping(i) for i in mrsl_dict.get('INPUTFILES', [])] - - # both execfiles and infiles are inputfiles for ARC - addRel(xrsl, 'inputfiles', [flip_for_input(i) - for i in execfiles + infiles]) - - # execfiles are made executable - # (specified as the remote name, relative to the session dir) - def fst(list): - return list[0] - addRel(xrsl, 'executables', [fst(i) for i in execfiles]) - - # more stuff... - - # requested runtime, given in minutes in (user) xrsl ... 
- time = mrsl_dict.get('CPUTIME') - if time: - addRel(xrsl, 'cputime', "%d" % math.ceil(float(time) / 60)) - - # simply copy the values for these: - copy_items = ['MEMORY', 'DISK', 'NODECOUNT'] - xrsl_name = {'MEMORY': 'memory', 'DISK': 'disk', 'NODECOUNT': 'count'} - # NB: we have to ignore CPUCOUNT, not supported by ARC xrsl - - for x in copy_items: # we ignore the ones which are not there - if x in mrsl_dict: - addRel(xrsl, xrsl_name[x], mrsl_dict[x]) - # and these are all single values - - if 'ARCHITECTURE' in mrsl_dict: - addRel(xrsl, 'architecture', - translate_arch(mrsl_dict['ARCHITECTURE'])) - - if 'ENVIRONMENT' in mrsl_dict: - - # these have already been mangled into pairs (name,value) before - # var_val = [] - # for definition in mrsl_dict['ENVIRONMENT']: - # vv = definition.strip().split('=') - # var_val.append(vv.strip()) - # addRel(xrsl,'environment',var_val) - - addRel(xrsl, 'environment', [list(i) - for i in mrsl_dict['ENVIRONMENT']]) - - if 'RUNTIMEENVIRONMENT' in mrsl_dict: - for line in mrsl_dict['RUNTIMEENVIRONMENT']: - addRel(xrsl, 'runTimeEnvironment', line.strip()) - - if 'NOTIFY' in mrsl_dict: - addresses = [] - # NOTE: max 3 - for line in [i for i in mrsl_dict['NOTIFY'] if is_mail(i)][:3]: - # remove whites before, then "email:" prefix, then strip - address = line.lstrip()[6:].strip() - if address != 'SETTINGS': - addresses.append(address) -# else: -# this should be replaced already, but... -# FIXME: get it from the settings :-P -# addresses.append('*FROM THE SETTINGS*') - if addresses: - addRel(xrsl, 'notify', 'ec ' + ' '.join(addresses)) - - logger.debug('XRSL:\n%s\nScript (%s):\n%s\n)' % - (xrsl, script_name, script)) - except arclib.XrslError as err: - logger.error('Error generating Xrsl: %s' % err) - raise err - return (xrsl, script, script_name) - -# helper functions and constants used: - -# write_pair :: (String,a) -> arclib.XrslRelation -# and is polymorphic in a: a = String, a = List(String), a = List(List(String)) -# the C version of XrslRelation is... so we emulate it here: - - -def write_pair(name, values): - if isinstance(values, list): - if isinstance(values[0], list): - con = arclib.XrslRelationDoubleList - val = values # should cast all to strings, but only used with them - else: - con = arclib.XrslRelationList - val = values # should cast all to strings, but only used with them - else: - con = arclib.XrslRelation - val = values.__str__() - return con(name, arclib.operator_eq, val) - -# used all the time... shortcut. - - -def addRel(xrsl, name, values): - # sometimes we receive empty stuff from the caller. - # No point writing it out at all. - if isinstance(values, list) and len(values) == 0: - return - if values == '': - return - xrsl.AddRelation(write_pair(name, values)) - - -# architectures -architectures = {'X86': 'i686', 'AMD64': 'x86-64', 'IA64': 'ia64', - 'SPARC': 'sparc64', 'SPARC64': 'sparc64', - # 'ITANIUM':'???ia64???', - 'SUN4U': 'sun4u', 'SPARC-T1': 'sparc64', 'SPARC-T2': 'sparc64', - # 'PS3':'??unknown??', - 'CELL': 'cell'} - - -def translate_arch(mig_arch): - - if mig_arch in architectures: - return architectures[mig_arch] - else: - return '' - - -def is_mail(text): - return text.lstrip().startswith('email:') - - -def file_mapping(line): - """Splits the given line of the expected format - local_name remote_name - into a 2-element list [local_name,remote_name] - If remote_name is empty, the empty string is returned as the 2nd part. - No additional checks are performed. - TODO: should perhaps also check for valid path characters. 
- """ - line = line.strip() - parts = line.split() - local = parts[0] - if len(parts) < 2: - remote = '' - else: - remote = parts[1] - return [local, remote] - - -def flip_for_input(list): - if list[1] == '': - return[list[0], ''] - else: - return [list[1], list[0]] - - -if __name__ == '__main__': - print('starting translation test. Args: ', len(sys.argv)) - logger.debug('translation for file ' + sys.argv[1] + ' starts') - if len(sys.argv) > 1: - fname = sys.argv[1] - parsed = '.'.join([fname, 'parsed']) - translated = '.'.join([parsed, 'xrsl']) - - try: - from mig.shared import mrslparser - from mig.shared import fileio - - (presult, errors) = mrslparser.parse(fname, 'test-id', - '+No+Client+Id', None, parsed) - if not presult: - print('Errors:\n%s' % errors) - else: - print('Parsing OK, now translating') - mrsl_dict = fileio.unpickle(parsed, logger) - (xrsl, script, name) = translate(mrsl_dict, 'test-name') - print('\n'.join(['Job name', name, 'script', script, 'XRSL'])) - fileio.write_file(script, "test-id.sh", logger) - print(format_xrsl(xrsl)) - fileio.write_file("%s" % xrsl, translated, logger) - print('done') - except Exception as err: - print('Error.') - print(err.__str__()) diff --git a/tests/fixture/mig_shared_configuration--new.json b/tests/fixture/mig_shared_configuration--new.json index f4d7edd15..611410309 100644 --- a/tests/fixture/mig_shared_configuration--new.json +++ b/tests/fixture/mig_shared_configuration--new.json @@ -1,7 +1,6 @@ { "admin_email": "", "admin_list": "", - "arc_clusters": [], "architectures": [], "auth_logger": null, "auth_logger_obj": null,