Step-3 PhoneCat Recommendation System Using F# Agents, SignalR and Rx

This is step 3 of my blog series on Web Application Development in F# using ASP.NET MVC.

In the last two steps we saw how to create Web APIs and static Razor views in F#. In this blog post we are going to look at one of my favorite features of F#: the “message-based approach to concurrency”.

A Small Flashback

I had actually planned to publish this blog post as part of the great F# community initiative, the F# Advent Calendar. Unfortunately it didn’t make it in, as I nominated myself a little late. I always believe there is an opportunity behind every adversity. I didn’t get hung up on it, and I knew this was a great topic to blog about. When I decided to blog about it, I needed a sample web application. While I was creating that sample application, it suddenly struck me: how about a blog series on web application development in F#? I immediately started working on it, and hence this blog series.

So what are we going to do in this step?

In this blog post we are going to build a recommendation system which keeps track of the phones that a user is viewing and, based on that navigation history, recommends a phone that the user might be interested in.

User visits “Motorola XOOM™”

User visits “Motorola XOOM™ with Wi-Fi”

Let us start the implementation by defining two high level tasks

  1. Tracking user navigation
  2. Recommending a phone

1. Tracking user navigation

The first two components have already been implemented as part of step-2, so we just need to wire up the other two components. Let’s start with PhoneViewTracker.

Create a source file in the Web project and name it PhoneViewTracker. Add a function observePhonesViewed, which will be invoked when a user visits a phone.

module PhoneViewTracker =

  let observePhonesViewed anonymousId phoneIdBeingVisited =
    StorageAgent.Post (SavePhoneVisit (anonymousId, phoneIdBeingVisited))

The anonymousId is an HTTP request property that holds a unique identifier for the given user session.

Upon receiving the anonymousId and phoneIdBeingVisited, we post a message to the StorageAgent to save this visit. Neither the agent nor the message exists yet, so let’s create them.

Create a source file in the Domain project, name it UserNavigationHistory, and add the following:

namespace PhoneCat.Domain

open System.Reactive.Subjects
open System.Collections.Generic

type Agent<'T> = MailboxProcessor<'T>

module UserNavigationHistory =

  type StorageAgentMessage =
    | SavePhoneVisit of string * string

  let private storageAgentFunc (agent : Agent<StorageAgentMessage>) =

    let rec loop (dict : Dictionary<string, list<string>>) = async {
      let! storageAgentMessage = agent.Receive()
      match storageAgentMessage with
      | SavePhoneVisit (anonymousId, phoneIdBeingVisited) ->
          if dict.ContainsKey(anonymousId) then
            let phoneIdsVisited = phoneIdBeingVisited :: dict.[anonymousId]
            dict.[anonymousId] <- phoneIdsVisited
          else
            dict.Add(anonymousId, [phoneIdBeingVisited])
      return! loop dict
    }

    loop (new Dictionary<string, list<string>>())

  let StorageAgent = Agent.Start(storageAgentFunc)

StorageAgent is an F# agent which stores the user navigation history in an in-memory dictionary. It could be replaced by any key-value store such as Redis, but for experimentation I’ve preferred to use F# agents.

The StorageAgentMessage is a discriminated union that represents the possible messages the StorageAgent can process. Right now it has only one message, SavePhoneVisit, which takes a tuple of the anonymousId and the phoneIdBeingVisited.

The StorageAgent is a typical F# Agent which waits for the incoming StorageAgentMessage and upon receiving it stores the visit in the in-memory dictionary.
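
To make the agent’s behaviour easier to see in isolation, here is a minimal sketch that can be run in F# Interactive. It is not the code from the repository: the GetVisits message and the sketchAgent name are hypothetical, added only so that the stored history can be read back, and the state is kept in an immutable Map passed through the loop instead of a mutable Dictionary.

```fsharp
type Agent<'T> = MailboxProcessor<'T>

// GetVisits is a hypothetical message, added only so this sketch can read the state back.
type SketchMessage =
  | SavePhoneVisit of string * string
  | GetVisits of string * AsyncReplyChannel<string list>

let sketchAgent =
  Agent.Start(fun (inbox : Agent<SketchMessage>) ->
    // The state is an immutable Map that is handed to the next iteration of the loop.
    let rec loop (visits : Map<string, string list>) = async {
      let! message = inbox.Receive()
      match message with
      | SavePhoneVisit (anonymousId, phoneId) ->
          let previous = visits |> Map.tryFind anonymousId |> Option.defaultValue []
          // The newest visit goes to the head of the list.
          return! loop (visits |> Map.add anonymousId (phoneId :: previous))
      | GetVisits (anonymousId, reply) ->
          reply.Reply (visits |> Map.tryFind anonymousId |> Option.defaultValue [])
          return! loop visits
    }
    loop Map.empty)

sketchAgent.Post (SavePhoneVisit ("user-1", "motorola-xoom"))
sketchAgent.Post (SavePhoneVisit ("user-1", "motorola-xoom-with-wi-fi"))

// Messages are processed one at a time and in order, so both visits are stored by now.
let history = sketchAgent.PostAndReply (fun reply -> GetVisits ("user-1", reply))
printfn "%A" history
```

Because the mailbox handles one message at a time, no locks are needed around the state, whichever of the two representations you pick.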

The next step is wiring the PhoneViewTracker.observePhonesViewed function to the PhoneController.Show action method. We could call the function directly, but that would create strongly coupled code. We could even post the message to the agent directly, but that also makes the code tightly coupled.

What we are actually trying to implement here is a User Phone Visit Stream. Whenever the user visits a phone, we just want to notify somebody to keep track of it and move on. This is where Reactive Extensions, aka Rx, comes into the picture. If you are new to Reactive Programming or Rx, I strongly recommend that you go through this excellent article by André Staltz.

Install the Rx NuGet package in the Web project. With Rx in the kitty, the next step is to turn the user’s phone visit into an event and subscribe the PhoneViewTracker to it.

The first step is to make the PhoneController observable. Modify the already created PhoneController as below:

type PhoneController(phones : seq<Phone>) =
  inherit Controller()

  let subject = new Subject<string>()

  interface IObservable<string> with
    member this.Subscribe observer = subject.Subscribe observer

  member this.Show (id : string) =
    let phone = phones |> Seq.find (fun p -> p.Id = id) |> PhoneViewModel.ToPhoneViewModel
    subject.OnNext id
    this.View(phone)

  override this.Dispose disposing =
    if disposing then
      subject.OnCompleted()
      subject.Dispose()
    base.Dispose disposing

One of the great features of F# is its seamless interoperability with C# libraries. As you can see in the above code snippet, we have turned the PhoneController into an observable just by implementing the IObservable interface.

We have created a private Rx Subject and made it responsible for pushing the notification which contains the phone id that is being visited.
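
If you want to play with this publish/subscribe idea without ASP.NET MVC or the Rx package, the following sketch may help. It is an approximation under stated assumptions: PhoneVisitSource is a hypothetical stand-in for the PhoneController, and a plain F# Event (from FSharp.Core) takes the place of the Rx Subject.

```fsharp
open System

// PhoneVisitSource is a hypothetical stand-in for PhoneController.
type PhoneVisitSource() =
  // Assumption: an F# Event replaces the Rx Subject used in the post.
  let visited = Event<string>()

  // Expose the stream of visited phone ids as an IObservable<string>.
  member this.Visits : IObservable<string> = visited.Publish :> IObservable<string>

  member this.Show (id : string) =
    visited.Trigger id            // push the notification and move on
    sprintf "view for %s" id

let source = PhoneVisitSource()
let seen = ResizeArray<string>()
let subscription = source.Visits |> Observable.subscribe (fun id -> seen.Add id)

source.Show "motorola-xoom" |> ignore
source.Show "nexus-s" |> ignore
subscription.Dispose()
source.Show "dell-venue" |> ignore   // not recorded: the subscription has been disposed
printfn "%A" (List.ofSeq seen)
```

The publisher knows nothing about who is listening, which is exactly the decoupling we are after.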

But wait, how do we configure the subscription between this controller and the PhoneViewTracker? Thanks to the CompositionRoot that we created in step-2, we have full control over the creation of the controller, so it’s just a matter of two lines to achieve it.

type CompositionRoot(phones : seq<PhoneTypeProvider.Root>) =
    inherit DefaultControllerFactory() with
      override this.GetControllerInstance(requestContext, controllerType) =
        // ...
        else if controllerType = typeof<PhoneController> then
          let phones' = phones |> Seq.map TypeProviders.ToCatalogPhone
          let observer = PhoneViewTracker.observePhonesViewed (requestContext.HttpContext.Request.AnonymousID)
          let phoneController = new PhoneController(phones')
          let subscription = phoneController.Subscribe observer
          phoneController :> IController
        // ...

We are leveraging ASP.NET’s Anonymous Identification Module, which helps us create a unique anonymous id for every user session. We can retrieve it from the HttpRequest as shown in the above snippet.

Anonymous identification of user sessions is not enabled by default, so add the following entry in the Web.config file:

<configuration>
  <!-- Existing Code ignored for brevity ... -->
  <system.web>
    <!-- Existing Code ignored for brevity... -->
    <anonymousIdentification enabled="true" />
  </system.web>
</configuration>

With the help of partial application of functions, as in the previous steps, we have created a partially applied function called observer, which has the signature string -> unit. Then we have subscribed it to the PhoneController using the Subscribe method, and with this we are done with saving a user visit.
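
Here is a small, self-contained sketch of that partial application step. The names are assumptions for illustration: saveVisit stands in for PhoneViewTracker.observePhonesViewed, and an F# Event stands in for the controller’s Subject.

```fsharp
let saved = ResizeArray<string * string>()

// saveVisit is a hypothetical stand-in for PhoneViewTracker.observePhonesViewed.
let saveVisit (anonymousId : string) (phoneId : string) : unit =
  saved.Add ((anonymousId, phoneId))

// Stand-in for the controller's Subject (assumption: a plain F# Event).
let visits = Event<string>()

// Fixing only the first argument leaves a function of type string -> unit ...
let observer : string -> unit = saveVisit "anonymous-42"

// ... which is the shape that Observable.subscribe expects for a stream of strings.
let subscription = visits.Publish |> Observable.subscribe observer

visits.Trigger "motorola-xoom"
visits.Trigger "nexus-s"
subscription.Dispose()
printfn "%A" (List.ofSeq saved)
```

The anonymous id is baked into the observer once, so the code that raises the event only ever deals with phone ids.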

2. Recommending a phone

Workflow

  1. User initiates the recommendation request using SignalR.
  2. Upon receiving it, the Recommendation SignalR hub sends a recommendation request message to the Storage Agent with the user’s anonymous id and the SignalR connection id of the given user.
  3. The Storage Agent then fetches the phone visit history of the given user based on the incoming anonymous id and passes it to the Recommendation Agent along with the SignalR connection id.
  4. The Recommendation Agent responds by computing the recommendation and publishing the result (either a recommended phone id or none) in the Recommendation observable.
  5. The Recommendation hub receives this recommendation result and sends the response back to the corresponding SignalR client.

The beauty of this entire workflow is that everything is message driven and asynchronous by default.
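
The message flow in steps 2 to 5 can be sketched end to end without SignalR. This is only an approximation under assumptions: an F# Event plays the role of the Recommendation observable, the lowercase names (storageAgent, recommendationAgent, recommendationPipe) are made up for the sketch, and a TaskCompletionSource stands in for the SignalR client waiting for its answer.

```fsharp
open System.Threading.Tasks

type Agent<'T> = MailboxProcessor<'T>

type StorageMessage =
  | SavePhoneVisit of string * string
  | GetRecommendation of string * string

// Stand-in for the Recommendation observable (assumption: a plain F# Event).
let recommendationPipe = Event<string * string option>()

// Step 4: compute a (hardcoded) recommendation and publish it.
let recommendationAgent =
  Agent.Start(fun (inbox : Agent<string * string list>) ->
    let rec loop () = async {
      let! connectionId, visited = inbox.Receive()
      let result =
        match visited with
        | "motorola-xoom-with-wi-fi" :: "motorola-xoom" :: _ -> Some "motorola-atrix-4g"
        | _ -> None
      recommendationPipe.Trigger ((connectionId, result))
      return! loop ()
    }
    loop ())

// Steps 2 and 3: keep the history and forward it on request.
let storageAgent =
  Agent.Start(fun (inbox : Agent<StorageMessage>) ->
    let rec loop (visits : Map<string, string list>) = async {
      let! message = inbox.Receive()
      match message with
      | SavePhoneVisit (anonymousId, phoneId) ->
          let previous = defaultArg (Map.tryFind anonymousId visits) []
          return! loop (Map.add anonymousId (phoneId :: previous) visits)
      | GetRecommendation (anonymousId, connectionId) ->
          visits
          |> Map.tryFind anonymousId
          |> Option.iter (fun visited -> recommendationAgent.Post ((connectionId, visited)))
          return! loop visits
    }
    loop Map.empty)

// Step 5: the subscriber that would push the result to the SignalR client.
let received = TaskCompletionSource<string * string option>()
recommendationPipe.Publish
|> Observable.add (fun result -> received.TrySetResult result |> ignore)

storageAgent.Post (SavePhoneVisit ("user-1", "motorola-xoom"))
storageAgent.Post (SavePhoneVisit ("user-1", "motorola-xoom-with-wi-fi"))
storageAgent.Post (GetRecommendation ("user-1", "connection-7"))

if not (received.Task.Wait(5000)) then failwith "no recommendation arrived"
let outcome = received.Task.Result
printfn "%A" outcome
```

Nobody in this chain blocks waiting for anybody else; each piece posts a message and moves on.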

Let’s start with the Recommendation SignalR hub. The first step is installing SignalR from NuGet.

After installing it, create a source file in the Web project, name it Startup, and add the following code as per the SignalR convention:

namespace PhoneCat.Web

open Owin

type Startup() =
  member x.Configuration(app : IAppBuilder) =
    app.MapSignalR() |> ignore
    ()

Then add an app setting in the Web.config file to configure SignalR to use this Startup class:

<configuration>
  <appSettings>
    <!-- Other keys.. -->
    <add key="owin:AppStartup" value="PhoneCat.Web.Startup" />
  </appSettings>
  <!-- other configuration items.. -->
</configuration>

With SignalR added to the system, the next step is to create the RecommendationHub. Add a source file in the Web project and name it Hubs.

Then create a RecommendationHub class with a public method GetRecommendation, which will be invoked by the SignalR client to initiate the recommendation process.

type RecommendationHub() =
    inherit Hub ()
    member this.GetRecommendation () =
      let encodedAnonymousId = this.Context.Request.Cookies.[".ASPXANONYMOUS"].Value
      let anonymousId = decode encodedAnonymousId
      let connectionId = this.Context.ConnectionId
      StorageAgent.Post (GetRecommendation(anonymousId, connectionId))
      "Recommendation initiated"

The anonymous id of the user session is persisted in the request cookies by ASP.NET in an encoded format. In the GetRecommendation method we retrieve this encoded anonymous id from the cookie and decode it. Then we get the SignalR connection id, which is available in the base class Hub. With both the anonymous id and the connection id in hand, we send a GetRecommendation message to the StorageAgent. Finally, we return “Recommendation initiated” as the response to the SignalR client.

The decode function is not added yet, so let’s add it. Thanks to this Stack Overflow answer, we just need to convert the code from C# to F#:

let private decode encodedAnonymousId =
    let decodeMethod =
      typeof<AnonymousIdentificationModule>
        .GetMethod("GetDecodedValue", BindingFlags.Static ||| BindingFlags.NonPublic)
    let anonymousIdData = decodeMethod.Invoke(null, [| encodedAnonymousId |]);
    let field =
      anonymousIdData.GetType()
        .GetField("AnonymousId", BindingFlags.Instance ||| BindingFlags.NonPublic);
    field.GetValue(anonymousIdData) :?> string

We have used a special F# operator here, :?>, which is the dynamic downcast operator: it casts a value of a base type to a more derived type and checks the cast at runtime. You can read this MSDN documentation to know more about F# casting and conversions.
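
A tiny sketch of the downcast in isolation may make this clearer. The value names are made up for illustration; the boxed string plays the role of the obj returned by FieldInfo.GetValue in the decode function.

```fsharp
// Statically typed as obj, but holding a string at runtime,
// much like the value returned by FieldInfo.GetValue.
let boxed : obj = box "anonymous-42"

// :?> checks the runtime type and throws InvalidCastException on a mismatch.
let asString = boxed :?> string

// A type-test pattern is an alternative that never throws.
let describe (value : obj) =
  match value with
  | :? string as s -> sprintf "string of length %d" s.Length
  | :? int as i -> sprintf "int %d" i
  | _ -> "something else"

// Demonstrates the failure case: the boxed value is not an int.
let failed =
  try
    (boxed :?> int) |> ignore
    false
  with :? System.InvalidCastException -> true

printfn "%s / %s / %b" asString (describe boxed) failed
```

In decode the downcast is reasonable because we expect the field to be a string; if that assumption ever breaks, the exception surfaces the problem right there.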

The GetRecommendation message is not added yet either, so let’s add it too. Modify the StorageAgentMessage created before as below:

  type StorageAgentMessage =
    | SavePhoneVisit of string * string
    | GetRecommendation of string * string

The final step of this pipeline is to update the StorageAgent to handle this GetRecommendation message. Modify the storageAgentFunc in UserNavigationHistory as below:

  let private storageAgentFunc (agent : Agent<StorageAgentMessage>) =
    let rec loop (dict : Dictionary<string, list<string>>) = async {
      let! storageAgentMessage = agent.Receive()
      match storageAgentMessage with
      | SavePhoneVisit (anonymousId, phoneIdBeingVisited) ->
          // .. existing code ignored for brevity ..
      | GetRecommendation (anonymousId, connectionId) ->
          // Unknown anonymous ids are ignored: nothing is forwarded for them.
          if dict.ContainsKey(anonymousId) then
            let phoneIdsVisited = dict.[anonymousId]
            RecommendationAgent.Post (connectionId, phoneIdsVisited)
      return! loop dict
    }
    // .. existing code ignored for brevity ..

Handling the GetRecommendation message is very straightforward. Just get the phone ids visited by the given anonymous id from the in-memory dictionary and send a message consisting of the connection id and these phone ids to the RecommendationAgent, which we will be creating next.
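
As a side note, the ContainsKey check followed by an indexer read can be folded into a single lookup. The sketch below shows one way to do it; tryGetHistory is a hypothetical helper, not something that exists in the repository.

```fsharp
open System.Collections.Generic

// tryGetHistory is a hypothetical helper, not part of the original module.
// F# turns the out parameter of TryGetValue into the second element of a tuple.
let tryGetHistory (history : Dictionary<string, string list>) (anonymousId : string) =
  match history.TryGetValue anonymousId with
  | true, phoneIds -> Some phoneIds
  | false, _ -> None

let history = Dictionary<string, string list>()
history.["user-1"] <- ["motorola-xoom-with-wi-fi"; "motorola-xoom"]

let known = tryGetHistory history "user-1"
let unknown = tryGetHistory history "user-2"
printfn "%A / %A" known unknown
```

Returning an option also makes the “no history yet” case explicit, which is handy if you later want to answer the client in that case too.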

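Create a source file with the name Recommendations in the Domain project and add the RecommendationAgent below. Place the file above UserNavigationHistory in the project, since F# compiles files in order and the StorageAgent refers to the RecommendationAgent.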

[<AutoOpen>]
module Recommendation =
  // suggestRecommendation (added in the next snippet) must be defined above this function.
  let private recommendationAgentFunc (inbox : Agent<string * list<string>>) =
    let rec messageLoop () = async {
      let! connectionId, visitedPhoneIds = inbox.Receive()
      // The history is newest-first, so the first two entries are the latest visits.
      if (Seq.length visitedPhoneIds) >= 2 then
        suggestRecommendation connectionId (visitedPhoneIds |> Seq.take 2 |> Seq.toList)
      return! messageLoop()
    }
    messageLoop ()

  let RecommendationAgent = Agent.Start recommendationAgentFunc

In order to keep this blog post simple, I’ve used my own ‘Hello World’ kind of algorithm, which picks the latest two phone ids visited and suggests a recommendation based on them. In the real world you would replace this toddler algorithm with more sophisticated algorithms like Association Rule Learning. I am planning to implement such an algorithm at a later stage of this blog series.

The next step is to implement the suggestRecommendation function, which picks a hardcoded recommendation and publishes the result using Rx. To do this, add the Rx NuGet package to the Domain project and add the suggestRecommendation function to the Recommendation module:

  let RecommendationPipe = new Subject<string * string option>()

  let private suggestRecommendation connectionId visitedPhoneIds =
    match visitedPhoneIds with
    | ["motorola-xoom-with-wi-fi"; "motorola-xoom"] -> RecommendationPipe.OnNext (connectionId, Some "motorola-atrix-4g")
    | ["dell-streak-7"; "dell-venue"] -> RecommendationPipe.OnNext (connectionId, Some "nexus-s")
    | _ -> RecommendationPipe.OnNext (connectionId, None)

The implementation is straightforward pattern matching. If the last two visited items are (“motorola-xoom-with-wi-fi”, “motorola-xoom”), send “motorola-atrix-4g” as the recommendation; else if they are (“dell-streak-7”, “dell-venue”), recommend “nexus-s”. If neither condition matches, send None. Thanks to the option type, this result is expressed in a type-safe manner.
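
If you would like to try the matching logic on its own, here is a sketch of a pure variant. The recommend function is hypothetical: unlike suggestRecommendation it returns the option instead of pushing it into the RecommendationPipe, which makes it easy to exercise in F# Interactive.

```fsharp
// recommend is a hypothetical pure variant of suggestRecommendation:
// it returns the result instead of publishing it.
let recommend (visitedPhoneIds : string list) : string option =
  match visitedPhoneIds with
  | ["motorola-xoom-with-wi-fi"; "motorola-xoom"] -> Some "motorola-atrix-4g"
  | ["dell-streak-7"; "dell-venue"] -> Some "nexus-s"
  | _ -> None

// The history is newest-first, so this user opened "motorola-xoom" first.
let hit = recommend ["motorola-xoom-with-wi-fi"; "motorola-xoom"]

// The list patterns are order sensitive: the same two phones visited
// in the opposite order do not match.
let miss = recommend ["motorola-xoom"; "motorola-xoom-with-wi-fi"]

printfn "%A / %A" hit miss
```

Note that the patterns match lists of exactly two items, which is why the agent trims the history to the latest two visits before calling the function.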

With all this infrastructure in place, all we need to do is subscribe to this RecommendationPipe and send the suggestion to the user via SignalR.

Let’s add an observer for this pipe in the Web project. Open the Hubs module in the Web project and add the following code:

let private getUrl (phoneId : string) httpContext =
    let routeValueDictionary = new RouteValueDictionary()
    routeValueDictionary.Add("controller", "Phone")
    routeValueDictionary.Add("action", "Show")
    routeValueDictionary.Add("id", phoneId)
    let requestContext = new RequestContext(new HttpContextWrapper(httpContext), new RouteData());
    let virtualPathData = RouteTable.Routes.GetVirtualPath(requestContext, routeValueDictionary);
    virtualPathData.VirtualPath

let notifyRecommendation httpContext phones (connectionId, recommendedPhoneId) =
    let phones' = phones |> Seq.map TypeProviders.ToPhone
    match recommendedPhoneId with
    | Some phoneId ->
      let recommendedPhone = Phones.getPhoneById phones' phoneId
      let phoneUrl = getUrl phoneId httpContext
      let hubContext = GlobalHost.ConnectionManager.GetHubContext<RecommendationHub>()
      hubContext.Clients.Client(connectionId)?showRecommendation(recommendedPhone, phoneUrl)
    | None -> ()

The notifyRecommendation function checks whether the incoming recommendedPhoneId has a value or not. If it does, it picks the Phone record corresponding to the given phoneId and gets the URL for the recommended phone. With all this data in place, we just need to send the response to the user via SignalR.

You will have noticed a weird ? symbol, which is actually the dynamic operator provided by ImpromptuInterface.FSharp. This library makes it possible to use C# dynamic types in F#.
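
In F#, ? is an ordinary user-definable operator: the compiler rewrites target?name into a call to (?) with the name passed as a string. The sketch below defines a toy version for dictionary lookup just to show the syntax. It is an illustration only and is not how ImpromptuInterface.FSharp implements its operator.

```fsharp
open System.Collections.Generic

// A hand-rolled (?) operator for dictionary lookup. This only illustrates the
// syntax; it is NOT the implementation used by ImpromptuInterface.FSharp.
let (?) (source : IDictionary<string, 'T>) (key : string) : 'T = source.[key]

let phone = dict [ "id", "nexus-s"; "name", "Nexus S" ]

// phone?name is rewritten by the compiler to (?) phone "name".
let name : string = phone?name
let id : string = phone?id
printfn "%s (%s)" name id
```

The library’s version resolves the member at runtime on a dynamic object instead, which is what lets us call showRecommendation on the SignalR client proxy.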

Two pieces are still missing. The first is Phones.getPhoneById, which we don’t have yet, so let’s add it. Open the Phones module in the Domain project and add it as shown below:

let getPhoneById (phones : seq<Phone>) phoneId =
    phones |> Seq.find (fun p -> p.Id = phoneId)

The final step is wiring the RecommendationPipe to notifyRecommendation. Open Global.asax.fs and update it as below:

type Global() =
  // .. existing code ignored for brevity ..
  member x.Application_Start() =
    let phones = GitHubRepository.getPhones()
    // .. existing code ignored for brevity ..
    let notificationObserver = Hubs.notifyRecommendation HttpContext.Current phones
    Recommendation.RecommendationPipe.Subscribe notificationObserver |> ignore
    ()

Partial application of functions is a very handy thing, which plays the role that Dependency Injection plays in OOP. We just provided the first two arguments of notifyRecommendation and created a new function with the signature string * string option -> unit, which is the expected observer signature for the RecommendationPipe.
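
To illustrate the dependency injection angle, here is a sketch of a framework-free cousin of notifyRecommendation. Everything in it is hypothetical (the notify function, the simplified Phone record, the fakeSend function): the idea is only that the first two parameters act as the “dependencies”, so a test can supply fakes while the real application would supply the SignalR-backed ones.

```fsharp
// A simplified Phone record, assumed only for this sketch.
type Phone = { Id : string; Name : string }

// notify is hypothetical. Its first two parameters are the "dependencies";
// the last one is the message coming out of the pipe.
let notify findPhone send (connectionId : string, recommendedPhoneId : string option) =
  match recommendedPhoneId with
  | Some phoneId -> send connectionId (findPhone phoneId)
  | None -> ()

// Fake dependencies, as a test would provide them.
let phones = [ { Id = "nexus-s"; Name = "Nexus S" } ]
let findPhone phoneId = phones |> List.find (fun p -> p.Id = phoneId)
let sent = ResizeArray<string * string>()
let fakeSend connectionId (phone : Phone) = sent.Add ((connectionId, phone.Name))

// Supplying the dependencies yields an observer of the expected shape.
let observer : string * string option -> unit = notify findPhone fakeSend

observer ("connection-7", Some "nexus-s")
observer ("connection-8", None)
printfn "%A" (List.ofSeq sent)
```

No container or interface is involved: the “injection” is just supplying arguments early.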

The Front End UI

The front end for this is typical SignalR JavaScript client code, which you can find in the GitHub repository. I’m intentionally leaving out the front-end part of this application, as it would extend the scope of the blog post. Moreover, if you go through the source code in the GitHub repository, you can easily understand it.

Summary

F# is just so awesome, with so many expressive functional programming features. Rx, Agents and SignalR add more power to it and enable you to create a scalable functional reactive architecture. I’d like to give credit to two incredible resources on this subject, Mark Seemann’s Pluralsight course on Functional Architecture with F# and Kevin Ashton’s excellent blog post on Full Stack F#, which helped me a lot in coming up with this blog post.

Last but not least, thanks to Sergey Tihon for the words of encouragement to write a blog post on this topic.

You can find the source code of this step in the GitHub repository.
