diff --git a/.gitignore b/.gitignore index 2c19b1b3a2cd5..af3e81ef32d64 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,8 @@ iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/ # Relational Grammar ANTLR iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/.antlr/ + +# Claude Code +CLAUDE.md +.omc/ +.claude/ diff --git a/external-service-impl/flight-sql/pom.xml b/external-service-impl/flight-sql/pom.xml index be363c336b76f..2e1a4c301a93e 100644 --- a/external-service-impl/flight-sql/pom.xml +++ b/external-service-impl/flight-sql/pom.xml @@ -38,10 +38,25 @@ org.apache.arrow flight-sql + + + org.apache.arrow + arrow-memory-netty + + + org.apache.arrow + arrow-memory-netty-buffer-patch + + + + io.netty + * + + org.apache.arrow - arrow-memory-netty + arrow-memory-unsafe runtime diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java index b591665eab028..66da06f00d22e 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java @@ -19,23 +19,28 @@ package org.apache.iotdb.flight; +import org.apache.arrow.flight.CallHeaders; import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; +import org.apache.arrow.flight.auth2.CallHeaderAuthenticator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + /** - * Arrow Flight SQL credential validator using Arrow's built-in auth2 framework. Validates - * username/password credentials via IoTDB's SessionManager and returns a Bearer token string as the - * peer identity for subsequent requests. - * - *

Used with {@link BasicCallHeaderAuthenticator} and {@link - * org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator} to provide Basic → Bearer token - * authentication flow. + * Arrow Flight SQL authenticator that supports both Basic and Bearer token authentication. On the + * first call, Basic credentials are validated and a Bearer token is returned. On subsequent calls, + * the Bearer token is used to look up the existing session, avoiding creating a new session per + * call. */ -public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.CredentialValidator { +public class FlightSqlAuthHandler implements CallHeaderAuthenticator { private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthHandler.class); + private static final String AUTHORIZATION_HEADER = "authorization"; + private static final String BASIC_PREFIX = "Basic "; + private static final String BEARER_PREFIX = "Bearer "; + private final FlightSqlSessionManager sessionManager; public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) { @@ -43,17 +48,89 @@ public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) { } @Override - public org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult validate( - String username, String password) { - LOGGER.debug("Validating credentials for user: {}", username); - + public AuthResult authenticate(CallHeaders headers) { + Iterable authHeaders; try { - String token = sessionManager.authenticate(username, password, "unknown"); - // Return the token as the peer identity; GeneratedBearerTokenAuthenticator - // wraps it in a Bearer token and sets it in the response header. - return () -> token; - } catch (SecurityException e) { - throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException(); + authHeaders = headers.getAll(AUTHORIZATION_HEADER); + } catch (NullPointerException e) { + throw CallStatus.UNAUTHENTICATED + .withDescription("Missing Authorization header (null header map)") + .toRuntimeException(); + } + + // First pass: check for Bearer token (reuse existing session) + String basicHeader = null; + if (authHeaders == null) { + throw CallStatus.UNAUTHENTICATED + .withDescription("Missing Authorization header") + .toRuntimeException(); + } + for (String authHeader : authHeaders) { + if (authHeader.startsWith(BEARER_PREFIX)) { + String token = authHeader.substring(BEARER_PREFIX.length()); + try { + sessionManager.getSessionByToken(token); + return bearerResult(token); + } catch (SecurityException e) { + // Bearer token invalid/expired, fall through to Basic auth + LOGGER.debug("Bearer token invalid, falling back to Basic auth"); + } + } else if (authHeader.startsWith(BASIC_PREFIX) && basicHeader == null) { + basicHeader = authHeader; + } } + + // Second pass: fall back to Basic auth (create new session) + if (basicHeader != null) { + String encoded = basicHeader.substring(BASIC_PREFIX.length()); + String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8); + int colonIdx = decoded.indexOf(':'); + if (colonIdx < 0) { + throw CallStatus.UNAUTHENTICATED + .withDescription("Invalid Basic credentials format") + .toRuntimeException(); + } + String username = decoded.substring(0, colonIdx); + String password = decoded.substring(colonIdx + 1); + + String clientId = headers.get("x-flight-sql-client-id"); + LOGGER.debug("Validating credentials for user: {}, clientId: {}", username, clientId); + try { + String token = sessionManager.authenticate(username, password, "unknown", clientId); + return bearerResult(token); + } catch (SecurityException e) { + throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException(); + } + } + + throw CallStatus.UNAUTHENTICATED + .withDescription("Missing or unsupported Authorization header") + .toRuntimeException(); + } + + /** + * Creates an AuthResult that sends the Bearer token back in response headers. The client's + * ClientIncomingAuthHeaderMiddleware captures this token for use on subsequent calls. + */ + private static AuthResult bearerResult(String token) { + return new AuthResult() { + @Override + public String getPeerIdentity() { + return token; + } + + @Override + public void appendToOutgoingHeaders(CallHeaders outgoingHeaders) { + if (outgoingHeaders == null) { + return; + } + try { + outgoingHeaders.insert(AUTHORIZATION_HEADER, BEARER_PREFIX + token); + } catch (NullPointerException e) { + // Some CallHeaders implementations have null internal maps for certain RPCs + LOGGER.debug("Could not append Bearer token to outgoing headers", e); + } + } + }; } } diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java deleted file mode 100644 index 2cf0d048556b7..0000000000000 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.flight; - -import org.apache.arrow.flight.CallHeaders; -import org.apache.arrow.flight.CallInfo; -import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.FlightServerMiddleware; -import org.apache.arrow.flight.RequestContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Flight Server middleware for handling Bearer token / Basic authentication. Supports initial login - * via Basic auth header (username:password), returning a Bearer token. Subsequent requests use the - * Bearer token. - */ -public class FlightSqlAuthMiddleware implements FlightServerMiddleware { - - private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthMiddleware.class); - - /** The middleware key used to retrieve this middleware in the CallContext. */ - public static final Key KEY = Key.of("flight-sql-auth-middleware"); - - private final CallHeaders incomingHeaders; - - FlightSqlAuthMiddleware(CallHeaders incomingHeaders) { - this.incomingHeaders = incomingHeaders; - } - - /** Returns the incoming call headers for session lookup. */ - public CallHeaders getCallHeaders() { - return incomingHeaders; - } - - @Override - public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { - // no-op: token is set during Handshake response, not here - } - - @Override - public void onCallCompleted(CallStatus status) { - // no-op - } - - @Override - public void onCallErrored(Throwable err) { - // no-op - } - - // ===================== Factory ===================== - - /** Factory that creates FlightSqlAuthMiddleware for each call. */ - public static class Factory implements FlightServerMiddleware.Factory { - - private final FlightSqlSessionManager sessionManager; - - public Factory(FlightSqlSessionManager sessionManager) { - this.sessionManager = sessionManager; - } - - @Override - public FlightSqlAuthMiddleware onCallStarted( - CallInfo callInfo, CallHeaders incomingHeaders, RequestContext context) { - return new FlightSqlAuthMiddleware(incomingHeaders); - } - - public FlightSqlSessionManager getSessionManager() { - return sessionManager; - } - } -} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java index 96480449fcc14..c775b84f73c67 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java @@ -24,8 +24,6 @@ import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.Location; -import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; -import org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.slf4j.Logger; @@ -46,89 +44,120 @@ public class FlightSqlService implements IExternalService { private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlService.class); private static final long SESSION_TIMEOUT_MINUTES = 30; + private final Object lifecycleLock = new Object(); private FlightServer flightServer; private BufferAllocator allocator; private FlightSqlSessionManager flightSessionManager; private IoTDBFlightSqlProducer producer; @Override - public void start() { - int port = IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort(); - LOGGER.info("Starting Arrow Flight SQL service on port {}", port); - - try { - // Create the root allocator for Arrow memory management - allocator = new RootAllocator(Long.MAX_VALUE); - - // Create session manager with TTL - flightSessionManager = new FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES); - - // Create the auth handler - FlightSqlAuthHandler authHandler = new FlightSqlAuthHandler(flightSessionManager); - - // Create the Flight SQL producer - producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager); - - // Build the Flight server with auth2 Bearer token authentication - Location location = Location.forGrpcInsecure("0.0.0.0", port); - flightServer = - FlightServer.builder(allocator, location, producer) - .headerAuthenticator( - new GeneratedBearerTokenAuthenticator( - new BasicCallHeaderAuthenticator(authHandler))) - .build(); - - flightServer.start(); - LOGGER.info( - "Arrow Flight SQL service started successfully on port {}", flightServer.getPort()); - } catch (IOException e) { - LOGGER.error("Failed to start Arrow Flight SQL service", e); - stop(); - throw new RuntimeException("Failed to start Arrow Flight SQL service", e); + public synchronized void start() { + synchronized (lifecycleLock) { + if (flightServer != null) { + LOGGER.warn("Arrow Flight SQL service already started"); + return; + } + + int port = IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort(); + LOGGER.info("Starting Arrow Flight SQL service on port {}", port); + + try { + // Create the root allocator for Arrow memory management with memory limit + long maxMemory = Runtime.getRuntime().maxMemory(); + long allocatorLimit = + Math.min( + IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlMaxAllocatorMemory(), + maxMemory / 4); + allocator = new RootAllocator(allocatorLimit); + LOGGER.info( + "Arrow allocator initialized with limit: {} bytes ({} MB)", + allocatorLimit, + allocatorLimit / (1024 * 1024)); + + Location location = Location.forGrpcInsecure("0.0.0.0", port); + + // Create session manager with TTL + flightSessionManager = new FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES); + FlightSqlAuthHandler authHandler = new FlightSqlAuthHandler(flightSessionManager); + + // Create the Flight SQL producer + producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager); + + flightServer = + FlightServer.builder(allocator, location, producer) + .headerAuthenticator(authHandler) + // directExecutor: run gRPC handlers in the Netty event loop thread to + // avoid thread scheduling issues with the default executor that cause + // "end-of-stream mid-frame" errors on subsequent RPCs. + .transportHint( + "grpc.builderConsumer", + (java.util.function.Consumer) + nsb -> { + nsb.directExecutor(); + nsb.initialFlowControlWindow(1048576); + nsb.flowControlWindow(1048576); + }) + .build(); + + flightServer.start(); + LOGGER.info( + "Arrow Flight SQL service started successfully on port {}", flightServer.getPort()); + } catch (IOException e) { + LOGGER.error("Failed to start Arrow Flight SQL service", e); + stop(); + throw new RuntimeException("Failed to start Arrow Flight SQL service", e); + } } } @Override - public void stop() { - LOGGER.info("Stopping Arrow Flight SQL service"); + public synchronized void stop() { + synchronized (lifecycleLock) { + if (flightServer == null) { + LOGGER.warn("Arrow Flight SQL service not started"); + return; + } - if (flightServer != null) { - try { - flightServer.shutdown(); - flightServer.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting for Flight server shutdown", e); - Thread.currentThread().interrupt(); + LOGGER.info("Stopping Arrow Flight SQL service"); + + if (flightServer != null) { try { - flightServer.close(); - } catch (Exception ex) { - LOGGER.warn("Error force-closing Flight server", ex); + flightServer.shutdown(); + flightServer.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for Flight server shutdown", e); + Thread.currentThread().interrupt(); + try { + flightServer.close(); + } catch (Exception ex) { + LOGGER.warn("Error force-closing Flight server", ex); + } + } catch (Exception e) { + LOGGER.warn("Error shutting down Flight server", e); } - } catch (Exception e) { - LOGGER.warn("Error shutting down Flight server", e); + flightServer = null; } - flightServer = null; - } - if (producer != null) { - try { - producer.close(); - } catch (Exception e) { - LOGGER.warn("Error closing Flight SQL producer", e); + if (producer != null) { + try { + producer.close(); + } catch (Exception e) { + LOGGER.warn("Error closing Flight SQL producer", e); + } + producer = null; } - producer = null; - } - if (flightSessionManager != null) { - flightSessionManager.close(); - flightSessionManager = null; - } + if (flightSessionManager != null) { + flightSessionManager.close(); + flightSessionManager = null; + } - if (allocator != null) { - allocator.close(); - allocator = null; - } + if (allocator != null) { + allocator.close(); + allocator = null; + } - LOGGER.info("Arrow Flight SQL service stopped"); + LOGGER.info("Arrow Flight SQL service stopped"); + } } } diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java index ad829888f2203..825c930f1b765 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java @@ -20,9 +20,11 @@ package org.apache.iotdb.flight; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.InternalClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.rpc.TSStatusCode; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -31,7 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.UUID; +import java.security.SecureRandom; +import java.util.Base64; import java.util.concurrent.TimeUnit; /** @@ -43,30 +46,56 @@ public class FlightSqlSessionManager { private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlSessionManager.class); private static final String AUTHORIZATION_HEADER = "authorization"; private static final String BEARER_PREFIX = "Bearer "; + private static final SecureRandom SECURE_RANDOM = new SecureRandom(); + private static final int MAX_CLIENT_ID_LENGTH = 64; + private static final int MAX_SESSIONS = 1000; + private static final java.util.regex.Pattern CLIENT_ID_PATTERN = + java.util.regex.Pattern.compile("^[a-zA-Z0-9\\-]+$"); private final SessionManager sessionManager = SessionManager.getInstance(); /** Cache of Bearer token -> IClientSession with configurable TTL. */ private final Cache tokenCache; + /** + * Cache of (username@clientId) -> Bearer token for session reuse. Avoids repeated session + * creation on every RPC — necessary because the Arrow Flight client middleware does not always + * cache the Bearer token, causing Basic auth to be re-sent on every call. + * + *

Keyed by {@code username@clientId} where clientId comes from the {@code + * x-flight-sql-client-id} header. This ensures different logical clients (even with the same + * username) get independent sessions with separate USE database contexts. If no clientId header + * is present, falls back to username-only keying (shared session). + */ + private final Cache clientSessionCache; + public FlightSqlSessionManager(long sessionTimeoutMinutes) { this.tokenCache = Caffeine.newBuilder() + .maximumSize(MAX_SESSIONS) .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES) .removalListener( (String token, IClientSession session, RemovalCause cause) -> { if (session != null && cause != RemovalCause.REPLACED) { - LOGGER.info("Flight SQL session expired, closing: {}", session); - sessionManager.closeSession( - session, - queryId -> - org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance() - .cleanupQueryExecution(queryId), - false); - sessionManager.removeCurrSessionForMqtt(null); // handled via sessions map only + LOGGER.info("Flight SQL session expired: {}, cause: {}", session, cause); + try { + sessionManager.closeSession( + session, + queryId -> + org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance() + .cleanupQueryExecution(queryId), + false); + } catch (Exception e) { + LOGGER.error("Error closing expired session", e); + } } }) .build(); + this.clientSessionCache = + Caffeine.newBuilder() + .maximumSize(MAX_SESSIONS) + .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES) + .build(); } /** @@ -74,39 +103,78 @@ public FlightSqlSessionManager(long sessionTimeoutMinutes) { * * @param username the username * @param password the password - * @param clientAddress the client's IP address + * @param clientAddress the client's IP address (for logging) + * @param clientId optional client identifier from x-flight-sql-client-id header (may be null) * @return the Bearer token if authentication succeeds * @throws SecurityException if authentication fails */ - public String authenticate(String username, String password, String clientAddress) { - // Create a session for this client - IClientSession session = new InternalClientSession("FlightSQL-" + clientAddress); - session.setSqlDialect(IClientSession.SqlDialect.TABLE); + public String authenticate( + String username, String password, String clientAddress, String clientId) { + // NOTE: We intentionally do NOT call SessionManager.login() here because it performs + // blocking I/O that is incompatible with directExecutor() on the Netty event loop: + // - DataNodeAuthUtils.checkPasswordExpiration: executes SELECT via Coordinator + // - AuthorityChecker.getUserId: sends RPC to ConfigNode on cache miss + // Blocking the event loop corrupts HTTP/2 connection state and causes "end-of-stream + // mid-frame" errors on subsequent RPCs. + // + // Functional gaps vs SessionManager.login(): + // - Password expiration checks (requires Coordinator query) + // - Login lock / brute-force protection (LoginLockManager is in-memory but keys + // by userId; AuthorityChecker.getUserId() is a blocking RPC, so we cannot obtain + // a correct userId without risking event loop stalls) + // + // Risk: AuthorityChecker.checkUser() may perform a one-time blocking RPC to ConfigNode + // on cache miss (ClusterAuthorityFetcher.login). After the first successful auth, the + // credential is cached locally, and clientSessionCache avoids repeated authenticate() + // calls for the same client. + // + // TODO: Support password expiration and login lock. This requires either: + // (a) async auth support in Arrow Flight (not yet available), or + // (b) resolving the Netty classpath conflict so directExecutor() is no longer needed. + + // Always verify credentials — never skip password verification even if a cached + // session exists for this client. + org.apache.iotdb.common.rpc.thrift.TSStatus status; + try { + status = AuthorityChecker.checkUser(username, password); + } catch (Exception e) { + throw new SecurityException("Authentication failed", e); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Authentication failed for client: {}", clientAddress); + throw new SecurityException("Authentication failed: wrong username or password"); + } - // Register the session before login (MQTT pattern) - sessionManager.registerSessionForMqtt(session); - - // Use SessionManager's login method - org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp loginResp = - sessionManager.login( - session, - username, - password, - java.time.ZoneId.systemDefault().getId(), - SessionManager.CURRENT_RPC_VERSION, - IoTDBConstant.ClientVersion.V_1_0, - IClientSession.SqlDialect.TABLE); - - if (loginResp.getCode() != org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // Remove the session if login failed - sessionManager.removeCurrSessionForMqtt(null); - throw new SecurityException("Authentication failed: " + loginResp.getMessage()); + // Reuse existing session for this client. + // Key uses \0 (null byte) delimiter — cannot appear in usernames or HTTP headers, + // so the mapping (username, clientId) -> cacheKey is injective (no collisions). + String validClientId = validateClientId(clientId); + String cacheKey = validClientId != null ? username + "\0" + validClientId : username; + String existingToken = clientSessionCache.getIfPresent(cacheKey); + if (existingToken != null && tokenCache.getIfPresent(existingToken) != null) { + return existingToken; } - // Generate Bearer token and store in cache - String token = UUID.randomUUID().toString(); + // Create session. Do NOT call registerSession() — it sets a ThreadLocal (currSession) + // designed for the client-thread model (Thrift). gRPC with directExecutor() runs all + // handlers on the Netty event loop, so ThreadLocal-based session tracking would pollute. + IClientSession session = new InternalClientSession("FlightSQL-" + clientAddress); + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + // Pass -1L for userId — getUserId() sends blocking RPC to ConfigNode. + sessionManager.supplySession( + session, + -1L, + username, + java.time.ZoneId.systemDefault(), + IoTDBConstant.ClientVersion.V_1_0); + + // Generate cryptographically secure Bearer token (32 bytes = 256 bits) + byte[] tokenBytes = new byte[32]; + SECURE_RANDOM.nextBytes(tokenBytes); + String token = Base64.getUrlEncoder().withoutPadding().encodeToString(tokenBytes); tokenCache.put(token, session); - LOGGER.info("Flight SQL user '{}' authenticated, session: {}", username, session); + clientSessionCache.put(cacheKey, token); + LOGGER.info("Flight SQL authentication successful for client: {}", clientAddress); return token; } @@ -141,6 +209,29 @@ public IClientSession getSessionByToken(String token) { return session; } + /** + * Validates the client ID from the x-flight-sql-client-id header. Returns the validated clientId, + * or null if the header was absent (null/empty). Non-empty invalid clientIds are rejected + * (fail-closed) to prevent silent fallback to shared username-only sessions, which would break + * USE database isolation. + * + * @throws SecurityException if clientId is non-empty but invalid (too long or bad characters) + */ + private static String validateClientId(String clientId) { + if (clientId == null || clientId.isEmpty()) { + return null; + } + if (clientId.length() > MAX_CLIENT_ID_LENGTH) { + throw new SecurityException( + "Client ID exceeds maximum length of " + MAX_CLIENT_ID_LENGTH + " characters"); + } + if (!CLIENT_ID_PATTERN.matcher(clientId).matches()) { + throw new SecurityException( + "Client ID contains invalid characters (only alphanumeric and dash allowed)"); + } + return clientId; + } + /** Invalidates all sessions and cleans up resources. */ public void close() { tokenCache.invalidateAll(); diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java index 067ccfb20d4b0..1793a64abecd9 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java @@ -34,6 +34,10 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.rpc.TSStatusCode; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.Criteria; @@ -54,11 +58,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.Collections; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * Apache Arrow Flight SQL producer implementation for IoTDB. Handles SQL query execution via the @@ -76,8 +79,25 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { private final SqlParser sqlParser = new SqlParser(); private final Metadata metadata = LocalExecutionPlanner.getInstance().metadata; - /** Stores query execution context by queryId for streaming results via getStream. */ - private final ConcurrentHashMap activeQueries = new ConcurrentHashMap<>(); + /** + * Stores query execution context by queryId for streaming results via getStream. Uses Caffeine + * cache with TTL to prevent resource leaks when clients don't call getStream. + */ + private final Cache activeQueries = + Caffeine.newBuilder() + .expireAfterWrite(CONFIG.getQueryTimeoutThreshold(), TimeUnit.MILLISECONDS) + .removalListener( + (Long queryId, QueryContext ctx, RemovalCause cause) -> { + if (ctx != null && cause != RemovalCause.EXPLICIT) { + LOGGER.warn("Query {} evicted due to {}, cleaning up", queryId, cause); + try { + coordinator.cleanupQueryExecution(queryId); + } catch (Exception e) { + LOGGER.error("Error cleaning up evicted query {}", queryId, e); + } + } + }) + .build(); public IoTDBFlightSqlProducer( BufferAllocator allocator, FlightSqlSessionManager flightSessionManager) { @@ -105,14 +125,31 @@ private IClientSession getSessionFromContext(CallContext context) { // ===================== SQL Query Execution ===================== + private static final int MAX_QUERY_LENGTH = 100_000; // 100KB + @Override public FlightInfo getFlightInfoStatement( FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) { String sql = command.getQuery(); - LOGGER.debug("getFlightInfoStatement: {}", sql); + + // Validate query length + if (sql == null || sql.trim().isEmpty()) { + throw CallStatus.INVALID_ARGUMENT.withDescription("Empty SQL query").toRuntimeException(); + } + if (sql.length() > MAX_QUERY_LENGTH) { + throw CallStatus.INVALID_ARGUMENT + .withDescription("Query exceeds maximum length of " + MAX_QUERY_LENGTH + " characters") + .toRuntimeException(); + } IClientSession session = getSessionFromContext(context); + // Log query for audit (truncate if too long) + LOGGER.info( + "Executing query for user {}: {}", + session.getUsername(), + sql.substring(0, Math.min(sql.length(), 200))); + Long queryId = null; try { queryId = sessionManager.requestQueryId(); @@ -142,9 +179,10 @@ public FlightInfo getFlightInfoStatement( IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); if (queryExecution == null) { - throw CallStatus.INTERNAL - .withDescription("Query execution not found after execution") - .toRuntimeException(); + // Non-query statements (USE, CREATE, INSERT, etc.) don't produce a query execution. + // Return an empty FlightInfo with no endpoints. + return new FlightInfo( + new Schema(Collections.emptyList()), descriptor, Collections.emptyList(), 0, 0); } DatasetHeader header = queryExecution.getDatasetHeader(); @@ -153,20 +191,28 @@ public FlightInfo getFlightInfoStatement( // Store the query context for later getStream calls activeQueries.put(queryId, new QueryContext(queryExecution, header, session)); - // Build ticket containing the queryId - byte[] ticketBytes = Long.toString(queryId).getBytes(StandardCharsets.UTF_8); - Ticket ticket = new Ticket(ticketBytes); + // Build ticket as a serialized TicketStatementQuery protobuf. + // The FlightSqlProducer base class's getStream() unpacks tickets as Any + // and dispatches to getStreamStatement(). + ByteString handle = ByteString.copyFromUtf8(Long.toString(queryId)); + FlightSql.TicketStatementQuery ticketQuery = + FlightSql.TicketStatementQuery.newBuilder().setStatementHandle(handle).build(); + Ticket ticket = new Ticket(Any.pack(ticketQuery).toByteArray()); FlightEndpoint endpoint = new FlightEndpoint(ticket); return new FlightInfo(arrowSchema, descriptor, Collections.singletonList(endpoint), -1, -1); - } catch (RuntimeException e) { + } catch (Exception e) { // Cleanup on error + LOGGER.error("Error executing query: {}", sql, e); if (queryId != null) { coordinator.cleanupQueryExecution(queryId); - activeQueries.remove(queryId); + activeQueries.invalidate(queryId); } - throw e; + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException(); } } @@ -184,17 +230,26 @@ public void getStreamStatement( CallContext context, ServerStreamListener listener) { ByteString handle = ticketQuery.getStatementHandle(); - long queryId = Long.parseLong(handle.toStringUtf8()); + long queryId; + try { + queryId = Long.parseLong(handle.toStringUtf8()); + } catch (NumberFormatException e) { + listener.error( + CallStatus.INVALID_ARGUMENT + .withDescription("Invalid statement handle: " + handle.toStringUtf8()) + .toRuntimeException()); + return; + } streamQueryResults(queryId, listener); } /** Streams query results for a given queryId as Arrow VectorSchemaRoot batches. */ private void streamQueryResults(long queryId, ServerStreamListener listener) { - QueryContext ctx = activeQueries.get(queryId); + QueryContext ctx = activeQueries.getIfPresent(queryId); if (ctx == null) { listener.error( CallStatus.NOT_FOUND - .withDescription("Query not found for id: " + queryId) + .withDescription("Query not found or expired: " + queryId) .toRuntimeException()); return; } @@ -220,9 +275,15 @@ private void streamQueryResults(long queryId, ServerStreamListener listener) { } catch (IoTDBException e) { LOGGER.error("Error streaming query results for queryId={}", queryId, e); listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException()); + } catch (Exception e) { + LOGGER.error("Unexpected error streaming query results for queryId={}", queryId, e); + listener.error( + CallStatus.INTERNAL + .withDescription("Internal error: " + e.getMessage()) + .toRuntimeException()); } finally { coordinator.cleanupQueryExecution(queryId); - activeQueries.remove(queryId); + activeQueries.invalidate(queryId); if (root != null) { root.close(); } @@ -503,14 +564,14 @@ public void getStreamCrossReference( @Override public void close() throws Exception { // Clean up all active queries - for (Long queryId : activeQueries.keySet()) { + for (Long queryId : activeQueries.asMap().keySet()) { try { coordinator.cleanupQueryExecution(queryId); } catch (Exception e) { LOGGER.warn("Error cleaning up query {} during shutdown", queryId, e); } } - activeQueries.clear(); + activeQueries.invalidateAll(); } // ===================== Inner Classes ===================== diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java index 396f230d0b91c..84c6984d9a688 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java @@ -122,7 +122,8 @@ public static void fillVectorSchemaRoot( for (int colIdx = 0; colIdx < columnNames.size(); colIdx++) { String colName = columnNames.get(colIdx); - Integer sourceIdx = headerMap.get(colName); + int sourceIdx = + (headerMap != null && headerMap.containsKey(colName)) ? headerMap.get(colName) : colIdx; Column column = tsBlock.getColumn(sourceIdx); TSDataType dataType = dataTypes.get(colIdx); FieldVector fieldVector = root.getVector(colIdx); diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 4ef730957e4cd..06382ea991321 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -246,6 +246,13 @@ provided + + org.apache.iotdb + flight-sql + 2.0.7-SNAPSHOT + + provided + org.apache.arrow flight-sql diff --git a/integration-test/src/assembly/mpp-share.xml b/integration-test/src/assembly/mpp-share.xml index 70072e8282ec6..74de1ae8b2377 100644 --- a/integration-test/src/assembly/mpp-share.xml +++ b/integration-test/src/assembly/mpp-share.xml @@ -39,5 +39,9 @@ ${project.basedir}/../external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar lib + + ${project.basedir}/../external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar + lib + diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java index d931ca7799944..625337ab4fa93 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java @@ -25,7 +25,11 @@ import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.arrow.flight.CallHeaders; +import org.apache.arrow.flight.CallInfo; +import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightClientMiddleware; import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightStream; @@ -49,6 +53,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.UUID; import static org.junit.Assert.*; @@ -62,11 +67,12 @@ public class IoTDBArrowFlightSqlIT { private static final String DATABASE = "flightsql_test_db"; + private static final String TABLE = DATABASE + ".test_table"; private static final String USER = "root"; private static final String PASSWORD = "root"; + private String clientId; private BufferAllocator allocator; - private FlightClient flightClient; private FlightSqlClient flightSqlClient; private CredentialCallOption bearerToken; @@ -77,27 +83,15 @@ public void setUp() throws Exception { baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true); baseEnv.initClusterEnvironment(); - // Get the Flight SQL port from the data node int port = EnvFactory.getEnv().getArrowFlightSqlPort(); - - // Create Arrow allocator and Flight client with Bearer token auth middleware allocator = new RootAllocator(Long.MAX_VALUE); Location location = Location.forGrpcInsecure("127.0.0.1", port); - // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from the - // auth handshake - ClientIncomingAuthHeaderMiddleware.Factory authFactory = - new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); - - flightClient = FlightClient.builder(allocator, location).intercept(authFactory).build(); - - // Authenticate: sends Basic credentials, server returns Bearer token + clientId = UUID.randomUUID().toString(); + flightSqlClient = createFlightSqlClient(clientId); bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); - // Wrap in FlightSqlClient for Flight SQL protocol operations - flightSqlClient = new FlightSqlClient(flightClient); - - // Use the standard session to create the test database and table with data + // Create test data via native session (not Flight SQL) try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " + DATABASE); } @@ -132,24 +126,37 @@ public void tearDown() throws Exception { // ignore } } - if (flightClient != null) { - try { - flightClient.close(); - } catch (Exception e) { - // ignore - } - } if (allocator != null) { allocator.close(); } EnvFactory.getEnv().cleanClusterEnvironment(); } + @Test + public void testShowDatabases() throws Exception { + FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", bearerToken); + + List> rows = fetchAllRows(flightInfo); + assertTrue("Should have at least 1 database", rows.size() >= 1); + + boolean found = false; + for (List row : rows) { + for (String val : row) { + if (val.contains(DATABASE)) { + found = true; + break; + } + } + } + assertTrue("Should find test database " + DATABASE, found); + } + @Test public void testQueryWithAllDataTypes() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY time", bearerToken); + "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM " + TABLE + " ORDER BY time", + bearerToken); // Validate schema Schema schema = flightInfo.getSchema(); @@ -166,7 +173,7 @@ public void testQueryWithAllDataTypes() throws Exception { public void testQueryWithFilter() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY time", bearerToken); + "SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY time", bearerToken); List> rows = fetchAllRows(flightInfo); assertEquals("Should have 2 rows for device1", 2, rows.size()); @@ -177,7 +184,9 @@ public void testQueryWithAggregation() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum " - + "FROM test_table GROUP BY id1 ORDER BY id1", + + "FROM " + + TABLE + + " GROUP BY id1 ORDER BY id1", bearerToken); List> rows = fetchAllRows(flightInfo); @@ -187,39 +196,133 @@ public void testQueryWithAggregation() throws Exception { @Test public void testEmptyResult() throws Exception { FlightInfo flightInfo = - flightSqlClient.execute("SELECT * FROM test_table WHERE id1 = 'nonexistent'", bearerToken); + flightSqlClient.execute( + "SELECT * FROM " + TABLE + " WHERE id1 = 'nonexistent'", bearerToken); List> rows = fetchAllRows(flightInfo); assertEquals("Should have 0 rows", 0, rows.size()); } @Test - public void testShowDatabases() throws Exception { - FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", bearerToken); + public void testUseDbSessionPersistence() throws Exception { + // Connection 1: USE database (same clientId shares the session) + flightSqlClient.execute("USE " + DATABASE, bearerToken); - List> rows = fetchAllRows(flightInfo); - assertTrue("Should have at least 1 database", rows.size() >= 1); + // Connection 2: query without fully-qualified table name. + // Same clientId ensures the same session is reused, so USE context persists. + FlightSqlClient client2 = createFlightSqlClient(clientId); + try { + CredentialCallOption token2 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + FlightInfo flightInfo = client2.execute("SELECT * FROM test_table ORDER BY time", token2); + List> rows = fetchAllRows(flightInfo, client2, token2); + assertEquals("Should have 3 rows from unqualified query after USE", 3, rows.size()); + } finally { + client2.close(); + } + } - boolean found = false; - for (List row : rows) { - for (String val : row) { - if (val.contains(DATABASE)) { - found = true; - break; - } - } + @Test + public void testUseDbWithFullyQualifiedFallback() throws Exception { + // Connection 1: USE database + flightSqlClient.execute("USE " + DATABASE, bearerToken); + + // Connection 2: unqualified query (same clientId → same session) + FlightSqlClient client2 = createFlightSqlClient(clientId); + try { + CredentialCallOption token2 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + FlightInfo infoUnqualified = + client2.execute("SELECT * FROM test_table ORDER BY time", token2); + List> rowsUnqualified = fetchAllRows(infoUnqualified, client2, token2); + assertEquals("Unqualified query should return 3 rows", 3, rowsUnqualified.size()); + } finally { + client2.close(); + } + + // Connection 3: fully-qualified query + FlightSqlClient client3 = createFlightSqlClient(clientId); + try { + CredentialCallOption token3 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + FlightInfo infoQualified = + client3.execute("SELECT * FROM " + TABLE + " ORDER BY time", token3); + List> rowsQualified = fetchAllRows(infoQualified, client3, token3); + assertEquals("Fully-qualified query should also return 3 rows", 3, rowsQualified.size()); + } finally { + client3.close(); } - assertTrue("Should find test database " + DATABASE, found); } + @Test + public void testUseDbIsolationAcrossClients() throws Exception { + // Client A (clientId from setUp): USE DATABASE + flightSqlClient.execute("USE " + DATABASE, bearerToken); + + // Client B (different clientId): gets its own independent session with NO USE context. + // Querying an unqualified table name should fail because no database is selected. + String clientIdB = UUID.randomUUID().toString(); + FlightSqlClient clientB = createFlightSqlClient(clientIdB); + CredentialCallOption tokenB = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + try { + clientB.execute("SELECT * FROM test_table", tokenB); + fail("Client B should fail on unqualified table query without USE"); + } catch (Exception expected) { + // Expected: Client B has no database context, so unqualified table query fails. + // Arrow Flight wraps the actual error, so we just verify the query did fail. + assertNotNull("Exception should have a message", expected.getMessage()); + } finally { + clientB.close(); + } + + // Client A's USE context is preserved (same clientId → same session) + FlightSqlClient clientA2 = createFlightSqlClient(clientId); + CredentialCallOption tokenA2 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + try { + FlightInfo infoA = clientA2.execute("SELECT * FROM test_table ORDER BY time", tokenA2); + List> rowsA = fetchAllRows(infoA, clientA2, tokenA2); + assertEquals("Client A should still see 3 rows after Client B's queries", 3, rowsA.size()); + } finally { + clientA2.close(); + } + } + + @Test + public void testInvalidClientIdRejected() throws Exception { + // A non-empty clientId with invalid characters (contains @) should be rejected (fail-closed). + // Only null/empty clientId should fall back to shared session keying. + String invalidClientId = "bad@client!id"; + FlightSqlClient invalidClient = createFlightSqlClient(invalidClientId); + CredentialCallOption token = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + try { + invalidClient.execute("SHOW DATABASES", token); + fail("Server should reject invalid clientId during authentication"); + } catch (Exception expected) { + // Expected: server rejects the invalid clientId + assertNotNull("Exception should have a message", expected.getMessage()); + } finally { + invalidClient.close(); + } + } + + // ===================== Helper Methods ===================== + /** - * Fetches all rows from all endpoints in a FlightInfo. Each row is a list of string - * representations of the column values. + * Fetches all rows from all endpoints in a FlightInfo using the shared client. Each row is a list + * of string representations of the column values. */ private List> fetchAllRows(FlightInfo flightInfo) throws Exception { + return fetchAllRows(flightInfo, flightSqlClient, bearerToken); + } + + private List> fetchAllRows( + FlightInfo flightInfo, FlightSqlClient client, CredentialCallOption token) throws Exception { List> rows = new ArrayList<>(); for (FlightEndpoint endpoint : flightInfo.getEndpoints()) { - try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) { + try (FlightStream stream = client.getStream(endpoint.getTicket(), token)) { while (stream.next()) { VectorSchemaRoot root = stream.getRoot(); int rowCount = root.getRowCount(); @@ -236,4 +339,50 @@ private List> fetchAllRows(FlightInfo flightInfo) throws Exception } return rows; } + + private FlightSqlClient createFlightSqlClient(String flightClientId) { + int port = EnvFactory.getEnv().getArrowFlightSqlPort(); + Location location = Location.forGrpcInsecure("127.0.0.1", port); + ClientIncomingAuthHeaderMiddleware.Factory authFactory = + new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); + FlightClient client = + FlightClient.builder(allocator, location) + .intercept(authFactory) + .intercept(new ClientIdMiddlewareFactory(flightClientId)) + .build(); + return new FlightSqlClient(client); + } + + /** + * FlightClientMiddleware that injects the x-flight-sql-client-id header on every call. This + * allows the server to key sessions per logical client, enabling per-client USE database + * isolation. + */ + private static class ClientIdMiddlewareFactory implements FlightClientMiddleware.Factory { + private final String flightClientId; + + ClientIdMiddlewareFactory(String flightClientId) { + this.flightClientId = flightClientId; + } + + @Override + public FlightClientMiddleware onCallStarted(CallInfo info) { + return new FlightClientMiddleware() { + @Override + public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { + outgoingHeaders.insert("x-flight-sql-client-id", flightClientId); + } + + @Override + public void onHeadersReceived(CallHeaders incomingHeaders) { + // no-op + } + + @Override + public void onCallCompleted(CallStatus status) { + // no-op + } + }; + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java index 014586c5e1206..c881479e83c0d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java @@ -67,7 +67,7 @@ public static void tearDown() throws Exception { public void testQueryResult() { String[] retArray = new String[] { - "MQTT,1,STOPPED,", "REST,1,STOPPED,", + "FLIGHT_SQL,1,STOPPED,", "MQTT,1,STOPPED,", "REST,1,STOPPED,", }; // TableModel diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 3fef5d79fe11a..e63fb54f66b9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -112,6 +112,9 @@ public class IoTDBConfig { /** The Arrow Flight SQL service binding port. */ private int arrowFlightSqlPort = 8904; + /** The Arrow Flight SQL max allocator memory in bytes (default: 4GB). */ + private long arrowFlightSqlMaxAllocatorMemory = 4L * 1024 * 1024 * 1024; + /** The mqtt service binding host. */ private String mqttHost = "127.0.0.1"; @@ -2564,6 +2567,14 @@ public void setArrowFlightSqlPort(int arrowFlightSqlPort) { this.arrowFlightSqlPort = arrowFlightSqlPort; } + public long getArrowFlightSqlMaxAllocatorMemory() { + return arrowFlightSqlMaxAllocatorMemory; + } + + public void setArrowFlightSqlMaxAllocatorMemory(long arrowFlightSqlMaxAllocatorMemory) { + this.arrowFlightSqlMaxAllocatorMemory = arrowFlightSqlMaxAllocatorMemory; + } + public String getMqttHost() { return mqttHost; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 6730138b2af5c..3edd09100498a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -880,6 +880,9 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException // mqtt loadMqttProps(properties); + // Arrow Flight SQL + loadArrowFlightSqlProps(properties); + conf.setIntoOperationBufferSizeInByte( Long.parseLong( properties.getProperty( @@ -1942,6 +1945,24 @@ private void loadMqttProps(TrimProperties properties) { } } + // Arrow Flight SQL related + private void loadArrowFlightSqlProps(TrimProperties properties) { + if (properties.getProperty("enable_arrow_flight_sql_service") != null) { + conf.setEnableArrowFlightSqlService( + Boolean.parseBoolean(properties.getProperty("enable_arrow_flight_sql_service").trim())); + } + + if (properties.getProperty("arrow_flight_sql_port") != null) { + conf.setArrowFlightSqlPort( + Integer.parseInt(properties.getProperty("arrow_flight_sql_port").trim())); + } + + if (properties.getProperty("arrow_flight_sql_max_allocator_memory") != null) { + conf.setArrowFlightSqlMaxAllocatorMemory( + Long.parseLong(properties.getProperty("arrow_flight_sql_max_allocator_memory").trim())); + } + } + // timed flush memtable private void loadTimedService(TrimProperties properties) throws IOException { conf.setEnableTimedFlushSeqMemtable( diff --git a/pom.xml b/pom.xml index 9312be038d3bf..aba63e83d6833 100644 --- a/pom.xml +++ b/pom.xml @@ -769,6 +769,11 @@ adbc-driver-flight-sql 0.22.0 + + org.apache.arrow + arrow-memory-unsafe + ${arrow.version} +