11package org .thp .cortex .services
22
33import akka .actor .ActorSystem
4- import com .spotify .docker .client .DockerClient .LogsParam
5- import com .spotify .docker .client .messages .HostConfig .Bind
6- import com .spotify .docker .client .messages .{ContainerConfig , HostConfig }
7- import com .spotify .docker .client .{DefaultDockerClient , DockerClient }
4+ import org .thp .cortex .util .docker .{DockerClient => DockerJavaClient }
85import play .api .libs .json .Json
96import play .api .{Configuration , Logger }
107
118import java .nio .charset .StandardCharsets
129import java .nio .file ._
10+ import java .util .concurrent .TimeUnit
1311import javax .inject .{Inject , Singleton }
1412import scala .concurrent .ExecutionContext
1513import scala .concurrent .duration .FiniteDuration
1614import scala .util .Try
1715
1816@ Singleton
1917class DockerJobRunnerSrv (
20- client : DockerClient ,
21- config : Configuration ,
18+ javaClient : DockerJavaClient ,
2219 autoUpdate : Boolean ,
2320 jobBaseDirectory : Path ,
2421 dockerJobBaseDirectory : Path ,
@@ -28,17 +25,7 @@ class DockerJobRunnerSrv(
2825 @ Inject ()
2926 def this (config : Configuration , system : ActorSystem ) =
3027 this (
31- new DefaultDockerClient .Builder ()
32- .apiVersion(config.getOptional[String ](" docker.version" ).orNull)
33- .connectionPoolSize(config.getOptional[Int ](" docker.connectionPoolSize" ).getOrElse(100 ))
34- .connectTimeoutMillis(config.getOptional[Long ](" docker.connectTimeoutMillis" ).getOrElse(5000 ))
35- // .dockerCertificates()
36- .readTimeoutMillis(config.getOptional[Long ](" docker.readTimeoutMillis" ).getOrElse(30000 ))
37- // .registryAuthSupplier()
38- .uri(config.getOptional[String ](" docker.uri" ).getOrElse(" unix:///var/run/docker.sock" ))
39- .useProxy(config.getOptional[Boolean ](" docker.useProxy" ).getOrElse(false ))
40- .build(),
41- config,
28+ new DockerJavaClient (config),
4229 config.getOptional[Boolean ](" docker.autoUpdate" ).getOrElse(true ),
4330 Paths .get(config.get[String ](" job.directory" )),
4431 Paths .get(config.get[String ](" job.dockerDirectory" )),
@@ -50,89 +37,43 @@ class DockerJobRunnerSrv(
5037 lazy val isAvailable : Boolean =
5138 Try {
5239 logger.debug(s " Retrieve docker information ... " )
53- logger.info(s " Docker is available: \n ${client .info() }" )
40+ logger.info(s " Docker is available: \n ${javaClient .info}" )
5441 true
5542 }.recover {
5643 case error =>
5744 logger.info(s " Docker is not available " , error)
5845 false
5946 }.get
6047
61- def run (jobDirectory : Path , dockerImage : String , timeout : Option [FiniteDuration ])(implicit
62- ec : ExecutionContext
63- ): Try [Unit ] = {
64- import scala .collection .JavaConverters ._
65- if (autoUpdate) Try (client.pull(dockerImage))
66- // ContainerConfig.builder().addVolume()
67- val hostConfigBuilder = HostConfig .builder()
68- config.getOptional[Seq [String ]](" docker.container.capAdd" ).map(_.asJava).foreach(hostConfigBuilder.capAdd)
69- config.getOptional[Seq [String ]](" docker.container.capDrop" ).map(_.asJava).foreach(hostConfigBuilder.capDrop)
70- config.getOptional[String ](" docker.container.cgroupParent" ).foreach(hostConfigBuilder.cgroupParent)
71- config.getOptional[Long ](" docker.container.cpuPeriod" ).foreach(hostConfigBuilder.cpuPeriod(_))
72- config.getOptional[Long ](" docker.container.cpuQuota" ).foreach(hostConfigBuilder.cpuQuota(_))
73- config.getOptional[Seq [String ]](" docker.container.dns" ).map(_.asJava).foreach(hostConfigBuilder.dns)
74- config.getOptional[Seq [String ]](" docker.container.dnsSearch" ).map(_.asJava).foreach(hostConfigBuilder.dnsSearch)
75- config.getOptional[Seq [String ]](" docker.container.extraHosts" ).map(_.asJava).foreach(hostConfigBuilder.extraHosts)
76- config.getOptional[Long ](" docker.container.kernelMemory" ).foreach(hostConfigBuilder.kernelMemory(_))
77- config.getOptional[Long ](" docker.container.memoryReservation" ).foreach(hostConfigBuilder.memoryReservation(_))
78- config.getOptional[Long ](" docker.container.memory" ).foreach(hostConfigBuilder.memory(_))
79- config.getOptional[Long ](" docker.container.memorySwap" ).foreach(hostConfigBuilder.memorySwap(_))
80- config.getOptional[Int ](" docker.container.memorySwappiness" ).foreach(hostConfigBuilder.memorySwappiness(_))
81- config.getOptional[String ](" docker.container.networkMode" ).foreach(hostConfigBuilder.networkMode)
82- config.getOptional[Boolean ](" docker.container.privileged" ).foreach(hostConfigBuilder.privileged(_))
83- hostConfigBuilder.appendBinds(
84- Bind
85- .from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString)
86- .to(" /job" )
87- .readOnly(false )
88- .build()
89- )
90- val cacertsFile = jobDirectory.resolve(" input" ).resolve(" cacerts" )
91- val containerConfigBuilder = ContainerConfig
92- .builder()
93- .hostConfig(hostConfigBuilder.build())
94- .image(dockerImage)
95- .cmd(" /job" )
48+ private def generateErrorOutput (containerId : String , f : Path ) = {
49+ logger.warn(s " the runner didn't generate any output file $f" )
50+ for {
51+ output <- javaClient.getLogs(containerId)
52+ report = Json .obj(" success" -> false , " errorMessage" -> output)
53+ _ <- Try (Files .write(f, report.toString.getBytes(StandardCharsets .UTF_8 )))
54+ } yield report
55+ }
9656
97- val containerConfig =
98- if (Files .exists(cacertsFile)) containerConfigBuilder.env(s " REQUESTS_CA_BUNDLE=/job/input/cacerts " ).build()
99- else containerConfigBuilder.build()
100- val containerCreation = client.createContainer(containerConfig)
101- // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
57+ def run (jobDirectory : Path , dockerImage : String , timeout : Option [FiniteDuration ])(implicit executionContext : ExecutionContext ): Try [Unit ] = {
58+ val to = timeout.getOrElse(FiniteDuration (5000 , TimeUnit .SECONDS ))
10259
103- logger.debug(s " Container configuration: $containerConfig" )
104- logger.info(
105- s " Execute container ${containerCreation.id()}\n " +
106- s " timeout: ${timeout.fold(" none" )(_.toString)}\n " +
107- s " image : $dockerImage\n " +
108- s " volume : ${jobDirectory.toAbsolutePath}:/job " +
109- Option (containerConfig.env()).fold(" " )(_.asScala.map(" \n env : " + _).mkString)
110- )
111-
112- val timeoutSched = timeout.map(to =>
113- system.scheduler.scheduleOnce(to) {
114- logger.info(" Timeout reached, stopping the container" )
115- client.removeContainer(containerCreation.id(), DockerClient .RemoveContainerParam .forceKill())
116- }
117- )
118- val execution = Try {
119- client.startContainer(containerCreation.id())
120- client.waitContainer(containerCreation.id())
121- ()
122- }
123- timeoutSched.foreach(_.cancel())
124- val outputFile = jobDirectory.resolve(" output" ).resolve(" output.json" )
125- if (! Files .exists(outputFile) || Files .size(outputFile) == 0 ) {
126- logger.warn(s " The worker didn't generate output file. " )
127- val output = Try (client.logs(containerCreation.id(), LogsParam .stdout(), LogsParam .stderr()).readFully())
128- .fold(e => s " Container logs can't be read ( ${e.getMessage}) " , identity)
129- val message = execution.fold(e => s " Docker creation error: ${e.getMessage}\n $output" , _ => output)
60+ if (autoUpdate) Try (javaClient.pullImage(dockerImage))
13061
131- val report = Json .obj(" success" -> false , " errorMessage" -> message)
132- Files .write(outputFile, report.toString.getBytes(StandardCharsets .UTF_8 ))
133- }
134- client.removeContainer(containerCreation.id(), DockerClient .RemoveContainerParam .forceKill())
135- execution
62+ for {
63+ containerId <- javaClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, to)
64+ timeoutScheduled = timeout.map(to =>
65+ system.scheduler.scheduleOnce(to) {
66+ logger.info(" Timeout reached, stopping the container" )
67+ javaClient.clean(containerId)
68+ }
69+ )
70+ _ <- javaClient.execute(containerId)
71+ _ = timeoutScheduled.foreach(_.cancel())
72+ outputFile <- Try (jobDirectory.resolve(" output" ).resolve(" output.json" ))
73+ isError = Files .notExists(outputFile) || Files .size(outputFile) == 0 || Files .isDirectory(outputFile)
74+ _ = if (isError) generateErrorOutput(containerId, outputFile).toOption else None
75+ _ <- javaClient.clean(containerId)
76+ } yield ()
13677 }
13778
13879}
0 commit comments