Add new mse metrics #17419
Pull request overview
This PR enhances metrics collection for Multi-Stage Execution (MSE) by adding granular tracking of stages and opchains on both broker and server sides. The changes support better observability of query execution by tracking CPU time, memory allocation, and emitted documents.
- Changes the visibility of the copyStatMaps() method from protected to public across all operator implementations
- Adds new broker-side metrics for tracking stage and opchain lifecycle (started/completed)
- Adds new server-side metrics for tracking opchain execution, CPU time, memory allocation, and emitted rows
- Introduces a getUnsafe() method to StatMap for flexible stat retrieval without complex type casting
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| MultiStageOperator.java | Changes copyStatMaps() from protected abstract to public abstract |
| OpChainSchedulerService.java | Adds metrics collection for opchain lifecycle and resource usage on server |
| ServerMeter.java | Defines new MSE-related meter types and adds getGlobalMeter() helper |
| BrokerMeter.java | Defines new broker-side stage/opchain meter types and adds getGlobalMeter() helper |
| StatMap.java | Adds getUnsafe() method for string-based key lookup with default values |
| MultiStageBrokerRequestHandler.java | Instruments query execution with stage/opchain counting and meter marking |
| Multiple operator files | Updates copyStatMaps() implementations from protected to public |
Comments suppressed due to low confidence (1)
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:1
- Corrected spelling of 'competed' to 'completed'.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #17419 +/- ##
=============================================
- Coverage 55.65% 34.00% -21.66%
- Complexity 702 771 +69
=============================================
Files 2463 3161 +698
Lines 139016 188481 +49465
Branches 22167 28833 +6666
=============================================
- Hits 77376 64086 -13290
- Misses 55119 119087 +63968
+ Partials 6521 5308 -1213
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:1
- Corrected spelling of 'competedOpchains' to 'completedOpchains'.
    int stageCount = dispatchableSubPlan.getQueryStageMap().size();
    int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
Copilot AI, Dec 23, 2025
The dispatchableSubPlan.getQueryStageMap() is called twice. Consider storing the result in a variable to avoid redundant calls.
Suggested change:
    - int stageCount = dispatchableSubPlan.getQueryStageMap().size();
    - int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
    + Map<Integer, DispatchablePlanFragment> queryStageMap = dispatchableSubPlan.getQueryStageMap();
    + int stageCount = queryStageMap.size();
    + int opChainCount = queryStageMap.values().stream()
    public <E> E getUnsafe(String keyName, E defaultValue)
        throws ClassCastException {
      K key = getKey(keyName);
      if (key == null) {
        return defaultValue;
      }
      return (E) getAny(key);
Copilot AI, Dec 23, 2025
The getUnsafe() method should document the behavior when the key exists but has a null value, as getAny() can return null. Currently, it's unclear whether this would return null or defaultValue.
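To make the question concrete, here is a minimal sketch of the two contracts a lookup with this shape could document. GetUnsafeSketch, its backing HashMap, and getUnsafeOrDefault are hypothetical stand-ins written for illustration; they are not Pinot's StatMap, and whether getAny() can actually return null there is not settled by this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a stat map; this is NOT Pinot's StatMap.
final class GetUnsafeSketch {
  private final Map<String, Object> _values = new HashMap<>();

  public void put(String keyName, Object value) {
    _values.put(keyName, value);
  }

  // Same shape as the reviewed method: only an unknown key yields the default,
  // so a known key that holds null returns null.
  @SuppressWarnings("unchecked")
  public <E> E getUnsafe(String keyName, E defaultValue) {
    if (!_values.containsKey(keyName)) {
      return defaultValue;
    }
    // Unchecked cast: because of erasure, a type mismatch surfaces at the caller.
    return (E) _values.get(keyName);
  }

  // Alternative contract (an assumption, not the PR's code): null also falls back.
  public <E> E getUnsafeOrDefault(String keyName, E defaultValue) {
    E value = getUnsafe(keyName, defaultValue);
    return value != null ? value : defaultValue;
  }

  public static void main(String[] args) {
    GetUnsafeSketch stats = new GetUnsafeSketch();
    stats.put("EMITTED_ROWS", 42L);
    stats.put("NULL_STAT", null);

    Long present = stats.getUnsafe("EMITTED_ROWS", 0L);
    Long missing = stats.getUnsafe("MISSING", 0L);
    Long storedNull = stats.getUnsafe("NULL_STAT", 0L);
    Long storedNullWithFallback = stats.getUnsafeOrDefault("NULL_STAT", 0L);

    System.out.println(present);
    System.out.println(missing);
    System.out.println(storedNull);
    System.out.println(storedNullWithFallback);
  }
}
```

Whichever contract is chosen, stating it in the Javadoc would let callers know whether they still need a null check after supplying a default.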
    public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGE_STARTED", "stages", false);
    public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGE_COMPLETED", "stages", false);
    public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAIN_STARTED", "opchains", false);
    public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAIN_COMPLETED", "opchains", false);
Shouldn't the global parameter be true for all of these?
Fixed here and in server meters
    int stageCount = dispatchableSubPlan.getQueryStageMap().size();
    int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
        .mapToInt(stage -> stage.getServerInstances().size())
Shouldn't we use the worker info instead of server info since we could potentially have multiple workers per server?
You are right, but I didn't find that and it looks like right now we don't schedule more than one worker per server.
Should I use stage.getWorkerMetadataList instead?
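The difference between the two counts can be sketched with a small stand-in model. The Stage class, its fields, and the sample names below are hypothetical and are not Pinot types; the closing sum() is also an assumption, since the quoted snippet is cut off after mapToInt.

```java
import java.util.List;
import java.util.Map;

final class OpChainCountSketch {
  // Hypothetical stand-in for a dispatchable stage; not a Pinot type.
  static final class Stage {
    final List<String> _servers;
    final List<String> _workers;

    Stage(List<String> servers, List<String> workers) {
      _servers = servers;
      _workers = workers;
    }
  }

  // One opchain per server: undercounts if a server hosts several workers.
  static int countByServers(Map<Integer, Stage> stages) {
    return stages.values().stream().mapToInt(stage -> stage._servers.size()).sum();
  }

  // One opchain per worker: stays correct with multiple workers per server.
  static int countByWorkers(Map<Integer, Stage> stages) {
    return stages.values().stream().mapToInt(stage -> stage._workers.size()).sum();
  }

  public static void main(String[] args) {
    Map<Integer, Stage> stages = Map.of(
        0, new Stage(List.of("broker"), List.of("broker-w0")),
        1, new Stage(List.of("server1", "server2"),
            List.of("server1-w0", "server1-w1", "server2-w0")));

    System.out.println("stages: " + stages.size());
    System.out.println("opchains by server: " + countByServers(stages));
    System.out.println("opchains by worker: " + countByWorkers(stages));
  }
}
```

With one worker per server the two counts agree, which matches the observation in the reply; they diverge only once a server runs more than one worker for a stage.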
    protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
    protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
    protected final PinotMeter _opchainsStartedMeter = BrokerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter();
    protected final PinotMeter _opchainsCompletedMeter = BrokerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter();
The BaseBrokerRequestHandler class already has an instance of BrokerMetrics. IMO we should use that here rather than the global singleton, for better testability.
The attribute in the parent class is actually final and initialized to the value used here (the one returned by BrokerMetrics.get()).
In general, the idea is to use BrokerMetrics.get() whenever possible. We should also keep the meters as attributes, as I'm doing here, to speed up calls (since we don't need to look up the meter each time). In BaseBrokerRequestHandler we use the old pattern, where we have an attribute for BrokerMetrics but look for the metric each time we need to modify it.
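The two patterns discussed in this thread can be contrasted in a minimal sketch. Registry, Handler, and the meter name are hypothetical; a LongAdder stands in for a meter, and none of this is Pinot's BrokerMetrics or PinotMeter API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

final class CachedMeterSketch {
  // Hypothetical metrics registry; a LongAdder plays the role of a meter.
  static final class Registry {
    private final Map<String, LongAdder> _meters = new ConcurrentHashMap<>();

    LongAdder meter(String name) {
      return _meters.computeIfAbsent(name, key -> new LongAdder());
    }
  }

  static final class Handler {
    private final Registry _registry;
    // Resolved once at construction, so the hot path skips the map lookup.
    private final LongAdder _stagesStarted;

    // Taking the registry as a parameter keeps the handler easy to test.
    Handler(Registry registry) {
      _registry = registry;
      _stagesStarted = registry.meter("mseStagesStarted");
    }

    // Older pattern: look the meter up on every update.
    void markWithLookup(long stages) {
      _registry.meter("mseStagesStarted").add(stages);
    }

    // Pattern used in this PR's fields: update the cached meter directly.
    void markCached(long stages) {
      _stagesStarted.add(stages);
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    Handler handler = new Handler(registry);
    handler.markWithLookup(3);
    handler.markCached(2);
    // Both paths update the same underlying meter.
    System.out.println(registry.meter("mseStagesStarted").sum());
  }
}
```

In this sketch the cached field and the injected registry are not in conflict: the meter is resolved once, yet a test can still pass in its own registry instance.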
…kerMeter.java Co-authored-by: Yash Mayya <[email protected]>
This PR adds new metrics for MSE:
On broker:
- pinot.broker.mseOpchainStarted and pinot.broker.mseOpchainCompleted (meters): Total opchains started/finished
- pinot.broker.mseStagesStarted and pinot.broker.mseStagesCompleted (meters): Total stages started/finished
On server:
- pinot.server.mseOpchainsStarted and pinot.server.mseOpchainsCompleted (meters): Total opchains started/finished
- pinot.server.mseCpuExecutionTimeMs (meter): CPU time for each task
- pinot.server.mseMemoryAllocatedBytes (meter): Memory allocation for each task
- pinot.server.mseEmittedRows (meter): Emitted docs
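As a rough illustration of how the server-side meters could relate to a single opchain run, the sketch below wraps a task with started/completed counters, CPU time, and emitted rows. OpChainMetricsSketch and its fields are hypothetical and do not reproduce OpChainSchedulerService; memory allocation is left out because measuring it depends on a JVM-specific API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;

final class OpChainMetricsSketch {
  final LongAdder started = new LongAdder();
  final LongAdder completed = new LongAdder();
  final LongAdder cpuTimeMs = new LongAdder();
  final LongAdder emittedRows = new LongAdder();

  // Runs one hypothetical opchain; the supplier returns the rows it emitted.
  void run(LongSupplier opChain) {
    ThreadMXBean threads = ManagementFactory.getThreadMXBean();
    boolean cpuSupported = threads.isCurrentThreadCpuTimeSupported();
    long cpuStartNs = cpuSupported ? threads.getCurrentThreadCpuTime() : 0L;
    started.increment();
    try {
      emittedRows.add(opChain.getAsLong());
    } finally {
      if (cpuSupported) {
        long cpuNs = threads.getCurrentThreadCpuTime() - cpuStartNs;
        cpuTimeMs.add(Math.max(0L, cpuNs) / 1_000_000L);
      }
      // Counted in finally so failed opchains are also marked as completed.
      completed.increment();
    }
  }

  public static void main(String[] args) {
    OpChainMetricsSketch metrics = new OpChainMetricsSketch();
    metrics.run(() -> 3L);
    System.out.println("started=" + metrics.started.sum()
        + " completed=" + metrics.completed.sum()
        + " rows=" + metrics.emittedRows.sum()
        + " cpuMs=" + metrics.cpuTimeMs.sum());
  }
}
```

Marking completion in a finally block is one design choice among several; it keeps started and completed balanced on failure, at the cost of not distinguishing successful from failed opchains.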