MBrace, CloudFlows and FSharp.Data – data analysis made easy


In case you’ve not seen it before, MBrace is a simple programming model for scalable cloud data scripting and programming with .NET. It’s written in F#, but has growing support for C# and VB .NET. Over the past year or so, I worked closely with the MBrace team to help get it working smoothly on Microsoft Azure, using features such as Service Bus and Storage to provide an excellent development and deployment experience. As MBrace gears up for a v1 release, the design of the API is looking extremely positive.

Here I’ll walk through a simple example that shows how easy it is to work with a large CSV file from the internet on an MBrace cluster, parsing and querying the data as we go. We’re going to analyse UK house prices over the past year (the file is freely available on the gov.uk website).

I’m going to assume that you have an MBrace cluster up and running – if you don’t, you can either use a local development cluster or download the latest source code and deploy a full cluster onto Azure using the example MBrace Worker Role supplied in the MBrace Azure source code.

Type Providers on MBrace

We’ll start by generating a schema for our data using FSharp.Data and its CSV Type Provider. Usually the type provider can infer all data types and columns, but in this case the file does not include headers, so we’ll supply them ourselves. I’m also using a local version of the CSV file which contains a subset of the data (the live dataset, even for a single month, is > 10MB):


type HousePrices = CsvProvider< @"SampleHousePrices.csv", HasHeaders = false, Schema = "TransactionId, Price, DateOfTransfer, Postcode, PropertyType, NewBuild, Duration, PAON, SAON, Street, Locality, TownCity, District, County, Status">

In that single line, we now have a strongly-typed way to parse CSV data. Now, let’s move on to the MBrace side of things. I want to start with something simple: let’s get the average sale price of a property, by month, and chart it.


let prices : (int * float) array =
    [ "http://publicdata.landregistry.gov.uk/market-trend-data/price-paid-data/a/pp-2015.csv" ]
    |> CloudFlow.OfHttpFileByLine // Stream the HTTP file across the cluster
    |> CloudFlow.map (HousePrices.ParseRows >> Seq.head) // Convert from raw text to our CSV Provided type
    |> CloudFlow.groupBy(fun row -> row.DateOfTransfer.Month) // Group by month
    |> CloudFlow.map(fun (month, rows) -> month, rows |> Seq.averageBy (fun row -> float row.Price)) // Get the average price for each month
    |> CloudFlow.toArray
    |> cluster.Run

A CloudFlow is an MBrace primitive that allows a distributed set of transformations to be chained together, just as you would with the Seq module in F# (or LINQ’s IEnumerable operators for the rest of the .NET world). The difference is that in MBrace a CloudFlow pipeline is partitioned across the cluster, making full use of the resources available; only when the pipeline has completed in each partition are the results aggregated together again.
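To make the comparison with the Seq module concrete, here is a minimal local sketch of the same group-and-average logic using only FSharp.Core. The Sale record and the sample values are made up for illustration; they stand in for the rows produced by the CSV type provider, and nothing here touches MBrace.

```fsharp
open System

// Hypothetical stand-in for a parsed CSV row (not the provided HousePrices type).
type Sale = { DateOfTransfer : DateTime; Price : int }

// Made-up sample data: two sales in January, one in February.
let sales =
    [ { DateOfTransfer = DateTime(2015, 1, 10); Price = 100000 }
      { DateOfTransfer = DateTime(2015, 1, 20); Price = 200000 }
      { DateOfTransfer = DateTime(2015, 2, 5);  Price = 300000 } ]

// Same shape as the CloudFlow pipeline, but evaluated in-process with Seq.
let averageByMonth : (int * float) array =
    sales
    |> Seq.groupBy (fun row -> row.DateOfTransfer.Month)   // group by month
    |> Seq.map (fun (month, rows) ->
        month, rows |> Seq.averageBy (fun row -> float row.Price)) // average per month
    |> Seq.sortBy fst
    |> Seq.toArray
```

The grouping and averaging lambdas are the same ones passed to CloudFlow.groupBy and CloudFlow.map in the distributed version; only the module name and the place where the work runs differ.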

Also notice that we’re using type providers in tandem with the distributed computation. Once we have called ParseRows, the next stage of the pipeline works with a strongly-typed object model, so DateOfTransfer is a proper DateTime, and so on. MBrace has automatically shipped all dependent assemblies to the cluster; it wasn’t explicitly designed to work with FSharp.Data, it just works. So now that we have an array of int * float, i.e. month * price, we can easily plot it on a chart:


prices
|> Seq.sortBy fst // sort by month
|> Seq.map(fun (month, price) -> sprintf "%s 2015" (System.DateTime(2015, month, 1).ToString("MMM")), price)
|> Chart.Line
|> Chart.WithOptions(Options(curveType = "function"))

[Figure: line chart of average house price by month]

Easy.

Persisted Cloud Flows

Even better, MBrace supports something called Persisted Cloud Flows (known in the Spark world as RDDs). These are flows whose results are partitioned and cached across the cluster, ready to be re-used again and again. This is particularly useful if you have an intermediate result set that you wish to query multiple times. In our case, we might persist the first few lines of the computation (which involve downloading the data from source and parsing it with the CSV Type Provider), ready to be used for any number of strongly-typed queries we might have:


// 46 seconds – download data, convert to provided type and partition across nodes in-memory only
let persistedHousePrices =
    [ "http://publicdata.landregistry.gov.uk/market-trend-data/price-paid-data/a/pp-2015.csv" ]
    |> CloudFlow.OfHttpFileByLine
    |> CloudFlow.map (HousePrices.ParseRows >> Seq.head)
    |> CloudFlow.persist StorageLevel.Memory
    |> cluster.Run
// 5 seconds – get average house price by month
let pricesByMonth =
    persistedHousePrices
    |> CloudFlow.groupBy(fun row -> row.DateOfTransfer.Month)
    |> CloudFlow.map(fun (month, rows) -> month, rows |> Seq.averageBy (fun row -> float row.Price))
    |> CloudFlow.toArray
    |> cluster.Run
// 1 second – get property types in London
let londonProperties =
    persistedHousePrices
    |> CloudFlow.filter(fun row -> row.TownCity = "LONDON")
    |> CloudFlow.countBy(fun row -> row.PropertyType)
    |> CloudFlow.toArray
    |> cluster.Run
(*
val londonProperties : (string * int64) [] =
  [|("T", 8622L); ("D", 582L); ("S", 2327L); ("F", 22288L)|]
T = Terraced, D = Detached, S = Semi-detached, F = Flat
*)
// 5 seconds – get % new builds by county
let newBuildsByCounty =
    persistedHousePrices
    |> CloudFlow.groupBy(fun row -> row.County)
    |> CloudFlow.map(fun (county, rows) ->
        let rows = rows |> Seq.toList
        let newBuilds = rows |> List.filter(fun r -> r.NewBuild = "Y") |> List.length
        let percentageNewBuilds = (100. / float rows.Length) * float newBuilds
        county, percentageNewBuilds)
    |> CloudFlow.toArray
    |> cluster.Run
    |> Array.sortByDescending snd
(*
val newBuildsByCounty : (string * float) [] =
[|("RUTLAND", 19.79434447); ("MIDDLESBROUGH", 17.20430108);
("NEWPORT", 16.91896705); ("HARTLEPOOL", 16.52892562);
("BEDFORD", 16.09907121); ("CENTRAL BEDFORDSHIRE", 15.94540613);
("LEICESTERSHIRE", 15.74045328); ("WREKIN", 14.43452381);
("BRIDGEND", 14.26294821); ("SLOUGH", 14.09135083);
("FLINTSHIRE", 14.08450704); ("MILTON KEYNES", 12.75510204);
("DARLINGTON", 12.61930011); ("CITY OF PETERBOROUGH", 12.61872456);
("WARRINGTON", 11.68305379); ("WINDSOR AND MAIDENHEAD", 10.7751938);
("CITY OF KINGSTON UPON HULL", 10.71225071);
etc. etc. *)

Notice that the first step takes 46 seconds to execute, as it involves downloading the data and parsing it via the CSV type provider. Once that is done, the result is persisted in memory across the cluster, and we can re-use that persisted flow in all subsequent queries, each of which takes only a few seconds to run.
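The idea of paying the parsing cost once and then querying the cached result many times can be sketched locally. This is only an analogy: Seq.cache from FSharp.Core stands in for CloudFlow.persist, and parseLine, the counter and the sample lines are all hypothetical. There is no cluster or partitioning involved.

```fsharp
// Counts how many times the "expensive" parsing step actually runs.
let parseCount = ref 0

// Hypothetical parser: splits "TOWN,NewBuildFlag" into a tuple.
let parseLine (line : string) =
    parseCount.Value <- parseCount.Value + 1
    let parts = line.Split(',')
    parts.[0], parts.[1]

// Made-up sample input.
let rawLines = [ "LONDON,Y"; "LONDON,N"; "LEEDS,N"; "LEEDS,N" ]

// Seq.cache remembers each parsed element, so later queries do not re-parse.
let cached = rawLines |> Seq.map parseLine |> Seq.cache

// Query 1: how many rows are in London?
let londonCount =
    cached |> Seq.filter (fun (town, _) -> town = "LONDON") |> Seq.length

// Query 2: percentage of new builds, same formula as newBuildsByCounty.
let percentNewBuilds =
    let rows = cached |> Seq.toList
    let newBuilds = rows |> List.filter (fun (_, flag) -> flag = "Y") |> List.length
    (100. / float rows.Length) * float newBuilds
```

After both queries have run, the counter shows each line was parsed only once; without Seq.cache every query would repeat the parsing, which is the local equivalent of re-downloading and re-parsing the CSV file for each distributed query.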

Conclusion

MBrace is on the cusp of a 1.0 release, and it’s ready for you to start using now. It offers a powerful and flexible set of abstractions for distributed computations and, as you can see from the examples above, if you’ve used the collection libraries in F# before, it’s a very smooth transition to distributed collection queries. In less than ten lines of code, you can start writing distributed queries against live datasets with the minimum of effort.
