Datasets
The Apache Spark Dataset API provides a type-safe, object-oriented programming interface.
DataFrame
is an alias for an untyped Dataset [Row]
. Datasets provide compile-time type safety—which means that production applications can be checked for errors before they are run—and they allow direct operations over user-defined classes. The Dataset API also offers high-level domain-specific language operations like sum()
, avg()
, join()
, select()
, groupBy()
, making the code a lot easier to express, read, and write.
In this tutorial module, you will learn how to:
We also provide a sample notebook that you can import to access and run all of the code examples included in the module.
Create sample data
There two ways to create Datasets: dynamically and by reading from a JSON file using
SparkSession
. First, for primitive types in examples or demos, you can create Datasets within a Scala or Python notebook or in your sample Spark application. For example, here’s a way to create a Dataset of 100 integers in a notebook. We use the spark
variable to create 100 integers as Dataset[Long]
.
Copy
// range of 100 numbers to create a Dataset.
val range100 = spark.range(100)
range100.collect()
Load sample data
The more common way is to read a data file from an external data source, such HDFS, blob storage, NoSQL, RDBMS, or local filesystem. Spark supports multiple formats: JSON, CSV, Text, Parquet, ORC, and so on. To read a JSON file, you also use the
SparkSession
variable spark
.
The easiest way to start working with Datasets is to use an example Azure Databricks dataset available in the
/databricks-datasets
folder accessible within the Azure Databricks workspace.
Copy
val df = spark.read.json("/databricks-datasets/samples/people/people.json")
At the time of reading the JSON file, Spark does not know the structure of your data. That is, it doesn’t know how you want to organize your data into a typed-specific JVM object. It attempts to infer the schema from the JSON file and creates a
DataFrame = Dataset[Row]
of generic Row
objects.
You can explicitly convert your
DataFrame
into a Dataset
reflecting a Scala class object by defining a domain-specific Scala case class
and converting the DataFrame into that type:
Copy
// First, define a case class that represents a type-specific Scala JVM Object
case class Person (name: String, age: Long)
// Read the JSON file, convert the DataFrames into a type-specific JVM Scala object
// Person. At this stage Spark, upon reading JSON, created a generic
// DataFrame = Dataset[Rows]. By explicitly converting DataFrame into Dataset
// results in a type-specific rows or collection of objects of type Person
val ds = spark.read.json("/databricks-datasets/samples/people/people.json").as[Person]
You can do something similar with IoT device state information captured in a JSON file: define a
case class
, read the JSON file, and convert the DataFrame = Dataset[DeviceIoTData]
.
There are two reasons to convert a
DataFrame
into a type-specific JVM object. First, after an explicit conversion, for all relational and query expressions using Dataset API, you get compile-type safety. For example, if you use a filter operation using the wrong data type, Spark detects mismatch types and issues a compile error rather an execution runtime error, so that you catch errors earlier. Second, the Dataset API provides high-order methods, which makes code much easier to read and develop. In the section Process and visualize the Dataset, notice how using Dataset
typed objects makes the code easier to express and read.
As in the
Person
example, here create a case class
that encapsulates the Scala object. To access the file that contains IoT data, load the file /databricks-datasets/iot/iot_devices.json
.
Copy
// define a case class that represents the device data.
case class DeviceIoTData (
battery_level: Long,
c02_level: Long,
cca2: String,
cca3: String,
cn: String,
device_id: Long,
device_name: String,
humidity: Long,
ip: String,
latitude: Double,
longitude: Double,
scale: String,
temp: Long,
timestamp: Long
)
// read the JSON file and create the Dataset from the ``case class`` DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json("/databricks-datasets/iot/iot_devices.json").as[DeviceIoTData]
View the Dataset
To view the data in a tabular format instead of exporting it to a third-party tool, you can use the Azure Databricks
display()
command. Once you have loaded the JSON data and converted it into a Dataset
for your type-specific collection of JVM objects, you can view them as you would view a DataFrame
, by using either display()
or standard Spark commands, such as take()
, foreach()
, and println()
API calls.
Copy
// display the dataset table just read in from the JSON file
display(ds)
Copy
// Using the standard Spark commands, take() and foreach(), print the first
// 10 rows of the Datasets.
ds.take(10).foreach(println(_))
// Print first 10 rows of a dataset
Process and visualize the Dataset
A Dataset has transformations and actions. Most important are the high-level domain specific operations such as
sum()
, select()
, avg()
, join()
, and union()
. For more information, see the Scala Dataset API.
In this example, you can use
filter()
, map()
, groupBy()
, and avg()
, all higher-level methods, to create new Datasets
. What’s noteworthy is that you can access the attributes by their names as defined in the case class
. That is, use the dot notation to access individual fields. As such, it makes code easy to read and write.
Copy
// filter out all devices whose temperature exceed 25 degrees and generate
// another Dataset with three fields that of interest and then display
// the mapped Dataset
val dsTemp = ds.filter(d => d.temp > 25).map(d => (d.temp, d.device_name, d.cca3)
display(dsTemp)
Copy
// Apply higher-level Dataset API methods such as groupBy() and avg().
// Filter temperatures > 25, along with their corresponding
// devices' humidity, compute averages, groupBy cca3 country codes,
// and display the results, using table and bar charts
val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()
// display averages as a table, grouped by the country
display(dsAvgTmp)
Copy
// Select individual fields using the Dataset method select()
// where battery_level is greater than 6. Note this high-level
// domain specific language API reads like a SQL query
display(ds.select($"battery_level", $"c02_level", $"device_name").where($"battery_level" > 6).sort($"c02_level"))
Here is an animated gif showing how quickly you can go from table to map to charts using Datasets and Azure Databricks
display()
command.
An additional benefit of using the Databricks
display()
command is that you can quickly view this data with a number of embedded visualizations. For example, in a new cell, you can issue SQL queries and click the map to see the data. But first you must save your dataset, ds
, as a temporary table.
Copy
// registering your Dataset as a temporary table to which you can issue SQL queries
ds.createOrReplaceTempView("iot_device_data")
Having saved the
Dataset
of DeviceIoTData as a temporary table, you can issue SQL queries to it.
Copy
%sql select cca3, count (distinct device_id) as device_id from iot_device_data group by cca3 order by device_id desc limit 100
No comments:
Post a Comment