diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
index 07176586004..82f57af5b97 100644
--- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
+++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
@@ -99,7 +99,9 @@ public void explain(
       ResponseListener<ExplainResponse> listener) {
     try {
       String logical = RelOptUtil.toString(plan, mode.toExplainLevel());
-      listener.onResponse(new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)));
+      ExplainResponse response =
+          new ExplainResponse(new ExplainResponseNodeV2(logical, null, null));
+      listener.onResponse(ExplainResponse.normalizeLf(response));
     } catch (Exception e) {
       listener.onFailure(e);
     }
diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java
new file mode 100644
index 00000000000..427cb18bab9
--- /dev/null
+++ b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLExplainIT.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.sql;
+
+import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
+import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId;
+
+import com.google.common.io.Resources;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.junit.Test;
+import org.opensearch.client.Request;
+import org.opensearch.sql.legacy.SQLIntegTestCase;
+
+/**
+ * Explain integration tests for SQL queries routed through the analytics engine path (Project
+ * Analytics engine). Validates that SQL queries targeting "parquet_*" indices produce correct
+ * logical plans via the _plugins/_sql/_explain endpoint.
+ *
+ * <p>Expected output files are in resources/expectedOutput/analytics_sql/. Each test compares the
+ * explain JSON output against its expected file.
+ */
+@SuppressWarnings("deprecation") // assertJsonEqualsIgnoreId is correct for JSON explain response
+public class AnalyticsSQLExplainIT extends SQLIntegTestCase {
+
+  @Override
+  protected void init() throws Exception {
+    if (!isIndexExist(client(), "parquet_logs")) {
+      Request request = new Request("PUT", "/parquet_logs");
+      request.setJsonEntity(
+          "{"
+              + "\"mappings\": {"
+              + "  \"properties\": {"
+              + "    \"ts\": {\"type\": \"date\"},"
+              + "    \"status\": {\"type\": \"integer\"},"
+              + "    \"message\": {\"type\": \"keyword\"},"
+              + "    \"ip_addr\": {\"type\": \"keyword\"}"
+              + "  }"
+              + "}"
+              + "}");
+      client().performRequest(request);
+    }
+  }
+
+  private static String loadExpectedJson(String fileName) {
+    return loadFromFile("expectedOutput/analytics_sql/" + fileName);
+  }
+
+  private static String loadFromFile(String filename) {
+    try {
+      URI uri = Resources.getResource(filename).toURI();
+      return new String(Files.readAllBytes(Paths.get(uri)));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testExplainSelectStar() throws IOException {
+    assertJsonEqualsIgnoreId(
+        loadExpectedJson("explain_select_star.json"), explainQuery("SELECT * FROM parquet_logs"));
+  }
+
+  @Test
+  public void testExplainSelectColumns() throws IOException {
+    assertJsonEqualsIgnoreId(
+        loadExpectedJson("explain_select_columns.json"),
+        explainQuery("SELECT ts, status FROM parquet_logs"));
+  }
+
+  @Test
+  public void testExplainSelectWithWhere() throws IOException {
+    assertJsonEqualsIgnoreId(
+        loadExpectedJson("explain_select_where.json"),
+        explainQuery("SELECT ts, message FROM parquet_logs WHERE status = 200"));
+  }
+}
diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java
new file mode 100644
index 00000000000..e50467644fb
--- /dev/null
+++ b/integ-test/src/test/java/org/opensearch/sql/sql/AnalyticsSQLIT.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.sql;
+
+import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
+import static org.opensearch.sql.util.MatcherUtils.rows;
+import static org.opensearch.sql.util.MatcherUtils.schema;
+import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
+import static org.opensearch.sql.util.MatcherUtils.verifySchema;
+
+import java.io.IOException;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.opensearch.client.Request;
+import org.opensearch.client.ResponseException;
+import org.opensearch.sql.legacy.SQLIntegTestCase;
+
+/**
+ * Integration tests for SQL queries routed through the analytics engine path. Queries targeting
+ * "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses {@code
+ * AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
+ *
+ * <p>The stub executor returns rows in a fixed order [ts, status, message, ip_addr] regardless of
+ * the plan. The schema from OpenSearchSchemaBuilder is alphabetical [ip_addr, message, status, ts].
+ * AnalyticsExecutionEngine maps values by position, so the data values appear mismatched. This is
+ * expected; the real analytics engine will evaluate the plan correctly.
+ */
+public class AnalyticsSQLIT extends SQLIntegTestCase {
+
+  @Override
+  protected void init() throws Exception {
+    createParquetLogsIndex();
+  }
+
+  private void createParquetLogsIndex() throws IOException {
+    if (isIndexExist(client(), "parquet_logs")) {
+      return;
+    }
+    Request request = new Request("PUT", "/parquet_logs");
+    request.setJsonEntity(
+        "{"
+            + "\"mappings\": {"
+            + "  \"properties\": {"
+            + "    \"ts\": {\"type\": \"date\"},"
+            + "    \"status\": {\"type\": \"integer\"},"
+            + "    \"message\": {\"type\": \"keyword\"},"
+            + "    \"ip_addr\": {\"type\": \"keyword\"}"
+            + "  }"
+            + "}"
+            + "}");
+    client().performRequest(request);
+  }
+
+  @Test
+  public void testSelectStarSchemaAndData() throws IOException {
+    JSONObject result = executeQuery("SELECT * FROM parquet_logs");
+    verifySchema(
+        result,
+        schema("ip_addr", "string"),
+        schema("message", "string"),
+        schema("status", "integer"),
+        schema("ts", "timestamp"));
+    // Stub returns [ts, status, message, ip_addr] per row, mapped by position to
+    // [ip_addr, message, status, ts] schema. Values appear mismatched; expected with stub.
+    verifyDataRows(
+        result,
+        rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"),
+        rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"),
+        rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3"));
+  }
+
+  @Test
+  public void testSelectSpecificColumns() throws IOException {
+    JSONObject result = executeQuery("SELECT status, message FROM parquet_logs");
+    verifySchema(result, schema("status", "integer"), schema("message", "string"));
+    verifyDataRows(
+        result,
+        rows("2024-01-15 10:30:00", 200),
+        rows("2024-01-15 10:31:00", 200),
+        rows("2024-01-15 10:32:00", 500));
+  }
+
+  @Test(expected = ResponseException.class)
+  public void testSyntaxError() throws IOException {
+    executeQuery("SELEC * FROM parquet_logs");
+  }
+}
diff --git a/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json
new file mode 100644
index 00000000000..ceea291405d
--- /dev/null
+++ b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_columns.json
@@ -0,0 +1,5 @@
+{
+  "calcite": {
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n  LogicalProject(ts=[$3], status=[$2])\n    LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
+  }
+}
diff --git a/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json
new file mode 100644
index 00000000000..094ff52296d
--- /dev/null
+++ b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_star.json
@@ -0,0 +1,5 @@
+{
+  "calcite": {
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n  LogicalProject(ip_addr=[$0], message=[$1], status=[$2], ts=[$3])\n    LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
+  }
+}
diff --git a/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json
new file mode 100644
index 00000000000..4835038241e
--- /dev/null
+++ b/integ-test/src/test/resources/expectedOutput/analytics_sql/explain_select_where.json
@@ -0,0 +1,5 @@
+{
+  "calcite": {
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n  LogicalProject(ts=[$3], message=[$1])\n    LogicalFilter(condition=[=($2, 200)])\n      LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
+  }
+}
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
index 9be2367dcaa..216815b1e89 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import org.apache.logging.log4j.LogManager;
@@ -83,10 +84,21 @@ public class RestSqlAction extends BaseRestHandler {
   /** New SQL query request handler. */
   private final RestSQLQueryAction newSqlQueryHandler;
 
-  public RestSqlAction(Settings settings, Injector injector) {
+  /**
+   * Analytics router. Called before the normal SQL engine. Accepts the request and channel, returns
+   * {@code true} if it handled the request (analytics index), {@code false} to fall through to the
+   * normal SQL engine.
+   */
+  private final BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter;
+
+  public RestSqlAction(
+      Settings settings,
+      Injector injector,
+      BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter) {
     super();
     this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
     this.newSqlQueryHandler = new RestSQLQueryAction(injector);
+    this.analyticsRouter = analyticsRouter;
   }
 
   @Override
@@ -134,7 +146,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
 
       Format format = SqlRequestParam.getFormat(request.params());
 
-      // Route request to new query engine if it's supported already
       SQLQueryRequest newSqlRequest =
           new SQLQueryRequest(
               sqlRequest.getJsonContent(),
@@ -142,31 +153,58 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
               request.path(),
               request.params(),
               sqlRequest.cursor());
-      return newSqlQueryHandler.prepareRequest(
-          newSqlRequest,
-          (restChannel, exception) -> {
-            try {
-              if (newSqlRequest.isExplainRequest()) {
-                LOG.info(
-                    "Request is falling back to old SQL engine due to: " + exception.getMessage());
-              }
-              LOG.info(
-                  "[{}] Request {} is not supported and falling back to old SQL engine",
-                  QueryContext.getRequestId(),
-                  newSqlRequest);
-              LOG.info("Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
-              QueryAction queryAction = explainRequest(client, sqlRequest, format);
-              executeSqlRequest(request, queryAction, client, restChannel);
-            } catch (Exception e) {
-              handleException(restChannel, e);
-            }
-          },
-          this::handleException);
+
+      // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
+      // The router returns true and sends the response directly if it handled the request.
+      final SQLQueryRequest finalRequest = newSqlRequest;
+      return channel -> {
+        if (!analyticsRouter.apply(finalRequest, channel)) {
+          delegateToV2Engine(request, client, sqlRequest, finalRequest, format, channel);
+        }
+      };
     } catch (Exception e) {
       return channel -> handleException(channel, e);
     }
   }
 
+  /** Delegate a SQL query to the V2 engine with legacy fallback. */
+  private void delegateToV2Engine(
+      RestRequest request,
+      NodeClient client,
+      SqlRequest sqlRequest,
+      SQLQueryRequest sqlQueryRequest,
+      Format format,
+      RestChannel channel) {
+    try {
+      newSqlQueryHandler
+          .prepareRequest(
+              sqlQueryRequest,
+              (restChannel, exception) -> {
+                try {
+                  if (sqlQueryRequest.isExplainRequest()) {
+                    LOG.info(
+                        "Request is falling back to old SQL engine due to: "
+                            + exception.getMessage());
+                  }
+                  LOG.info(
+                      "[{}] Request {} is not supported and falling back to old SQL engine",
+                      QueryContext.getRequestId(),
+                      sqlQueryRequest);
+                  LOG.info(
+                      "Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
+                  QueryAction queryAction = explainRequest(client, sqlRequest, format);
+                  executeSqlRequest(request, queryAction, client, restChannel);
+                } catch (Exception e) {
+                  handleException(restChannel, e);
+                }
+              },
+              this::handleException)
+          .accept(channel);
+    } catch (Exception e) {
+      handleException(channel, e);
+    }
+  }
+
   private void handleException(RestChannel restChannel, Exception exception) {
     logAndPublishMetrics(exception);
     if (exception instanceof OpenSearchException) {
diff --git a/plugin/build.gradle b/plugin/build.gradle
index af190dd7122..7174ca38835 100644
--- a/plugin/build.gradle
+++ b/plugin/build.gradle
@@ -348,6 +348,7 @@ def getJobSchedulerPlugin() {
 
 testClusters.integTest {
   plugin(getJobSchedulerPlugin())
+  plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
   plugin(project.tasks.bundlePlugin.archiveFile)
   testDistribution = "ARCHIVE"
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index edffd65f6bf..f5c8d500b59 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -18,6 +18,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.BiFunction;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
@@ -36,8 +37,10 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.settings.SettingsFilter;
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
+import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.action.ActionResponse;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.rest.RestStatus;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.env.Environment;
 import org.opensearch.env.NodeEnvironment;
@@ -50,11 +53,15 @@
 import org.opensearch.plugins.ScriptPlugin;
 import org.opensearch.plugins.SystemIndexPlugin;
 import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.rest.BytesRestResponse;
+import org.opensearch.rest.RestChannel;
 import org.opensearch.rest.RestController;
 import org.opensearch.rest.RestHandler;
 import org.opensearch.script.ScriptContext;
 import org.opensearch.script.ScriptEngine;
 import org.opensearch.script.ScriptService;
+import org.opensearch.sql.ast.statement.ExplainMode;
+import org.opensearch.sql.common.response.ResponseListener;
 import org.opensearch.sql.datasource.DataSourceService;
 import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
 import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
@@ -85,6 +92,8 @@
 import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse;
 import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse;
 import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse;
+import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
+import org.opensearch.sql.executor.QueryType;
 import org.opensearch.sql.legacy.esdomain.LocalClusterState;
 import org.opensearch.sql.legacy.metrics.Metrics;
 import org.opensearch.sql.legacy.plugin.RestSqlAction;
@@ -98,10 +107,14 @@
 import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
 import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
 import org.opensearch.sql.plugin.rest.RestQuerySettingsAction;
+import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction;
+import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor;
 import org.opensearch.sql.plugin.transport.PPLQueryAction;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
 import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
+import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
+import org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style;
 import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
 import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
@@ -117,6 +130,7 @@
 import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
 import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse;
 import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse;
+import org.opensearch.sql.sql.domain.SQLQueryRequest;
 import org.opensearch.sql.storage.DataSourceFactory;
 import org.opensearch.threadpool.ExecutorBuilder;
 import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -165,7 +179,7 @@ public List<RestHandler> getRestHandlers(
     return Arrays.asList(
         new RestPPLQueryAction(),
         new RestPPLGrammarAction(),
-        new RestSqlAction(settings, injector),
+        new RestSqlAction(settings, injector, createSqlAnalyticsRouter()),
         new RestSqlStatsAction(settings, restController),
         new RestPPLStatsAction(settings, restController),
         new RestQuerySettingsAction(settings, restController),
@@ -175,6 +189,69 @@
         new RestDirectQueryResourcesManagementAction((OpenSearchSettings) pluginSettings));
   }
 
+  /**
+   * Creates a routing function for SQL queries targeting analytics engine indices. Returns {@code
+   * true} if the query was handled (analytics index), {@code false} to fall through to normal SQL.
+   */
+  private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRouter() {
+    RestUnifiedQueryAction unifiedQueryHandler =
+        new RestUnifiedQueryAction(client, clusterService, new StubQueryPlanExecutor());
+    return (sqlRequest, channel) -> {
+      if (!unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) {
+        return false;
+      }
+      if (sqlRequest.isExplainRequest()) {
+        unifiedQueryHandler.explain(
+            sqlRequest.getQuery(),
+            QueryType.SQL,
+            ExplainMode.STANDARD,
+            new ResponseListener<>() {
+              @Override
+              public void onResponse(ExplainResponse response) {
+                JsonResponseFormatter<ExplainResponse> formatter =
+                    new JsonResponseFormatter<>(Style.PRETTY) {
+                      @Override
+                      protected Object buildJsonObject(ExplainResponse resp) {
+                        return resp;
+                      }
+                    };
+                channel.sendResponse(
+                    new BytesRestResponse(
+                        RestStatus.OK,
+                        "application/json; charset=UTF-8",
+                        formatter.format(response)));
+              }
+
+              @Override
+              public void onFailure(Exception e) {
+                channel.sendResponse(
+                    new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
+              }
+            });
+      } else {
+        unifiedQueryHandler.execute(
+            sqlRequest.getQuery(),
+            QueryType.SQL,
+            false,
+            new ActionListener<>() {
+              @Override
+              public void onResponse(TransportPPLQueryResponse response) {
+                channel.sendResponse(
+                    new BytesRestResponse(
+                        RestStatus.OK, "application/json; charset=UTF-8", response.getResult()));
+              }
+
+              @Override
+              public void onFailure(Exception e) {
+                channel.sendResponse(
+                    new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
+              }
+            });
+      }
+      return true;
+    };
+  }
+
   /** Register action and handler so that transportClient can find proxy for action. */
   @Override
   public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
index 6fc4b2bd058..acd50ac8b1f 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
@@ -11,7 +11,14 @@
 import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.ThreadContext;
@@ -23,6 +30,7 @@
 import org.opensearch.sql.api.UnifiedQueryContext;
 import org.opensearch.sql.api.UnifiedQueryPlanner;
 import org.opensearch.sql.ast.AbstractNodeVisitor;
+import org.opensearch.sql.ast.statement.ExplainMode;
 import org.opensearch.sql.ast.tree.Relation;
 import org.opensearch.sql.ast.tree.UnresolvedPlan;
 import org.opensearch.sql.calcite.CalcitePlanContext;
@@ -33,7 +41,6 @@
 import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine;
 import org.opensearch.sql.lang.LangSpec;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
-import org.opensearch.sql.ppl.domain.PPLQueryRequest;
 import org.opensearch.sql.protocol.response.QueryResult;
 import org.opensearch.sql.protocol.response.format.ResponseFormatter;
 import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter;
@@ -76,13 +83,14 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
       return false;
     }
     try (UnifiedQueryContext context = buildParsingContext(queryType)) {
-      String indexName = extractIndexName(query, context);
-      if (indexName == null) {
-        return false;
-      }
-      int lastDot = indexName.lastIndexOf('.');
-      String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
-      return tableName.startsWith("parquet_");
+      return extractIndexName(query, queryType, context)
+          .map(
+              indexName -> {
+                int lastDot = indexName.lastIndexOf('.');
+                return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
+              })
+          .map(tableName -> tableName.startsWith("parquet_"))
+          .orElse(false);
     } catch (Exception e) {
       return false;
     }
@@ -92,70 +100,56 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) {
   public void execute(
       String query,
       QueryType queryType,
-      PPLQueryRequest pplRequest,
+      boolean profiling,
       ActionListener<TransportPPLQueryResponse> listener) {
     client
         .threadPool()
         .schedule(
-            withCurrentContext(() -> doExecute(query, queryType, pplRequest, listener)),
+            withCurrentContext(
+                () -> {
+                  try (UnifiedQueryContext context = buildContext(queryType, profiling)) {
+                    UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
+                    RelNode plan = planner.plan(query);
+                    CalcitePlanContext planContext = context.getPlanContext();
+                    plan = addQuerySizeLimit(plan, planContext);
+                    analyticsEngine.execute(
+                        plan, planContext, createQueryListener(queryType, listener));
+                  } catch (Exception e) {
+                    listener.onFailure(e);
+                  }
+                }),
             new TimeValue(0),
             SQL_WORKER_THREAD_POOL_NAME);
   }
 
   /**
    * Explain a query through the unified query pipeline on the sql-worker thread pool. Returns
-   * ExplainResponse via ResponseListener so the caller (TransportPPLQueryAction) can format it
-   * using its own createExplainResponseListener.
+   * ExplainResponse via ResponseListener so the caller can format it.
    */
   public void explain(
       String query,
       QueryType queryType,
-      PPLQueryRequest pplRequest,
+      ExplainMode mode,
       ResponseListener<ExplainResponse> listener) {
     client
         .threadPool()
         .schedule(
-            withCurrentContext(() -> doExplain(query, queryType, pplRequest, listener)),
+            withCurrentContext(
+                () -> {
+                  try (UnifiedQueryContext context = buildContext(queryType, false)) {
+                    UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
+                    RelNode plan = planner.plan(query);
+                    CalcitePlanContext planContext = context.getPlanContext();
+                    plan = addQuerySizeLimit(plan, planContext);
+                    analyticsEngine.explain(plan, mode, planContext, listener);
+                  } catch (Exception e) {
+                    listener.onFailure(e);
+                  }
+                }),
             new TimeValue(0),
             SQL_WORKER_THREAD_POOL_NAME);
   }
 
-  private void doExecute(
-      String query,
-      QueryType queryType,
-      PPLQueryRequest pplRequest,
-      ActionListener<TransportPPLQueryResponse> listener) {
-    try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) {
-      UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
-      RelNode plan = planner.plan(query);
-
-      CalcitePlanContext planContext = context.getPlanContext();
-      plan = addQuerySizeLimit(plan, planContext);
-
-      analyticsEngine.execute(plan, planContext, createQueryListener(queryType, listener));
-    } catch (Exception e) {
-      listener.onFailure(e);
-    }
-  }
-
-  private void doExplain(
-      String query,
-      QueryType queryType,
-      PPLQueryRequest pplRequest,
-      ResponseListener<ExplainResponse> listener) {
-    try (UnifiedQueryContext context = buildContext(queryType, pplRequest.profile())) {
-      UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
-      RelNode plan = planner.plan(query);
-
-      CalcitePlanContext planContext = context.getPlanContext();
-      plan = addQuerySizeLimit(plan, planContext);
-
-      analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener);
-    } catch (Exception e) {
-      listener.onFailure(e);
-    }
-  }
-
   /**
    * Build a lightweight context for parsing only (index name extraction). Does not require cluster
    * state or catalog schema.
@@ -177,16 +171,17 @@ private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling)
    * Extract the source index name by parsing the query and visiting the AST to find the Relation
    * node. Uses the context's parser which supports both PPL and SQL.
    */
-  private static String extractIndexName(String query, UnifiedQueryContext context) {
-    Object parseResult = context.getParser().parse(query);
-    if (parseResult instanceof UnresolvedPlan unresolvedPlan) {
-      return unresolvedPlan.accept(new IndexNameExtractor(), null);
+  private static Optional<String> extractIndexName(
+      String query, QueryType queryType, UnifiedQueryContext context) {
+    if (queryType == QueryType.PPL) {
+      UnresolvedPlan unresolvedPlan = (UnresolvedPlan) context.getParser().parse(query);
+      return Optional.ofNullable(unresolvedPlan.accept(new IndexNameExtractor(), null));
     }
-    // TODO: handle SQL SqlNode for table extraction when unified SQL is enabled
-    return null;
+    SqlNode sqlNode = (SqlNode) context.getParser().parse(query);
+    return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode));
   }
 
-  /** AST visitor that extracts the source index name from a Relation node. */
+  /** AST visitor that extracts the source index name from a Relation node (PPL path). */
   private static class IndexNameExtractor extends AbstractNodeVisitor<String, Void> {
     @Override
     public String visitRelation(Relation node, Void context) {
@@ -194,6 +189,29 @@ public String visitRelation(Relation node, Void context) {
     }
   }
 
+  /** SqlNode visitor that extracts the source table name from a SQL parse tree. */
+  private static class SqlTableNameExtractor extends SqlBasicVisitor<String> {
+    @Override
+    public String visit(SqlCall call) {
+      if (call instanceof SqlSelect select) {
+        return select.getFrom().accept(this);
+      }
+      if (call instanceof SqlJoin join) {
+        return join.getLeft().accept(this);
+      }
+      return null;
+    }
+
+    @Override
+    public String visit(SqlIdentifier id) {
+      return id.toString();
+    }
+  }
+
+  private static String extractTableNameFromSqlNode(SqlNode sqlNode) {
+    return sqlNode.accept(new SqlTableNameExtractor());
+  }
+
   private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) {
     return LogicalSystemLimit.create(
         LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT,
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
index 18b9e89cbb2..2ae8b7949ed 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
@@ -136,11 +136,14 @@ protected void doExecute(
         unifiedQueryHandler.explain(
             transformedRequest.getRequest(),
             QueryType.PPL,
-            transformedRequest,
+            transformedRequest.mode(),
             createExplainResponseListener(transformedRequest, clearingListener));
       } else {
         unifiedQueryHandler.execute(
-            transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener);
+            transformedRequest.getRequest(),
+            QueryType.PPL,
+            transformedRequest.profile(),
+            clearingListener);
       }
       return;
     }