[core][flink] Support parallelism snapshot expire #7027

Open

wzhero1 wants to merge 8 commits into apache:master from wzhero1:feat/paimon-expire-snapshot-parallel-opt

Conversation

@wzhero1 (Contributor) commented Jan 13, 2026

Purpose

This PR implements parallel snapshot expiration using Flink distributed execution to improve the performance of large-scale cleanup operations.

Motivation:

  • Serial file deletion becomes a performance bottleneck for tables with large amounts of data
  • Current implementation cannot leverage Flink's distributed computing capabilities

Architecture:

The expire process is split into two phases:

  • Worker phase (parallel flatMap): Deletes data files and changelog files in parallel using RangePartitionedExpireFunction. Tasks are partitioned by snapshot ID range to maximize cache locality (adjacent snapshots share manifest files and tag data file skippers).
  • Sink phase (serial): Deletes manifests and snapshot metadata serially using SnapshotExpireSink, because manifestSkippingSet is global mutable state and snapshot deletion must be contiguous.
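The range-partitioning idea behind the worker phase can be sketched roughly as follows. This is a minimal illustration, assuming a simple contiguous split of the snapshot ID range; the class and method names are stand-ins, not the PR's actual API:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of range-based partitioning: split the snapshot ID
 * range [begin, end) into contiguous sub-ranges, one per worker, so that
 * adjacent snapshots (which tend to share manifest files) land on the
 * same subtask and benefit from cache locality.
 */
public class RangePartitionSketch {

    /** A contiguous [from, to) slice of snapshot IDs assigned to one worker. */
    record SnapshotRange(long from, long to) {}

    static List<SnapshotRange> partition(long begin, long end, int parallelism) {
        List<SnapshotRange> ranges = new ArrayList<>();
        long total = end - begin;
        if (total <= 0 || parallelism <= 0) {
            return ranges;
        }
        // Spread the remainder so range sizes differ by at most one.
        long base = total / parallelism;
        long remainder = total % parallelism;
        long cursor = begin;
        for (int i = 0; i < parallelism && cursor < end; i++) {
            long size = base + (i < remainder ? 1 : 0);
            if (size == 0) {
                continue;
            }
            ranges.add(new SnapshotRange(cursor, cursor + size));
            cursor += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Snapshots 10..20 (exclusive) split across 3 workers.
        System.out.println(partition(10, 20, 3));
    }
}
```

Each worker then only reads the manifests for its own contiguous slice, which is what the "adjacent snapshots share manifest files" locality argument relies on.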

Key classes:

  • ExpireSnapshotsPlanner: Computes expiration plan including snapshot range, four types of tasks, protection set, and snapshot cache
  • SnapshotExpireTask: Abstract base with 4 polymorphic subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask)
  • ExpireContext: Holds runtime dependencies (SnapshotManager, SnapshotDeletion, ChangelogManager) and shared state (taggedSnapshots, snapshotCache, skippingSet)
  • ExpireSnapshotsPlan: Contains task lists, protection set, snapshot cache, and range-based task partitioning logic
  • DeletionReport: Carries deletion bucket info from workers to sink for empty directory cleanup
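The polymorphic task design mentioned for SnapshotExpireTask (and revisited in the refactoring notes below) can be illustrated with a minimal sketch; the class names and return values here are simplified stand-ins, not the PR's exact classes:

```java
/**
 * Illustrative sketch of replacing a TaskType enum plus a switch dispatch
 * with polymorphic task subclasses: each task carries its own execute
 * logic, and the worker simply calls execute() with no type switch.
 */
abstract class TaskSketch {
    abstract String execute();
}

class DeleteDataFilesSketch extends TaskSketch {
    @Override
    String execute() {
        return "delete data files";
    }
}

class DeleteManifestsSketch extends TaskSketch {
    @Override
    String execute() {
        return "delete manifests";
    }
}

public class PolymorphicDispatchDemo {
    public static void main(String[] args) {
        // Dispatch is resolved by the subclass, not by a central switch.
        for (TaskSketch task : new TaskSketch[] {
                new DeleteDataFilesSketch(), new DeleteManifestsSketch()}) {
            System.out.println(task.execute());
        }
    }
}
```

The design benefit is that adding a new task type means adding a subclass, rather than extending an enum and a central switch in an executor class.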

Execution modes:

  • Local mode: forceStartFlinkJob=false and parallelism <= 1 → executes locally via ExpireSnapshotsProcedure
  • Parallel mode: forceStartFlinkJob=true or parallelism > 1 → uses Flink distributed execution
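The mode-selection rule above can be sketched as follows; the flag and method names are assumptions for illustration, not the PR's exact API:

```java
/**
 * Minimal sketch of the execution-mode selection described above:
 * parallel mode is entered when either the force flag is set or the
 * configured parallelism exceeds one; otherwise the action runs locally.
 */
public class ExpireModeSketch {

    enum Mode { LOCAL, PARALLEL }

    static Mode choose(boolean forceStartFlinkJob, int parallelism) {
        return (forceStartFlinkJob || parallelism > 1) ? Mode.PARALLEL : Mode.LOCAL;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, 1)); // LOCAL
        System.out.println(choose(true, 1));  // PARALLEL
        System.out.println(choose(false, 4)); // PARALLEL
    }
}
```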

Other changes:

  • Moved planner/plan/task/report classes from paimon-core to paimon-flink (per reviewer suggestion)
  • Planner pre-collects snapshots in parallel using CompletableFuture and caches them, serialized to both workers and sink to avoid redundant reads
  • ExpireSnapshotsImpl (serial mode) reuses ExpireSnapshotsPlanner for planning, keeping serial execution path unchanged
  • Added failure recovery test: verifies that if expire job fails midway (data files partially deleted but snapshot metadata intact), a subsequent run completes successfully
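The CompletableFuture-based snapshot pre-collection can be sketched as below. This is a hedged illustration: readSnapshot is a stand-in for the real snapshot-file read, and the cache type is simplified:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * Sketch of pre-collecting snapshots in parallel with CompletableFuture
 * and caching them by ID, so workers and sink can reuse the cache instead
 * of re-reading snapshot files.
 */
public class SnapshotPrefetchSketch {

    /** Placeholder for reading one snapshot file; returns a fake payload. */
    static String readSnapshot(long id) {
        return "snapshot-" + id;
    }

    /** Reads all IDs concurrently and returns an ID -> snapshot cache. */
    static Map<Long, String> prefetch(List<Long> ids, ExecutorService pool) {
        Map<Long, CompletableFuture<String>> futures =
                ids.stream().collect(Collectors.toMap(
                        id -> id,
                        id -> CompletableFuture.supplyAsync(() -> readSnapshot(id), pool)));
        // Join after all reads have been submitted, so they run concurrently.
        return futures.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey, e -> e.getValue().join()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(prefetch(List.of(1L, 2L, 3L), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```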

Tests

Unit Tests:

  • ExpireSnapshotsPlanTest - Tests range-based task partitioning logic
  • DeletionReportTest - Tests deletion report serialization

Integration Tests:

  • ParallelExpireSnapshotsActionITCase - Extends ExpireSnapshotsTest to validate parallel mode produces same results as serial mode, plus failure recovery test
  • ExpireSnapshotsProcedureITCase - Tests both local and parallel modes via procedure

API and Format

New CLI parameter for expire-snapshots action:

| Parameter       | Type    | Default         | Description                             |
|-----------------|---------|-----------------|-----------------------------------------|
| `--parallelism` | Integer | env parallelism | Parallelism for parallel expire workers |

No storage format changes.

Backward compatible: Default behavior (local mode) remains unchanged.

Documentation

Yes, this introduces a new feature: parallel snapshot expiration.

Documentation should cover:

  • New --parallelism parameter for expire-snapshots action
  • Either --parallelism > 1 or --force_start_flink_job triggers parallel mode

@wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from 69b0a18 to f61b9b4 on January 13, 2026 11:13
@wzhero1 changed the title from "[flink] Support parallelism snapshot expire" to "[core][flink] Support parallelism snapshot expire" on January 13, 2026
@wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from f61b9b4 to be7a32b on January 13, 2026 12:19
@wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from be7a32b to 59788fc on January 26, 2026 05:49
@yunfengzhou-hub (Contributor) left a comment:

Thanks for the PR. Left some comments as below.

```java
}

@Override
public void run() throws Exception {
```
Contributor:
Since this method is overridden, this class no longer needs to implement LocalAction. You can still keep the executeLocally method for internal use.

Contributor Author:
Prefer to keep LocalAction here. By default (without forceStartFlinkJob), the action runs locally via super.run() → executeLocally(), which is sufficient for normal scenarios and safer. Only when forceStartFlinkJob is set do we start a Flink job with multi-parallelism for large-scale expiration. This also leaves room for future adjustments.

@wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch 3 times, most recently from b1d2e49 to 344f5c8 on March 12, 2026 07:19
```java
ExpireSnapshotsPlan plan = planner.plan(expireConfig);
if (plan.isEmpty()) {
    LOG.info("No snapshots to expire");
    return;
}
```
Contributor:
Better to configure an empty Flink job, even if the job would complete immediately after submission. This is what "force_start_flink_job" means.

Contributor Author:
Prefer to keep the early return here. Submitting an empty Flink job still involves cluster resource allocation and scheduling overhead for zero useful work. I've improved the log message to make the skip behavior clear to users.

@wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch 5 times, most recently from 1aa9654 to 35d548b on March 23, 2026 02:05
@wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from 35d548b to 8985e0e on March 23, 2026 03:09
@wzhero1 (Contributor Author) commented Mar 23, 2026

A summary of the changes, based on the actual implementation:

Work items from review comments:

  1. Empty plan logging: Adjusted log message to "No snapshots to expire, skipping Flink job submission." (still returns early, not submitting an empty job)
  2. Sink operator rename: Renamed sink operator from "SnapshotExpire" to "SnapshotExpireCommit" to distinguish from worker phase
  3. Inline executeTask: Removed single-use method, inlined task execution directly into flatMap
  4. Progress logging: Added per-task progress log ("Processing expire task {}/{}, {}"), start/end logs with elapsed time
  5. Failure recovery test: Added testExpireRecoveryAfterPartialFailure with failure injection via volatile int failAfterTasks
  6. Test class rename: ExpireSnapshotsActionITCase → ParallelExpireSnapshotsActionITCase
  7. Disabled test comments: Added @Disabled annotations with explanations for tests not suitable for parallel mode
  8. Polymorphic refactoring: Replaced TaskType enum + ExpireSnapshotsExecutor switch dispatch with 4 polymorphic SnapshotExpireTask subclasses, deleted ExpireSnapshotsExecutor
  9. Logic migration: Moved planner/plan/task/report classes from paimon-core to paimon-flink package

Additional changes (not from comments):

  • ExpireContext simplification: Removed delegate methods; callers access underlying objects directly (context.snapshotDeletion().xxx()). Made taggedSnapshots and snapshotCache final constructor parameters; only skippingSet retains a setter (it is mutated during manifest deletion)
  • snapshotCache in parallel mode: Propagated planner's pre-collected snapshot cache to both workers and sink via serialization, avoiding redundant snapshot file reads
  • Trigger condition: Changed to forceStartFlinkJob || parallelism > 1 — either condition enters Flink parallel pipeline

@wzhero1 marked this pull request as ready for review on March 23, 2026 06:52