Mastering Advanced Aggregations in Spark SQL

OLAP (Online Analytical Processing) aggregation techniques in Spark SQL are used for complex, multi-level data analysis over large datasets.

By Ram Ghadiyaram · May 15, 2025 · Tutorial

In data analytics, efficiently aggregating large datasets is a basic need. For example, when working with retail inventory data that tracks products shipped to stores each month, the standard GROUP BY clause in SQL can handle basic aggregations. 

However, it falls short when you need multiple levels of aggregation in a single query. This is where Spark SQL’s advanced GROUP BY extensions, GROUPING SETS, ROLLUP, and CUBE, come into the picture to compute multiple groupings efficiently.

In this article, we’ll walk through these aggregation techniques using a retail inventory scenario. We’ll explore how to calculate shipments by store and product type, as well as totals across various dimensions, all in one query, with examples in both Scala and PySpark, complete with code snippets and outputs.

Let’s get going.

The Problem: Aggregating Retail Inventory Data

Suppose you’re a data engineer managing a retail chain’s inventory system. Every month, products like clothing, footwear, and electronics are shipped to various stores. Your dataset includes details such as store ID, product type, units shipped, and shipment month. You need to answer questions like:

  • How many units of each product type were shipped to each store?
  • What’s the total number of units shipped to each store, regardless of product type?
  • What’s the grand total of all shipments across all stores and types?

Running separate GROUP BY queries for each question and combining the results with UNION would work, but it’s inefficient, especially with large datasets processed in Spark. Instead, GROUPING SETS, ROLLUP, and CUBE let you compute all of these aggregations in a single pass, optimizing performance and simplifying your code.

Input Data

We’ll use a sample dataset provided as a CSV string, representing shipments to two stores (101 and 102) over two months:

Here’s what the data looks like visually:

store_id | product_type | shipped_units | month
---------|--------------|---------------|------
     101 | Clothing     |          2000 |     1
     101 | Footwear     |          1200 |     1
     101 | Electronics  |          2400 |     1
     102 | Electronics  |          3000 |     1
     102 | Clothing     |          1000 |     1
     101 | Clothing     |          1500 |     2
     101 | Electronics  |           800 |     2
     102 | Clothing     |          3000 |     2


Setting Up the Spark Environment

Before we start the aggregations, let’s set up our Spark environment in Scala and PySpark. We’ll load the CSV string into a DataFrame and create a temporary view for SQL queries.

Scala
 
package com.examples

import org.apache.spark.sql.SparkSession

object AdvancedAggregations {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Advanced Aggregations in Spark SQL")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._
 
    // Load the inline CSV into a Dataset[String], dropping blank lines,
    // so spark.read.csv can parse the header row and infer the schema.
    val csvData = spark.sparkContext.parallelize(
      """
      |store_id,product_type,shipped_units,month
      |101,Clothing,2000,1
      |101,Footwear,1200,1
      |101,Electronics,2400,1
      |102,Electronics,3000,1
      |102,Clothing,1000,1
      |101,Clothing,1500,2
      |101,Electronics,800,2
      |102,Clothing,3000,2
      """.stripMargin.linesIterator.filter(_.trim.nonEmpty).toList).toDS()

    val df = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
    df.show()
    df.createOrReplaceTempView("shipped_products")
  }
}


PySpark setup:

Python
 
from pyspark.sql import SparkSession
 
spark = SparkSession.builder \
    .appName("Advanced Aggregations in Spark SQL") \
    .master("local[*]") \
    .getOrCreate()

csv_data = """
store_id,product_type,shipped_units,month
101,Clothing,2000,1
101,Footwear,1200,1
101,Electronics,2400,1
102,Electronics,3000,1
102,Clothing,1000,1
101,Clothing,1500,2
101,Electronics,800,2
102,Clothing,3000,2
"""
# Parse the inline CSV into a list of dicts, then cast the numeric columns
# to integers so the aggregations work on numbers rather than strings.
lines = csv_data.strip().split("\n")
header = lines[0].split(",")
data = [dict(zip(header, line.split(","))) for line in lines[1:]]
df = spark.createDataFrame(data).selectExpr(
    "cast(store_id as int) as store_id",
    "product_type",
    "cast(shipped_units as int) as shipped_units",
    "cast(month as int) as month",
)
df.show()
df.createOrReplaceTempView("shipped_products")


Output (from Scala and PySpark setup):

Plain Text
 
+--------+------------+-------------+-----+
|store_id|product_type|shipped_units|month|
+--------+------------+-------------+-----+
|     101|    Clothing|         2000|    1|
|     101|    Footwear|         1200|    1|
|     101| Electronics|         2400|    1|
|     102| Electronics|         3000|    1|
|     102|    Clothing|         1000|    1|
|     101|    Clothing|         1500|    2|
|     101| Electronics|          800|    2|
|     102|    Clothing|         3000|    2|
+--------+------------+-------------+-----+


With our data loaded, let’s explore the aggregation techniques.

GROUPING SETS: Multi-Level Aggregations

GROUPING SETS is an extension of GROUP BY that lets you define multiple grouping combinations in a single query. Think of it as a way to perform several GROUP BY operations and union their results in a single pass.

GROUP BY GROUPING SETS is equivalent to the UNION ALL of two or more GROUP BY operations over the same input, returned as a single result set (a concrete rewrite is sketched after the list below):

  • GROUP BY GROUPING SETS((a)) is equivalent to the single grouping set operation GROUP BY a.
  • GROUP BY GROUPING SETS((a),(b)) is equivalent to GROUP BY a UNION ALL GROUP BY b.
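
To make the equivalence concrete against the shipped_products view created above, here is a minimal PySpark sketch of the manual rewrite: two separate GROUP BY queries combined with UNION ALL (the variable names are just illustrative). The dedicated GROUPING SETS syntax in the next example produces the same rows in a single pass.

Python

# Manual equivalent of GROUPING SETS ((store_id, product_type), (store_id)):
# two GROUP BY queries, unioned together.
by_store_and_type = spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) AS total_count
  FROM shipped_products
  GROUP BY store_id, product_type
""")

by_store = spark.sql("""
  SELECT store_id, CAST(NULL AS STRING) AS product_type, SUM(shipped_units) AS total_count
  FROM shipped_products
  GROUP BY store_id
""")

by_store_and_type.unionByName(by_store).show()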

Example: Shipments by Store and Product Type, Plus Store Totals

Let’s calculate the total units shipped to each store for each product type, plus the total units per store across all types.

Scala/PySpark code: 

SQL
 
spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) as total_count
  FROM shipped_products
  GROUP BY store_id, product_type
  GROUPING SETS ((store_id, product_type), (store_id))
""").show()


 Output:

Plain Text
 
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
|     102|        null|       7000|
|     101| Electronics|       3200|
|     102| Electronics|       3000|
|     101|        null|       7900|
|     101|    Clothing|       3500|
|     102|    Clothing|       4000|
|     101|    Footwear|       1200|
+--------+------------+-----------+


Explanation:

  • (store_id, product_type) gives totals for each store-product combination (e.g., 3200 Electronics for store 101).
  • (store_id) gives totals per store across all types (e.g., 7900 for store 101), with product_type as null.

Adding the Grand Total

To include the overall total of all shipments, add an empty set ():

SQL
 
spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) as total_count
  FROM shipped_products
  GROUP BY store_id, product_type
  GROUPING SETS ((store_id, product_type), (store_id), ())
""").show()


Output:

Plain Text
 
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
|    null|        null|      14900|
|     102|        null|       7000|
|     101| Electronics|       3200|
|     102| Electronics|       3000|
|     101|        null|       7900|
|     101|    Clothing|       3500|
|     102|    Clothing|       4000|
|     101|    Footwear|       1200|
+--------+------------+-----------+


Explanation:

  • () adds the grand total (14900), with both store_id and product_type as null.
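
Because store_id and product_type are never null in this dataset, the grand-total row can be pulled out of the result simply by filtering on the null markers. A minimal sketch (the totals variable is illustrative); with nullable grouping columns you would rely on GROUPING_ID instead, covered next.

Python

# Isolate the grand-total row; safe here only because the source columns contain no nulls.
totals = spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) AS total_count
  FROM shipped_products
  GROUP BY store_id, product_type
  GROUPING SETS ((store_id, product_type), (store_id), ())
""")

totals.filter("store_id IS NULL AND product_type IS NULL").show()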

Using GROUPING_ID for Clarity

The GROUPING_ID function returns a bit vector identifying which columns are grouped in each output row, which makes the null markers easier to interpret.

SQL
 
spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) as total_count, GROUPING__ID
  FROM shipped_products
  GROUP BY store_id, product_type
  GROUPING SETS ((store_id, product_type), (store_id), ())
""").show(


Output:

Plain Text
 
+--------+------------+-----------+------------+
|store_id|product_type|total_count|grouping__id|
+--------+------------+-----------+------------+
|    null|        null|      14900|           3|
|     102|        null|       7000|           1|
|     101| Electronics|       3200|           0|
|     102| Electronics|       3000|           0|
|     101|        null|       7900|           1|
|     101|    Clothing|       3500|           0|
|     102|    Clothing|       4000|           0|
|     101|    Footwear|       1200|           0|
+--------+------------+-----------+------------+


Explanation:

  • 0 (binary 00): both store_id and product_type are grouped.
  • 1 (binary 01): only store_id is grouped; product_type is rolled up.
  • 3 (binary 11): neither column is grouped (the grand total).

A bit is set to 1 when the corresponding column has been aggregated away, which the sketch below uses to replace the null markers with readable labels.
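
The bit vector pairs nicely with the grouping() function (available alongside grouping_id() since Spark 2.0). A minimal sketch; the 'ALL STORES' / 'ALL PRODUCTS' labels and the gid alias are just illustrative.

Python

# grouping(col) returns 1 when the column is aggregated away in that row,
# so subtotal rows can be labeled explicitly instead of showing null.
spark.sql("""
  SELECT
    CASE WHEN grouping(store_id) = 1 THEN 'ALL STORES' ELSE CAST(store_id AS STRING) END AS store_id,
    CASE WHEN grouping(product_type) = 1 THEN 'ALL PRODUCTS' ELSE product_type END AS product_type,
    SUM(shipped_units) AS total_count,
    grouping_id() AS gid
  FROM shipped_products
  GROUP BY store_id, product_type
  GROUPING SETS ((store_id, product_type), (store_id), ())
""").show()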

ROLLUP: Hierarchical Aggregations Made Simple

ROLLUP builds on GROUPING SETS to create hierarchical aggregations, ideal for subtotals along a dimension. For store_id and product_type, it generates:

  1. (store_id, product_type)
  2. (store_id)
  3. ()

Example: Rollup 

SQL
 
spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) as total_count
  FROM shipped_products
  GROUP BY store_id, product_type WITH ROLLUP
""").show()


Output: (The N elements of a ROLLUP specification correspond to N+1 GROUPING SETS.)

Plain Text
 
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
|    null|        null|      14900|
|     102|        null|       7000|
|     101| Electronics|       3200|
|     102| Electronics|       3000|
|     101|        null|       7900|
|     101|    Clothing|       3500|
|     102|    Clothing|       4000|
|     101|    Footwear|       1200|
+--------+------------+-----------+


Explanation: This matches the GROUPING SETS ((store_id, product_type), (store_id), ()) output, providing a hierarchical view from specific combinations to the grand total.
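
The DataFrame API exposes the same hierarchy through rollup(). A minimal PySpark sketch equivalent to the SQL above (the orderBy is only there to make the output easier to scan):

Python

from pyspark.sql import functions as F

# DataFrame API equivalent of GROUP BY store_id, product_type WITH ROLLUP.
df.rollup("store_id", "product_type") \
  .agg(F.sum("shipped_units").alias("total_count")) \
  .orderBy("store_id", "product_type") \
  .show()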

CUBE: Multi-Dimensional Aggregations

CUBE takes aggregation to the next level by computing all possible combinations of the specified columns. For two columns, it produces 2² = 4 sets:

  1. (store_id, product_type)
  2. (store_id)
  3. (product_type)
  4. ()

Example: Cube

SQL
 
spark.sql("""
  SELECT store_id, product_type, SUM(shipped_units) as total_count
  FROM shipped_products
  GROUP BY store_id, product_type WITH CUBE
""").show()


Output: (The N elements of a CUBE specification correspond to 2^N GROUPING SETS.)

Plain Text
 
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
|    null| Electronics|       6200|
|    null|        null|      14900|
|    null|    Footwear|       1200|
|     102|        null|       7000|
|     101| Electronics|       3200|
|    null|    Clothing|       7500|
|     102| Electronics|       3000|
|     101|        null|       7900|
|     101|    Clothing|       3500|
|     102|    Clothing|       4000|
|     101|    Footwear|       1200|
+--------+------------+-----------+


Explanation:

  • (product_type) adds totals per product type across all stores (e.g., 7500 for Clothing).
  • This is equivalent to GROUPING SETS ((store_id, product_type), (store_id), (product_type), ()).
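
As with ROLLUP, the DataFrame API has a matching cube() method. A minimal PySpark sketch equivalent to the SQL above:

Python

from pyspark.sql import functions as F

# DataFrame API equivalent of GROUP BY store_id, product_type WITH CUBE.
df.cube("store_id", "product_type") \
  .agg(F.sum("shipped_units").alias("total_count")) \
  .show()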

Choosing the Right Tool

  • GROUPING SETS: Perfect for custom, specific groupings when you don’t need a full hierarchy or all combinations.
  • ROLLUP: Best for hierarchical aggregations, like subtotals by store, then grand totals.
  • CUBE: Ideal for multi-dimensional analysis, giving you every possible aggregation.

Conclusion

In data analytics, especially within fields like retail and financial technology, swiftly consolidating vast datasets is essential for uncovering valuable insights.

Spark SQL’s powerful GROUP BY extensions, GROUPING SETS, ROLLUP, and CUBE, simplify data aggregation by enabling complex, multi-tier summaries within a single query, surpassing the constraints of a basic GROUP BY. Through a retail inventory case study, we showed how these extensions efficiently calculate metrics like store-product totals, store-wide summaries, and overall aggregates.


