Wednesday, 31 July 2019

Datasets

The Apache Spark Dataset API provides a type-safe, object-oriented programming interface. DataFrame is an alias for an untyped Dataset[Row]. Datasets provide compile-time type safety, which means that production applications can be checked for errors before they are run, and they allow direct operations over user-defined classes. The Dataset API also offers high-level domain-specific language operations such as sum(), avg(), join(), select(), and groupBy(), making the code a lot easier to express, read, and write.
In this tutorial module, you will learn how to create sample data, load sample data, view a Dataset, and process and visualize a Dataset.
We also provide a sample notebook that you can import to access and run all of the code examples included in the module.

Create sample data

There are two ways to create Datasets: dynamically and by reading from a JSON file using SparkSession. First, for primitive types in examples or demos, you can create Datasets within a Scala or Python notebook or in your sample Spark application. For example, here’s a way to create a Dataset of 100 integers in a notebook. The spark variable (the SparkSession) creates the integers 0 through 99 as a Dataset[Long].
// range of 100 numbers to create a Dataset.
val range100 = spark.range(100)
range100.collect()
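A dynamically created Dataset does not have to hold primitives. As a minimal sketch (not part of the original tutorial), a small Dataset of user-defined objects can be built from a local collection. The Reading case class and its values are hypothetical, and the SparkSession.builder() line is only needed outside a notebook, where no spark variable is predefined.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class, used only for this illustration.
case class Reading(sensor: String, temp: Long)

// getOrCreate() reuses an existing session (for example, in a notebook) if there is one.
val spark = SparkSession.builder().master("local[*]").appName("create-dataset-sketch").getOrCreate()
import spark.implicits._ // brings toDS() and the required encoders into scope

// Build a typed Dataset from an in-memory Seq.
val readings: Dataset[Reading] = Seq(Reading("a", 18L), Reading("b", 31L)).toDS()

// Fields are accessed by name inside ordinary Scala lambdas.
val hotSensors = readings.filter(r => r.temp > 25).map(r => r.sensor).collect()
```

Here hotSensors is a plain Scala Array[String] collected to the driver, so this pattern is best kept to small demo data.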

Load sample data

The more common way is to read a data file from an external data source, such as HDFS, blob storage, NoSQL, RDBMS, or a local filesystem. Spark supports multiple formats: JSON, CSV, Text, Parquet, ORC, and so on. To read a JSON file, you also use the SparkSession variable spark.
The easiest way to start working with Datasets is to use an example Azure Databricks dataset available in the /databricks-datasets folder accessible within the Azure Databricks workspace.
val df = spark.read.json("/databricks-datasets/samples/people/people.json")
At the time of reading the JSON file, Spark does not know the structure of your data. That is, it doesn’t know how you want to organize your data into a type-specific JVM object. It attempts to infer the schema from the JSON file and creates a DataFrame = Dataset[Row] of generic Row objects.
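To see what schema inference produces, the sketch below reads two JSON records and prints the inferred schema. It is an illustration under assumptions: the records are hypothetical and held in memory rather than in a file, and the SparkSession.builder() line is only needed outside a notebook.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses an existing session (for example, in a notebook) if there is one.
val spark = SparkSession.builder().master("local[*]").appName("schema-inference-sketch").getOrCreate()
import spark.implicits._

// Two hypothetical JSON records, one per line, as a Dataset[String].
val jsonLines = Seq(
  """{"name": "Ann", "age": 34}""",
  """{"name": "Bob", "age": 27}"""
).toDS()

// spark.read.json also accepts a Dataset[String]; the schema is inferred from the records.
val peopleDf = spark.read.json(jsonLines)

// Print the inferred schema, then capture it as a name -> type map.
peopleDf.printSchema()
val inferred = peopleDf.schema.fields.map(f => f.name -> f.dataType.simpleString).toMap
```

Whole numbers in JSON are inferred as long (shown as bigint) and text as string, which is why a case class for such data would declare age as Long.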
You can explicitly convert your DataFrame into a Dataset reflecting a Scala class object by defining a domain-specific Scala case class and converting the DataFrame into that type:
// First, define a case class that represents a type-specific Scala JVM Object
case class Person (name: String, age: Long)

// Read the JSON file and convert the DataFrame into the type-specific Scala
// object Person. Upon reading the JSON, Spark first creates a generic
// DataFrame = Dataset[Row]. Explicitly converting that DataFrame with as[Person]
// yields a typed Dataset, that is, a collection of objects of type Person
val ds = spark.read.json("/databricks-datasets/samples/people/people.json").as[Person]
You can do something similar with IoT device state information captured in a JSON file: define a case class, read the JSON file, and convert the DataFrame into a Dataset[DeviceIoTData].
There are two reasons to convert a DataFrame into a type-specific JVM object. First, after an explicit conversion, you get compile-time type safety for all relational and query expressions that use the Dataset API. For example, if a filter operation uses the wrong data type, the mismatch is detected at compile time and reported as a compile error rather than a runtime error, so you catch errors earlier. Second, the Dataset API provides higher-order methods, which make code much easier to read and develop. In the section Process and visualize the Dataset, notice how using typed Dataset objects makes the code easier to express and read.
As in the Person example, create a case class that encapsulates the Scala object. To access the IoT data, load the file /databricks-datasets/iot/iot_devices.json.
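The difference between a compile-time and a runtime failure can be sketched as follows. This is an illustration under assumptions: the Person case class mirrors the one defined earlier, the two sample records are hypothetical, and the SparkSession.builder() line is only needed outside a notebook.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

case class Person(name: String, age: Long)

// getOrCreate() reuses an existing session (for example, in a notebook) if there is one.
val spark = SparkSession.builder().master("local[*]").appName("type-safety-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val people = Seq(Person("Ann", 34L), Person("Bob", 27L)).toDS()

// Typed API: the lambda is checked by the Scala compiler.
// A misspelled field such as p.agee would be rejected before the job ever runs.
val over30 = people.filter(p => p.age > 30).map(p => p.name).collect()

// Untyped API: the column name is only a string, so the typo compiles
// and fails when Spark analyzes the query at runtime.
val untypedAttempt = Try(people.toDF().filter("agee > 30").collect())
```

The Try wrapper is there only so that the sketch can record the runtime failure instead of aborting; untypedAttempt.isFailure is true because the column agee cannot be resolved.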
// define a case class that represents the device data.
case class DeviceIoTData (
  battery_level: Long,
  c02_level: Long,
  cca2: String,
  cca3: String,
  cn: String,
  device_id: Long,
  device_name: String,
  humidity: Long,
  ip: String,
  latitude: Double,
  longitude: Double,
  scale: String,
  temp: Long,
  timestamp: Long
)

// read the JSON file and create the Dataset from the case class DeviceIoTData
// ds is now a collection of JVM Scala objects of type DeviceIoTData
val ds = spark.read.json("/databricks-datasets/iot/iot_devices.json").as[DeviceIoTData]

View the Dataset

To view the data in a tabular format instead of exporting it to a third-party tool, you can use the Azure Databricks display() command. Once you have loaded the JSON data and converted it into a Dataset for your type-specific collection of JVM objects, you can view them as you would view a DataFrame, by using either display() or standard Spark commands, such as take(), foreach(), and println() API calls.
// display the dataset table just read in from the JSON file
display(ds)
// Using the standard Spark commands take() and foreach(), print the first
// 10 rows of the Dataset.
ds.take(10).foreach(println(_))

Process and visualize the Dataset

A Dataset has transformations and actions. Most important are the high-level domain-specific operations such as sum(), select(), avg(), join(), and union(). For more information, see the Scala Dataset API.
In this example, you can use filter(), map(), groupBy(), and avg(), all higher-level methods, to create new Datasets. What’s noteworthy is that you can access the attributes by their names as defined in the case class. That is, you use dot notation to access individual fields, which makes the code easy to read and write.
// Keep only the devices whose temperature exceeds 25 degrees, map them to
// another Dataset with the three fields of interest, and then display
// the mapped Dataset
val dsTemp = ds.filter(d => d.temp > 25).map(d => (d.temp, d.device_name, d.cca3))
display(dsTemp)
// Apply higher-level Dataset API methods such as groupBy() and avg().
// Filter temperatures > 25, along with their corresponding
// devices' humidity, compute averages, groupBy cca3 country codes,
// and display the results, using table and bar charts
val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()

// display averages as a table, grouped by the country
display(dsAvgTmp)
// Select individual fields using the Dataset method select()
// where battery_level is greater than 6. Note that this high-level
// domain-specific language API reads like a SQL query
display(ds.select($"battery_level", $"c02_level", $"device_name").where($"battery_level" > 6).sort($"c02_level"))
Here is an animated GIF showing how quickly you can go from table to map to charts using Datasets and the Azure Databricks display() command.
../../_images/gsasg-example-of-databricks-visualizations.gif
An additional benefit of using the Databricks display() command is that you can quickly view this data with a number of embedded visualizations. For example, in a new cell, you can issue SQL queries and click the map to see the data. But first you must save your dataset, ds, as a temporary table.
// registering your Dataset as a temporary table to which you can issue SQL queries
ds.createOrReplaceTempView("iot_device_data")
Having saved the Dataset of DeviceIoTData as a temporary table, you can issue SQL queries to it.
%sql select cca3, count (distinct device_id) as device_id from iot_device_data group by cca3 order by device_id desc limit 100
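The same kind of query can also be issued from Scala with spark.sql(), which returns a DataFrame. The following is a minimal sketch under assumptions: the Device case class is a hypothetical, trimmed-down stand-in for DeviceIoTData, the three records and the view name iot_device_sketch are made up for illustration, and the SparkSession.builder() line is only needed outside a notebook.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, trimmed-down stand-in for DeviceIoTData.
case class Device(device_id: Long, cca3: String)

// getOrCreate() reuses an existing session (for example, in a notebook) if there is one.
val spark = SparkSession.builder().master("local[*]").appName("temp-view-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data registered as a temporary view.
val devices = Seq(Device(1L, "USA"), Device(2L, "USA"), Device(3L, "NOR")).toDS()
devices.createOrReplaceTempView("iot_device_sketch")

// Count distinct devices per country code, largest first.
val perCountry = spark.sql(
  """select cca3, count(distinct device_id) as device_count
    |from iot_device_sketch
    |group by cca3
    |order by device_count desc""".stripMargin)

// Collect the small result to the driver as (country code, count) pairs.
val rows = perCountry.collect().map(r => (r.getString(0), r.getLong(1)))
```

In a Databricks notebook, perCountry could be passed to display() in place of the collect() call.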
