Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.9.0 [unreleased]

### Features

1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client.

## 1.8.0 [2026-02-19]

### Features
Expand Down
34 changes: 32 additions & 2 deletions src/main/java/com/influxdb/v3/client/config/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
Expand All @@ -36,6 +37,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.grpc.ClientInterceptor;

import com.influxdb.v3.client.write.WriteOptions;
import com.influxdb.v3.client.write.WritePrecision;

Expand Down Expand Up @@ -68,6 +71,7 @@
* <li><code>headers</code> - headers to be added to requests</li>
* <li><code>sslRootsFilePath</code> - path to the stored certificates file in PEM format</li>
* <li><code>disableGRPCCompression</code> - disables the default gRPC compression header</li>
* <li><code>interceptors</code> - list of client interceptors to be used in the query API</li>
* </ul>
* <p>
* If you want to create a client with custom configuration, you can use following code:
Expand Down Expand Up @@ -115,6 +119,7 @@ public final class ClientConfig {
private final Map<String, String> headers;
private final String sslRootsFilePath;
private final boolean disableGRPCCompression;
private final List<ClientInterceptor> interceptors;

/**
* Deprecated use {@link #proxyUrl}.
Expand Down Expand Up @@ -329,6 +334,16 @@ public boolean getDisableGRPCCompression() {
return disableGRPCCompression;
}

/**
* Gets a list of client interceptors.
*
* @return a list of client interceptors.
*/
@Nullable
public List<ClientInterceptor> getInterceptors() {
return interceptors;
}

/**
* Validates the configuration properties.
*/
Expand Down Expand Up @@ -366,7 +381,8 @@ public boolean equals(final Object o) {
&& Objects.equals(authenticator, that.authenticator)
&& Objects.equals(headers, that.headers)
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath)
&& disableGRPCCompression == that.disableGRPCCompression;
&& disableGRPCCompression == that.disableGRPCCompression
&& Objects.equals(interceptors, that.interceptors);
}

@Override
Expand All @@ -375,7 +391,7 @@ public int hashCode() {
database, writePrecision, gzipThreshold, writeNoSync,
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
proxy, proxyUrl, authenticator, headers,
defaultTags, sslRootsFilePath, disableGRPCCompression);
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
}

@Override
Expand Down Expand Up @@ -429,6 +445,7 @@ public static final class Builder {
private Map<String, String> headers;
private String sslRootsFilePath;
private boolean disableGRPCCompression;
private List<ClientInterceptor> interceptors;

/**
* Sets the URL of the InfluxDB server.
Expand Down Expand Up @@ -723,6 +740,18 @@ public Builder disableGRPCCompression(final boolean disableGRPCCompression) {
return this;
}

/**
* Sets a list of interceptors to be used for the query API.
*
* @param interceptors a list of ClientInterceptor
* @return this
*/
@Nonnull
public Builder interceptors(@Nullable final List<ClientInterceptor> interceptors) {
this.interceptors = interceptors;
return this;
}

/**
* Build an instance of {@code ClientConfig}.
*
Expand Down Expand Up @@ -897,5 +926,6 @@ private ClientConfig(@Nonnull final Builder builder) {
headers = builder.headers;
sslRootsFilePath = builder.sslRootsFilePath;
disableGRPCCompression = builder.disableGRPCCompression;
interceptors = builder.interceptors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
.with(Codec.Identity.NONE, false));
}

if (config.getInterceptors() != null) {
nettyChannelBuilder.intercept(config.getInterceptors());
}

return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ProxyDetector;
import io.grpc.internal.GrpcUtil;
import org.apache.arrow.flight.CallHeaders;
Expand Down Expand Up @@ -108,6 +116,44 @@ public void callHeaders() throws Exception {
}
}

@Test
public void setHeaderInInterceptor() throws Exception {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions,
final Channel next
) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<>(call) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata headers) {
Metadata.Key<String> key = Metadata.Key.of(
"some-header",
Metadata.ASCII_STRING_MARSHALLER
);
headers.put(key, "This is from interceptor");
super.start(responseListener, headers);
}
};
}
};

ClientConfig clientConfig = new ClientConfig.Builder()
.host(server.getLocation().getUri().toString())
.token("my-token".toCharArray())
.interceptors(List.of(interceptor))
.build();

try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig);
var data = executeQuery(flightSqlClient, Map.of(), Map.of())) {
Assertions.assertThat(data.count()).isEqualTo(rowCount);
final Map<String, String> receivedHeaders = headerFactory.getLastInstance().getHeaders();
Assertions.assertThat(receivedHeaders.get("some-header")).isEqualTo("This is from interceptor");
}
}

@Test
public void callHeadersWithoutToken() throws Exception {
ClientConfig clientConfig = new ClientConfig.Builder()
Expand Down
Loading