Skip to content

Commit 7749632

Browse files
authored
refactor embeddings, add key (#14)
* port openai_embeddings, refactor for key module * composer update * fix settings fetch * fix settings * get embeddings working with key * use key in openai.module * refactor embeddings * refactor embeddings
1 parent 71ef282 commit 7749632

File tree

6 files changed

+465
-405
lines changed

6 files changed

+465
-405
lines changed

modules/openai_embeddings/includes/EmbeddingQueueWorker.php

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,33 @@ public function processItem($data) {
1919
throw new Exception("Could not load entity with ID {$data['entity_id']}.");
2020
}
2121

22-
// Log the entity type.
23-
watchdog('openai_embeddings', 'Processing entity ID: @id, Type: @type', [
24-
'@id' => $entity->nid,
25-
'@type' => $entity->type,
26-
], WATCHDOG_DEBUG);
27-
2822
// Get configuration.
2923
$config = config_get('openai_embeddings.settings');
3024
$stopwords = array_map('trim', explode(',', $config['stopwords'] ?? ''));
3125
$model = $config['model'] ?? 'text-embedding-ada-002';
3226
$plugin_id = $config['vector_client_plugin'] ?? NULL;
33-
$allowed_bundles = $config['content_types'] ?? [];
27+
28+
// Retrieve the API key from the Key module.
29+
$openai_config = config('openai.settings');
30+
$apiKey = key_get_key_value($openai_config->get('api_key'));
31+
if (!$apiKey) {
32+
throw new Exception('OpenAI API key is not configured or could not be retrieved.');
33+
}
34+
35+
// Initialize OpenAIApi instance.
36+
$openai_api = new OpenAIApi($apiKey);
3437

3538
if (!$plugin_id) {
3639
throw new Exception('Vector client plugin ID is not configured.');
3740
}
3841

3942
// Skip entity if its bundle is not allowed.
43+
$allowed_bundles = $config['content_types'] ?? [];
4044
if (!in_array($entity->type, $allowed_bundles)) {
41-
/*watchdog('openai_embeddings', 'Skipping entity ID: @id because its bundle (@bundle) is not allowed.', [
45+
watchdog('openai_embeddings', 'Skipping entity ID: @id because its bundle (@bundle) is not allowed.', [
4246
'@id' => $entity->nid,
4347
'@bundle' => $entity->type,
44-
], WATCHDOG_INFO);*/
48+
], WATCHDOG_INFO);
4549
return;
4650
}
4751

@@ -59,20 +63,17 @@ public function processItem($data) {
5963

6064
// Check if the field type is supported.
6165
if (!in_array($field_type, $supported_field_types)) {
62-
/*watchdog('openai_embeddings', 'Skipping unsupported field: @field_name, Type: @field_type', [
66+
watchdog('openai_embeddings', 'Skipping unsupported field: @field_name, Type: @field_type', [
6367
'@field_name' => $field_name,
6468
'@field_type' => $field_type,
65-
], WATCHDOG_INFO);*/
69+
], WATCHDOG_INFO);
6670
continue;
6771
}
6872

6973
// Retrieve field values.
7074
$field_items = field_get_items('node', $entity, $field_name);
7175
if (empty($field_items)) {
72-
/*watchdog('openai_embeddings', 'Field @field_name has no items or is empty.', [
73-
'@field_name' => $field_name,
74-
], WATCHDOG_INFO);*/
75-
continue;
76+
continue; // Do not log empty fields to reduce noise.
7677
}
7778

7879
foreach ($field_items as $delta => $item) {
@@ -81,17 +82,20 @@ public function processItem($data) {
8182
}
8283

8384
// Prepare text and remove stopwords.
84-
$text = StringHelper::prepareText($item['value'], [], 8000);
85+
$text = openai_embeddings_prepare_text($item['value'], 8000);
8586
foreach ($stopwords as $word) {
8687
$text = $this->removeStopWord($word, $text);
8788
}
8889

89-
// Generate embedding via OpenAI.
90-
$response = openai_client_embed($text, $model);
91-
if (empty($response['data'][0]['embedding'])) {
92-
throw new Exception('Failed to generate embedding.');
90+
// Generate embedding using OpenAIApi and model.
91+
$embedding = $openai_api->embedding($text, $model);
92+
if (empty($embedding)) {
93+
watchdog('openai_embeddings', 'Failed to generate embedding for entity ID: @id, field: @field.', [
94+
'@id' => $entity->nid,
95+
'@field' => $field_name,
96+
], WATCHDOG_WARNING);
97+
continue;
9398
}
94-
$embedding = $response['data'][0]['embedding'];
9599

96100
// Dynamically determine the namespace based on entity type.
97101
$collection = $data['entity_type']; // Use the entity type directly.
@@ -127,21 +131,18 @@ public function processItem($data) {
127131
])
128132
->fields([
129133
'embedding' => json_encode(['data' => $embedding]),
130-
'data' => json_encode(['usage' => $response['usage']]),
134+
'data' => json_encode(['usage' => []]), // Update with actual usage data if needed.
131135
])
132136
->execute();
133137
}
134138
}
135-
}
136-
catch (Exception $e) {
139+
} catch (Exception $e) {
137140
watchdog('openai_embeddings', 'Error processing queue item: @message', [
138141
'@message' => $e->getMessage(),
139142
], WATCHDOG_ERROR);
140143
}
141144
}
142145

143-
144-
145146
/**
146147
* Generates a unique ID for the record in the vector database.
147148
*
Lines changed: 88 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,60 @@
11
<?php
22

3-
class MilvusVectorClient {
3+
use GuzzleHttp\Client as GuzzleClient;
44

5-
protected $httpClient;
6-
protected $config;
5+
/**
6+
* Milvus vector client class.
7+
*/
8+
class MilvusVectorClient extends VectorClientBase {
79

8-
/**
9-
* Constructor to set configuration.
10-
*
11-
* @param array $config
12-
* Configuration array.
13-
*/
14-
public function __construct(array $config) {
15-
$this->config = $config;
16-
}
10+
const API_VERSION = '/v1/vector';
1711

1812
/**
1913
* Get the Milvus client.
2014
*
2115
* @return \GuzzleHttp\Client
2216
* The HTTP client.
2317
*/
24-
public function getClient(): \GuzzleHttp\Client {
25-
if (!isset($this->httpClient)) {
26-
$options = [
18+
protected function getMilvusClient(): GuzzleClient {
19+
try {
20+
// Resolve the token and hostname from configuration.
21+
$token = $this->resolveConfigValue('milvus_token', 'Milvus token', TRUE);
22+
$hostname = $this->resolveConfigValue('milvus_hostname', 'Milvus hostname');
23+
24+
// Validate hostname format.
25+
if (!filter_var($hostname, FILTER_VALIDATE_URL)) {
26+
throw new \Exception("Invalid hostname format: $hostname");
27+
}
28+
29+
// Trim and format the hostname.
30+
$hostname = rtrim($hostname, '/');
31+
32+
// Log resolved values for debugging.
33+
watchdog('openai_embeddings', "Milvus hostname resolved: @hostname", ['@hostname' => $hostname], WATCHDOG_DEBUG);
34+
35+
// Return a configured HTTP client.
36+
return $this->getHttpClient([
2737
'headers' => [
38+
'Authorization' => "Bearer $token",
2839
'Content-Type' => 'application/json',
29-
'Authorization' => 'Bearer ' . $this->config['token'],
30-
'Accept' => 'application/json',
3140
],
32-
'base_uri' => $this->config['hostname'],
33-
];
34-
$this->httpClient = new \GuzzleHttp\Client($options);
41+
'base_uri' => $hostname . self::API_VERSION,
42+
]);
43+
} catch (\Exception $e) {
44+
$this->handleError('client initialization', $e);
3545
}
36-
return $this->httpClient;
3746
}
3847

3948
/**
40-
* Query the vector database.
41-
*
42-
* @param array $parameters
43-
* Parameters for the query.
44-
*
45-
* @return array
46-
* The response data.
49+
* {@inheritdoc}
4750
*/
4851
public function query(array $parameters): array {
49-
if (empty($parameters['collection'])) {
50-
throw new \Exception('Collection name is required by Milvus');
51-
}
52-
if (empty($parameters['vector'])) {
53-
throw new \Exception('Vector to query is required by Milvus');
54-
}
52+
$client = $this->getMilvusClient();
5553

5654
$payload = [
5755
'vector' => $parameters['vector'],
5856
'collectionName' => $parameters['collection'],
59-
'limit' => $parameters['top_k'] ?? 5,
57+
'limit' => $parameters['top_k'] ?? self::DEFAULT_TOP_K,
6058
];
6159

6260
if (!empty($parameters['outputFields'])) {
@@ -71,116 +69,92 @@ public function query(array $parameters): array {
7169
$payload['filter'] = implode(' AND ', $filters);
7270
}
7371

74-
$response = $this->getClient()->post('/v1/vector/search', ['json' => $payload]);
75-
return json_decode($response->getBody()->getContents(), TRUE);
76-
}
72+
$this->logPayload('query', $payload);
7773

78-
/**
79-
* Insert or update vectors in Milvus.
80-
*
81-
* @param array $parameters
82-
* Parameters for the upsert operation.
83-
*/
84-
public function upsert(array $parameters): void {
85-
if (empty($parameters['collection'])) {
86-
throw new \Exception('Collection name is required by Milvus');
87-
}
88-
if (empty($parameters['vectors'])) {
89-
throw new \Exception('Vectors are required by Milvus');
90-
}
74+
try {
75+
$response = $client->post('/search', ['json' => $payload]);
76+
$response_data = json_decode($response->getBody()->getContents(), TRUE);
9177

92-
$data = $parameters['vectors'];
93-
$payload = [
94-
'collectionName' => $parameters['collection'],
95-
'data' => $data,
96-
];
78+
$this->logResponse('query', $response_data);
9779

98-
$this->getClient()->post('/v1/vector/insert', ['json' => $payload]);
80+
return $response_data;
81+
} catch (\Exception $e) {
82+
$this->handleError('query', $e);
83+
}
9984
}
10085

10186
/**
102-
* Fetch records from Milvus.
103-
*
104-
* @param array $parameters
105-
* Parameters for fetching records.
106-
*
107-
* @return array
108-
* The fetched records.
87+
* {@inheritdoc}
10988
*/
110-
public function fetch(array $parameters): array {
111-
if (empty($parameters['collection'])) {
112-
throw new \Exception('Collection name is required by Milvus');
113-
}
114-
if (empty($parameters['source_ids'])) {
115-
throw new \Exception('Source IDs to fetch are required by Milvus');
89+
public function upsert(array $parameters): void {
90+
$client = $this->getMilvusClient();
91+
92+
if (empty($parameters['collection']) || empty($parameters['vectors'])) {
93+
throw new \Exception('Both collection name and vectors are required for upsert.');
11694
}
11795

11896
$payload = [
11997
'collectionName' => $parameters['collection'],
120-
'filter' => 'source_id in ["' . implode('", "', $parameters['source_ids']) . '"]',
98+
'data' => $parameters['vectors'],
12199
];
122100

123-
$response = $this->getClient()->post('/v1/vector/get', ['json' => $payload]);
124-
return json_decode($response->getBody()->getContents(), TRUE);
101+
$this->logPayload('upsert', $payload);
102+
103+
try {
104+
$client->post('/insert', ['json' => $payload]);
105+
} catch (\Exception $e) {
106+
$this->handleError('upsert', $e);
107+
}
125108
}
126109

127110
/**
128-
* Delete records in Milvus.
129-
*
130-
* @param array $parameters
131-
* Parameters for deleting records.
111+
* {@inheritdoc}
132112
*/
133113
public function delete(array $parameters): void {
134-
if (empty($parameters['collection'])) {
135-
throw new \Exception('Collection name is required by Milvus');
136-
}
137-
if (empty($parameters['source_ids'])) {
138-
throw new \Exception('Source IDs to delete are required by Milvus');
114+
$client = $this->getMilvusClient();
115+
116+
if (empty($parameters['collection']) || empty($parameters['source_ids'])) {
117+
throw new \Exception('Both collection name and source IDs are required for deletion.');
139118
}
140119

141120
$payload = [
142121
'collectionName' => $parameters['collection'],
143122
'filter' => 'source_id in ["' . implode('", "', $parameters['source_ids']) . '"]',
144123
];
145124

146-
$this->getClient()->post('/v1/vector/delete', ['json' => $payload]);
125+
$this->logPayload('delete', $payload);
126+
127+
try {
128+
$client->post('/delete', ['json' => $payload]);
129+
} catch (\Exception $e) {
130+
$this->handleError('delete', $e);
131+
}
147132
}
148133

149134
/**
150-
* Fetch statistics for Milvus.
151-
*
152-
* @return array
153-
* The stats array.
135+
* {@inheritdoc}
154136
*/
155137
public function stats(): array {
156-
$response = $this->getClient()->get('/v1/vector/collections');
157-
return json_decode($response->getBody()->getContents(), TRUE);
158-
}
138+
$client = $this->getMilvusClient();
159139

160-
/**
161-
* Build stats table.
162-
*
163-
* @return array
164-
* Render array for the stats table.
165-
*/
166-
public function buildStatsTable(): array {
167-
$collections = $this->stats();
168-
$rows = [];
169-
170-
foreach ($collections as $collection) {
171-
$rows[] = [
172-
'Collection' => $collection['collectionName'],
173-
'Vectors' => $collection['shardsNum'] ?? 'N/A',
174-
];
175-
}
140+
try {
141+
$response = $client->get('/collections');
142+
$stats = json_decode($response->getBody()->getContents(), TRUE);
176143

177-
return [
178-
'#type' => 'table',
179-
'#header' => [
180-
t('Collection Name'),
181-
t('Vector Count'),
182-
],
183-
'#rows' => $rows,
184-
];
144+
$this->logResponse('stats', $stats);
145+
146+
$rows = [];
147+
foreach ($stats as $collection) {
148+
$rows[] = [
149+
'Collection' => $collection['collectionName'],
150+
'Vectors' => $collection['shardsNum'] ?? 'N/A',
151+
];
152+
}
153+
154+
return $rows;
155+
} catch (\Exception $e) {
156+
$this->handleError('stats', $e);
157+
return [];
158+
}
185159
}
186160
}

0 commit comments

Comments
 (0)