[core][flink] Support parallelism snapshot expire #7027
wzhero1 wants to merge 8 commits into apache:master
yunfengzhou-hub
left a comment
Thanks for the PR. Left some comments as below.
}

@Override
public void run() throws Exception {
Since this method is overridden, this class no longer needs to implement LocalAction. You can still keep the executeLocally method for internal use.
I'd prefer to keep LocalAction here. By default (without forceStartFlinkJob), the action runs locally via super.run() → executeLocally(), which is sufficient for normal scenarios and safer. Only when forceStartFlinkJob is set do we start a Flink job with multiple parallel tasks for large-scale expiration. This also leaves room for future adjustments.
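The dispatch described in this reply could look roughly like the sketch below. Every type here (LocalDispatchSketch, BaseAction, ExpireAction, and the trace list) is a hypothetical stand-in written for illustration, not the actual Paimon classes, and the base-class behavior of delegating run() to executeLocally() is an assumption taken from the reply above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration only; not the actual Paimon classes. */
class LocalDispatchSketch {

    /** Stand-in for the LocalAction interface discussed in this thread. */
    interface LocalAction {
        void executeLocally();
    }

    /** Assumed base behavior: run() delegates to executeLocally() for local actions. */
    static class BaseAction {
        // Records which path was taken, so the sketch can be checked without Flink.
        final List<String> trace = new ArrayList<>();

        void run() {
            if (this instanceof LocalAction) {
                ((LocalAction) this).executeLocally();
            }
        }
    }

    /** Keeps LocalAction, but overrides run() to branch on forceStartFlinkJob. */
    static class ExpireAction extends BaseAction implements LocalAction {
        private final boolean forceStartFlinkJob;

        ExpireAction(boolean forceStartFlinkJob) {
            this.forceStartFlinkJob = forceStartFlinkJob;
        }

        @Override
        void run() {
            if (forceStartFlinkJob) {
                // Placeholder for building and submitting the parallel Flink job.
                trace.add("flink");
                return;
            }
            // Default path: super.run() -> executeLocally().
            super.run();
        }

        @Override
        public void executeLocally() {
            trace.add("local");
        }
    }
}
```

With this shape, the class still satisfies any caller that checks for LocalAction, while the override decides at run time whether a Flink job is started.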
ExpireSnapshotsPlan plan = planner.plan(expireConfig);
if (plan.isEmpty()) {
    LOG.info("No snapshots to expire");
    return;
It would be better to submit an empty Flink job here, even if the job completes immediately after submission. That is what "force_start_flink_job" means.
Prefer to keep the early return here. Submitting an empty Flink job still involves cluster resource allocation and scheduling overhead for zero useful work. I've improved the log message to make the skip behavior clear to users.
Based on actual implementation:
PR Comment work items:
Additional changes (not from comments):
Purpose
This PR implements parallel snapshot expiration using Flink distributed execution to improve the performance of large-scale cleanup operations.
Motivation:
Architecture:
The expire process is split into two phases:
- Parallel phase, in RangePartitionedExpireFunction. Tasks are partitioned by snapshot ID range to maximize cache locality (adjacent snapshots share manifest files and tag data file skippers).
- Serial phase, in SnapshotExpireSink, because manifestSkippingSet is global mutable state and snapshot deletion must be contiguous.
Key classes:
- ExpireSnapshotsPlanner: Computes the expiration plan, including the snapshot range, four types of tasks, the protection set, and the snapshot cache
- SnapshotExpireTask: Abstract base with 4 polymorphic subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask)
- ExpireContext: Holds runtime dependencies (SnapshotManager, SnapshotDeletion, ChangelogManager) and shared state (taggedSnapshots, snapshotCache, skippingSet)
- ExpireSnapshotsPlan: Contains task lists, protection set, snapshot cache, and range-based task partitioning logic
- DeletionReport: Carries deletion bucket info from workers to sink for empty directory cleanup
Execution modes:
- forceStartFlinkJob=false and parallelism <= 1 → executes locally via ExpireSnapshotsProcedure
- forceStartFlinkJob=true or parallelism > 1 → uses Flink distributed execution
Other changes:
- CompletableFuture and caches them, serialized to both workers and sink to avoid redundant reads
- ExpireSnapshotsImpl (serial mode) reuses ExpireSnapshotsPlanner for planning, keeping the serial execution path unchanged
Tests
Unit Tests:
- ExpireSnapshotsPlanTest - Tests range-based task partitioning logic
- DeletionReportTest - Tests deletion report serialization
Integration Tests:
- ParallelExpireSnapshotsActionITCase - Extends ExpireSnapshotsTest to validate that parallel mode produces the same results as serial mode, plus a failure recovery test
- ExpireSnapshotsProcedureITCase - Tests both local and parallel modes via procedure
API and Format
New CLI parameter for expire-snapshots action: --parallelism
No storage format changes.
Backward compatible: Default behavior (local mode) remains unchanged.
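The mode-selection rule listed under "Execution modes" can be written as a small predicate. This is a minimal sketch: ExpireModeSketch, Mode, and chooseMode are hypothetical names introduced for illustration, and only the rule itself (forceStartFlinkJob or parallelism > 1 selects Flink execution, otherwise local) is taken from the description.

```java
/** Hypothetical helper for illustration; the names are not from the PR. */
class ExpireModeSketch {

    enum Mode {
        LOCAL,
        FLINK
    }

    /**
     * Mirrors the rule in the PR description: local execution unless a Flink job
     * is forced or more than one parallel task is requested.
     */
    static Mode chooseMode(boolean forceStartFlinkJob, int parallelism) {
        if (forceStartFlinkJob || parallelism > 1) {
            return Mode.FLINK;
        }
        return Mode.LOCAL;
    }
}
```

Under this rule, a call that sets neither option takes the LOCAL branch, which is how the default behavior stays unchanged.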
Documentation
Yes, this introduces a new feature: parallel snapshot expiration.
Documentation should cover:
- --parallelism parameter for expire-snapshots action
- --parallelism > 1 or --force_start_flink_job triggers parallel mode
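For the documentation, it may help to illustrate what partitioning by snapshot ID range means once parallel mode is triggered. The sketch below assumes an even split of a half-open ID range into contiguous sub-ranges. The actual partitioning logic in ExpireSnapshotsPlan is not shown on this page, so SnapshotRangeSketch and partition are hypothetical and may differ from the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of range-based partitioning; not the PR's code. */
class SnapshotRangeSketch {

    /**
     * Splits [beginInclusive, endExclusive) into at most {@code parallelism}
     * contiguous ranges. Each element is {start, endExclusive}. Adjacent
     * snapshot IDs stay in the same range, which is what gives a worker
     * cache locality.
     */
    static List<long[]> partition(long beginInclusive, long endExclusive, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<long[]> ranges = new ArrayList<>();
        long total = endExclusive - beginInclusive;
        if (total <= 0) {
            return ranges; // nothing to expire
        }
        // Never create more ranges than there are snapshots.
        int parts = (int) Math.min(parallelism, total);
        long base = total / parts;
        long remainder = total % parts;
        long start = beginInclusive;
        for (int i = 0; i < parts; i++) {
            // The first `remainder` ranges take one extra snapshot each.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + size});
            start += size;
        }
        return ranges;
    }
}
```

For example, partition(10, 20, 3) yields the ranges [10, 14), [14, 17), and [17, 20).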