Azure: Examining Databricks Apache Spark platform

15 May 2018
An examination of the Databricks Apache Spark platform on Azure

Introduction

In this article I will be talking about Apache Spark and Databricks. It will guide you through how to use the new cloud-hosted Databricks platform. I will talk about it from a Microsoft Azure standpoint, but it should be exactly the same set of ideas if you were to use the Amazon cloud.

 

What Is Spark / Databricks?

A picture paints a thousand words, they say, so here is a nice picture of what Apache Spark is.

 

At its heart, Apache Spark is a tool for data scientists, allowing them to explore and crunch vast amounts of data with ease. It features things like:

  • Distributed Datasets (so that number crunching can happen across a compute cluster)
  • A DataFrame API, such that you can do things like add columns, aggregate column values, alias data, and join DataFrames. This also supports a SQL syntax, and it also runs distributed across a cluster (see the sketch after this list)
  • A streaming API
  • A Machine Learning API
  • A Graph API
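To make the DataFrame point concrete, here is a minimal Scala sketch of the sort of thing the DataFrame API lets you do (the data and column names are invented purely for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// A tiny illustrative DataFrame; the data and column names are made up
val spark = SparkSession.builder().appName("demo").getOrCreate()
import spark.implicits._

val people = Seq(("sacha", 21), ("ted", 43), ("sue", 43)).toDF("name", "age")

// Add a column, aggregate, and alias the aggregate
val withFlag = people.withColumn("isOver40", $"age" > 40)
withFlag.groupBy("isOver40").agg(avg("age").as("averageAge")).show()

// The same sort of thing via the SQL syntax
people.createOrReplaceTempView("people")
spark.sql("SELECT COUNT(*) FROM people WHERE age > 40").show()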

 

Spark also comes with various adapters that allow it to connect to various data sources, such as:

  • Blobs
  • Databases
  • Filesystems (HDFS / S3 / Azure Storage / Azure Data Lake / Databricks file system)

 

This is not the first time I have written about Apache Spark; here are some older articles on it, should you be interested.

 

 

So when I wrote those articles, there were limited options for how you could run your Apache Spark jobs on a cluster; you could basically do one of the following:

  • Create a Java/Scala/Python app that used the Apache Spark APIs and would run against a cluster
  • Create a JAR that you could get an existing cluster to run using a spark-submit command line

 

The problem with this was that neither was ideal. With the app approach you didn't really want your analytics job to be an app; you really just wanted it to be a class library of some sort.

 

spark-submit would let you submit a class library; however, the feedback that you got from it was not great.

 

There was another product that came out to address this, called Apache Livy, which is a REST API over Apache Spark. But it too had its issues, in that it was not that easy to set up, and the API was fairly limited. To address this, the good folk that own/maintain Apache Spark came out with Databricks.

 

Databricks is essentially a fully managed Apache Spark in the cloud (Amazon / Azure). It also has the concept of REST APIs for common things. Let's dig into that next.

 

Why Is Databricks So Cool?

Just before we get into Databricks, why is it that I think it's so cool?

Well, I have stated one point above already, but let's see the full list:

  • Great (really good) REST API
  • Nice management dashboard in the cloud
  • The ability to create an on-demand cluster for your own job run that is torn down at the end of the job is MASSIVE. The reasons this one point alone should make you think about examining Databricks are as follows:
    • By using your own cluster you are not sharing any resources with someone else, so you can guarantee your performance based on the nodes you choose for your cluster
    • The cluster gets torn down after your job has run. This saves you money
  • If you choose to use a single static cluster rather than a new cluster per job, you can set that cluster to AutoScale (see the sketch after this list). This is a pretty neat feature. Just imagine trying to do that on-premise. Obviously, since Apache Spark also supports running on Kubernetes, that does ease the process a bit. However, to my mind, to make the best use of Kubernetes you also want to run it in a cloud such as Amazon / Azure / Google, as scaling the VMs needed for a cluster is just so MUCH easier if you are in a cloud. Just as an aside, if you don't know your Kubernetes from an onion, I wrote a mini series on it which you can find here: https://sachabarbs.wordpress.com/kubernetes-series/
  • It's actually fairly cheap I think for what it gives you
  • It's highly intuitive
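As a hedged illustration of that AutoScale point: the clusters REST API (which we will get to later) lets you specify an autoscale range instead of a fixed worker count in the cluster spec. The field names below follow the Databricks clusters API docs; the values are purely illustrative:

{
  "cluster_name": "my-autoscaling-cluster",
  "spark_version": "4.0.x-scala2.11",
  "node_type_id": "Standard_D3_v2",
  "autoscale": {
    "min_workers": 2,
    "max_workers": 8
  }
}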

 

Where Is The Code?

So there is a small bit of code that goes with this article, which is split into 2 things

  1. A throw-away WPF app that simply acts as a vehicle to demonstrate the REST calls. In all seriousness you could use Postman for the Databricks REST exploration; the WPF app just makes this easier, as you don't have to worry about remembering to set an access token (once you have set it up once) or about trying to find the right REST API syntax. The UI just shows you a working set of REST calls for Databricks
  2. A simple IntelliJ IDEA Scala/SBT project that represents an Apache Spark job that we wish to upload and run on Databricks. This will be compiled using SBT; as such, SBT is a must-have if you want to run this code

The code repo can be found here : https://github.com/sachabarber/databrick-azure-spark-demo

 

Prerequisites

There are a few of these, not because Databricks needs them as such, but because I was keen to show you an entire workflow of how you might use Databricks for real to run your own jobs, which means we should create a new JAR file to act as a job to send to Databricks.

As such the following is required

  • A Databricks installation (either Amazon/Azure hosted)
  • Visual Studio 2017 (Community edition is fine here)
  • Java 1.8 SDK installed and in your path
  • IntelliJ IDEA Community Edition 2017.1.3 (or later)
  • SBT Intellij IDEA plugin installed

Obviously, if you just want to read along and not try this yourself, you won't need any of this.

 

Getting Started With Databricks In Azure

As I said already I will be using Microsoft Azure, but after the initial creation of Databricks in the Cloud (which will be cloud vendor specific) the rest of the instructions should hold for Amazon or Azure.

Anyway, the first step for working with Databricks in the cloud is to create a new Databricks instance, which for Azure simply means creating a new resource as follows:

 

Once you have created the Databricks instance, you should be able to launch the workspace from the overview of the Databricks instance.

 


 

This should launch you into a new Databricks workspace website that is coupled to your Azure/Amazon subscription, so you should initially see something like this after you have passed the login phase (which happens automatically; well, on Azure it does anyway).

 


 

From here you can see/do the following things, some of which we will explore in more detail below:

  • Create a new Job/Cluster/Notebook
  • Explore DBFS (Databricks file system) data
  • Look at previous job runs
  • View / start / stop clusters

 

 

Exploring The Workspace

In this section we are going to explore what you can do in the Databricks workspace web site that is tied to your Databricks cloud installation. We will not be leaving the workspace web site; everything below will be done directly in it, which to my mind is pretty cool, but where this stuff really shines is when we get to do all of it programmatically, rather than just clicking buttons on a web site.

After all, we would like to build this stuff into our own processing pipelines. Luckily there is a pretty good one-to-one translation from what you can do using the web site to what is exposed via the REST API. But I am jumping ahead; we will get there in the section after this, so for now let's just examine what we can do in the Databricks workspace web site that is tied to your Databricks cloud installation.

 

Create A Cluster

So the very first thing you might like to do is create a cluster to run some code on. This is easily done using the Databricks workspace web site, as follows.

 

This will bring you to a screen like this, where you can configure your cluster and pick the various bits and pieces for it:

 

Once you have created a cluster, it will end up being listed on the clusters overview page.

  • Interactive clusters are ones that are tied to notebooks, which we look at next
  • Job clusters are ones that are used to run jobs

 

Explore A NoteBook

Now I am a long-time dev (I'm old, or feel it, or something), so I think nothing of opening up an IDE and writing an app/class lib/JAR, whatever. But at its heart Apache Spark is a tool for data scientists, who simply want to try some simple bits of number crunching code. That is exactly where notebooks come into play.

A notebook is a hosted, cell-based editor that allows the user to run Python/R/Scala code against an Apache Spark cluster.

You can create a new notebook from the home menu as shown below


So after you have picked your language, you will be presented with a blank notebook where you can write some code into the cells. Teaching you the shortcuts and how to use notebooks properly is outside the scope of this article, but here are some points on notebooks:

  • They allow you to quickly explore the APIs
  • They allow you to re-assign variables, which are remembered
  • They allow you to run just a specific cell
  • They give you some pre-defined variables. But be wary: you will need to swap these out if you translate this to real code. For example, spark is a pre-defined variable (see the sketch below)
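As a quick sketch of that last point, here is the kind of cell you might run in a Scala notebook, leaning on the pre-defined spark variable (in a standalone job you would have to obtain the session yourself, as we will see later):

// In a notebook cell `spark` (the SparkSession) already exists,
// so you can use it straight away
val evens = spark.range(1000).filter(_ % 2 == 0).count()
println(s"Found $evens even numbers")

// In real standalone code you would have to obtain it yourself, e.g.
// import org.apache.spark.sql.SparkSession
// val spark = SparkSession.builder().getOrCreate()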

 

Run Some NoteBook Code

So let's say I have just created a Scala notebook, and I typed the text shown below into a cell. I can quickly run this using ALT + Enter, or the run button in the notebook UI.

 


 

What this will do is run the active cell and print out the variables/output statements to the notebook UI.

 

Read/Write From Azure Blob Store

One of the very common tasks when using Apache Spark is to grab some data from an external source and, once it has been transformed into the required results, save it to storage.

 

Here is an example of working with an existing Azure Blob Storage account and some Scala code. This particular example simply loads a CSV file from Azure Blob Storage, transforms it, and then saves it back to Azure Blob Storage as a time-stamped CSV file.

 

import java.util.Calendar
import java.text.SimpleDateFormat

spark.conf.set("fs.azure.account.key.YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net", "YOUR_STORAGE_KEY_HERE")

spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net",
  "YOUR_STORAGE_KEY_HERE"
)


val now = Calendar.getInstance().getTime()
val minuteFormat = new SimpleDateFormat("mm")
val hourFormat = new SimpleDateFormat("HH")
val secondFormat = new SimpleDateFormat("ss")

val currentHour = hourFormat.format(now)      // e.g. 12
val currentMinute = minuteFormat.format(now)  // e.g. 29
val currentSecond = secondFormat.format(now)  // e.g. 05
val fileId  = currentHour  + currentMinute + currentSecond
fileId

val data = Array(1, 2, 3, 4, 5)
val dataRdd = sc.parallelize(data)
val ages_df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/Ages.csv")
ages_df.head

//https://github.com/databricks/spark-csv 
val selectedData = ages_df.select("age")
selectedData.write
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .save("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAges.csv")


val saved_ages_df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAges.csv")

saved_ages_df.show()

 

Which for me looked like this for my storage account in Azure


If you are wondering why there are folders for the results, such as 080629_SavedAges.csv, this is due to how Apache Spark deals with partitions. Trust me, it doesn't matter when you load things back into memory, as Apache Spark just deals with this when reading the folder back in.

 

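As an aside, if you really do need a single output file rather than a folder of part files, one option (a hedged sketch, assuming the result is small enough to pass through a single partition) is to coalesce the DataFrame before writing:

// Coalescing to one partition yields a single part file inside the
// output folder; this reuses the selectedData value and the same
// placeholder paths from the snippet above
val singleFileData = selectedData.coalesce(1)
singleFileData.write
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .save("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAgesSingle.csv")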

 

Creating Jobs In DataBricks Web UI

The Databricks web UI allows you to create a new job from either a notebook or a JAR that you can drag in and set a main entry point for. You may also set up a schedule and pick the cluster that you want to use. Once you are happy, you can click the "Run Now" button, which will run your job.


 

Launch The Spark UI/Dashboard

Once you have run a job, it is likely that you will want to take a look at it to see that it worked how you expected it to, and that it ran optimally. Luckily, Apache Spark comes equipped with a nice visualiser for a given analysis run that you can use for this. It's kind of like the SQL Query Profiler in SQL Server.

This is accessible from the jobs page


So let's take a look at a successful job, which we can view using the "Succeeded" link in the Last Run column on the Jobs page.

From there we can view the Apache Spark UI or the logs for the job.

Let's see the Apache Spark UI for this job.


The Spark UI is your friend; try to get acquainted with it.

 

Databricks API

OK, so now that we have covered how to use the Databricks web UI, how about we get familiar with the REST API, such that we can craft our own code around using Apache Spark as our analytics engine? This next section will show how to use some of the REST APIs available.

 

What APIs Are Available?

So you may now be wondering what APIs are actually available. This is the place to check: https://docs.databricks.com/api/latest/index.html

From there the main top level APIs are

  • Clusters
  • DBFS
  • Groups
  • Instance Profiles
  • Jobs
  • Libraries
  • Secrets
  • Token
  • Workspace

There are simply not enough hours in the day for me to show you ALL of them. So I have chosen a few to demo, which we will talk about below

 

Creating A Token For Usage With API

The Databricks REST APIs ALL need to have an access token associated with them, which means you firstly need to create a token. This is easily achieved in the Databricks web UI; just follow these steps:


So once you have done that, grab the token value. You will also need to take a note of one other bit of information, which is shown highlighted in the image below. With these 2 bits of information we can use Postman to try a request.


 

Common Stuff You Need To Do With Any API Call

So, as I just said, you will need to ensure that the token from the previous step is supplied on every call. But just how do we do that? What does it look like? Let's use Postman to show an example using the 2 bits of information from the previous paragraph.

Creating A Base64 Encoded Token Value

The token you got from above needs to be turned into a Base64 encoded string. There are many online tools for this; just pick one. The important thing to note is that you must ALSO include a prefix of "token:". So the full string to encode is something like "token:dapi56b...........d5".
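If you would rather not paste a credential into an online tool, a quick alternative (a minimal sketch, not taken from the demo app) is to do the encoding yourself on the JVM; the token value below is, of course, a placeholder:

import java.util.Base64
import java.nio.charset.StandardCharsets

// Encode "token:<your-token>" for use in an Authorization: Basic header
val raw = "token:dapi56b...........d5"
val encoded = Base64.getEncoder.encodeToString(raw.getBytes(StandardCharsets.UTF_8))
println(encoded)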

 

This will give you a Base64 encoded string. From there we need to head into Postman to try out the request, which may look something like this:


The important things to note here are:

  • We create a header:
    Key = Authorization, Value = Basic YOUR_BASE64_ENCODED_STRING_FROM_PREVIOUS_STEP
  • We use the information from the Azure/AWS portal as part of the URI. So this is a valid request for my Databricks Azure installation:
    https://northeurope.azuredatabricks.net/api/2.0/clusters/spark-versions

 

A Simple Throwaway Demo App

Obviously, you could just mess around in Postman to learn how the Databricks REST APIs work; nothing wrong with that at all. But to make life easier for you I have come up with a simple (throw-away) demo app that you can use to explore what I think are the 2 most important APIs.

This is what it looks like running

And here is what it looks like when you have chosen to run one of the pre-canned REST API calls that the demo app provides

 

 

How Do I Set My Token For The Demo App?

So above, when I started talking about the Databricks REST APIs, we said we needed to supply an API token. So how does the demo app deal with this?

Well, there are 2 parts to how it does that. This entry in the App.Config should point to your own file that contains the token information.

So for me this is my file

C:\Users\sacha\Desktop\databrick-azure-spark-demo\MyDataBricksToken.txt

Where the file simply contains a single line of content, "token:dapi56b...........d5", which is your raw Databricks token preceded by the "token:" prefix we talked about above (the demo app itself takes care of the Base64 encoding).

This is then read into a globally available property in the demo app as follows:

using System.Configuration;
using System.IO;
using System.Windows;
using SAS.Spark.Runner.REST.DataBricks;

namespace SAS.Spark.Runner
{
    public partial class App : Application
    {
        protected override void OnStartup(StartupEventArgs e)
        {
            base.OnStartup(e);

            var tokenFileName = ConfigurationManager.AppSettings["TokenFileLocation"];
            if (!File.Exists(tokenFileName))
            {
                MessageBox.Show("Expecting token file to be provided");
            }

            AccessToken = File.ReadAllText(tokenFileName);

            if(!AccessToken.StartsWith("token:"))
            {
                MessageBox.Show("Token file should start with 'token:' " +
                    "followed directly by YOUR DataBricks initial token you created");
            }
        }

        public static string AccessToken { get; private set; }
    }
}

 

And that is all there is to it. The demo app should take care of the rest of it for you.

 

As I say, I did not have time to explore every single API, but I did have time to look at 2 of the most common ones, Clusters and Jobs, which I will talk about below.

But before I get into that, I just wanted to show you the rough idea behind each of the API explorations

 

Example ViewModel

Most of the API explorations are done using a viewmodel something like this

using SAS.Spark.Runner.REST.DataBricks;
using System;
using System.Threading.Tasks;
using System.Windows.Input;
using SAS.Spark.Runner.Services;

namespace SAS.Spark.Runner.ViewModels.Clusters
{
    public class ClusterGetViewModel : INPCBase
    {
        private IMessageBoxService _messageBoxService;
        private IDatabricksWebApiClient _databricksWebApiClient;
        private string _clustersJson;
        private string _clusterId;

        public ClusterGetViewModel(
            IMessageBoxService messageBoxService,
            IDatabricksWebApiClient databricksWebApiClient)
        {
            _messageBoxService = messageBoxService;
            _databricksWebApiClient = databricksWebApiClient;
            FetchClusterCommand = 
				new SimpleAsyncCommand<object, object>(ExecuteFetchClusterCommandAsync);
        }

        private async Task<object> ExecuteFetchClusterCommandAsync(object param)
        {
            if(string.IsNullOrEmpty(_clusterId))
            {
                _messageBoxService.ShowError("You must supply 'ClusterId'");
                return System.Threading.Tasks.Task.FromResult<object>(null);
            }

            try
            {
                var cluster = await _databricksWebApiClient.ClustersGetAsync(_clusterId);
                ClustersJson = cluster.ToString();
            }
            catch(Exception ex)
            {
                _messageBoxService.ShowError(ex.Message);
            }
            return System.Threading.Tasks.Task.FromResult<object>(null);
        }


        public string ClustersJson
        {
            get
            {
                return this._clustersJson;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._clustersJson, 
					value, () => ClustersJson);
            }
        }

        public string ClusterId
        {
            get
            {
                return this._clusterId;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._clusterId, 
					value, () => ClusterId);
            }
        }

        public ICommand FetchClusterCommand { get; private set; }
    }
}

The idea is that we use a simple REST service and that we have a property representing the JSON response. The REST service implements this interface:

using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public interface IDatabricksWebApiClient 
    {

        //https://docs.databricks.com/api/latest/jobs.html#create
        Task<CreateJobResponse> JobsCreateAsync(string jsonJobRequest);

        //https://docs.databricks.com/api/latest/jobs.html#jobsrunnow
        Task<DatabricksRunNowResponse> JobsRunNowAsync(DatabricksRunNowRequest runRequest);

        //https://docs.databricks.com/api/latest/jobs.html#runs-get
        Task<DatabricksRunResponse> JobsRunsGetAsync(int runId);

        //https://docs.databricks.com/api/latest/jobs.html#list
        Task<JObject> JobsListAsync();

        //https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit
        Task<DatabricksRunNowResponse> JobsRunsSubmitJarTaskAsync(RunsSubmitJarTaskRequest runsSubmitJarTaskRequest);

        //https://docs.azuredatabricks.net/api/latest/clusters.html#start
        Task<DatabricksClusterStartResponse> ClustersStartAsync(string clusterId);

        //https://docs.azuredatabricks.net/api/latest/clusters.html#get
        Task<JObject> ClustersGetAsync(string clusterId);

        //https://docs.databricks.com/api/latest/clusters.html#list
        Task<ClusterListResponse> ClustersListAsync();

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#list
        Task<DbfsListResponse> DbfsListAsync();

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#put
        Task<JObject> DbfsPutAsync(FileInfo file);

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsservicecreate
        Task<DatabricksDbfsCreateResponse> DbfsCreateAsync(DatabricksDbfsCreateRequest dbfsRequest);

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsserviceaddblock
        Task<JObject> DbfsAddBlockAsync(DatabricksDbfsAddBlockRequest dbfsRequest);

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#close
        Task<JObject> DbfsCloseAsync(DatabricksDbfsCloseRequest dbfsRequest);
    }
}

 

DataTemplate For UI

The actual UI is simply done using a DataTemplate, where we have bound the ViewModel in question to a ContentControl. For the JSON representation I am just using the AvalonEdit TextEditor.

Here is an example for the ViewModel above:

<Controls:MetroWindow x:Class="SAS.Spark.Runner.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        xmlns:vms="clr-namespace:SAS.Spark.Runner.ViewModels"
        xmlns:vmsClusters="clr-namespace:SAS.Spark.Runner.ViewModels.Clusters"
        xmlns:vmsJobs="clr-namespace:SAS.Spark.Runner.ViewModels.Jobs"
        xmlns:avalonEdit="http://icsharpcode.net/sharpdevelop/avalonedit"
        xmlns:Controls="clr-namespace:MahApps.Metro.Controls;assembly=MahApps.Metro"
        xmlns:local="clr-namespace:SAS.Spark.Runner"
        mc:Ignorable="d"
        WindowState="Maximized"
        Title="DataBricks API Runner">
    <Controls:MetroWindow.Resources>
        .....
        .....
        <DataTemplate DataType="{x:Type vmsClusters:ClusterGetViewModel}">
            <DockPanel LastChildFill="True">
                <StackPanel Orientation="Horizontal" DockPanel.Dock="Top">
                    <Label Content="ClusterId" Margin="3" VerticalAlignment="Center"
                           VerticalContentAlignment="Center" Height="24"/>
                    <TextBox Text="{Binding ClusterId}" Width="200" VerticalAlignment="Center"
                             VerticalContentAlignment="Center" Height="24"/>
                    <Button Content="Get Cluster" Margin="3,0,3,0" Width="100"
                            HorizontalAlignment="Left"
                            VerticalAlignment="Center"
                            VerticalContentAlignment="Center"
                            Command="{Binding FetchClusterCommand}"/>
                </StackPanel>
                <avalonEdit:TextEditor
                    FontFamily="Segoe UI"
                    SyntaxHighlighting="JavaScript"
                    FontSize="10pt"
                    vms:TextEditorProps.JsonText="{Binding ClustersJson}"/>
            </DockPanel>
        </DataTemplate>
    </Controls:MetroWindow.Resources>
</Controls:MetroWindow>

 

As the ViewModels used in the demo app all mainly follow this pattern, I won't be showing you any more ViewModel code, apart from the one where we upload a JAR file, as that one is a bit special.

Just keep in the back of your mind that they all roughly work this way, and you will be OK.

 

Cluster API Exploration

This section shows the Cluster APIs that I chose to look at

 

Clusters List

Databricks docs are here : https://docs.databricks.com/api/latest/clusters.html#list, and this API call does the following:

  • Returns information about all pinned clusters, currently active clusters, up to 70 of the most recently terminated interactive clusters in the past 30 days, and up to 30 of the most recently terminated job clusters in the past 30 days. For example, if there is 1 pinned cluster, 4 active clusters, 45 terminated interactive clusters in the past 30 days, and 50 terminated job clusters in the past 30 days, then this API returns the 1 pinned cluster, 4 active clusters, all 45 terminated interactive clusters, and the 30 most recently terminated job clusters.

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/clusters.html#list
        public async Task<ClusterListResponse> ClustersListAsync()
        {
            var request = new RestRequest("api/2.0/clusters/list", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");

            var response = await _client.ExecuteTaskAsync<ClusterListResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<ClusterListResponse>(response.Content);
            return dbResponse;
        }

        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

Cluster Get

Databricks docs are here : https://docs.azuredatabricks.net/api/latest/clusters.html#get, and this API call does the following:

  • Retrieves the information for a cluster given its identifier. Clusters can be described while they are running, or up to 60 days after they are terminated

 

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.azuredatabricks.net/api/latest/clusters.html#get
        public async Task<JObject> ClustersGetAsync(string clusterId)
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/clusters/get", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("cluster_id", clusterId);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

Cluster Start

Databricks docs are here : https://docs.azuredatabricks.net/api/latest/clusters.html#start, and this API call does the following:

  • Starts a terminated Spark cluster given its ID. This is similar to createCluster, except:
    • The previous cluster id and attributes are preserved.
    • The cluster starts with the last specified cluster size. If the previous cluster was an autoscaling cluster, the current cluster starts with the minimum number of nodes.
    • If the cluster is not currently in a TERMINATED state, nothing will happen.
    • Clusters launched to run a job cannot be started.

 

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.azuredatabricks.net/api/latest/clusters.html#start
        public async Task<DatabricksClusterStartResponse> ClustersStartAsync(string clusterId)
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/clusters/start", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(new { cluster_id = clusterId });

            var response = await _client.ExecuteTaskAsync<DatabricksClusterStartResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksClusterStartResponse>(response.Content);
            return dbResponse;
        }
        
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

Jobs API Exploration

This section shows the Jobs APIs that I chose to look at

 

Jobs List

Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#list, and this API call does the following:

  • Lists all jobs

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#list
        public async Task<JObject> JobsListAsync()
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/jobs/list", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

 

Jobs Create

Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#create, and this API call does the following:

  • Creates a new job with the provided settings

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#create
        public async Task<CreateJobResponse> JobsCreateAsync(string jsonJobRequest)
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/jobs/create", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddParameter("application/json", jsonJobRequest, ParameterType.RequestBody);
            var response = await _client.ExecuteTaskAsync<CreateJobResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<CreateJobResponse>(response.Content);
            return dbResponse;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

An example request for this one is worth a special call-out, as it's not a simple parameter; we need to pass in quite a lot of JSON for this request. Here is an example request for a job that runs at 10:15pm each night:

{
  "name": "Nightly model training",
  "new_cluster": {
    "spark_version": "4.0.x-scala2.11",
    "node_type_id": "r3.xlarge",
    "aws_attributes": {
      "availability": "ON_DEMAND"
    },
    "num_workers": 10
  },
  "libraries": [
    {
      "jar": "dbfs:/my-jar.jar"
    },
    {
      "maven": {
        "coordinates": "org.jsoup:jsoup:1.7.2"
      }
    }
  ],
  "email_notifications": {
    "on_start": [],
    "on_success": [],
    "on_failure": []
  },
  "timeout_seconds": 3600,
  "max_retries": 1,
  "schedule": {
    "quartz_cron_expression": "0 15 22 ? * *",
    "timezone_id": "America/Los_Angeles"
  },
  "spark_jar_task": {
    "main_class_name": "com.databricks.ComputeModels"
  }
}

 

Although the demo app doesn't use this one directly, I use a very similar one, which I will go through in quite some detail below.

 

Jobs Runs Get

Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#runs-get, and this API call does the following:

  • Retrieves the metadata of a run

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#runs-get
        public async Task<DatabricksRunResponse> JobsRunsGetAsync(int runId)
        {
            var request = new RestRequest("api/2.0/jobs/runs/get", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("run_id", runId.ToString());
            var response = await _client.ExecuteTaskAsync<DatabricksRunResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksRunResponse>(response.Content);
            return dbResponse;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

Jobs Run Now

Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#jobsrunnow, and this API call does the following:

  • Runs the job now, and returns the run_id of the triggered run

This is simply done via the following code

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#jobsrunnow
        public async Task<DatabricksRunNowResponse> JobsRunNowAsync(DatabricksRunNowRequest runRequest)
        {
            var request = new RestRequest("api/2.0/jobs/run-now", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(runRequest);
            var response = await _client.ExecuteTaskAsync<DatabricksRunNowResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksRunNowResponse>(response.Content);
            return dbResponse;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

Where we use this sort of request

using Newtonsoft.Json;

namespace SAS.Spark.Runner.REST.DataBricks.Requests
{
    // If you want to pass args you can do so using extra properties
    // See : https://docs.databricks.com/api/latest/jobs.html#run-now
    // - jar_params : 
    //   A list of parameters for jobs with jar tasks, e.g. "jar_params": ["john doe", "35"]. 
    //   The parameters will be used to invoke the main function of the main class specified 
    //   in the spark jar task. If not specified upon run-now, it will default to an empty list.
    // - notebook_params : 
    //   A map from keys to values for jobs with notebook task, 
    //   e.g. "notebook_params": {"name": "john doe", "age":  "35"}. 
    //   The map is passed to the notebook and will be accessible through the 
    //   dbutils.widgets.get function
    // - python_params :
    //   A list of parameters for jobs with python tasks, e.g. "python_params": ["john doe", "35"]. 
    //   The parameters will be passed to python file as command line parameters. 
    // If specified upon run-now, it would overwrite the parameters specified in job setting. 
    public class DatabricksRunNowRequest
    {
        [JsonProperty(PropertyName = "job_id")]
        public int JobId { get; set; }
    }
}
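So, pulling together the fields described in the doc comments above, a run-now request body that also passes JAR parameters might look like this (the values are illustrative):

{
  "job_id": 1,
  "jar_params": ["john doe", "35"]
}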

 

Jobs Run Submit

Databricks docs are here : https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit, and this API call does the following:

  • Submit a one-time run with the provided settings. This endpoint doesn't require a Databricks job to be created. You can directly submit your workload. Runs submitted via this endpoint don't show up in the UI. Once the run is submitted, you can use the jobs/runs/get API to check the run state.

Now this is probably the most complex, but most useful, of ALL the REST APIs, as it allows you to do the following:

  • Run using a JAR written in Scala (where you can pass in command line args too)
  • Run using a notebook
  • Run using a Python file (where you can pass in command line args too)

As I am quite keen on Scala I will be using Scala for the demo

 

The Scala Project

The demo code has a 2nd project in the source code, Src/SAS.SparkScalaJobApp, which is an IntelliJ IDEA Scala project. To run this you will need the prerequisites listed at the top of this article.

Once you have downloaded the code, open a command line window, navigate to the Src/SAS.SparkScalaJobApp folder, run SBT, and issue these SBT commands:

 
> clean
> compile
> assembly

From there you should be able to go to the Target directory and see a fat JAR (one with all dependencies bundled together).
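For reference, the assembly command comes from the sbt-assembly plugin. A minimal sketch of the kind of build definition involved looks like the following (the version numbers here are illustrative assumptions, not taken from the repo):

// build.sbt - a hypothetical minimal build for a fat-JAR Spark job.
// Spark is marked "provided" because the Databricks cluster supplies it.
name := "SparkScalaJobApp"
version := "1.0"
scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"

// project/assembly.sbt would contain something like:
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")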

We will use this in just a moment, but let's just take a minute to examine the code. It is a very simple Spark job that expects a single Int command line argument (which we will send via the REST call in a moment), and will then create a List of that many items and apply some simple Spark transformations to them.

 

NOTE:

One thing to note is that we need to be careful about how we use things like SparkContext and SparkSession, which, if you have done any Spark before, you will have created yourself. When using a cloud provider such as AWS or Azure you need to use the existing SparkContext and SparkSession, and we also need to avoid terminating/shutting down these items, as they are in effect shared. This blog is a good read on this: https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

 

 

import scala.util.Try
import scala.util.Success
import scala.util.Failure
import org.apache.spark.sql.SparkSession

object SparkApp extends App {

  println("===== Starting =====")

  if(args.length != 1) {
    println("Need exactly 1 int arg")
  }

  Try {
    Integer.parseInt(args(0))
  } match {
    case Success(v:Int) => {
      val combinedArgs = args.aggregate("")(_ + _, _ + _)
      println(s"Args were $combinedArgs")
      SparkDemoRunner.runUsingArg(v)
      println("===== Done =====")
    }
    case Failure(e) =>  {
      println(s"Could not parse command line arg [$args(0)] to Int")
      println("===== Done =====")
    }
  }
}

object SparkDemoRunner {
  def runUsingArg(howManyItems : Int) : Unit =  {
    val session = SparkSession.builder().getOrCreate()
    import session.sqlContext.implicits._

    val multiplier = 2
    println(s"multiplier is set to $multiplier")

    val multiplierBroadcast = session.sparkContext.broadcast(multiplier)
    val data = List.tabulate(howManyItems)(_ + 1)
    val dataRdd = session.sparkContext.parallelize(data)
    val mappedRdd = dataRdd.map(_ * multiplierBroadcast.value)
    val df = mappedRdd.toDF
    df.show()
  }
}
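As a worked example of what this does: with the command line argument 10 the job builds the list 1..10, multiplies each element by the broadcast multiplier of 2, and df.show() prints a single-column DataFrame along these lines (value is the default column name toDF gives a primitive RDD; output abbreviated here):

+-----+
|value|
+-----+
|    2|
|    4|
|    6|
| ... |
|   20|
+-----+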

 

Anyway, once we have that JAR file available, we need to use a few APIs, which I will go through one by one, but here is the rough flow:

  • Examine whether the chosen JAR file already exists in DBFS (the Databricks file system), which would mean we have uploaded it before
  • Start the upload of the file (which we have to do in chunks, as there is a 1MB limit on a single 2.0/dbfs/put API call) to get a file handle
  • Upload blocks of data for the file handle as Base64 encoded strings
  • Close the file using the file handle
  • Craft a runs-submit request to make use of the just uploaded/latest DBFS file

 

So that's the rough outline of it

 

So here is the ViewModel that will allow you to pick the JAR (which, as stated above, should be in the Target folder of the Src/SAS.SparkScalaJobApp source code if you followed the instructions above to compile it using SBT).

using SAS.Spark.Runner.REST.DataBricks;
using System;
using System.IO;
using System.Threading.Tasks;
using System.Windows.Input;
using SAS.Spark.Runner.Services;
using System.Linq;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using System.Collections.Generic;
using Newtonsoft.Json;
using System.Diagnostics;

namespace SAS.Spark.Runner.ViewModels.Jobs
{
    public class JobsPickAndRunJarViewModel : INPCBase
    {
        private IMessageBoxService _messageBoxService;
        private IDatabricksWebApiClient _databricksWebApiClient;
        private IOpenFileService _openFileService;
        private IDataBricksFileUploadService _dataBricksFileUploadService;
        private string _jarFilePath;
        private string _status;
        private FileInfo _jarFile;
        private bool _isBusy;
        private bool _isPolling = false;
        private string _jobsJson;
        private Stopwatch _watch = new Stopwatch();

        public JobsPickAndRunJarViewModel(
            IMessageBoxService messageBoxService,
            IDatabricksWebApiClient databricksWebApiClient,
            IOpenFileService openFileService,
            IDataBricksFileUploadService dataBricksFileUploadService)
        {
            _messageBoxService = messageBoxService;
            _databricksWebApiClient = databricksWebApiClient;
            _openFileService = openFileService;
            _dataBricksFileUploadService = dataBricksFileUploadService;
            PickInputJarFileCommand = new SimpleAsyncCommand<object, object>(x => !IsBusy && !_isPolling, ExecutePickInputJarFileCommandAsync);
        }

        public string JobsJson
        {
            get
            {
                return this._jobsJson;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._jobsJson, value, () => JobsJson);
            }
        }

        public string JarFilePath
        {
            get
            {
                return this._jarFilePath;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._jarFilePath, value, () => JarFilePath);
            }
        }

        public string Status
        {
            get
            {
                return this._status;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._status, value, () => Status);
            }
        }

        public bool IsBusy
        {
            get
            {
                return this._isBusy;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._isBusy, value, () => IsBusy);
            }
        }

        public ICommand PickInputJarFileCommand { get; private set; }
 
        

        private async Task<object> ExecutePickInputJarFileCommandAsync(object param)
        {
            IsBusy = true;

            try
            {
                _openFileService.Filter = "Jar Files (*.jar)|*.jar";
                _openFileService.InitialDirectory = @"c:\";
                _openFileService.FileName = "";
                var dialogResult = _openFileService.ShowDialog(null);
                if(dialogResult.Value)
                {
                    if(!_openFileService.FileName.ToLower().EndsWith(".jar"))
                    {
                        _messageBoxService.ShowError($"{_openFileService.FileName} is not a JAR file");
                        return Task.FromResult<object>(null);
                    }
                    _jarFile = new FileInfo(_openFileService.FileName);
                    JarFilePath = _jarFile.Name;
                    var rawBytesLength = File.ReadAllBytes(_jarFile.FullName).Length;
                    await _dataBricksFileUploadService.UploadFileAsync(_jarFile, rawBytesLength,
                        (newStatus) => this.Status = newStatus);

                    bool uploadedOk = await IsDbfsFileUploadedAndAvailableAsync(_jarFile, rawBytesLength);
                    if (uploadedOk)
                    {
                        //2.0/jobs/runs/submit
                        //poll for success using jobs/runs/get, store that in the JSON

                        var runId = await SubmitJarJobAsync(_jarFile);
                        if(!runId.HasValue)
                        {
                            IsBusy = false;
                            _messageBoxService.ShowError(this.Status = $"Looks like there was a problem with calling Spark API '2.0/jobs/runs/submit'");
                        }
                        else
                        {
                            await PollForRunIdAsync(runId.Value);
                        }

                    }
                    else
                    {
                        IsBusy = false;
                        _messageBoxService.ShowError("Looks like the Jar file did not upload ok....Boo");
                    }
                }
            }
            catch (Exception ex)
            {
                _messageBoxService.ShowError(ex.Message);
            }
            finally
            {
                IsBusy = false;
            }
            return Task.FromResult<object>(null);
        }

        private async Task<bool> IsDbfsFileUploadedAndAvailableAsync(FileInfo dbfsFile, int rawBytesLength)
        {
            bool fileUploadOk = false;
            int maxNumberOfAttemptsAllowed = 10;
            int numberOfAttempts = 0;

            // keep checking until the file shows up, or we run out of attempts
            while (!fileUploadOk && (numberOfAttempts < maxNumberOfAttemptsAllowed))
            {
                //check for the file in Dbfs
                var response = await _databricksWebApiClient.DbfsListAsync();
                fileUploadOk = response.files.Any(x =>
                    x.file_size == rawBytesLength &&
                    x.is_dir == false &&
                    x.path == $@"/{dbfsFile.Name}"
                );
                numberOfAttempts++;
                this.Status = $"Checking that Jar has been uploaded ok.\r\nAttempt {numberOfAttempts} out of {maxNumberOfAttemptsAllowed}";
                await Task.Delay(500);
            }
            return fileUploadOk;
        }

        private async Task<int?> SubmitJarJobAsync(FileInfo dbfsFile)
        {
            this.Status = $"Creating the Spark job using '2.0/jobs/runs/submit'";

            // =====================================================================
            // EXAMPLE REQUEST
            // =====================================================================
            //{
            //  "run_name": "my spark task",
            //  "new_cluster": 
            //  {
            //    "spark_version": "3.4.x-scala2.11",
            //    "node_type_id": "Standard_D3_v2",
            //    "num_workers": 10
            //  },
            //  "libraries": [
            //    {
            //      "jar": "dbfs:/my-jar.jar"
            //    }
            //  ],
            //  "timeout_seconds": 3600,
            //  "spark_jar_task": {
            //    "main_class_name": "com.databricks.ComputeModels",
            //    "parameters" : ["10"]
            //  }
            //}

            var datePart = DateTime.Now.ToShortDateString().Replace("/", "");
            var timePart = DateTime.Now.ToShortTimeString().Replace(":", "");
            var request = new RunsSubmitJarTaskRequest()
            {
                run_name = $"JobsPickAndRunJarViewModel_{datePart}_{timePart}",
                new_cluster = new NewCluster
                {
                    // see api/2.0/clusters/spark-versions
                    spark_version = "4.0.x-scala2.11",
                    // see api/2.0/clusters/list-node-types
                    node_type_id = "Standard_F4s",
                    num_workers = 2
                },
                libraries = new List<Library>
                {
                    new Library { jar = $"dbfs:/{dbfsFile.Name}"}
                },
                timeout_seconds = 3600,
                spark_jar_task = new SparkJarTask
                {
                    main_class_name = "SparkApp",
                    parameters = new List<string>() { "10" }
                }
            };

            var response = await _databricksWebApiClient.JobsRunsSubmitJarTaskAsync(request);
            return response.RunId;
        }

        private async Task PollForRunIdAsync(int runId)
        {
            _watch.Reset();
            _watch.Start();
            // start polling; the loop below runs until a result state comes back
            _isPolling = true;
            while (_isPolling)
            {
                var response = await _databricksWebApiClient.JobsRunsGetAsync(runId);
                JobsJson = JsonConvert.SerializeObject(response, Formatting.Indented);
                var state = response.state;
                this.Status = "Job not complete polling for completion.\r\n" +
                    $"Job has been running for {_watch.Elapsed.Seconds} seconds";

                try
                {
                    if (!string.IsNullOrEmpty(state.result_state))
                    {
                        _isPolling = false;
                        IsBusy = false;
                        _messageBoxService.ShowInformation(
                            $"Job finnished with Status : {state.result_state}");
                    }
                    else
                    {
                        switch (state.life_cycle_state)
                        {
                            case "TERMINATING":
                            case "RUNNING":
                            case "PENDING":
                                break;
                            case "SKIPPED":
                            case "TERMINATED":
                            case "INTERNAL_ERROR":
                                _isPolling = false;
                                IsBusy = false;
                                break;
                        }
                    }
                }
                finally
                {
                    if (_isPolling)
                    {
                        await Task.Delay(5000);
                    }
                }
            }
        }
    }
}

Where we use this helper class to do the actual upload to Dbfs

using SAS.Spark.Runner.REST.DataBricks;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace SAS.Spark.Runner.Services
{
    public class DataBricksFileUploadService : IDataBricksFileUploadService
    {
        private IDatabricksWebApiClient _databricksWebApiClient;

        public DataBricksFileUploadService(IDatabricksWebApiClient databricksWebApiClient)
        {
            _databricksWebApiClient = databricksWebApiClient;
        }

        public async Task UploadFileAsync(FileInfo file, int rawBytesLength, 
            Action<string> statusCallback, string path = "")
        {
            var dbfsPath = $"/{file.Name}";

            //Step 1 : Create the file
            statusCallback("Creating DBFS file");
            var dbfsCreateResponse = await _databricksWebApiClient.DbfsCreateAsync(
                new DatabricksDbfsCreateRequest
                    {
                        overwrite = true,
                        path = dbfsPath
                    });

            //Step 2 : Add blocks in 1MB chunks (the DBFS add-block call caps each
            //block at 1MB, and expects the data Base64 encoded)
            using (var fileStream = new FileStream(file.FullName, FileMode.Open, FileAccess.Read))
            {
                var oneMegaByte = 1 << 20;
                byte[] buffer = new byte[oneMegaByte];
                int bytesRead = 0;
                int totalBytesSoFar = 0;
                while ((bytesRead = fileStream.Read(buffer, 0, buffer.Length)) != 0)
                {
                    totalBytesSoFar += bytesRead;
                    statusCallback(
                        $"Uploaded {FormatAsNumeric(totalBytesSoFar)} " +
                        $"out of {FormatAsNumeric(rawBytesLength)} bytes to DBFS");
                    var base64EncodedData = Convert.ToBase64String(buffer, 0, bytesRead);

                    await _databricksWebApiClient.DbfsAddBlockAsync(
                        new DatabricksDbfsAddBlockRequest
                        {
                            data = base64EncodedData,
                            handle = dbfsCreateResponse.Handle
                        });
                }
            }

            //Step 3 : Close the file
            statusCallback($"Finalising write to DBFS file");
            await _databricksWebApiClient.DbfsCloseAsync(
                    new DatabricksDbfsCloseRequest
                    {
                        handle = dbfsCreateResponse.Handle
                    });

        }

        private string FormatAsNumeric(int byteLength)
        {
            return byteLength.ToString("###,###,###");
        }
    }
}
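
To make the shape of this service a little clearer, here is a minimal, stand-alone usage sketch. In the real app the service is resolved via constructor injection, and the JAR path below is purely illustrative:

var jarFile = new FileInfo(@"C:\temp\my-spark-app.jar");
IDataBricksFileUploadService uploadService =
    new DataBricksFileUploadService(new DatabricksWebApiClient());

//streams the JAR up to DBFS in 1MB blocks, reporting progress as it goes
await uploadService.UploadFileAsync(
    jarFile,
    (int)jarFile.Length,
    status => Console.WriteLine(status));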

And just for completeness, here is the REST API client that makes the two preceding code snippets work:

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

        //https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit
        public async Task<DatabricksRunNowResponse> JobsRunsSubmitJarTaskAsync(
            RunsSubmitJarTaskRequest runsSubmitJarTaskRequest)
        {
            var request = new RestRequest("api/2.0/jobs/runs/submit", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(runsSubmitJarTaskRequest);
            var response = await _client.ExecuteTaskAsync<DatabricksRunNowResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksRunNowResponse>(response.Content);
            return dbResponse;
        }
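
        //Note : the polling code further up also calls JobsRunsGetAsync, which is not
        //shown in this listing. A sketch of it, following the same pattern, might look
        //like the below (DatabricksRunGetResponse here is simply whatever type you
        //deserialise the run state into; it just needs the state.life_cycle_state /
        //state.result_state fields used by the polling loop)
        //https://docs.azuredatabricks.net/api/latest/jobs.html#runs-get
        public async Task<DatabricksRunGetResponse> JobsRunsGetAsync(int runId)
        {
            var request = new RestRequest("api/2.0/jobs/runs/get", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("run_id", runId.ToString());

            var response = await _client.ExecuteTaskAsync<DatabricksRunGetResponse>(request);
            return JsonConvert.DeserializeObject<DatabricksRunGetResponse>(response.Content);
        }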

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#list
        public async Task<DbfsListResponse> DbfsListAsync()
        {
            var request = new RestRequest("api/2.0/dbfs/list", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("path", "/");

            var response = await _client.ExecuteTaskAsync<DbfsListResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DbfsListResponse>(response.Content);
            return dbResponse;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#put
        public async Task<JObject> DbfsPutAsync(FileInfo file)
        {
            var request = new RestRequest("api/2.0/dbfs/put", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            //the put endpoint accepts multipart form data : a "path" field naming the
            //target DBFS file, plus the file bytes in a "contents" field (RestSharp
            //builds the multipart Content-Type header, boundary included, itself)
            request.AddParameter("path", $"/{file.Name}");
            request.AddFile("contents", file.FullName);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsservicecreate
        public async Task<DatabricksDbfsCreateResponse> DbfsCreateAsync(DatabricksDbfsCreateRequest dbfsRequest)
        {
            var request = new RestRequest("api/2.0/dbfs/create", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(dbfsRequest);

            var response = await _client.ExecuteTaskAsync<DatabricksDbfsCreateResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksDbfsCreateResponse>(response.Content);
            return dbResponse;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsserviceaddblock
        public async Task<JObject> DbfsAddBlockAsync(DatabricksDbfsAddBlockRequest dbfsRequest)
        {
            var request = new RestRequest("api/2.0/dbfs/add-block", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(dbfsRequest);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#close
        public async Task<JObject> DbfsCloseAsync(DatabricksDbfsCloseRequest dbfsRequest)
        {
            var request = new RestRequest("api/2.0/dbfs/close", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(dbfsRequest);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}
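
One small note on the client above : the submit request earlier hard-codes spark_version and node_type_id, and the comments in that code name the REST endpoints that list the valid values. My client does not wrap those endpoints, but if you wanted to, a sketch following the same RestSharp pattern might look like this (the method names are my own; both calls just hand back the raw JSON):

        //GET api/2.0/clusters/spark-versions : the Spark versions you may target
        public async Task<JObject> ClustersSparkVersionsAsync()
        {
            var request = new RestRequest("api/2.0/clusters/spark-versions", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");

            var response = await _client.ExecuteTaskAsync(request);
            return JObject.Parse(response.Content);
        }

        //GET api/2.0/clusters/list-node-types : the VM node types you may pick
        public async Task<JObject> ClustersListNodeTypesAsync()
        {
            var request = new RestRequest("api/2.0/clusters/list-node-types", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");

            var response = await _client.ExecuteTaskAsync(request);
            return JObject.Parse(response.Content);
        }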

As I said, this is the most complex of all the APIs I chose to look at. In reality you would probably not kick a Databricks job off from a UI; you might instead expose a REST API of your own, which could delegate to a job manager such as https://www.hangfire.io/, which would still have to do the polling part for you.
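
To make that concrete, here is a minimal sketch of what such a Hangfire job might look like. Everything here apart from Hangfire's own BackgroundJob.Enqueue is hypothetical glue of mine (the DatabricksJobWatcher class does not exist in the article's code); it simply reuses the client types shown above:

using System;
using System.Threading.Tasks;
using Hangfire;
using SAS.Spark.Runner.REST.DataBricks;
using SAS.Spark.Runner.REST.DataBricks.Requests;

namespace SAS.Spark.Runner.Jobs
{
    //Hypothetical Hangfire job wrapping the submit + poll logic shown earlier
    public class DatabricksJobWatcher
    {
        private readonly IDatabricksWebApiClient _client;

        public DatabricksJobWatcher(IDatabricksWebApiClient client)
        {
            _client = client;
        }

        //Hangfire runs this on a background server, and will retry it on failure
        public async Task RunAndAwaitJarJobAsync(RunsSubmitJarTaskRequest request)
        {
            var submitResponse = await _client.JobsRunsSubmitJarTaskAsync(request);
            while (true)
            {
                var run = await _client.JobsRunsGetAsync(submitResponse.RunId);
                //result_state is only populated once the run reaches a terminal state
                if (!string.IsNullOrEmpty(run.state.result_state))
                    break;
                await Task.Delay(TimeSpan.FromSeconds(5));
            }
        }
    }
}

Your own REST endpoint would then just do BackgroundJob.Enqueue&lt;DatabricksJobWatcher&gt;(w => w.RunAndAwaitJarJobAsync(request)); and return immediately, leaving the polling to the Hangfire background server.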

With all that in place, you should be able to pick the JAR from the UI, submit it, watch it run, and then see the logs for the run in the Databricks web UI.

Conclusion

I have to say, using Apache Spark / Databricks is an absolute dream. Databricks have just nailed it; it's exactly what was needed, and it's awesome what you can do with it. Being able to spin up a cluster on demand that runs a job and is torn down after the run (saving you the idle costs) is just frickin great.

I urge you to give it a look; I think you will love it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)
