Problem Encountered
In our project, SeaTunnel is used to extract data from the business database into the data warehouse (StarRocks), and we’ve already used MySQL-CDC successfully for large-scale real-time synchronization. However, one particular MySQL table behaved abnormally: after the job started, the logs showed read and write counts stuck at zero, yet the job kept running. After about 6 hours it finally failed with a checkpoint timeout error.
The job structure is as follows (sensitive information removed):
Key logs during execution:
Background
- Scenario: Real-time data extraction from MySQL to StarRocks using MySQL-CDC
- SeaTunnel version: 2.3.9
- MySQL version: 8.x
- StarRocks version: 3.2
- Source table data volume: 60–70 million rows
Key Questions
- Why do the read and write counts remain at 0?
- Why does it take so long to throw a timeout error?
Analysis Process
We’ve used MySQL-CDC for many sync jobs before, and the configurations were mostly the same, so the issue likely isn’t with SeaTunnel itself.
We compared this source table with previously successful ones to see if there were differences.
Sure enough, we found something suspicious:
The previous tables all had auto-increment primary keys; this one didn’t. It only had multiple unique indexes.
So the question arises: How exactly does SeaTunnel sync data?
As far as we know, SeaTunnel uses a two-step approach when syncing CDC data: first snapshot sync, then binlog-based incremental sync.
Since the read count stayed at zero, the job had to be stuck in the snapshot phase. So how does snapshot sync work?
We checked the official SeaTunnel docs:
MySQL CDC | Apache SeaTunnel:
https://seatunnel.apache.org/zh-CN/docs/2.3.9/connector-v2/source/MySQL-CDC
There isn’t any architectural explanation, but we did find some configurable parameters.
Parameter Explanation
chunk-key.even-distribution.factor.upper-bound
Default value: 100
Description:
The upper bound of the chunk key distribution factor. The factor is used to determine whether the table data is evenly distributed. If the distribution factor (e.g., (MAX(id) – MIN(id) + 1) / row count) is ≤ this value, the table is considered evenly distributed and uniform chunking is used. If it exceeds this value and the estimated number of shards exceeds sample-sharding.threshold, the sampling-based sharding strategy is used.
chunk-key.even-distribution.factor.lower-bound
Default value: 0.5
Description:
The lower bound of the distribution factor. If the distribution factor is ≥ this value, the table is considered evenly distributed. Otherwise, it’s considered uneven and might trigger sampling-based sharding.
sample-sharding.threshold
Default value: 1000
Description:
If the distribution factor is outside the [lower, upper] range and the estimated number of shards (approx. row count / chunk size) exceeds this threshold, the sampling-based sharding strategy will be used. This improves efficiency for large datasets.
inverse-sampling.rate
Default value: 1000
Description:
Used in sampling-based sharding. A value of 1000 means a 1/1000 sampling rate. It controls the granularity of sampling and affects the number of final shards.
snapshot.split.size
Default value: 8096
Description:
The number of rows per chunk in snapshot sync. Tables will be split into chunks based on this.
snapshot.fetch.size
Default value: 1024
Description:
Maximum number of rows fetched per poll during snapshot reading.
From these parameters, we learned:
During snapshot sync, SeaTunnel chunks data into multiple splits. The sharding strategy depends on whether the data is evenly distributed.
Our table has ~60 million rows (estimated by business staff since we couldn’t count them directly).
Since the table has no primary key, we weren’t sure what field SeaTunnel uses for chunking.
We assumed it used the ID column, which does have a unique index, and tested:
SELECT MAX(ID), MIN(ID) FROM table;
- Max key value: 804306477418
- Min key value: 607312608210
- Distribution factor = (804306477418 – 607312608210 + 1) / 60,000,000 ≈ 3283.23
This is clearly outside the [0.5, 100] “even” range → so SeaTunnel considers this uneven distribution.
- Default chunk size: 8096
- Shard count = 60,000,000 / 8096 ≈ 7411 → greater than sample-sharding.threshold (1000)
So, SeaTunnel likely switched to sampling-based sharding.
- Sampling rate (inverse): 1000 → need to sample 60,000 rows
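To double-check this chain of reasoning, here is a small self-contained sketch that applies the rules from the parameter descriptions to our table’s numbers (the class and variable names are ours, not SeaTunnel’s):

// Illustrative only: reproduces the chunking decision described above with our table's numbers.
public class ChunkDecisionCheck {
    public static void main(String[] args) {
        long min = 607_312_608_210L;
        long max = 804_306_477_418L;
        long approxRowCnt = 60_000_000L;

        int chunkSize = 8096;            // snapshot.split.size (default)
        double lower = 0.5;              // chunk-key.even-distribution.factor.lower-bound
        double upper = 100.0;            // chunk-key.even-distribution.factor.upper-bound
        int sampleThreshold = 1000;      // sample-sharding.threshold
        int inverseSamplingRate = 1000;  // inverse-sampling.rate

        double distributionFactor = (double) (max - min + 1) / approxRowCnt;
        long estimatedShards = approxRowCnt / chunkSize;

        System.out.printf("distribution factor = %.2f%n", distributionFactor);            // 3283.23
        System.out.println("evenly distributed? "
                + (distributionFactor >= lower && distributionFactor <= upper));          // false
        System.out.println("estimated shards   = " + estimatedShards);                    // 7411
        System.out.println("sampling triggered? " + (estimatedShards > sampleThreshold)); // true
        System.out.println("rows to sample     = " + approxRowCnt / inverseSamplingRate); // 60000
    }
}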
At this point, we were convinced that SeaTunnel was stuck sampling, and we became curious: how exactly does it sample, and why would it run for 6 hours?
Even with 60M rows, collecting a 60K-row sample shouldn’t be that slow. Surely it would just scan the ID column, which has a unique index?
We decided to dive into the source code.
GitHub: https://github.com/apache/seatunnel/
SeaTunnel’s architecture is quite complex, and setting up the environment took us a full day (mostly dependency setup).
Finding the critical logic took another day—we traced from log messages and keyword searches.
Partial Source Code Analysis
private List<ChunkRange> splitTableIntoChunks(
        JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
    final String splitColumnName = splitColumn.name();
    // Get min/max values
    final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
    final Object min = minMax[0];
    final Object max = minMax[1];
    if (min == null || max == null || min.equals(max)) {
        // Empty table or only one row — full table scan as a chunk
        return Collections.singletonList(ChunkRange.all());
    }

    // Get chunk size, distribution factor bounds, and sampling threshold from config
    final int chunkSize = sourceConfig.getSplitSize();
    final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
    final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
    final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();

    log.info("Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
                    + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
            tableId, splitColumnName, min, max, chunkSize,
            distributionFactorUpper, distributionFactorLower, sampleShardingThreshold);

    if (isEvenlySplitColumn(splitColumn)) {
        long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
        double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
        boolean dataIsEvenlyDistributed =
                doubleCompare(distributionFactor, distributionFactorLower) >= 0
                        && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
        if (dataIsEvenlyDistributed) {
            final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
            return splitEvenlySizedChunks(tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
        } else {
            int shardCount = (int) (approximateRowCnt / chunkSize);
            int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
            if (sampleShardingThreshold < shardCount) {
                if (inverseSamplingRate > chunkSize) {
                    log.warn("inverseSamplingRate {} > chunkSize {}, adjusting...", inverseSamplingRate, chunkSize);
                    inverseSamplingRate = chunkSize;
                }
                log.info("Using sampling sharding for table {}, rate = {}", tableId, inverseSamplingRate);
                Object[] sample = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
                log.info("Sampled {} records from table {}", sample.length, tableId);
                return efficientShardingThroughSampling(tableId, sample, approximateRowCnt, shardCount);
            }
            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
        }
    } else {
        return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
    }
}
Let’s focus on the sampling logic:
public static Object[] skipReadAndSortSampleData(
        JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
        throws Exception {
    final String sampleQuery = String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
    Statement stmt = null;
    ResultSet rs = null;
    List<Object> results = new ArrayList<>();
    try {
        // Full scan of the split column; the MySQL driver streams rows because of
        // TYPE_FORWARD_ONLY + CONCUR_READ_ONLY + fetch size Integer.MIN_VALUE
        stmt = jdbc.connection().createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(Integer.MIN_VALUE);
        rs = stmt.executeQuery(sampleQuery);
        int count = 0;
        while (rs.next()) {
            count++;
            if (count % 100000 == 0) {
                log.info("Processing row index: {}", count);
            }
            // Keep every inverseSamplingRate-th row (1 in 1000 by default)
            if (count % inverseSamplingRate == 0) {
                results.add(rs.getObject(1));
            }
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("Thread interrupted");
            }
        }
    } finally {
        if (rs != null) rs.close();
        if (stmt != null) stmt.close();
    }
    // Sort the sampled keys so they can be used as chunk boundaries
    Object[] resultsArray = results.toArray();
    Arrays.sort(resultsArray);
    return resultsArray;
}
This is the core sampling logic: it scans the entire table row by row and keeps 1 out of every 1,000 records (the default inverse-sampling.rate).
That explains why the job was running so slowly: we had seen the “Processing row index” messages in the logs and wondered what they were.
For our table, roughly 60,000 IDs were sampled.
Now for the sampling-based sharding strategy:
protected List<ChunkRange> efficientShardingThroughSampling(
        TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) {
    log.info("Using sampling-based sharding on table {}, approx rows: {}, shards: {}",
            tableId, approximateRowCnt, shardCount);

    List<ChunkRange> splits = new ArrayList<>();
    if (shardCount == 0) {
        splits.add(ChunkRange.of(null, null));
        return splits;
    }

    double approxSamplePerShard = (double) sampleData.length / shardCount;
    Object lastEnd = null;
    if (approxSamplePerShard <= 1) {
        // Fewer samples than shards: every distinct sampled key becomes a boundary
        splits.add(ChunkRange.of(null, sampleData[0]));
        lastEnd = sampleData[0];
        for (int i = 1; i < sampleData.length; i++) {
            if (!sampleData[i].equals(lastEnd)) {
                splits.add(ChunkRange.of(lastEnd, sampleData[i]));
                lastEnd = sampleData[i];
            }
        }
        splits.add(ChunkRange.of(lastEnd, null));
    } else {
        // More samples than shards: pick every approxSamplePerShard-th sample as a boundary
        for (int i = 0; i < shardCount; i++) {
            Object chunkStart = lastEnd;
            Object chunkEnd = (i < shardCount - 1)
                    ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
                    : null;
            if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) {
                splits.add(ChunkRange.of(chunkStart, chunkEnd));
                lastEnd = chunkEnd;
            }
        }
    }
    return splits;
}
Each chunk gets a distinct start and end based on the sorted sampled IDs — no overlap.
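Plugging in our numbers: with roughly 60,000 sampled IDs and a shard count of 7,411, approxSamplePerShard is about 8.1, so roughly every 8th sorted sample becomes a chunk boundary, producing about 7,411 non-overlapping key ranges. Below is a tiny standalone demo of that boundary-picking loop, simplified to Long keys and a handful of samples so the output is easy to inspect (the Range record is a stand-in for ChunkRange):

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Illustrative only: the boundary-picking loop from efficientShardingThroughSampling,
// run on a small synthetic sample so the resulting chunk ranges are easy to see.
public class SamplingShardingDemo {

    record Range(Long start, Long end) {} // stand-in for SeaTunnel's ChunkRange

    public static void main(String[] args) {
        // Pretend these are the sorted sampled IDs of a sparse key space
        Long[] sampleData = {100L, 180L, 305L, 420L, 555L, 610L, 790L, 905L, 1010L, 1200L};
        int shardCount = 4;

        double approxSamplePerShard = (double) sampleData.length / shardCount; // 2.5
        List<Range> splits = new ArrayList<>();
        Long lastEnd = null;
        for (int i = 0; i < shardCount; i++) {
            Long chunkStart = lastEnd;
            Long chunkEnd = (i < shardCount - 1)
                    ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
                    : null;
            if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) {
                splits.add(new Range(chunkStart, chunkEnd));
                lastEnd = chunkEnd;
            }
        }
        // Output (one per line): [null .. 305], [305 .. 610], [610 .. 905], [905 .. null]
        splits.forEach(r -> System.out.println("[" + r.start() + " .. " + r.end() + "]"));
    }
}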
The ChunkRange class that represents the result is essentially just a start key and an end key, with null meaning unbounded on that side.
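We won’t reproduce the full class here; the following is a minimal sketch of its shape, inferred purely from the ChunkRange.of(...) and ChunkRange.all() calls above (getter names and everything else are our guesses; the real class in SeaTunnel also carries split metadata):

// Minimal sketch of a chunk range, inferred from how ChunkRange is used above.
public final class ChunkRange {
    private final Object chunkStart; // null = unbounded at the low end of the key space
    private final Object chunkEnd;   // null = unbounded at the high end of the key space

    private ChunkRange(Object chunkStart, Object chunkEnd) {
        this.chunkStart = chunkStart;
        this.chunkEnd = chunkEnd;
    }

    public static ChunkRange of(Object chunkStart, Object chunkEnd) {
        return new ChunkRange(chunkStart, chunkEnd);
    }

    public static ChunkRange all() {
        return of(null, null); // the whole table as a single chunk
    }

    public Object getChunkStart() { return chunkStart; }

    public Object getChunkEnd() { return chunkEnd; }
}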
Snapshot sharding enables parallel data reads, speeding up historical sync.
Final Solution
Through the above analysis, we confirmed that the job was stuck in the snapshot phase performing sampling, triggered because SeaTunnel determined the source table was unevenly distributed.
Since the sync job had been blocked for days, we came up with a simple fix: adjust the distribution factor thresholds so SeaTunnel would treat the table as evenly distributed and skip sampling.
The default factor range is [0.5, 100], but our table’s factor was ~3283, so we raised chunk-key.even-distribution.factor.upper-bound to 4000. On top of that, the final configuration included two more adjustments (a schematic sketch of the resulting source block follows the list):
- snapshot.split.size: our table’s key space is highly sparse, so we increased this value drastically (we simply multiplied it by 1000, admittedly not very scientific).
- table-names-config: we manually specified the primary key and split key, since the table has no primary key and we weren’t sure which column SeaTunnel would pick on its own. Better to be explicit.
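For reference, the resulting source block looked roughly like the sketch below. The option names come from the parameter list above, but the table and column names are placeholders, and the exact structure of table-names-config should be checked against the official MySQL-CDC docs for your version; treat this as schematic rather than copy-paste ready.

source {
  MySQL-CDC {
    # connection details omitted (same as our other CDC jobs)
    ...
    # Treat the sparse key space as "evenly distributed" so sampling is skipped
    chunk-key.even-distribution.factor.upper-bound = 4000
    # Much larger chunks, because the key space is very sparse (~1000x the default)
    snapshot.split.size = 8096000
    # Tell SeaTunnel explicitly which column to split on, since the table has no primary key
    table-names-config = [
      {
        "table": "mydb.my_table",
        "primaryKeys": ["ID"]
      }
    ]
  }
}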
Final Result
It finally started syncing! 🎉