perf: Iceberg serde ~50% faster serialization [iceberg] #3298
base: main
Conversation
…ization

Cache reflection and computation results to avoid redundant work:
1. ReflectionCache: cache Class.forName() and getMethod() calls once per convert() instead of per-task (30,000+ times)
2. Partition spec deduplication by object identity: only call toJson() for new unique specs, not for every task
3. Partition type deduplication by spec identity: same spec = same partition type, so skip JSON building for duplicate specs
4. Field ID mapping cache: cache buildFieldIdMapping() results by schema identity to avoid repeated reflection per-column

Benchmark results (30,000 tasks):
- Original: 34,425 ms per 100 iterations
- After caching: 16,618 ms per 100 iterations
- Improvement: 52% faster

Co-Authored-By: Claude Opus 4.5 <[email protected]>
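Roughly, the caching pattern described in this commit message looks like the sketch below. This is an illustrative Scala sketch, not the actual Comet code: the object and field names are hypothetical, and only a few of the cached Iceberg classes and methods are shown.

```scala
import java.lang.reflect.Method

// Illustrative sketch only; names and fields are hypothetical, not the Comet code.
object IcebergSerdeCacheSketch {
  case class ReflectionCache(
      fileMethod: Method,   // ContentScanTask.file()
      specMethod: Method,   // ContentScanTask.spec()
      toJsonMethod: Method) // PartitionSpecParser.toJson(PartitionSpec)

  def createReflectionCache(): ReflectionCache = {
    // Resolved once per convert(), instead of once per task.
    val taskClass = Class.forName("org.apache.iceberg.FileScanTask")
    val specClass = Class.forName("org.apache.iceberg.PartitionSpec")
    val parserClass = Class.forName("org.apache.iceberg.PartitionSpecParser")
    ReflectionCache(
      fileMethod = taskClass.getMethod("file"),
      specMethod = taskClass.getMethod("spec"),
      toJsonMethod = parserClass.getMethod("toJson", specClass))
  }

  def convert(tasks: Seq[AnyRef]): Unit = {
    val cache = createReflectionCache() // one set of lookups, reused for all ~30,000 tasks
    tasks.foreach { task =>
      val file = cache.fileMethod.invoke(task)
      val spec = cache.specMethod.invoke(task)
      // ... build the serialized task from `file`, `spec`, etc. ...
    }
  }
}
```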
61d79ec to bf83e76
Don't we have an IcebergReflection helper? It seems like we should try to encapsulate this logic there.
Codecov Report
❌ Patch coverage is …

Additional details and impacted files:

@@ Coverage Diff @@
## main #3298 +/- ##
============================================
+ Coverage 56.12% 60.09% +3.96%
- Complexity 976 1473 +497
============================================
Files 119 175 +56
Lines 11743 16246 +4503
Branches 2251 2688 +437
============================================
+ Hits 6591 9763 +3172
- Misses 4012 5131 +1119
- Partials 1140 1352 +212

☔ View full report in Codecov by Sentry.
Move the ReflectionCache case class and createReflectionCache() method from CometIcebergNativeScan to the IcebergReflection helper class per code review feedback. This encapsulates all Iceberg reflection caching logic in the shared reflection utilities.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Thanks. I pushed another commit to do this.
if (fieldTypeStr == IcebergReflection.TypeNames.UNKNOWN) {
  None
} else {
  val fieldIdMethod = field.getClass.getMethod("fieldId")
It looks like some of these still aren't being cached?
Thanks. Pushed some more in c168832
Add caching for partition data extraction methods that were still being looked up per-field:
- PartitionSpec.partitionType()
- StructType.fields()
- NestedField.type(), fieldId(), name(), isOptional()
- StructLike.get(int, Class<?>)

These methods are called for every partition field in every task, so caching them provides significant speedup.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
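As a rough illustration of what caching those per-field lookups might look like (hypothetical names, not the Comet implementation), the Method handles can be resolved once and then invoked for each partition field of each task:

```scala
import java.lang.reflect.Method

// Hypothetical sketch, not the actual Comet code: resolve the per-field accessors
// once and reuse the Method handles for every partition field of every task.
object PartitionFieldReflectionSketch {
  case class PartitionFieldReflection(
      partitionTypeMethod: Method, // PartitionSpec.partitionType()
      fieldsMethod: Method,        // Types.StructType.fields()
      fieldIdMethod: Method,       // Types.NestedField.fieldId()
      nameMethod: Method,          // Types.NestedField.name()
      isOptionalMethod: Method,    // Types.NestedField.isOptional()
      getMethod: Method)           // StructLike.get(int, Class<?>)

  def create(): PartitionFieldReflection = {
    val specClass = Class.forName("org.apache.iceberg.PartitionSpec")
    val structTypeClass = Class.forName("org.apache.iceberg.types.Types$StructType")
    val nestedFieldClass = Class.forName("org.apache.iceberg.types.Types$NestedField")
    val structLikeClass = Class.forName("org.apache.iceberg.StructLike")
    PartitionFieldReflection(
      partitionTypeMethod = specClass.getMethod("partitionType"),
      fieldsMethod = structTypeClass.getMethod("fields"),
      fieldIdMethod = nestedFieldClass.getMethod("fieldId"),
      nameMethod = nestedFieldClass.getMethod("name"),
      isOptionalMethod = nestedFieldClass.getMethod("isOptional"),
      getMethod = structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]]))
  }
}
```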
mbutrovich left a comment
This is a good PR. Even if it only accelerates things on the driver, it gives an opportunity to revisit the reflection code. In general, I am a little concerned about some semantics I think we should enforce. Namely, if reflection fails, we should fall back to Spark. I think there are code paths where reflection can fail, but we'll proceed with serializing a scan with possibly missing fields. This is not necessarily all in the scope of the changes in this PR, but maybe we can include them/revisit here, or we should do a quick followup. What do you think @andygrove? If it isn't urgent to merge this, it might be good to do it here.
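A minimal sketch of the fallback semantics suggested here (illustrative names, not the actual Comet API): the conversion path surfaces reflection failures to its caller instead of continuing with missing fields, so the original Spark scan stays in place.

```scala
import org.slf4j.LoggerFactory

// Hypothetical sketch of the suggested semantics, not the actual Comet code:
// reflection failures surface at the conversion boundary so the caller keeps
// the original Spark scan instead of a partially-serialized Comet scan.
object IcebergScanFallbackSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Stand-in for the real serde path; the point is that it throws on any failed
  // reflective lookup or invocation rather than silently dropping fields.
  private def serializeWithReflection(scan: AnyRef): Array[Byte] =
    throw new ReflectiveOperationException("example: method not found")

  def tryConvert(scan: AnyRef): Option[Array[Byte]] =
    try {
      Some(serializeWithReflection(scan))
    } catch {
      case e: ReflectiveOperationException =>
        log.warn(s"Iceberg reflection failed, falling back to Spark: ${e.getMessage}")
        None // caller leaves the Spark scan in place
    }
}
```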
val inputPartClass = inputPartition.getClass

try {
Does it make sense to try to encapsulate these remaining reflection calls in the cache as well?
} catch {
  case e: Exception =>
    logWarning(s"Failed to serialize delete file: ${e.getMessage}")
    None
I am concerned about some of the nested try-catch logic here. If we fail to serialize delete files, we should propagate an exception to make sure we fall back to Spark. IIUC, this code will return None for delete files, which will result in a serialized scan that does not apply delete files and could generate invalid data.
Most of the functions in IcebergReflection might be okay, but buildFieldIdMapping and extractDeleteFilesList should probably throw at a minimum.
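For example, one way to make the delete-file extraction fail loudly (a sketch assuming the method reads the task's deletes() list through a cached Method handle; not the actual Comet code) is to rethrow instead of returning None:

```scala
import java.lang.reflect.Method

// Hypothetical sketch, not the Comet code: propagate the failure so the planner
// can fall back to Spark, rather than returning None and producing a scan that
// silently skips delete files.
object DeleteFileExtractionSketch {
  def extractDeleteFilesList(task: AnyRef, deletesMethod: Method): java.util.List[AnyRef] =
    try {
      // deletesMethod is assumed to be a cached handle for FileScanTask.deletes()
      deletesMethod.invoke(task).asInstanceOf[java.util.List[AnyRef]]
    } catch {
      case e: Exception =>
        throw new IllegalStateException(
          s"Failed to extract Iceberg delete files: ${e.getMessage}", e)
    }
}
```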
Summary
- Cache Class.forName() and getMethod() reflection calls in a ReflectionCache object
- Cache buildFieldIdMapping() results by schema identity

Benchmark Results (30,000 tasks)
- Original: 34,425 ms per 100 iterations
- After caching: 16,618 ms per 100 iterations
- Improvement: 52% faster
Key Optimizations
- Resolve reflection handles once per convert() call instead of per-task
- Only call PartitionSpecParser.toJson() for new unique spec objects
- Cache buildFieldIdMapping() results by schema object identity
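As a rough sketch of the spec deduplication listed above (hypothetical names, not the Comet code), object identity can key a small cache so PartitionSpecParser.toJson() runs once per distinct spec instance rather than once per task:

```scala
import java.lang.reflect.Method
import java.util.IdentityHashMap

// Hypothetical sketch, not the Comet code: identical spec objects map to the same
// JSON, so the static PartitionSpecParser.toJson(PartitionSpec) is invoked only
// for spec instances we have not seen before.
object SpecJsonDedupSketch {
  private val specJsonCache = new IdentityHashMap[AnyRef, String]()

  def specToJson(spec: AnyRef, toJsonMethod: Method): String = {
    val cached = specJsonCache.get(spec)
    if (cached != null) {
      cached
    } else {
      val json = toJsonMethod.invoke(null, spec).asInstanceOf[String] // static method, null receiver
      specJsonCache.put(spec, json)
      json
    }
  }
}
```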
The ReflectionCache holds:
- Classes: ContentScanTask, FileScanTask, ContentFile, DeleteFile, SchemaParser, Schema, PartitionSpecParser, PartitionSpec
- Methods: file(), start(), length(), partition(), residual(), schema(), deletes(), spec(), location(), content(), specId(), equalityFieldIds(), toJson()

Test plan