From 5c2519583eceb594ee0795623d466061bb693dee Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Mon, 6 Apr 2026 11:08:55 -0700 Subject: [PATCH 01/10] Integrate SQL REST endpoint with analytics engine path Add SQL query routing through the analytics engine for Parquet-backed indices. SQL queries targeting "parquet_*" indices are routed to RestUnifiedQueryAction via the unified query pipeline (Calcite SQL parser -> UnifiedQueryPlanner -> AnalyticsExecutionEngine). Changes: - Add SqlNode table extraction in RestUnifiedQueryAction.extractIndexName() to support SQL query routing (handles SqlSelect -> SqlIdentifier) - Add executeSql() and explainSql() methods in RestUnifiedQueryAction for SQL queries (parallel to existing PPL execute/explain) - Add analytics routing in RestSqlAction via optional BiFunction router that checks isAnalyticsIndex before delegating to SQLService - Wire the router in SQLPlugin.createSqlAnalyticsRouter() - Non-analytics SQL queries fall through to the existing V2 engine - Add AnalyticsSQLIT integration tests: SELECT *, column projection, explain, non-parquet fallback, syntax error handling Resolves: https://github.com/opensearch-project/sql/issues/5248 Signed-off-by: Kai Huang --- .../analytics/AnalyticsExecutionEngine.java | 4 +- .../sql/sql/AnalyticsSQLExplainIT.java | 94 ++++++++++++++++ .../opensearch/sql/sql/AnalyticsSQLIT.java | 101 ++++++++++++++++++ .../analytics_sql/explain_select_columns.json | 5 + .../analytics_sql/explain_select_star.json | 5 + .../analytics_sql/explain_select_where.json | 5 + .../sql/legacy/plugin/RestSqlAction.java | 48 ++++++++- plugin/build.gradle | 1 + .../org/opensearch/sql/plugin/SQLPlugin.java | 76 ++++++++++++- .../plugin/rest/RestUnifiedQueryAction.java | 75 ++++++++++++- 10 files changed, 410 insertions(+), 4 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java create mode 100644 integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json create mode 100644 integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json create mode 100644 integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index 07176586004..82f57af5b97 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -99,7 +99,9 @@ public void explain( ResponseListener listener) { try { String logical = RelOptUtil.toString(plan, mode.toExplainLevel()); - listener.onResponse(new ExplainResponse(new ExplainResponseNodeV2(logical, null, null))); + ExplainResponse response = + new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)); + listener.onResponse(ExplainResponse.normalizeLf(response)); } catch (Exception e) { listener.onFailure(e); } diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java new file mode 100644 index 00000000000..9fb2bcbed96 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.sql; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; +import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId; + +import com.google.common.io.Resources; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.sql.legacy.SQLIntegTestCase; + +/** + * Explain integration tests for SQL queries routed through the analytics engine path (Project + * Analytics engine). Validates that SQL queries targeting "parquet_*" indices produce correct + * logical plans via the _plugins/_sql/_explain endpoint. + * + *

Expected output files are in resources/expectedOutput/analytics_sql/. Each test compares the + * explain JSON output against its expected file. + */ +@SuppressWarnings("deprecation") // assertJsonEqualsIgnoreId is correct for JSON explain response +public class AnalyticsSQLExplainIT extends SQLIntegTestCase { + + @Override + protected void init() throws Exception { + if (!isIndexExist(client(), "parquet_logs")) { + Request request = new Request("PUT", "/parquet_logs"); + request.setJsonEntity( + "{" + + "\"mappings\": {" + + " \"properties\": {" + + " \"ts\": {\"type\": \"date\"}," + + " \"status\": {\"type\": \"integer\"}," + + " \"message\": {\"type\": \"keyword\"}," + + " \"ip_addr\": {\"type\": \"keyword\"}" + + " }" + + "}" + + "}"); + client().performRequest(request); + } + } + + private String explainSqlQuery(String sql) throws IOException { + Request request = new Request("POST", "/_plugins/_sql/_explain"); + request.setJsonEntity("{\"query\": \"" + sql + "\"}"); + request.setOptions(RequestOptions.DEFAULT); + Response response = client().performRequest(request); + return getResponseBody(response); + } + + private static String loadExpectedJson(String fileName) { + return loadFromFile("expectedOutput/analytics_sql/" + fileName); + } + + private static String loadFromFile(String filename) { + try { + URI uri = Resources.getResource(filename).toURI(); + return new String(Files.readAllBytes(Paths.get(uri))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testExplainSelectStar() throws IOException { + assertJsonEqualsIgnoreId( + loadExpectedJson("explain_select_star.json"), + explainSqlQuery("SELECT * FROM parquet_logs")); + } + + @Test + public void testExplainSelectColumns() throws IOException { + assertJsonEqualsIgnoreId( + loadExpectedJson("explain_select_columns.json"), + explainSqlQuery("SELECT ts, status FROM parquet_logs")); + } + + @Test + public void testExplainSelectWithWhere() throws IOException { + assertJsonEqualsIgnoreId( + loadExpectedJson("explain_select_where.json"), + explainSqlQuery("SELECT ts, message FROM parquet_logs WHERE status = 200")); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java new file mode 100644 index 00000000000..22b716b57c0 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.sql; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.legacy.SQLIntegTestCase; + +/** + * Integration tests for SQL queries routed through the analytics engine path. Queries targeting + * "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses {@code + * AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}. + * + *

The stub executor returns rows in a fixed order [ts, status, message, ip_addr] regardless of + * the plan. The schema from OpenSearchSchemaBuilder is alphabetical [ip_addr, message, status, ts]. + * AnalyticsExecutionEngine maps values by position, so the data values appear mismatched. This is + * expected; the real analytics engine will evaluate the plan correctly. + */ +public class AnalyticsSQLIT extends SQLIntegTestCase { + + @Override + protected void init() throws Exception { + createParquetLogsIndex(); + } + + private void createParquetLogsIndex() throws IOException { + if (isIndexExist(client(), "parquet_logs")) { + return; + } + Request request = new Request("PUT", "/parquet_logs"); + request.setJsonEntity( + "{" + + "\"mappings\": {" + + " \"properties\": {" + + " \"ts\": {\"type\": \"date\"}," + + " \"status\": {\"type\": \"integer\"}," + + " \"message\": {\"type\": \"keyword\"}," + + " \"ip_addr\": {\"type\": \"keyword\"}" + + " }" + + "}" + + "}"); + client().performRequest(request); + } + + private JSONObject executeSqlQuery(String sql) throws IOException { + Request request = new Request("POST", "/_plugins/_sql"); + request.setJsonEntity("{\"query\": \"" + sql + "\"}"); + request.setOptions(RequestOptions.DEFAULT); + Response response = client().performRequest(request); + return new JSONObject(getResponseBody(response)); + } + + @Test + public void testSelectStarSchemaAndData() throws IOException { + JSONObject result = executeSqlQuery("SELECT * FROM parquet_logs"); + verifySchema( + result, + schema("ip_addr", "string"), + schema("message", "string"), + schema("status", "integer"), + schema("ts", "timestamp")); + // Stub returns [ts, status, message, ip_addr] per row, mapped by position to + // [ip_addr, message, status, ts] schema. Values appear mismatched — expected with stub. + verifyDataRows( + result, + rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"), + rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"), + rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3")); + } + + @Test + public void testSelectSpecificColumns() throws IOException { + JSONObject result = executeSqlQuery("SELECT status, message FROM parquet_logs"); + verifySchema(result, schema("status", "integer"), schema("message", "string")); + verifyDataRows( + result, + rows("2024-01-15 10:30:00", 200), + rows("2024-01-15 10:31:00", 200), + rows("2024-01-15 10:32:00", 500)); + } + + @Test(expected = ResponseException.class) + public void testSyntaxError() throws IOException { + executeSqlQuery("SELEC * FROM parquet_logs"); + } +} diff --git a/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json new file mode 100644 index 00000000000..ceea291405d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json @@ -0,0 +1,5 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ts=[$3], status=[$2])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json new file mode 100644 index 00000000000..094ff52296d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json @@ -0,0 +1,5 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ip_addr=[$0], message=[$1], status=[$2], ts=[$3])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json new file mode 100644 index 00000000000..4835038241e --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json @@ -0,0 +1,5 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ts=[$3], message=[$1])\n LogicalFilter(condition=[=($2, 200)])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n" + } +} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 9be2367dcaa..46ac1180d62 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.regex.Pattern; import org.apache.logging.log4j.LogManager; @@ -83,10 +84,25 @@ public class RestSqlAction extends BaseRestHandler { /** New SQL query request handler. */ private final RestSQLQueryAction newSqlQueryHandler; + /** + * Optional analytics router. If set, it's called before the normal SQL engine. Accepts the + * request and channel, returns {@code true} if it handled the request, {@code false} to fall + * through to normal SQL engine. + */ + private final BiFunction analyticsRouter; + public RestSqlAction(Settings settings, Injector injector) { + this(settings, injector, null); + } + + public RestSqlAction( + Settings settings, + Injector injector, + BiFunction analyticsRouter) { super(); this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.newSqlQueryHandler = new RestSQLQueryAction(injector); + this.analyticsRouter = analyticsRouter; } @Override @@ -134,7 +150,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Format format = SqlRequestParam.getFormat(request.params()); - // Route request to new query engine if it's supported already SQLQueryRequest newSqlRequest = new SQLQueryRequest( sqlRequest.getJsonContent(), @@ -142,6 +157,37 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli request.path(), request.params(), sqlRequest.cursor()); + + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices. + // The router returns true and sends the response directly if it handled the request. + if (analyticsRouter != null) { + final SQLQueryRequest finalRequest = newSqlRequest; + return channel -> { + if (!analyticsRouter.apply(finalRequest, channel)) { + // Not an analytics query — delegate to normal SQL engine + try { + newSqlQueryHandler + .prepareRequest( + finalRequest, + (ch, ex) -> { + try { + Format fmt = SqlRequestParam.getFormat(request.params()); + QueryAction qa = explainRequest(client, sqlRequest, fmt); + executeSqlRequest(request, qa, client, ch); + } catch (Exception e) { + handleException(ch, e); + } + }, + this::handleException) + .accept(channel); + } catch (Exception e) { + handleException(channel, e); + } + } + }; + } + + // Route request to new query engine if it's supported already return newSqlQueryHandler.prepareRequest( newSqlRequest, (restChannel, exception) -> { diff --git a/plugin/build.gradle b/plugin/build.gradle index af190dd7122..7174ca38835 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -348,6 +348,7 @@ def getJobSchedulerPlugin() { testClusters.integTest { plugin(getJobSchedulerPlugin()) + plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) } plugin(project.tasks.bundlePlugin.archiveFile) testDistribution = "ARCHIVE" diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index edffd65f6bf..9603fbd7de6 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -36,8 +37,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; @@ -50,11 +53,14 @@ import org.opensearch.plugins.ScriptPlugin; import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; @@ -85,6 +91,8 @@ import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse; import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse; import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse; +import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; @@ -98,10 +106,14 @@ import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; +import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction; +import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor; import org.opensearch.sql.plugin.transport.PPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; @@ -117,6 +129,7 @@ import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse; +import org.opensearch.sql.sql.domain.SQLQueryRequest; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -165,7 +178,7 @@ public List getRestHandlers( return Arrays.asList( new RestPPLQueryAction(), new RestPPLGrammarAction(), - new RestSqlAction(settings, injector), + new RestSqlAction(settings, injector, createSqlAnalyticsRouter()), new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), new RestQuerySettingsAction(settings, restController), @@ -175,6 +188,67 @@ public List getRestHandlers( new RestDirectQueryResourcesManagementAction((OpenSearchSettings) pluginSettings)); } + /** + * Creates a routing function for SQL queries targeting analytics engine indices. Returns {@code + * true} if the query was handled (analytics index), {@code false} to fall through to normal SQL. + */ + private BiFunction createSqlAnalyticsRouter() { + RestUnifiedQueryAction unifiedQueryHandler = + new RestUnifiedQueryAction(client, clusterService, new StubQueryPlanExecutor()); + return (sqlRequest, channel) -> { + if (!unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) { + return false; + } + if (sqlRequest.isExplainRequest()) { + unifiedQueryHandler.explainSql( + sqlRequest.getQuery(), + QueryType.SQL, + new ResponseListener<>() { + @Override + public void onResponse(ExplainResponse response) { + JsonResponseFormatter formatter = + new JsonResponseFormatter<>(Style.PRETTY) { + @Override + protected Object buildJsonObject(ExplainResponse resp) { + return resp; + } + }; + channel.sendResponse( + new BytesRestResponse( + RestStatus.OK, + "application/json; charset=UTF-8", + formatter.format(response))); + } + + @Override + public void onFailure(Exception e) { + channel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + } else { + unifiedQueryHandler.executeSql( + sqlRequest.getQuery(), + QueryType.SQL, + new ActionListener<>() { + @Override + public void onResponse(TransportPPLQueryResponse response) { + channel.sendResponse( + new BytesRestResponse( + RestStatus.OK, "application/json; charset=UTF-8", response.getResult())); + } + + @Override + public void onFailure(Exception e) { + channel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + } + return true; + }; + } + /** Register action and handler so that transportClient can find proxy for action. */ @Override public List> getActions() { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 6fc4b2bd058..8abf0b2dc6d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -12,6 +12,10 @@ import java.util.Map; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; @@ -23,6 +27,7 @@ import org.opensearch.sql.api.UnifiedQueryContext; import org.opensearch.sql.api.UnifiedQueryPlanner; import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.statement.ExplainMode; import org.opensearch.sql.ast.tree.Relation; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.calcite.CalcitePlanContext; @@ -182,7 +187,75 @@ private static String extractIndexName(String query, UnifiedQueryContext context if (parseResult instanceof UnresolvedPlan unresolvedPlan) { return unresolvedPlan.accept(new IndexNameExtractor(), null); } - // TODO: handle SQL SqlNode for table extraction when unified SQL is enabled + if (parseResult instanceof SqlNode sqlNode) { + return extractTableNameFromSqlNode(sqlNode); + } + return null; + } + + /** + * Execute a SQL query through the unified query pipeline. Uses {@link + * org.opensearch.sql.plugin.transport.TransportPPLQueryResponse} as the transport response type + * since both PPL and SQL share the same JSON response format. + */ + public void executeSql( + String query, QueryType queryType, ActionListener listener) { + client + .threadPool() + .schedule( + withCurrentContext( + () -> { + try (UnifiedQueryContext context = buildContext(queryType, false)) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + analyticsEngine.execute( + plan, planContext, createQueryListener(queryType, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + }), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + /** Explain a SQL query through the unified query pipeline. */ + public void explainSql( + String query, QueryType queryType, ResponseListener listener) { + client + .threadPool() + .schedule( + withCurrentContext( + () -> { + try (UnifiedQueryContext context = buildContext(queryType, false)) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + analyticsEngine.explain(plan, ExplainMode.STANDARD, planContext, listener); + } catch (Exception e) { + listener.onFailure(e); + } + }), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + /** Extracts the table name from a Calcite SqlNode parse tree. */ + private static String extractTableNameFromSqlNode(SqlNode sqlNode) { + if (sqlNode instanceof SqlSelect select) { + SqlNode from = select.getFrom(); + if (from instanceof SqlIdentifier id) { + return id.toString(); + } + if (from instanceof SqlJoin join) { + // For joins, extract from the left table + if (join.getLeft() instanceof SqlIdentifier leftId) { + return leftId.toString(); + } + } + } return null; } From 1f8ee7c68de51d9ab43cde45119cd4e62b710a7a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 10:28:57 -0700 Subject: [PATCH 02/10] Use queryType to branch index extraction instead of instanceof Signed-off-by: Kai Huang --- .../sql/plugin/rest/RestUnifiedQueryAction.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 8abf0b2dc6d..7c17a466408 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -81,7 +81,7 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { return false; } try (UnifiedQueryContext context = buildParsingContext(queryType)) { - String indexName = extractIndexName(query, context); + String indexName = extractIndexName(query, queryType, context); if (indexName == null) { return false; } @@ -182,15 +182,14 @@ private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) * Extract the source index name by parsing the query and visiting the AST to find the Relation * node. Uses the context's parser which supports both PPL and SQL. */ - private static String extractIndexName(String query, UnifiedQueryContext context) { - Object parseResult = context.getParser().parse(query); - if (parseResult instanceof UnresolvedPlan unresolvedPlan) { + private static String extractIndexName( + String query, QueryType queryType, UnifiedQueryContext context) { + if (queryType == QueryType.PPL) { + UnresolvedPlan unresolvedPlan = (UnresolvedPlan) context.getParser().parse(query); return unresolvedPlan.accept(new IndexNameExtractor(), null); } - if (parseResult instanceof SqlNode sqlNode) { - return extractTableNameFromSqlNode(sqlNode); - } - return null; + SqlNode sqlNode = (SqlNode) context.getParser().parse(query); + return extractTableNameFromSqlNode(sqlNode); } /** From b4f250000516b6a649fb6a50883a600058257fee Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 10:33:19 -0700 Subject: [PATCH 03/10] Use Optional for extractIndexName return type Signed-off-by: Kai Huang --- .../plugin/rest/RestUnifiedQueryAction.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 7c17a466408..40fb95acea0 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -11,6 +11,7 @@ import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import java.util.Map; +import java.util.Optional; import org.apache.calcite.rel.RelNode; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlJoin; @@ -81,13 +82,14 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { return false; } try (UnifiedQueryContext context = buildParsingContext(queryType)) { - String indexName = extractIndexName(query, queryType, context); - if (indexName == null) { - return false; - } - int lastDot = indexName.lastIndexOf('.'); - String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; - return tableName.startsWith("parquet_"); + return extractIndexName(query, queryType, context) + .map( + indexName -> { + int lastDot = indexName.lastIndexOf('.'); + return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + }) + .map(tableName -> tableName.startsWith("parquet_")) + .orElse(false); } catch (Exception e) { return false; } @@ -182,14 +184,14 @@ private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) * Extract the source index name by parsing the query and visiting the AST to find the Relation * node. Uses the context's parser which supports both PPL and SQL. */ - private static String extractIndexName( + private static Optional extractIndexName( String query, QueryType queryType, UnifiedQueryContext context) { if (queryType == QueryType.PPL) { UnresolvedPlan unresolvedPlan = (UnresolvedPlan) context.getParser().parse(query); - return unresolvedPlan.accept(new IndexNameExtractor(), null); + return Optional.ofNullable(unresolvedPlan.accept(new IndexNameExtractor(), null)); } SqlNode sqlNode = (SqlNode) context.getParser().parse(query); - return extractTableNameFromSqlNode(sqlNode); + return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode)); } /** From b95d48d2471909ffdac1f21ccf96920b86e7c02a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 10:52:11 -0700 Subject: [PATCH 04/10] Unify execute and explain methods for both PPL and SQL paths Signed-off-by: Kai Huang --- .../org/opensearch/sql/plugin/SQLPlugin.java | 7 +- .../plugin/rest/RestUnifiedQueryAction.java | 120 ++++-------------- .../transport/TransportPPLQueryAction.java | 7 +- 3 files changed, 38 insertions(+), 96 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 9603fbd7de6..f5c8d500b59 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -60,6 +60,7 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.sql.ast.statement.ExplainMode; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; @@ -200,9 +201,10 @@ private BiFunction createSqlAnalyticsRout return false; } if (sqlRequest.isExplainRequest()) { - unifiedQueryHandler.explainSql( + unifiedQueryHandler.explain( sqlRequest.getQuery(), QueryType.SQL, + ExplainMode.STANDARD, new ResponseListener<>() { @Override public void onResponse(ExplainResponse response) { @@ -227,9 +229,10 @@ public void onFailure(Exception e) { } }); } else { - unifiedQueryHandler.executeSql( + unifiedQueryHandler.execute( sqlRequest.getQuery(), QueryType.SQL, + false, new ActionListener<>() { @Override public void onResponse(TransportPPLQueryResponse response) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 40fb95acea0..019f774000d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -39,7 +39,6 @@ import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; import org.opensearch.sql.lang.LangSpec; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; -import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.ResponseFormatter; import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; @@ -99,70 +98,56 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { public void execute( String query, QueryType queryType, - PPLQueryRequest pplRequest, + boolean profiling, ActionListener listener) { client .threadPool() .schedule( - withCurrentContext(() -> doExecute(query, queryType, pplRequest, listener)), + withCurrentContext( + () -> { + try (UnifiedQueryContext context = buildContext(queryType, profiling)) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + analyticsEngine.execute( + plan, planContext, createQueryListener(queryType, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + }), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } /** * Explain a query through the unified query pipeline on the sql-worker thread pool. Returns - * ExplainResponse via ResponseListener so the caller (TransportPPLQueryAction) can format it - * using its own createExplainResponseListener. + * ExplainResponse via ResponseListener so the caller can format it. */ public void explain( String query, QueryType queryType, - PPLQueryRequest pplRequest, + ExplainMode mode, ResponseListener listener) { client .threadPool() .schedule( - withCurrentContext(() -> doExplain(query, queryType, pplRequest, listener)), + withCurrentContext( + () -> { + try (UnifiedQueryContext context = buildContext(queryType, false)) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + analyticsEngine.explain(plan, mode, planContext, listener); + } catch (Exception e) { + listener.onFailure(e); + } + }), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } - private void doExecute( - String query, - QueryType queryType, - PPLQueryRequest pplRequest, - ActionListener listener) { - try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) { - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); - RelNode plan = planner.plan(query); - - CalcitePlanContext planContext = context.getPlanContext(); - plan = addQuerySizeLimit(plan, planContext); - - analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener)); - } catch (Exception e) { - listener.onFailure(e); - } - } - - private void doExplain( - String query, - QueryType queryType, - PPLQueryRequest pplRequest, - ResponseListener listener) { - try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) { - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); - RelNode plan = planner.plan(query); - - CalcitePlanContext planContext = context.getPlanContext(); - plan = addQuerySizeLimit(plan, planContext); - - analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } - /** * Build a lightweight context for parsing only (index name extraction). Does not require cluster * state or catalog schema. @@ -194,55 +179,6 @@ private static Optional extractIndexName( return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode)); } - /** - * Execute a SQL query through the unified query pipeline. Uses {@link - * org.opensearch.sql.plugin.transport.TransportPPLQueryResponse} as the transport response type - * since both PPL and SQL share the same JSON response format. - */ - public void executeSql( - String query, QueryType queryType, ActionListener listener) { - client - .threadPool() - .schedule( - withCurrentContext( - () -> { - try (UnifiedQueryContext context = buildContext(queryType, false)) { - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); - RelNode plan = planner.plan(query); - CalcitePlanContext planContext = context.getPlanContext(); - plan = addQuerySizeLimit(plan, planContext); - analyticsEngine.execute( - plan, planContext, createQueryListener(queryType, listener)); - } catch (Exception e) { - listener.onFailure(e); - } - }), - new TimeValue(0), - SQL_WORKER_THREAD_POOL_NAME); - } - - /** Explain a SQL query through the unified query pipeline. */ - public void explainSql( - String query, QueryType queryType, ResponseListener listener) { - client - .threadPool() - .schedule( - withCurrentContext( - () -> { - try (UnifiedQueryContext context = buildContext(queryType, false)) { - UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); - RelNode plan = planner.plan(query); - CalcitePlanContext planContext = context.getPlanContext(); - plan = addQuerySizeLimit(plan, planContext); - analyticsEngine.explain(plan, ExplainMode.STANDARD, planContext, listener); - } catch (Exception e) { - listener.onFailure(e); - } - }), - new TimeValue(0), - SQL_WORKER_THREAD_POOL_NAME); - } - /** Extracts the table name from a Calcite SqlNode parse tree. */ private static String extractTableNameFromSqlNode(SqlNode sqlNode) { if (sqlNode instanceof SqlSelect select) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index 18b9e89cbb2..2ae8b7949ed 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -136,11 +136,14 @@ protected void doExecute( unifiedQueryHandler.explain( transformedRequest.getRequest(), QueryType.PPL, - transformedRequest, + transformedRequest.mode(), createExplainResponseListener(transformedRequest, clearingListener)); } else { unifiedQueryHandler.execute( - transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener); + transformedRequest.getRequest(), + QueryType.PPL, + transformedRequest.profile(), + clearingListener); } return; } From f5d7d1584d3838585f262df38535350812ecc75a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 10:55:25 -0700 Subject: [PATCH 05/10] Reuse base class explainQuery() in AnalyticsSQLExplainIT Signed-off-by: Kai Huang --- .../sql/sql/AnalyticsSQLExplainIT.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java index 9fb2bcbed96..427cb18bab9 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java @@ -5,7 +5,6 @@ package org.opensearch.sql.sql; -import static org.opensearch.sql.legacy.TestUtils.getResponseBody; import static org.opensearch.sql.legacy.TestUtils.isIndexExist; import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId; @@ -16,8 +15,6 @@ import java.nio.file.Paths; import org.junit.Test; import org.opensearch.client.Request; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.Response; import org.opensearch.sql.legacy.SQLIntegTestCase; /** @@ -50,14 +47,6 @@ protected void init() throws Exception { } } - private String explainSqlQuery(String sql) throws IOException { - Request request = new Request("POST", "/_plugins/_sql/_explain"); - request.setJsonEntity("{\"query\": \"" + sql + "\"}"); - request.setOptions(RequestOptions.DEFAULT); - Response response = client().performRequest(request); - return getResponseBody(response); - } - private static String loadExpectedJson(String fileName) { return loadFromFile("expectedOutput/analytics_sql/" + fileName); } @@ -74,21 +63,20 @@ private static String loadFromFile(String filename) { @Test public void testExplainSelectStar() throws IOException { assertJsonEqualsIgnoreId( - loadExpectedJson("explain_select_star.json"), - explainSqlQuery("SELECT * FROM parquet_logs")); + loadExpectedJson("explain_select_star.json"), explainQuery("SELECT * FROM parquet_logs")); } @Test public void testExplainSelectColumns() throws IOException { assertJsonEqualsIgnoreId( loadExpectedJson("explain_select_columns.json"), - explainSqlQuery("SELECT ts, status FROM parquet_logs")); + explainQuery("SELECT ts, status FROM parquet_logs")); } @Test public void testExplainSelectWithWhere() throws IOException { assertJsonEqualsIgnoreId( loadExpectedJson("explain_select_where.json"), - explainSqlQuery("SELECT ts, message FROM parquet_logs WHERE status = 200")); + explainQuery("SELECT ts, message FROM parquet_logs WHERE status = 200")); } } From d65305a94128130f71fc31202f8164eeacdd4322 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 11:46:08 -0700 Subject: [PATCH 06/10] Reuse base class executeQuery() in AnalyticsSQLIT Signed-off-by: Kai Huang --- .../org/opensearch/sql/sql/AnalyticsSQLIT.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java index 22b716b57c0..e50467644fb 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java @@ -5,7 +5,6 @@ package org.opensearch.sql.sql; -import static org.opensearch.sql.legacy.TestUtils.getResponseBody; import static org.opensearch.sql.legacy.TestUtils.isIndexExist; import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; @@ -16,8 +15,6 @@ import org.json.JSONObject; import org.junit.Test; import org.opensearch.client.Request; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.Response; import org.opensearch.client.ResponseException; import org.opensearch.sql.legacy.SQLIntegTestCase; @@ -57,17 +54,9 @@ private void createParquetLogsIndex() throws IOException { client().performRequest(request); } - private JSONObject executeSqlQuery(String sql) throws IOException { - Request request = new Request("POST", "/_plugins/_sql"); - request.setJsonEntity("{\"query\": \"" + sql + "\"}"); - request.setOptions(RequestOptions.DEFAULT); - Response response = client().performRequest(request); - return new JSONObject(getResponseBody(response)); - } - @Test public void testSelectStarSchemaAndData() throws IOException { - JSONObject result = executeSqlQuery("SELECT * FROM parquet_logs"); + JSONObject result = executeQuery("SELECT * FROM parquet_logs"); verifySchema( result, schema("ip_addr", "string"), @@ -85,7 +74,7 @@ public void testSelectStarSchemaAndData() throws IOException { @Test public void testSelectSpecificColumns() throws IOException { - JSONObject result = executeSqlQuery("SELECT status, message FROM parquet_logs"); + JSONObject result = executeQuery("SELECT status, message FROM parquet_logs"); verifySchema(result, schema("status", "integer"), schema("message", "string")); verifyDataRows( result, @@ -96,6 +85,6 @@ public void testSelectSpecificColumns() throws IOException { @Test(expected = ResponseException.class) public void testSyntaxError() throws IOException { - executeSqlQuery("SELEC * FROM parquet_logs"); + executeQuery("SELEC * FROM parquet_logs"); } } From aacb41855c39eb717c2d8801c35ff92bbbb91cd4 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 11:59:14 -0700 Subject: [PATCH 07/10] Remove no-arg constructor and null check for analyticsRouter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit analyticsRouter is always provided — only one caller exists (SQLPlugin.getRestHandlers). Remove the backward-compatible no-arg constructor and the null check. Signed-off-by: Kai Huang --- .../sql/legacy/plugin/RestSqlAction.java | 81 ++++++------------- 1 file changed, 26 insertions(+), 55 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 46ac1180d62..c0cca31be0d 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -61,7 +61,6 @@ import org.opensearch.sql.legacy.request.SqlRequestParam; import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException; import org.opensearch.sql.legacy.utils.JsonPrettyFormatter; -import org.opensearch.sql.legacy.utils.QueryDataAnonymizer; import org.opensearch.sql.sql.domain.SQLQueryRequest; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.node.NodeClient; @@ -85,16 +84,12 @@ public class RestSqlAction extends BaseRestHandler { private final RestSQLQueryAction newSqlQueryHandler; /** - * Optional analytics router. If set, it's called before the normal SQL engine. Accepts the - * request and channel, returns {@code true} if it handled the request, {@code false} to fall - * through to normal SQL engine. + * Analytics router. Called before the normal SQL engine. Accepts the request and channel, returns + * {@code true} if it handled the request (analytics index), {@code false} to fall through to + * normal SQL engine. */ private final BiFunction analyticsRouter; - public RestSqlAction(Settings settings, Injector injector) { - this(settings, injector, null); - } - public RestSqlAction( Settings settings, Injector injector, @@ -160,54 +155,30 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices. // The router returns true and sends the response directly if it handled the request. - if (analyticsRouter != null) { - final SQLQueryRequest finalRequest = newSqlRequest; - return channel -> { - if (!analyticsRouter.apply(finalRequest, channel)) { - // Not an analytics query — delegate to normal SQL engine - try { - newSqlQueryHandler - .prepareRequest( - finalRequest, - (ch, ex) -> { - try { - Format fmt = SqlRequestParam.getFormat(request.params()); - QueryAction qa = explainRequest(client, sqlRequest, fmt); - executeSqlRequest(request, qa, client, ch); - } catch (Exception e) { - handleException(ch, e); - } - }, - this::handleException) - .accept(channel); - } catch (Exception e) { - handleException(channel, e); - } + final SQLQueryRequest finalRequest = newSqlRequest; + return channel -> { + if (!analyticsRouter.apply(finalRequest, channel)) { + // Not an analytics query — delegate to normal SQL engine + try { + newSqlQueryHandler + .prepareRequest( + finalRequest, + (ch, ex) -> { + try { + Format fmt = SqlRequestParam.getFormat(request.params()); + QueryAction qa = explainRequest(client, sqlRequest, fmt); + executeSqlRequest(request, qa, client, ch); + } catch (Exception e) { + handleException(ch, e); + } + }, + this::handleException) + .accept(channel); + } catch (Exception e) { + handleException(channel, e); } - }; - } - - // Route request to new query engine if it's supported already - return newSqlQueryHandler.prepareRequest( - newSqlRequest, - (restChannel, exception) -> { - try { - if (newSqlRequest.isExplainRequest()) { - LOG.info( - "Request is falling back to old SQL engine due to: " + exception.getMessage()); - } - LOG.info( - "[{}] Request {} is not supported and falling back to old SQL engine", - QueryContext.getRequestId(), - newSqlRequest); - LOG.info("Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); - QueryAction queryAction = explainRequest(client, sqlRequest, format); - executeSqlRequest(request, queryAction, client, restChannel); - } catch (Exception e) { - handleException(restChannel, e); - } - }, - this::handleException); + } + }; } catch (Exception e) { return channel -> handleException(channel, e); } From 617cbed12c1cb5d4b20ee6f035fdd6c8277880ca Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 13:53:31 -0700 Subject: [PATCH 08/10] Extract V2 engine delegation into delegateToV2Engine method Restore original logging (query anonymization, explain fallback) that was lost when the fallback logic was inlined into the analytics router lambda. Signed-off-by: Kai Huang --- .../sql/legacy/plugin/RestSqlAction.java | 59 +++++++++++++------ 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index c0cca31be0d..216815b1e89 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -61,6 +61,7 @@ import org.opensearch.sql.legacy.request.SqlRequestParam; import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException; import org.opensearch.sql.legacy.utils.JsonPrettyFormatter; +import org.opensearch.sql.legacy.utils.QueryDataAnonymizer; import org.opensearch.sql.sql.domain.SQLQueryRequest; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.node.NodeClient; @@ -158,25 +159,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli final SQLQueryRequest finalRequest = newSqlRequest; return channel -> { if (!analyticsRouter.apply(finalRequest, channel)) { - // Not an analytics query — delegate to normal SQL engine - try { - newSqlQueryHandler - .prepareRequest( - finalRequest, - (ch, ex) -> { - try { - Format fmt = SqlRequestParam.getFormat(request.params()); - QueryAction qa = explainRequest(client, sqlRequest, fmt); - executeSqlRequest(request, qa, client, ch); - } catch (Exception e) { - handleException(ch, e); - } - }, - this::handleException) - .accept(channel); - } catch (Exception e) { - handleException(channel, e); - } + delegateToV2Engine(request, client, sqlRequest, finalRequest, format, channel); } }; } catch (Exception e) { @@ -184,6 +167,44 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } } + /** Delegate a SQL query to the V2 engine with legacy fallback. */ + private void delegateToV2Engine( + RestRequest request, + NodeClient client, + SqlRequest sqlRequest, + SQLQueryRequest sqlQueryRequest, + Format format, + RestChannel channel) { + try { + newSqlQueryHandler + .prepareRequest( + sqlQueryRequest, + (restChannel, exception) -> { + try { + if (sqlQueryRequest.isExplainRequest()) { + LOG.info( + "Request is falling back to old SQL engine due to: " + + exception.getMessage()); + } + LOG.info( + "[{}] Request {} is not supported and falling back to old SQL engine", + QueryContext.getRequestId(), + sqlQueryRequest); + LOG.info( + "Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); + QueryAction queryAction = explainRequest(client, sqlRequest, format); + executeSqlRequest(request, queryAction, client, restChannel); + } catch (Exception e) { + handleException(restChannel, e); + } + }, + this::handleException) + .accept(channel); + } catch (Exception e) { + handleException(channel, e); + } + } + private void handleException(RestChannel restChannel, Exception exception) { logAndPublishMetrics(exception); if (exception instanceof OpenSearchException) { From 543e32c84365f7c9a51ee6f65c930f0257aa1b4a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 13:59:34 -0700 Subject: [PATCH 09/10] Refactor SQL table extraction to use SqlBasicVisitor pattern Replace instanceof checks with SqlTableNameExtractor visitor, consistent with how PPL uses AbstractNodeVisitor for index name extraction. Signed-off-by: Kai Huang --- .../plugin/rest/RestUnifiedQueryAction.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 019f774000d..e66a5446a9a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.Optional; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlNode; @@ -179,31 +180,38 @@ private static Optional extractIndexName( return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode)); } - /** Extracts the table name from a Calcite SqlNode parse tree. */ - private static String extractTableNameFromSqlNode(SqlNode sqlNode) { - if (sqlNode instanceof SqlSelect select) { - SqlNode from = select.getFrom(); - if (from instanceof SqlIdentifier id) { - return id.toString(); + /** AST visitor that extracts the source index name from a Relation node (PPL path). */ + private static class IndexNameExtractor extends AbstractNodeVisitor { + @Override + public String visitRelation(Relation node, Void context) { + return node.getTableQualifiedName().toString(); + } + } + + /** SqlNode visitor that extracts the source table name from a SQL parse tree. */ + private static class SqlTableNameExtractor + extends org.apache.calcite.sql.util.SqlBasicVisitor { + @Override + public String visit(SqlCall call) { + if (call instanceof SqlSelect select) { + return select.getFrom().accept(this); } - if (from instanceof SqlJoin join) { - // For joins, extract from the left table - if (join.getLeft() instanceof SqlIdentifier leftId) { - return leftId.toString(); - } + if (call instanceof SqlJoin join) { + return join.getLeft().accept(this); } + return null; } - return null; - } - /** AST visitor that extracts the source index name from a Relation node. */ - private static class IndexNameExtractor extends AbstractNodeVisitor { @Override - public String visitRelation(Relation node, Void context) { - return node.getTableQualifiedName().toString(); + public String visit(SqlIdentifier id) { + return id.toString(); } } + private static String extractTableNameFromSqlNode(SqlNode sqlNode) { + return sqlNode.accept(new SqlTableNameExtractor()); + } + private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) { return LogicalSystemLimit.create( LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT, From 7125fb66965667f5b9400a6d390eba6b2b275cc2 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 7 Apr 2026 14:01:20 -0700 Subject: [PATCH 10/10] Move SqlBasicVisitor to import header Signed-off-by: Kai Huang --- .../opensearch/sql/plugin/rest/RestUnifiedQueryAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index e66a5446a9a..acd50ac8b1f 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -18,6 +18,7 @@ import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.util.SqlBasicVisitor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; @@ -189,8 +190,7 @@ public String visitRelation(Relation node, Void context) { } /** SqlNode visitor that extracts the source table name from a SQL parse tree. */ - private static class SqlTableNameExtractor - extends org.apache.calcite.sql.util.SqlBasicVisitor { + private static class SqlTableNameExtractor extends SqlBasicVisitor { @Override public String visit(SqlCall call) { if (call instanceof SqlSelect select) {