From e402f7964e9ffeb9dd52c7fa655ddf5dd17628d4 Mon Sep 17 00:00:00 2001 From: Lehmann_Fabian Date: Fri, 14 Mar 2025 16:16:09 +0100 Subject: [PATCH 1/2] Extract FTPDaemon to separate project Signed-off-by: Lehmann_Fabian --- daemons/ftp/Dockerfile | 17 - daemons/ftp/ftp.py | 362 ------------------ daemons/ftp/vsftpd.conf | 164 -------- .../scheduler/SchedulerWithDaemonSet.java | 12 - .../scheduler/copystrategy/CopyStrategy.java | 38 -- .../scheduler/copystrategy/FTPstrategy.java | 10 - .../scheduler/la2/copystrategy/ShellCopy.java | 2 +- src/main/resources/copystrategies/ftp.py | 346 ----------------- 8 files changed, 1 insertion(+), 950 deletions(-) delete mode 100644 daemons/ftp/Dockerfile delete mode 100644 daemons/ftp/ftp.py delete mode 100644 daemons/ftp/vsftpd.conf delete mode 100644 src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java delete mode 100644 src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java delete mode 100644 src/main/resources/copystrategies/ftp.py diff --git a/daemons/ftp/Dockerfile b/daemons/ftp/Dockerfile deleted file mode 100644 index 57099808..00000000 --- a/daemons/ftp/Dockerfile +++ /dev/null @@ -1,17 +0,0 @@ -FROM python:slim -RUN apt update && apt install -y \ - vsftpd \ - && rm -rf /var/lib/apt/lists/* - -RUN mkdir -p /var/run/vsftpd/empty - -COPY vsftpd.conf /etc/vsftpd.conf - -USER root -RUN echo 'root:password' | chpasswd - -COPY ftp.py /code/ftp.py - -WORKDIR /code - -ENTRYPOINT ["sh","-c","/usr/sbin/vsftpd /etc/vsftpd.conf"] \ No newline at end of file diff --git a/daemons/ftp/ftp.py b/daemons/ftp/ftp.py deleted file mode 100644 index 4a779dde..00000000 --- a/daemons/ftp/ftp.py +++ /dev/null @@ -1,362 +0,0 @@ -#!/usr/bin/env python3 -import ftplib -import json -import logging as log -import os -import shutil -import signal -import sys -import time -import urllib.request -import urllib.parse -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from time import sleep - -######################################################################################## -# Call this class with three arguments: trace enabled, name to store logs, config json # -######################################################################################## - -exitIfFileWasNotFound = True -CLOSE = False -UNEXPECTED_ERROR = "Unexpected error" -EXIT = 0 -log.basicConfig( - format='%(levelname)s: %(message)s', - level=log.DEBUG, - handlers=[ - log.FileHandler(".command.init." + sys.argv[2] + ".log"), - log.StreamHandler() - ] -) -trace = {} -traceFilePath = ".command.scheduler.trace" -errors = 0 - - -def myExit(code): - global EXIT - EXIT = code - global CLOSE - CLOSE = True - writeTrace(trace) - exit(EXIT) - - -def close(signalnum, syncFile): - log.info("Killed: %s", str(signalnum)) - closeWithWarning(50, syncFile) - - -def closeWithWarning(errorCode, syncFile): - syncFile.write('##FAILURE##\n') - syncFile.flush() - syncFile.close() - myExit(errorCode) - - -def getIP(node, dns, execution): - ip = urllib.request.urlopen(dns + "daemon/" + execution + "/" + node).read() - return str(ip.decode("utf-8")) - - -# True if the file was deleted or did not exist -def clearLocation(path, dst=None): - if os.path.exists(path): - log.debug("Delete %s", path) - if os.path.islink(path): - if dst is not None and os.readlink(path) == dst: - return False - else: - os.unlink(path) - elif os.path.isdir(path): - shutil.rmtree(path) - else: - os.remove(path) - return True - - -def getFTP(node, currentIP, dns, execution, syncFile): - global errors - connectionProblem = 0 - while connectionProblem < 8: - try: - if currentIP is None: - log.info("Request ip for node: %s", node) - ip = getIP(node, dns, execution) - else: - ip = currentIP - log.info("Try to connect to %s", ip) - ftp = ftplib.FTP(ip, timeout=10) - ftp.login("root", "password") - ftp.set_pasv(True) - ftp.encoding = 'utf-8' - log.info("Connection established") - return ftp - except ConnectionRefusedError: - errors += 1 - log.warning("Connection refused! Try again...") - except BaseException: - errors += 1 - log.exception(UNEXPECTED_ERROR) - connectionProblem += 1 - time.sleep(2 ** connectionProblem) - closeWithWarning(8, syncFile) - - -def closeFTP(ftp): - global errors - if ftp is None: - return - try: - ftp.quit() - ftp.close() - except BaseException: - errors += 1 - log.exception(UNEXPECTED_ERROR) - - -def downloadFile(ftp, filename, size, index, node, syncFile, speed): - global errors - log.info("Download %s [%s/%s] - %s", node, str(index).rjust(len(str(size))), str(size), filename) - try: - syncFile.write("S-" + filename + '\n') - clearLocation(filename) - Path(filename[:filename.rindex("/")]).mkdir(parents=True, exist_ok=True) - start = time.time() - with open(filename, 'wb') as file: - if speed == 100: - ftp.retrbinary('RETR %s' % filename, file.write, 102400) - else: - timer = {"t": time.time_ns()} - - def callback(data): - now = time.time_ns() - diff = now - timer["t"] - file.write(data) - timeToSleep = (diff * (100 / speed) - diff) / 1_000_000_000 - # sleep at least 10ms - if timeToSleep > 0.01: - time.sleep(timeToSleep) - timer["t"] = time.time_ns() - - ftp.retrbinary('RETR %s' % filename, callback, 102400) - end = time.time() - sizeInMB = os.path.getsize(filename) / 1048576 - delta = (end - start) - log.info("Speed: %.3f Mb/s", sizeInMB / delta) - return sizeInMB, delta - except ftplib.error_perm as err: - errors += 1 - if str(err) == "550 Failed to open file.": - log.warning("File not found node: %s file: %s", node, filename) - if exitIfFileWasNotFound: - closeWithWarning(40, syncFile) - except FileNotFoundError: - errors += 1 - log.warning("File not found node: %s file: %s", node, filename) - if exitIfFileWasNotFound: - closeWithWarning(41, syncFile) - except EOFError: - errors += 1 - log.warning("It seems the connection was lost! Try again...") - return None - except BaseException: - errors += 1 - log.exception(UNEXPECTED_ERROR) - return None - return 0, 0 - - -def download(node, currentIP, files, dns, execution, syncFile, speed): - ftp = None - size = len(files) - global CLOSE - sizeInMB = 0 - downloadTime = 0 - while not CLOSE and len(files) > 0: - if ftp is None: - ftp = getFTP(node, currentIP, dns, execution, syncFile) - currentIP = None - filename = files[0] - index = size - len(files) + 1 - result = downloadFile(ftp, filename, size, index, node, syncFile, speed) - if result is None: - ftp = None - continue - sizeInMB += result[0] - downloadTime += result[1] - files.pop(0) - syncFile.write("F-" + filename + '\n') - closeFTP(ftp) - return node, sizeInMB / downloadTime - - -def waitForFiles(syncFilePath, files, startTime): - # wait max. 60 seconds - while True: - if startTime + 60 < time.time(): - return False - if os.path.isfile(syncFilePath): - break - log.debug("Wait for file creation") - time.sleep(0.1) - - # Read file live - with open(syncFilePath, 'r') as syncFileTask: - current = [] - while len(files) > 0: - data = syncFileTask.read() - if not data: - time.sleep(0.3) - else: - for d in data: - if d != "\n": - current.append(d) - else: - text = ''.join(current) - current = [] - if text.startswith("S-"): - continue - if text == "##FAILURE##": - log.debug("Read FAILURE in %s", syncFilePath) - myExit(51) - if text == "##FINISHED##": - log.debug("Read FINISHED in " + syncFilePath + " before all files were found") - myExit(52) - log.debug("Look for " + text[:2] + " with " + text[2:] + " in " + str(files)) - if text[:2] == "F-" and text[2:] in files: - files.remove(text[2:]) - if len(files) == 0: - return True - return len(files) == 0 - - -def loadConfig(): - log.info("Load config") - with open(sys.argv[3]) as jsonFile: - config = json.load(jsonFile) - os.makedirs(config["syncDir"], exist_ok=True) - return config - - -def registerSignal(syncFile): - signal.signal(signal.SIGINT, lambda signalnum, handler: close(signalnum, syncFile)) - signal.signal(signal.SIGTERM, lambda signalnum, handler: close(signalnum, syncFile)) - - -def registerSignal2(): - signal.signal(signal.SIGINT, lambda signalnum, handler: myExit(1)) - signal.signal(signal.SIGTERM, lambda signalnum, handler: myExit(1)) - - -def generateSymlinks(symlinks): - for s in symlinks: - src = s["src"] - dst = s["dst"] - if clearLocation(src, dst): - Path(src[:src.rindex("/")]).mkdir(parents=True, exist_ok=True) - try: - os.symlink(dst, src) - except FileExistsError: - log.warning("File exists: %s -> %s", src, dst) - - -def downloadAllData(data, dns, execution, syncFile, speed): - global trace - throughput = [] - with ThreadPoolExecutor(max_workers=max(10, len(data))) as executor: - futures = [] - for d in data: - files = d["files"] - node = d["node"] - currentIP = d["currentIP"] - futures.append(executor.submit(download, node, currentIP, files, dns, execution, syncFile, speed)) - lastNum = -1 - while len(futures) > 0: - if lastNum != len(futures): - log.info("Wait for %d threads to finish", len(futures)) - lastNum = len(futures) - for f in futures[:]: - if f.done(): - throughput.append(f.result()) - futures.remove(f) - sleep(0.1) - trace["scheduler_init_throughput"] = "\"" + ",".join("{}:{:.3f}".format(*x) for x in throughput) + "\"" - - -def waitForDependingTasks(waitForFilesOfTask, startTime, syncDir): - # Now check for files of other tasks - for waitForTask in waitForFilesOfTask: - waitForFilesSet = set(waitForFilesOfTask[waitForTask]) - if not waitForFiles(syncDir + waitForTask, waitForFilesSet, startTime): - log.error(syncDir + waitForTask + " was not successful") - myExit(200) - - -def writeTrace(dataMap): - if sys.argv[1] == 'true': - global errors - if len(dataMap) == 0 or errors > 0: - return - with open(traceFilePath, "a") as traceFile: - for d in dataMap: - traceFile.write(d + "=" + str(dataMap[d]) + "\n") - traceFile.write("scheduler_init_errors=" + str(errors) + "\n") - - -def finishedDownload(dns, execution, taskname): - try: - dns = dns + "downloadtask/" + execution - log.info("Request: %s", dns) - urllib.request.urlopen(dns, taskname.encode("utf-8")) - except BaseException as err: - log.exception(err) - myExit(100) - - -def run(): - global trace - startTime = time.time() - log.info("Start to setup the environment") - config = loadConfig() - - dns = config["dns"] - execution = config["execution"] - data = config["data"] - symlinks = config["symlinks"] - taskname = config["hash"] - - with open(config["syncDir"] + config["hash"], 'w') as syncFile: - registerSignal(syncFile) - syncFile.write('##STARTED##\n') - syncFile.flush() - startTimeSymlinks = time.time() - generateSymlinks(symlinks) - trace["scheduler_init_symlinks_runtime"] = int((time.time() - startTimeSymlinks) * 1000) - syncFile.write('##SYMLINKS##\n') - syncFile.flush() - startTimeDownload = time.time() - downloadAllData(data, dns, execution, syncFile, config["speed"]) - trace["scheduler_init_download_runtime"] = int((time.time() - startTimeDownload) * 1000) - if CLOSE: - log.debug("Closed with code %s", str(EXIT)) - exit(EXIT) - log.info("Finished Download") - syncFile.write('##FINISHED##\n') - registerSignal2() - - # finishedDownload(dns, execution, taskname) - - # startTimeDependingTasks = time.time() - # waitForDependingTasks(config["waitForFilesOfTask"], startTime, config["syncDir"]) - # trace["scheduler_init_depending_tasks_runtime"] = int((time.time() - startTimeDependingTasks) * 1000) - # log.info("Waited for all tasks") - - # runtime = int((time.time() - startTime) * 1000) - # trace["scheduler_init_runtime"] = runtime - # writeTrace(trace) - - -if __name__ == '__main__': - run() diff --git a/daemons/ftp/vsftpd.conf b/daemons/ftp/vsftpd.conf deleted file mode 100644 index d14bf2df..00000000 --- a/daemons/ftp/vsftpd.conf +++ /dev/null @@ -1,164 +0,0 @@ -# Example config file /etc/vsftpd.conf -# -# The default compiled in settings are fairly paranoid. This sample file -# loosens things up a bit, to make the ftp daemon more usable. -# Please see vsftpd.conf.5 for all compiled in defaults. -# -# READ THIS: This example file is NOT an exhaustive list of vsftpd options. -# Please read the vsftpd.conf.5 manual page to get a full idea of vsftpd's -# capabilities. -# -# -# Run standalone? vsftpd can run either from an inetd or as a standalone -# daemon started from an initscript. -listen=YES -# -# This directive enables listening on IPv6 sockets. By default, listening -# on the IPv6 "any" address (::) will accept connections from both IPv6 -# and IPv4 clients. It is not necessary to listen on *both* IPv4 and IPv6 -# sockets. If you want that (perhaps because you want to listen on specific -# addresses) then you must run two copies of vsftpd with two configuration -# files. -listen_ipv6=NO -# -# Allow anonymous FTP? (Disabled by default). -anonymous_enable=YES -# -# Uncomment this to allow local users to log in. -local_enable=YES -# -# Uncomment this to enable any form of FTP write command. -write_enable=YES -# -# Default umask for local users is 077. You may wish to change this to 022, -# if your users expect that (022 is used by most other ftpd's) -#local_umask=022 -# -# Uncomment this to allow the anonymous FTP user to upload files. This only -# has an effect if the above global write enable is activated. Also, you will -# obviously need to create a directory writable by the FTP user. -anon_upload_enable=NO -# -# Uncomment this if you want the anonymous FTP user to be able to create -# new directories. -anon_mkdir_write_enable=NO -# -anon_other_write_enable=NO -# -# Activate directory messages - messages given to remote users when they -# go into a certain directory. -dirmessage_enable=YES -# -# If enabled, vsftpd will display directory listings with the time -# in your local time zone. The default is to display GMT. The -# times returned by the MDTM FTP command are also affected by this -# option. -use_localtime=YES -# -# Activate logging of uploads/downloads. -xferlog_enable=YES -# -# Make sure PORT transfer connections originate from port 20 (ftp-data). -connect_from_port_20=NO -# -# If you want, you can arrange for uploaded anonymous files to be owned by -# a different user. Note! Using "root" for uploaded files is not -# recommended! -#chown_uploads=YES -#chown_username=whoever -# -# You may override where the log file goes if you like. The default is shown -# below. -#xferlog_file=/var/log/vsftpd.log -# -# If you want, you can have your log file in standard ftpd xferlog format. -# Note that the default log file location is /var/log/xferlog in this case. -#xferlog_std_format=YES -# -# You may change the default value for timing out an idle session. -#idle_session_timeout=600 -# -# You may change the default value for timing out a data connection. -#data_connection_timeout=120 -# -# It is recommended that you define on your system a unique user which the -# ftp server can use as a totally isolated and unprivileged user. -#nopriv_user=ftpsecure -# -# Enable this and the server will recognise asynchronous ABOR requests. Not -# recommended for security (the code is non-trivial). Not enabling it, -# however, may confuse older FTP clients. -#async_abor_enable=YES -# -# By default the server will pretend to allow ASCII mode but in fact ignore -# the request. Turn on the below options to have the server actually do ASCII -# mangling on files when in ASCII mode. -# Beware that on some FTP servers, ASCII support allows a denial of service -# attack (DoS) via the command "SIZE /big/file" in ASCII mode. vsftpd -# predicted this attack and has always been safe, reporting the size of the -# raw file. -# ASCII mangling is a horrible feature of the protocol. -#ascii_upload_enable=YES -#ascii_download_enable=YES -# -# You may fully customise the login banner string: -#ftpd_banner=Welcome to blah FTP service. -# -# You may specify a file of disallowed anonymous e-mail addresses. Apparently -# useful for combatting certain DoS attacks. -#deny_email_enable=YES -# (default follows) -#banned_email_file=/etc/vsftpd.banned_emails -# -# You may restrict local users to their home directories. See the FAQ for -# the possible risks in this before using chroot_local_user or -# chroot_list_enable below. -#chroot_local_user=YES -# -# You may specify an explicit list of local users to chroot() to their home -# directory. If chroot_local_user is YES, then this list becomes a list of -# users to NOT chroot(). -# (Warning! chroot'ing can be very dangerous. If using chroot, make sure that -# the user does not have write access to the top level directory within the -# chroot) -#chroot_local_user=YES -#chroot_list_enable=YES -# (default follows) -#chroot_list_file=/etc/vsftpd.chroot_list -# -# You may activate the "-R" option to the builtin ls. This is disabled by -# default to avoid remote users being able to cause excessive I/O on large -# sites. However, some broken FTP clients such as "ncftp" and "mirror" assume -# the presence of the "-R" option, so there is a strong case for enabling it. -#ls_recurse_enable=YES -# -# Customization -# -# Some of vsftpd's settings don't fit the filesystem layout by -# default. -# -# This option should be the name of a directory which is empty. Also, the -# directory should not be writable by the ftp user. This directory is used -# as a secure chroot() jail at times vsftpd does not require filesystem -# access. -secure_chroot_dir=/var/run/vsftpd/empty -# -# This string is the name of the PAM service vsftpd will use. -pam_service_name=ftp -# -# This option specifies the location of the RSA certificate to use for SSL -# encrypted connections. -rsa_cert_file=/etc/ssl/certs/ssl-cert-snakeoil.pem -rsa_private_key_file=/etc/ssl/private/ssl-cert-snakeoil.key -ssl_enable=NO - -anonymous_enable=yes -anon_root=/ - -pasv_enable=Yes -pasv_max_port=10090 -pasv_min_port=11090 - -# -# Uncomment this to indicate that vsftpd use a utf8 filesystem. -#utf8_filesystem=YES \ No newline at end of file diff --git a/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java b/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java index e3a760d8..60f76f7e 100644 --- a/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java +++ b/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java @@ -13,8 +13,6 @@ import cws.k8s.scheduler.model.taskinputs.TaskInputs; import cws.k8s.scheduler.rest.exceptions.NotARealFileException; import cws.k8s.scheduler.rest.response.getfile.FileResponse; -import cws.k8s.scheduler.scheduler.copystrategy.CopyStrategy; -import cws.k8s.scheduler.scheduler.copystrategy.FTPstrategy; import cws.k8s.scheduler.util.DaemonHolder; import cws.k8s.scheduler.util.NodeTaskAlignment; import cws.k8s.scheduler.util.NodeTaskFilesAlignment; @@ -44,8 +42,6 @@ public abstract class SchedulerWithDaemonSet extends Scheduler { final DaemonHolder daemonHolder = new DaemonHolder(); @Getter private String workflowEngineNode = null; - @Getter - private final CopyStrategy copyStrategy; final HierarchyWrapper hierarchyWrapper; private final InputFileCollector inputFileCollector; private final ConcurrentHashMap requestedLocations = new ConcurrentHashMap<>(); @@ -64,14 +60,6 @@ public abstract class SchedulerWithDaemonSet extends Scheduler { if ( config.copyStrategy == null ) { throw new IllegalArgumentException( "Copy strategy is null" ); } - switch ( config.copyStrategy ){ - case "ftp": - case "copy": - copyStrategy = new FTPstrategy(); - break; - default: - throw new IllegalArgumentException( "Copy strategy is unknown " + config.copyStrategy ); - } this.localWorkDir = config.workDir; } diff --git a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java b/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java deleted file mode 100644 index 65479eb7..00000000 --- a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java +++ /dev/null @@ -1,38 +0,0 @@ -package cws.k8s.scheduler.scheduler.copystrategy; - -import lombok.extern.slf4j.Slf4j; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; -import java.util.Set; - -@Slf4j -public abstract class CopyStrategy { - - private void write( BufferedWriter pw, File file, String resource ) { - ClassLoader classLoader = getClass().getClassLoader(); - try (InputStream inputStream = classLoader.getResourceAsStream( resource )) { - assert inputStream != null; - try (InputStreamReader streamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); - BufferedReader reader = new BufferedReader(streamReader)) { - - String line; - while ((line = reader.readLine()) != null) { - pw.write( line ); - pw.write( '\n' ); - } - - Set executable = PosixFilePermissions.fromString("rwxrwxrwx"); - Files.setPosixFilePermissions( file.toPath(), executable ); - } - } catch (IOException e) { - log.error( "Cannot write " + file, e); - } - } - - abstract String getResource(); - -} diff --git a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java b/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java deleted file mode 100644 index 4369efb8..00000000 --- a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java +++ /dev/null @@ -1,10 +0,0 @@ -package cws.k8s.scheduler.scheduler.copystrategy; - -public class FTPstrategy extends CopyStrategy { - - @Override - String getResource() { - return "copystrategies/ftp.py"; - } - -} \ No newline at end of file diff --git a/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java b/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java index 2cbcd936..c189a123 100644 --- a/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java +++ b/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java @@ -37,7 +37,7 @@ public void startCopyTasks( final CopyTask copyTask, final NodeTaskFilesAlignmen } catch ( IOException e ) { throw new RuntimeException( e ); } - command[2] += "/code/ftp.py false \"" + copyTaskIdentifier + "\" \"" + filename + "\""; + command[2] += "/app/ftp.py false \"" + copyTaskIdentifier + "\" \"" + filename + "\""; String name = nodeTaskFilesAlignment.task.getConfig().getName() + "-copy-" + nodeTaskFilesAlignment.node.getName(); log.info( "Starting {} to node {}", nodeTaskFilesAlignment.task.getConfig().getName(), nodeTaskFilesAlignment.node.getName() ); logCopyTask.copy( nodeTaskFilesAlignment.task.getConfig().getName(), nodeTaskFilesAlignment.node.getName(), copyTask.getInputFiles().size(), "start" ); diff --git a/src/main/resources/copystrategies/ftp.py b/src/main/resources/copystrategies/ftp.py deleted file mode 100644 index da9191a0..00000000 --- a/src/main/resources/copystrategies/ftp.py +++ /dev/null @@ -1,346 +0,0 @@ -#!/usr/bin/env python3 -import ftplib -import json -import logging as log -import os -import shutil -import signal -import sys -import time -import urllib.request -import urllib.parse -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from time import sleep - -exitIfFileWasNotFound = True -CLOSE = False -UNEXPECTED_ERROR = "Unexpected error" -EXIT = 0 -log.basicConfig( - format='%(levelname)s: %(message)s', - level=log.DEBUG, - handlers=[ - log.FileHandler(".command.init.log"), - log.StreamHandler() - ] -) -trace = {} -traceFilePath = ".command.scheduler.trace" -errors = 0 - - -def myExit(code): - global EXIT - EXIT = code - global CLOSE - CLOSE = True - writeTrace(trace) - exit(EXIT) - - -def close(signalnum, syncFile): - log.info("Killed: %s", str(signalnum)) - closeWithWarning(50, syncFile) - - -def closeWithWarning(errorCode, syncFile): - syncFile.write('##FAILURE##\n') - syncFile.flush() - syncFile.close() - myExit(errorCode) - - -def getIP(node, dns, execution): - ip = urllib.request.urlopen(dns + "daemon/" + execution + "/" + node).read() - return str(ip.decode("utf-8")) - - -# True if the file was deleted or did not exist -def clearLocation(path, dst=None): - if os.path.exists(path): - log.debug("Delete %s", path) - if os.path.islink(path): - if dst is not None and os.readlink(path) == dst: - return False - else: - os.unlink(path) - elif os.path.isdir(path): - shutil.rmtree(path) - else: - os.remove(path) - return True - - -def getFTP(node, currentIP, dns, execution, syncFile): - global errors - connectionProblem = 0 - while connectionProblem < 8: - try: - if currentIP is None: - log.info("Request ip for node: %s", node) - ip = getIP(node, dns, execution) - else: - ip = currentIP - log.info("Try to connect to %s", ip) - ftp = ftplib.FTP(ip, timeout=10) - ftp.login("root", "password") - ftp.set_pasv(True) - ftp.encoding = 'utf-8' - log.info("Connection established") - return ftp - except ConnectionRefusedError: - errors += 1 - log.warning("Connection refused! Try again...") - except BaseException: - errors += 1 - log.exception(UNEXPECTED_ERROR) - connectionProblem += 1 - time.sleep(2 ** connectionProblem) - closeWithWarning(8, syncFile) - - -def closeFTP(ftp): - global errors - if ftp is None: - return - try: - ftp.quit() - ftp.close() - except BaseException: - errors += 1 - log.exception(UNEXPECTED_ERROR) - - -def downloadFile(ftp, filename, size, index, node, syncFile): - global errors - log.info("Download %s [%s/%s] - %s", node, str(index).rjust(len(str(size))), str(size), filename) - try: - syncFile.write("S-" + filename + '\n') - clearLocation(filename) - Path(filename[:filename.rindex("/")]).mkdir(parents=True, exist_ok=True) - start = time.time() - ftp.retrbinary('RETR %s' % filename, open(filename, 'wb').write, 102400) - end = time.time() - sizeInMB = os.path.getsize(filename) / 1000000 - delta = (end - start) - log.info("Speed: %.3f Mbit/s", sizeInMB / delta) - return sizeInMB, delta - except ftplib.error_perm as err: - errors += 1 - if str(err) == "550 Failed to open file.": - log.warning("File not found node: %s file: %s", node, filename) - if exitIfFileWasNotFound: - closeWithWarning(40, syncFile) - except FileNotFoundError: - errors += 1 - log.warning("File not found node: %s file: %s", node, filename) - if exitIfFileWasNotFound: - closeWithWarning(41, syncFile) - except EOFError: - errors += 1 - log.warning("It seems the connection was lost! Try again...") - return None - except BaseException: - errors += 1 - log.exception(UNEXPECTED_ERROR) - return None - return 0, 0 - - -def download(node, currentIP, files, dns, execution, syncFile): - ftp = None - size = len(files) - global CLOSE - sizeInMB = 0 - downloadTime = 0 - while not CLOSE and len(files) > 0: - if ftp is None: - ftp = getFTP(node, currentIP, dns, execution, syncFile) - currentIP = None - filename = files[0] - index = size - len(files) + 1 - result = downloadFile(ftp, filename, size, index, node, syncFile) - if result is None: - ftp = None - continue - sizeInMB += result[0] - downloadTime += result[1] - files.pop(0) - syncFile.write("F-" + filename + '\n') - closeFTP(ftp) - return node, sizeInMB / downloadTime - - -def waitForFiles(syncFilePath, files, startTime): - # wait max. 60 seconds - while True: - if startTime + 60 < time.time(): - return False - if os.path.isfile(syncFilePath): - break - log.debug("Wait for file creation") - time.sleep(0.1) - - # Read file live - with open(syncFilePath, 'r') as syncFileTask: - current = [] - while len(files) > 0: - data = syncFileTask.read() - if not data: - time.sleep(0.3) - else: - for d in data: - if d != "\n": - current.append(d) - else: - text = ''.join(current) - current = [] - if text.startswith("S-"): - continue - if text == "##FAILURE##": - log.debug("Read FAILURE in %s", syncFilePath) - myExit(51) - if text == "##FINISHED##": - log.debug("Read FINISHED in " + syncFilePath + " before all files were found") - myExit(52) - log.debug("Look for " + text[:2] + " with " + text[2:] + " in " + str(files)) - if text[:2] == "F-" and text[2:] in files: - files.remove(text[2:]) - if len(files) == 0: - return True - return len(files) == 0 - - -def loadConfig(configFilePath): - if not os.path.isfile(configFilePath): - log.error("Config file not found: %s", configFilePath) - myExit(102) - - with open(configFilePath, 'r') as configFile: - config = json.load(configFile) - - os.makedirs(config["syncDir"], exist_ok=True) - return config - - -def registerSignal(syncFile): - signal.signal(signal.SIGINT, lambda signalnum, handler: close(signalnum, syncFile)) - signal.signal(signal.SIGTERM, lambda signalnum, handler: close(signalnum, syncFile)) - - -def registerSignal2(): - signal.signal(signal.SIGINT, lambda signalnum, handler: myExit(1)) - signal.signal(signal.SIGTERM, lambda signalnum, handler: myExit(1)) - - -def generateSymlinks(symlinks): - for s in symlinks: - src = s["src"] - dst = s["dst"] - if clearLocation(src, dst): - Path(src[:src.rindex("/")]).mkdir(parents=True, exist_ok=True) - try: - os.symlink(dst, src) - except FileExistsError: - log.warning("File exists: %s -> %s", src, dst) - - -def downloadAllData(data, dns, execution, syncFile): - global trace - throughput = [] - with ThreadPoolExecutor(max_workers=max(10, len(data))) as executor: - futures = [] - for d in data: - files = d["files"] - node = d["node"] - currentIP = d["currentIP"] - futures.append(executor.submit(download, node, currentIP, files, dns, execution, syncFile)) - lastNum = -1 - while len(futures) > 0: - if lastNum != len(futures): - log.info("Wait for %d threads to finish", len(futures)) - lastNum = len(futures) - for f in futures[:]: - if f.done(): - throughput.append(f.result()) - futures.remove(f) - sleep(0.1) - trace["scheduler_init_throughput"] = "\"" + ",".join("{}:{:.3f}".format(*x) for x in throughput) + "\"" - - -def waitForDependingTasks(waitForFilesOfTask, startTime, syncDir): - # Now check for files of other tasks - for waitForTask in waitForFilesOfTask: - waitForFilesSet = set(waitForFilesOfTask[waitForTask]) - if not waitForFiles(syncDir + waitForTask, waitForFilesSet, startTime): - log.error(syncDir + waitForTask + " was not successful") - myExit(200) - - -def writeTrace(dataMap): - if sys.argv[1] == 'true': - global errors - if len(dataMap) == 0 or errors > 0: - return - with open(traceFilePath, "a") as traceFile: - for d in dataMap: - traceFile.write(d + "=" + str(dataMap[d]) + "\n") - traceFile.write("scheduler_init_errors=" + str(errors) + "\n") - - -def finishedDownload(dns, execution, taskname): - try: - dns = dns + "downloadtask/" + execution - log.info("Request: %s", dns) - urllib.request.urlopen(dns, taskname.encode( "utf-8" )) - except BaseException as err: - log.exception(err) - myExit(100) - - -def run(): - global trace - startTime = time.time() - log.info("Start to setup the environment") - config = loadConfig(".command.inputs.json") - - dns = config["dns"] - execution = config["execution"] - data = config["data"] - symlinks = config["symlinks"] - taskname = config["hash"] - - with open(config["syncDir"] + config["hash"], 'w') as syncFile: - registerSignal(syncFile) - syncFile.write('##STARTED##\n') - syncFile.flush() - startTimeSymlinks = time.time() - generateSymlinks(symlinks) - trace["scheduler_init_symlinks_runtime"] = int((time.time() - startTimeSymlinks) * 1000) - syncFile.write('##SYMLINKS##\n') - syncFile.flush() - startTimeDownload = time.time() - downloadAllData(data, dns, execution, syncFile) - trace["scheduler_init_download_runtime"] = int((time.time() - startTimeDownload) * 1000) - if CLOSE: - log.debug("Closed with code %s", str(EXIT)) - exit(EXIT) - log.info("Finished Download") - syncFile.write('##FINISHED##\n') - registerSignal2() - - finishedDownload(dns, execution, taskname) - - startTimeDependingTasks = time.time() - waitForDependingTasks(config["waitForFilesOfTask"], startTime, config["syncDir"]) - trace["scheduler_init_depending_tasks_runtime"] = int((time.time() - startTimeDependingTasks) * 1000) - log.info("Waited for all tasks") - - runtime = int((time.time() - startTime) * 1000) - trace["scheduler_init_runtime"] = runtime - writeTrace(trace) - - -if __name__ == '__main__': - run() From a831d72ea0cfe2fc2281a0b165f73dfb84f79eff Mon Sep 17 00:00:00 2001 From: Lehmann_Fabian Date: Fri, 14 Mar 2025 16:16:34 +0100 Subject: [PATCH 2/2] Add spring-boot-starter-validation Signed-off-by: Lehmann_Fabian --- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index 6c0c0f21..c2e35939 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,11 @@ 9.12.4544 + + org.springframework.boot + spring-boot-starter-validation + +