Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Field;

import org.apache.comet.IcebergApi;

/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
public abstract class AbstractCometSchemaImporter {
private final BufferAllocator allocator;
Expand Down Expand Up @@ -67,6 +69,7 @@ public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
return vector;
}

/**
 * Closes this importer by closing the underlying provider.
 *
 * <p>Annotated {@code @IcebergApi}: invoked by Apache Iceberg's Comet integration, so the
 * signature must remain stable.
 */
@IcebergApi
public void close() {
  provider.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.arrow.memory.BufferAllocator;

/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
@IcebergApi
public class CometSchemaImporter extends AbstractCometSchemaImporter {
/**
 * Creates a schema importer backed by the given Arrow allocator.
 *
 * <p>Annotated {@code @IcebergApi}: constructed directly by Apache Iceberg's Comet integration.
 *
 * @param allocator the Arrow buffer allocator passed through to the base importer
 */
@IcebergApi
public CometSchemaImporter(BufferAllocator allocator) {
  super(allocator);
}
Expand Down
44 changes: 44 additions & 0 deletions common/src/main/java/org/apache/comet/IcebergApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a class, method, constructor, or field as part of the API surface that Apache Iceberg's
 * Comet integration depends on.
 *
 * <p>Annotated elements form the contract between Comet and Iceberg. Modifying or removing one of
 * them can break Iceberg's Comet-accelerated vectorized Parquet reads, so treat such changes as
 * compatibility-sensitive and weigh backward compatibility before altering an annotated element.
 *
 * <p>Refer to the contributor guide for details on how the Iceberg integration uses these APIs.
 *
 * @see <a href="https://iceberg.apache.org/">Apache Iceberg</a>
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD})
public @interface IcebergApi {}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.apache.spark.sql.types.TimestampNTZType$;

import org.apache.comet.CometConf;
import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometVector;

/** Base class for Comet Parquet column reader implementations. */
@IcebergApi
public abstract class AbstractColumnReader implements AutoCloseable {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractColumnReader.class);

Expand Down Expand Up @@ -61,7 +63,7 @@ public abstract class AbstractColumnReader implements AutoCloseable {
protected int batchSize;

/** A pointer to the native implementation of ColumnReader. */
protected long nativeHandle;
@IcebergApi protected long nativeHandle;

AbstractColumnReader(
DataType type,
Expand Down Expand Up @@ -96,6 +98,7 @@ String getPath() {
/**
* Set the batch size of this reader to be 'batchSize'. Also initializes the native column reader.
*/
@IcebergApi
public void setBatchSize(int batchSize) {
assert nativeHandle == 0
: "Native column reader shouldn't be initialized before " + "'setBatchSize' is called";
Expand All @@ -113,6 +116,7 @@ public void setBatchSize(int batchSize) {
/** Returns the {@link CometVector} read by this reader. */
public abstract CometVector currentBatch();

@IcebergApi
@Override
public void close() {
if (nativeHandle != 0) {
Expand Down
10 changes: 7 additions & 3 deletions common/src/main/java/org/apache/comet/parquet/BatchReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

import org.apache.comet.CometConf;
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.IcebergApi;
import org.apache.comet.shims.ShimBatchReader;
import org.apache.comet.shims.ShimFileFormat;
import org.apache.comet.vector.CometVector;
Expand All @@ -87,6 +88,7 @@
* }
* </pre>
*/
@IcebergApi
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
Expand Down Expand Up @@ -186,9 +188,9 @@ public BatchReader(
}

/**
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public BatchReader(AbstractColumnReader[] columnReaders) {
// Todo: set useDecimal128 and useLazyMaterialization
int numColumns = columnReaders.length;
Expand Down Expand Up @@ -384,17 +386,17 @@ public void init() throws URISyntaxException, IOException {
}

/**
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
/**
 * Sets the Spark schema this reader uses for the batches it produces.
 *
 * <p>Annotated {@code @IcebergApi}: called by Apache Iceberg's Comet integration.
 *
 * @param schema the Spark schema to use
 */
@IcebergApi
public void setSparkSchema(StructType schema) {
  this.sparkSchema = schema;
}

/**
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
/**
 * Returns the per-column readers backing this batch reader.
 *
 * <p>Annotated {@code @IcebergApi}: called by Apache Iceberg's Comet integration.
 *
 * @return the internal array of column readers; NOTE(review): returned without copying, so
 *     callers can mutate the reader's state through it — confirm this sharing is intended
 */
@IcebergApi
public AbstractColumnReader[] getColumnReaders() {
  return columnReaders;
}
Expand Down Expand Up @@ -498,6 +500,7 @@ public boolean nextBatch() throws IOException {
return nextBatch(batchSize);
}

@IcebergApi
public boolean nextBatch(int batchSize) {
long totalDecodeTime = 0, totalLoadTime = 0;
for (int i = 0; i < columnReaders.length; i++) {
Expand All @@ -524,6 +527,7 @@ public boolean nextBatch(int batchSize) {
return true;
}

@IcebergApi
@Override
public void close() throws IOException {
if (columnReaders != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@

import org.apache.comet.CometConf;
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometDecodedVector;
import org.apache.comet.vector.CometDictionary;
import org.apache.comet.vector.CometDictionaryVector;
import org.apache.comet.vector.CometPlainVector;
import org.apache.comet.vector.CometVector;

@IcebergApi
public class ColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
protected final BufferAllocator ALLOCATOR = new RootAllocator();
Expand Down Expand Up @@ -111,9 +113,9 @@ public class ColumnReader extends AbstractColumnReader {
* Set the page reader for a new column chunk to read. Expects to call `readBatch` after this.
*
* @param pageReader the page reader for the new column chunk
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public void setPageReader(PageReader pageReader) throws IOException {
this.pageReader = pageReader;

Expand All @@ -129,6 +131,7 @@ public void setPageReader(PageReader pageReader) throws IOException {
}

/** This method is called from Apache Iceberg. */
@IcebergApi
public void setRowGroupReader(RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
throws IOException {
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(columnSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;

import org.apache.comet.IcebergApi;

/**
* A column reader that always return constant vectors. Used for reading partition columns, for
* instance.
*/
@IcebergApi
public class ConstantColumnReader extends MetadataColumnReader {
/** Whether all the values in this constant column are nulls */
private boolean isNull;
Expand All @@ -53,16 +56,17 @@ public class ConstantColumnReader extends MetadataColumnReader {
}

/**
* @deprecated since 0.10.0, will be removed in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
/**
 * Creates a reader for a column whose vector holds the given constant value.
 *
 * @param type the Spark data type of the column
 * @param descriptor the Parquet column descriptor
 * @param value the constant value backing this column
 * @param useDecimal128 passed through to the base reader; presumably selects 128-bit decimal
 *     representation — confirm against base class
 */
@IcebergApi
public ConstantColumnReader(
    DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
  // Final 'true' is the base reader's isConstant flag.
  super(type, descriptor, useDecimal128, true);
  this.value = value;
}

// Used by Iceberg
@IcebergApi
public ConstantColumnReader(
DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) {
super(type, spec, useDecimal128, true);
Expand Down
8 changes: 8 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/FileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.execution.metric.SQLMetric;

import org.apache.comet.IcebergApi;

import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;

Expand All @@ -101,6 +103,7 @@
* A Parquet file reader. Mostly followed {@code ParquetFileReader} in {@code parquet-mr}, but with
* customizations & optimizations for Comet.
*/
@IcebergApi
public class FileReader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);

Expand Down Expand Up @@ -135,6 +138,7 @@ public class FileReader implements Closeable {
}

/** This constructor is called from Apache Iceberg. */
@IcebergApi
public FileReader(
WrappedInputFile file,
ReadOptions cometOptions,
Expand Down Expand Up @@ -258,6 +262,7 @@ public void setRequestedSchema(List<ColumnDescriptor> projection) {
}

/** This method is called from Apache Iceberg. */
@IcebergApi
public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specList) {
paths.clear();
for (ParquetColumnSpec colSpec : specList) {
Expand Down Expand Up @@ -336,6 +341,7 @@ public long getFilteredRecordCount() {
}

/**
 * Skips the next row group without reading it, by advancing to the following block.
 *
 * <p>Annotated {@code @IcebergApi}: called by Apache Iceberg's Comet integration.
 *
 * @return {@code true} if a row group was skipped, {@code false} if there are none left
 */
@IcebergApi
public boolean skipNextRowGroup() {
  return advanceToNextBlock();
}
Expand All @@ -344,6 +350,7 @@ public boolean skipNextRowGroup() {
* Returns the next row group to read (after applying row group filtering), or null if there's no
* more row group.
*/
@IcebergApi
public RowGroupReader readNextRowGroup() throws IOException {
if (currentBlock == blocks.size()) {
return null;
Expand Down Expand Up @@ -864,6 +871,7 @@ public void closeStream() throws IOException {
}
}

@IcebergApi
@Override
public void close() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometPlainVector;
import org.apache.comet.vector.CometVector;

/** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
@IcebergApi
public class MetadataColumnReader extends AbstractColumnReader {
private final BufferAllocator allocator = new RootAllocator();

Expand All @@ -43,9 +45,9 @@ public class MetadataColumnReader extends AbstractColumnReader {
private boolean isConstant;

/**
* @deprecated since 0.10.0, will be made package private in 0.11.0.
* @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
*/
@IcebergApi
public MetadataColumnReader(
DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
Expand All @@ -55,6 +57,7 @@ public MetadataColumnReader(
}

// Used by Iceberg
@IcebergApi
public MetadataColumnReader(
DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
Expand All @@ -69,6 +72,7 @@ public void setBatchSize(int batchSize) {
super.setBatchSize(batchSize);
}

@IcebergApi
@Override
public void readBatch(int total) {
if (vector == null) {
Expand All @@ -90,6 +94,7 @@ void setNumNulls(int total) {
vector.setNumNulls(total);
}

@IcebergApi
@Override
public CometVector currentBatch() {
return vector;
Expand Down
4 changes: 4 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/Native.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.comet.IcebergApi;
import org.apache.comet.NativeBase;

public final class Native extends NativeBase {
Expand Down Expand Up @@ -143,6 +144,7 @@ public static native void setPageV2(
*
* @param handle the handle to the native Parquet column reader
*/
@IcebergApi
public static native void resetBatch(long handle);

/**
Expand Down Expand Up @@ -221,12 +223,14 @@ public static native void setPageV2(
public static native void setDecimal(long handle, byte[] value);

/** Set position of row index vector for Iceberg Metadata Column */
@IcebergApi
public static native void setPosition(long handle, long value, int size);

/** Set row index vector for Spark row index metadata column and return vector size */
public static native int setIndices(long handle, long offset, int size, long[] indices);

/** Set deleted info for Iceberg Metadata Column */
@IcebergApi
public static native void setIsDeleted(long handle, boolean[] isDeleted);

/**
Expand Down
Loading
Loading