Azure: Examining the Databricks Apache Spark Platform





Examination of the Apache Spark / Databricks platform on Azure
- Introduction
- What Is Spark / Databricks?
- Why Is Databricks So Cool?
- Where Is The Code?
- Prerequisites
- Getting Started With Databricks In Azure
- Databricks API
- Conclusion
Introduction
In this article I will be talking about Apache Spark / Databricks. It will guide you through how to use the new cloud-based hosted Databricks platform. I will talk about it from a Microsoft Azure standpoint, but the ideas should be exactly the same if you were to use the Amazon cloud.
What Is Spark / Databricks?
A picture paints a thousand words, they say, so here is a nice picture of what Apache Spark is.
At its heart, Apache Spark is a tool for data scientists that allows them to explore and crunch vast amounts of data with ease. It features things like:
- Distributed Datasets (so that number crunching can happen across a compute cluster)
- A DataFrame API, so that you can do things like add columns, aggregate column values, alias data, and join DataFrames. This also supports a SQL syntax, and it also runs distributed across a cluster (see the sketch after this list)
- A streaming API
- A Machine Learning API
- A Graph API
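To make the DataFrame API point a bit more concrete, here is a minimal sketch of the sort of thing it lets you do. This is illustrative only: it assumes you are inside a Databricks Scala notebook where a SparkSession called spark is already provided, and the data, column and table names are made up for the example.

import org.apache.spark.sql.functions._
import spark.implicits._

// Build a tiny DataFrame from an in-memory sequence (hypothetical data)
val people = Seq(("anna", 32), ("bob", 45), ("carl", 29)).toDF("name", "age")

// Add a column, filter, and aggregate using the DataFrame API
val olderPeople = people
  .withColumn("ageNextYear", col("age") + 1)
  .filter(col("age") > 30)
olderPeople.show()
people.agg(avg(col("age")).alias("averageAge")).show()

// The same data can also be queried using SQL syntax
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 30").show()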
Spark also comes with various adaptors to allow it to connect to various data sources, such as:
- Blobs
- Databases
- Filesystems (HDFS / S3 / Azure Storage / Azure Data Lake / Databricks file system)
This is not the first time I have written about Apache Spark; here are some older articles on it, should you be interested.
When I wrote those articles, there were limited options for how you could run your Apache Spark jobs on a cluster. You could basically do one of the following:
- Create a Java/Scala/Python app that used the Apache Spark APIs and would run against a cluster
- Create a JAR that you could get an existing cluster to run using a spark-submit command line
The problem was that neither was ideal. With the app approach, you didn't really want your analytics job to be an app; you really just wanted it to be a class library of some sort.
spark-submit would let you submit your JAR (the class-library style approach), however the feedback that you got from it was not great.
There was another product that came out to address this, called Apache Livy, which was a REST API over Apache Spark. But it too had its issues: it was not that great to set up, and the API was fairly limited. To address this, the good folk that own/maintain Apache Spark came out with Databricks.
Databricks is essentially a fully managed Apache Spark in the cloud (Amazon / Azure). It also has REST APIs for the common operations. Let's dig into that next.
Why Is Databricks So Cool?
Just before we get into Databricks, why is it that I think it's so cool?
Well, I have stated one point above already, but let's see the full list:
- Great (really good) REST API
- Nice management dashboard in the cloud
- The ability to create an on-demand cluster for your own job run that is torn down at the end of the job is MASSIVE. This one point alone should make you think about examining Databricks, for the following reasons:
- By using your own cluster you are not sharing any resources with someone else, so you can guarantee your performance based on the nodes you choose for your cluster
- The cluster gets torn down after your job has run. This saves you money
- If you choose to use a single cluster rather than a new cluster per job, you can set the single static cluster to auto-scale, which is a pretty neat feature. Just imagine trying to do that on-premises. Obviously, since Apache Spark also supports running on Kubernetes, that does ease the process a bit. However, to my mind, to make the best use of Kubernetes you also want to run it on a cloud such as Amazon / Azure / Google, as scaling the VMs needed for a cluster is just so MUCH easier if you are in a cloud. Just as an aside, if you don't know your Kubernetes from an onion, I wrote a mini series on it which you can find here : https://sachabarbs.wordpress.com/kubernetes-series/
- It's actually fairly cheap I think for what it gives you
- It's highly intuitive
Where Is The Code?
So there is a small bit of code that goes with this article, which is split into 2 things
- A throw-away WPF app that simply acts as a vehicle to demonstrate the REST calls. In all seriousness, you could use Postman for the Databricks REST exploration; the WPF app just makes this easier, as you don't have to worry about remembering to set an access token (once you have set it up once) or trying to find the right REST API syntax. The UI just shows you a working set of REST calls for Databricks
- A simple IntelliJ IDEA Scala/SBT project that represents an Apache Spark job that we wish to upload and run on Databricks. This will be compiled using SBT; as such, SBT is a must-have if you want to run this code
The code repo can be found here : https://github.com/sachabarber/databrick-azure-spark-demo
Prerequisites
There are a few of these, not because Databricks needs them as such, but because I was keen to show you an entire workflow of how you might use Databricks for real to run your own jobs, which means we need to create a new JAR file to act as a job to send to Databricks.
As such the following is required
- A Databricks installation (either Amazon/Azure hosted)
- Visual Studio 2017 (Community edition is fine here)
- Java 1.8 SDK installed and in your path
- IntelliJ IDEA Community Edition 2017.1.3 (or later)
- SBT Intellij IDEA plugin installed
Obviously if you just want to read along and not try this yourself, you won't need any of this
Getting Started With Databricks In Azure
As I said already, I will be using Microsoft Azure, but after the initial creation of Databricks in the cloud (which will be cloud-vendor specific) the rest of the instructions should hold for Amazon or Azure.
Anyway, the first step for working with Databricks in the cloud is to create a new Databricks instance, which for Azure simply means creating a new resource as follows:
Once you have created the Databricks instance, you should be able to launch the workspace from the overview page of the Databricks instance.
This should launch you into a new Databricks workspace website that is coupled to your Azure/Amazon subscription, so you should initially see something like this after you have passed the logging-in phase (which happens automatically, well on Azure it does anyway).
From here you can see/do the following things, some of which we will explore in more detail below
- Create a new Job/Cluster/Notebook
- Explore DBFS (Databricks file system) data
- Look at previous job runs
- View / start / stop clusters
Exploring The Workspace
In this section we are going to explore what you can do in the Databricks workspace web site that is tied to your Databricks cloud installation. We will not be leaving the workspace web site; everything below will be done directly in it, which to my mind is pretty cool. But where this stuff really shines is when we get to do all of it programmatically, rather than just clicking buttons on a web site.
After all, we would like to build this stuff into our own processing pipelines. Luckily there is a pretty good one-to-one translation between what you can do using the web site and what is exposed via the REST API. But I am jumping ahead; we will get there in the section after this, so for now let's just examine what we can do in the Databricks workspace web site.
Create A Cluster
So the very first thing you might like to do is create a cluster to run some code on. This is easily done using the Databricks workspace web site as follows
This will bring you to a screen like this, where you can configure your cluster and pick the various bits and pieces for it:
Once you have created a cluster it will end up being listed on the clusters overview page
- Interactive clusters are ones that are tied to Notebooks which we look at next
- Job clusters are ones that are used to run Jobs
Explore A NoteBook
Now, I am a long-time dev (I'm old, or feel it, or something), so I think nothing of opening up an IDE and writing an app/class lib/JAR, whatever. But at its heart Apache Spark is a tool for data scientists, who simply want to try some simple bits of number-crunching code. That is exactly where notebooks come into play.
A notebook is a hosted, cell-based editor that allows the user to run Python/R/Scala code against an Apache Spark cluster.
You can create a new notebook from the home menu as shown below
After you have picked your language, you will be presented with a blank notebook where you can write some code into the cells. Teaching you the shortcuts and how to use notebooks properly is outside the scope of this article, but here are some points on notebooks:
- They allow you to quickly explore the APIs
- They allow you to re-assign variables which are remembered
- They allow you to enter just a specific cell
- They give you some pre-defined variables, but be wary: you will need to swap these out if you translate this to real code. For example, spark is a pre-defined variable
Run Some NoteBook Code
So let's say I have just created a Scala notebook, and I typed the text shown below into a cell. I can quickly run this using ALT + Enter or the run button in the notebook UI.
What this will do is run the active cell and print the variables/output statements straight back to the notebook UI.
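As the screenshot doesn't reproduce well here, the cell contents were along these lines; this is a trivial Scala sketch rather than the exact code from the image.

// A single notebook cell: println output and the value of the last
// expression are echoed straight back into the notebook UI
val numbers = 1 to 10
val total = numbers.sum
println(s"The total of $numbers is $total")
total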
Read/Write From Azure Blob Store
One of the very common tasks when using Apache Spark is to grab some data from an external source, transform it into the required results, and save it back to storage.
Here is an example of working with an existing Azure Blob Storage account using some Scala code. This particular example simply loads a CSV file from Azure Blob Storage, transforms it, and then saves it back to Azure Blob Storage as a date-stamped CSV file:
import java.util.Calendar
import java.text.SimpleDateFormat
spark.conf.set("fs.azure.account.key.YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net", "YOUR_STORAGE_KEY_HERE")
spark.sparkContext.hadoopConfiguration.set(
"fs.azure.account.key.YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net",
"YOUR_STORAGE_KEY_HERE"
)
val now = Calendar.getInstance().getTime()
val minuteFormat = new SimpleDateFormat("mm")
val hourFormat = new SimpleDateFormat("HH")
val secondFormat = new SimpleDateFormat("ss")
val currentHour = hourFormat.format(now)     // e.g. 12
val currentMinute = minuteFormat.format(now) // e.g. 29
val currentSecond = secondFormat.format(now) // e.g. 05
val fileId = currentHour + currentMinute + currentSecond
fileId
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sc.parallelize(data)
val ages_df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/Ages.csv")
ages_df.head
//https://github.com/databricks/spark-csv
val selectedData = ages_df.select("age")
selectedData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.save("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAges.csv")
val saved_ages_df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAges.csv")
saved_ages_df.show()
Which for me looked like this for my storage account in Azure
If you are wondering why there are folders for the results, such as 080629_SavedAges.csv, this is due to how Apache Spark deals with partitions. Trust me, it doesn't matter when you load things back into memory, as Apache Spark just deals with this, as can be seen:
Creating Jobs In DataBricks Web UI
The Databricks web UI allows you to create a new job from either a notebook or a JAR that you can drag in and set a main entry point for. You may also set up a schedule and the cluster that you want to use. Once you are happy, you can click the "Run Now" button, which will run your job.
Launch The Spark UI/Dashboard
Once you have run a job, it is likely that you want to have a look at it to see that it worked how you expected it to and that it ran optimally. Luckily, Apache Spark comes equipped with a nice visualiser for a given analysis run that you can use for this. It's kind of like the SQL Query Profiler in SQL Server.
This is accessible from the jobs page
So let's take a look at a successful job, which we can view using the "Succeeded" link in the Last Run column on the Jobs page.
From there we can view the Apache Spark UI or the logs for the job.
Let's see the Apache Spark UI for this job.
The Spark UI is your friend; try to get acquainted with it.
Databricks API
OK, so now that we have covered how to use the Databricks web UI, how about we get familiar with the REST API, so that we can craft our own code around using Apache Spark as our analytics engine? This next section will show how to use some of the REST APIs available.
What APIs Are Available?
So you may now be wondering what APIs are actually available? This is the place to check : https://docs.databricks.com/api/latest/index.html
From there the main top level APIs are
- Clusters
- DBFS
- Groups
- Instance Profiles
- Jobs
- Libraries
- Secrets
- Token
- Workspace
There are simply not enough hours in the day for me to show you ALL of them. So I have chosen a few to demo, which we will talk about below
Creating A Token For Usage With API
The Databricks REST APIs ALL need to have an access token associated with them, which means you first need to create a token. This is easily achieved in the Databricks web UI; just follow these steps:
Once you have done that, grab the token value. You will also need to take note of one other bit of information, which is shown highlighted in the image below. With these 2 bits of information we can use Postman to try a request.
Common Stuff You Need To Do With Any API Call
So, as I just said, you will need to ensure that the token from the previous step is supplied on every call. But just how do we do that? What does it look like? Let's use Postman to show an example using the 2 bits of information from the previous paragraph.
Creating A Base64 Encoded Token Value
The token you got from above needs to be turned into a Base64 encoded string. There are many online tools for this, just pick one. The important thing to note is that you must ALSO include a prefix of "token:". So the full string to encode is something like "token:dapi56b...........d5".
This will give you a Base64 encoded string. From there we need to head into Postman to try out the request, which may look something like this:
The important things to note here are:
- We create a header: Key = Authorization, Value = Basic YOUR_BASE64_ENCODED_STRING_FROM_PREVIOUS_STEP
- We use the information from the Azure/AWS portal as part of the URI. So this is a valid request for my Databricks Azure installation (see the raw sketch below): https://northeurope.azuredatabricks.net/api/2.0/clusters/spark-versions
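For clarity, here is roughly what that request looks like on the wire once Postman has done its work. The Base64 placeholder is the string you produced in the previous step, and the host is my own (north Europe) Azure Databricks instance:

GET /api/2.0/clusters/spark-versions HTTP/1.1
Host: northeurope.azuredatabricks.net
Authorization: Basic YOUR_BASE64_ENCODED_STRING_FROM_PREVIOUS_STEP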
A Simple Throwaway Demo App
Obviously you could just mess around in Postman to learn how the Databricks REST APIs work; nothing wrong with that at all. But to make life easier for you, I have come up with a simple (throw-away) demo app that you can use to explore what I think are the 2 most important APIs.
This is what it looks like running
And here is what it looks like when you have chosen to run one of the pre-canned REST API calls that the demo app provides
How Do I Set My Token For The Demo App?
Above, when I started talking about the Databricks REST APIs, we said we needed to supply an API token. So how does the demo app deal with this?
Well, there are 2 parts to how it does that. This entry in the App.Config should point to your own file that contains the token information.
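I haven't reproduced the actual config file here, but based on the keys the code reads, the relevant appSettings section looks roughly like this; the values shown are placeholders you would swap for your own workspace URL and token file path.

<configuration>
  <appSettings>
    <!-- Base address of your Databricks workspace (region specific) -->
    <add key="BaseUrl" value="https://northeurope.azuredatabricks.net" />
    <!-- Full path to a text file containing your "token:dapi..." value -->
    <add key="TokenFileLocation" value="C:\SomeFolder\MyDataBricksToken.txt" />
  </appSettings>
</configuration>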
So for me this is my file:
C:\Users\sacha\Desktop\databrick-azure-spark-demo\MyDataBricksToken.txt
where the file simply contains a single line of contents, "token:dapi56b...........d5", which is your raw Databricks token preceded by the "token:" prefix we talked about above (the demo app takes care of the Base64 encoding for you, as we will see in the code below).
This is then read into a globally available property in the demo app as follows:
using System.Configuration;
using System.IO;
using System.Windows;
using SAS.Spark.Runner.REST.DataBricks;

namespace SAS.Spark.Runner
{
    public partial class App : Application
    {
        protected override void OnStartup(StartupEventArgs e)
        {
            base.OnStartup(e);

            // Read the token file location from App.config and make sure the file exists
            var tokenFileName = ConfigurationManager.AppSettings["TokenFileLocation"];
            if (!File.Exists(tokenFileName))
            {
                MessageBox.Show("Expecting token file to be provided");
            }

            // The raw "token:dapi..." value is stored here and Base64 encoded later
            AccessToken = File.ReadAllText(tokenFileName);
            if (!AccessToken.StartsWith("token:"))
            {
                MessageBox.Show("Token file should start with 'token:' " +
                    "followed directly by YOUR DataBricks initial token you created");
            }
        }

        public static string AccessToken { get; private set; }
    }
}
And that is all there is to it. The demo app should take care of the rest of it for you.
As I say, I did not have time to explore every single API, but I did have time to look at 2 of the most common ones, Clusters and Jobs, which I will talk about below.
But before I get into that, I just wanted to show you the rough idea behind each of the API explorations
Example ViewModel
Most of the API explorations are done using a viewmodel something like this
using SAS.Spark.Runner.REST.DataBricks;
using System;
using System.Threading.Tasks;
using System.Windows.Input;
using SAS.Spark.Runner.Services;
namespace SAS.Spark.Runner.ViewModels.Clusters
{
public class ClusterGetViewModel : INPCBase
{
private IMessageBoxService _messageBoxService;
private IDatabricksWebApiClient _databricksWebApiClient;
private string _clustersJson;
private string _clusterId;
public ClusterGetViewModel(
IMessageBoxService messageBoxService,
IDatabricksWebApiClient databricksWebApiClient)
{
_messageBoxService = messageBoxService;
_databricksWebApiClient = databricksWebApiClient;
FetchClusterCommand =
new SimpleAsyncCommand<object, object>(ExecuteFetchClusterCommandAsync);
}
private async Task<object> ExecuteFetchClusterCommandAsync(object param)
{
if(string.IsNullOrEmpty(_clusterId))
{
_messageBoxService.ShowError("You must supply 'ClusterId'");
return System.Threading.Tasks.Task.FromResult<object>(null);
}
try
{
var cluster = await _databricksWebApiClient.ClustersGetAsync(_clusterId);
ClustersJson = cluster.ToString();
}
catch(Exception ex)
{
_messageBoxService.ShowError(ex.Message);
}
return System.Threading.Tasks.Task.FromResult<object>(null);
}
public string ClustersJson
{
get
{
return this._clustersJson;
}
set
{
RaiseAndSetIfChanged(ref this._clustersJson,
value, () => ClustersJson);
}
}
public string ClusterId
{
get
{
return this._clusterId;
}
set
{
RaiseAndSetIfChanged(ref this._clusterId,
value, () => ClusterId);
}
}
public ICommand FetchClusterCommand { get; private set; }
}
}
The idea is that we use a simple REST service and we have a property representing the JSON response. The REST service implements this interface:
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
namespace SAS.Spark.Runner.REST.DataBricks
{
public interface IDatabricksWebApiClient
{
//https://docs.databricks.com/api/latest/jobs.html#create
Task<CreateJobResponse> JobsCreateAsync(string jsonJobRequest);
//https://docs.databricks.com/api/latest/jobs.html#jobsrunnow
Task<DatabricksRunNowResponse> JobsRunNowAsync(DatabricksRunNowRequest runRequest);
//https://docs.databricks.com/api/latest/jobs.html#runs-get
Task<DatabricksRunResponse> JobsRunsGetAsync(int runId);
//https://docs.databricks.com/api/latest/jobs.html#list
Task<JObject> JobsListAsync();
//https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit
Task<DatabricksRunNowResponse> JobsRunsSubmitJarTaskAsync(RunsSubmitJarTaskRequest runsSubmitJarTaskRequest);
//https://docs.azuredatabricks.net/api/latest/clusters.html#start
Task<DatabricksClusterStartResponse> ClustersStartAsync(string clusterId);
//https://docs.azuredatabricks.net/api/latest/clusters.html#get
Task<JObject> ClustersGetAsync(string clusterId);
//https://docs.databricks.com/api/latest/clusters.html#list
Task<ClusterListResponse> ClustersListAsync();
//https://docs.azuredatabricks.net/api/latest/dbfs.html#list
Task<DbfsListResponse> DbfsListAsync();
//https://docs.azuredatabricks.net/api/latest/dbfs.html#put
Task<JObject> DbfsPutAsync(FileInfo file);
//https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsservicecreate
Task<DatabricksDbfsCreateResponse> DbfsCreateAsync(DatabricksDbfsCreateRequest dbfsRequest);
//https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsserviceaddblock
Task<JObject> DbfsAddBlockAsync(DatabricksDbfsAddBlockRequest dbfsRequest);
//https://docs.azuredatabricks.net/api/latest/dbfs.html#close
Task<JObject> DbfsCloseAsync(DatabricksDbfsCloseRequest dbfsRequest);
}
}
DataTemplate For UI
The actual UI is simply done using a DataTemplate, where we have bound the ViewModel in question to a ContentControl. For the JSON representation I am just using the AvalonEdit TextBox.
Here is an example for the ViewModel above:
<Controls:MetroWindow x:Class="SAS.Spark.Runner.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlns:vms="clr-namespace:SAS.Spark.Runner.ViewModels"
xmlns:vmsClusters="clr-namespace:SAS.Spark.Runner.ViewModels.Clusters"
xmlns:vmsJobs="clr-namespace:SAS.Spark.Runner.ViewModels.Jobs"
xmlns:avalonEdit="http://icsharpcode.net/sharpdevelop/avalonedit"
xmlns:Controls="clr-namespace:MahApps.Metro.Controls;assembly=MahApps.Metro"
xmlns:local="clr-namespace:SAS.Spark.Runner"
mc:Ignorable="d"
WindowState="Maximized"
Title="DataBricks API Runner">
<Controls:MetroWindow.Resources>
.....
.....
<DataTemplate DataType="{x:Type vmsClusters:ClusterGetViewModel}">
<DockPanel LastChildFill="True">
<StackPanel Orientation="Horizontal" DockPanel.Dock="Top">
<Label Content="ClusterId" Margin="3" VerticalAlignment="Center"
VerticalContentAlignment="Center" Height="24"/>
<TextBox Text="{Binding ClusterId}" Width="200" VerticalAlignment="Center"
VerticalContentAlignment="Center" Height="24"/>
<Button Content="Get Cluster" Margin="3,0,3,0" Width="100"
HorizontalAlignment="Left"
VerticalAlignment="Center"
VerticalContentAlignment="Center"
Command="{Binding FetchClusterCommand}"/>
</StackPanel>
<avalonEdit:TextEditor
FontFamily="Segoe UI"
SyntaxHighlighting="JavaScript"
FontSize="10pt"
vms:TextEditorProps.JsonText="{Binding ClustersJson}"/>
</DockPanel>
</DataTemplate>
</Controls:MetroWindow.Resources>
</Controls:MetroWindow>
As the ViewModels used in the demo app all mainly follow this pattern, I won't be showing you any more ViewModel code, apart from the one where we upload a JAR file, as that one is a bit special.
Just keep in the back of your mind that they all roughly work this way, and you will be OK.
Cluster API Exploration
This section shows the Cluster APIs that I chose to look at
Clusters List
Databricks docs are here : https://docs.databricks.com/api/latest/clusters.html#list, and this API call does the following:
- Returns information about all pinned clusters, currently active clusters, up to 70 of the most recently terminated interactive clusters in the past 30 days, and up to 30 of the most recently terminated job clusters in the past 30 days. For example, if there is 1 pinned cluster, 4 active clusters, 45 terminated interactive clusters in the past 30 days, and 50 terminated job clusters in the past 30 days, then this API returns the 1 pinned cluster, 4 active clusters, all 45 terminated interactive clusters, and the 30 most recently terminated job clusters.
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.databricks.com/api/latest/clusters.html#list
public async Task<ClusterListResponse> ClustersListAsync()
{
var request = new RestRequest("api/2.0/clusters/list", Method.GET);
request.AddHeader("Authorization", $"Basic {_authHeader}");
var response = await _client.ExecuteTaskAsync<ClusterListResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<ClusterListResponse>(response.Content);
return dbResponse;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
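Just to give a feel for what comes back, a heavily trimmed clusters/list response looks something like the sketch below. The exact set of fields returned is richer than this, and the values here are obviously invented:

{
  "clusters": [
    {
      "cluster_id": "0918-220215-abcd123",
      "cluster_name": "my-interactive-cluster",
      "spark_version": "4.0.x-scala2.11",
      "node_type_id": "Standard_D3_v2",
      "num_workers": 2,
      "state": "RUNNING"
    }
  ]
}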
Cluster Get
Databricks docs are here : https://docs.azuredatabricks.net/api/latest/clusters.html#get, and this API call does the following:
- Retrieves the information for a cluster given its identifier. Clusters can be described while they are running, or up to 60 days after they are terminated
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.azuredatabricks.net/api/latest/clusters.html#get
public async Task<JObject> ClustersGetAsync(string clusterId)
{
var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/clusters/get", Method.GET);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddQueryParameter("cluster_id", clusterId);
var response = await _client.ExecuteTaskAsync(request);
JObject responseContent = JObject.Parse(response.Content);
return responseContent;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
Cluster Start
Databricks docs are here : https://docs.azuredatabricks.net/api/latest/clusters.html#start, and this API call does the following:
- Starts a terminated Spark cluster given its ID. This is similar to createCluster, except:
- The previous cluster ID and attributes are preserved.
- The cluster starts with the last specified cluster size. If the previous cluster was an autoscaling cluster, the current cluster starts with the minimum number of nodes.
- If the cluster is not currently in a TERMINATED state, nothing will happen.
- Clusters launched to run a job cannot be started.
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.azuredatabricks.net/api/latest/clusters.html#start
public async Task<DatabricksClusterStartResponse> ClustersStartAsync(string clusterId)
{
var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/clusters/start", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddJsonBody(new { cluster_id = clusterId });
var response = await _client.ExecuteTaskAsync<DatabricksClusterStartResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<DatabricksClusterStartResponse>(response.Content);
return dbResponse;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
Jobs API Exploration
This section shows the Jobs APIs that I chose to look at
Jobs List
Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#list, and this API call does the following:
- Lists all jobs
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.databricks.com/api/latest/jobs.html#list
public async Task<JObject> JobsListAsync()
{
var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/jobs/list", Method.GET);
request.AddHeader("Authorization", $"Basic {_authHeader}");
var response = await _client.ExecuteTaskAsync(request);
JObject responseContent = JObject.Parse(response.Content);
return responseContent;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
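For reference, the jobs/list response is roughly shaped like the sketch below (trimmed, with invented values); the demo app simply shows the raw JObject it gets back:

{
  "jobs": [
    {
      "job_id": 1,
      "settings": {
        "name": "Nightly model training"
      },
      "created_time": 1535019219000
    }
  ]
}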
Jobs Create
Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#create, and this API call does the following:
- Creates a new job with the provided settings
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.databricks.com/api/latest/jobs.html#create
public async Task<CreateJobResponse> JobsCreateAsync(string jsonJobRequest)
{
var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/jobs/create", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddParameter("application/json", jsonJobRequest, ParameterType.RequestBody);
var response = await _client.ExecuteTaskAsync<CreateJobResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<CreateJobResponse>(response.Content);
return dbResponse;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
An example request for this one is worth a special call-out, as it's not a simple parameter; we need to pass in quite a lot of JSON for this request. Here is an example request for a job that runs at 10:15pm each night:
{
"name": "Nightly model training",
"new_cluster": {
"spark_version": "4.0.x-scala2.11",
"node_type_id": "r3.xlarge",
"aws_attributes": {
"availability": "ON_DEMAND"
},
"num_workers": 10
},
"libraries": [
{
"jar": "dbfs:/my-jar.jar"
},
{
"maven": {
"coordinates": "org.jsoup:jsoup:1.7.2"
}
}
],
"email_notifications": {
"on_start": [],
"on_success": [],
"on_failure": []
},
"timeout_seconds": 3600,
"max_retries": 1,
"schedule": {
"quartz_cron_expression": "0 15 22 ? * *",
"timezone_id": "America/Los_Angeles"
},
"spark_jar_task": {
"main_class_name": "com.databricks.ComputeModels"
}
}
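The response to a successful create is tiny; it is essentially just the identifier of the new job, along the lines of:

{
  "job_id": 42
}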
Although the demo app doesn't use this one directly, I use a very similar one, which I will go through in quite some detail below.
Jobs Runs Get
Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#runs-get, and this API call does the following:
- Retrieves the metadata of a run
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.databricks.com/api/latest/jobs.html#runs-get
public async Task<DatabricksRunResponse> JobsRunsGetAsync(int runId)
{
var request = new RestRequest("api/2.0/jobs/runs/get", Method.GET);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddQueryParameter("run_id", runId.ToString());
var response = await _client.ExecuteTaskAsync<DatabricksRunResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<DatabricksRunResponse>(response.Content);
return dbResponse;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
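The part of the runs/get response that the demo app cares about is the state object. Roughly, it looks like the first sketch below while a run is in flight, and gains a result_state once the run has finished (run id and messages are invented here):

{
  "run_id": 12345,
  "state": {
    "life_cycle_state": "RUNNING",
    "state_message": "In run"
  }
}

{
  "run_id": 12345,
  "state": {
    "life_cycle_state": "TERMINATED",
    "result_state": "SUCCESS",
    "state_message": ""
  }
}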
Jobs Run Now
Databricks docs are here : https://docs.databricks.com/api/latest/jobs.html#jobsrunnow, and this API call does the following:
- Runs the job now, and returns the run_id of the triggered run
This is simply done via the following code
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
.....
.....
//https://docs.databricks.com/api/latest/jobs.html#jobsrunnow
public async Task<DatabricksRunNowResponse> JobsRunNowAsync(DatabricksRunNowRequest runRequest)
{
var request = new RestRequest("api/2.0/jobs/run-now", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddJsonBody(runRequest);
var response = await _client.ExecuteTaskAsync<DatabricksRunNowResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<DatabricksRunNowResponse>(response.Content);
return dbResponse;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
Where we use this sort of request
using Newtonsoft.Json;
namespace SAS.Spark.Runner.REST.DataBricks.Requests
{
// If you want to pass args you can do so using extra properties
// See : https://docs.databricks.com/api/latest/jobs.html#run-now
// - jar_params :
// A list of parameters for jobs with jar tasks, e.g. "jar_params": ["john doe", "35"].
// The parameters will be used to invoke the main function of the main class specified
// in the spark jar task. If not specified upon run-now, it will default to an empty list.
// - notebook_params :
// A map from keys to values for jobs with notebook task,
// e.g. "notebook_params": {"name": "john doe", "age": "35"}.
// The map is passed to the notebook and will be accessible through the
// dbutils.widgets.get function
// - python_params :
// A list of parameters for jobs with python tasks, e.g. "python_params": ["john doe", "35"].
// The parameters will be passed to python file as command line parameters.
// If specified upon run-now, it would overwrite the parameters specified in job setting.
public class DatabricksRunNowRequest
{
[JsonProperty(PropertyName = "job_id")]
public int JobId { get; set; }
}
}
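So the body we actually POST to 2.0/jobs/run-now is very small. With the optional jar_params described in the comments above included, it would look something like this (the job id and parameter values are made up):

{
  "job_id": 5,
  "jar_params": ["10"]
}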
Jobs Run Submit
Databricks docs are here : https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit, and this API call does the following:
- Submit a one-time run with the provided settings. This endpoint doesn't require a Databricks job to be created; you can directly submit your workload. Runs submitted via this endpoint don't show up in the UI. Once the run is submitted, you can use the jobs/runs/get API to check the run state.
Now this is probably the most complex, but most useful, of ALL the REST APIs, as it allows you to do the following:
- Run using a JAR written in Scala (where you can pass in command line args too)
- Run using a notebook
- Run using a python file (where you can pass in command line args too)
As I am quite keen on Scala I will be using Scala for the demo
The Scala Project
The demo code has a 2nd project in the source code, Src/SAS.SparkScalaJobApp, which is an IntelliJ IDEA Scala project. To run this you will need the prerequisites listed at the top of this article.
Once you have downloaded the code, you should run SBT in a command line window from the Src/SAS.SparkScalaJobApp folder, and issue these SBT commands:
> clean
> compile
> assembly
From there you should be able to go to the Target directory and see a FAT JAR (one with all dependencies bundled together).
We will use this in just a moment, but let's take a minute to examine the code. It is a very simple Spark job that expects a single Int command line argument (which we will send via the REST call in a moment), and will then create a List of that many items with some simple Spark transformations applied.
NOTE: One thing to note is that we need to be careful about how we use things like SparkContext and SparkSession, which, if you have done any Spark work before, you will have created yourself. When using a cloud provider such as AWS or Azure you need to use the existing SparkContext and SparkSession, and we also need to avoid terminating/shutting down these items, as they are in effect shared. This blog is a good read on this : https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
import scala.util.Try
import scala.util.Success
import scala.util.Failure
import org.apache.spark.sql.SparkSession

object SparkApp extends App {

  println("===== Starting =====")

  if (args.length != 1) {
    println("Need exactly 1 int arg")
  }

  // Parse the single command line arg (sent to us via the runs-submit REST call)
  Try {
    Integer.parseInt(args(0))
  } match {
    case Success(v: Int) => {
      val combinedArgs = args.aggregate("")(_ + _, _ + _)
      println(s"Args were $combinedArgs")
      SparkDemoRunner.runUsingArg(v)
      println("===== Done =====")
    }
    case Failure(e) => {
      println(s"Could not parse command line arg [${args(0)}] to Int")
      println("===== Done =====")
    }
  }
}

object SparkDemoRunner {

  def runUsingArg(howManyItems: Int): Unit = {

    // On Databricks we must reuse the existing SparkSession rather than create (or stop) our own
    val session = SparkSession.builder().getOrCreate()
    import session.sqlContext.implicits._

    val multiplier = 2
    println(s"multiplier is set to $multiplier")
    val multiplierBroadcast = session.sparkContext.broadcast(multiplier)
    val data = List.tabulate(howManyItems)(_ + 1)
    val dataRdd = session.sparkContext.parallelize(data)
    val mappedRdd = dataRdd.map(_ * multiplierBroadcast.value)
    val df = mappedRdd.toDF
    df.show()
  }
}
Anyway, once we have that JAR file available, we need to use a few APIs, which I will go through one by one, but here is the rough flow:
- Examine whether the chosen JAR file exists in DBFS (the Databricks file system), i.e. whether we have uploaded it already
- Start the upload of the file to get a file handle (we have to upload in chunks, as there is a 1MB limit on a single 2.0/dbfs/put API call)
- Upload blocks of data for the file handle as Base64 encoded strings
- Close the file using the file handle
- Craft a runs-submit request to make use of the just uploaded/latest Dbfs file
So that's the rough outline of it; a sketch of the JSON exchanges involved in the DBFS part is shown below.
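Based on the request/response classes used in the code below, the DBFS part of that flow boils down to three small JSON exchanges, roughly like this (the handle, data and file name values are invented for the sketch):

POST api/2.0/dbfs/create      body: { "path": "/my-jar.jar", "overwrite": true }
                          response: { "handle": 7904256882 }

POST api/2.0/dbfs/add-block   body: { "handle": 7904256882, "data": "<Base64 chunk, at most 1MB>" }   (repeated once per chunk)

POST api/2.0/dbfs/close       body: { "handle": 7904256882 }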
So here is the ViewModel that will allow you to pick the JAR (which, as stated above, should be in the Target folder of the Src/SAS.SparkScalaJobApp source code if you followed the instructions above to compile it using SBT).
using SAS.Spark.Runner.REST.DataBricks;
using System;
using System.IO;
using System.Threading.Tasks;
using System.Windows.Input;
using SAS.Spark.Runner.Services;
using System.Linq;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using System.Collections.Generic;
using Newtonsoft.Json;
using System.Diagnostics;
namespace SAS.Spark.Runner.ViewModels.Jobs
{
public class JobsPickAndRunJarViewModel : INPCBase
{
private IMessageBoxService _messageBoxService;
private IDatabricksWebApiClient _databricksWebApiClient;
private IOpenFileService _openFileService;
private IDataBricksFileUploadService _dataBricksFileUploadService;
private string _jarFilePath;
private string _status;
private FileInfo _jarFile;
private bool _isBusy;
private bool _isPolling = false;
private string _jobsJson;
private Stopwatch _watch = new Stopwatch();
public JobsPickAndRunJarViewModel(
IMessageBoxService messageBoxService,
IDatabricksWebApiClient databricksWebApiClient,
IOpenFileService openFileService,
IDataBricksFileUploadService dataBricksFileUploadService)
{
_messageBoxService = messageBoxService;
_databricksWebApiClient = databricksWebApiClient;
_openFileService = openFileService;
_dataBricksFileUploadService = dataBricksFileUploadService;
PickInputJarFileCommand = new SimpleAsyncCommand<object, object>(x => !IsBusy && !_isPolling, ExecutePickInputJarFileCommandAsync);
}
public string JobsJson
{
get
{
return this._jobsJson;
}
set
{
RaiseAndSetIfChanged(ref this._jobsJson, value, () => JobsJson);
}
}
public string JarFilePath
{
get
{
return this._jarFilePath;
}
set
{
RaiseAndSetIfChanged(ref this._jarFilePath, value, () => JarFilePath);
}
}
public string Status
{
get
{
return this._status;
}
set
{
RaiseAndSetIfChanged(ref this._status, value, () => Status);
}
}
public bool IsBusy
{
get
{
return this._isBusy;
}
set
{
RaiseAndSetIfChanged(ref this._isBusy, value, () => IsBusy);
}
}
public ICommand PickInputJarFileCommand { get; private set; }
private async Task<object> ExecutePickInputJarFileCommandAsync(object param)
{
IsBusy = true;
try
{
_openFileService.Filter = "Jar Files (*.jar)|*.jar";
_openFileService.InitialDirectory = @"c:\";
_openFileService.FileName = "";
var dialogResult = _openFileService.ShowDialog(null);
if(dialogResult.Value)
{
if(!_openFileService.FileName.ToLower().EndsWith(".jar"))
{
_messageBoxService.ShowError($"{_openFileService.FileName} is not a JAR file");
return Task.FromResult<object>(null);
}
_jarFile = new FileInfo(_openFileService.FileName);
JarFilePath = _jarFile.Name;
var rawBytesLength = File.ReadAllBytes(_jarFile.FullName).Length;
await _dataBricksFileUploadService.UploadFileAsync(_jarFile, rawBytesLength,
(newStatus) => this.Status = newStatus);
bool uploadedOk = await IsDbfsFileUploadedAndAvailableAsync(_jarFile, rawBytesLength);
if (uploadedOk)
{
//2.0/jobs/runs/submit
//poll for success using jobs/runs/get, store that in the JSON
var runId = await SubmitJarJobAsync(_jarFile);
if(!runId.HasValue)
{
IsBusy = false;
_messageBoxService.ShowError(this.Status = $"Looks like there was a problem with calling Spark API '2.0/jobs/runs/submit'");
}
else
{
await PollForRunIdAsync(runId.Value);
}
}
else
{
IsBusy = false;
_messageBoxService.ShowError("Looks like the Jar file did not upload ok....Boo");
}
}
}
catch (Exception ex)
{
_messageBoxService.ShowError(ex.Message);
}
finally
{
IsBusy = false;
}
return Task.FromResult<object>(null);
}
private async Task<bool> IsDbfsFileUploadedAndAvailableAsync(FileInfo dbfsFile, int rawBytesLength)
{
bool fileUploadOk = false;
int maxNumberOfAttemptsAllowed = 10;
int numberOfAttempts = 0;
// Keep checking DBFS until the file shows up, or we run out of attempts
while (!fileUploadOk && (numberOfAttempts < maxNumberOfAttemptsAllowed))
{
//check for the file in Dbfs
var response = await _databricksWebApiClient.DbfsListAsync();
fileUploadOk = response.files.Any(x =>
x.file_size == rawBytesLength &&
x.is_dir == false &&
x.path == $@"/{dbfsFile.Name}"
);
numberOfAttempts++;
this.Status = $"Checking that Jar has been uploaded ok.\r\nAttempt {numberOfAttempts} out of {maxNumberOfAttemptsAllowed}";
await Task.Delay(500);
}
return fileUploadOk;
}
private async Task<int?> SubmitJarJobAsync(FileInfo dbfsFile)
{
this.Status = $"Creating the Spark job using '2.0/jobs/runs/submit'";
// =====================================================================
// EXAMPLE REQUEST
// =====================================================================
//{
// "run_name": "my spark task",
// "new_cluster":
// {
// "spark_version": "3.4.x-scala2.11",
// "node_type_id": "Standard_D3_v2",
// "num_workers": 10
// },
// "libraries": [
// {
// "jar": "dbfs:/my-jar.jar"
// }
// ],
// "timeout_seconds": 3600,
// "spark_jar_task": {
// "main_class_name": "com.databricks.ComputeModels",
// "parameters" : ["10"]
// }
//}
var datePart = DateTime.Now.ToShortDateString().Replace("/", "");
var timePart = DateTime.Now.ToShortTimeString().Replace(":", "");
var request = new RunsSubmitJarTaskRequest()
{
run_name = $"JobsPickAndRunJarViewModel_{datePart}_{timePart}",
new_cluster = new NewCluster
{
// see api/2.0/clusters/spark-versions
spark_version = "4.0.x-scala2.11",
// see api/2.0/clusters/list-node-types
node_type_id = "Standard_F4s",
num_workers = 2
},
libraries = new List<Library>
{
new Library { jar = $"dbfs:/{dbfsFile.Name}"}
},
timeout_seconds = 3600,
spark_jar_task = new SparkJarTask
{
main_class_name = "SparkApp",
parameters = new List<string>() { "10" }
}
};
var response = await _databricksWebApiClient.JobsRunsSubmitJarTaskAsync(request);
return response.RunId;
}
private async Task PollForRunIdAsync(int runId)
{
_watch.Reset();
_watch.Start();
// Kick off polling; the flag is cleared once the run reports a final result state
_isPolling = true;
while (_isPolling)
{
var response = await _databricksWebApiClient.JobsRunsGetAsync(runId);
JobsJson = JsonConvert.SerializeObject(response, Formatting.Indented);
var state = response.state;
this.Status = "Job not complete polling for completion.\r\n" +
$"Job has been running for {_watch.Elapsed.Seconds} seconds";
try
{
if (!string.IsNullOrEmpty(state.result_state))
{
_isPolling = false;
IsBusy = false;
_messageBoxService.ShowInformation(
$"Job finnished with Status : {state.result_state}");
}
else
{
switch (state.life_cycle_state)
{
case "TERMINATING":
case "RUNNING":
case "PENDING":
break;
case "SKIPPED":
case "TERMINATED":
case "INTERNAL_ERROR":
_isPolling = false;
IsBusy = false;
break;
}
}
}
finally
{
if (_isPolling)
{
await Task.Delay(5000);
}
}
}
}
}
}
Where we use this helper class to do the actual upload to Dbfs
using SAS.Spark.Runner.REST.DataBricks;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
namespace SAS.Spark.Runner.Services
{
public class DataBricksFileUploadService : IDataBricksFileUploadService
{
private IDatabricksWebApiClient _databricksWebApiClient;
public DataBricksFileUploadService(IDatabricksWebApiClient databricksWebApiClient)
{
_databricksWebApiClient = databricksWebApiClient;
}
public async Task UploadFileAsync(FileInfo file, int rawBytesLength,
Action<string> statusCallback, string path = "")
{
var dbfsPath = $"/{file.Name}";
//Step 1 : Create the file
statusCallback("Creating DBFS file");
var dbfsCreateResponse = await _databricksWebApiClient.DbfsCreateAsync(
new DatabricksDbfsCreateRequest
{
overwrite = true,
path = dbfsPath
});
//Step 2 : Add block in chunks
FileStream fileStream = new FileStream(file.FullName, FileMode.Open, FileAccess.Read);
var oneMegaByte = 1 << 20;
byte[] buffer = new byte[oneMegaByte];
int bytesRead = 0;
int totalBytesSoFar = 0;
while ((bytesRead = fileStream.Read(buffer, 0, buffer.Length)) != 0)
{
totalBytesSoFar += bytesRead;
statusCallback(
$"Uploaded {FormatAsNumeric(totalBytesSoFar)} " +
$"out of {FormatAsNumeric(rawBytesLength)} bytes to DBFS");
var base64EncodedData = Convert.ToBase64String(buffer.Take(bytesRead).ToArray());
await _databricksWebApiClient.DbfsAddBlockAsync(
new DatabricksDbfsAddBlockRequest
{
data = base64EncodedData,
handle = dbfsCreateResponse.Handle
});
}
fileStream.Close();
//Step 3 : Close the file
statusCallback($"Finalising write to DBFS file");
await _databricksWebApiClient.DbfsCloseAsync(
new DatabricksDbfsCloseRequest
{
handle = dbfsCreateResponse.Handle
});
}
private string FormatAsNumeric(int byteLength)
{
return byteLength.ToString("###,###,###");
}
}
}
And just for completeness, here is the set of REST API methods that make the two preceding code snippets work:
using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;
namespace SAS.Spark.Runner.REST.DataBricks
{
public class DatabricksWebApiClient : IDatabricksWebApiClient
{
private readonly string _baseUrl;
private readonly string _authHeader;
private readonly RestClient _client;
public DatabricksWebApiClient()
{
_baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
_authHeader = Base64Encode(App.AccessToken);
_client = new RestClient(_baseUrl);
}
//https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit
public async Task<DatabricksRunNowResponse> JobsRunsSubmitJarTaskAsync(
RunsSubmitJarTaskRequest runsSubmitJarTaskRequest)
{
var request = new RestRequest("2.0/jobs/runs/submit", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddJsonBody(runsSubmitJarTaskRequest);
var response = await _client.ExecuteTaskAsync<DatabricksRunNowResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<DatabricksRunNowResponse>(response.Content);
return dbResponse;
}
//https://docs.azuredatabricks.net/api/latest/dbfs.html#list
public async Task<DbfsListResponse> DbfsListAsync()
{
var request = new RestRequest("api/2.0/dbfs/list", Method.GET);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddQueryParameter("path", "/");
var response = await _client.ExecuteTaskAsync<DbfsListResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<DbfsListResponse>(response.Content);
return dbResponse;
}
//https://docs.azuredatabricks.net/api/latest/dbfs.html#put
public async Task<JObject> DbfsPutAsync(FileInfo file)
{
var request = new RestRequest("api/2.0/dbfs/put", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddFile("back", file.FullName);
request.AddHeader("Content -Type", "multipart/form-data");
var response = await _client.ExecuteTaskAsync(request);
JObject responseContent = JObject.Parse(response.Content);
return responseContent;
}
//https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsservicecreate
public async Task<DatabricksDbfsCreateResponse> DbfsCreateAsync(DatabricksDbfsCreateRequest dbfsRequest)
{
var request = new RestRequest("api/2.0/dbfs/create", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddJsonBody(dbfsRequest);
var response = await _client.ExecuteTaskAsync<DatabricksDbfsCreateResponse>(request);
var dbResponse = JsonConvert.DeserializeObject<DatabricksDbfsCreateResponse>(response.Content);
return dbResponse;
}
//https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsserviceaddblock
public async Task<JObject> DbfsAddBlockAsync(DatabricksDbfsAddBlockRequest dbfsRequest)
{
var request = new RestRequest("api/2.0/dbfs/add-block", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddJsonBody(dbfsRequest);
var response = await _client.ExecuteTaskAsync(request);
JObject responseContent = JObject.Parse(response.Content);
return responseContent;
}
//https://docs.azuredatabricks.net/api/latest/dbfs.html#close
public async Task<JObject> DbfsCloseAsync(DatabricksDbfsCloseRequest dbfsRequest)
{
var request = new RestRequest("api/2.0/dbfs/close", Method.POST);
request.AddHeader("Authorization", $"Basic {_authHeader}");
request.AddJsonBody(dbfsRequest);
var response = await _client.ExecuteTaskAsync(request);
JObject responseContent = JObject.Parse(response.Content);
return responseContent;
}
private static string Base64Encode(string plainText)
{
var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
return Convert.ToBase64String(plainTextBytes);
}
}
}
As I say, this is the most complex of all the APIs I chose to look at. In reality you would probably not kick a Databricks job off from a UI; you might use a REST API of your own, which could delegate to some job manager like https://www.hangfire.io/, which would obviously still have to do the polling part for you.
With all that in place, you should be able to pick the JAR from the UI, submit it, watch it run, and see the logs for the run in the Databricks web UI.
Conclusion
I have to say, using Apache Spark / Databricks is an absolute dream. Databricks have just nailed it; it's just what was needed, and it's awesome what you can do with it. Being able to spin up a cluster on demand that runs a job and is destroyed after the job run (to save the idle costs) is just frickin' great.
I urge you to give it a look; I think you will love it.