Skip to content

Commit 0bf2c0c

Browse files
fix error handling + word count example with failure
1 parent ced1939 commit 0bf2c0c

File tree

5 files changed

+45
-27
lines changed

5 files changed

+45
-27
lines changed
Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.cosmin.examples.wordcount
22

33
import com.cosmin.pipeline.Pipeline
4-
import com.cosmin.pipeline.executor.{AkkaExecutor, AsyncExecutor}
4+
import com.cosmin.pipeline.executor.{AkkaExecutor, AsyncExecutor, PipelineExecutor, SynchronouslyExecutor}
55

6-
import scala.util.Success
6+
import scala.util.{Failure, Success}
77

88
/**
99
* Implementation of the following UNIX command 'cat "myText.txt" | grep "hello" | wc -l'
@@ -13,26 +13,29 @@ object WordCount {
1313
val wordToFind = "hello"
1414
val pipeline = Pipeline[String, String]() | Cat() | Grep(wordToFind) | Count()
1515

16-
pipeline.execute("myText.txt") {
17-
case Success(output) => println(s"word '$wordToFind' was found on $output lines")
18-
}
16+
doExecute(pipeline, SynchronouslyExecutor(), "Sync", wordToFind)
17+
doExecute(pipeline, SynchronouslyExecutor(), "Sync Failed", wordToFind) ("notFound.txt")
1918

2019
executeAsync(pipeline, wordToFind)
2120
executeAsyncUsingAkka(pipeline, wordToFind)
2221
}
2322

2423
private def executeAsync(pipeline: Pipeline[String, Int], wordToFind: String): Unit = {
25-
implicit val asyncExecutor: AsyncExecutor[String, Int] = AsyncExecutor[String, Int]
26-
pipeline.execute("myText.txt") {
27-
case Success(output) => println(s"Async ---> word '$wordToFind' was found on $output lines")
28-
}
29-
Thread.sleep(2000)
24+
val asyncExecutor: AsyncExecutor[String, Int] = AsyncExecutor[String, Int]
25+
doExecute(pipeline, asyncExecutor, "Async", wordToFind)
26+
doExecute(pipeline, asyncExecutor, "Async Failed", wordToFind) ("notFound.txt")
3027
}
3128

3229
private def executeAsyncUsingAkka(pipeline: Pipeline[String, Int], wordToFind: String): Unit = {
33-
implicit val akkaExecutor: AkkaExecutor[String, Int] = AkkaExecutor[String, Int]
34-
pipeline.execute("myText.txt") {
35-
case Success(output) => println(s"Akka Async ---> word '$wordToFind' was found on $output lines")
36-
}
30+
val akkaExecutor: AkkaExecutor[String, Int] = AkkaExecutor[String, Int]
31+
doExecute(pipeline, akkaExecutor, "Akka Async", wordToFind)
32+
doExecute(pipeline, akkaExecutor, "Akka Async Failed", wordToFind) ("notFound.txt")
33+
}
34+
35+
private def doExecute(pipe: Pipeline[String, Int], exec: PipelineExecutor[String, Int], prefix: String, word: String) (implicit file: String = "myText.txt") : Unit = {
36+
pipe.execute(file) {
37+
case Success(output) => println(s"$prefix ---> word '$word' was found on $output lines")
38+
case Failure(e) => println(s"$prefix ---> error: $e")
39+
} (exec)
3740
}
3841
}

src/main/scala/com/cosmin/pipeline/executor/AkkaExecutor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import scala.util.Try
88

99
class AkkaExecutor[In, Out](system: ActorSystem) extends PipelineExecutor[In, Out] {
1010
override def execute(in: In, stages: List[Stage])(onComplete: Try[Out] => Unit): Unit = {
11-
val supervisor = system.actorOf(Supervisor.props[Out](stages, onComplete), "pipeline-supervisor")
11+
val supervisor = system.actorOf(Supervisor.props[Out](stages, onComplete))
1212

1313
supervisor ! Start[In](in)
1414
}

src/main/scala/com/cosmin/pipeline/executor/SynchronouslyExecutor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import scala.util.Try
66

77
class SynchronouslyExecutor[In, Out] extends PipelineExecutor[In, Out] {
88
override def execute(in: In, stages: List[Stage])(onComplete: Try[Out] => Unit): Unit = {
9-
val result: Out = doExecute[In, Out](stages, in)
9+
val result: Try[Out] = Try[Out](doExecute[In, Out](stages, in))
1010

11-
onComplete(Try[Out](result))
11+
onComplete(result)
1212
}
1313

1414
def doExecute[I, O](stages: List[Stage], input: I): O = stages match {

src/main/scala/com/cosmin/pipeline/executor/actor/Supervisor.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package com.cosmin.pipeline.executor.actor
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
44
import com.cosmin.pipeline.Stage
5-
import com.cosmin.pipeline.executor.actor.Supervisor.{StageCompleted, Start}
5+
import com.cosmin.pipeline.executor.actor.Supervisor.{StageCompleted, StageFailed, Start}
66
import com.cosmin.pipeline.executor.actor.Worker.Execute
77

8-
import scala.util.Try
8+
import scala.util.{Failure, Try}
99

1010
object Supervisor {
1111
def props[Out](stages: List[Stage], onComplete: Try[Out] => Unit): Props = Props(new Supervisor(stages, onComplete))
1212

1313
final case class Start[In](in: In)
1414
final case class StageCompleted[Out](stage: Stage, result: Out)
15+
final case class StageFailed(stage: Stage, e: Throwable)
1516
}
1617

1718
class Supervisor[Out](stages: List[Stage], onComplete: Try[Out] => Unit) extends Actor with ActorLogging {
@@ -22,13 +23,19 @@ class Supervisor[Out](stages: List[Stage], onComplete: Try[Out] => Unit) extends
2223

2324
override def receive: Receive = {
2425
case Start(in) => executeStage(in)
25-
case StageCompleted(stage, result) =>
26-
if (remainingStages.isEmpty) {
26+
case StageCompleted(stage, result) => onStageCompleted(result)
27+
case StageFailed(stage, e) =>
28+
onComplete(Failure(e))
29+
context.stop(self)
30+
}
31+
32+
private def onStageCompleted(result: Any): Unit = {
33+
remainingStages match {
34+
case Nil =>
2735
onComplete(Try(result.asInstanceOf[Out]))
2836
context.stop(self)
29-
} else {
30-
executeStage(result)
31-
}
37+
case _ => executeStage(result)
38+
}
3239
}
3340

3441
private def executeStage(in: Any): Unit = {

src/main/scala/com/cosmin/pipeline/executor/actor/Worker.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package com.cosmin.pipeline.executor.actor
22

33
import akka.actor.{Actor, Props}
44
import com.cosmin.pipeline.Stage
5-
import com.cosmin.pipeline.executor.actor.Supervisor.StageCompleted
5+
import com.cosmin.pipeline.executor.actor.Supervisor.{StageCompleted, StageFailed}
66
import com.cosmin.pipeline.executor.actor.Worker.Execute
77

8+
import scala.util.{Failure, Success, Try}
9+
810
object Worker {
911
def props(stage: Stage): Props = Props(new Worker(stage))
1012

@@ -14,7 +16,13 @@ object Worker {
1416
class Worker(stage: Stage) extends Actor {
1517
override def receive: Receive = {
1618
case Execute(input) =>
17-
val result = stage.execute(input.asInstanceOf[stage.In])
18-
sender() ! StageCompleted[stage.Out](stage, result.asInstanceOf[stage.Out])
19+
executeStage(input)
20+
}
21+
22+
private def executeStage(input: Any): Unit = {
23+
Try[stage.Out](stage.execute(input.asInstanceOf[stage.In])) match {
24+
case Success(result) => sender() ! StageCompleted[stage.Out](stage, result.asInstanceOf[stage.Out])
25+
case Failure(e) => sender() ! StageFailed(stage, e)
26+
}
1927
}
2028
}

0 commit comments

Comments
 (0)