Flink Materialized Table: Building Unified Stream and Batch ETL

This article explores Apache Flink's Materialized Table, a unified stream-batch architecture introduced in Flink 2.

Summary: This article is adapted from the presentation by Liu Dalong, Technical Expert at Alibaba Cloud and Apache Flink Committer, delivered at the Stream-Batch Unification (Part 1) session of Flink Forward Asia 2024. The content primarily consists of the following six sections:

1. A User Story of a Data Engineer

2. Introduction to Materialized Tables

3. Background and Challenges before Introducing Materialized Table

4. The Role of Materialized Tables: Building Mixed Stream and Batch Processing ETL

5. Technical Architecture with Materialized Table

6. Comparison Between Traditional Lambda Architecture and Materialized Table Unified Stream-Batch Architecture

A User Story of a Data Engineer

Nowadays, concepts like Data Lakes and Lakehouse architectures, along with technologies such as Apache Paimon, are gaining significant traction. The ideal solution centers on leveraging open data lake formats as its foundation. This approach allows for the ingestion of diverse data sources into a unified storage repository. By adopting open formats, the architecture inherently supports multiple computing engines—including stream processing, batch processing, OLAP analytics, and even AI model training workflows. This flexibility enables multi-dimensional analysis and processing of the data, with outcomes that can be applied to artificial intelligence implementations, business intelligence reporting, or other analytical scenarios. Ultimately, this empowers organizations to unlock greater value from their data assets.

Although this architecture appears highly appealing and many organizations are attempting to implement it, a series of challenges may arise during practical operations.

Let's look at a case. Suppose Steven is a data engineer at a company, and upon arriving at work, he receives a message from his manager, Roy: "Can you provide statistics on yesterday's top-selling product or the GMV across various categories on the platform?" Being familiar with the business, Steven chooses Iceberg as his data lake storage. He then imports data from MySQL using DataX, performs batch ETL with Spark, and builds a dashboard using QuickBI. Roy is very satisfied with the dashboard and proposes a new requirement.

The new requirement is to update the report daily. Upon receiving this request, Steven adds a scheduler to the existing batch processing pipeline, using a tool like Airflow, to schedule batch jobs daily, thereby achieving daily report updates.

Next, Roy asks for the report to be updated in real time. Steven, being familiar with Iceberg, knows that it is primarily geared toward batch processing and may struggle with real-time updates, and that second-level or minute-level latency is difficult to achieve with Spark or Spark SQL. The existing data lakehouse architecture cannot meet this requirement, so Steven sets up a new real-time pipeline using the typical combination of Flink and Kafka and builds a separate real-time BI dashboard. The system now has both a real-time pipeline and a batch pipeline, which is fundamentally a Lambda architecture.

Meanwhile, Roy adds further requirements, such as comparing real-time data with historical data and adding year-over-year and month-over-month calculations. Because there are two separate storage systems and two sets of business code, it is hard to align metric definitions, so real-time and batch results cannot be compared reliably. At this point, Steven has two options: ask Roy to drop the requirements, which is rarely realistic, or look for a new solution built on a unified technical architecture.

In conclusion, the industry is currently exploring the integration of stream and batch processing through two prominent architectural approaches. The first approach is the Lambda architecture, which is hindered by the necessity for separate computing and storage systems for stream and batch processing. The second approach employs Flink alongside real-time storage technologies like Kafka or Paimon, commonly referred to as the Kappa architecture or Lakehouse architecture.

The Lakehouse architecture benefits from a unified framework and enables real-time data updates via incremental computation. However, it also faces challenges, such as the need to keep streaming resources continuously active, resulting in high costs and inefficiencies, especially concerning backtracking. Reprocessing historical data is complicated due to the distinct programming models required by stream and batch processing, hindering code reuse.

Both architectures present unique obstacles. In response to these challenges, over the past year, we have embarked on thoughtful exploration into the integration of stream and batch processing using Flink, yielding promising results.

By examining and experimenting with blending stream and batch paradigms in Flink, we aim to overcome existing inefficiencies and complexities, driving innovation in data processing and unlocking new possibilities for enhanced scalability and resource optimization within the industry.

Introduction to Materialized Tables

Materialized Table is a new table type in Flink SQL, designed to simplify and unify batch and stream data pipelines. It first shipped as an MVP in Apache Flink 1.20, with end-to-end functionality targeted at Flink 2.0. The feature gives developers a consistent development experience: when a user specifies the data freshness and the query while creating a Materialized Table, the Flink engine automatically derives the table's schema and sets up the data refresh pipeline needed to maintain that freshness. This automation reduces the effort of managing data workflows, so users can focus on deriving insights from their data.

Background and Challenges before Introducing Materialized Table

Before diving into the concept of Materialized Table, it's important to understand some background. Apache Flink is a powerful computing engine that seamlessly integrates stream and batch processing. It has been designed with a unified approach across its core components—from the Runtime and operator levels to the API level within Flink SQL. Despite these efforts, Flink has yet to fully achieve the unification of stream and batch processing from an end-user perspective.

The reason lies in the distinct programming models required for each type of processing. Stream processing operates on unbounded data streams and relies on incremental computation, while batch processing works at partition or table granularity and recomputes the full data set. Batch SQL is therefore typically written against a specific partition, whereas streaming SQL is not, which makes code reuse difficult and prevents true stream-batch unification from the user's point of view.

The Role of Materialized Tables: Building Mixed Stream and Batch Processing ETL

Declarative ETL Unifying Stream and Batch Processing

To address these challenges, improvements have been made to enhance unified stream and batch storage capabilities along with the Flink engine's proficiency in handling both processing types. Consequently, a set of business-level APIs called Materialized Tables has been developed. This abstraction serves as a bridge for integrating stream and batch processing at both the user and business levels.

The Materialized Table SQL statement consists of three key components:

  • CREATE MATERIALIZED TABLE: The keyword that initiates the creation of a materialized table.
  • FRESHNESS: Specifies the desired data freshness, indicating how current the data should be.
  • AS SELECT: Defines the query logic that produces the table's data.

Users simply declare a few lines of SQL code, and the Flink engine autonomously selects the optimal execution mode—either a streaming or batch job—based on the defined Freshness, updating the Materialized Table data accordingly. The primary aim of Materialized Tables is to abstract away the complexities of stream and batch processing, enabling users to concentrate on their core business objectives while the engine manages other tasks efficiently.

By leveraging Materialized Tables, users can streamline their workflows, enhance data processing efficiency, and focus on driving business value without the distractions of technical intricacies.
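As an illustration, a statement with these three components might look like the sketch below. The source table orders and its columns (customer_id, amount, ds) are hypothetical and are not taken from the original talk; only the table name customer_orders is reused from the refresh example later in this article. The sketch also assumes the current catalog supports Materialized Tables, and depending on the catalog an additional WITH clause for connector options may be needed.

```sql
-- Hypothetical example: 'orders' and its columns are assumed names.
-- No column list is declared; the schema is derived from the query.
CREATE MATERIALIZED TABLE customer_orders
    PARTITIONED BY (ds)
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT
        ds,
        customer_id,
        COUNT(*) AS order_cnt,
        SUM(amount) AS total_amount
    FROM orders
    GROUP BY ds, customer_id;
```

Note that the statement contains no job definition or schedule; it only states what the table should contain and how fresh it should be.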

From Imperative ETL to Declarative ETL

Materialized Tables represent a significant evolution in building a unified stream and batch ETL system, distinguishing themselves from traditional ETL methods used in data warehouses and offline scheduling scenarios. Let's first examine traditional ETL processes in data warehouses. Writing a comprehensive ETL job involves several steps:

  • Table Creation: Establishing the foundational structure for data storage.
  • Data Partition Declaration: Specifying data freshness criteria, such as extracting data from the past day or past hour.
  • Business Logic Declaration: Defining the business rules and operations, including tasks like managing small files within the table.
  • Job Scheduling Configuration: Setting up periodic data refreshes to maintain the specified level of freshness.

Materialized Tables simplify these processes by allowing users to declare a set of SQL statements that define data freshness and business logic. The engine handles all other operations, enabling users to concentrate on business value. This provides a unified API experience for both stream and batch processing.

From a technical standpoint, Materialized Tables offer a user-friendly syntax that shifts the traditional imperative programming process to the engine. This transition moves users from an imperative ETL approach to a declarative ETL paradigm, changing the focus from job-centric processes to table-centric and data-centric operations. Consequently, users can concentrate solely on tables and data, creating a seamless, database-like experience.

By leveraging Materialized Tables, organizations can streamline their ETL workflows and enhance flexibility and efficiency, empowering users to focus on driving meaningful business insights without the complexities of traditional ETL configurations.
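For contrast, the traditional imperative steps listed earlier in this section might look roughly like the following sketch in batch SQL. All table and column names are hypothetical, the column types are assumptions (customer_id as BIGINT, amount as DOUBLE), and the example assumes a catalog that manages table storage itself. The scheduling step lives outside SQL in an external scheduler and is not shown.

```sql
-- Hypothetical imperative batch ETL, shown only for contrast.

-- Step 1: create the target table with an explicit schema.
CREATE TABLE customer_orders_batch (
    customer_id BIGINT,
    order_cnt BIGINT,
    total_amount DOUBLE,
    ds STRING
) PARTITIONED BY (ds);

-- Steps 2 and 3: choose the partition by hand and state the business logic.
-- An external scheduler must rerun this statement with a new ds value
-- every day to keep the table fresh.
INSERT OVERWRITE customer_orders_batch PARTITION (ds = '20241125')
SELECT
    customer_id,
    COUNT(*) AS order_cnt,
    SUM(amount) AS total_amount
FROM orders
WHERE ds = '20241125'
GROUP BY customer_id;
```

With a Materialized Table, the explicit schema, the hand-picked partition, and the external schedule are all handled by the engine.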

One SQL Statement to Backfill Historical Data


Creating a table marks only the beginning of the data development journey. As business needs evolve, iterative data requirements often emerge, including adding new fields, deleting existing ones, or modifying the calculation logic associated with particular fields. In traditional batch-based data warehouse environments, accommodating these changes typically requires rewriting jobs to update the business logic, specifying the data partition, and subsequently executing the data processing through scheduled tasks on the platform.

Materialized Tables simplify this process dramatically. With a single command, you can efficiently update the data without extensive reconfiguration. For instance, executing the command:

ALTER MATERIALIZED TABLE customer_orders REFRESH PARTITION(ds='20241125')

automatically refreshes the data for the specified partition, streamlining the entire process. This approach not only enhances efficiency but also reduces complexity, allowing data teams to focus on driving valuable insights rather than navigating cumbersome workflows.

One SQL Statement to Change Data Freshness

Adjusting data freshness can be crucial in certain scenarios, such as e-commerce operations. During normal periods, daily or hourly data updates might suffice. However, during major sales events like Black Friday or Cyber Monday, the demand for real-time data freshness escalates, requiring updates on a minute or even second basis. So, how can businesses efficiently manage this shift in data needs?

Traditionally, employing a Lambda architecture would require setting up a new real-time pipeline using Flink and Kafka to achieve second-level data updates. This approach involves costs related to redeveloping and maintaining streaming jobs, which can be substantial.

Materialized Tables offer a streamlined solution by allowing data freshness to be modified with minimal effort. With just a single command, businesses can automatically adjust the data freshness to align with business timeliness, eliminating the need to construct new pipelines. This capability is part of the essential user-facing APIs associated with Materialized Tables, offering a simplified yet powerful tool for managing dynamic data requirements.

By leveraging Materialized Tables, organizations can adapt quickly to changing data demands, ensuring their systems remain agile and responsive during critical sales events.
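The talk does not show the command itself. As a hedged sketch, FLIP-435 proposes a SET FRESHNESS clause on ALTER MATERIALIZED TABLE, and the example below assumes that syntax. It may not be available in every Flink release or distribution, so it should be checked against the documentation for the version in use.

```sql
-- Assumption: SET FRESHNESS follows the syntax proposed in FLIP-435 and
-- may not be supported by every Flink release.

-- Before the sales event: tighten freshness from hourly to one minute.
ALTER MATERIALIZED TABLE customer_orders SET FRESHNESS = INTERVAL '1' MINUTE;

-- After the event: relax freshness again to release streaming resources.
ALTER MATERIALIZED TABLE customer_orders SET FRESHNESS = INTERVAL '1' HOUR;
```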

Technical Architecture with Materialized Table

After introducing the user APIs of Materialized Table, let's explore how to implement and use Materialized Table within a company from a developer's perspective. Next, we will outline the overall principles and necessary tasks from a technical architecture standpoint.

Overall Technical Architecture

The technical architecture of Flink Materialized Table consists of several key components:

(1) Client:

  • Responsible for submitting SQL statements related to Materialized Table.
  • Users submit SQL commands via the Client to create and manage Materialized Tables.

(2) SQL Gateway:

  • A resident service that manages the full lifecycle of Materialized Tables.
  • Includes table creation, metadata management, execution and monitoring of background data refresh jobs, among other tasks.

(3) Workflow:

  • Handles periodic batch scheduling jobs.
  • Executes data refresh operations based on the data freshness specified by Materialized Table, such as daily or hourly.

(4) CatalogStore:

  • Stores and manages Catalog metadata.
  • Works in conjunction with the Catalog component to ensure metadata persistence and consistency.

(5) Catalog:

  • A core component that persists all metadata related to Materialized Table, including schema, defined queries, freshness, and corresponding background job information.
  • Ensures that all metadata is accurately recorded and managed.

(6) Flink Cluster:

  • Executes data refresh jobs.
  • Responsible for actual data processing and computation tasks.

Technical Architecture Overview

The entire technical architecture builds on the existing capabilities of Flink, assembling previously dispersed components in a "building block" manner to achieve unified stream and batch processing. Specifically, the architecture adds no new runtime component to Flink itself; it combines existing Flink components and connects them to a Catalog and a workflow scheduler.

New Components to Integrate

To use Materialized Table within the company, developers need to focus on integrating two new components:

(1) Catalog:

  • Function: Persists all metadata related to Materialized Table, including schema, defined queries, freshness, and corresponding background job information.
  • Integration Steps:

    • Select a suitable Catalog implementation (such as Paimon Catalog with unified stream and batch storage capabilities).
    • Ensure the Catalog supports storing metadata related to Materialized Table.

(2) Workflow Scheduler:

  • Function: Manages and executes periodic batch scheduling jobs, performing timely refreshes based on the data freshness specified by Materialized Table.
  • Integration Steps:

    • Select an appropriate Workflow scheduling tool (such as Airflow or DolphinScheduler).
    • Configure the Workflow to support the refresh requirements of Materialized Table.
    • Implement the creation and management of scheduled tasks to ensure the timeliness and accuracy of data refreshes.

Catalog API Integration

The first integration involves the Catalog API. In Flink SQL, a new table type called CatalogMaterializedTable has been introduced. It is a parallel concept to the existing CatalogTable, but with additional metadata beyond the Schema, including Definition Query, Freshness, and background job metadata. We need corresponding methods in the Catalog API: createTable, getTable, alterTable, and dropTable to support this new table type, enabling various CRUD operations on CatalogMaterializedTable objects.

Furthermore, an important method called getFactory is required because CatalogMaterializedTable relies on background jobs to refresh its data. Compiling the Definition Query and submitting the job requires the table's DynamicTableSourceFactory and DynamicTableSinkFactory, which are obtained through the getFactory method. In other words, Materialized Table requires the Catalog to provide storage capabilities, which amounts to Catalog Connectorization.

These are the core methods that need to be implemented for Catalog integration. Within the Flink community, integration with Paimon has been completed, and Paimon Catalog already supports Materialized Table. If you have a custom Catalog and wish to support Materialized Table, you need to implement these corresponding Catalog methods.
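From the user's side, choosing the Paimon Catalog could look like the minimal sketch below. It assumes the Paimon Flink connector jar is available to the SQL Gateway and the Flink cluster; the catalog name paimon_catalog and the warehouse path are placeholders.

```sql
-- Assumptions: the Paimon Flink connector is on the classpath, and
-- 'file:///tmp/paimon' is a placeholder warehouse location.
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:///tmp/paimon'
);

-- Materialized Tables created afterwards are persisted in this catalog.
USE CATALOG paimon_catalog;
```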

Pluggable Workflow Integration

The second part involves integrating with the Workflow. When a user specifies the freshness while creating a CatalogMaterializedTable, it may correspond to either a streaming job or a batch job on the backend. If it is a batch job, it relies on a workflow scheduler to create the corresponding Workflow and perform periodic scheduling to refresh the data. This requires a plugin to interface with the Workflow scheduler, enabling communication between the SQL Gateway and the respective Workflow scheduler.

Based on these requirements, we have abstracted a pluggable WorkflowScheduler interface in FLIP-448. The WorkflowScheduler interface includes three methods: createRefreshWorkflow, modifyRefreshWorkflow, and deleteRefreshWorkflow. When performing CRUD operations on a Materialized Table, it communicates with the specific Workflow scheduler through the WorkflowScheduler interface to complete the corresponding operations. For instance, using Airflow requires implementing an AirflowWorkflowScheduler, using DolphinScheduler requires implementing a DolphinSchedulerWorkflowScheduler, or for other custom workflow schedulers, the corresponding implementation is needed.

To effectively utilize Materialized Table from a developer's perspective, the core APIs that need to be integrated are the two parts mentioned above. The current progress in the community is that we have completed the integration of CatalogMaterializedTable with Paimon in Flink 1.20, and Paimon's lake storage already supports this capability. For the integration with WorkflowScheduler, the first step is to complete the integration with DolphinScheduler, which will be accomplished in Flink 2.0, enabling end-to-end functionality of Materialized Table in version 2.0. Additionally, in version 2.0, we will undertake more tasks, such as integrating with Yarn/K8S to allow Materialized Table jobs to be submitted to Yarn/K8S, and developing YAML integration. The current 1.20 version only includes MVP functionality.

Comparison Between Traditional Lambda Architecture and Materialized Table Unified Stream-Batch Architecture

Challenges of Traditional Lambda Architecture:

  • High Cost: Requires maintaining two separate systems (batch and stream processing), increasing hardware and operational costs.
  • Low Efficiency: Data processing logic must be implemented separately for batch and stream processing, leading to poor code reusability and reduced development and maintenance efficiency.
  • Error-Prone: Differences in data processing logic between batch and stream processing can cause inconsistencies in data results.

Advantages of Materialized Table:

  • Low Cost: Utilizes a single storage and computation engine, reducing hardware and operational costs.
  • High Efficiency: Provides a unified API, allowing users to write SQL statements once to fulfill both stream and batch processing needs, enhancing development and maintenance efficiency.
  • Correctness: Ensures consistency and accuracy of data through unified processing logic.

Different Computing Modes in Flink:

  • Batch Processing: Suitable for processing full sets of data.
  • Stream Processing: Suitable for real-time data processing.
  • Incremental Processing: An emerging computing mode that optimizes costs in scenarios requiring freshness at the minute and hour levels.

Because Materialized Table can draw on all of these computing modes, the engine combines the user-specified freshness with a cost optimizer to select the execution mode automatically. This lets the Materialized Table strike the balance between cost and freshness that the user expects, which is usually the user's primary concern.
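As a rough illustration of freshness-driven mode selection, consider the two hypothetical tables below, which share the same query and differ only in freshness. The sketch assumes the behavior described in the open-source Flink documentation, where the freshness is compared against the configuration option materialized-table.refresh-mode.freshness-threshold (30 minutes by default) unless a refresh mode is set explicitly. The source table orders and its columns are assumed names.

```sql
-- Hypothetical tables; 'orders', 'category' and 'amount' are assumed names.

-- Freshness below the threshold: expected to be maintained by a
-- continuously running streaming job.
CREATE MATERIALIZED TABLE gmv_realtime
    FRESHNESS = INTERVAL '30' SECOND
    AS SELECT category, SUM(amount) AS gmv
    FROM orders
    GROUP BY category;

-- Freshness above the threshold: expected to be refreshed by batch jobs
-- that the workflow scheduler triggers periodically.
CREATE MATERIALIZED TABLE gmv_daily
    FRESHNESS = INTERVAL '1' DAY
    AS SELECT category, SUM(amount) AS gmv
    FROM orders
    GROUP BY category;
```

The business logic is written once; only the declared freshness decides how much compute is kept running.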

Materialized Table not only addresses the pain points of the traditional Lambda architecture but also offers a more efficient, flexible, and cost-effective solution for users and developers. It enhances overall data processing capabilities and user experience while meeting business requirements.


Apache Flink Community
