Sunday, December 29, 2013

Akka Content Ingestion Pipeline, Part III


In this post, I add a JSON/HTTP front end to my Akka Content Ingestion Pipeline. This gives clients remote access to the pipeline over HTTP, so they can submit jobs to it and make some rudimentary queries against it. As you already know, a client can send the pipeline a Fetch message to have a document crawled off a website, parsed, and indexed into a Solr index; a Stats message to query the size of the pipeline's internal queues; and a Stop message to terminate the pipeline.

The front end is an HTTP server that listens on a specified host and port and forwards HTTP GET and PUT requests to an Actor that can handle HTTP requests (by mixing in Spray's HttpService trait). The PUT requests carry JSON payloads that correspond to the fields of the message case classes. The actor's receive() method handles these requests by converting the JSON into the equivalent message case class and sending the message to the Controller actor.
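Stripped of Spray, what the front end does is map an (HTTP method, path, JSON body) triple to a message for the Controller. Here is a minimal, Spray-free sketch of that mapping in plain Scala. The message classes are hypothetical, simplified stand-ins (the real Fetch also carries metadata), and the sketch assumes the JSON body has already been decoded into a field map, a step Spray performs with entity(as[Fetch]) in the real code.

```scala
import scala.util.Try

object RoutingSketch {
  // Hypothetical, simplified stand-ins for the pipeline's message case classes.
  sealed trait Msg
  final case class FetchMsg(url: String, depth: Int) extends Msg
  case object StatsMsg extends Msg
  final case class StopMsg(code: Int) extends Msg

  // Assumes the PUT body has already been decoded into a field map;
  // in the real front end Spray does this with entity(as[Fetch]).
  def dispatch(method: String, path: String,
               fields: Map[String, String]): Either[String, Msg] =
    (method, path) match {
      case ("GET", "/stats") => Right(StatsMsg)
      case ("GET", "/stop")  => Right(StopMsg(0))
      case ("PUT", "/fetch") =>
        val depth = fields.get("depth").flatMap(d => Try(d.toInt).toOption)
        (fields.get("url"), depth) match {
          case (Some(url), Some(d)) => Right(FetchMsg(url, d))
          case _                    => Left("malformed fetch request")
        }
      case _ => Left(s"no route for $method $path")
    }
}
```

Returning an Either keeps unknown routes and malformed bodies as ordinary values instead of exceptions; Spray's routing DSL expresses the same idea with rejections.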

In addition, the front end HTTP server has a shutdown hook that terminates the Controller and its children in an orderly fashion (waiting until the queues are all drained): it sends the REST actor a GET /stop request, which the actor forwards to the Controller as a Stop message.
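The Controller's Stop handling isn't shown in this post, so as an analogy only, here is a minimal plain-Scala sketch of the "drain, then stop" idea using a blocking queue and a poison pill rather than actors. The DrainingWorker class is hypothetical. One thing it makes explicit: a JVM shutdown hook has to block until draining finishes, because once every hook has returned the JVM halts, whether or not messages are still queued.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical single worker draining a queue. None is the "poison pill".
final class DrainingWorker {
  private val queue = new LinkedBlockingQueue[Option[String]]()
  val processed = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Some(_) => processed.incrementAndGet() // stand-in for real work
          case None    => running = false           // everything before the pill is done
        }
      }
    }
  })
  worker.start()

  def submit(job: String): Unit = queue.put(Some(job))

  // Orderly stop: enqueue the pill behind pending jobs, then wait for the
  // worker to reach it. Calling this from a shutdown hook blocks the hook.
  def stop(): Unit = {
    queue.put(None)
    worker.join()
  }
}
```

Because the poison pill is enqueued behind the pending jobs, stop() only returns after all of them have been processed.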

In pictures, this work corresponds to the top block of the diagram (updated from last week) below:


To build the HTTP server, I used the Spray library, which allows you to build an HTTP server in just a few lines of Scala code:

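// Source: src/main/scala/com/mycompany/delsym/rest/Main.scala
package com.mycompany.delsym.rest

import com.typesafe.config.ConfigFactory

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.io.IO
import spray.can.Http
import spray.httpx.RequestBuilding.Get

object Main extends App {

  implicit val system = ActorSystem("DelSym")

  // Host and port for the REST front end, read from application.conf
  val conf = ConfigFactory.load()
  val host = conf.getString("delsym.rest.host")
  val port = conf.getInt("delsym.rest.port")

  // The RestActor handles all HTTP requests; spray-can binds it to host:port
  val api = system.actorOf(Props[RestActor], "api")
  IO(Http) ! Http.Bind(api, host, port = port)

  // On JVM shutdown, send the RestActor a GET /stop request, which it
  // forwards to the Controller as a Stop message. Note that ! is
  // asynchronous, so the hook returns without waiting for a reply.
  sys.addShutdownHook {
    Console.println("Shutting down...")
    api ! Get("/stop")
  }
}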

The HTTP server starts up a RestActor, which is an Actor specialized to provide an HTTP service. Its receive method pattern-matches on the incoming requests and sends the corresponding messages to the underlying Controller. The receive method is generated from a routing table built with the Spray routing DSL. The code for the RestActor is shown below:

// Source: src/main/scala/com/mycompany/delsym/rest/RestActor.scala
package com.mycompany.delsym.rest

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

import com.mycompany.delsym.actors.Controller
import com.mycompany.delsym.actors.Fetch
import com.mycompany.delsym.actors.Index
import com.mycompany.delsym.actors.MessageProtocol
import com.mycompany.delsym.actors.Parse
import com.mycompany.delsym.actors.Stats
import com.mycompany.delsym.actors.Stop
import com.typesafe.config.ConfigFactory

import akka.actor.Actor
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.util.Timeout
import spray.httpx.SprayJsonSupport.sprayJsonUnmarshaller
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import spray.json.pimpAny
import spray.routing.Directive.pimpApply
import spray.routing.HttpService

class RestActor extends Actor with RestService {

  val conf = ConfigFactory.load()
  // How long to wait for the Controller to answer an ask (?)
  implicit val timeout = Timeout(
    conf.getInt("delsym.rest.timeout").seconds)

  // The Controller is created as a child of this actor
  val controller = actorRefFactory.actorOf(
    Props[Controller], "controller")

  def actorRefFactory = context

  // receive is generated from the routing table below
  def receive = runRoute {
    // GET /stats: ask the Controller for its queue sizes and return them
    // as JSON. Await.result blocks this actor until the reply arrives.
    (get & path("stats")) {
      jsonpWithParameter("callback") {
        complete {
          val future = (controller ? Stats(Map.empty))
            .mapTo[Stats]
          val result = Await.result(future, timeout.duration)
          import MessageProtocol.statsFormat
          result.toJson.prettyPrint
        }
      }
    } ~
    // PUT /fetch, /parse, /index: unmarshal the JSON body into the
    // message, send it to the Controller (fire-and-forget), echo it back
    (put & path("fetch")) {
      jsonpWithParameter("callback") {
        import MessageProtocol.fetchFormat
        entity(as[Fetch]) { fetch =>
          complete {
            controller ! fetch
            "Got(" + fetch.toJson.compactPrint + ")"
          }
        }
      }
    } ~
    (put & path("parse")) {
      jsonpWithParameter("callback") {
        import MessageProtocol.parseFormat
        entity(as[Parse]) { parse =>
          complete {
            controller ! parse
            "Got(" + parse.toJson.compactPrint + ")"
          }
        }
      }
    } ~
    (put & path("index")) {
      jsonpWithParameter("callback") {
        import MessageProtocol.indexFormat
        entity(as[Index]) { index =>
          complete {
            controller ! index
            "Got(" + index.toJson.compactPrint + ")"
          }
        }
      }
    } ~
    // GET /stop: tell the Controller to shut down the pipeline
    (get & path("stop")) {
      complete {
        import MessageProtocol.stopFormat
        controller ! Stop(0)
        "Stop signal sent"
      }
    }
  }
}

// Supplies the ExecutionContext for any Future callbacks in the routes
trait RestService extends HttpService {

  implicit def executionContext =
    actorRefFactory.dispatcher
}
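The stats route calls Await.result inside complete, which blocks the RestActor until the Controller replies or the timeout expires; while it waits, no other request is served. As far as I know, Spray can also complete a route with a Future directly, using the implicit ExecutionContext that RestService provides, e.g. something like complete { (controller ? Stats(Map.empty)).mapTo[Stats].map(_.toJson.prettyPrint) } with MessageProtocol.statsFormat imported (untested). The plain-Scala sketch below shows only the Future transformation behind that idea; StatsReply and the hand-written render function are hypothetical stand-ins for Stats and statsFormat.

```scala
import scala.concurrent.{ExecutionContext, Future}

object StatsSketch {
  // Hypothetical stand-in for the Stats reply: queue name -> queue size.
  final case class StatsReply(stats: Map[String, Int])

  // Render as compact JSON by hand (Spray would use statsFormat here).
  def render(reply: StatsReply): String =
    reply.stats.toSeq.sortBy(_._1)
      .map { case (name, size) => "\"" + name + "\":" + size }
      .mkString("{\"stats\":{", ",", "}}")

  // Instead of Await.result, transform the reply Future. The caller
  // (e.g. Spray's complete) gets a Future[String] and nothing blocks.
  def statsResponse(askController: () => Future[StatsReply])
                   (implicit ec: ExecutionContext): Future[String] =
    askController().map(render)
}
```

Keys are sorted before rendering so the output does not depend on Map iteration order.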

Spray also provides JSON marshalling/unmarshalling facilities. This is automatic for native types and collections, but for case classes you have to specify the protocol. Since our messages are all case classes, we specify the protocol as below. The relevant format has to be in implicit scope wherever the actual JSON marshalling/unmarshalling happens, which is why the code above has the import MessageProtocol.*Format statements.

// Source: src/main/scala/com/mycompany/delsym/actors/DelsymMessage.scala
package com.mycompany.delsym.actors

import akka.actor.ActorRef
import spray.json._
import DefaultJsonProtocol._

...

/////////////// Message <--> JSON ser/deser ////////////

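// jsonFormatN(X) derives a JsonFormat for a case class X with N fields,
// so each message can be read from and written to JSON.
object MessageProtocol extends DefaultJsonProtocol {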
  implicit val fetchFormat = jsonFormat3(Fetch)
  implicit val parseFormat = jsonFormat1(Parse)
  implicit val indexFormat = jsonFormat1(Index)
  implicit val statsFormat = jsonFormat1(Stats)
  implicit val stopFormat = jsonFormat1(Stop)
}
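To see why the import MessageProtocol.*Format lines matter, here is a stripped-down, plain-Scala analogue of the type-class pattern spray-json uses. JsonWriter, FetchLike, and SketchProtocol are hypothetical names, not spray-json's API, and the writer is written by hand where spray-json derives one with jsonFormatN. The point is that toJson only compiles when a writer for the type is in implicit scope.

```scala
object JsonSketch {
  // A minimal analogue of spray-json's JsonFormat: a type class that
  // knows how to write one type as JSON text.
  trait JsonWriter[T] { def write(value: T): String }

  // Hypothetical message shape, modelled on the fetch payload used with curl.
  final case class FetchLike(url: String, depth: Int)

  // Plays the role of MessageProtocol: holds the implicit instances.
  object SketchProtocol {
    implicit val fetchLikeWriter: JsonWriter[FetchLike] =
      new JsonWriter[FetchLike] {
        def write(f: FetchLike): String =
          "{\"url\":\"" + f.url + "\",\"depth\":" + f.depth + "}"
      }
  }

  // Adds .toJson to any value, but only resolves if a JsonWriter[T]
  // is in implicit scope at the call site.
  implicit class JsonOps[T](value: T) {
    def toJson(implicit w: JsonWriter[T]): String = w.write(value)
  }
}
```

Without the import of fetchLikeWriter at the call site, the toJson call fails with a missing-implicit compile error, much like the one spray-json reports when one of the MessageProtocol formats is not in scope.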

To test this, I used cURL to send a GET request to /stats and a PUT request to /fetch. The commands and their outputs are shown below:

sujit@tsunami:~/Projects/delsym$ curl localhost:8080/stats
{
  "stats": {
    "parsers": 0,
    "fetchers": 0,
    "indexers": 0
  }
}
sujit@tsunami:~/Projects/delsym$ curl -X PUT \
    -H "Content-Type: application/json" \
    -d '{"url":"http://www.foo.com/bar", "depth":0, "metadata": {}}' \
    http://localhost:8080/fetch
Got({"url":"http://www.foo.com/bar","depth":0,"metadata":{}})
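For completeness, the same fetch request can be built from Scala. This sketch swaps in the java.net.http client that ships with JDK 11 and later (an assumption about your JDK, and not what DelSym uses); FetchRequestSketch is a hypothetical name. It only builds the request, so it runs without a server.

```scala
import java.net.URI
import java.net.http.HttpRequest

object FetchRequestSketch {
  // Builds (but does not send) the same PUT request as the curl command.
  def fetchRequest(host: String, port: Int, json: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(s"http://$host:$port/fetch"))
      .header("Content-Type", "application/json")
      .PUT(HttpRequest.BodyPublishers.ofString(json))
      .build()
}
```

Against a running server, HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) would send it and return the response, whose body should be the Got(...) string shown above.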

Although there is not that much code to show for it, this took me almost two days of experimentation to get right, mainly because Spray makes heavy use of implicits, which are not really evident unless you read the documentation thoroughly. Here are some sites that helped me figure things out.

  • GitHub Gist from Ayose Cazorla demonstrating a very simple Scala application that talks to two HttpServiceActors. This is what I started with.
  • Spray (REST on Akka) slides from Mathias Doenitz's talk at Scala IO in Paris. These gave me some directive patterns that I used to implicitly convert JSON into message case classes and to generate JSONP (although the callback wrapping doesn't work yet, because I haven't figured out how to set the response content type).
  • Brandon Amos's blog post on adding shutdown hooks in Scala.
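If I'm reading the Spray documentation right, jsonpWithParameter only wraps a response whose content type is application/json, while completing a route with a plain String (as the routes above do) produces a text/plain response, so the directive leaves it alone. Wrapping the inner route (inside jsonpWithParameter) with Spray's respondWithMediaType(MediaTypes.`application/json`) directive may be enough to fix it, though I haven't tried it. The plain-Scala sketch below only models that wrapping rule; Response and wrapJsonp are hypothetical, and application/javascript as the wrapped content type is my assumption about what Spray emits.

```scala
object JsonpSketch {
  // Hypothetical minimal response: content type plus body.
  final case class Response(contentType: String, body: String)

  // Models the rule as I understand it: wrap only when a callback
  // was supplied AND the response body is JSON.
  def wrapJsonp(callback: Option[String], resp: Response): Response =
    callback match {
      case Some(cb) if resp.contentType == "application/json" =>
        Response("application/javascript", cb + "(" + resp.body + ")")
      case _ => resp
    }
}
```

A text/plain response passes through untouched even when a callback is present, which matches the behavior I saw.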

I also studied the code samples from the Akka in Action book and went through the examples provided by Spray, looking for useful patterns.

This part got done a bit earlier than planned, probably because I could sit at home working on it all day during our Christmas to New Year office closure. I am publishing it now and moving on to look at how to distribute this application across multiple servers. So in the (very likely) case that I don't post again before next year, belated Merry Christmas wishes, and I hope you have a very Happy New Year and good times ahead in 2014.

Update 2014-01-01: For the remoting work, sbt gave me errors trying to download akka-remote for Akka 2.1.2 (with Spray 1.1-20130123), the versions I had been working with so far (based on the versions used in the code for the Akka in Action book). So I upgraded to the current latest stable versions (Akka 2.2.3 and Spray 1.2.0), after which both classes in this post failed to compile. I had to rewrite them against the new API (using code from the spray-routing examples). I have updated the code in this post to match the code in the DelSym GitHub repo.
