Click here to Skip to main content
14,238,693 members

Uber simple uber app written using loads of stuffz

Rate this:
5.00 (24 votes)
Please Sign up or sign in to vote.
5.00 (24 votes)
6 Feb 2018CPOL
Simple uber type app using Akka/React/TypeScript/Scala/Play/Kafka/MongoDb/Rx

Introduction

This article is the culmination of stuff I have been doing on the train on my way to work over 6 months. I have written quite a lot of blog posts on this already which you can read about on the projects home page on my blog : https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/

There are 13 blog posts there, but I thought it would be good to also have this overall article which covers all of it, as the blog posts are more a sequence of events that I went through which talk about the pieces in great detail as I went through them

For example these were the blog posts

 

Special Thanks

I would like to call out a special thanks to a very special man, Peter O'Hanlon code project legend and all round awesome dude. Who I asked to review this behemoth article, Pete immediately said yes, and found time to make that happen, depsite the fact he is a family man runs his own business and has his own cool ideas going on.

 

Thanks Pete, I salute you

 

 

So just exactly what is it that I was/am talking about?

In essence I want to write a very (pardon the pun) but uber simple uber type app. Where there are the following functional requirements
 

  • There should be a web interface that a client can use. Clients may be a driver or a pickup client requiring a delivery
  • There should be a web interface that a pickup client can use that shows a pickup client location on a map, which the pickup client chooses.
  • The pickup client may request a pickup job, in which case drivers that are in the area bid for a job.
  • The pickup client location should be visible to a driver on a map
  • A driver may bid for a pickup client job, and the bidding driver(s) location should be visible to the pickup client.
  • The acceptance of the bidding driver is down to the pickup client
  • Once a pickup client accepts a driver, ONLY the assigned driver(s) current map position will be shown to the pickup client
  • When a pickup client is happy that they have been picked up by a driver, the pickup client may rate the driver from 1-10, and the driver may also rate the pickup client from 1-10.
  • The rating should only be available once a pickup client has marked a job as completed
  • A driver or a pickup client should ALWAYS be able to view their previous ratings.

Whilst this may sound childs play to a lot of you (me included if I stuck to using simply CRUD operations), I just want to point out that this app is meant as a learning experience so I will not be using a simple SignalR Hub, and a couple of database tables.

I have written this project using a completely different set of technologies from the norm. Some of the technology choices could easily scale to hundreds of thousands of requests per second (Kafka has your back here)

TECHNOLOGIES INVOLVED

  • WebPack
  • React
  • React Router
  • PowerShell
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MongoDB
  • SBT
  • Kafka
  • Kafka Streams

They say a picture says a 1000 words, so here is a nice picture to get things started

Now before we carry on, let me just acknowledge that www.codeproject.com is mainly biased towards Microsoft tech and this code is mainly Scala/TypeScript. However I think there is still plenty to learn along the way, so don't let the fact it's not .NET/C/C++ put you off

 

Where is the code?

The code for this lot is here : https://github.com/sachabarber/MadCapIdea

Demo Video

CLICK THE IMAGE TO LAUNCH THE VIDEO

 

Prerequisites

As shown in the introduction section, there are many moving peices to this demo so, naturally, there are quite a few dependencies. I did try to get it to work in Docker for you, however I found that I still needed to create external scripts to orchestrate it all anyway. In the end I just went with downloading stuff as I will specify, and then giving a single PowerShell script to run most stuff, apart from 2 Scala projects.

This is the list of stuff you will need in order to run this code:

This has all been developed on Windows, so these instructions are all about how to get stuff working on Windows

- MongoDB : https://www.mongodb.com/dr/fastdl.mongodb.org/win32/mongodb-win32-x86_64-2008plus-ssl-3.4.6-signed.msi/download
- Confluence Platform 3.3.0 Open Source : http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.zip
- SBT
- Java 8 SDK
- Webpack
- Node.Js
- NPM
- IntelliJ IDEA v17.0 community
- PowerShell

Once you have downloaded all of this you will need to do a few things in order to run it nicely on Windows.

- Download the dependencies above (Keep a note of where you downloaded them as you will need them here and later)
- Replace the official YOUR extract location\confluent-3.3.0\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows
- Modify the YOUR extract location\confluent-3.3.0\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
- Modify the YOUR extract location\confluent-3.3.0\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true
- Modify the YOUR extract location\confluent-3.3.0\etc\kafka\server.properties file file to change the log.dirs to log.dirs=c:/temp/kafka-logs

You will need to remember some of these paths for the next section too, so just be mindful that you may have to edit the PowerShell script later with some new paths

How do I run all this stuff?

There are quite a few moving peices to this app, and they all need to be running in order for it to all work together.

1. Update node.js dependencies

Make sure you have Node.Js installed, and make sure NPM is installed too, also ensure that webpack is globally installed

Open command line and change to the MadCapIdea\PlayBackEndApi\FrontEndWebSite\ folder and run npm install

  • now run webpack from same folder

2. Kafka/Zookeeper etc etc

You can run the following powershell script to get all the pre-requistites up and running (assuming you have downloaded them all)

  • Open PowerShell command line and change to the PowerShellProject\PowerShellProject\ folder and run .\RunPipeline.ps1. Make sure you modify the top section paths to match your own installation paths

3. Play application

  • Open the SBT/Scala project inside IntelliJ IDEA (you will need the SBT plugin, and Java8 installed on your machine).
  • Open this folder MadCapIdea\PlayBackEndApi and run it.You may need to create a run time configuration

4. Kafka Streams application

  • Open the SBT/Scala project inside IntelliJ IDEA (you will need the SBT plugin, and Java8 installed on your machine).
  • Open this folder MadCapIdea\KafkaStreams and run it.You may need to create a run time configuration where you point to this main class RatingStreamProcessingApp

5. React

  • Open a browser navigate to http://localhost:9000, and register some users both passenger/driver

I normally follow this set of steps afterwards

  • open a tab, login as a passenger that I had created
  • go to the "create job" page, click the map, push the "create job" button
  • open a NEW tab, login as a new driver, go to the "view job" page
  • on the 1st tab (passenger) click the map to push passenger position to driver
  • on the 2nd tab (driver) click the map to push driver position to passenger
  • repeat last 4 steps for additonal driver
  • on client tab pick driver to accept, click accept button
  • complete the job from client tab, give driver rating
  • complete the job from paired driver tab, give passenger rating
  • go to "view rating" page, should see ratings

 

Known issues

The following are known issues

  • Once a driver and passenger become paired, the position updates from either are no longer reflected. I am sure this would boil down to a single JavaScript method that needs updating in the ViewJob.tsx file.
  • On the ViewJob page I originally wanted the passenger to be able to accept a driver by clicking a button in the drivers Overlay GoogleMap component. However no matter what I tried this causes a MapClick event to still happen, which changed the passengers position. So I had to resort to using a weird drop down select way for a passenger to accept a driver. This sucked, but meh

So those are the issues, I just kind of got to the end of a very long road (I have been writing about this on an off for 6 months of train rides, as well as not caring at all, and just staring into space) and was just happy that I got 99% of the stuff done that I set out to do. I just thought you know what, the app as it is now, demonstrates everything I set out to do, so I'm ok with 1-2 known issues.

Some basics

Before we dive into the actual code for the app (and there is quite a bit of it), I just thought it may be good to go over some of the individual building blocks that make up the app as a whole first, the next few sections will do that. It is a long list of stuff, and the demo app makes use of all of this plus a few more bits, that I deemed to be not important enough to warrant their own sections. We will learn more about each of these when we walk through the actual demo app code, this is more of an overiew of the various parts before we look at the specific usage

What is Kafka?

Overview

Apache Kafka is an open-source stream processing platform developed by the Apache Software Foundation written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a "massively scalable pub/sub message queue architected as a distributed transaction log," making it highly valuable for enterprise infrastructures to process streaming data. Additionally, Kafka connects to external systems (for data import/export) via Kafka Connect and provides Kafka Streams, a Java stream processing library.

The design is heavily influenced by transaction logs.

Apache Kafka Architecture

Kafka stores messages which come from arbitrarily many processes called "producers". The data can thereby be partitioned in different "partitions" within different "topics". Within a partition the messages are indexed and stored together with a timestamp. Other processes called "consumers" can query messages from partitions. Kafka runs on a cluster of one or more servers and the partitions can be distributed across cluster nodes.

Apache Kafka efficiently processes the real-time and streaming data when used along with Apache Storm, Apache HBase and Apache Spark. Deployed as a cluster on multiple servers, Kafka handles its entire publish and subscribe messaging system with the help of four APIs, namely, producer API, consumer API, streams API and connector API. Its ability to deliver massive streams of message in a fault-tolerant fashion has made it replace some of the conventional messaging systems like JMS, AMQP, etc.

The major terms of Kafka's architecture are topics, records, and brokers. Topics consist of stream of records holding different information. On the other hand, Brokers are responsible for replicating the messages.

  • Producer API - Permits the applications to publish streams of records. (covered in this article)
  • Consumer API - Permits the application to subscribe to the topics and processes the stream of records. (covered in this article)
  • Streams API  This API converts the input streams to output and produces the result. (covered in this article)
  • Connector API  Executes the reusable producer and consumer APIs that can link the topics to the existing applications. (not covered in this article)

 

Anatomy of a Kafka Topic

Offset : messages in the partitions are each assigned a unique (per partition) and sequential Id, called the "offset". The "offset" is tracked by consumers, where each consumer tracks via (offset, partition, topic) tuples

Consumer Groups

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes

A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.

 

Kafka Performance

Due to its widespread integration into enterprise-level infrastructures, monitoring Kafka performance at scale has become an increasingly important issue. Monitoring end-to-end performance requires tracking metrics from brokers, consumer, and producers, in addition to monitoring ZooKeeper which is used by Kafka for coordination among consumers

https://en.wikipedia.org/wiki/Apache_Kafka up on date 02/01/18

 

What is Kafka Streams?

The Streams API of Apache Kafka, available through a Java library, can be used to build highly scalable, elastic, fault-tolerant, distributed applications and microservices. First and foremost, the Kafka Streams API allows you to create real-time applications that power your core business. It is the easiest yet the most powerful technology to process data stored in Kafka. It builds upon important concepts for stream processing such as efficient management of application state, fast and efficient aggregations and joins, properly distinguishing between event-time and processing-time, and seamless handling of late-arriving and out-of-order data.

A unique feature of the Kafka Streams API is that the applications you build with it are normal Java applications. These applications can be packaged, deployed, and monitored like any other Java application  there is no need to install separate processing clusters or similar special-purpose and expensive infrastructure!



An application that uses the Kafka Streams API is a normal Java application. Package, deploy, and monitor it like you would do for any other Java application. Even so, your application will be highly scalable, elastic, and fault-tolerant.

https://docs.confluent.io/current/streams/introduction.html up on date 02/01/18

So that is what the official docs say about it. Here is my take on it

Kafka Streams is an additional API on top of Kafka that allows you to perform many aggregate and filtering, time based windowing operations over the incoming messages that can either be stored to an internal database key-value representation known as a KTable which uses a state store (based on RocksDB), or you may choose to push the transformed stream values out to a new output topic.

You can perform complex stream processing, merge streams, and also store accumulated stream state.

It is a an AWESOME bit of kit

What is Play?

The Play Framework is a Scala based MVC (model view controller) type web application framework. As such it has in built mechanisms for things typical of a MVC web framework (certainly if you have done any ASP MVC .NET you would find it very familiar).

So we have the typical MVC concerns covered by the Play Framework

  • Controllers
  • Actions
  • Routing
  • Model binding
  • JSON support
  • View engine

Thing is, I will not be doing any actual HTML in the Play Framework back end code, I want to do all of that using the previously covered webpack/typescript/react starter code I have shown so far. Rather I will be using the Play Framework as a API backend, where we will simply be using various controllers as endpoint to accept/serve JSON, and Event streamed data. All the actual front end work/routing will be done via webpack and React.

There are still some very appealing parts in Play that I did want to make use of, such as:

  • It is Scala, which means when I come to integrate Kafka / Kafka Streams it will be ready to do so
  • It uses Akka which I wanted to use. I also want to use Akka streams, which Play also supports
  • Play controllers lend themselves quite nicely to being able to create a fairly simple REST API
  • It can be used fairly easily to serve static files (think of these as the final artifacts that come out of the webpack generation pipeline). So things like minimized CSS / JS etc etc

So hopefully you can see that using Play Framework still made a lot of sense, even if we will only end up using 1/2 of what it has to offer. To be honest the idea of using controllers for a REST API is something that is done in ASP MVC .NET all time either by using of actual controllers or by using the WebApi.

Ok so now that we know what we will be using Play Framework for, how about we dive into the code for this post.

Play Framework Basics

Lets start by looking at the bare bones structure of a Play Framework application, which looks like this (I am using IntelliJ IDEA as my IDE)

Lets talk a bit about each of these folders

app

This folder would hold controllers/views (I have added the Entities folder there that is not part of a Play Framework application requirements). Inside the controllers folder you would find controllers, and inside the views folder you would find views. For the final app there will be no views folder, I simply kept that in this screenshot to talk about what a standard Play Framework application looks like

conf

This folder contains the configuration for the Play Framework application. This would include the special routes file, and any other application specific configuration would might have.

Lets just spend a minute having a look at the Play Framework routes file, which you can read more about here : https://www.playframework.com/documentation/2.5.x/ScalaRouting

The routes file has its own DSL, that is responsible for matching a give route with a controller + controller action. The controller action that matches the route is ultimately responsible for servicing the http request. I think the DSL shown in routes file below is pretty self explanatory with perhaps the exception of the assets based routes.

All assets based http requests (ie ones that start with /assets for example http://localhost:9000/assets/images/favicon.png would actually be routed through to a special controller called Assets. You dont see any code for this one, its part of the Play Framework application codebase. This special Assets inbuilt play controller is responsible for serving up static data files which it expects to find in the public folder. So for example our initial request of http://localhost:9000/assets/images/favicon.png would get translated into this file (relative path from project root) /public/images/favicon.png. As I say this is handled for you by the special Assets built in controller.

The only other funky part to the Assets based route is that it uses a *file in its route. Which essentially boils down to the play framework being able match a multi-part path. Which we actually just saw with the example above http://localhost:9000/assets/images/favicon.png , see how that contains not only the file name, but also a directory of images. The Assets controller + routing is able to deal with that path just fine.

# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~

# Home page

GET        /                                   controllers.HomeController.index()

GET        /scala/comet/liveClock              controllers.ScalaCometController.streamClock()
GET        /scala/comet/kick                   controllers.ScalaCometController.kickRandomTime()

# Map static resources from the /public folder to the /assets URL path
GET        /assets/*file                       controllers.Assets.at(path="/public", file)

Ok so moving on to the rest of the standard folders that come with a Play Framework application

public

This is where you will need to put any static content that you wish to be served. Obviously views (if you use that part of play) will be within in the app/views folder. Like I say I am not using the views aspect of Play so you will not be seeing any views in my views folder. I instead want to let webpack et all generate my routing, web page etc etc. I do however want to serve bundles so later on I will be showing you how my webpack generated bundles fit in with the Play Framework ecco system.

target

Since this is a scala based project we get the standard scala based folders, and target is one of them, that has the generated/compiled code in it.

SBT

It is worth pointing out that my Play Framework application is an SBT based project, as such there is an SBT aspect to it, which largely boils down to these files

Project [root-build] / plugs.sbt file

This file adds Play as a plugin for the SBT project

// The Lightbend repository
resolvers += Resolver.typesafeRepo("releases")

// Use the Play sbt plugin for Play projects
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.14")

build.sbt

This is the main SBT build file for the project. This is where all our external dependencies are brought in etc etc (standard SBT stuff)

import play.sbt._
import sbt.Keys._
import sbt._

name := "play-streaming-scala"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.11"

lazy val root = (project in file(".")).enablePlugins(play.sbt.PlayScala)

javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")

initialize := {
  val _ = initialize.value
  if (sys.props("java.specification.version") != "1.8")
    sys.error("Java 8 is required for this project.")
}

So I think that covers the basics of a standard Play Framework application, the remainder of this article will cover the actual code for the demo project, where we will dive into routes/controllers etc etc

What is Akka

Akka is a great framework for building distibuted fault tolerant apps. It is built around the concept of actors, and has failure in mnid from the outset. It also comes with suport for HTTP over actors, and comes with play suport out of the box. One thing that also comes with Akka that I make use of is its reactive streams API. This is something like Rx whre you build streams of data that are also fault tolerant, and also feature back pressure to ensure a consumer is not over burdened by a fast producer. I have written a lot about Akka before which you can read more about here : https://sachabarbs.wordpress.com/akka-series/ where I cover quite a lot of Akkas functionality

What is WebPack?

We are headed towards an era where browsers will natively support modules, and dependencies but we are not there yet. Over the years there have been many attempts at JavaScript module management, and dependency tracking of related files, such as

  • ASP MVC bundles (fairly simplistic only bundles no other features)
  • CommonJs (require syntax, which is fairly intrusive in how your JS files work)
  • Browserify (again fairly intrusive in how your JS files are loaded and what they need to manage dependencies)
  • NPM (not really a module manager as such more like NuGet for installing packages)
  • SystemJS (has import/export syntax but a bit more syntax than Webpack)
  • JSPM uses SystemJS (fairly nice but not in as much use as Webpack)
  • Various task runners can be used such as Grunt/Gulp to build bundles

All of these have there pros/cons, however I think that it is fair to say that WebPack has emerged as the defacto standard module manager for JavaScript (for now anyway), that supports rich dependency graphs and bundles and also supports things like

  • Transpilers (such as Babel/Typescript/SASS/SCSS/LESS etc etc)
  • Sourcemaps (to go from your transpiled JS back to what you wrote)
  • Minification
  • File hashing (to allow browsers to load latest bundles as it has a different hash)

This is kind of what you get with webpack, where it is very clever about preserving the dependency graph and the bundles created, and allows you to use things like export and import to manage your inter dependencies

 

What is Babel?

Babel is a very neat JavaScript library that allows you to use next generation JavaScript syntax now, even if your targetted browser doesn't suport the syntax you are trying to use. Is is installed via the Node Packager Manager (NPM) and is one part of you transpilation pipeline. For this article we push stuff through TypeScript transpiler -> Babel transpiler and the final result is JavaScript that can be sent ot the browser.

Using babel this is the sort of things you will be able to do, where native browser support for these may be varied, or not supported at all

What is SASS/SCSS/LESS

I guess if you have used HTML you will have certainluy used CSS. Now CSS is great, but it lacks certain things such as

  • Variables
  • Heirachy selectors
  • Mixins

This is something that SASS/SCSS fixes. In much the same way that things like Typescript are transpiled to JavaScript, SASS/SCSS is transpiled to CSS.

By using SCSS you are now able to do things like this, note the variables

$blue: #3bbfce;
$margin: 16px;

.content_navigation {
  border-color: $blue;
  color: darken($blue, 10%);
}

.border {
  padding: $margin / 2;
  margin: $margin / 2;
  border-color: $blue;
}

What is React?

Unless you have been living under a rock you will have heard/used/seen React. React is maintained and developed by Facebook, and it is a JavaScript library for building user interfaces. It is not as much as a framework as say something like Angular is, and it is really just the "View" part of the typical MVC puzzle that is so prevelant in modern JS UI libraries/frameworks. It is declarative, and component centric. It is accepted that a React app would be made up of many smaller components. I can't give you a full walkthrough of React here, but we will certainly see more React code/components as the article demo code is dissected below

What is TypeScript?

Typescript  is a Microsoft offering that attempts to bring better typings and other useful constructs/language features to JavaScript. As TypeScript is a superset of JavaScript, existing JavaScript programs are also valid TypeScript programs. Typescript files will get transpiled into regular javascript files via a Typescript compiler (or in the case of WebPack via a Webpack loader)

Some of the features that TypeScript brings to the table are

  • Types
  • Interfaces
  • Parameter types
  • Enums
  • Generics

The demo article that goes with this makes a lot of use of TypeScript (particularly TSX which is a TypeScript verison of a React JSX file), and will also make use of a nice Inversion Of Control container for TypeScript called Inversify.

The app

The next set of sections will walk through the demo app. I have decided to take a workflow approach to describing the app, and as such will walk/talk through each of the workflows, which I think is the best way to do it

The app has MANY moving parts, but can largely be broken down into the following areas

  • Play backend API (scala)
  • React front end (TypeScript)
  • Kafka Streams (scala)

We will get to all of these parts, but before that let's examine some of the stuff that helps the front end work get of the ground

NPM requirements

The demo project uses quite a few components, such as

  • TypeScript
  • React
  • Babel
  • RX
  • SCSS
  • Various other libraries

As such we need a way to pull all these packages into use, so for that and the front end side of things we use NPM which you will need to have installed. The following shows the NPM dependencies for the demo app PlayBackEndApi/FrontEndWebSite/package.json file

{
  "name": "task1webpackconfig",
  "version": "1.0.0",
  "description": "webpack 2 + TypeScript 2 + Babel example",
  "repository": {
    "type": "git",
    "url": "git+https://github.com/sachabarber/MadCapIdea.git"
  },
  "keywords": [
    "babel",
    "typescript",
    "webpack",
    "bundling",
    "javascript",
    "npm"
  ],
  "author": "sacha barber",
  "homepage": "https://github.com/sachabarber/MadCapIdea#readme",
  "dependencies": {
    "bootstrap": "^3.3.7",
    "inversify": "^4.1.0",
    "jquery": "^3.2.1",
    "lodash": "^4.17.4",
    "react": "^15.5.4",
    "react-bootstrap": "^0.28.1",
    "react-bootstrap-validation": "^0.1.11",
    "react-dom": "^15.5.4",
    "react-google-maps": "^7.0.0",
    "react-measure": "^2.0.2",
    "react-router": "^3.0.5",
    "react-stars": "^2.1.0",
    "reflect-metadata": "^0.1.10",
    "revalidator": "^0.3.1",
    "rx": "^4.1.0",
    "webpack": "^2.5.1",
    "webpack-merge": "^4.1.0"
  },
  "devDependencies": {
    "@types/jquery": "^2.0.43",
    "@types/lodash": "^4.14.63",
    "@types/react": "^15.0.24",
    "@types/react-dom": "^15.5.0",
    "@types/rx": "^4.1.1",
    "awesome-typescript-loader": "^3.1.3",
    "babel-core": "^6.24.1",
    "babel-loader": "^7.0.0",
    "babel-preset-es2015": "^6.24.1",
    "babel-preset-es2015-native-modules": "^6.9.4",
    "babel-preset-react": "^6.24.1",
    "css-loader": "^0.28.1",
    "extract-text-webpack-plugin": "^2.1.0",
    "file-loader": "^0.11.1",
    "html-webpack-plugin": "^2.28.0",
    "node-sass": "^4.5.2",
    "on-build-webpack": "^0.1.0",
    "sass-loader": "^6.0.3",
    "source-map-loader": "^0.2.1",
    "typescript": "^2.3.2",
    "url-loader": "^0.5.8",
    "webpack": "^2.4.1"
  },
  "scripts": {
    "build-dev": "webpack -d --config webpack.develop.js",
    "build-prod": "webpack --config webpack.production.js"
  }
}

Babel config

As stated above the demo project makes use of Babeljs.io to use future state JavaScript now. As such we also need to supply a small Babel config file (PlayBackEndApi/FrontEndWebSite/.babelrc), this is shown below, where we opt to use the ES2015/React presets

{ "presets": ["es2015","react"] }

TypeScript config

The demo also makes use of  TypeScript which also need some specific configuration. As such we also need to supply a small TypeScript config file (PlayBackEndApi/FrontEndWebSite/tsconfig.json), this is shown below, the full set of TypeScript options can also be found here : https://www.typescriptlang.org/docs/handbook/tsconfig-json.html

{
  "compilerOptions": {
    "allowSyntheticDefaultImports": true,
    "moduleResolution": "node",
    "outDir": "./dist/",
    "sourceMap": true,
    "noImplicitAny": false,
    "module": "es2015",
    "target": "es5",
    "lib": ["es6", "dom"],
    "jsx": "react",
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true,
    "types" : ["jquery", "lodash", "react", "react-dom", "reflect-metadata"]
  },
    "include": [
        "./src/**/*"
    ]
}

Sourcemap files

Sourcemaps are a magical thing that allow you to write your JavaScript in TypeScript say which is then possibly run through other transpilers (such as Babel) and then webpack where it is bundled according to your rules, where you then send the JavaScript to the browser. That sounds great, but do you write code write code first time, I don't, and constantly need to debug stuff.

So if I wrote stuff in TypeScript and that gets turned into plain vanilla JavaScript, how on earth do I make sense of the stuff that is in the browser?

Well that is where sourcemaps come in, the literally send a clever map that allows you to put a break point into your source code (which will be sent to the browser, ideally only in development phases) and it will know how to translate that breakpoint into the correct place/line in the transpiled vanilla JavaScript that the browser is using.

Webpack has in built support for sourcemaps (phew)

The best write up of source maps I have seen is this one : https://www.html5rocks.com/en/tutorials/developertools/sourcemaps/

It should be noted that SourceMaps should only be used in development phases, as in production you  should/would want to minify your JavaScript too.

How does Webpack/Play work with this?

WebPack has this concept of loaders that are what gets used to load different file types. So for example you can use a TypeScript loader that gets piped through a babel loader, where the final result will be vanilla JavaScript.

  • You can also control how you final bundle will work, what name it has, where is will be generated etc etc
  • How minification will work
  • Configure loaders
  • Turn on SourceMap
  • Configure things like jquery and lodash to appear in the usual places as $ and _ respectively

Perhaps its best to see the demo codes WebPack file which looks like this

PlayBackEndApi/FrontEndWebSite/webpack.config.js

let _ = require('lodash');
let webpack = require('webpack');
let path = require('path');
let fs = require("fs");
let WebpackOnBuildPlugin = require('on-build-webpack');
let ExtractTextPlugin = require('extract-text-webpack-plugin');
let HtmlWebpackPlugin = require('html-webpack-plugin');

let babelOptions = {
    "presets": ["es2015", "react"]
};

function isVendor(module) {
    return module.context && module.context.indexOf('node_modules') !== -1;
}

let entries = {
    index: './src/index.tsx',
    indexCss: './scss/index.scss'

};

//build it to the Play Framework public folder, which is services by the assets controller
let buildDir = path.resolve(__dirname, '../public/dist');

module.exports = {

    context: __dirname,

    entry: entries,

    output: {
        filename: '[name].bundle.[hash].js',
        path: buildDir,
		//this is to make it play nice with the Play Framework Assets controllers
		//that deals with static data
		publicPath: '/assets/dist'
    },

    // these break for node 5.3+ when building WS stuff
    node: {
        fs: 'empty'
    },

    watch: true,

    devServer: {
        open: true, // to open the local server in browser
        contentBase: __dirname,
    },

    // Enable sourcemaps for debugging webpack's output.
    devtool: "source-map",

    resolve: {
        extensions: [".tsx", ".ts", ".js", ".jsx"],
        modules: [path.resolve(__dirname, "src"), "node_modules"]
    },

    plugins: [

        //The ProvidePlugin makes a module available as a variable in every other
        //module required by webpack
        new webpack.ProvidePlugin({
            $: "jquery",
            jQuery: "jquery",
            "window.jQuery": "jquery"
        }),

        // creates a common vendor js file for libraries in node_modules
        new webpack.optimize.CommonsChunkPlugin({
            names: ['vendor'],
            minChunks: function (module, count) {
                return isVendor(module);
            }
        }),

        // creates a common vendor js file for libraries in node_modules
        new webpack.optimize.CommonsChunkPlugin({
            name: "commons",
            chunks: _.keys(entries),
            minChunks: function (module, count) {
                return !isVendor(module) && count > 1;
            }
        }),


        //will unlink unused files on a build
        //http://stackoverflow.com/questions/40370749/how-to-remove-old-files-from-the-build-dir-when-webpack-watch
        new WebpackOnBuildPlugin(function (stats) {
            const newlyCreatedAssets = stats.compilation.assets;

            const unlinked = [];
            fs.readdir(path.resolve(buildDir), (err, files) => {
                files.forEach(file => {
                    if (file != "fonts") {
                        if (!newlyCreatedAssets[file]) {
                            fs.unlink(path.resolve(buildDir + '\\' + file));
                            unlinked.push(file);
                        }
                    }
                });
                if (unlinked.length > 0) {
                    console.log('Removed old assets: ', unlinked);
                }
            })
        }),

        //scss/sass files extracted to common css bundle
        new ExtractTextPlugin({
            filename: '[name].bundle.[hash].css',
            allChunks: true,
        }),

        new HtmlWebpackPlugin({
            filename: 'index.html',
            template: 'template.html',
        })
    ],

    module: {
        rules: [
            // All files with a '.ts' or '.tsx' extension will be handled by 'awesome-typescript-loader' 1st 
            // then 'babel-loader'
            // NOTE : loaders run right to left (think of them as a cmd line pipe)
            {
                test: /\.ts(x?)$/,
                exclude: /node_modules/,
                use: [
                  {
                      loader: 'babel-loader',
                      options: babelOptions
                  },
                  {
                      loader: 'awesome-typescript-loader'
                  }
                ]
            },


            // All files with a .css extenson will be handled by 'css-loader'
            {
                test: /\.css$/,
                loader: ExtractTextPlugin.extract(['css-loader?importLoaders=1']),
            },

            // All files with a .scss|.sass extenson will be handled by 'sass-loader'
            {
                test: /\.(sass|scss)$/,
                loader: ExtractTextPlugin.extract(['css-loader', 'sass-loader'])
            },


            // All files with a '.js' extension will be handled by 'babel-loader'.
            {
                test: /\.js$/,
                exclude: /node_modules/,
                use: [
                  {
                      loader: 'babel-loader',
                      options: babelOptions
                  }
                ]
            },

            { 
                test: /\.png$/, 
                loader: "url-loader?limit=100000" 
            },
      
            { 
                test: /\.jpg$/, 
                loader: "file-loader" 
            },

            {
                test: /\.woff(\?.*)?$/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff'
            },

            {
                test: /\.woff2(\?.*)?$/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff2'
            },

            {
                test: /\.ttf(\?.*)?$/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/octet-stream'
            },

            {
                test: /\.eot(\?.*)?$/, loader: 'file-loader?prefix=fonts/&name=fonts/[name].[ext]'
            },

            {
                test: /\.svg(\?.*)?$/,
                loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=image/svg+xml'
            },

            // All output '.js' files will have any sourcemaps re-processed by 'source-map-loader'.
            {
                enforce: "pre",
                test: /\.js$/,
                loader: "source-map-loader"
            }
        ]
    }
};

Of particular note is this part

output: {
        filename: '[name].bundle.[hash].js',
        path: buildDir,
		//this is to make it play nice with the Play Framework Assets controllers
		//that deals with static data
		publicPath: '/assets/dist'
    }

What that is doing is using a hash for the file name, such that the browser would immediately see a change in the file, so would reload it, and not cache old JavaScript. The other part of this is where the file gets generated. As this "front end" part still needs to be hosted in back end (Play framework for this article) we need to ensure that the final JavaScript ends up in a location that the Play framework is able to render. For this article this means the static Play framework route which is configured as follows in the PlayBackEndApi/routes file in the PlayBackEndApi project

GET   /assets/*file                            controllers.Assets.at(path="/public", file)

PlayBackEndApi/FrontEndWebSite/webpack.develop.js

Where we might have a specialized develop phase variant (see how we use the base file webpack.config.js as a base) file that looks like this

let commonConfig = require('./webpack.config.js');
let webpack = require('webpack');
let Merge = require('webpack-merge');

module.exports = function (env) {
    return Merge(commonConfig, {})
}

PlayBackEndApi/FrontEndWebSite/webpack.production.js

Where we might have a specialized production phase variant (see how we use the base file webpack.config.js as a base) file that looks like this, where we do things like this to make the emitted JavaScript production like

  • Minify it
  • Strip comments
  • Strip console.log (some browsers don't have it)
  • Don't enable SourceMaps
let commonConfig = require('./webpack.config.js');
let webpack = require('webpack');
let Merge = require('webpack-merge');

module.exports = function (env) {
    return Merge(commonConfig, {
        plugins: [
          new webpack.LoaderOptionsPlugin({
              minimize: true,
              debug: false
          }),
          new webpack.optimize.UglifyJsPlugin({
              // Eliminate comments
              comments: false,
              beautify: false,
              mangle: {
                  screw_ie8: true,
                  keep_fnames: true
              },
              compress: {
                  screw_ie8: true,

                  // remove warnings
                  warnings: false,

                  // Drop console statements
                  drop_console: true
              },
              comments: false,
              sourceMap: false
          })
        ]
    })
}

The SPA

So the React front end is really a Single Page Application, as such you can expect to find a single page. So just where is that single page?

We we need to go back to some of the stuff in the webpack.config.js file, specifically these bits

//build it to the Play Framework public folder, which is services by the assets controller
let buildDir = path.resolve(__dirname, '../public/dist');

output: {
        filename: '[name].bundle.[hash].js',
        path: buildDir,
		//this is to make it play nice with the Play Framework Assets controllers
		//that deals with static data
		publicPath: '/assets/dist'
    },
....    
....    
....    
....    
new HtmlWebpackPlugin({
            filename: 'index.html',
            template: 'template.html',
        })

What this is doing is saying that Webpack will try and generate a file called index.html by using a template called template.html and the final file will be put in a relative path called /assets/dist, which if you go back a bit was mapped using the  following in the PlayBackEndApi/routes file in the PlayBackEndApi project

GET   /assets/*file                            controllers.Assets.at(path="/public", file)

PlayBackEndApi/FrontEndWebSite/template.html

So what does the template.html file look like, well here it is, where it also includes a CDN references to Google maps API as we use that in this article

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8" />
    <title>Hello React!</title>
    <script src="https://maps.googleapis.com/maps/api/js?key=AIzaSyBVtreRNA537_WsNSn2_kOiz3Xhm8w6pEo"

            type="text/javascript"></script>
</head>
<body>
    <div>
        <iframe id="comet" src="/job/streamedJob"></iframe>
    </div>
    <div id="root"></div>
    <!-- Main -->
</body>
</html>	

PlayBackEndApi/public/dist/index.html

So after WebPack has worked its magic, what does the final file look like. Well it looks like this

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8" />
    <title>Hello React!</title>
    <script src="https://maps.googleapis.com/maps/api/js?key=AIzaSyBVtreRNA537_WsNSn2_kOiz3Xhm8w6pEo"

            type="text/javascript"></script>
<link href="/assets/dist/vendor.bundle.5c5feaa8663412cf31c5.css" rel="stylesheet">
<link href="/assets/dist/indexCss.bundle.5c5feaa8663412cf31c5.css" rel="stylesheet"></head>
<body>
    <div>
        <iframe id="comet" src="/job/streamedJob"></iframe>
    </div>
    <div id="root"></div>
    <!-- Main -->
<script type="text/javascript" src="/assets/dist/vendor.bundle.5c5feaa8663412cf31c5.js"></script>
<script type="text/javascript" src="/assets/dist/index.bundle.5c5feaa8663412cf31c5.js"></script>
</body>
</html>		

See how the correct Css and JavaScript files have been placed into the HEAD/Script tags automatically, this is great I think, and they also have nice hashes as part of the file name, to allow the browser caches to be easily invalidated

Play "Home" route

So that's great but how does this page get served up. Well the trick to that lies in this Play framework route entry

# Home page
GET   /                                        controllers.HomeController.index()

Which has this server side code to serve the route

package controllers

import javax.inject.Inject
import play.api.mvc.{Action, Controller}

class HomeController @Inject() (environment: play.api.Environment)
  extends Controller {

  def index() = Action {
    val fullpath = s"${environment.rootPath}\\public\\dist\\index.html"
    val htmlContents = scala.io.Source.fromFile(fullpath).mkString
    Ok(htmlContents).as("text/html")
  }

}

 

Routing

As I stated above this demo is a Singlle Page App (SPA) type of app. So how is that done. That is done in this file PlayBackEndApi/FrontEndWebSite/src/index.tsx, which is also set as an entry point in the WebPack webpack.config.js file.

let entries = {
    index: './src/index.tsx',
    .....
    .....
    .....
    .....
};

//build it to the Play Framework public folder, which is services by the assets controller
let buildDir = path.resolve(__dirname, '../public/dist');

module.exports = {

    context: __dirname,

    entry: entries,
	
	
	.....
	.....
	.....
	.....
	.....

}

PlayBackEndApi/FrontEndWebSite/src/index.tsx

Where the actual routing work is done like this, where the main top level components will be

  • Login
  • Register
  • Logout
  • CreateJob
  • ViewJob
  • ViewRating

Some of which are conditional routes depending on whether you are logged in or not.  

import * as React from "react";
import * as ReactDOM from "react-dom";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Nav,
    Navbar,
    NavItem,
    NavDropdown,
    MenuItem,
    Button
} from "react-bootstrap";
import { Router, Route, hashHistory } from 'react-router'
import { Login } from "./Login";
import { Logout } from "./Logout";
import { Register } from "./Register";
import { CreateJob } from "./CreateJob";
import { ViewJob } from "./ViewJob";
import { ViewRating } from "./ViewRating";
import { ContainerOperations } from "./ioc/ContainerOperations";
import { AuthService } from "./services/AuthService";
import { JobService } from "./services/JobService";
import { JobStreamService } from "./services/JobStreamService";
import { PositionService } from "./services/PositionService";
import { TYPES } from "./types";
import Rx from 'rx';

let authService = ContainerOperations.getInstance().container.get<AuthService>(TYPES.AuthService);
let jobService = ContainerOperations.getInstance().container.get<JobService>(TYPES.JobService);
let jobStreamService = ContainerOperations.getInstance().container.get<JobStreamService>(TYPES.JobStreamService);
let positionService = ContainerOperations.getInstance().container.get<PositionService>(TYPES.PositionService);
jobStreamService.init();


export interface MainNavProps {
    authService: AuthService;
    jobService: JobService;
    jobStreamService: JobStreamService;
    positionService: PositionService;

}

export interface MainNavState {
    isLoggedIn: boolean;
}

class MainNav extends React.Component<MainNavProps, MainNavState> {

    private _subscription: any;

    constructor(props: any) {
        super(props);
        console.log(props);
        this.state = {
            isLoggedIn: false
        };
    }

    componentWillMount() {
        this._subscription = this.props.authService.getAuthenticationStream()
            .subscribe(isAuthenticated => {
                this.state = {
                    isLoggedIn: isAuthenticated
                };
                if (this.state.isLoggedIn) {
                    hashHistory.push('/createjob');
                }
                else {
                    hashHistory.push('/');
                }
            });
    }

    componentWillUnmount() {
        this._subscription.dispose();
    }

    render() {
        return (
            this.state.isLoggedIn ?
                <Navbar collapseOnSelect>
                    <Navbar.Header>
                        <Navbar.Brand>
                            <span>Simple Kafka-Uber</span>
                        </Navbar.Brand>
                        <Navbar.Toggle />
                    </Navbar.Header>
                    <Navbar.Collapse>
                        <Nav pullRight>
                            <NavItem eventKey={2} href='#/logout'>Logout</NavItem>
                            <NavItem eventKey={2} href='#/createjob'>Create Job</NavItem>
                            <NavItem eventKey={2} href='#/viewjob'>View Job</NavItem>
                            <NavItem eventKey={2} href='#/viewrating'>View Rating</NavItem>
                        </Nav>
                    </Navbar.Collapse>
                </Navbar> :
                <Navbar pullRight collapseOnSelect>
                    <Navbar.Header>
                        <Navbar.Brand>
                            <span>Simple Kafka-Uber</span>
                        </Navbar.Brand>
                        <Navbar.Toggle />
                    </Navbar.Header>
                    <Navbar.Collapse>
                    </Navbar.Collapse>
                </Navbar>
        )
    }
}

class App extends React.Component<undefined, undefined> {
    render() {
        return (
            <div>
                <div>
                    <MainNav
                        authService={authService}
                        jobService={jobService}
                        jobStreamService={jobStreamService}
                        positionService={positionService}
                    />
                    {this.props.children}
                </div>
            </div>
        )
    }
}


ReactDOM.render((
    <Router history={hashHistory}>
        <Route component={App}>
            <Route
                path="/"
                component={Login}
                authService={authService} />
            <Route
                path="/register"
                component={Register}
                authService={authService} />
            <Route
                path="/logout"
                component={Logout}
                authService={authService}
                jobService={jobService}
                positionService={positionService} />
            <Route
                path="/createjob"
                component={CreateJob}
                authService={authService}
                jobService={jobService}
                positionService={positionService} />
            <Route
                path="/viewjob"
                component={ViewJob}
                authService={authService}
                jobService={jobService}
                jobStreamService={jobStreamService}
                positionService={positionService} />
            <Route
                path="/viewrating"
                component={ViewRating}
                authService={authService} />
        </Route>
    </Router>
), document.getElementById('root'));

This makes use of  ReactRouter version 3.0.5 to do the routing within the SPA, and also make use of React-Bootstrap for rendering the NavBar

This TypeScipt also hooks up a RxJS subcription to determine whether you are logged in/out, this is done using the following injectable service PlayBackEndApi/FrontEndWebSite/src/services/AuthService.ts

import { injectable, inject } from "inversify";
import { TYPES } from "../types";
import Rx from 'rx';

@injectable()
export class AuthService {

    private _isAuthenticated: boolean;
    private _authenticatedSubject = new Rx.Subject<boolean>();

    constructor() {

    }

    clearUser = (): void => {
        this._isAuthenticated = false;
        sessionStorage.removeItem('currentUserProfile');
        this._authenticatedSubject.onNext(false);
    }

    storeUser = (currentProfile: any): void => {

        if (currentProfile == null || currentProfile == undefined)
            return;

        this._isAuthenticated = true;
        sessionStorage.setItem('currentUserProfile', JSON.stringify(currentProfile));
        this._authenticatedSubject.onNext(true);
    }

    userName = (): string => {
        var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile'));
        return userProfile.user.fullName;
    }

    user = (): any => {
        var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile'));
        return userProfile.user;
    }

    userEmail = (): string => {
        var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile'));
        return userProfile.user.email;
    }

    isDriver = (): boolean => {
        var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile'));
        return userProfile.isDriver;
    }

    isAuthenticated = (): boolean => {
        return this._isAuthenticated;
    }

    getAuthenticationStream = (): Rx.Observable<boolean> => {
        return this._authenticatedSubject.asObservable();
    }
}

 

Some common React / Bootstrap UI components

As with any UI work, you will inevitably end up where you need some core components that you re-use over and over again. For the demo app I had these 3 reusable React components that make use of React/React-Bootstrap

YesNoDialog

This represents a generic re-usable yes/no dialog that can be triggered, here is the code for this one. The important part is that the various prop values can be controlled via the parent component state values

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
 
import 'bootstrap/dist/css/bootstrap.css';
import
{
    Button, 
    Modal
} from "react-bootstrap";
 
 
//TODO : Fix this
export interface YesNoDialogProps {
    headerText: string;
    theId: string;
    launchButtonText: string;
    yesCallBack: any;
    noCallBack: any;
}
 
export interface YesNoDialogState {
    showModal: boolean;
}
 
 
export class YesNoDialog extends React.Component<YesNoDialogProps, YesNoDialogState> {
 
    constructor(props) {
        super(props);
        console.log(this.props);
        //set initial state
        this.state = {
            showModal: false
        };
    }
 
    _yesClicked = () => {
        this.setState({ showModal: false });
        this.props.yesCallBack();
    }
 
    _noClicked = () => {
        this.setState({ showModal: false });
        this.props.noCallBack();
    }
 
    _close = () => {
        this.setState({ showModal: false });
    }
 
    _open = () => {
        this.setState({ showModal: true });
    }
 
    render() {
        return (
            <div className="leftFloat">
 
                <Button
                    id={this.props.theId}
                    type='button'
                    bsSize='small'
                    bsStyle='primary'
                    onClick={this._open}>{this.props.launchButtonText}</Button>
 
                <Modal show={this.state.showModal} onHide={this._close}>
                    <Modal.Header closeButton>
                        <Modal.Title>{ this.props.headerText }</Modal.Title>
                    </Modal.Header>
                    <Modal.Body>
                        <h4>Are you sure?</h4>
                    </Modal.Body>
                    <Modal.Footer>
                        <Button
                            type='button'
                            bsSize='small'
                            bsStyle='primary'
                            onClick={this._yesClicked}>Yes</Button>
                        <Button
                            type='button'
                            bsSize='small'
                            bsStyle='danger'
                            onClick={this._noClicked}>Cancel</Button>
                    </Modal.Footer>
                </Modal>
            </div>
        );
    }
}

This looks like this when rendered

OkDialog

This represents a generic re-usable ok dialog that can be triggered, here is the code for this one. The important part is that the various prop values can be controlled via the parent component state values

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
 
import 'bootstrap/dist/css/bootstrap.css';
import
{
    Button, 
    Modal
} from "react-bootstrap";
 
 
//TODO : Fix this
export interface OkDialogProps {
    headerText: string;
    bodyText: string;
    open: boolean;
    okCallBack: any;
}
 
export interface OkDialogState {
    showModal: boolean;
}
 
 
export class OkDialog extends React.Component<OkDialogProps, OkDialogState> {
 
    constructor(props) {
        super(props);
        console.log(this.props);
        //set initial state
        this.state = {
            showModal: false
        };
    }
 
    componentDidMount() {
        if (this.props.open === true) {
            this.setState({ showModal: true });
        }
    }
 
    _okClicked = () => {
        this.setState({ showModal: false });
        this.props.okCallBack();
    }
 
    _close = () => {
        this.setState({ showModal: false });
        this.props.okCallBack();
    }
 
    _open = () => {
        this.setState({ showModal: true });
    }
 
    render() {
        return (
            <div className="leftFloat">
 
                <Modal show={this.state.showModal} onHide={this._close}>
                    <Modal.Header closeButton>
                        <Modal.Title>{ this.props.headerText }</Modal.Title>
                    </Modal.Header>
                    <Modal.Body>
                        <h4>{this.props.bodyText}</h4>
                    </Modal.Body>
                    <Modal.Footer>
                        <Button
                            type='button'
                            bsSize='small'
                            bsStyle='primary'
                            onClick={this._okClicked}>Ok</Button>
                    </Modal.Footer>
                </Modal>
            </div>
        );
    }
}

This looks like this when rendered

RatingDialog

This represents a generic re-usable rating control where rating can be from 1-5; here is the code for this one. The important part is that the various prop values can be controlled via the parent component state values

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
 
import 'bootstrap/dist/css/bootstrap.css';
import
{
    Button, 
    Modal
} from "react-bootstrap";
 
 
import ReactStars from 'react-stars';
 
 
export interface RatingDialogProps {
    headerText: string;
    theId: string;
    okCallBack: any;
}
 
export interface RatingDialogState {
    showModal: boolean;
    rating: number;
}
 
 
export class RatingDialog extends React.Component<RatingDialogProps, RatingDialogState> {
 
    constructor(props) {
        super(props);
        console.log(this.props); 
        //set initial state
        this.state = {
            showModal: false,
            rating:0
        };
    }
 
    _close = () => {
        this.setState(
            {
                showModal: false,
                rating:0
            }
        );
    }
 
    _open = () => {
        this.setState(
            {
                showModal: true,
                rating: 0
            }
        );
    }
 
    _ratingChanged = (newRating) => {
        console.log(newRating)
        this.setState(
            {
                rating: newRating
            }
        );
    }
 
    _okClicked = () => {
        this._close();
        this.props.okCallBack();
    }
 
    render() {
        return (
            <div className="leftFloat">
 
                <Button
                    id={this.props.theId}
                    type='button'
                    bsSize='small'
                    bsStyle='primary'
                    onClick={this._open}>Complete</Button>
 
                <Modal show={this.state.showModal} onHide={this._close}>
                    <Modal.Header closeButton>
                        <Modal.Title>{ this.props.headerText }</Modal.Title>
                    </Modal.Header>
                    <Modal.Body>
                        <h4>Give your rating between 1-5</h4>
                        <ReactStars count={5}
                                    onChange={this._ratingChanged}
                                    size={24}
                                    color2={'#ffd700'} />
                    </Modal.Body>
                    <Modal.Footer>
                        <Button
                            type='submit'
                            bsSize='small'
                            bsStyle='primary'
                            onClick={this._okClicked}>Ok</Button>
                    </Modal.Footer>
                </Modal>
            </div>
        );

This looks like this when rendered

For the rating component I make use of this React library : https://www.npmjs.com/package/react-stars

Registration workflow

The registration workflow is pretty much as described above, and uses the pieces shown above.

There are 2 types of registration, where we need to capture different data

  • Passenger registration
  • Driver registration

We will examine the passenger registration in detail, with the driver registration being fairly similar in nature

Passenger registration

Registration React component

PlayBackEndApi/FrontEndWebSite/src/PassengerRegistration.tsx is the main file for the React TSX representing the Passenger registration component.

import * as React from "react";
import * as ReactDOM from "react-dom";
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Well,
    Grid,
    Row,
    Col,
    ButtonInput
} from "react-bootstrap";
import { AuthService } from "./services/AuthService";
import { hashHistory } from 'react-router';
import { Form, ValidatedInput } from 'react-bootstrap-validation';
import revalidator from 'revalidator';


let schema = {
    properties: {
        fullName: {
            type: 'string',
            minLength: 8,
            maxLength: 60,
            required: true,
            allowEmpty: false
        },
        email: {
            type: 'string',
            maxLength: 255,
            format: 'email',
            required: true,
            allowEmpty: false
        },
        password: {
            type: 'string',
            minLength: 8,
            maxLength: 60,
            required: true,
            allowEmpty: false
        }
    }
};


export interface PassengerRegistrationProps {
    authService: AuthService;
}


export interface PassengerRegistrationState {
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    wasSuccessful: boolean;
}

export class PassengerRegistration extends React.Component<PassengerRegistrationProps, PassengerRegistrationState> {

    constructor(props: any) {
        super(props);
        this.state = {
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            wasSuccessful: false
        };
    }

    render() {
        return (
            <Form className="submittable-form-inner"
                // Supply callbacks to both valid and invalid
                // submit attempts
                validateAll={this.validateForm}
                onInvalidSubmit={this.handleInvalidSubmit}
                onValidSubmit={this.handleValidSubmit}>
                <Grid>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <h4>Passenger details</h4>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <ValidatedInput type='text'
                                label='FullName'
                                name='fullName'
                                errorHelp='FullName is invalid' />

                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <ValidatedInput type='text'
                                label='Email'
                                name='email'
                                errorHelp='Email address is invalid' />
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <ValidatedInput type='password'
                                label='Password'
                                name='password'
                                errorHelp='Password is invalid' />
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <ButtonInput
                                id="registerBtn"
                                type='submit'
                                bsSize='small'
                                bsStyle='primary'
                                value='Register'>Register</ButtonInput>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <span>
                            <OkDialog
                                open={this.state.okDialogOpen}
                                okCallBack={this.okDialogCallBack}
                                headerText={this.state.okDialogHeaderText}
                                bodyText={this.state.okDialogBodyText}
                                key={this.state.okDialogKey} />
                        </span>
                    </Row>
                </Grid>
            </Form>
        )
    }

    validateForm = (values) => {
        let res = revalidator.validate(values, schema);

        // If the values passed validation, we return true
        if (res.valid) {
            return true;
        }

        // Otherwise we should return an object containing errors
        // e.g. { email: true, password: true }
        return res.errors.reduce((errors, error) => {
            // Set each property to either true or
            // a string error description
            errors[error.property] = true;

            return errors;
        }, {});
    }

    handleInvalidSubmit = (errors, values) => {
        // Errors is an array containing input names
        // that failed to validate
        this.setState(
            {
                okDialogHeaderText: 'Validation Error',
                okDialogBodyText: 'Form has errors and may not be submitted',
                okDialogOpen: true,
                okDialogKey: Math.random()
            });
    }

    handleValidSubmit = (values) => {
        var passenger = values;
        var self = this;

        $.ajax({
            type: 'POST',
            url: 'registration/save/passenger',
            data: JSON.stringify(passenger),
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {
            var redactedPassenger = passenger;
            redactedPassenger.password = "";
            console.log("redacted ${redactedPassenger}");
            console.log(redactedPassenger);
            console.log("Auth Service");
            console.log(self.props.authService);
            let userProfile = {
                isDriver: false,
                user: redactedPassenger
            };
            self.setState(
                {
                    wasSuccessful: true,
                    okDialogHeaderText: 'Registration Successful',
                    okDialogBodyText: 'You are now registered',
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
            self.props.authService.storeUser(userProfile);
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            self.setState(
                {
                    okDialogHeaderText: 'Error',
                    okDialogBodyText: jqXHR.responseText,
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        });
    }

    okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });

        if (this.state.wasSuccessful) {
            hashHistory.push('/');
        }
    }
}

There are a couple of things of note there

  • That we use a standard POST
  • That will post the registration data as JSON to the Play API backend code
  • That we also show a standard Bootstrap OkDialog which we looked at last time, which when the Ok button is clicked will use the React Router to navigate to the route page
  • That we use the react-bootstrap-validation package for field validation
  • That if the registration process is ok we store the user data in the AuthorisationService.ts that we saw earlier, where it in turn writes to the browser sessionStorage

Lets now turn our attention to the Play API backend code that goes with the Passenger Registration

Json Support

We need to start with the JSON suport from JavaScript to Scala. This is done using this file PlayBackEndApi/app/Entities/PassengerRegistrationEntities.scala, where we use Play framework JSON combinators

  • Reads : Which allows reading a JSON string into a Scala object
  • Writes : Which allows a Scala object to be turned into a JSON string
  • Format : is just a mix of the Reads and Writes Traits and can be used for implicit conversion in place of its components.

The recommendation for both of these is that they are exposed as implicit vals you can read more about it here : https://www.playframework.com/documentation/2.6.x/ScalaJsonCombinators

package entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class PassengerRegistration(
  fullName: String,
  email: String,
  password: String)

object PassengerRegistration {
  implicit val formatter = Json.format[PassengerRegistration]
}

object PassengerRegistrationJsonFormatters {

  implicit val passengerRegistrationWrites = new Writes[PassengerRegistration] {
    def writes(passengerRegistration: PassengerRegistration) = Json.obj(
      "fullName" -> passengerRegistration.fullName,
      "email" -> passengerRegistration.email,
      "password" -> passengerRegistration.password
    )
  }

  implicit val passengerRegistrationReads: Reads[PassengerRegistration] = (
    (JsPath \ "fullName").read[String] and
      (JsPath \ "email").read[String] and
      (JsPath \ "password").read[String]
    )(PassengerRegistration.apply _)
}

Registration route

Once we have that in place we need to turn our attention to the actual endpoint to support the POST of a PassengerRegistration object. We first need to set up the route in the conf/routes file as follows:

# Registration page
POST  /registration/save/passenger             controllers.RegistrationController.savePassengerRegistration()

Registration controller

So now that we have talked about the JSON Reads/Writes and we know that we need Mongo downloaded and running (see how do I run this section at start), lets see what the actual controller looks like shall we. Here is the FULL code for the Passenger Registration portion of the PlayBackEndApi/app/controllers/RegistrationController.scala

package controllers

import javax.inject.Inject
import play.api.mvc.{Action, Controller, Result}
import entities._
import entities.DriverRegistrationJsonFormatters._
import entities.PassengerRegistrationJsonFormatters._
import scala.concurrent.{ExecutionContext, Future}
import play.modules.reactivemongo._
import play.api.Logger
import utils.Errors
import play.api.libs.json._
import reactivemongo.api.ReadPreference
import reactivemongo.play.json._
import collection._

class RegistrationController @Inject()
  (val reactiveMongoApi: ReactiveMongoApi)
  (implicit ec: ExecutionContext)
  extends Controller with MongoController with ReactiveMongoComponents {

  def passRegistrationFuture: Future[JSONCollection] = database.map(_.collection[JSONCollection]("passenger-registrations"))
  def driverRegistrationFuture: Future[JSONCollection] = database.map(_.collection[JSONCollection]("driver-registrations"))


  def savePassengerRegistration = Action.async(parse.json) { request =>
    Json.fromJson[PassengerRegistration](request.body) match {
      case JsSuccess(newPassRegistration, _) =>

        //https://github.com/ReactiveMongo/ReactiveMongo-Extensions/blob/0.10.x/guide/dsl.md
        val query = Json.obj("email" -> Json.obj("$eq" -> newPassRegistration.email))

        dealWithRegistration[PassengerRegistration](
          newPassRegistration,
          passRegistrationFuture,
          query,
          PassengerRegistration.formatter)
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a PassengerRegistration from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def dealWithRegistration[T](
          incomingRegistration: T,
          jsonCollectionFuture: Future[JSONCollection],
          query: JsObject,
          formatter: OFormat[T])
          (implicit ec: ExecutionContext): Future[Result] = {

    def hasExistingRegistrationFuture = jsonCollectionFuture.flatMap {
        //http://reactivemongo.org/releases/0.11/documentation/advanced-topics/collection-api.html
        _.find(query)
        .cursor[JsObject](ReadPreference.primary)
        .collect[List]()
      }.map(_.length match {
          case 0 => false
          case _ => true
      }
    )

    hasExistingRegistrationFuture.flatMap {
      case false => {
        for {
          registrations <- jsonCollectionFuture
          writeResult <- registrations.insert(incomingRegistration)(formatter,ec)
        } yield {
          Logger.debug(s"Successfully inserted with LastError: $writeResult")
          Ok(Json.obj())
        }
      }
      case true => Future(BadRequest("Registration already exists"))
    }
  }
}

Lets break this down into chunks

  • The controller constructor
    • This takes a ReactiveMongoApi (this is mandatory to satisfy the base trait MongoController requirements)
    • Inherits from MongoController which provides a lot of use functionality
    • It also inherits from ReactiveMongoComponents in order to allow the cake pattern/self typing requirements of the base MongoController which expects a ReactiveMongoComponents
  • The use of JSONCollection
    • There is a Future[JSONCollection] that represents the passenger collection in Mongo. This is a collection that stores JSON. When using reactive Mongo you have a choice about whether to use the standard BSON collections of JSON. I opted for JSON
  • The Guts Of The Logic
    • So now we have discussed the controller constructor and the Mongo collections. We just need to talk about the actual work that happens on registration. In a nutshell it works like this
      • The incoming JSON string is turned into a PassengerRegistration object via Play
      • We then create a new JSON query object to query the Mongo JSONCollection to see if a registration already exists
      • If a registration already exists we exit with a BadRequest output
      • If a registration does NOT already exist we insert the new registration details into the Mongo JSONCollection , and the we return an Ok output

And that is how the Passenger Registration works.

 

Driver registration

The driver registration works in much the same way as described above, its just slightly different JSON, but it does share the same core logic/controller as the Passenger Registration

 

Login workflow

The login uses the pieces shown above.

This workflow is VERY similar to registration, so feel free to skip this section if you think you understand how registration concepts work

Login React component

This is the main PlayBackEndApi/FrontEndWebSite/src/Login.tsx file for the React TSX representing the login component.

import * as React from "react";
import * as ReactDOM from "react-dom";
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Well,
    Grid,
    Row,
    Col,
    ButtonInput
} from "react-bootstrap";
import { Form, ValidatedInput } from 'react-bootstrap-validation';
import revalidator from 'revalidator';
import { AuthService } from "./services/AuthService";

let schema = {
    properties: {
        email: {
            type: 'string',
            maxLength: 255,
            format: 'email',
            required: true,
            allowEmpty: false
        },
        password: {
            type: 'string',
            minLength: 8,
            maxLength: 60,
            required: true,
            allowEmpty: false
        }
    }
};

export interface LoginState {
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
}

export class Login extends React.Component<undefined, LoginState> {

    private _authService: AuthService;

    constructor(props: any) {
        super(props);
        console.log(props);
        this._authService = props.route.authService;
        this.state = {
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0
        };
    }

    render() {
        return (
            <Well className="outer-well">
                <Form
                    // Supply callbacks to both valid and invalid
                    // submit attempts
                    validateAll={this.validateForm}
                    onInvalidSubmit={this.handleInvalidSubmit}
                    onValidSubmit={this.handleValidSubmit}>
                    <Grid>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <h4>ENTER YOUR LOGIN DETAILS</h4>
                                <span><h6>Or click <a href="#/register">here</a> to register</h6></span>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ValidatedInput type='text'
                                    label='Email'
                                    name='email'
                                    errorHelp='Email address is invalid' />

                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ValidatedInput type='password'
                                    name='password'
                                    label='Password'
                                    errorHelp='Password is invalid' />

                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ValidatedInput
                                    type='checkbox'
                                    name='isDriver'
                                    label='Are you a driver?'
                                />
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ButtonInput
                                    id="loginBtn"
                                    type='submit'
                                    bsSize='small'
                                    bsStyle='primary'
                                    value='Register'>Login</ButtonInput>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <span>
                                <OkDialog
                                    open={this.state.okDialogOpen}
                                    okCallBack={this.okDialogCallBack}
                                    headerText={this.state.okDialogHeaderText}
                                    bodyText={this.state.okDialogBodyText}
                                    key={this.state.okDialogKey} />
                            </span>
                        </Row>
                    </Grid>
                </Form>
            </Well>
        )
    }

    validateForm = (values) => {
        let res = revalidator.validate(values, schema);

        // If the values passed validation, we return true
        if (res.valid) {
            return true;
        }

        // Otherwise we should return an object containing errors
        // e.g. { email: true, password: true }
        return res.errors.reduce((errors, error) => {
            // Set each property to either true or
            // a string error description
            errors[error.property] = true;

            return errors;
        }, {});
    }

    handleInvalidSubmit = (errors, values) => {

        console.log(values);

        // Errors is an array containing input names
        // that failed to validate
        this.setState(
            {
                okDialogHeaderText: 'Validation Error',
                okDialogBodyText: 'Form has errors and may not be submitted',
                okDialogOpen: true,
                okDialogKey: Math.random()
            });
    }

    handleValidSubmit = (values) => {
        var logindetails = values;
        var self = this;

        $.ajax({
            type: 'POST',
            url: 'login/validate',
            data: JSON.stringify(logindetails),
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {

            console.log("result of login");
            console.log(jqXHR.responseText);
            let currentUser = JSON.parse(jqXHR.responseText);
            let userProfile = {
                isDriver: logindetails.isDriver,
                user: currentUser
            };
            self._authService.storeUser(userProfile);

            self.setState(
                {
                    okDialogHeaderText: 'Login Successful',
                    okDialogBodyText: 'You are now logged in',
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            self.setState(
                {
                    okDialogHeaderText: 'Error',
                    okDialogBodyText: jqXHR.responseText,
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        });
    }

    okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });
    }
}

There are a couple of things of note there

  • That we use a standard POST
  • That will post the login data as JSON to the Play API backend code
  • That we use the react-bootstrap-validation package for field validation
  • That if the login process is okay we store the user data in the AuthorisationService.ts that we saw earlier, where it in turn writes to the browser sessionStorage

Lets now turn our attention to the Play API backend code that goes with the Login

Json Support

We need to start with the JSON support from JavaScript to Scala. This is done using this file PlayBackEndApi/app/Entities/LoginEntities.scala, where we use Play framework JSON combinators

  • Reads : Which allows reading a JSON string into a Scala object
  • Writes : Which allows a Scala object to be turned into a JSON string
  • Format : is just a mix of the Reads and Writes Traits and can be used for implicit conversion in place of its components.

The recommendation for both of these is that they are exposed as implicit vals you can read more about it here : https://www.playframework.com/documentation/2.6.x/ScalaJsonCombinators

package entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Login(email: String, password: String, isDriver: Boolean)

object Login {
  implicit val formatter = Json.format[Login]
}

object LoginJsonFormatters {

  implicit val loginWrites = new Writes[Login] {
    def writes(login: Login) = Json.obj(
      "email" -> login.email,
      "password" -> login.password,
      "isDriver" -> login.isDriver
    )
  }

  implicit val loginReads: Reads[Login] = (
      (JsPath \ "email").read[String] and
      (JsPath \ "password").read[String] and
      ((JsPath \ "isDriver").read[Boolean])
    )(Login.apply _)
}

Login route

Once we have that in place we need to turn our attention to the actual endpoint to support the POST of a Login object. We first need to set up the route in the conf/routes file as follows:

# Login page
POST  /login/validate                          controllers.LoginController.validateLogin()

Login controller

So now that we have talked about the JSON Reads/Writes and we know that we need Mongo downloaded and running (see how do I run this section at start), lets see what the actual controller looks like shall we. Here is the FULL code for the PlayBackEndApi/app/controllers/LoginController.scala

package controllers

import javax.inject.Inject
import entities.DriverRegistrationJsonFormatters._
import entities.PassengerRegistrationJsonFormatters._
import entities._
import play.api.Logger
import play.api.libs.json._
import play.api.mvc.{Action, Controller, Result}
import play.modules.reactivemongo._
import reactivemongo.api.ReadPreference
import reactivemongo.play.json._
import reactivemongo.play.json.collection._
import utils.Errors


import scala.concurrent.{ExecutionContext, Future}

class LoginController @Inject()
  (val reactiveMongoApi: ReactiveMongoApi)
  (implicit ec: ExecutionContext)
  extends Controller with MongoController with ReactiveMongoComponents {

  def passRegistrationFuture: Future[JSONCollection] = database.map(_.collection[JSONCollection]("passenger-registrations"))
  def driverRegistrationFuture: Future[JSONCollection] = database.map(_.collection[JSONCollection]("driver-registrations"))


  def validateLogin = Action.async(parse.json) { request =>
    Json.fromJson[Login](request.body) match {
      case JsSuccess(newLoginDetails, _) =>
        newLoginDetails.isDriver match {
          case false => {
            val maybePassengerReg = extractExistingRegistration(
              passRegistrationFuture.flatMap {
                _.find(Json.obj("email" -> Json.obj("$eq" -> newLoginDetails.email))).
                  cursor[JsObject](ReadPreference.primary).
                  collect[List]()
              })
            returnRedactedRegistration[PassengerRegistration](
              maybePassengerReg,
              (reg: PassengerRegistration) => Ok(Json.toJson(reg.copy(password = "")))
            )

          }
          case true => {
            val maybeDriverReg = extractExistingRegistration(
              driverRegistrationFuture.flatMap {
                _.find(Json.obj("email" -> Json.obj("$eq" -> newLoginDetails.email))).
                  cursor[JsObject](ReadPreference.primary).
                  collect[List]()
              })
            returnRedactedRegistration[DriverRegistration](
              maybeDriverReg,
              (reg: DriverRegistration) => Ok(Json.toJson(reg.copy(password = "")))
            )
          }
        }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Login from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def returnRedactedRegistration[T]
  (
    maybeDriverRegFuture: Future[Option[JsObject]],
    redactor : T => Result
  )(implicit reads: Reads[T]): Future[Result] = {
    maybeDriverRegFuture.map {
      case Some(json) => {
        val reg = Json.fromJson[T](json)
        reg match {
          case JsSuccess(reg, _) => {
            redactor(reg)
          }
          case _ => BadRequest("Login already exists")
        }
      }
      case None => BadRequest("Could not find login")
    }
  }

  private def extractExistingRegistration[T]
  (incomingRegistrations: Future[List[T]])
  (implicit writes: Writes[T], ec: ExecutionContext): Future[Option[T]] = {
    incomingRegistrations.map(matchedRegistrations =>
      matchedRegistrations.length match {
        case 0 => None
        case _ => Some(matchedRegistrations(0))
      }
    )
  }
}

This is similar enough in nature to the registration process, so I will not go into this any further.

CreateJob workflow

The CreateJob uses the pieces shown above, and is intended to work like this

CreatJob React component

This is the code for the CreateJob React component

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import Measure from 'react-measure'
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Well,
    Grid,
    Row,
    Col,
    ButtonInput,
    ButtonGroup,
    Button
} from "react-bootstrap";
import { AuthService } from "./services/AuthService";
import { JobService } from "./services/JobService";
import { PositionService } from "./services/PositionService";
import { UUIDService } from "./services/UUIDService";
import { Position } from "./domain/Position";
import { hashHistory } from 'react-router';
import { withGoogleMap, GoogleMap, Marker, InfoBox, OverlayView } from "react-google-maps";

const STYLES = {
    overlayView: {
        background: `white`,
        border: `1px solid #ccc`,
        padding: 15,
    },
    icon: {
        marginTop: 5,
        marginBottom: 5,
        marginLeft: 20
    }
}


const GetPixelPositionOffset = (width, height) => {
    return { x: -(width / 2), y: -(height / 2) };
}

const CreateJobGoogleMap = withGoogleMap(props => (
    <GoogleMap
        ref={props.onMapLoad}
        defaultZoom={16}
        defaultCenter={{ lat: 50.8202949, lng: -0.1406958 }}
        onClick={props.onMapClick}>

        <OverlayView
            key='createJobKey'
            mapPaneName={OverlayView.OVERLAY_MOUSE_TARGET}
            position={props.currentPosition}
            getPixelPositionOffset={GetPixelPositionOffset}>
            <div style={STYLES.overlayView}>
                <img style={STYLES.icon}
                    src='/assets/images/passenger.png' />
                <br />
                <Button
                    type='button'
                    bsSize='xsmall'
                    bsStyle='primary'
                    onClick={() => props.onCreateJobClick()}
                    disabled={props.hasIssuedJob}
                    value='Create Job'>Create Job</Button>
            </div>
        </OverlayView>
    </GoogleMap>
));


export interface CreateJobState {
    currentPosition: Position;
    dimensions: {
        width: number,
        height: number
    };
    hasIssuedJob: boolean;
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    wasSuccessful: boolean;
}

export class CreateJob extends React.Component<undefined, CreateJobState> {

    private _authService: AuthService;
    private _jobService: JobService;
    private _positionService: PositionService;

    constructor(props: any) {
        super(props);
        this._jobService = props.route.jobService;
        this._authService = props.route.authService;
        this._positionService = props.route.positionService;
        console.log(this._authService.userName());
        console.log(this._authService.userEmail());
        console.log("CreateJob ctor");
        console.log(this._jobService);

        if (!this._authService.isAuthenticated()) {
            hashHistory.push('/');
        }

        if (this._authService.isDriver()) {
            hashHistory.push('/viewjob');
        }

        this.state = {
            currentPosition: new Position(50.8202949, -0.1406958),
            dimensions: { width: -1, height: -1 },
            hasIssuedJob: this._jobService.hasIssuedJob(),
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            wasSuccessful: false
        };
    }

    render() {

        const adjustedwidth = this.state.dimensions.width;

        return (
            <Well className="outer-well">
                <Grid>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <h4>SET YOUR CURRENT LOCATION</h4>
                            <h6>Click the map to set your current location</h6>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>

                            <Measure
                                bounds
                                onResize={(contentRect) => {
                                    this.setState({ dimensions: contentRect.bounds })
                                }}
                            >
                                {({ measureRef }) =>
                                    <div ref={measureRef}>
                                        <CreateJobGoogleMap
                                            containerElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    justifyContent: 'flex-end',
                                                    alignItems: 'center',
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            mapElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            onMapLoad={this.handleMapLoad}
                                            onMapClick={this.handleMapClick}
                                            currentPosition={this.state.currentPosition}
                                            onCreateJobClick={this.handleCreateJobClick}
                                            hasIssuedJob={this.state.hasIssuedJob}
                                        />
                                    </div>
                                }
                            </Measure>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <span>
                            <OkDialog
                                open={this.state.okDialogOpen}
                                okCallBack={this.okDialogCallBack}
                                headerText={this.state.okDialogHeaderText}
                                bodyText={this.state.okDialogBodyText}
                                key={this.state.okDialogKey} />
                        </span>
                    </Row>
                </Grid>
            </Well>
        );
    }

    handleCreateJobClick = () => {

        var self = this;
        var currentUser = this._authService.user();

        var newJob = {
            jobUUID: UUIDService.createUUID(),
            clientFullName: currentUser.fullName,
            clientEmail: currentUser.email,
            clientPosition: {
                latitude: self.state.currentPosition.latitude,
                longitude: self.state.currentPosition.longitude
            },
            driverFullName: '',
            driverEmail: '',
            vehicleDescription: '',
            vehicleRegistrationNumber: '',
            isAssigned: false,
            isCompleted: false
        }

        $.ajax({
            type: 'POST',
            url: 'job/submit',
            data: JSON.stringify(newJob),
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {
            self._jobService.storeUserIssuedJob(newJob);
            const newState = Object.assign({}, self.state, {
                hasIssuedJob: self._jobService.hasIssuedJob()
            });
            self.setState(newState)
            self._positionService.storeUserPosition(self.state.currentPosition);
            hashHistory.push('/viewjob');
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            const newState = Object.assign({}, self.state, {
                okDialogHeaderText: 'Error',
                okDialogBodyText: jqXHR.responseText,
                okDialogOpen: true,
                okDialogKey: Math.random()
            })
            self.setState(newState)
        });
    }

    okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });
    }

    handleMapLoad = (map) => {
        if (map) {
            console.log(map.getZoom());
        }
    }

    handleMapClick = (event) => {
        const newState = Object.assign({}, this.state, {
            currentPosition: new Position(event.latLng.lat(), event.latLng.lng())
        })
        this.setState(newState)
    }
}

As you can see this component makes use of a Google Maps component. I chose this one to use : https://tomchentw.github.io/react-google-maps/.

One thing to note is that as I wanted the map to be responsive but the Google Map component I chose needed to have a fixed size, I had to use another React library to allow me to measure the screen at runtime, and adjust the React Google Maps component on the fly. This Measure package is this one : https://github.com/souporserious/react-measure which allows you to decorate/wrap your component in a measure type component.

 

As before we send the actual job JSON payload, but before doing that we need to allow the user to specify their position such that the position can be retrieved later

Once the client sets their OWN position, they are able to create a job, and push out a new job. If they already have a job in flight the client is NOT able to create a new job

To deal with the users current position, I also created this simple domain object and service class

export class Position {

    //my JSON API prefers nice names
    latitude: number;
    longitude: number;

    //map component wants these abbreviated names
    lat: number;
    lng: number;

    constructor(latitude: number, longitude: number) {
        this.latitude = latitude;
        this.longitude = longitude;

        //keep map happy
        this.lat = latitude;
        this.lng = longitude;
    }
   
}
import { injectable, inject } from "inversify";
import { Position } from "../domain/Position";
import { PositionMarker } from "../domain/PositionMarker";
import { TYPES } from "../../src/types";
import { AuthService } from "./AuthService";


@injectable()
export class PositionService {

    private _authService: AuthService;

    constructor( @inject(TYPES.AuthService) authService: AuthService) {
        this._authService = authService;
    }

    clearUserJobPositions = (): void => {
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        sessionStorage.removeItem(keyCurrentUserJobPositions);
    }

    storeUserJobPositions = (jobPositions: Array<PositionMarker>): void => {

        if (jobPositions == null || jobPositions == undefined)
            return;

        let currentUsersJobPositions = {
            currentUser: this._authService.user(),
            jobPositions: jobPositions
        }
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        sessionStorage.setItem(keyCurrentUserJobPositions, JSON.stringify(currentUsersJobPositions));
    }

    userJobPositions = (): Array<PositionMarker> => {
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        var currentUserJobPositions = JSON.parse(sessionStorage.getItem(keyCurrentUserJobPositions));
        return currentUserJobPositions.jobPositions;
    }

    hasJobPositions = (): boolean => {
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        var currentUserJobPositions = JSON.parse(sessionStorage.getItem(keyCurrentUserJobPositions));
        return currentUserJobPositions != null && currentUserJobPositions != undefined;
    }

    clearUserPosition = (): void => {
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        sessionStorage.removeItem(keyCurrentUserPosition);
    }

    storeUserPosition = (position: Position): void => {

        if (position == null || position == undefined)
            return;

        let currentUsersPosition = {
            currentUser: this._authService.user(),
            position: position
        }
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        sessionStorage.setItem(keyCurrentUserPosition, JSON.stringify(currentUsersPosition));
    }

    currentPosition = (): Position => {
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(keyCurrentUserPosition));
        return currentUsersPosition.position;
    }

    hasPosition = (): boolean => {
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(keyCurrentUserPosition));
        return currentUsersPosition != null && currentUsersPosition != undefined;
    }
}

JSON Payload

As before we send this across to the Play back end using this route

POST  /job/submit                              controllers.JobController.submitJob()

JobController

Ok so we now know that we have a new endpoint that can accept a job JSON object. What does it do with this Job JSON object. Well quite simply it does this

  • Converts the JSON into a Scala object
  • Sends it out over Kafka using Reactive Kafka publisher

You may be asking yourself why we want to burden ourselves with Kafka here at all if all we are going to do is get a Job JSON payload in them send it out via Kafka only to have it come back in via Kafka. This seems weird, so why bother? The reason we want to involve Kafka here, is for the audit an commit log facility that it provided. We want a record of the events and thats what Kafka gives us, a nice append only log

Anyway what does the new endpoint code look like that accepts the job. Here it is

package controllers

import javax.inject.Inject

import entities.Job
import entities.JobJsonFormatters._
import entities._
import actors.job.{JobConsumerActor, JobProducerActor}
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.http.ContentTypes
import play.api.libs.Comet
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.mvc.{Action, Controller}
import utils.Errors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class JobController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{
  val rand = new Random()

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val (sink, source) =
    MergeHub.source[JsValue](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

  //job producer
  val childJobProducerActorProps = Props(classOf[JobProducerActor],mat,ec)
  val jobProducerSupervisorProps = createBackoffSupervisor(childJobProducerActorProps,
    s"JobProducerActor_${rand.nextInt()}")
  val jobProducerSupervisorActorRef = actorSystem.actorOf(jobProducerSupervisorProps,
    name = "jobProducerSupervisor")

  //job consumer
  val childJobConsumerActorProps = Props(new JobConsumerActor(sink)(mat,ec))
  val jobConsumerSupervisorProps = createBackoffSupervisor(childJobConsumerActorProps,
    s"JobConsumerActor_${rand.nextInt()}")
  val jobConsumerSupervisorActorRef = actorSystem.actorOf(jobConsumerSupervisorProps,
    name = "jobConsumerSupervisor")
  jobConsumerSupervisorActorRef ! Init


  def streamedJob() = Action {
    Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
  }


  def submitJob = Action.async(parse.json) { request =>
    Json.fromJson[Job](request.body) match {
      case JsSuccess(job, _) => {
        jobProducerSupervisorActorRef ! job
        Future.successful(Ok(Json.toJson(job.copy(clientEmail = job.clientEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Job from the json provided. " +
          Errors.show(errors)))
    }
  }


  private def createBackoffSupervisor(childProps:Props, actorChildName: String) : Props = {
    BackoffSupervisor.props(
      Backoff.onStop(
        childProps,
        childName = actorChildName,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ).withSupervisorStrategy(
        OneForOneStrategy() {
          case _ => SupervisorStrategy.Restart
        })
    )
  }

}

There is a fair bit going on in that code. Lets dissect it a bit

  • We create a backoff supervisor for both the Kafka producer/consumer actors
  • We create a stream that is capable of writing to the Comet frame socket
  • We provide the sink side (MergeHub) of the stream to the consumer actor, such that when it reads a value from Kafka it will be pumped into the sink which will then travel through the Akka stream back to the web page via the BroadcastHub and Comet forever frame back to the HTML (and ultimately RxJs Subject)

Push the consumed Job out of the forever frame (Comet functionality in Play backend)

Okay, so we just saw how the 2 actors are created under back off supervisors, and how the consumer (the one that reads from Kafka) gets the ability to essentially write back to the forever frame in the HTML which is like this

<iframe id="comet" src="/job/streamedJob"></iframe>

Where the actual route for Play framework is configured as this

So how does the job go out into Kafka land?

That part is quite simple, here it is

GET   /job/streamedJob                         controllers.JobController.streamedJob()

And this is the relevant part of the JobController that deals with exposing the comet stream to the browser

val (sink, source) =
    MergeHub.source[JsValue](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()
	  
....
....
....
....
	  
	  
def streamedJob() = Action {
    Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
  }	  

So diving back into what happens when a new Job comes in, we can see that this goes through the JobProducerActor below

ackage actors.job

import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import entities.Job
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}


class JobProducerActor(
  implicit materializer: ActorMaterializer,
  ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobProducerSettings = ProducerSettings(
    context.system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers(s"${Settings.bootStrapServers}")

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Job](perProducerBufferSize = 16)
      .map(job => {
        val jobBytes = jSONSerde.serializer().serialize("", job)
        (job, jobBytes)
      })
      .map { jobWithBytes =>
        val (job, jobBytes) = jobWithBytes
        new ProducerRecord[String, Array[Byte]](
          JobTopics.JOB_SUBMIT_TOPIC, job.clientEmail, jobBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(jobProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobProducerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobProducerActor seen ${job}")
      Source.single(job).runWith(mergeHubSink)
    }
    case Done => {
      println(s"JobProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}

Ill be honest there is a fair bit going on in that small chunk of code above. What is happening exactly?

  • The most important point is that we simply use the actor as a vessel to host a reactive Kafka Akka stream RunnableGraph representing a Graph of MergeHub  > Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka streams is out of scope for this article but if you want to know more you can read more on a previous post I did here : https://sachabarbs.wordpress.com/2016/12/13/akka-streams/
  •  So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. So what we want is if the actor fails the stream is stopped, and if the stream fails the actor is stopped. To do that we need to do a couple of thing
    • STREAM FAILING : Since the RunnableGraph can return a Future[T] we can hook a callback Success/Failure on that, and send a PoisonPill to the hosting actor. Then the supervisor actor we saw above would kick in and try and create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the JobController, where we provided a restart supervision strategy for the stream.
    • ACTOR FAILING : If the actor itself fails the Akka framework will call the postStop() method, at which point we want to shutdown the stream within this actor. So how can we shutdown the hosted stream? Well see in the middle of the stream setup there is this line .viaMat(KillSwitches.single)(Keep.both) this allows us to get a killswitch from the materialized values for the stream. Once we have a KillSwitch we can simply call its shutDown() method.
    • BELTS AND BRACES : I have also provided a way for the outside world to shutdown this actor and its hosted stream. This is via sending this actor a Done message. I have not put this in yet, but the hook is there to demonstrate how you could do this.
  • We can see that there is a MergeHub source which allows external code to push stuff through the MergeHub via the materialized Sink value from within the actor
  • We can also see that the Job object that the actor sees is indeed pushed into the MergeHub materialized Sink via this actor, and then some transformation is done on it, to grab its raw bytes
  • We can see the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink. Which would result in a message being pushed out to a Kafka topic from the hosted stream, pushed Job object from this actor into the stream And I think that is the main set of points about how this actor works

Consume the Job over a Kafka Topic (using Akka Streams / Reactive Kafka)

Lets see the JobConsumerActor which takes a Akka Stream Sink (MergeHub from JobController) and pushes the value out to it, when it sees a new value from Kafka on the job topic job-submit-topic. This then travels through the Akka stream where it goes via the BroadcastHub out to the forever from in the HTML.


Here is the code, it may look scary but really its just reading a value of the Kafka topic and pushing it out via the Sink (MergeHub) where the sink is hooked up to a forever frame that Play framework supports. Where the forever frame is open and waiting for data to be sent to it via a comet endpoint which you can read more about here : https://www.playframework.com/documentation/2.6.x/ScalaComet

package actors.job

import entities.{Job, Init}
import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorSystem, PoisonPill}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import play.api.libs.json.{JsValue, Json}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}



//TODO : This actor shouls take in a way of pushing back to Websocket
class JobConsumerActor
  (val sink:Sink[JsValue, NotUsed])
  (implicit materializer: ActorMaterializer, ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobConsumerSettings = ConsumerSettings(
    context.system,new StringDeserializer(),new ByteArrayDeserializer())
    .withBootstrapServers(s"${Settings.bootStrapServers}")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val ((_, killswitch), kafkaConsumerFuture) =
    Consumer.committableSource(jobConsumerSettings, Subscriptions.topics(JobTopics.JOB_SUBMIT_TOPIC))
      .mapAsync(1) { msg => {
        val jobBytes = msg.record.value
        val job = jSONSerde.deserializer().deserialize(JobTopics.JOB_SUBMIT_TOPIC,jobBytes)
        self ! job
        msg.committableOffset.commitScaladsl()
      }
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.last)(Keep.both)
      .run()


  kafkaConsumerFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobConsumerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobConsumerActor seen ${job}")
      val finalJsonValue = Json.toJson(job)
      Source.single(finalJsonValue).runWith(sink)
    }
    case Done => {
      println(s"JobConsumerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
    case Init => {
      println("JobConsumerActor saw init")
    }
  }
}

Have a new RxJs based Observable over the comet based forever frame, and ensure that is working

So at the end of the pipeline, we have a forever frame in the browser (always available within Index.html) that we wish to get events from. Ideally we want to turn this rather bland event into a better RxJs Observable, so how do we do that?

Its quite simple we use this little service that is able to create a new Observable from the incoming event for us, which if you recall the Play server side stuff was like this

Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)

So this is how we turn that into a nice RxJS Observable

import { injectable, inject } from "inversify";
import { JobEventArgs } from "../domain/JobEventArgs";
import Rx from 'rx';

@injectable()
export class JobStreamService {

    private _jobSourceObservable: Rx.Observable<any>;

    constructor() {

    }

    init = (): void => {

        window['jobChanged'] = function (incomingJsonPayload: any) {
            let evt = new CustomEvent('onJobChanged', new JobEventArgs(incomingJsonPayload));
            window.dispatchEvent(evt);
        }

        this._jobSourceObservable = Rx.Observable.fromEvent(window, 'onJobChanged');
    }

    getJobStream = (): Rx.Observable<any> => {
        return this._jobSourceObservable;
    }
}

Where the JobEventArgs looks like this

export class JobEventArgs {

    detail: any;

    constructor(detail: any) {
        this.detail = detail;
    }

}

We can this use this service in other code and subscribe to this RxJs Observable that the above service exposes. Here is an example of subscribing to it.

componentWillMount() {
    this._subscription =
        this._jobStreamService.getJobStream()
        .subscribe(
        jobArgs => {
                console.log('RX saw onJobChanged');
                console.log('RX x = ', jobArgs.detail);
            },
            error => {
                console.log('RX saw ERROR');
                console.log('RX error = ', error);
            },
            () => {
                console.log('RX saw COMPLETE');
            }
        );
}

We will see much more of the RX stuff in the next part of this article, but for now just all you need to know is that there is an injectable service that you may use to listen to the Job <code>Observable

 

Final diagram to help you solidify this section

There is a lot going on above so I thought a final diagram may help

 

Okay, now would be a good time to get a cup of coffee, as although the next 2 sections build upon what we have just seen, there are still quite a lot of moving pieces to cover, especially the Kafka Streams / Interactive Queries stuff which we have yet to look at. So go on treat yourself to a nice strong coffee.

ViewJob workflow

The ViewJob uses the pieces shown above, and is intended to work like this

ViewJob react component

As can be seen above this is after a passenger has opened a browser session, and 2 other users (acting as drivers) have pushed through their positions, obviously this is just one view (the passenger view), to fully understand what the other users would see in their browser session we will walk through a scenario below.

So what should the View Job page do?

This page should do the following things

  • If a passenger sends out a job it should be seen by ANY driver that is logged in (providing the job is not already assigned to a driver)
  • Positions updates from a passenger to drivers (that know about the passenger) should show the new passenger position
  • When a driver pushes out (single laptop requires that users click on map to make their own position known to others) their new position that the client sees that and updates the driver marker accordingly
  • That a passenger can accept a driver for a job
  • That a driver can not accept a job from a passenger
  • That once a job is paired between passenger/driver only those 2 markers will be shown if you are either of these users
  • That once a job is paired between passenger/driver AND YOU ARE NOT ONE OF THESE USERS that you ONLY see your own markers
  • That a job may be completed by passenger OR driver independently and that they are able to Rate each other

This page is fairly complex, and contains many helper methods that carry out the functions above, probably too many to show. So I will just show the skeleto of the code in this case, and then we will walk through some screen shots, and a scenario that exercises this code.

It should be noted that this area is the one that has the 2 bugs/issues that I have within this code. As I said at the start, I kind of did what I wanted to do in this article, and it demonstrated everything I wanted to show. So meh

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import Measure from 'react-measure'
import { RatingDialog } from "./components/RatingDialog";
import { YesNoDialog } from "./components/YesNoDialog";
import { OkDialog } from "./components/OkDialog";
import { AcceptList } from "./components/AcceptList";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Well,
    Grid,
    Row,
    Col,
    ButtonInput,
    ButtonGroup,
    Button,
    Modal,
    Popover,
    Tooltip,
    OverlayTrigger
} from "react-bootstrap";
import { AuthService } from "./services/AuthService";
import { JobService } from "./services/JobService";
import { JobStreamService } from "./services/JobStreamService";
import { PositionService } from "./services/PositionService";
import { Position } from "./domain/Position";
import { PositionMarker } from "./domain/PositionMarker";
import { hashHistory } from 'react-router';
import { withGoogleMap, GoogleMap, Marker, OverlayView } from "react-google-maps";

const STYLES = {
    overlayView: {
        background: `white`,
        border: `1px solid #ccc`,
        padding: 15,
    }
}


const GetPixelPositionOffset = (width, height) => {
    return { x: -(width / 2), y: -(height / 2) };
}



const ViewJobGoogleMap = withGoogleMap(props => (

    <GoogleMap
        ref={props.onMapLoad}
        defaultZoom={16}
        defaultCenter={{ lat: 50.8202949, lng: -0.1406958 }}
        onClick={props.onMapClick}>
        {props.markers.map((marker, index) => (
            <OverlayView
                key={marker.key}
                mapPaneName={OverlayView.OVERLAY_MOUSE_TARGET}
                position={marker.position}
                getPixelPositionOffset={GetPixelPositionOffset}>
                <div style={STYLES.overlayView}>
                    <img src={marker.icon} />
                    <strong>{marker.key}</strong>
                </div>
            </OverlayView>
        ))}
    </GoogleMap>
));


export interface ViewJobState {
    markers: Array<PositionMarker>;
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    dimensions: {
        width: number,
        height: number
    },
    currentPosition: Position;
    isJobAccepted: boolean;
    finalActionHasBeenClicked: boolean;
}

type DoneCallback = (jdata: any, textStatus: any, jqXHR: any) => void


export class ViewJob extends React.Component<undefined, ViewJobState> {

    private _authService: AuthService;
    private _jobService: JobService;
    private _jobStreamService: JobStreamService;
    private _positionService: PositionService;
    private _subscription: any; 
    private _currentJobUUID: any;

    constructor(props: any) {
        super(props);
        this._authService = props.route.authService;
        this._jobStreamService = props.route.jobStreamService;
        this._jobService = props.route.jobService;
        this._positionService = props.route.positionService;
        
        if (!this._authService.isAuthenticated()) {
            hashHistory.push('/');
        }

        let savedMarkers: Array<PositionMarker> = new Array<PositionMarker>();
        if (this._positionService.hasJobPositions()) {
            savedMarkers = this._positionService.userJobPositions();
        }

        this.state = {
            markers: savedMarkers,
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            dimensions: { width: -1, height: -1 },
            currentPosition: this._authService.isDriver() ? null :
                this._positionService.currentPosition(),
            isJobAccepted: false,
            finalActionHasBeenClicked: false
        };
    }

    componentWillMount() {
        var self = this;
        this._subscription =
            this._jobStreamService.getJobStream()
            .retry()
            .where(function (x, idx, obs) {
                return self.shouldShowMarkerForJob(x.detail);
            })
            .subscribe(
                jobArgs => {

                    console.log('RX saw onJobChanged');
                    console.log('RX x = ', jobArgs.detail);

                    this._jobService.clearUserIssuedJob();
                    this._jobService.storeUserIssuedJob(jobArgs.detail);
                    this.addMarkerForJob(jobArgs.detail);
                },
                error => {
                    console.log('RX saw ERROR');
                    console.log('RX error = ', error);
                },
                () => {
                    console.log('RX saw COMPLETE');
                }
            );
    }

    componentWillUnmount() {
        this._subscription.dispose();
        this._positionService.storeUserJobPositions(this.state.markers);
    }

    render() {

        const adjustedwidth = this.state.dimensions.width;

        return (
            <Well className="outer-well">
                <Grid>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <h4>CURRENT JOB</h4>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <AcceptList
                                markers={_.filter(this.state.markers, { isDriverIcon: true })}
                                currentUserIsDriver={this._authService.isDriver()}
                                clickCallback={this.handleMarkerClick}
                            />
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <Measure
                                bounds
                                onResize={(contentRect) => {
                                    this.setState({ dimensions: contentRect.bounds })
                                }}>
                                {({ measureRef }) =>
                                    <div ref={measureRef}>
                                        <ViewJobGoogleMap
                                            containerElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    justifyContent: 'flex-end',
                                                    alignItems: 'center',
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            mapElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            markers={this.state.markers}
                                            onMapClick={this.handleMapClick}
                                        />
                                    </div>
                                }
                            </Measure>
                        </Col>
                    </Row>

                    {this.state.isJobAccepted === true ?
                        <Row className="show-grid">
                            <span>
                                <RatingDialog
                                    theId="viewJobCompleteBtn"
                                    headerText="Rate your driver/passenger"
                                    okCallBack={this.ratingsDialogOkCallBack}
                                    actionPerformed={this.state.finalActionHasBeenClicked} />

                                {!(this._authService.isDriver() === true) ?

                                    <YesNoDialog
                                        theId="viewJobCancelBtn"
                                        launchButtonText="Cancel"
                                        actionPerformed={this.state.finalActionHasBeenClicked} 
                                        yesCallBack={this.jobCancelledCallBack}
                                        noCallBack={this.jobNotCancelledCallBack}
                                        headerText="Cancel the job" />
                                    : 
                                    null
                                }

                                <OkDialog
                                    open={this.state.okDialogOpen}
                                    okCallBack={this.okDialogCallBack}
                                    headerText={this.state.okDialogHeaderText}
                                    bodyText={this.state.okDialogBodyText}
                                    key={this.state.okDialogKey} />
                            </span>
                        </Row> :
                        null
                    }
                </Grid>
            </Well>
        );
    }

    handleMapClick = (event) => {
        ....
        ....
        this._positionService.clearUserJobPositions();
        this._positionService.storeUserJobPositions(this.state.markers);
        this.pushOutJob(newPosition, currentJob);
    }

    handleMarkerClick = (targetMarker) => {

        console.log('button on AcceptList clicked:' + targetMarker.key);
        console.log(targetMarker);

        let currentJob = this._jobService.currentJob();
        let jobForMarker = targetMarker.jobForMarker;

        let clientMarker = _.find(this.state.markers, { 'isDriverIcon': false });
        if (clientMarker != undefined && clientMarker != null) {

            let clientJob = clientMarker.jobForMarker;
            clientJob.driverFullName = jobForMarker.driverFullName;
            clientJob.driverEmail = jobForMarker.driverEmail;
            clientJob.driverPosition = jobForMarker.driverPosition;
            clientJob.vehicleDescription = jobForMarker.vehicleDescription;
            clientJob.vehicleRegistrationNumber = jobForMarker.vehicleRegistrationNumber;
            clientJob.isAssigned = true;
            
            let self = this;
            console.log("handleMarkerClick job");
            console.log(clientJob);

            this.makePOSTRequest('job/submit', clientJob, this,
                function (jdata, textStatus, jqXHR) {
                    console.log("After is accepted");
                    const newState = Object.assign({}, self.state, {
                        isJobAccepted: true
                    })
                    self.setState(newState);
                });
        }
    }

    addMarkerForJob = (jobArgs: any): void => {

        console.log("addMarkerForJob");
        console.log(this.state);

        if (this.state.isJobAccepted || jobArgs.isAssigned) {
            this.processAcceptedMarkers(jobArgs);
        }
        else {
            this.processNotAcceptedMarkers(jobArgs);
        }
    }

    processAcceptedMarkers = (jobArgs: any): void => {
        ....
        ....
    }


    processNotAcceptedMarkers = (jobArgs: any): void => {
        ....
        ....
    }

    addClientDetailsToDrivers = (newMarkersList: PositionMarker[]): void => {
        ....
    }


    updateStateForMarkers = (newState: any, newMarkersList: PositionMarker[], newPositionForUser: Position, jobArgs:any): void => {

        //Update the list of position markers in the PositionService
        this._positionService.clearUserJobPositions();
        this._positionService.storeUserJobPositions(newMarkersList);

        //Update the position in the PositionService
        if (newPositionForUser != undefined && newPositionForUser != null) {
            this._positionService.clearUserPosition();
            this._positionService.storeUserPosition(newPositionForUser);
        }

        this._jobService.clearUserIssuedJob();
        this._jobService.storeUserIssuedJob(jobArgs);

        //update the state
        this.setState(newState);
    }

    updateMatchedUserMarker = (jobEmailToCheck: string, newMarkersList: PositionMarker[],
        jobPosition: Position, jobForMarker:any): void => {

        if (jobEmailToCheck != undefined && jobEmailToCheck != null) {

            let matchedMarker = _.find(this.state.markers, { 'email': jobEmailToCheck });
            if (matchedMarker != null) {
                //update its position
                matchedMarker.position = jobPosition;
                matchedMarker.jobForMarker = jobForMarker;
            }
        }
    }


    updateStateForNewMarker = (newMarkersList:PositionMarker[], position: Position): any => {

        if (position != null) {
            return Object.assign({}, this.state, {
                currentPosition: position,
                markers: newMarkersList
            })
        }
        else {
           return Object.assign({}, this.state, {
                markers: newMarkersList
            })
        }
    }

    updateStateForAcceptedMarker = (newMarkersList: PositionMarker[], position: Position): any => {

        if (position != null) {
            return Object.assign({}, this.state, {
                currentPosition: position,
                markers: newMarkersList,
                isJobAccepted: true
            })
        }
        else {
            return Object.assign({}, this.state, {
                markers: newMarkersList,
                isJobAccepted: true
            })
        }
    }


    shouldShowMarkerForJob = (jobArgs: any): boolean => {

        let isDriver = this._authService.isDriver();
        let currentJob = this._jobService.currentJob();
        let hasJob = currentJob != undefined && currentJob != null;

        //case 1 - No job exists, to allow driver to add their mark initially
        if (!hasJob && isDriver)
            return true;
        
        //case 2 - Job exists and is unassigned and if there is no other active 
        //         job for this client/ driver
        if (hasJob && !currentJob.isAssigned)
            return true;

        //case 3 - If the job isAssigned and its for the current logged in client/driver
        if (hasJob && currentJob.isAssigned) {
            if (currentJob.clientEmail == jobArgs.clientEmail) {
                return true;
            }
            if (currentJob.driverEmail == jobArgs.driverEmail) {
                return true;
            }
        }
        return false;
    }


    pushOutJob = (newPosition: Position, jobForMarker : any): void => {
        ....
        ....
        ....
        var newJob = {
            jobUUID: this._currentJobUUID != undefined && this._currentJobUUID != '' ?
                this._currentJobUUID : '',
            clientFullName: localClientFullName,
            clientEmail: localClientEmail,
            clientPosition: localClientPosition,
            driverFullName: localDriverFullName,
            driverEmail: localDriverEmail,
            driverPosition: localDriverPosition,
            vehicleDescription: isDriver ?
                this._authService.user().vehicleDescription : '',
            vehicleRegistrationNumber: isDriver ?
                this._authService.user().vehicleRegistrationNumber : '',
            isAssigned: localIsAssigned,
            isCompleted: false
        }

        console.log("handlpushOutJob job");
        console.log(newJob);
        this.makePOSTRequest('job/submit', newJob, self,
            function (jdata, textStatus, jqXHR) {
                self._jobService.clearUserIssuedJob();
                self._jobService.storeUserIssuedJob(newJob);
            });
    }

    createDriverMarker = (
        driver: any,
        event: any): PositionMarker => {

        ....
    }


    
    ratingsDialogOkCallBack = (theRatingScore: number) => {
        console.log('RATINGS OK CLICKED');

       //POST rating data
    }

   
    makePOSTRequest = (route: string, jsonData: any, context: ViewJob, doneCallback: DoneCallback) => {
        //Post job data
    }

    ....
    ....
}

Some highlights

  • We use RX.Js to listen to new events straight over the Comet based forever frame, that the server side Play scala code pushes a message out on
  • There was a funny thing with driver acceptance which I originally wanted to be a button on a drivers marker within the map. However this caused an issue with the Map where it would get a Map event when clicking on an overlay (higher Z-Order so should not happen). This is a feature of the React Google Map component. I could not find a fix I liked (I did mess around with form event mouseEnter/mouseLeave but it was just not that great, so I opted to chose to put the acceptance of driver outside of the map, thus avoiding the issue altogether)

What does it look like when run?

Some scenarios of what it looks like running are shown below.

In order to run it to this point, I normally follow this set of steps afterwards

  • open a tab, login as a passenger that I had created
  • go to the create job page, click the map, push the create job button
  • open a NEW tab, login as a new driver, go to the view job page
  • on the 1st tab (passenger) click the map to push passenger position to driver
  • on the 2nd tab (driver) click the map to push driver position to passenger
  • repeat last 4 steps for additional driver
  • on client tab pick driver to accept, click accept button
  • complete the job from client tab, give driver rating
  • complete the job from paired driver tab, give passenger rating
  • go to view rating page, should see ratings

One of the challenges with an app like this, is that it is a streaming app. So this means that when a client pushes out a new job, there may be no-one listening for that job. Drivers may not even be logged in at all, or may login later, so they effectively subscribe late. For this app dealing with that was kind of out of scope. To remedy this, you need to ensure that position updates (clicking on the map for the given user browser session (ie tab)) gets pushed to other user browser sessions where the marker is not currently shown.

In a real world app, we might choose to do one of the following to fix this permanently

  • Store some, and when a new user joins grab all unassigned passengers/driver within some geographical area
  • Store last N-Many passenger/driver positions and push these on new login (there is no guarantee that these are in the same geographical area as us though, could be completely unrelated/of no practical concern for the current user)

Anyway as I say this out of scope for this demo project, but I hope that it does give you some insight as to why you need to push position updates manually

This gives you an example of what it all looks like when its running (not accepted yet)

This is what it looks like for the following setup

  • 1 x passenger (sacha barber)
  • 2 x driver (driver 1 / driver 2)

So now lets see what happens when we accept one of the drivers. I have chosen driver 1 for this example

This gives you an example of what it all looks like when its running (after job accepted between passenger/driver1)

Here is what things look like after job has been accepted

ViewRating workflow

The Rating flow is one of the more complex aspects of this entire article. The idea is as follows:

  • At the end of a job the passenger may rate the driver for the Job
  • At the end of a job the driver may rate the passenger for the Job
  • These Ratings should be stored/aggregated and exposes via a REST API for the client/driver such that they can view their accumulated Rating over time

Simple enough idea right, but this part will use Kafka Streams

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started I just wanted to include a several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talks about KStream and KTable objects (which are the stream and table objects inside Kafka streams)

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafkas Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your applications latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

The stream-table duality describes the close relationship between streams and tables.

  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a real table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a streams data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a real stream by iterating over each key-value entry in the table.

Lets illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time  and different revisions of the table  can be represented as a changelog stream (second column).

Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.

I would STRONLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.

Anyway with all that in mind how does that relate to the use case we are trying to solve. Lets assume we have a Kafka publisher that pushes out Rating objects, and as stated ideally we would like to query these across all processor nodes. As such we should now know that this will involve a KStream and some sort of aggregation to an eventual KTable (where a state store will be used).

Probably the easiest thing to do is to start with the code, which looks like this for the main stream processing code for the Rating section of then final app.

MadCapIdea/blob/develop/KafkaStreams/src/main/scala/Processing/Ratings/RatingStreamProcessingApp.scala

import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import entities.Rating
import serialization.JSONSerde
import topics.RatingsTopics
import utils.Settings
import stores.StateStores
import org.apache.kafka.streams.state.HostInfo
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._


package processing.ratings {

  import org.apache.kafka.streams.errors.BrokerNotFoundException
  import utils.Retry

  class RatingByEmailInitializer extends Initializer[List[Rating]] {
    override def apply(): List[Rating] = List[Rating]()
  }

  class RatingByEmailAggregator extends Aggregator[String, Rating, List[Rating]] {
    override def apply(aggKey: String, value: Rating, aggregate: List[Rating]) = {
      value :: aggregate
    }
  }


  object RatingStreamProcessingApp extends App {

    implicit val ec = ExecutionContext.global

    run()

    private def run(): Unit = {

      val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
      System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
      System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

      val maybeStreams =
        Retry.whileSeeingExpectedException[KafkaStreams,BrokerNotFoundException](10.seconds)(createStreams)

      maybeStreams match {
        case Some(streams) => {
          val restService = new RatingRestService(streams, restEndpoint)
          restService.start()

          Runtime.getRuntime.addShutdownHook(new Thread(() => {
            streams.close(10, TimeUnit.SECONDS)
            restService.stop
          }))
        }
        case None => {
          println("Quiting due to no streams available/unknown expcetion")
        }
      }

      //return unit
      ()
    }

    def createStreams() : KafkaStreams = {
      val stringSerde = Serdes.String
      val ratingSerde = new JSONSerde[Rating]
      val listRatingSerde = new JSONSerde[List[Rating]]
      val builder: KStreamBuilder = new KStreamBuilder
      val ratings = builder.stream(stringSerde, ratingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

      //aggrgate by (user email -> their ratings)
      val ratingTable = ratings.groupByKey(stringSerde, ratingSerde)
        .aggregate(
          new RatingByEmailInitializer(),
          new RatingByEmailAggregator(),
          listRatingSerde,
          StateStores.RATINGS_BY_EMAIL_STORE
        )

      //useful debugging aid, print KTable contents
      ratingTable.toStream.print()

      val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)

      // Always (and unconditionally) clean local state prior to starting the processing topology.
      // We opt for this unconditional call here because this will make it easier for you to
      // play around with the example when resetting the application for doing a re-run
      // (via the Application Reset Tool,
      // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
      //
      // The drawback of cleaning up local state prior is that your app must rebuilt its local
      // state from scratch, which will take time and will require reading all the state-relevant
      // data from the Kafka cluster over the network.
      // Thus in a production scenario you typically do not want to clean up always as we do
      // here but rather only when it is truly needed, i.e., only under certain conditions
      // (e.g., the presence of a command line flag for your app).
      // See `ApplicationResetExample.java` for a production-like example.
      streams.cleanUp();
      streams.start()
      streams
    }
  }

}

Remember the idea is to get a Rating for a user (based on their email address), and store all the Rating associated with them in some sequence/list such that they can be retrieved in one go based on a a key, where the key would be the users email, and the value would be this list of Rating objects.I think with the formal discussion from the official Kafka docs and my actual Rating requirement, the above should hopefully be pretty clear.

Walking through Kafka Streams interactive queries

So now that we have gone through how data is produced, and transformed (well actually I did not do too much transformation other than a simple map, but trust me you can), and how we aggregate results from a KStream to a KTable (and its state store), we will move on to see how we can use Kafka interactive queries to query the state stores.

One important concept is that if you used multiple partitions for your original topic, the state may be spread across n-many processing node. For this project I have only chosen to use 1 partition, but have written the code to support n-many.

So lets assume that each node could read a different segment of data, or that each node must read from n-many partitions (there is not actually a mapping to nodes and partitions these are 2 mut read chapters elastic-scaling-of-your-application and parallelism-model) we would need each node to expose a REST API to allow its OWN state store to be read. By reading ALL the state stores we are able to get a total view of ALL the persisted data across ALL the partitions. I urge all of you to read this section of the official docs : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

This diagram has also be shamelessly stolen from the official docs:

I think this diagram does an excellent job of showing you 3 separate processor nodes, and each of them may have a bit of state. ONLY be assembling ALL the data from these nodes are we able to see the ENTIRE dataset.

Kafka allows this via metadata about the streams, where we can use the exposed metadata to help us gather the state store data. To do this we first need a MetadataService, which for me is as follows:

package processing.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


   /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }


  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {

    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }

}

This metadata service is used to obtain the state store information, which we can then use to extract the state data we want (its a key value store really).

The next thing we need to do is expose a REST API to allow us to get the state. lets see that now

package processing.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import entities.AkkaHttpEntitiesJsonFormats._
import entities._
import stores.StateStores
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import org.apache.kafka.common.serialization.Serdes
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.http.scaladsl.unmarshalling.Unmarshal
import spray.json._
import scala.util.{Failure, Success}
import org.apache.kafka.streams.state.QueryableStoreTypes
import scala.concurrent.duration._



object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}


class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r
    val storeNameRegexPattern =  """\w+""".r

    val route =
      path("ratingByEmail") {
        get {
          parameters('email.as[String]) { (email) =>
            try {

              val host = metadataService.streamsMetadataForStoreAndKey[String](
                StateStores.RATINGS_BY_EMAIL_STORE,
                email,
                Serdes.String().serializer()
              )

              var future:Future[List[Rating]] = null

              //store is hosted on another process, REST Call
              if(!thisHost(host))
                future = fetchRemoteRatingByEmail(host, email)
              else
                future = fetchLocalRatingByEmail(email)

              val ratings = Await.result(future, 20 seconds)
              complete(ratings)
            }
            catch {
              case (ex: Exception) => {
                val finalList:List[Rating] = scala.collection.immutable.List[Rating]()
                complete(finalList)
              }
            }
          }
        }
      } ~
      path("instances") {
        get {
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }~
      path("instances" / storeNameRegexPattern) { storeName =>
        get {
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Rating]] = {

    val requestPath = s"http://${hostInfo.host}:${hostInfo.port}/ratingByEmail?email=${email}"
    println(s"Client attempting to fetch from online at ${requestPath}")

    val responseFuture: Future[List[Rating]] = {
      Http().singleRequest(HttpRequest(uri = requestPath))
        .flatMap(response => Unmarshal(response.entity).to[List[Rating]])
    }

    responseFuture
  }

  def fetchLocalRatingByEmail(email: String) : Future[List[Rating]] = {

    val ec = ExecutionContext.global

    println(s"client fetchLocalRatingByEmail email=${email}")

    val host = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RATINGS_BY_EMAIL_STORE,
      email,
      Serdes.String().serializer()
    )

    val f = StateStores.waitUntilStoreIsQueryable(
      StateStores.RATINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Rating]](),
      streams
    ).map(_.get(email))(ec)

    val mapped = f.map(rating => {
      if (rating == null)
        List[Rating]()
      else
        rating
    })

    mapped
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}

With that final class we are able to run the application and query it using the url http://localhost:8080/ratingByEmail?email=sacha@here.com (the key to the Kafka store here is sacha@here.com and the value could either be an empty list or a List[Ranking] objects as JSON, the results of which are shown below after we have run the producer and used Chrome (or any other REST tool of your picking) to get the results

REST Endpoint Facade

So we just created an Akka Http REST endpoint to serve up the combined Rating(s) that have been pushed through the Kafka stream processing rating topic. However we have this Play framework API which we use for all other REST endpoints. So I have chose to create a façade endpoint in the Play backend that will simply call out to the existing Akka Http endpoint. Keeping all the traffic in one place is a nice thing if you ask me. So lets look at this play code to do this

We obviously need a new route, which is as follows:We obviously need a new route, which is as follows:

GET  /rating/byemail                          controllers.RatingController.ratingByEmail()

Controller Action

To serve this new route we need a new Action in the RatingController. This is shown below:

package controllers

import javax.inject.Inject
import entities.RatingJsonFormatters._
import entities._
import actors.rating.RatingProducerActor
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.libs.ws._
import play.api.mvc.{Action, Controller}
import utils.{Errors, Settings}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class RatingController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext,
  ws: WSClient
) extends Controller
{

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
  val childRatingActorProps = Props(classOf[RatingProducerActor],mat,ec)
  val rand = new Random()
  val ratingSupervisorProps = BackoffSupervisor.props(
    Backoff.onStop(
      childRatingActorProps,
      childName = s"RatingProducerActor_${rand.nextInt()}",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ).withSupervisorStrategy(
      OneForOneStrategy() {
        case _ => SupervisorStrategy.Restart
      })
  )

  val ratingSupervisorActorRef = actorSystem.actorOf(ratingSupervisorProps, name = "ratingSupervisor")

  def submitNewRating = Action.async(parse.json) { request =>
    Json.fromJson[Rating](request.body) match {
      case JsSuccess(newRating, _) => {
        ratingSupervisorActorRef ! newRating
        Future.successful(Ok(Json.toJson(newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Rating from the json provided. " +
          Errors.show(errors)))
    }
  }


  def ratingByEmail = Action.async { request =>
    val email = request.getQueryString("email")
    email match {
      case Some(emailAddress) => {
        val url = s"http://${Settings.ratingRestApiHostName}:${Settings.ratingRestApiPort}/ratingByEmail?email=${emailAddress}"
        ws.url(url).get().map {
          response => (response.json).validate[List[Rating]]
        }.map(x => Ok(Json.toJson(x.get)))
      }
      case None => {
        Future.successful(BadRequest(
          "ratingByEmail endpoint MUST be supplied with a non empty 'email' query string value"))
      }
    }
  }
}

The main thing to note here is:

  • We use the play ws (web services) library to issues a GET request against the existing Akka Http endpoint. Thus creating our façade.
  • We are still using Future to make it nice an async

React front end for ratings

This is the final results for the View Rating react page. I think its all fairly self explanatory. I guess the only bit that really of any note is that we use lodash _.sumBy(..) to do the summing up of the Ratings for this user to create an overall rating.The rest is standard jQuery/react stuff.

import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import
{
    Well,
    Grid,
    Row,
    Col,
    Label,
    ButtonInput
} from "react-bootstrap";
 
import { AuthService } from "./services/AuthService";
 
import { hashHistory  } from 'react-router';
 
 
 
class Rating {
    fromEmail: string
    toEmail: string
    score: number
 
    constructor(fromEmail, toEmail, score) {
        this.fromEmail = fromEmail;
        this.toEmail = toEmail;
        this.score = score;
    }
}
 
 
export interface ViewRatingState {
    ratings: Array<Rating>;
    overallRating: number;
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    wasSuccessful: boolean;
}
 
 
export class ViewRating extends React.Component<undefined, ViewRatingState> {
 
    private _authService: AuthService;
 
    constructor(props: any) {
        super(props);
        this._authService = props.route.authService;
        if (!this._authService.isAuthenticated()) {
            hashHistory.push('/');
        }
        this.state = {
            overallRating: 0,
            ratings: Array(),
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            wasSuccessful: false
        };
    }   
 
 
    loadRatingsFromServer = () => {
 
        var self = this;
        var currentUserEmail = this._authService.userEmail();
 
        $.ajax({
            type: 'GET',
            url: 'rating/byemail?email=' + currentUserEmail,
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {
 
            console.log("result of GET rating/byemail");
            console.log(jqXHR.responseText);
            let ratingsObtained = JSON.parse(jqXHR.responseText);
            self.setState(
                {
                    overallRating: _.sumBy(ratingsObtained, 'score'),
                    ratings: ratingsObtained
                });
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            self.setState(
                {
                    okDialogHeaderText: 'Error',
                    okDialogBodyText: 'Could not load Ratings',
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        });
         
    }
 
    componentDidMount() {
        this.loadRatingsFromServer();
    }
 
    render() {
 
        var rowComponents = this.generateRows();
 
        return (
            <Well className="outer-well">
                    <Grid>
                        <Row className="show-grid">
                            <Col xs={6} md={6}>
                                <div>
                                <h4>YOUR OVERALL RATING <Label>{this.state.overallRating}</Label></h4>
                                </div>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <h6>The finer details of your ratings are shown below</h6>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <div className="table-responsive">
                                    <table className="table table-striped table-bordered table-condensed factTable">
                                        <thead>
                                            <tr>
                                                <th>Rated By</th>
                                                <th>Rating Given</th>
                                            </tr>
                                        </thead>
                                        <tbody>
                                            {rowComponents} 
                                        </tbody>
                                    </table>
                                </div>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <span>
                                <OkDialog
                                    open= {this.state.okDialogOpen}
                                    okCallBack= {this._okDialogCallBack}
                                    headerText={this.state.okDialogHeaderText}
                                    bodyText={this.state.okDialogBodyText}
                                    key={this.state.okDialogKey}/>
                            </span>
                        </Row>
                    </Grid>
            </Well>
        )
    }
 
    _okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });
    }
 
    generateRows = () => {
        return this.state.ratings.map(function (item) {
            return  <tr key={item.fromEmail}>
                        <td>{item.fromEmail}</td>
                        <td>{item.score}</td>
                    </tr>;
 
        });
    } 
}

Conclusion

This was certainly a challenging thing to write, and I am honestly pleased that I got it done, I have had a really good time writing this and it has been a great project for self improvement. I would recommend this type of thing as a great use of time. Go on find yourself a pet project

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Sacha Barber
Software Developer (Senior)
United Kingdom United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog

Comments and Discussions

 
GeneralMy vote of 5 Pin
D V L16-Jul-18 1:07
professionalD V L16-Jul-18 1:07 
GeneralMy vote of 5 Pin
Ehsan Sajjad17-Mar-18 8:20
mvpEhsan Sajjad17-Mar-18 8:20 
GeneralMy vote of 5 Pin
Igor Ladnik17-Feb-18 7:30
mvaIgor Ladnik17-Feb-18 7:30 
GeneralRe: My vote of 5 Pin
Sacha Barber17-Feb-18 17:10
mvaSacha Barber17-Feb-18 17:10 
Question5 from me Pin
Nick Polyak7-Feb-18 7:07
mvaNick Polyak7-Feb-18 7:07 
AnswerRe: 5 from me Pin
Sacha Barber7-Feb-18 7:55
mvaSacha Barber7-Feb-18 7:55 
QuestionI need a coffee. Pin
Chris Maunder5-Feb-18 3:54
adminChris Maunder5-Feb-18 3:54 
AnswerRe: I need a coffee. Pin
Sacha Barber5-Feb-18 4:59
mvaSacha Barber5-Feb-18 4:59 
AnswerRe: I need a coffee. Pin
bryce7-Feb-18 19:44
memberbryce7-Feb-18 19:44 
Question'Tis a 5 from me Pin
Pete O'Hanlon4-Feb-18 22:48
protectorPete O'Hanlon4-Feb-18 22:48 
AnswerRe: 'Tis a 5 from me Pin
Sacha Barber4-Feb-18 22:56
mvaSacha Barber4-Feb-18 22:56 
QuestionGood Pin
vietnamvisaagency4-Feb-18 22:45
professionalvietnamvisaagency4-Feb-18 22:45 
AnswerRe: Good Pin
Sacha Barber4-Feb-18 22:57
mvaSacha Barber4-Feb-18 22:57 
General5 just for the shear craziness Pin
SteveTheThread4-Feb-18 22:03
memberSteveTheThread4-Feb-18 22:03 
GeneralRe: 5 just for the shear craziness Pin
Sacha Barber4-Feb-18 22:57
mvaSacha Barber4-Feb-18 22:57 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Article
Posted 4 Feb 2018

Tagged as

Stats

12.2K views
19 bookmarked