diff --git a/daemons/ftp/Dockerfile b/daemons/ftp/Dockerfile
deleted file mode 100644
index 57099808..00000000
--- a/daemons/ftp/Dockerfile
+++ /dev/null
@@ -1,17 +0,0 @@
-FROM python:slim
-RUN apt update && apt install -y \
- vsftpd \
- && rm -rf /var/lib/apt/lists/*
-
-RUN mkdir -p /var/run/vsftpd/empty
-
-COPY vsftpd.conf /etc/vsftpd.conf
-
-USER root
-RUN echo 'root:password' | chpasswd
-
-COPY ftp.py /code/ftp.py
-
-WORKDIR /code
-
-ENTRYPOINT ["sh","-c","/usr/sbin/vsftpd /etc/vsftpd.conf"]
\ No newline at end of file
diff --git a/daemons/ftp/ftp.py b/daemons/ftp/ftp.py
deleted file mode 100644
index 4a779dde..00000000
--- a/daemons/ftp/ftp.py
+++ /dev/null
@@ -1,362 +0,0 @@
-#!/usr/bin/env python3
-import ftplib
-import json
-import logging as log
-import os
-import shutil
-import signal
-import sys
-import time
-import urllib.request
-import urllib.parse
-from concurrent.futures import ThreadPoolExecutor
-from pathlib import Path
-from time import sleep
-
-########################################################################################
-# Call this class with three arguments: trace enabled, name to store logs, config json #
-########################################################################################
-
-exitIfFileWasNotFound = True
-CLOSE = False
-UNEXPECTED_ERROR = "Unexpected error"
-EXIT = 0
-log.basicConfig(
- format='%(levelname)s: %(message)s',
- level=log.DEBUG,
- handlers=[
- log.FileHandler(".command.init." + sys.argv[2] + ".log"),
- log.StreamHandler()
- ]
-)
-trace = {}
-traceFilePath = ".command.scheduler.trace"
-errors = 0
-
-
-def myExit(code):
- global EXIT
- EXIT = code
- global CLOSE
- CLOSE = True
- writeTrace(trace)
- exit(EXIT)
-
-
-def close(signalnum, syncFile):
- log.info("Killed: %s", str(signalnum))
- closeWithWarning(50, syncFile)
-
-
-def closeWithWarning(errorCode, syncFile):
- syncFile.write('##FAILURE##\n')
- syncFile.flush()
- syncFile.close()
- myExit(errorCode)
-
-
-def getIP(node, dns, execution):
- ip = urllib.request.urlopen(dns + "daemon/" + execution + "/" + node).read()
- return str(ip.decode("utf-8"))
-
-
-# True if the file was deleted or did not exist
-def clearLocation(path, dst=None):
- if os.path.exists(path):
- log.debug("Delete %s", path)
- if os.path.islink(path):
- if dst is not None and os.readlink(path) == dst:
- return False
- else:
- os.unlink(path)
- elif os.path.isdir(path):
- shutil.rmtree(path)
- else:
- os.remove(path)
- return True
-
-
-def getFTP(node, currentIP, dns, execution, syncFile):
- global errors
- connectionProblem = 0
- while connectionProblem < 8:
- try:
- if currentIP is None:
- log.info("Request ip for node: %s", node)
- ip = getIP(node, dns, execution)
- else:
- ip = currentIP
- log.info("Try to connect to %s", ip)
- ftp = ftplib.FTP(ip, timeout=10)
- ftp.login("root", "password")
- ftp.set_pasv(True)
- ftp.encoding = 'utf-8'
- log.info("Connection established")
- return ftp
- except ConnectionRefusedError:
- errors += 1
- log.warning("Connection refused! Try again...")
- except BaseException:
- errors += 1
- log.exception(UNEXPECTED_ERROR)
- connectionProblem += 1
- time.sleep(2 ** connectionProblem)
- closeWithWarning(8, syncFile)
-
-
-def closeFTP(ftp):
- global errors
- if ftp is None:
- return
- try:
- ftp.quit()
- ftp.close()
- except BaseException:
- errors += 1
- log.exception(UNEXPECTED_ERROR)
-
-
-def downloadFile(ftp, filename, size, index, node, syncFile, speed):
- global errors
- log.info("Download %s [%s/%s] - %s", node, str(index).rjust(len(str(size))), str(size), filename)
- try:
- syncFile.write("S-" + filename + '\n')
- clearLocation(filename)
- Path(filename[:filename.rindex("/")]).mkdir(parents=True, exist_ok=True)
- start = time.time()
- with open(filename, 'wb') as file:
- if speed == 100:
- ftp.retrbinary('RETR %s' % filename, file.write, 102400)
- else:
- timer = {"t": time.time_ns()}
-
- def callback(data):
- now = time.time_ns()
- diff = now - timer["t"]
- file.write(data)
- timeToSleep = (diff * (100 / speed) - diff) / 1_000_000_000
- # sleep at least 10ms
- if timeToSleep > 0.01:
- time.sleep(timeToSleep)
- timer["t"] = time.time_ns()
-
- ftp.retrbinary('RETR %s' % filename, callback, 102400)
- end = time.time()
- sizeInMB = os.path.getsize(filename) / 1048576
- delta = (end - start)
- log.info("Speed: %.3f Mb/s", sizeInMB / delta)
- return sizeInMB, delta
- except ftplib.error_perm as err:
- errors += 1
- if str(err) == "550 Failed to open file.":
- log.warning("File not found node: %s file: %s", node, filename)
- if exitIfFileWasNotFound:
- closeWithWarning(40, syncFile)
- except FileNotFoundError:
- errors += 1
- log.warning("File not found node: %s file: %s", node, filename)
- if exitIfFileWasNotFound:
- closeWithWarning(41, syncFile)
- except EOFError:
- errors += 1
- log.warning("It seems the connection was lost! Try again...")
- return None
- except BaseException:
- errors += 1
- log.exception(UNEXPECTED_ERROR)
- return None
- return 0, 0
-
-
-def download(node, currentIP, files, dns, execution, syncFile, speed):
- ftp = None
- size = len(files)
- global CLOSE
- sizeInMB = 0
- downloadTime = 0
- while not CLOSE and len(files) > 0:
- if ftp is None:
- ftp = getFTP(node, currentIP, dns, execution, syncFile)
- currentIP = None
- filename = files[0]
- index = size - len(files) + 1
- result = downloadFile(ftp, filename, size, index, node, syncFile, speed)
- if result is None:
- ftp = None
- continue
- sizeInMB += result[0]
- downloadTime += result[1]
- files.pop(0)
- syncFile.write("F-" + filename + '\n')
- closeFTP(ftp)
- return node, sizeInMB / downloadTime
-
-
-def waitForFiles(syncFilePath, files, startTime):
- # wait max. 60 seconds
- while True:
- if startTime + 60 < time.time():
- return False
- if os.path.isfile(syncFilePath):
- break
- log.debug("Wait for file creation")
- time.sleep(0.1)
-
- # Read file live
- with open(syncFilePath, 'r') as syncFileTask:
- current = []
- while len(files) > 0:
- data = syncFileTask.read()
- if not data:
- time.sleep(0.3)
- else:
- for d in data:
- if d != "\n":
- current.append(d)
- else:
- text = ''.join(current)
- current = []
- if text.startswith("S-"):
- continue
- if text == "##FAILURE##":
- log.debug("Read FAILURE in %s", syncFilePath)
- myExit(51)
- if text == "##FINISHED##":
- log.debug("Read FINISHED in " + syncFilePath + " before all files were found")
- myExit(52)
- log.debug("Look for " + text[:2] + " with " + text[2:] + " in " + str(files))
- if text[:2] == "F-" and text[2:] in files:
- files.remove(text[2:])
- if len(files) == 0:
- return True
- return len(files) == 0
-
-
-def loadConfig():
- log.info("Load config")
- with open(sys.argv[3]) as jsonFile:
- config = json.load(jsonFile)
- os.makedirs(config["syncDir"], exist_ok=True)
- return config
-
-
-def registerSignal(syncFile):
- signal.signal(signal.SIGINT, lambda signalnum, handler: close(signalnum, syncFile))
- signal.signal(signal.SIGTERM, lambda signalnum, handler: close(signalnum, syncFile))
-
-
-def registerSignal2():
- signal.signal(signal.SIGINT, lambda signalnum, handler: myExit(1))
- signal.signal(signal.SIGTERM, lambda signalnum, handler: myExit(1))
-
-
-def generateSymlinks(symlinks):
- for s in symlinks:
- src = s["src"]
- dst = s["dst"]
- if clearLocation(src, dst):
- Path(src[:src.rindex("/")]).mkdir(parents=True, exist_ok=True)
- try:
- os.symlink(dst, src)
- except FileExistsError:
- log.warning("File exists: %s -> %s", src, dst)
-
-
-def downloadAllData(data, dns, execution, syncFile, speed):
- global trace
- throughput = []
- with ThreadPoolExecutor(max_workers=max(10, len(data))) as executor:
- futures = []
- for d in data:
- files = d["files"]
- node = d["node"]
- currentIP = d["currentIP"]
- futures.append(executor.submit(download, node, currentIP, files, dns, execution, syncFile, speed))
- lastNum = -1
- while len(futures) > 0:
- if lastNum != len(futures):
- log.info("Wait for %d threads to finish", len(futures))
- lastNum = len(futures)
- for f in futures[:]:
- if f.done():
- throughput.append(f.result())
- futures.remove(f)
- sleep(0.1)
- trace["scheduler_init_throughput"] = "\"" + ",".join("{}:{:.3f}".format(*x) for x in throughput) + "\""
-
-
-def waitForDependingTasks(waitForFilesOfTask, startTime, syncDir):
- # Now check for files of other tasks
- for waitForTask in waitForFilesOfTask:
- waitForFilesSet = set(waitForFilesOfTask[waitForTask])
- if not waitForFiles(syncDir + waitForTask, waitForFilesSet, startTime):
- log.error(syncDir + waitForTask + " was not successful")
- myExit(200)
-
-
-def writeTrace(dataMap):
- if sys.argv[1] == 'true':
- global errors
- if len(dataMap) == 0 or errors > 0:
- return
- with open(traceFilePath, "a") as traceFile:
- for d in dataMap:
- traceFile.write(d + "=" + str(dataMap[d]) + "\n")
- traceFile.write("scheduler_init_errors=" + str(errors) + "\n")
-
-
-def finishedDownload(dns, execution, taskname):
- try:
- dns = dns + "downloadtask/" + execution
- log.info("Request: %s", dns)
- urllib.request.urlopen(dns, taskname.encode("utf-8"))
- except BaseException as err:
- log.exception(err)
- myExit(100)
-
-
-def run():
- global trace
- startTime = time.time()
- log.info("Start to setup the environment")
- config = loadConfig()
-
- dns = config["dns"]
- execution = config["execution"]
- data = config["data"]
- symlinks = config["symlinks"]
- taskname = config["hash"]
-
- with open(config["syncDir"] + config["hash"], 'w') as syncFile:
- registerSignal(syncFile)
- syncFile.write('##STARTED##\n')
- syncFile.flush()
- startTimeSymlinks = time.time()
- generateSymlinks(symlinks)
- trace["scheduler_init_symlinks_runtime"] = int((time.time() - startTimeSymlinks) * 1000)
- syncFile.write('##SYMLINKS##\n')
- syncFile.flush()
- startTimeDownload = time.time()
- downloadAllData(data, dns, execution, syncFile, config["speed"])
- trace["scheduler_init_download_runtime"] = int((time.time() - startTimeDownload) * 1000)
- if CLOSE:
- log.debug("Closed with code %s", str(EXIT))
- exit(EXIT)
- log.info("Finished Download")
- syncFile.write('##FINISHED##\n')
- registerSignal2()
-
- # finishedDownload(dns, execution, taskname)
-
- # startTimeDependingTasks = time.time()
- # waitForDependingTasks(config["waitForFilesOfTask"], startTime, config["syncDir"])
- # trace["scheduler_init_depending_tasks_runtime"] = int((time.time() - startTimeDependingTasks) * 1000)
- # log.info("Waited for all tasks")
-
- # runtime = int((time.time() - startTime) * 1000)
- # trace["scheduler_init_runtime"] = runtime
- # writeTrace(trace)
-
-
-if __name__ == '__main__':
- run()
diff --git a/daemons/ftp/vsftpd.conf b/daemons/ftp/vsftpd.conf
deleted file mode 100644
index d14bf2df..00000000
--- a/daemons/ftp/vsftpd.conf
+++ /dev/null
@@ -1,164 +0,0 @@
-# Example config file /etc/vsftpd.conf
-#
-# The default compiled in settings are fairly paranoid. This sample file
-# loosens things up a bit, to make the ftp daemon more usable.
-# Please see vsftpd.conf.5 for all compiled in defaults.
-#
-# READ THIS: This example file is NOT an exhaustive list of vsftpd options.
-# Please read the vsftpd.conf.5 manual page to get a full idea of vsftpd's
-# capabilities.
-#
-#
-# Run standalone? vsftpd can run either from an inetd or as a standalone
-# daemon started from an initscript.
-listen=YES
-#
-# This directive enables listening on IPv6 sockets. By default, listening
-# on the IPv6 "any" address (::) will accept connections from both IPv6
-# and IPv4 clients. It is not necessary to listen on *both* IPv4 and IPv6
-# sockets. If you want that (perhaps because you want to listen on specific
-# addresses) then you must run two copies of vsftpd with two configuration
-# files.
-listen_ipv6=NO
-#
-# Allow anonymous FTP? (Disabled by default).
-anonymous_enable=YES
-#
-# Uncomment this to allow local users to log in.
-local_enable=YES
-#
-# Uncomment this to enable any form of FTP write command.
-write_enable=YES
-#
-# Default umask for local users is 077. You may wish to change this to 022,
-# if your users expect that (022 is used by most other ftpd's)
-#local_umask=022
-#
-# Uncomment this to allow the anonymous FTP user to upload files. This only
-# has an effect if the above global write enable is activated. Also, you will
-# obviously need to create a directory writable by the FTP user.
-anon_upload_enable=NO
-#
-# Uncomment this if you want the anonymous FTP user to be able to create
-# new directories.
-anon_mkdir_write_enable=NO
-#
-anon_other_write_enable=NO
-#
-# Activate directory messages - messages given to remote users when they
-# go into a certain directory.
-dirmessage_enable=YES
-#
-# If enabled, vsftpd will display directory listings with the time
-# in your local time zone. The default is to display GMT. The
-# times returned by the MDTM FTP command are also affected by this
-# option.
-use_localtime=YES
-#
-# Activate logging of uploads/downloads.
-xferlog_enable=YES
-#
-# Make sure PORT transfer connections originate from port 20 (ftp-data).
-connect_from_port_20=NO
-#
-# If you want, you can arrange for uploaded anonymous files to be owned by
-# a different user. Note! Using "root" for uploaded files is not
-# recommended!
-#chown_uploads=YES
-#chown_username=whoever
-#
-# You may override where the log file goes if you like. The default is shown
-# below.
-#xferlog_file=/var/log/vsftpd.log
-#
-# If you want, you can have your log file in standard ftpd xferlog format.
-# Note that the default log file location is /var/log/xferlog in this case.
-#xferlog_std_format=YES
-#
-# You may change the default value for timing out an idle session.
-#idle_session_timeout=600
-#
-# You may change the default value for timing out a data connection.
-#data_connection_timeout=120
-#
-# It is recommended that you define on your system a unique user which the
-# ftp server can use as a totally isolated and unprivileged user.
-#nopriv_user=ftpsecure
-#
-# Enable this and the server will recognise asynchronous ABOR requests. Not
-# recommended for security (the code is non-trivial). Not enabling it,
-# however, may confuse older FTP clients.
-#async_abor_enable=YES
-#
-# By default the server will pretend to allow ASCII mode but in fact ignore
-# the request. Turn on the below options to have the server actually do ASCII
-# mangling on files when in ASCII mode.
-# Beware that on some FTP servers, ASCII support allows a denial of service
-# attack (DoS) via the command "SIZE /big/file" in ASCII mode. vsftpd
-# predicted this attack and has always been safe, reporting the size of the
-# raw file.
-# ASCII mangling is a horrible feature of the protocol.
-#ascii_upload_enable=YES
-#ascii_download_enable=YES
-#
-# You may fully customise the login banner string:
-#ftpd_banner=Welcome to blah FTP service.
-#
-# You may specify a file of disallowed anonymous e-mail addresses. Apparently
-# useful for combatting certain DoS attacks.
-#deny_email_enable=YES
-# (default follows)
-#banned_email_file=/etc/vsftpd.banned_emails
-#
-# You may restrict local users to their home directories. See the FAQ for
-# the possible risks in this before using chroot_local_user or
-# chroot_list_enable below.
-#chroot_local_user=YES
-#
-# You may specify an explicit list of local users to chroot() to their home
-# directory. If chroot_local_user is YES, then this list becomes a list of
-# users to NOT chroot().
-# (Warning! chroot'ing can be very dangerous. If using chroot, make sure that
-# the user does not have write access to the top level directory within the
-# chroot)
-#chroot_local_user=YES
-#chroot_list_enable=YES
-# (default follows)
-#chroot_list_file=/etc/vsftpd.chroot_list
-#
-# You may activate the "-R" option to the builtin ls. This is disabled by
-# default to avoid remote users being able to cause excessive I/O on large
-# sites. However, some broken FTP clients such as "ncftp" and "mirror" assume
-# the presence of the "-R" option, so there is a strong case for enabling it.
-#ls_recurse_enable=YES
-#
-# Customization
-#
-# Some of vsftpd's settings don't fit the filesystem layout by
-# default.
-#
-# This option should be the name of a directory which is empty. Also, the
-# directory should not be writable by the ftp user. This directory is used
-# as a secure chroot() jail at times vsftpd does not require filesystem
-# access.
-secure_chroot_dir=/var/run/vsftpd/empty
-#
-# This string is the name of the PAM service vsftpd will use.
-pam_service_name=ftp
-#
-# This option specifies the location of the RSA certificate to use for SSL
-# encrypted connections.
-rsa_cert_file=/etc/ssl/certs/ssl-cert-snakeoil.pem
-rsa_private_key_file=/etc/ssl/private/ssl-cert-snakeoil.key
-ssl_enable=NO
-
-anonymous_enable=yes
-anon_root=/
-
-pasv_enable=Yes
-pasv_max_port=10090
-pasv_min_port=11090
-
-#
-# Uncomment this to indicate that vsftpd use a utf8 filesystem.
-#utf8_filesystem=YES
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6c0c0f21..c2e35939 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,11 @@
9.12.4544
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
diff --git a/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java b/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java
index e3a760d8..60f76f7e 100644
--- a/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java
+++ b/src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java
@@ -13,8 +13,6 @@
import cws.k8s.scheduler.model.taskinputs.TaskInputs;
import cws.k8s.scheduler.rest.exceptions.NotARealFileException;
import cws.k8s.scheduler.rest.response.getfile.FileResponse;
-import cws.k8s.scheduler.scheduler.copystrategy.CopyStrategy;
-import cws.k8s.scheduler.scheduler.copystrategy.FTPstrategy;
import cws.k8s.scheduler.util.DaemonHolder;
import cws.k8s.scheduler.util.NodeTaskAlignment;
import cws.k8s.scheduler.util.NodeTaskFilesAlignment;
@@ -44,8 +42,6 @@ public abstract class SchedulerWithDaemonSet extends Scheduler {
final DaemonHolder daemonHolder = new DaemonHolder();
@Getter
private String workflowEngineNode = null;
- @Getter
- private final CopyStrategy copyStrategy;
final HierarchyWrapper hierarchyWrapper;
private final InputFileCollector inputFileCollector;
private final ConcurrentHashMap requestedLocations = new ConcurrentHashMap<>();
@@ -64,14 +60,6 @@ public abstract class SchedulerWithDaemonSet extends Scheduler {
if ( config.copyStrategy == null ) {
throw new IllegalArgumentException( "Copy strategy is null" );
}
- switch ( config.copyStrategy ){
- case "ftp":
- case "copy":
- copyStrategy = new FTPstrategy();
- break;
- default:
- throw new IllegalArgumentException( "Copy strategy is unknown " + config.copyStrategy );
- }
this.localWorkDir = config.workDir;
}
diff --git a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java b/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java
deleted file mode 100644
index 65479eb7..00000000
--- a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/CopyStrategy.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package cws.k8s.scheduler.scheduler.copystrategy;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
-import java.util.Set;
-
-@Slf4j
-public abstract class CopyStrategy {
-
- private void write( BufferedWriter pw, File file, String resource ) {
- ClassLoader classLoader = getClass().getClassLoader();
- try (InputStream inputStream = classLoader.getResourceAsStream( resource )) {
- assert inputStream != null;
- try (InputStreamReader streamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
- BufferedReader reader = new BufferedReader(streamReader)) {
-
- String line;
- while ((line = reader.readLine()) != null) {
- pw.write( line );
- pw.write( '\n' );
- }
-
- Set executable = PosixFilePermissions.fromString("rwxrwxrwx");
- Files.setPosixFilePermissions( file.toPath(), executable );
- }
- } catch (IOException e) {
- log.error( "Cannot write " + file, e);
- }
- }
-
- abstract String getResource();
-
-}
diff --git a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java b/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java
deleted file mode 100644
index 4369efb8..00000000
--- a/src/main/java/cws/k8s/scheduler/scheduler/copystrategy/FTPstrategy.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package cws.k8s.scheduler.scheduler.copystrategy;
-
-public class FTPstrategy extends CopyStrategy {
-
- @Override
- String getResource() {
- return "copystrategies/ftp.py";
- }
-
-}
\ No newline at end of file
diff --git a/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java b/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java
index 2cbcd936..c189a123 100644
--- a/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java
+++ b/src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/ShellCopy.java
@@ -37,7 +37,7 @@ public void startCopyTasks( final CopyTask copyTask, final NodeTaskFilesAlignmen
} catch ( IOException e ) {
throw new RuntimeException( e );
}
- command[2] += "/code/ftp.py false \"" + copyTaskIdentifier + "\" \"" + filename + "\"";
+ command[2] += "/app/ftp.py false \"" + copyTaskIdentifier + "\" \"" + filename + "\"";
String name = nodeTaskFilesAlignment.task.getConfig().getName() + "-copy-" + nodeTaskFilesAlignment.node.getName();
log.info( "Starting {} to node {}", nodeTaskFilesAlignment.task.getConfig().getName(), nodeTaskFilesAlignment.node.getName() );
logCopyTask.copy( nodeTaskFilesAlignment.task.getConfig().getName(), nodeTaskFilesAlignment.node.getName(), copyTask.getInputFiles().size(), "start" );
diff --git a/src/main/resources/copystrategies/ftp.py b/src/main/resources/copystrategies/ftp.py
deleted file mode 100644
index da9191a0..00000000
--- a/src/main/resources/copystrategies/ftp.py
+++ /dev/null
@@ -1,346 +0,0 @@
-#!/usr/bin/env python3
-import ftplib
-import json
-import logging as log
-import os
-import shutil
-import signal
-import sys
-import time
-import urllib.request
-import urllib.parse
-from concurrent.futures import ThreadPoolExecutor
-from pathlib import Path
-from time import sleep
-
-exitIfFileWasNotFound = True
-CLOSE = False
-UNEXPECTED_ERROR = "Unexpected error"
-EXIT = 0
-log.basicConfig(
- format='%(levelname)s: %(message)s',
- level=log.DEBUG,
- handlers=[
- log.FileHandler(".command.init.log"),
- log.StreamHandler()
- ]
-)
-trace = {}
-traceFilePath = ".command.scheduler.trace"
-errors = 0
-
-
-def myExit(code):
- global EXIT
- EXIT = code
- global CLOSE
- CLOSE = True
- writeTrace(trace)
- exit(EXIT)
-
-
-def close(signalnum, syncFile):
- log.info("Killed: %s", str(signalnum))
- closeWithWarning(50, syncFile)
-
-
-def closeWithWarning(errorCode, syncFile):
- syncFile.write('##FAILURE##\n')
- syncFile.flush()
- syncFile.close()
- myExit(errorCode)
-
-
-def getIP(node, dns, execution):
- ip = urllib.request.urlopen(dns + "daemon/" + execution + "/" + node).read()
- return str(ip.decode("utf-8"))
-
-
-# True if the file was deleted or did not exist
-def clearLocation(path, dst=None):
- if os.path.exists(path):
- log.debug("Delete %s", path)
- if os.path.islink(path):
- if dst is not None and os.readlink(path) == dst:
- return False
- else:
- os.unlink(path)
- elif os.path.isdir(path):
- shutil.rmtree(path)
- else:
- os.remove(path)
- return True
-
-
-def getFTP(node, currentIP, dns, execution, syncFile):
- global errors
- connectionProblem = 0
- while connectionProblem < 8:
- try:
- if currentIP is None:
- log.info("Request ip for node: %s", node)
- ip = getIP(node, dns, execution)
- else:
- ip = currentIP
- log.info("Try to connect to %s", ip)
- ftp = ftplib.FTP(ip, timeout=10)
- ftp.login("root", "password")
- ftp.set_pasv(True)
- ftp.encoding = 'utf-8'
- log.info("Connection established")
- return ftp
- except ConnectionRefusedError:
- errors += 1
- log.warning("Connection refused! Try again...")
- except BaseException:
- errors += 1
- log.exception(UNEXPECTED_ERROR)
- connectionProblem += 1
- time.sleep(2 ** connectionProblem)
- closeWithWarning(8, syncFile)
-
-
-def closeFTP(ftp):
- global errors
- if ftp is None:
- return
- try:
- ftp.quit()
- ftp.close()
- except BaseException:
- errors += 1
- log.exception(UNEXPECTED_ERROR)
-
-
-def downloadFile(ftp, filename, size, index, node, syncFile):
- global errors
- log.info("Download %s [%s/%s] - %s", node, str(index).rjust(len(str(size))), str(size), filename)
- try:
- syncFile.write("S-" + filename + '\n')
- clearLocation(filename)
- Path(filename[:filename.rindex("/")]).mkdir(parents=True, exist_ok=True)
- start = time.time()
- ftp.retrbinary('RETR %s' % filename, open(filename, 'wb').write, 102400)
- end = time.time()
- sizeInMB = os.path.getsize(filename) / 1000000
- delta = (end - start)
- log.info("Speed: %.3f Mbit/s", sizeInMB / delta)
- return sizeInMB, delta
- except ftplib.error_perm as err:
- errors += 1
- if str(err) == "550 Failed to open file.":
- log.warning("File not found node: %s file: %s", node, filename)
- if exitIfFileWasNotFound:
- closeWithWarning(40, syncFile)
- except FileNotFoundError:
- errors += 1
- log.warning("File not found node: %s file: %s", node, filename)
- if exitIfFileWasNotFound:
- closeWithWarning(41, syncFile)
- except EOFError:
- errors += 1
- log.warning("It seems the connection was lost! Try again...")
- return None
- except BaseException:
- errors += 1
- log.exception(UNEXPECTED_ERROR)
- return None
- return 0, 0
-
-
-def download(node, currentIP, files, dns, execution, syncFile):
- ftp = None
- size = len(files)
- global CLOSE
- sizeInMB = 0
- downloadTime = 0
- while not CLOSE and len(files) > 0:
- if ftp is None:
- ftp = getFTP(node, currentIP, dns, execution, syncFile)
- currentIP = None
- filename = files[0]
- index = size - len(files) + 1
- result = downloadFile(ftp, filename, size, index, node, syncFile)
- if result is None:
- ftp = None
- continue
- sizeInMB += result[0]
- downloadTime += result[1]
- files.pop(0)
- syncFile.write("F-" + filename + '\n')
- closeFTP(ftp)
- return node, sizeInMB / downloadTime
-
-
-def waitForFiles(syncFilePath, files, startTime):
- # wait max. 60 seconds
- while True:
- if startTime + 60 < time.time():
- return False
- if os.path.isfile(syncFilePath):
- break
- log.debug("Wait for file creation")
- time.sleep(0.1)
-
- # Read file live
- with open(syncFilePath, 'r') as syncFileTask:
- current = []
- while len(files) > 0:
- data = syncFileTask.read()
- if not data:
- time.sleep(0.3)
- else:
- for d in data:
- if d != "\n":
- current.append(d)
- else:
- text = ''.join(current)
- current = []
- if text.startswith("S-"):
- continue
- if text == "##FAILURE##":
- log.debug("Read FAILURE in %s", syncFilePath)
- myExit(51)
- if text == "##FINISHED##":
- log.debug("Read FINISHED in " + syncFilePath + " before all files were found")
- myExit(52)
- log.debug("Look for " + text[:2] + " with " + text[2:] + " in " + str(files))
- if text[:2] == "F-" and text[2:] in files:
- files.remove(text[2:])
- if len(files) == 0:
- return True
- return len(files) == 0
-
-
-def loadConfig(configFilePath):
- if not os.path.isfile(configFilePath):
- log.error("Config file not found: %s", configFilePath)
- myExit(102)
-
- with open(configFilePath, 'r') as configFile:
- config = json.load(configFile)
-
- os.makedirs(config["syncDir"], exist_ok=True)
- return config
-
-
-def registerSignal(syncFile):
- signal.signal(signal.SIGINT, lambda signalnum, handler: close(signalnum, syncFile))
- signal.signal(signal.SIGTERM, lambda signalnum, handler: close(signalnum, syncFile))
-
-
-def registerSignal2():
- signal.signal(signal.SIGINT, lambda signalnum, handler: myExit(1))
- signal.signal(signal.SIGTERM, lambda signalnum, handler: myExit(1))
-
-
-def generateSymlinks(symlinks):
- for s in symlinks:
- src = s["src"]
- dst = s["dst"]
- if clearLocation(src, dst):
- Path(src[:src.rindex("/")]).mkdir(parents=True, exist_ok=True)
- try:
- os.symlink(dst, src)
- except FileExistsError:
- log.warning("File exists: %s -> %s", src, dst)
-
-
-def downloadAllData(data, dns, execution, syncFile):
- global trace
- throughput = []
- with ThreadPoolExecutor(max_workers=max(10, len(data))) as executor:
- futures = []
- for d in data:
- files = d["files"]
- node = d["node"]
- currentIP = d["currentIP"]
- futures.append(executor.submit(download, node, currentIP, files, dns, execution, syncFile))
- lastNum = -1
- while len(futures) > 0:
- if lastNum != len(futures):
- log.info("Wait for %d threads to finish", len(futures))
- lastNum = len(futures)
- for f in futures[:]:
- if f.done():
- throughput.append(f.result())
- futures.remove(f)
- sleep(0.1)
- trace["scheduler_init_throughput"] = "\"" + ",".join("{}:{:.3f}".format(*x) for x in throughput) + "\""
-
-
-def waitForDependingTasks(waitForFilesOfTask, startTime, syncDir):
- # Now check for files of other tasks
- for waitForTask in waitForFilesOfTask:
- waitForFilesSet = set(waitForFilesOfTask[waitForTask])
- if not waitForFiles(syncDir + waitForTask, waitForFilesSet, startTime):
- log.error(syncDir + waitForTask + " was not successful")
- myExit(200)
-
-
-def writeTrace(dataMap):
- if sys.argv[1] == 'true':
- global errors
- if len(dataMap) == 0 or errors > 0:
- return
- with open(traceFilePath, "a") as traceFile:
- for d in dataMap:
- traceFile.write(d + "=" + str(dataMap[d]) + "\n")
- traceFile.write("scheduler_init_errors=" + str(errors) + "\n")
-
-
-def finishedDownload(dns, execution, taskname):
- try:
- dns = dns + "downloadtask/" + execution
- log.info("Request: %s", dns)
- urllib.request.urlopen(dns, taskname.encode( "utf-8" ))
- except BaseException as err:
- log.exception(err)
- myExit(100)
-
-
-def run():
- global trace
- startTime = time.time()
- log.info("Start to setup the environment")
- config = loadConfig(".command.inputs.json")
-
- dns = config["dns"]
- execution = config["execution"]
- data = config["data"]
- symlinks = config["symlinks"]
- taskname = config["hash"]
-
- with open(config["syncDir"] + config["hash"], 'w') as syncFile:
- registerSignal(syncFile)
- syncFile.write('##STARTED##\n')
- syncFile.flush()
- startTimeSymlinks = time.time()
- generateSymlinks(symlinks)
- trace["scheduler_init_symlinks_runtime"] = int((time.time() - startTimeSymlinks) * 1000)
- syncFile.write('##SYMLINKS##\n')
- syncFile.flush()
- startTimeDownload = time.time()
- downloadAllData(data, dns, execution, syncFile)
- trace["scheduler_init_download_runtime"] = int((time.time() - startTimeDownload) * 1000)
- if CLOSE:
- log.debug("Closed with code %s", str(EXIT))
- exit(EXIT)
- log.info("Finished Download")
- syncFile.write('##FINISHED##\n')
- registerSignal2()
-
- finishedDownload(dns, execution, taskname)
-
- startTimeDependingTasks = time.time()
- waitForDependingTasks(config["waitForFilesOfTask"], startTime, config["syncDir"])
- trace["scheduler_init_depending_tasks_runtime"] = int((time.time() - startTimeDependingTasks) * 1000)
- log.info("Waited for all tasks")
-
- runtime = int((time.time() - startTime) * 1000)
- trace["scheduler_init_runtime"] = runtime
- writeTrace(trace)
-
-
-if __name__ == '__main__':
- run()