6 changes: 6 additions & 0 deletions docs/changelog/138522.yaml
@@ -0,0 +1,6 @@
pr: 138522
summary: Support CCQ for inference commands
area: ES|QL
type: enhancement
issues:
- 136860
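For context, this is the query shape the change enables, with an inference command running as part of a cross-cluster query (a sketch: the my_remote cluster alias is illustrative, and the COMPLETION syntax mirrors the spec tests further down):

FROM books,my_remote:books METADATA _score
| WHERE title:"war and peace"
| COMPLETION CONCAT("This is a prompt: ", title) WITH { "inference_id" : "test_completion" }
| KEEP title, completion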
6 changes: 6 additions & 0 deletions docs/changelog/138586.yaml
@@ -0,0 +1,6 @@
pr: 138586
summary: Fix stats after `InferencePlan` (RERANK and COMPLETION)
area: ES|QL
type: bug
issues:
- 138582
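The bug shows up when an aggregation runs directly on inference output, e.g. (a sketch mirroring the "completion followed by stats" spec test added below; the test_completion endpoint is illustrative):

FROM books
| COMPLETION CONCAT("This is a prompt: ", title) WITH { "inference_id" : "test_completion" }
| STATS count = COUNT(*), avg_len = AVG(LENGTH(completion))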

2 changes: 2 additions & 0 deletions x-pack/plugin/esql/qa/server/multi-clusters/build.gradle
@@ -21,6 +21,8 @@ dependencies {
javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
javaRestTestImplementation project(xpackModule('esql:qa:server'))
javaRestTestImplementation project(xpackModule('esql'))

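// Make the mock inference service plugin available to the test clusters started
// by this project, so local clusters can register test _inference endpoints.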
clusterPlugins project(':x-pack:plugin:inference:qa:test-service-plugin')
}

def supportedVersion = bwcVersion -> {
@@ -54,6 +54,9 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
if (supportRetryOnShardFailures(version) == false) {
cluster.setting("cluster.routing.rebalance.enable", "none");
}
if (localClusterSupportsInferenceTestService()) {
cluster.plugin("inference-service-test");
}
return cluster.build();
}

@@ -73,6 +76,22 @@ public static org.elasticsearch.Version bwcVersion() {
return local.before(remote) ? local : remote;
}

public static boolean localClusterSupportsInferenceTestService() {
return isNewToOld() && localClusterVersion().onOrAfter(org.elasticsearch.Version.fromString("9.3.0"));
}

/**
* Returns true if the current task is a "newToOld" BWC test.
* Checks the tests.task system property to determine the task type.
*/
public static boolean isNewToOld() {
String taskName = System.getProperty("tests.task");
if (taskName == null) {
return false;
}
return taskName.endsWith("#newToOld");
}
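// Example (task path illustrative): running a Gradle task such as
//   :x-pack:plugin:esql:qa:server:multi-clusters:v8.19.2#newToOld
// sets -Dtests.task to that task name, so isNewToOld() returns true; combined
// with a local cluster version >= 9.3.0, the inference test service plugin is
// then installed on the local cluster.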

private static Version distributionVersion(String key) {
final String val = System.getProperty(key);
return val != null ? Version.fromString(val) : Version.CURRENT;
@@ -48,14 +48,17 @@
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.COMPLETION;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINE_STATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINE_STATS_SUPPORTS_REMOTE;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.RERANK;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.SUBQUERY_IN_FROM_COMMAND;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.TEXT_EMBEDDING_FUNCTION;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.UNMAPPED_FIELDS;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.hasCapabilities;
import static org.mockito.ArgumentMatchers.any;
@@ -81,6 +84,12 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
private static RestClient remoteClusterClient;
private static DataLocation dataLocation = null;

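// Capabilities that only need to be supported by the local (coordinating) cluster:
// inference operations run there, so remote clusters do not have to support them.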
private static final Set<String> LOCAL_ONLY_INFERENCE_CAPABILITIES = Set.of(
RERANK.capabilityName(),
COMPLETION.capabilityName(),
TEXT_EMBEDDING_FUNCTION.capabilityName()
);

@ParametersFactory(argumentFormatting = "csv-spec:%2$s.%3$s")
public static List<Object[]> readScriptSpec() throws Exception {
List<URL> urls = classpathResources("/*.csv-spec");
@@ -133,8 +142,16 @@ protected void shouldSkipTest(String testName) throws IOException {
.filter(c -> c.equals("metadata_fields_remote_test") == false)
.toList();
}

// Check all capabilities on the local cluster first.
super.shouldSkipTest(testName);
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);

// Filter out capabilities that are required only on the local cluster and then check the remaining on the remote cluster.
List<String> remoteCapabilities = testCase.requiredCapabilities.stream()
.filter(c -> LOCAL_ONLY_INFERENCE_CAPABILITIES.contains(c) == false)
.toList();
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, remoteCapabilities);

// Do not run tests including "METADATA _index" unless marked with metadata_fields_remote_test,
// because they may produce inconsistent results with multiple clusters.
assumeFalse("can't test with _index metadata", (remoteMetadata == false) && hasIndexMetadata(testCase.query));
@@ -230,10 +247,12 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
.collect(Collectors.toSet());

/**
* Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests.
* Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk, _query,
and _inference requests:
* - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG,
* prevent the splitting of bulk requests.
* - '_query' requests are dispatched to the local cluster only, as we are testing cross-cluster queries.
* - '_inference' requests are dispatched to the local cluster only, as inference endpoints are not available on remote clusters.
*/
static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException {
RestClient twoClients = mock(RestClient.class);
@@ -245,6 +264,8 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
String endpoint = request.getEndpoint();
if (endpoint.startsWith("/_query")) {
return localClient.performRequest(request);
} else if (endpoint.startsWith("/_inference")) {
return localClient.performRequest(request);
} else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
return remoteClient.performRequest(request);
} else if (endpoint.endsWith("/_bulk")
@@ -409,6 +430,10 @@ protected boolean supportsInferenceTestService() {
return false;
}

protected boolean supportsInferenceTestServiceOnLocalCluster() {
return Clusters.localClusterSupportsInferenceTestService();
}

@Override
protected boolean supportsIndexModeLookup() throws IOException {
return hasCapabilities(adminClient(), List.of(JOIN_LOOKUP_V12.capabilityName()));
@@ -594,7 +594,7 @@ protected boolean preserveClusterUponCompletion() {

private void setUpTextEmbeddingInferenceEndpoint() throws IOException {
setupEmbeddings = true;
Request request = new Request("PUT", "_inference/text_embedding/test");
Request request = new Request("PUT", "/_inference/text_embedding/test");
request.setJsonEntity("""
{
"service": "text_embedding_test_service",
@@ -547,7 +547,7 @@ public void setUpTextEmbeddingInferenceEndpoint() throws IOException {
return;
}
setupEmbeddings = true;
Request request = new Request("PUT", "_inference/text_embedding/test");
Request request = new Request("PUT", "/_inference/text_embedding/test");
request.setJsonEntity("""
{
"service": "text_embedding_test_service",
@@ -175,7 +175,8 @@ public void setup() {
assumeTrue("test clusters were broken", testClustersOk);
INGEST.protectedBlock(() -> {
// Inference endpoints must be created before ingesting any datasets that rely on them (mapping of inference_id)
if (supportsInferenceTestService()) {
// When multiple clusters are used, create endpoints only on the local cluster, and only if it supports the inference test service.
if (supportsInferenceTestServiceOnLocalCluster()) {
createInferenceEndpoints(adminClient());
}
loadDataSetIntoEs(
@@ -238,6 +239,9 @@ protected void shouldSkipTest(String testName) throws IOException {
if (requiresInferenceEndpoint()) {
assumeTrue("Inference test service needs to be supported", supportsInferenceTestService());
}
if (requiresInferenceEndpointOnLocalCluster()) {
assumeTrue("Inference test service needs to be supported on the local cluster", supportsInferenceTestServiceOnLocalCluster());
}
checkCapabilities(adminClient(), testFeatureService, testName, testCase);
assumeTrue("Test " + testName + " is not enabled", isEnabled(testName, instructions, Version.CURRENT));
if (supportsSourceFieldMapping() == false) {
@@ -251,13 +255,22 @@ protected static void checkCapabilities(
String testName,
CsvTestCase testCase
) {
if (hasCapabilities(client, testCase.requiredCapabilities)) {
checkCapabilities(client, testFeatureService, testName, testCase.requiredCapabilities);
}

protected static void checkCapabilities(
RestClient client,
TestFeatureService testFeatureService,
String testName,
List<String> requiredCapabilities
) {
if (hasCapabilities(client, requiredCapabilities)) {
return;
}

var features = new EsqlFeatures().getFeatures().stream().map(NodeFeature::id).collect(Collectors.toSet());

for (String feature : testCase.requiredCapabilities) {
for (String feature : requiredCapabilities) {
var esqlFeature = "esql." + feature;
assumeTrue("Requested capability " + feature + " is an ESQL cluster feature", features.contains(esqlFeature));
assumeTrue("Test " + testName + " requires " + feature, testFeatureService.clusterHasFeature(esqlFeature));
@@ -268,9 +281,16 @@ protected boolean supportsInferenceTestService() {
return true;
}

protected boolean supportsInferenceTestServiceOnLocalCluster() {
return supportsInferenceTestService();
}

protected boolean requiresInferenceEndpoint() {
return testCase.requiredCapabilities.contains(SEMANTIC_TEXT_FIELD_CAPS.capabilityName());
}

protected boolean requiresInferenceEndpointOnLocalCluster() {
return Stream.of(
SEMANTIC_TEXT_FIELD_CAPS.capabilityName(),
RERANK.capabilityName(),
COMPLETION.capabilityName(),
KNN_FUNCTION_V5.capabilityName(),
@@ -143,7 +143,7 @@ public void setUpIndices() throws IOException {

@Before
public void setUpSparseEmbeddingInferenceEndpoint() throws IOException {
Request request = new Request("PUT", "_inference/sparse_embedding/test_sparse_inference");
Request request = new Request("PUT", "/_inference/sparse_embedding/test_sparse_inference");
request.setJsonEntity("""
{
"service": "test_service",
@@ -165,7 +165,7 @@ public void setUpSparseEmbeddingInferenceEndpoint() throws IOException {

@Before
public void setUpTextEmbeddingInferenceEndpoint() throws IOException {
Request request = new Request("PUT", "_inference/text_embedding/test_dense_inference");
Request request = new Request("PUT", "/_inference/text_embedding/test_dense_inference");
request.setJsonEntity("""
{
"service": "text_embedding_test_service",
@@ -191,7 +191,7 @@ public void wipeData() throws IOException {
adminClient().performRequest(new Request("DELETE", "*"));

try {
adminClient().performRequest(new Request("DELETE", "_inference/test_sparse_inference"));
adminClient().performRequest(new Request("DELETE", "/_inference/test_sparse_inference"));
} catch (ResponseException e) {
// 404 here means the endpoint was not created
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
@@ -200,7 +200,7 @@ }
}

try {
adminClient().performRequest(new Request("DELETE", "_inference/test_dense_inference"));
adminClient().performRequest(new Request("DELETE", "/_inference/test_dense_inference"));
} catch (ResponseException e) {
// 404 here means the endpoint was not created
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
@@ -170,6 +170,7 @@ public class CsvTestsDataLoader {
private static final TestDataset LOGS = new TestDataset("logs");
private static final TestDataset MV_TEXT = new TestDataset("mv_text");
private static final TestDataset DENSE_VECTOR = new TestDataset("dense_vector");
private static final TestDataset DENSE_VECTOR_TEXT = new TestDataset("dense_vector_text");
private static final TestDataset COLORS = new TestDataset("colors");
private static final TestDataset COLORS_CMYK_LOOKUP = new TestDataset("colors_cmyk").withSetting("lookup-settings.json");
private static final TestDataset EXP_HISTO_SAMPLE = new TestDataset("exp_histo_sample");
@@ -241,6 +242,7 @@ public class CsvTestsDataLoader {
Map.entry(COLORS_CMYK_LOOKUP.indexName, COLORS_CMYK_LOOKUP),
Map.entry(MULTI_COLUMN_JOINABLE.indexName, MULTI_COLUMN_JOINABLE),
Map.entry(MULTI_COLUMN_JOINABLE_LOOKUP.indexName, MULTI_COLUMN_JOINABLE_LOOKUP),
Map.entry(DENSE_VECTOR_TEXT.indexName, DENSE_VECTOR_TEXT),
Map.entry(EXP_HISTO_SAMPLE.indexName, EXP_HISTO_SAMPLE)
);

@@ -583,13 +585,13 @@ public static boolean clusterHasCompletionInferenceEndpoint(RestClient client) t

private static void createInferenceEndpoint(RestClient client, TaskType taskType, String inferenceId, String modelSettings)
throws IOException {
Request request = new Request("PUT", "_inference/" + taskType.name() + "/" + inferenceId);
Request request = new Request("PUT", "/_inference/" + taskType.name() + "/" + inferenceId);
request.setJsonEntity(modelSettings);
client.performRequest(request);
}
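// Example usage (mirrors the sparse endpoint created in the REST tests; the
// model settings shown are illustrative):
//
//   createInferenceEndpoint(client, TaskType.SPARSE_EMBEDDING, "test_sparse_inference", """
//       { "service": "test_service", "service_settings": { "model": "my_model", "api_key": "abc64" }, "task_settings": {} }
//       """);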

private static boolean clusterHasInferenceEndpoint(RestClient client, TaskType taskType, String inferenceId) throws IOException {
Request request = new Request("GET", "_inference/" + taskType.name() + "/" + inferenceId);
Request request = new Request("GET", "/_inference/" + taskType.name() + "/" + inferenceId);
try {
client.performRequest(request);
} catch (ResponseException e) {
@@ -603,7 +605,7 @@ private static boolean clusterHasInferenceEndpoint(RestClient client, TaskType t

private static void deleteInferenceEndpoint(RestClient client, String inferenceId) throws IOException {
try {
client.performRequest(new Request("DELETE", "_inference/" + inferenceId));
client.performRequest(new Request("DELETE", "/_inference/" + inferenceId));
} catch (ResponseException e) {
// 404 here means the endpoint was not created
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
@@ -59,3 +59,18 @@ title:text | completion:keyword
War and Peace | THIS IS A PROMPT: WAR AND PEACE
War and Peace (Signet Classics) | THIS IS A PROMPT: WAR AND PEACE (SIGNET CLASSICS)
;


completion followed by stats
required_capability: completion
required_capability: match_operator_colon

FROM books METADATA _score
| WHERE title:"war and peace" AND author:"Tolstoy"
| COMPLETION CONCAT("This is a prompt: ", title) WITH { "inference_id" : "test_completion" }
| STATS count=COUNT(*), avg_completion_length = AVG(LENGTH(completion))
;

count:long | avg_completion_length:double
4 | 50.75
;
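For comparison, RERANK (the other InferencePlan command covered by the fix in #138586) followed by an aggregation takes the same shape (a sketch; the test_reranker endpoint name is illustrative):

FROM books METADATA _score
| WHERE title:"war and peace"
| RERANK "war and peace" ON title WITH { "inference_id" : "test_reranker" }
| STATS count = COUNT(*)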
@@ -0,0 +1,4 @@
_id:keyword,text_field:semantic_text,text_embedding_field:dense_vector
1,live long and prosper,[50.0, 57.0, 56.0]
2,all we have to decide is what to do with the time that is given to us,[45.0, 49.0, 57.0]
3,be excellent to each other,[45.0, 55.0, 54.0]
@@ -338,6 +338,7 @@ golden rod

testKnnWithSemanticText
required_capability: knn_function_v5
required_capability: semantic_text_field_caps

from semantic_text
| where knn(semantic_text_dense_field, [0, 1, 2])
@@ -353,6 +354,7 @@

testKnnWithSemanticTextAndKeyword
required_capability: knn_function_v5
required_capability: semantic_text_field_caps

from semantic_text
| where knn(semantic_text_dense_field, [0, 1, 2])
@@ -369,6 +371,7 @@

testKnnWithSemanticTextMultiValueField
required_capability: knn_function_v5
required_capability: semantic_text_field_caps

from semantic_text metadata _id
| where match(st_multi_value, "something") AND match(host, "host1")
@@ -381,6 +384,7 @@

testKnnWithSemanticTextWithEvalsAndOtherFunctionsAndStats
required_capability: knn_function_v5
required_capability: semantic_text_field_caps

from semantic_text
| where qstr("description:some*")
@@ -397,6 +401,7 @@
testKnnWithSemanticTextAndKql
required_capability: knn_function_v5
required_capability: kql_function
required_capability: semantic_text_field_caps

from semantic_text
| where kql("host:host1") AND knn(semantic_text_dense_field, [0, 1, 2])