Skip to content

Commit fca9efe

Browse files
author
Thomas Franco
authored
[DL-2086] fix: improve docker image pull process (#32)
* [DL-2086] fix: improve docker image pull process * [DL-2086] fix: Alex's feedback
1 parent 6112bff commit fca9efe

File tree

4 files changed

+100
-47
lines changed

4 files changed

+100
-47
lines changed

app/org/thp/cortex/services/DockerJobRunnerSrv.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package org.thp.cortex.services
22

33
import akka.actor.ActorSystem
4-
import org.thp.cortex.util.docker.{DockerClient => DockerJavaClient}
4+
import org.thp.cortex.util.docker.DockerClient
55
import play.api.libs.json.Json
66
import play.api.{Configuration, Logger}
77

@@ -15,7 +15,7 @@ import scala.util.Try
1515

1616
@Singleton
1717
class DockerJobRunnerSrv(
18-
javaClient: DockerJavaClient,
18+
dockerClient: DockerClient,
1919
autoUpdate: Boolean,
2020
jobBaseDirectory: Path,
2121
dockerJobBaseDirectory: Path,
@@ -25,7 +25,7 @@ class DockerJobRunnerSrv(
2525
@Inject()
2626
def this(config: Configuration, system: ActorSystem) =
2727
this(
28-
new DockerJavaClient(config),
28+
new DockerClient(config),
2929
config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true),
3030
Paths.get(config.get[String]("job.directory")),
3131
Paths.get(config.get[String]("job.dockerDirectory")),
@@ -37,7 +37,7 @@ class DockerJobRunnerSrv(
3737
lazy val isAvailable: Boolean =
3838
Try {
3939
logger.debug(s"Retrieve docker information ...")
40-
logger.info(s"Docker is available:\n${javaClient.info}")
40+
logger.info(s"Docker is available:\n${dockerClient.info}")
4141
true
4242
}.recover {
4343
case error =>
@@ -48,7 +48,7 @@ class DockerJobRunnerSrv(
4848
private def generateErrorOutput(containerId: String, f: Path) = {
4949
logger.warn(s"the runner didn't generate any output file $f")
5050
for {
51-
output <- javaClient.getLogs(containerId)
51+
output <- dockerClient.getLogs(containerId)
5252
report = Json.obj("success" -> false, "errorMessage" -> output)
5353
_ <- Try(Files.write(f, report.toString.getBytes(StandardCharsets.UTF_8)))
5454
} yield report
@@ -57,22 +57,22 @@ class DockerJobRunnerSrv(
5757
/** Runs a job inside a docker container and waits for its completion.
  *
  * @param jobDirectory directory of the job; the report is looked up in `output/output.json` under it
  * @param dockerImage  image used to create the container
  * @param timeout      optional maximum duration; when it elapses the container is cleaned.
  *                     When absent, 5000 seconds is handed to the docker client as timeout.
  * @return `Success(())` once the container has run and has been cleaned, otherwise the first failure
  */
def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit executionContext: ExecutionContext): Try[Unit] = {
  val to = timeout.getOrElse(FiniteDuration(5000, TimeUnit.SECONDS))

  // Refreshing the image is best effort: a failure must not abort the job,
  // but it must not be silently discarded either.
  if (autoUpdate)
    dockerClient
      .pullImage(dockerImage)
      .failed
      .foreach(error => logger.warn(s"Unable to update the docker image $dockerImage", error))

  for {
    containerId <- dockerClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, to)
    timeoutScheduled = timeout.map(to =>
      system.scheduler.scheduleOnce(to) {
        logger.info("Timeout reached, stopping the container")
        dockerClient.clean(containerId)
      }
    )
    execution = dockerClient.execute(containerId)
    // The timer is useless once the execution is over, whatever its outcome.
    _ = timeoutScheduled.foreach(_.cancel())
    // A failed execution short-circuits the steps below, so the container is removed here.
    _ = if (execution.isFailure) dockerClient.clean(containerId)
    _ <- execution
    outputFile <- Try(jobDirectory.resolve("output").resolve("output.json"))
    isError = Files.notExists(outputFile) || Files.size(outputFile) == 0 || Files.isDirectory(outputFile)
    _ = if (isError) generateErrorOutput(containerId, outputFile).toOption else None
    _ <- dockerClient.clean(containerId)
  } yield ()
}
7878

app/org/thp/cortex/services/JobRunnerSrv.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class JobRunnerSrv @Inject() (
4949
.map(_.toLowerCase)
5050
.collect {
5151
case "kubernetes" if k8sJobRunnerSrv.isAvailable => "kubernetes"
52-
case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
52+
case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
5353
case "process" =>
5454
Seq("", "2", "3").foreach { pythonVersion =>
5555
val cortexUtilsVersion = processJobRunnerSrv.checkCortexUtilsVersion(pythonVersion)
@@ -229,7 +229,7 @@ class JobRunnerSrv @Inject() (
229229
syncStartJob(job).get
230230
val jobFolder = prepareJobFolder(worker, job).get
231231
maybeJobFolder = Some(jobFolder)
232-
runners
232+
val result = runners
233233
.foldLeft[Option[Try[Unit]]](None) {
234234
case (None, "kubernetes") =>
235235
worker
@@ -261,12 +261,15 @@ class JobRunnerSrv @Inject() (
261261
None
262262

263263
}
264-
.getOrElse(throw BadRequestError("Worker cannot be run"))
264+
.getOrElse(Failure(BadRequestError("Worker cannot be run")))
265+
Future.fromTry(result)
265266
}(executionContext)
267+
.flatten
266268
.transformWith {
267269
case _: Success[_] =>
268270
extractReport(maybeJobFolder.get /* can't be none */, job)
269271
case Failure(error) =>
272+
logger.error(s"Worker execution fails", error)
270273
endJob(job, JobStatus.Failure, Option(error.getMessage), maybeJobFolder.map(jf => readFile(jf.resolve("input").resolve("input.json"))))
271274

272275
}

app/org/thp/cortex/util/docker/DockerClient.scala

Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import play.api.{Configuration, Logger}
88

99
import java.nio.file.{Files, Path}
1010
import java.time.Duration
11+
import java.util.Collections
1112
import java.util.concurrent.{Executors, TimeUnit}
1213
import scala.concurrent.blocking
1314
import scala.concurrent.duration.FiniteDuration
1415
import scala.jdk.CollectionConverters._
15-
import scala.util.Try
16+
import scala.util.{Failure, Success, Try}
1617

1718
class DockerClient(config: Configuration) {
1819
private lazy val logger: Logger = Logger(getClass.getName)
@@ -32,25 +33,33 @@ class DockerClient(config: Configuration) {
3233
waitResult
3334
}
3435

35-
def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] = Try {
36-
logger.info(s"image $image pull result: ${pullImage(image)}")
37-
val containerCmd = underlyingClient
38-
.createContainerCmd(image)
39-
.withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory))
40-
if (Files.exists(jobDirectory.resolve("input").resolve("cacerts")))
41-
containerCmd.withEnv(s"REQUESTS_CA_BUNDLE=/job/input/cacerts")
42-
val containerResponse = containerCmd.exec()
43-
logger.info(
44-
s"about to start container ${containerResponse.getId}\n" +
45-
s" timeout: ${timeout.toString}\n" +
46-
s" image : $image\n" +
47-
s" volumes : ${jobDirectory.toAbsolutePath}"
48-
)
49-
if (containerResponse.getWarnings.nonEmpty) logger.warn(s"${containerResponse.getWarnings.mkString(", ")}")
50-
scheduleContainerTimeout(containerResponse.getId, timeout)
51-
52-
containerResponse.getId
53-
}
36+
/** Pulls `image` unless `imageExists` reports it as already present.
  *
  * The lookup is wrapped in a `Try` so that an error raised by it is returned
  * as a `Failure` instead of being thrown to the caller.
  *
  * @param image name of the docker image
  * @return `Success(())` when the image is present or has been pulled, otherwise the failure
  */
def pullImageIfRequired(image: String): Try[Unit] =
  Try(imageExists(image)).flatMap {
    case true => Success(())
    case false =>
      logger.info(s"pulling image $image ...")
      pullImage(image)
  }
42+
43+
def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] =
44+
pullImageIfRequired(image)
45+
.map { _ =>
46+
val containerCmd = underlyingClient
47+
.createContainerCmd(image)
48+
.withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory))
49+
if (Files.exists(jobDirectory.resolve("input").resolve("cacerts")))
50+
containerCmd.withEnv(s"REQUESTS_CA_BUNDLE=/job/input/cacerts")
51+
val containerResponse = containerCmd.exec()
52+
logger.info(
53+
s"about to start container ${containerResponse.getId}\n" +
54+
s" timeout: ${timeout.toString}\n" +
55+
s" image : $image\n" +
56+
s" volumes : ${jobDirectory.toAbsolutePath}"
57+
)
58+
if (containerResponse.getWarnings.nonEmpty) logger.warn(s"${containerResponse.getWarnings.mkString(", ")}")
59+
scheduleContainerTimeout(containerResponse.getId, timeout)
60+
61+
containerResponse.getId
62+
}
5463

5564
private def configure(jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path): HostConfig = {
5665
val hostConfigMut = HostConfig
@@ -85,27 +94,67 @@ class DockerClient(config: Configuration) {
8594
}
8695

8796
/** Executes the `info` command of the underlying docker client and returns its result. */
def info: Info = underlyingClient.infoCmd().exec()
88-
def pullImage(image: String): Boolean = blocking {
89-
val pullImageResultCbk = underlyingClient // Blocking
90-
.pullImageCmd(image)
91-
.start()
92-
.awaitCompletion()
93-
val timeout = config.get[FiniteDuration]("docker.pullImageTimeout")
9497

95-
pullImageResultCbk.awaitCompletion(timeout.toMillis, TimeUnit.MILLISECONDS)
98+
/** Pulls `image`, waiting at most `docker.pullImageTimeout` for the pull to complete.
  *
  * @param image name of the docker image to pull
  * @return `Success(())` when the pull completed in time; a `Failure` when the pull raised
  *         an error or did not complete before the timeout
  */
def pullImage(image: String): Try[Unit] = Try {
  blocking {
    // Read the setting first so that a missing value fails before a pull is started.
    val timeout = config.get[FiniteDuration]("docker.pullImageTimeout")
    val pullImageResultCbk = underlyingClient // Blocking
      .pullImageCmd(image)
      .start()

    // awaitCompletion returns false when the timeout elapsed before the pull ended:
    // report it instead of pretending that the image is available.
    val completed = pullImageResultCbk.awaitCompletion(timeout.toMillis, TimeUnit.MILLISECONDS)
    if (!completed)
      throw new java.util.concurrent.TimeoutException(s"Pull of image $image did not complete within $timeout")
  }
}
97109

98-
def clean(containerId: String): Try[Unit] = Try {
99-
underlyingClient
100-
.killContainerCmd(containerId)
101-
.exec()
110+
/** Tells whether the image list, filtered on the name `image`, holds at least one entry.
  *
  * @param image name used as image name filter
  * @return `true` when the filtered list is not empty
  */
def imageExists(image: String): Boolean = {
  val matchingImages = underlyingClient
    .listImagesCmd()
    .withImageNameFilter(image)
    .exec()
  matchingImages.asScala.nonEmpty
}
112+
113+
/** Looks up the status reported by docker for a container.
  *
  * Stopped containers are included in the lookup (`withShowAll(true)`): the listing only
  * returns running containers by default, which makes a finished container look absent.
  *
  * @param containerId identifier of the container
  * @return the status of the container, or `None` when no container matches the identifier
  */
def getContainerStatus(containerId: String): Option[String] =
  underlyingClient
    .listContainersCmd()
    .withShowAll(true)
    .withIdFilter(Collections.singletonList(containerId))
    .exec()
    .asScala
    .headOption
    .map(_.getStatus)
121+
122+
/** Sends a kill command for the container.
  *
  * @param containerId identifier of the container
  * @return `Success(())` when the command succeeded; otherwise the failure, which is also logged as a warning
  */
def killContainer(containerId: String): Try[Unit] = {
  logger.info(s"Killing the container $containerId")
  val killAttempt = Try[Unit] {
    underlyingClient.killContainerCmd(containerId).exec()
    ()
  }
  killAttempt match {
    case failure @ Failure(error) =>
      logger.warn(s"Unable to kill the container $containerId", error)
      failure
    case success => success
  }
}
108135

136+
/** Sends a forced remove command for the container.
  *
  * @param containerId identifier of the container
  * @return `Success(())` when the command succeeded; otherwise the failure, which is also logged as a warning
  */
def removeContainer(containerId: String): Try[Unit] = {
  logger.info(s"Removing the container $containerId")
  val removalAttempt = Try[Unit] {
    underlyingClient.removeContainerCmd(containerId).withForce(true).exec()
    ()
  }
  removalAttempt match {
    case failure @ Failure(error) =>
      logger.warn(s"Unable to remove the container $containerId", error)
      failure
    case success => success
  }
}
150+
151+
/** Removes a container, killing it first when it does not look stopped.
  *
  * @param containerId identifier of the container
  * @return `Success(())` when no status is found for the container, otherwise the result of the removal
  */
def clean(containerId: String): Try[Unit] =
  getContainerStatus(containerId).fold[Try[Unit]](Success(())) { status =>
    // NOTE(review): docker reports a human-readable status ("Exited (0) 2 minutes ago",
    // "Up 3 seconds", ...) rather than a bare state name, so an equality test against "exited"
    // never matches. Compare on the prefix, ignoring case; the bare names still match.
    // Confirm against the docker-java version in use.
    val normalizedStatus = status.trim.toLowerCase(java.util.Locale.ROOT)
    val alreadyStopped   = normalizedStatus.startsWith("exited") || normalizedStatus.startsWith("dead")
    // Killing is best effort (killContainer logs its own failure): the forced removal decides the outcome.
    if (!alreadyStopped)
      killContainer(containerId)
    removeContainer(containerId)
  }
157+
109158
def getLogs(containerId: String): Try[String] = Try {
110159
val stringBuilder = new StringBuilder()
111160
val callback = new DockerLogsStringBuilder(stringBuilder)

conf/reference.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,4 @@ responder {
124124
}
125125
}
126126

127+
docker.pullImageTimeout = 10 minutes

0 commit comments

Comments
 (0)