@gortiz gortiz commented Dec 23, 2025

This PR adds new metrics for MSE:

On broker:

  • pinot.broker.mseOpchainStarted and pinot.broker.mseOpchainCompleted (meters): Total opchains started/completed
  • pinot.broker.mseStagesStarted and pinot.broker.mseStagesCompleted (meters): Total stages started/completed

On server:

  • pinot.server.mseOpchainsStarted and pinot.server.mseOpchainsCompleted (meters): Total opchains started/completed
  • pinot.server.mseCpuExecutionTimeMs (meter): CPU time consumed by each task, in milliseconds
  • pinot.server.mseMemoryAllocatedBytes (meter): Memory allocated by each task, in bytes
  • pinot.server.mseEmittedRows (meter): Rows (docs) emitted
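
As a rough illustration of how a started/completed meter pair can be read together, here is a minimal sketch. The class and method names are hypothetical and are not Pinot code; LongAdder stands in for a meter, and counting a failed task as completed is an assumption of this sketch, not something the PR states.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch, NOT Pinot code: a pair of counters standing in for the
// "started" and "completed" meters described above.
final class OpChainLifecycleSketch {
  private final LongAdder _started = new LongAdder();
  private final LongAdder _completed = new LongAdder();

  // Runs a task and marks both counters around it.
  void run(Runnable opChain) {
    _started.increment();
    try {
      opChain.run();
    } finally {
      // Assumption: a failed task still counts as completed, so the two
      // counters converge once nothing is running.
      _completed.increment();
    }
  }

  long started() {
    return _started.sum();
  }

  long completed() {
    return _completed.sum();
  }

  // Tasks that have started but not yet completed.
  long inFlight() {
    return started() - completed();
  }

  public static void main(String[] args) {
    OpChainLifecycleSketch sketch = new OpChainLifecycleSketch();
    sketch.run(() -> { });
    try {
      sketch.run(() -> {
        throw new IllegalStateException("boom");
      });
    } catch (IllegalStateException e) {
      // Expected: the failure is still counted as completed.
    }
    System.out.println("started=" + sketch.started() + " completed=" + sketch.completed()
        + " inFlight=" + sketch.inFlight());
  }
}
```

With a pair like this, the difference between the two counters gives the number of tasks currently in flight.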

@gortiz gortiz added observability multi-stage Related to the multi-stage query engine labels Dec 23, 2025
@gortiz gortiz requested review from Jackie-Jiang, Copilot and yashmayya and removed request for Jackie-Jiang December 23, 2025 12:06

Copilot AI left a comment

Pull request overview

This PR enhances metrics collection for the multi-stage query engine (MSE) by adding granular tracking of stages and opchains on both the broker and the server. The changes improve observability of query execution by tracking CPU time, memory allocation, and emitted rows.

  • Changes the visibility of copyStatMaps() method from protected to public across all operator implementations
  • Adds new broker-side metrics for tracking stage and opchain lifecycle (started/completed)
  • Adds new server-side metrics for tracking opchain execution, CPU time, memory allocation, and emitted rows
  • Introduces getUnsafe() method to StatMap for flexible stat retrieval without complex type casting

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| MultiStageOperator.java | Changes copyStatMaps() from protected abstract to public abstract |
| OpChainSchedulerService.java | Adds metrics collection for opchain lifecycle and resource usage on server |
| ServerMeter.java | Defines new MSE-related meter types and adds getGlobalMeter() helper |
| BrokerMeter.java | Defines new broker-side stage/opchain meter types and adds getGlobalMeter() helper |
| StatMap.java | Adds getUnsafe() method for string-based key lookup with default values |
| MultiStageBrokerRequestHandler.java | Instruments query execution with stage/opchain counting and meter marking |
| Multiple operator files | Updates copyStatMaps() implementations from protected to public |

Comments suppressed due to low confidence (1)

pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:1

  • Corrected spelling of 'competed' to 'completed'.
/**

codecov-commenter commented Dec 23, 2025

Codecov Report

❌ Patch coverage is 25.45455% with 41 lines in your changes missing coverage. Please review.
✅ Project coverage is 34.00%. Comparing base (6d4df46) to head (6777d7d).
⚠️ Report is 19 commits behind head on master.
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...uery/runtime/executor/OpChainSchedulerService.java | 0.00% | 18 Missing ⚠️ |
| ...ava/org/apache/pinot/common/datatable/StatMap.java | 0.00% | 14 Missing ⚠️ |
| ...requesthandler/MultiStageBrokerRequestHandler.java | 33.33% | 8 Missing ⚠️ |
| ...a/org/apache/pinot/common/metrics/ServerMeter.java | 85.71% | 1 Missing ⚠️ |

❗ There is a different number of reports uploaded between BASE (6d4df46) and HEAD (6777d7d).

HEAD has 1 upload less than BASE

| Flag | BASE (6d4df46) | HEAD (6777d7d) |
| --- | --- | --- |
| unittests1 | 1 | 0 |

Additional details and impacted files
@@              Coverage Diff              @@
##             master   #17419       +/-   ##
=============================================
- Coverage     55.65%   34.00%   -21.66%     
- Complexity      702      771       +69     
=============================================
  Files          2463     3161      +698     
  Lines        139016   188481    +49465     
  Branches      22167    28833     +6666     
=============================================
- Hits          77376    64086    -13290     
- Misses        55119   119087    +63968     
+ Partials       6521     5308     -1213     
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | 100.00% <ø> (?) |
| integration | 100.00% <ø> (?) |
| integration1 | 100.00% <ø> (?) |
| integration2 | 0.00% <ø> (?) |
| java-11 | 33.99% <25.45%> (?) |
| java-21 | 33.94% <25.45%> (-21.72%) ⬇️ |
| temurin | 34.00% <25.45%> (-21.66%) ⬇️ |
| unittests | 33.99% <25.45%> (-21.67%) ⬇️ |
| unittests1 | ? |
| unittests2 | 33.99% <25.45%> (?) |

@gortiz gortiz requested a review from Copilot December 23, 2025 14:58

Copilot AI left a comment

Pull request overview

Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:1

  • Corrected spelling of 'competedOpchains' to 'completedOpchains'.
/**

Comment on lines +590 to +591
int stageCount = dispatchableSubPlan.getQueryStageMap().size();
int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()

Copilot AI Dec 23, 2025

The dispatchableSubPlan.getQueryStageMap() is called twice. Consider storing the result in a variable to avoid redundant calls.

Suggested change
- int stageCount = dispatchableSubPlan.getQueryStageMap().size();
- int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
+ Map<Integer, DispatchablePlanFragment> queryStageMap = dispatchableSubPlan.getQueryStageMap();
+ int stageCount = queryStageMap.size();
+ int opChainCount = queryStageMap.values().stream()
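
For reference, the suggested shape can be sketched in a self-contained form. StageSketch below is a hypothetical stand-in for the plan fragment type, not Pinot's DispatchablePlanFragment, and the terminal .sum() is an assumption because the quoted stream pipeline is cut off in the diff.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a plan fragment; NOT Pinot's DispatchablePlanFragment.
final class StageSketch {
  private final List<String> _serverInstances;

  StageSketch(List<String> serverInstances) {
    _serverInstances = serverInstances;
  }

  List<String> getServerInstances() {
    return _serverInstances;
  }
}

final class StageCountSketch {
  private StageCountSketch() {
  }

  // One map entry per stage.
  static int countStages(Map<Integer, StageSketch> queryStageMap) {
    return queryStageMap.size();
  }

  // Approximates opchains as one per (stage, server) pair.
  // The .sum() terminal operation is an assumption of this sketch.
  static int countOpChains(Map<Integer, StageSketch> queryStageMap) {
    return queryStageMap.values().stream()
        .mapToInt(stage -> stage.getServerInstances().size())
        .sum();
  }

  public static void main(String[] args) {
    // The map is fetched once and reused for both counts.
    Map<Integer, StageSketch> queryStageMap = new LinkedHashMap<>();
    queryStageMap.put(0, new StageSketch(List.of("broker")));
    queryStageMap.put(1, new StageSketch(List.of("server-1", "server-2")));
    queryStageMap.put(2, new StageSketch(List.of("server-1", "server-2", "server-3")));
    System.out.println(countStages(queryStageMap) + " stages, " + countOpChains(queryStageMap) + " opchains");
  }
}
```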

Comment on lines +182 to +188
public <E> E getUnsafe(String keyName, E defaultValue)
    throws ClassCastException {
  K key = getKey(keyName);
  if (key == null) {
    return defaultValue;
  }
  return (E) getAny(key);

Copilot AI Dec 23, 2025

The getUnsafe() method should document the behavior when the key exists but has a null value, as getAny() can return null. Currently, it's unclear whether this would return null or defaultValue.
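
One way to make that contract explicit is sketched below. UnsafeLookupSketch is a hypothetical, simplified class and not Pinot's StatMap; treating a null stored value the same as a missing key is just one possible choice, picked here for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in; NOT Pinot's StatMap.
final class UnsafeLookupSketch {
  private final Map<String, Object> _values = new HashMap<>();

  void put(String keyName, Object value) {
    _values.put(keyName, value);
  }

  /**
   * Returns the value stored under keyName, or defaultValue when the key is
   * unknown OR when the stored value is null (assumed contract of this sketch).
   * The cast is unchecked: a type mismatch surfaces as a ClassCastException
   * where the caller assigns the result to a typed variable.
   */
  @SuppressWarnings("unchecked")
  <E> E getUnsafe(String keyName, E defaultValue) {
    Object value = _values.get(keyName);
    if (value == null) {
      return defaultValue;
    }
    return (E) value;
  }

  public static void main(String[] args) {
    UnsafeLookupSketch stats = new UnsafeLookupSketch();
    stats.put("emittedRows", 42L);
    stats.put("nullStat", null);
    long rows = stats.getUnsafe("emittedRows", 0L);
    long missing = stats.getUnsafe("unknownKey", 7L);
    long nullValued = stats.getUnsafe("nullStat", 5L);
    System.out.println(rows + " " + missing + " " + nullValued);
  }
}
```

Documenting the null case in the Javadoc, as done here, removes the ambiguity regardless of which behavior is chosen.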

Comment on lines 238 to 241
public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGE_STARTED", "stages", false);
public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGE_COMPLETED", "stages", false);
public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAIN_STARTED", "opchains", false);
public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAIN_COMPLETED", "opchains", false);
Contributor

Shouldn't the global parameter be true for all of these?

Contributor Author

Fixed here and in server meters


int stageCount = dispatchableSubPlan.getQueryStageMap().size();
int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
.mapToInt(stage -> stage.getServerInstances().size())
Contributor

Shouldn't we use the worker info instead of server info since we could potentially have multiple workers per server?

Contributor Author

You are right, but I couldn't find that information, and it looks like right now we don't schedule more than one worker per server.

Should I use stage.getWorkerMetadataList instead?
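
To illustrate why the two counts can diverge, here is a minimal sketch. The Worker class and both counting helpers are hypothetical and are not Pinot types; the sketch only assumes that each stage exposes a list of workers and that each worker runs on a named server.

```java
import java.util.List;
import java.util.Map;

final class WorkerCountSketch {
  private WorkerCountSketch() {
  }

  // Hypothetical worker descriptor: which server hosts the worker. NOT a Pinot class.
  static final class Worker {
    final int _workerId;
    final String _server;

    Worker(int workerId, String server) {
      _workerId = workerId;
      _server = server;
    }
  }

  // Counts one opchain per distinct server in each stage.
  static int countByServers(Map<Integer, List<Worker>> workersByStage) {
    return workersByStage.values().stream()
        .mapToInt(workers -> (int) workers.stream().map(w -> w._server).distinct().count())
        .sum();
  }

  // Counts one opchain per worker in each stage.
  static int countByWorkers(Map<Integer, List<Worker>> workersByStage) {
    return workersByStage.values().stream().mapToInt(List::size).sum();
  }

  public static void main(String[] args) {
    // Stage 1 places two workers on the same server, so the counts differ.
    Map<Integer, List<Worker>> workersByStage = Map.of(
        1, List.of(new Worker(0, "server-1"), new Worker(1, "server-1"), new Worker(2, "server-2")),
        2, List.of(new Worker(0, "server-1")));
    System.out.println("by servers: " + countByServers(workersByStage)
        + ", by workers: " + countByWorkers(workersByStage));
  }
}
```

As long as every server hosts at most one worker per stage, both helpers return the same number, which matches the current behavior described in the reply.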

Comment on lines +140 to +143
protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
protected final PinotMeter _opchainsStartedMeter = BrokerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter();
protected final PinotMeter _opchainsCompletedMeter = BrokerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter();
Contributor

The BaseBrokerRequestHandler class already has an instance of BrokerMetrics. IMO we should use that here instead of the global singleton, for better testability.

Contributor Author

The attribute in the parent class is actually final and initialized to the value used here (the one returned by BrokerMetrics.get()).

In general, the idea is to use BrokerMetrics.get() whenever possible. We should also keep the meters as attributes, as I'm doing here, to speed up calls (since we don't need to look up the meter each time). BaseBrokerRequestHandler uses the old pattern: it keeps an attribute for BrokerMetrics but looks up the meter each time it needs to update it.
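
The two patterns being compared can be sketched as follows. MeterRegistrySketch and RequestHandlerSketch are hypothetical classes, not Pinot's BrokerMetrics or request handlers; LongAdder stands in for a meter, and the registry is passed through the constructor only to keep the sketch self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical registry; NOT Pinot's BrokerMetrics.
final class MeterRegistrySketch {
  private final Map<String, LongAdder> _meters = new ConcurrentHashMap<>();

  // Finds the meter by name, creating it on first use.
  LongAdder getMeter(String name) {
    return _meters.computeIfAbsent(name, key -> new LongAdder());
  }
}

// Hypothetical handler; NOT a Pinot request handler.
final class RequestHandlerSketch {
  private final MeterRegistrySketch _registry;
  // Resolved once at construction; later updates skip the map lookup.
  private final LongAdder _stagesStartedMeter;

  RequestHandlerSketch(MeterRegistrySketch registry) {
    _registry = registry;
    _stagesStartedMeter = registry.getMeter("mseStagesStarted");
  }

  // Older pattern: look the meter up by name on every update.
  void markWithLookup(long stages) {
    _registry.getMeter("mseStagesStarted").add(stages);
  }

  // Pattern described above: reuse the cached meter reference.
  void markCached(long stages) {
    _stagesStartedMeter.add(stages);
  }

  public static void main(String[] args) {
    MeterRegistrySketch registry = new MeterRegistrySketch();
    RequestHandlerSketch handler = new RequestHandlerSketch(registry);
    handler.markWithLookup(2);
    handler.markCached(3);
    // Both paths update the same underlying meter.
    System.out.println(registry.getMeter("mseStagesStarted").sum());
  }
}
```

Both methods update the same meter; the cached variant only saves the per-call lookup.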

Labels

metrics multi-stage Related to the multi-stage query engine observability
