Commit b36249e

SYM-7152: Limit time purge job spends on stranded data (#464) (#469)
1 parent 4dca358 commit b36249e

3 files changed: +59 -9 lines changed


symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java

Lines changed: 2 additions & 0 deletions
@@ -316,6 +316,8 @@ private ParameterConstants() {
     public final static String PURGE_STATS_RETENTION_MINUTES = "purge.stats.retention.minutes";
     public final static String PURGE_TRIGGER_HIST_RETENTION_MINUTES = "purge.trigger.hist.retention.minutes";
     public final static String PURGE_EXPIRED_DATA_GAP_RETENTION_MINUTES = "purge.expired.data.gap.retention.minutes";
+    public final static String PURGE_STRANDED_DATA_RECAPTURE_ENABLED = "job.purge.recapture.stranded.data";
+    public final static String PURGE_STRANDED_DATA_TIME_LIMIT_MS = "job.purge.stranded.max.time.ms";
     public final static String PURGE_MONITOR_EVENT_RETENTION_MINUTES = "purge.monitor.event.retention.minutes";
     public final static String PURGE_MAX_NUMBER_OF_DATA_IDS = "job.purge.max.num.data.to.delete.in.tx";
     public final static String PURGE_MAX_NUMBER_OF_BATCH_IDS = "job.purge.max.num.batches.to.delete.in.tx";

symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java

Lines changed: 38 additions & 9 deletions
@@ -87,6 +87,7 @@ public PurgeService(IParameterService parameterService, ISymmetricDialect symmet
         setSqlMap(new PurgeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens()));
     }
 
+    @Override
     public long purgeOutgoing(boolean force) {
         long rowsPurged = 0;
         long startTime = System.currentTimeMillis();
@@ -115,6 +116,7 @@ public long purgeOutgoing(boolean force) {
         return rowsPurged;
     }
 
+    @Override
     public long purgeIncoming(boolean force) {
         long rowsPurged = 0;
         Calendar retentionCutoff = Calendar.getInstance();
@@ -123,6 +125,7 @@ public long purgeIncoming(boolean force) {
         return rowsPurged;
     }
 
+    @Override
     public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
         long rowsPurged = 0;
         if (force || clusterService.lock(ClusterConstants.PURGE_OUTGOING)) {
@@ -455,15 +458,22 @@ private int purgeDataGapsExpired(OutgoingContext context) {
         long ts = System.currentTimeMillis();
         int[] argTypes = new int[] { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() };
         for (DataGap gap : dataGapsExpiredToCheck) {
-            int count = dataService.reCaptureData(gap.getStartId(), gap.getEndId());
+            Object[] args = new Object[] { gap.getStartId(), gap.getEndId() };
+            if (parameterService.is(ParameterConstants.PURGE_STRANDED_DATA_RECAPTURE_ENABLED)) {
+                int recapturedRowCount = dataService.reCaptureData(gap.getStartId(), gap.getEndId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Recaptured {} rows of stranded data for gap {} - {}", recapturedRowCount, gap.getStartId(), gap.getEndId());
+                }
+            } else if (log.isDebugEnabled()) {
+                log.debug("Skipped recapture of stranded data for gap {} - {}", gap.getStartId(), gap.getEndId());
+            }
+            int count = sqlTemplate.update(getSql("deleteDataByRangeSql"), args, argTypes);
             purgedDataRowCount += count;
             statisticManager.incrementPurgedExpiredDataRows(count);
-            Object[] args = new Object[] { gap.getStartId(), gap.getEndId() };
-            sqlTemplate.update(getSql("deleteDataByRangeSql"), args, argTypes);
             purgedDataGapCount++;
             checkedDataGapCount++;
-            if (System.currentTimeMillis() - ts > 60000) {
-                log.info("Checked {} expired data gaps", checkedDataGapCount);
+            if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
+                log.info("Checked {} expired data gaps. Deleted {} data rows.", checkedDataGapCount, count);
                 ts = System.currentTimeMillis();
             }
         }
@@ -646,7 +656,12 @@ private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, OutgoingCon
                 }
                 args = new Object[] { minId = minMaxAvoidGaps[0], maxId = minMaxAvoidGaps[1], cutoffTime };
                 argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
-                dataService.reCaptureData(minId, maxId);
+                if (parameterService.is(ParameterConstants.PURGE_STRANDED_DATA_RECAPTURE_ENABLED)) {
+                    int recapturedRowCount = dataService.reCaptureData(minId, maxId);
+                    log.debug("Recaptured {} stranded data rows for range {} - {}", recapturedRowCount, minId, maxId);
+                } else if (log.isDebugEnabled()) {
+                    log.debug("Skipped recapture of stranded data for range {} - {}", minId, maxId);
+                }
                 break;
             case STRANDED_DATA_EVENT:
                 deleteSql = getSql("deleteStrandedDataEvent");
@@ -659,11 +674,21 @@ private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, OutgoingCon
             log.debug("Deleted {} rows", count);
             statConsumer.accept(count);
             totalCount += count;
+            long currentRunTimeMs = System.currentTimeMillis() - ts;
             if (count == 0 && (identifier == MinMaxDeleteSql.STRANDED_DATA || identifier == MinMaxDeleteSql.STRANDED_DATA_EVENT)) {
-                log.info("Ending purge of {} early at {} after finding empty space", name, maxId);
-                break;
+                long runtimeLimit = parameterService.getLong(ParameterConstants.PURGE_STRANDED_DATA_TIME_LIMIT_MS);
+                if (runtimeLimit > 0 && currentRunTimeMs >= runtimeLimit) {
+                    log.info("Ending purge of {} early at {} after finding empty space. Total rows purged={}", name, maxId, totalCount);
+                    break;
+                } else {
+                    if (currentRunTimeMs > DateUtils.MILLIS_PER_MINUTE * 5) {
+                        log.info("Skipping empty space in {} for range {} - {}", name, minId, maxId);
+                    } else {
+                        log.debug("Skipping empty space in {} for range {} - {}. Total rows purged={}", name, minId, maxId, totalCount);
+                    }
+                }
             }
-            if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5) {
+            if (currentRunTimeMs > DateUtils.MILLIS_PER_MINUTE * 5) {
                 log.info("Purged {} of {} rows so far using {} statements", new Object[] { totalCount, name, totalDeleteStmts });
                 ts = System.currentTimeMillis();
                 clusterService.refreshLock(ClusterConstants.PURGE_OUTGOING);
@@ -747,6 +772,7 @@ public static long[] getMinMaxAvoidGaps(long minId, long maxId, List<DataGap> da
         return new long[] { minId, maxId };
     }
 
+    @Override
     public long purgeIncoming(Calendar retentionCutoff, boolean force) {
         long purgedRowCount = 0;
         long startTime = System.currentTimeMillis();
@@ -807,6 +833,7 @@ private long purgeIncomingBatch(final Calendar time) {
         log.info("Getting range for incoming batch");
         List<NodeBatchRange> nodeBatchRangeList = sqlTemplateDirty.query(
                 getSql("selectIncomingBatchRangeSql"), new ISqlRowMapper<NodeBatchRange>() {
+                    @Override
                     public NodeBatchRange mapRow(Row rs) {
                         return new NodeBatchRange(rs.getString("node_id"), rs.getLong("min_id"), rs
                                 .getLong("max_id"));
@@ -849,6 +876,7 @@ private int purgeByNodeBatchRangeList(List<NodeBatchRange> nodeBatchRangeList) {
         return totalCount;
     }
 
+    @Override
     public void purgeStats(boolean force) {
         Calendar retentionCutoff = Calendar.getInstance();
         retentionCutoff.add(Calendar.MINUTE,
@@ -872,6 +900,7 @@ public void purgeStats(boolean force) {
         }
     }
 
+    @Override
     public void purgeAllIncomingEventsForNode(String nodeId) {
         int count = sqlTemplate.update(getSql("deleteIncomingBatchByNodeSql"),
                 new Object[] { nodeId });
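Taken together, the PurgeService changes add two guards around the stranded-data work: a recapture toggle (PURGE_STRANDED_DATA_RECAPTURE_ENABLED) consulted before dataService.reCaptureData is called, and a wall-clock budget (PURGE_STRANDED_DATA_TIME_LIMIT_MS) that is only checked once a range comes back empty. The following is a minimal, self-contained Java sketch of that control flow; the class and helper names are invented for illustration and are not part of SymmetricDS.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: distills the two guards added in this commit.
// The helpers stand in for dataService.reCaptureData and the deleteDataByRangeSql update.
public class StrandedPurgeSketch {
    static int recaptureRange(long startId, long endId) {
        return 0; // stand-in for dataService.reCaptureData(startId, endId)
    }

    static int deleteRange(long startId, long endId) {
        return (int) (endId - startId + 1); // stand-in for the deleteDataByRangeSql update
    }

    static long purgeStranded(List<long[]> ranges, boolean recaptureEnabled, long timeLimitMs) {
        long start = System.currentTimeMillis();
        long totalPurged = 0;
        for (long[] range : ranges) {
            if (recaptureEnabled) {
                // Guard 1: recapture queries the original tables, so it can be switched off
                // when a large backlog has to be cleared quickly.
                recaptureRange(range[0], range[1]);
            }
            int deleted = deleteRange(range[0], range[1]);
            totalPurged += deleted;
            long elapsed = System.currentTimeMillis() - start;
            if (deleted == 0 && timeLimitMs > 0 && elapsed >= timeLimitMs) {
                // Guard 2: as in the diff, an empty range plus an exhausted time budget
                // ends the stranded-data pass early instead of scanning to the end.
                System.out.println("Ending stranded purge early; rows purged=" + totalPurged);
                break;
            }
        }
        return totalPurged;
    }

    public static void main(String[] args) {
        List<long[]> ranges = Arrays.asList(new long[] { 1, 100 }, new long[] { 250, 300 });
        System.out.println(purgeStranded(ranges, true, 900000L));
    }
}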

symmetric-core/src/main/resources/symmetric-default.properties

Lines changed: 19 additions & 0 deletions
@@ -2173,6 +2173,25 @@ purge.trigger.hist.retention.minutes=86400
 # Type: integer
 purge.expired.data.gap.retention.minutes=1440
 
+# Controls the section of the purge job responsible for combing through stranded data in an attempt to salvage it.
+# Recapturing stranded data involves queries against the original tables, which increases processing time significantly.
+# Disable this feature temporarily only if there is a significant backlog to clear and a support engineer has recommended this action.
+#
+# DatabaseOverridable: true
+# Tags: purge
+# Type: boolean
+job.purge.recapture.stranded.data=true
+
+# This limits the processing time the purge job spends cleaning up stranded data.
+# The purge job will automatically move to the next processing step once the specified time limit has been reached.
+# If you see a log message about this limit often, it indicates that the purge job is falling behind.
+# It might indicate that the database server is not able to keep up with the workload or that the limit is too low.
+#
+# DatabaseOverridable: true
+# Tags: purge
+# Type: integer
+job.purge.stranded.max.time.ms=900000
+
 # This is the retention time for how long to keep monitor events before purging them.
 #
 # DatabaseOverridable: true
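Both new settings follow the conventions already used in symmetric-default.properties (DatabaseOverridable, Tags, Type), and the defaults above amount to recapture enabled with a 15-minute (900000 ms) budget. As a rough illustration of consuming them outside the engine, the sketch below reads the two keys from a plain properties file and applies the shipped defaults; the file name and class are hypothetical and not part of SymmetricDS.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Illustrative sketch only: reads the two new purge settings with their shipped defaults.
public class PurgeSettingsReader {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // "engine.properties" is a hypothetical override file for this example.
        try (InputStream in = new FileInputStream("engine.properties")) {
            props.load(in);
        }
        boolean recaptureEnabled = Boolean.parseBoolean(
                props.getProperty("job.purge.recapture.stranded.data", "true"));
        long strandedTimeLimitMs = Long.parseLong(
                props.getProperty("job.purge.stranded.max.time.ms", "900000"));
        System.out.println("recapture stranded data=" + recaptureEnabled
                + ", stranded time limit ms=" + strandedTimeLimitMs);
    }
}

Based on the guard in the service diff above (runtimeLimit > 0 && currentRunTimeMs >= runtimeLimit), the limit is only enforced when it is greater than zero, so a value of 0 effectively disables the early exit for the stranded-data pass.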
