diff --git a/examples/bulk_import/example_bulkwriter_stage.py b/examples/bulk_import/example_bulkwriter_volume.py similarity index 90% rename from examples/bulk_import/example_bulkwriter_stage.py rename to examples/bulk_import/example_bulkwriter_volume.py index e89072538..f63c64e8d 100644 --- a/examples/bulk_import/example_bulkwriter_stage.py +++ b/examples/bulk_import/example_bulkwriter_volume.py @@ -16,7 +16,7 @@ import numpy as np from examples.bulk_import.data_gengerator import * -from pymilvus.bulk_writer.stage_bulk_writer import StageBulkWriter +from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter from pymilvus.orm import utility logging.basicConfig(level=logging.INFO) @@ -46,7 +46,7 @@ API_KEY = "_api_key_for_cluster_org_" # This is currently a private preview feature. If you need to use it, please submit a request and contact us. -STAGE_NAME = "_stage_name_for_project_" +VOLUME_NAME = "_volume_name_for_project_" CLUSTER_ID = "_your_cloud_cluster_id_" DB_NAME = "" # If db_name is not specified, use "" @@ -93,12 +93,12 @@ def build_all_type_schema(): return schema -def example_collection_remote_stage(file_type: BulkFileType): +def example_collection_remote_volume(file_type: BulkFileType): schema = build_all_type_schema() print(f"\n===================== all field types ({file_type.name}) ====================") create_collection(schema, False) - stage_upload_result = stage_remote_writer(file_type, schema) - call_stage_import(stage_upload_result['stage_name'], stage_upload_result['path']) + volume_upload_result = volume_remote_writer(file_type, schema) + call_volume_import(volume_upload_result['volume_name'], volume_upload_result['path']) retrieve_imported_data() @@ -111,16 +111,16 @@ def create_collection(schema: CollectionSchema, drop_if_exist: bool): print(f"Collection '{collection.name}' created") -def stage_remote_writer(file_type, schema): - with StageBulkWriter( +def volume_remote_writer(file_type, schema): + with VolumeBulkWriter( schema=schema, remote_path="bulk_data", file_type=file_type, chunk_size=512 * 1024 * 1024, cloud_endpoint=CLOUD_ENDPOINT, api_key=API_KEY, - stage_name=STAGE_NAME, - ) as stage_bulk_writer: + volume_name=VOLUME_NAME, + ) as volume_bulk_writer: print("Append rows") batch_count = 10000 for i in range(batch_count): @@ -146,12 +146,12 @@ def stage_remote_writer(file_type, schema): "array_int": [k for k in range(10)], "sparse_vector": gen_sparse_vector(False), } - stage_bulk_writer.append_row(row) + volume_bulk_writer.append_row(row) # append rows by numpy type for i in range(batch_count): id = i + batch_count - stage_bulk_writer.append_row({ + volume_bulk_writer.append_row({ "id": np.int64(id), "bool": True if i % 3 == 0 else False, "int8": np.int8(id % 128), @@ -174,12 +174,12 @@ def stage_remote_writer(file_type, schema): "sparse_vector": gen_sparse_vector(True), }) - print(f"{stage_bulk_writer.total_row_count} rows appends") - print(f"{stage_bulk_writer.buffer_row_count} rows in buffer not flushed") + print(f"{volume_bulk_writer.total_row_count} rows appends") + print(f"{volume_bulk_writer.buffer_row_count} rows in buffer not flushed") print("Generate data files...") - stage_bulk_writer.commit() - print(f"Data files have been uploaded: {stage_bulk_writer.batch_files}") - return stage_bulk_writer.get_stage_upload_result() + volume_bulk_writer.commit() + print(f"Data files have been uploaded: {volume_bulk_writer.batch_files}") + return volume_bulk_writer.get_volume_upload_result() def retrieve_imported_data(): @@ -217,7 +217,7 @@ def 
retrieve_imported_data(): print(item) -def call_stage_import(stage_name: str, path: str): +def call_volume_import(volume_name: str, path: str): print(f"\n===================== import files to cluster ====================") resp = bulk_import( url=CLOUD_ENDPOINT, @@ -225,7 +225,7 @@ def call_stage_import(stage_name: str, path: str): cluster_id=CLUSTER_ID, db_name=DB_NAME, collection_name=COLLECTION_NAME, - stage_name=stage_name, + volume_name=volume_name, data_paths=[[path]] ) print(resp.json()) @@ -270,4 +270,4 @@ def call_stage_import(stage_name: str, path: str): if __name__ == '__main__': create_connection() - example_collection_remote_stage(file_type=BulkFileType.PARQUET) + example_collection_remote_volume(file_type=BulkFileType.PARQUET) diff --git a/examples/bulk_import/example_stage_file_manager.py b/examples/bulk_import/example_stage_file_manager.py deleted file mode 100644 index f8f0163ec..000000000 --- a/examples/bulk_import/example_stage_file_manager.py +++ /dev/null @@ -1,12 +0,0 @@ -from pymilvus.bulk_writer.constants import ConnectType -from pymilvus.bulk_writer.stage_file_manager import StageFileManager - -if __name__ == "__main__": - stage_file_manager = StageFileManager( - cloud_endpoint='https://api.cloud.zilliz.com', - api_key='_api_key_for_cluster_org_', - stage_name='_stage_name_for_project_', - connect_type=ConnectType.AUTO, - ) - result = stage_file_manager.upload_file_to_stage("/Users/zilliz/data/", "data/") - print(f"\nuploadFileToStage results: {result}") diff --git a/examples/bulk_import/example_stage_manager.py b/examples/bulk_import/example_stage_manager.py deleted file mode 100644 index da51c221d..000000000 --- a/examples/bulk_import/example_stage_manager.py +++ /dev/null @@ -1,20 +0,0 @@ -from pymilvus.bulk_writer.stage_manager import StageManager - -PROJECT_ID = "_id_for_project_" -REGION_ID = "_id_for_region_" -STAGE_NAME = "_stage_name_for_project_" - -if __name__ == "__main__": - stage_manager = StageManager( - cloud_endpoint="https://api.cloud.zilliz.com", - api_key="_api_key_for_cluster_org_", - ) - - stage_manager.create_stage(PROJECT_ID, REGION_ID, STAGE_NAME) - print(f"\nStage {STAGE_NAME} created") - - stage_list = stage_manager.list_stages(PROJECT_ID, 1, 10) - print(f"\nlistStages results: ", stage_list.json()['data']) - - stage_manager.delete_stage(STAGE_NAME) - print(f"\nStage {STAGE_NAME} deleted") diff --git a/examples/bulk_import/example_volume_file_manager.py b/examples/bulk_import/example_volume_file_manager.py new file mode 100644 index 000000000..ebc157de8 --- /dev/null +++ b/examples/bulk_import/example_volume_file_manager.py @@ -0,0 +1,12 @@ +from pymilvus.bulk_writer.constants import ConnectType +from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager + +if __name__ == "__main__": + volume_file_manager = VolumeFileManager( + cloud_endpoint='https://api.cloud.zilliz.com', + api_key='_api_key_for_cluster_org_', + volume_name='_volume_name_for_project_', + connect_type=ConnectType.AUTO, + ) + result = volume_file_manager.upload_file_to_volume("/Users/zilliz/data/", "data/") + print(f"\nuploadFileToVolume results: {result}") diff --git a/examples/bulk_import/example_volume_manager.py b/examples/bulk_import/example_volume_manager.py new file mode 100644 index 000000000..e36255cb7 --- /dev/null +++ b/examples/bulk_import/example_volume_manager.py @@ -0,0 +1,20 @@ +from pymilvus.bulk_writer.volume_manager import VolumeManager + +PROJECT_ID = "_id_for_project_" +REGION_ID = "_id_for_region_" +VOLUME_NAME = 
"_volume_name_for_project_" + +if __name__ == "__main__": + volume_manager = VolumeManager( + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="_api_key_for_cluster_org_", + ) + + volume_manager.create_volume(PROJECT_ID, REGION_ID, VOLUME_NAME) + print(f"\nVolume {VOLUME_NAME} created") + + volume_list = volume_manager.list_volumes(PROJECT_ID, 1, 10) + print(f"\nlistVolumes results: ", volume_list.json()['data']) + + volume_manager.delete_volume(VOLUME_NAME) + print(f"\nVolume {VOLUME_NAME} deleted") diff --git a/pymilvus/bulk_writer/bulk_import.py b/pymilvus/bulk_writer/bulk_import.py index d917dae8a..15c93dab7 100644 --- a/pymilvus/bulk_writer/bulk_import.py +++ b/pymilvus/bulk_writer/bulk_import.py @@ -114,7 +114,7 @@ def bulk_import( access_key: str = "", secret_key: str = "", token: str = "", - stage_name: str = "", + volume_name: str = "", data_paths: [List[List[str]]] = None, verify: Optional[Union[bool, str]] = True, cert: Optional[Union[str, tuple]] = None, @@ -139,7 +139,7 @@ def bulk_import( secret_key (str): secret key to access the object storage(cloud) token (str): access token to access the object storage(cloud) - stage_name (str): name of the stage to import(cloud) + volume_name (str): name of the volume to import(cloud) data_paths (list of list of str): The paths of files that contain the data to import(cloud) verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate or a string, which must be server's certificate path. Defaults to `True`. @@ -181,7 +181,7 @@ def bulk_import( ... token="your-token" # for short-term credentials, also include `token` ... ) - >>> # 3. Import multiple files or folders from a Zilliz Stage into a Zilliz Cloud instance + >>> # 3. Import multiple files or folders from a Zilliz Volume into a Zilliz Cloud instance >>> bulk_import( ... url="https://api.cloud.zilliz.com", # If regions in China, it is: https://api.cloud.zilliz.com.cn ... api_key="YOUR_API_KEY", @@ -189,7 +189,7 @@ def bulk_import( ... db_name="", # Only For Dedicated deployments: this parameter can be specified. ... collection_name="my_collection", ... partition_name="", # If Collection not enable partitionKey, can be specified. - ... stage_name="my_stage", + ... volume_name="my_volume", ... data_paths=[ ... ["parquet-folder/1.parquet"], ... ["parquet-folder-2/"] @@ -210,7 +210,7 @@ def bulk_import( "accessKey": access_key, "secretKey": secret_key, "token": token, - "stageName": stage_name, + "volumeName": volume_name, "dataPaths": data_paths, } diff --git a/pymilvus/bulk_writer/stage_manager.py b/pymilvus/bulk_writer/stage_manager.py deleted file mode 100644 index bd02a766d..000000000 --- a/pymilvus/bulk_writer/stage_manager.py +++ /dev/null @@ -1,39 +0,0 @@ -import logging - -from pymilvus.bulk_writer.stage_restful import create_stage, delete_stage, list_stages - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class StageManager: - def __init__(self, cloud_endpoint: str, api_key: str): - """ - private preview feature. Please submit a request and contact us if you need it. - - Args: - cloud_endpoint (str): The fixed cloud endpoint URL. - - For international regions: https://api.cloud.zilliz.com - - For regions in China: https://api.cloud.zilliz.com.cn - api_key (str): The API key associated with your organization or cluster. 
- """ - self.cloud_endpoint = cloud_endpoint - self.api_key = api_key - - def create_stage(self, project_id: str, region_id: str, stage_name: str): - """ - Create a stage under the specified project and regionId. - """ - create_stage(self.cloud_endpoint, self.api_key, project_id, region_id, stage_name) - - def delete_stage(self, stage_name: str): - """ - Delete a stage. - """ - delete_stage(self.cloud_endpoint, self.api_key, stage_name) - - def list_stages(self, project_id: str, current_page: int = 1, page_size: int = 10): - """ - Paginated query of the stage list under a specified projectId. - """ - return list_stages(self.cloud_endpoint, self.api_key, project_id, current_page, page_size) diff --git a/pymilvus/bulk_writer/stage_bulk_writer.py b/pymilvus/bulk_writer/volume_bulk_writer.py similarity index 82% rename from pymilvus/bulk_writer/stage_bulk_writer.py rename to pymilvus/bulk_writer/volume_bulk_writer.py index bfbde6c25..462cb9562 100644 --- a/pymilvus/bulk_writer/stage_bulk_writer.py +++ b/pymilvus/bulk_writer/volume_bulk_writer.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional -from pymilvus.bulk_writer.stage_file_manager import StageFileManager +from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager from pymilvus.orm.schema import CollectionSchema from .constants import MB, BulkFileType, ConnectType @@ -12,8 +12,8 @@ logger = logging.getLogger(__name__) -class StageBulkWriter(LocalBulkWriter): - """StageBulkWriter handles writing local bulk files to a remote stage.""" +class VolumeBulkWriter(LocalBulkWriter): + """VolumeBulkWriter handles writing local bulk files to a remote volume.""" def __init__( self, @@ -21,7 +21,7 @@ def __init__( remote_path: str, cloud_endpoint: str, api_key: str, - stage_name: str, + volume_name: str, chunk_size: int = 1024 * MB, file_type: BulkFileType = BulkFileType.PARQUET, config: Optional[dict] = None, @@ -33,11 +33,11 @@ def __init__( remote_dir_path = Path(remote_path) / super().uuid self._remote_path = str(remote_dir_path) + "/" self._remote_files: List[List[str]] = [] - self._stage_name = stage_name - self._stage_file_manager = StageFileManager( + self._volume_name = volume_name + self._volume_file_manager = VolumeFileManager( cloud_endpoint=cloud_endpoint, api_key=api_key, - stage_name=stage_name, + volume_name=volume_name, connect_type=ConnectType.AUTO, ) @@ -50,7 +50,7 @@ def append_row(self, row: Dict[str, Any], **kwargs): super().append_row(row, **kwargs) def commit(self, **kwargs): - """Commit local bulk files and upload to remote stage.""" + """Commit local bulk files and upload to remote volume.""" super().commit(call_back=self._upload) @property @@ -61,8 +61,8 @@ def data_path(self) -> str: def batch_files(self) -> List[List[str]]: return self._remote_files - def get_stage_upload_result(self) -> Dict[str, str]: - return {"stage_name": self._stage_name, "path": str(self._remote_path)} + def get_volume_upload_result(self) -> Dict[str, str]: + return {"volume_name": self._volume_name, "path": str(self._remote_path)} def __exit__(self, exc_type: object, exc_val: object, exc_tb: object): super().__exit__(exc_type, exc_val, exc_tb) @@ -84,7 +84,7 @@ def _local_rm(self, file_path: str): logger.warning(f"Failed to delete local file: {file_path}") def _upload(self, file_list: List[str]) -> List[str]: - """Upload files to remote stage and remove local copies.""" + """Upload files to remote volume and remove local copies.""" uploaded_files: List[str] = [] for file_path in file_list: @@ 
-105,5 +105,5 @@ def _upload(self, file_list: List[str]) -> List[str]: def _upload_object(self, file_path: str, object_name: str): logger.info(f"Prepare to upload '{file_path}' to '{object_name}'") - self._stage_file_manager.upload_file_to_stage(file_path, self._remote_path) + self._volume_file_manager.upload_file_to_volume(file_path, self._remote_path) logger.info(f"Uploaded file '{file_path}' to '{object_name}'") diff --git a/pymilvus/bulk_writer/stage_file_manager.py b/pymilvus/bulk_writer/volume_file_manager.py similarity index 70% rename from pymilvus/bulk_writer/stage_file_manager.py rename to pymilvus/bulk_writer/volume_file_manager.py index f5b474e32..8320c1152 100644 --- a/pymilvus/bulk_writer/stage_file_manager.py +++ b/pymilvus/bulk_writer/volume_file_manager.py @@ -15,7 +15,7 @@ from pymilvus.bulk_writer.constants import ConnectType from pymilvus.bulk_writer.endpoint_resolver import EndpointResolver from pymilvus.bulk_writer.file_utils import FileUtils -from pymilvus.bulk_writer.stage_restful import apply_stage +from pymilvus.bulk_writer.volume_restful import apply_volume logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -56,23 +56,21 @@ def _url_open( ) -class StageFileManager: +class VolumeFileManager: def __init__( self, cloud_endpoint: str, api_key: str, - stage_name: str, + volume_name: str, connect_type: ConnectType = ConnectType.AUTO, ): """ - private preview feature. Please submit a request and contact us if you need it. - Args: cloud_endpoint (str): The fixed cloud endpoint URL. - For international regions: https://api.cloud.zilliz.com - For regions in China: https://api.cloud.zilliz.com.cn api_key (str): The API key associated with your organization - stage_name (str): The name of the Stage. + volume_name (str): The name of the Volume. connect_type: Current value is mainly for Aliyun OSS buckets, default is Auto. 
            - Default case, if the OSS bucket is reachable via the internal endpoint, the internal endpoint will be used
@@ -81,11 +79,11 @@ def __init__(
         """
         self.cloud_endpoint = cloud_endpoint
         self.api_key = api_key
-        self.stage_name = stage_name
+        self.volume_name = volume_name
         self.connect_type = connect_type
         self.local_file_paths = []
         self.total_bytes = 0
-        self.stage_info = {}
+        self.volume_info = {}
         self._client = None
 
     def _convert_dir_path(self, input_path: str):
@@ -95,19 +93,19 @@ def _convert_dir_path(self, input_path: str):
             return input_path
         return input_path + "/"
 
-    def _refresh_stage_and_client(self, path: str):
-        logger.info("refreshing stage info...")
-        response = apply_stage(self.cloud_endpoint, self.api_key, self.stage_name, path)
-        self.stage_info = response.json()["data"]
-        logger.info("stage info refreshed.")
+    def _refresh_volume_and_client(self, path: str):
+        logger.info("refreshing volume info...")
+        response = apply_volume(self.cloud_endpoint, self.api_key, self.volume_name, path)
+        self.volume_info = response.json()["data"]
+        logger.info("volume info refreshed.")
 
-        creds = self.stage_info["credentials"]
+        creds = self.volume_info["credentials"]
         http_client = urllib3.PoolManager(maxsize=100)
 
-        cloud = self.stage_info["cloud"]
-        region = self.stage_info["region"]
+        cloud = self.volume_info["cloud"]
+        region = self.volume_info["region"]
         endpoint = EndpointResolver.resolve_endpoint(
-            self.stage_info["endpoint"],
+            self.volume_info["endpoint"],
             cloud,
             region,
             self.connect_type,
@@ -136,34 +134,45 @@ def _refresh_stage_and_client(self, path: str):
 
     def _validate_size(self):
         file_size_total = self.total_bytes
-        file_size_limit = self.stage_info["condition"]["maxContentLength"]
+        file_size_limit = self.volume_info["condition"]["maxContentLength"]
         if file_size_total > file_size_limit:
             error_message = (
                 f"localFileTotalSize {file_size_total} exceeds "
                 f"the maximum contentLength limit {file_size_limit} defined in the condition."
-                f"If you want to upload larger files, please contact us to lift the restriction."
+                f" If you are using the free tier, "
+                f"you may switch to the pay-as-you-go volume plan to support uploading larger files."
+            )
+            raise ValueError(error_message)
+
+        file_number_limit = self.volume_info["condition"].get("maxFileNumber")
+        if file_number_limit is not None and len(self.local_file_paths) > file_number_limit:
+            error_message = (
+                f"localFileTotalNumber {len(self.local_file_paths)} exceeds "
+                f"the maximum fileNumber limit {file_number_limit} defined in the condition."
+                f" If you are using the free tier, "
+                f"you may switch to the pay-as-you-go volume plan to support uploading more files."
             )
             raise ValueError(error_message)
 
-    def upload_file_to_stage(self, source_file_path: str, target_stage_path: str):
+    def upload_file_to_volume(self, source_file_path: str, target_volume_path: str):
         """
-        uploads a local file or directory to the specified path within the Stage.
+        Uploads a local file or directory to the specified path within the Volume.
 
         Args:
             source_file_path: the source local file or directory path
-            target_stage_path: the target directory path in the Stage
+            target_volume_path: the target directory path in the Volume
 
         Raises:
            Exception: If an error occurs during the upload process.
""" self.local_file_paths, self.total_bytes = FileUtils.process_local_path(source_file_path) - stage_path = self._convert_dir_path(target_stage_path) - self._refresh_stage_and_client(stage_path) + volume_path = self._convert_dir_path(target_volume_path) + self._refresh_volume_and_client(volume_path) self._validate_size() file_count = len(self.local_file_paths) logger.info( - f"begin to upload file to stage, localDirOrFilePath:{source_file_path}, fileCount:{file_count} to stageName:{self.stage_name}, stagePath:{stage_path}" + f"begin to upload file to volume, localDirOrFilePath:{source_file_path}, fileCount:{file_count} to volumeName:{self.volume_name}, volumePath:{volume_path}" ) start_time = time.time() @@ -172,7 +181,7 @@ def upload_file_to_stage(self, source_file_path: str, target_stage_path: str): root_path = Path(source_file_path).resolve() uploaded_bytes_lock = threading.Lock() - def _upload_task(file_path: str, root_path: Path, stage_path: str): + def _upload_task(file_path: str, root_path: Path, volume_path: str): nonlocal uploaded_bytes nonlocal uploaded_count path_obj = Path(file_path).resolve() @@ -181,13 +190,13 @@ def _upload_task(file_path: str, root_path: Path, stage_path: str): else: relative_path = path_obj.relative_to(root_path).as_posix() - stage_prefix = f"{self.stage_info['stagePrefix']}" + volume_prefix = f"{self.volume_info['volumePrefix']}" file_start_time = time.time() try: size = Path(file_path).stat().st_size logger.info(f"uploading file, fileName:{file_path}, size:{size} bytes") - remote_file_path = stage_prefix + stage_path + relative_path - self._put_object(file_path, remote_file_path, stage_path) + remote_file_path = volume_prefix + volume_path + relative_path + self._put_object(file_path, remote_file_path, volume_path) with uploaded_bytes_lock: uploaded_bytes += size uploaded_count += 1 @@ -203,36 +212,36 @@ def _upload_task(file_path: str, root_path: Path, stage_path: str): with ThreadPoolExecutor(max_workers=20) as executor: futures = [] for _, file_path in enumerate(self.local_file_paths): - futures.append(executor.submit(_upload_task, file_path, root_path, stage_path)) + futures.append(executor.submit(_upload_task, file_path, root_path, volume_path)) for f in futures: f.result() # wait for all total_elapsed = time.time() - start_time logger.info( - f"All files in {source_file_path} uploaded to stage, " - f"stageName:{self.stage_info['stageName']}, stagePath: {stage_path}, " + f"All files in {source_file_path} uploaded to volume, " + f"volumeName:{self.volume_info['volumeName']}, volumePath: {volume_path}, " f"totalFileCount:{file_count}, totalFileSize:{self.total_bytes}, cost time:{total_elapsed}s" ) - return {"stageName": self.stage_info["stageName"], "path": stage_path} + return {"volumeName": self.volume_info["volumeName"], "path": volume_path} - def _put_object(self, file_path: str, remote_file_path: str, stage_path: str): - expire_time_str = self.stage_info["credentials"]["expireTime"] + def _put_object(self, file_path: str, remote_file_path: str, volume_path: str): + expire_time_str = self.volume_info["credentials"]["expireTime"] expire_time = datetime.fromisoformat(expire_time_str.replace("Z", "+00:00")) now = datetime.now(timezone.utc) if now > expire_time: - self._refresh_stage_and_client(stage_path) + self._refresh_volume_and_client(volume_path) - self._upload_with_retry(file_path, remote_file_path, stage_path) + self._upload_with_retry(file_path, remote_file_path, volume_path) def _upload_with_retry( - self, file_path: str, object_name: str, 
stage_path: str, max_retries: int = 5 + self, file_path: str, object_name: str, volume_path: str, max_retries: int = 5 ): attempt = 0 while attempt < max_retries: try: self._client.fput_object( - bucket_name=self.stage_info["bucketName"], + bucket_name=self.volume_info["bucketName"], object_name=object_name, file_path=file_path, ) @@ -240,7 +249,7 @@ def _upload_with_retry( except Exception as e: attempt += 1 logger.warning(f"Attempt {attempt} failed to upload {file_path}: {e}") - self._refresh_stage_and_client(stage_path) + self._refresh_volume_and_client(volume_path) if attempt == max_retries: error_message = f"Upload failed after {max_retries} attempts" diff --git a/pymilvus/bulk_writer/volume_manager.py b/pymilvus/bulk_writer/volume_manager.py new file mode 100644 index 000000000..4a94f6220 --- /dev/null +++ b/pymilvus/bulk_writer/volume_manager.py @@ -0,0 +1,37 @@ +import logging + +from pymilvus.bulk_writer.volume_restful import create_volume, delete_volume, list_volumes + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class VolumeManager: + def __init__(self, cloud_endpoint: str, api_key: str): + """ + Args: + cloud_endpoint (str): The fixed cloud endpoint URL. + - For international regions: https://api.cloud.zilliz.com + - For regions in China: https://api.cloud.zilliz.com.cn + api_key (str): The API key associated with your organization or cluster. + """ + self.cloud_endpoint = cloud_endpoint + self.api_key = api_key + + def create_volume(self, project_id: str, region_id: str, volume_name: str): + """ + Create a volume under the specified project and regionId. + """ + create_volume(self.cloud_endpoint, self.api_key, project_id, region_id, volume_name) + + def delete_volume(self, volume_name: str): + """ + Delete a volume. + """ + delete_volume(self.cloud_endpoint, self.api_key, volume_name) + + def list_volumes(self, project_id: str, current_page: int = 1, page_size: int = 10): + """ + Paginated query of the volume list under a specified projectId. + """ + return list_volumes(self.cloud_endpoint, self.api_key, project_id, current_page, page_size) diff --git a/pymilvus/bulk_writer/stage_restful.py b/pymilvus/bulk_writer/volume_restful.py similarity index 86% rename from pymilvus/bulk_writer/stage_restful.py rename to pymilvus/bulk_writer/volume_restful.py index 08d0558eb..77e5fc6d2 100644 --- a/pymilvus/bulk_writer/stage_restful.py +++ b/pymilvus/bulk_writer/volume_restful.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -def list_stages( +def list_volumes( url: str, api_key: str, project_id: str, @@ -16,7 +16,7 @@ def list_stages( page_size: int = 10, **kwargs, ) -> requests.Response: - """call listStages restful interface to list stages of project + """call listVolumes restful interface to list volumes of project Args: url (str): url of the server @@ -28,7 +28,7 @@ def list_stages( Returns: response of the restful interface """ - request_url = url + "/v2/stages" + request_url = url + "/v2/volumes" params = {"projectId": project_id, "currentPage": current_page, "pageSize": page_size} @@ -37,32 +37,32 @@ def list_stages( return resp -def create_stage( +def create_volume( url: str, api_key: str, project_id: str, region_id: str, - stage_name: str, + volume_name: str, **kwargs, ) -> requests.Response: - """call createStage restful interface to create new stage + """call createVolume restful interface to create new volume Args: url (str): url of the server api_key (str): API key to authenticate your requests. 
project_id (str): id of the project region_id (str): id of the region - stage_name (str): name of the stage + volume_name (str): name of the volume Returns: response of the restful interface """ - request_url = url + "/v2/stages/create" + request_url = url + "/v2/volumes/create" params = { "projectId": project_id, "regionId": region_id, - "stageName": stage_name, + "volumeName": volume_name, } resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs) @@ -70,50 +70,50 @@ def create_stage( return resp -def delete_stage( +def delete_volume( url: str, api_key: str, - stage_name: str, + volume_name: str, **kwargs, ) -> requests.Response: - """call deleteStage restful interface to create stage + """call deleteVolume restful interface to delete volume Args: url (str): url of the server api_key (str): API key to authenticate your requests. - stage_name (str): name of the stage + volume_name (str): name of the volume Returns: response of the restful interface """ - request_url = url + "/v2/stages/" + stage_name + request_url = url + "/v2/volumes/" + volume_name resp = _delete_request(url=request_url, api_key=api_key, **kwargs) _handle_response(request_url, resp.json()) return resp -def apply_stage( +def apply_volume( url: str, api_key: str, - stage_name: str, + volume_name: str, path: str, **kwargs, ) -> requests.Response: - """call applyStage restful interface to apply cred of stage + """call applyVolume restful interface to apply cred of volume Args: url (str): url of the server api_key (str): API key to authenticate your requests. - stage_name (str): name of the stage - path(str): path of the stage + volume_name (str): name of the volume + path(str): path of the volume Returns: response of the restful interface """ - request_url = url + "/v2/stages/apply" + request_url = url + "/v2/volumes/apply" - params = {"stageName": stage_name, "path": path} + params = {"volumeName": volume_name, "path": path} resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs) _handle_response(request_url, resp.json()) diff --git a/tests/test_bulk_writer_stage.py b/tests/test_bulk_writer_stage.py index cf4a62b88..4bd5021a8 100644 --- a/tests/test_bulk_writer_stage.py +++ b/tests/test_bulk_writer_stage.py @@ -6,22 +6,22 @@ import pytest import requests from pymilvus.bulk_writer.constants import BulkFileType, ConnectType -from pymilvus.bulk_writer.stage_bulk_writer import StageBulkWriter -from pymilvus.bulk_writer.stage_file_manager import StageFileManager -from pymilvus.bulk_writer.stage_manager import StageManager -from pymilvus.bulk_writer.stage_restful import ( - apply_stage, - create_stage, - delete_stage, - list_stages, +from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter +from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager +from pymilvus.bulk_writer.volume_manager import VolumeManager +from pymilvus.bulk_writer.volume_restful import ( + apply_volume, + create_volume, + delete_volume, + list_volumes, ) from pymilvus.client.types import DataType from pymilvus.exceptions import MilvusException from pymilvus.orm.schema import CollectionSchema, FieldSchema -class TestStageRestful: - """Test stage RESTful API functions.""" +class TestVolumeRestful: + """Test volume RESTful API functions.""" @pytest.fixture def mock_response(self) -> Mock: @@ -39,19 +39,19 @@ def api_params(self) -> Dict[str, str]: "api_key": "test_api_key", } - @patch("pymilvus.bulk_writer.stage_restful.requests.get") - def test_list_stages_success( + 
@patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_list_volumes_success( self, mock_get: Mock, mock_response: Mock, api_params: Dict[str, str] ) -> None: - """Test successful list_stages call.""" + """Test successful list_volumes call.""" mock_get.return_value = mock_response mock_response.json.return_value = { "code": 0, "message": "success", - "data": {"stages": ["stage1", "stage2"]}, + "data": {"volumes": ["volume1", "volume2"]}, } - response = list_stages( + response = list_volumes( **api_params, project_id="test_project", current_page=1, @@ -59,14 +59,14 @@ def test_list_stages_success( ) assert response.status_code == 200 - assert response.json()["data"]["stages"] == ["stage1", "stage2"] + assert response.json()["data"]["volumes"] == ["volume1", "volume2"] mock_get.assert_called_once() - @patch("pymilvus.bulk_writer.stage_restful.requests.get") - def test_list_stages_failure( + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_list_volumes_failure( self, mock_get: Mock, mock_response: Mock, api_params: Dict[str, str] ) -> None: - """Test failed list_stages call.""" + """Test failed list_volumes call.""" mock_response.json.return_value = { "code": 1001, "message": "Invalid API key", @@ -75,55 +75,55 @@ def test_list_stages_failure( mock_get.return_value = mock_response with pytest.raises(MilvusException, match="Invalid API key"): - list_stages(**api_params, project_id="test_project") + list_volumes(**api_params, project_id="test_project") - @patch("pymilvus.bulk_writer.stage_restful.requests.post") - def test_create_stage_success( + @patch("pymilvus.bulk_writer.volume_restful.requests.post") + def test_create_volume_success( self, mock_post: Mock, mock_response: Mock, api_params: Dict[str, str] ) -> None: - """Test successful create_stage call.""" + """Test successful create_volume call.""" mock_post.return_value = mock_response mock_response.json.return_value = { "code": 0, "message": "success", - "data": {"stageId": "stage123"}, + "data": {"volumeId": "volume123"}, } - response = create_stage( + response = create_volume( **api_params, project_id="test_project", region_id="us-west-2", - stage_name="test_stage", + volume_name="test_volume", ) assert response.status_code == 200 - assert response.json()["data"]["stageId"] == "stage123" + assert response.json()["data"]["volumeId"] == "volume123" mock_post.assert_called_once() - @patch("pymilvus.bulk_writer.stage_restful.requests.delete") - def test_delete_stage_success( + @patch("pymilvus.bulk_writer.volume_restful.requests.delete") + def test_delete_volume_success( self, mock_delete: Mock, mock_response: Mock, api_params: Dict[str, str] ) -> None: - """Test successful delete_stage call.""" + """Test successful delete_volume call.""" mock_delete.return_value = mock_response - response = delete_stage(**api_params, stage_name="test_stage") + response = delete_volume(**api_params, volume_name="test_volume") assert response.status_code == 200 mock_delete.assert_called_once() - @patch("pymilvus.bulk_writer.stage_restful.requests.post") - def test_apply_stage_success( + @patch("pymilvus.bulk_writer.volume_restful.requests.post") + def test_apply_volume_success( self, mock_post: Mock, mock_response: Mock, api_params: Dict[str, str] ) -> None: - """Test successful apply_stage call.""" + """Test successful apply_volume call.""" mock_post.return_value = mock_response mock_response.json.return_value = { "code": 0, "message": "success", "data": { - "stageName": "test_stage", - "stagePrefix": "prefix/", + 
"volumeName": "test_volume", + "volumePrefix": "prefix/", "endpoint": "s3.amazonaws.com", "bucketName": "test-bucket", "region": "us-west-2", @@ -138,19 +138,19 @@ def test_apply_stage_success( }, } - response = apply_stage( + response = apply_volume( **api_params, - stage_name="test_stage", + volume_name="test_volume", path="data/", ) assert response.status_code == 200 data = response.json()["data"] - assert data["stageName"] == "test_stage" + assert data["volumeName"] == "test_volume" assert data["endpoint"] == "s3.amazonaws.com" mock_post.assert_called_once() - @patch("pymilvus.bulk_writer.stage_restful.requests.get") + @patch("pymilvus.bulk_writer.volume_restful.requests.get") def test_http_error_handling( self, mock_get: Mock, api_params: Dict[str, str] ) -> None: @@ -158,9 +158,9 @@ def test_http_error_handling( mock_get.return_value.status_code = 404 with pytest.raises(MilvusException, match="status code: 404"): - list_stages(**api_params, project_id="test_project") + list_volumes(**api_params, project_id="test_project") - @patch("pymilvus.bulk_writer.stage_restful.requests.get") + @patch("pymilvus.bulk_writer.volume_restful.requests.get") def test_network_error_handling( self, mock_get: Mock, api_params: Dict[str, str] ) -> None: @@ -168,86 +168,86 @@ def test_network_error_handling( mock_get.side_effect = requests.exceptions.ConnectionError("Network error") with pytest.raises(MilvusException, match="Network error"): - list_stages(**api_params, project_id="test_project") + list_volumes(**api_params, project_id="test_project") -class TestStageManager: - """Test StageManager class.""" +class TestVolumeManager: + """Test VolumeManager class.""" @pytest.fixture - def stage_manager(self) -> StageManager: - """Create a StageManager instance.""" - return StageManager( + def volume_manager(self) -> VolumeManager: + """Create a VolumeManager instance.""" + return VolumeManager( cloud_endpoint="https://api.cloud.zilliz.com", api_key="test_api_key", ) - @patch("pymilvus.bulk_writer.stage_manager.create_stage") - def test_create_stage(self, mock_create: Mock, stage_manager: StageManager) -> None: - """Test creating a stage.""" - stage_manager.create_stage( + @patch("pymilvus.bulk_writer.volume_manager.create_volume") + def test_create_volume(self, mock_create: Mock, volume_manager: VolumeManager) -> None: + """Test creating a volume.""" + volume_manager.create_volume( project_id="test_project", region_id="us-west-2", - stage_name="test_stage", + volume_name="test_volume", ) mock_create.assert_called_once_with( - stage_manager.cloud_endpoint, - stage_manager.api_key, + volume_manager.cloud_endpoint, + volume_manager.api_key, "test_project", "us-west-2", - "test_stage", + "test_volume", ) - @patch("pymilvus.bulk_writer.stage_manager.delete_stage") - def test_delete_stage(self, mock_delete: Mock, stage_manager: StageManager) -> None: - """Test deleting a stage.""" - stage_manager.delete_stage(stage_name="test_stage") + @patch("pymilvus.bulk_writer.volume_manager.delete_volume") + def test_delete_volume(self, mock_delete: Mock, volume_manager: VolumeManager) -> None: + """Test deleting a volume.""" + volume_manager.delete_volume(volume_name="test_volume") mock_delete.assert_called_once_with( - stage_manager.cloud_endpoint, - stage_manager.api_key, - "test_stage", + volume_manager.cloud_endpoint, + volume_manager.api_key, + "test_volume", ) - @patch("pymilvus.bulk_writer.stage_manager.list_stages") - def test_list_stages(self, mock_list: Mock, stage_manager: StageManager) -> None: - """Test listing 
stages.""" + @patch("pymilvus.bulk_writer.volume_manager.list_volumes") + def test_list_volumes(self, mock_list: Mock, volume_manager: VolumeManager) -> None: + """Test listing volumes.""" mock_response = Mock() - mock_response.json.return_value = {"data": {"stages": ["stage1", "stage2"]}} + mock_response.json.return_value = {"data": {"volumes": ["volume1", "volume2"]}} mock_list.return_value = mock_response - result = stage_manager.list_stages(project_id="test_project", current_page=1, page_size=10) + result = volume_manager.list_volumes(project_id="test_project", current_page=1, page_size=10) - assert result.json()["data"]["stages"] == ["stage1", "stage2"] + assert result.json()["data"]["volumes"] == ["volume1", "volume2"] mock_list.assert_called_once_with( - stage_manager.cloud_endpoint, - stage_manager.api_key, + volume_manager.cloud_endpoint, + volume_manager.api_key, "test_project", 1, 10, ) -class TestStageFileManager: - """Test StageFileManager class.""" +class TestVolumeFileManager: + """Test VolumeFileManager class.""" @pytest.fixture - def stage_file_manager(self) -> StageFileManager: - """Create a StageFileManager instance.""" - return StageFileManager( + def volume_file_manager(self) -> VolumeFileManager: + """Create a VolumeFileManager instance.""" + return VolumeFileManager( cloud_endpoint="https://api.cloud.zilliz.com", api_key="test_api_key", - stage_name="test_stage", + volume_name="test_volume", connect_type=ConnectType.AUTO, ) @pytest.fixture - def mock_stage_info(self) -> Dict[str, Any]: - """Mock stage information.""" + def mock_volume_info(self) -> Dict[str, Any]: + """Mock volume information.""" return { - "stageName": "test_stage", - "stagePrefix": "prefix/", + "volumeName": "test_volume", + "volumePrefix": "prefix/", "endpoint": "s3.amazonaws.com", "bucketName": "test-bucket", "region": "us-west-2", @@ -261,142 +261,142 @@ def mock_stage_info(self) -> Dict[str, Any]: }, } - def test_convert_dir_path(self, stage_file_manager: StageFileManager) -> None: + def test_convert_dir_path(self, volume_file_manager: VolumeFileManager) -> None: """Test directory path conversion.""" - assert stage_file_manager._convert_dir_path("") == "" - assert stage_file_manager._convert_dir_path("/") == "" - assert stage_file_manager._convert_dir_path("data") == "data/" - assert stage_file_manager._convert_dir_path("data/") == "data/" - - @patch("pymilvus.bulk_writer.stage_file_manager.apply_stage") - @patch("pymilvus.bulk_writer.stage_file_manager.Minio") - def test_refresh_stage_and_client( + assert volume_file_manager._convert_dir_path("") == "" + assert volume_file_manager._convert_dir_path("/") == "" + assert volume_file_manager._convert_dir_path("data") == "data/" + assert volume_file_manager._convert_dir_path("data/") == "data/" + + @patch("pymilvus.bulk_writer.volume_file_manager.apply_volume") + @patch("pymilvus.bulk_writer.volume_file_manager.Minio") + def test_refresh_volume_and_client( self, mock_minio: Mock, mock_apply: Mock, - stage_file_manager: StageFileManager, - mock_stage_info: Dict[str, Any], + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], ) -> None: - """Test refreshing stage info and client.""" + """Test refreshing volume info and client.""" mock_response = Mock() - mock_response.json.return_value = {"data": mock_stage_info} + mock_response.json.return_value = {"data": mock_volume_info} mock_apply.return_value = mock_response - stage_file_manager._refresh_stage_and_client("data/") + volume_file_manager._refresh_volume_and_client("data/") 
- assert stage_file_manager.stage_info == mock_stage_info + assert volume_file_manager.volume_info == mock_volume_info mock_apply.assert_called_once() mock_minio.assert_called_once() def test_validate_size_success( - self, stage_file_manager: StageFileManager, mock_stage_info: Dict[str, Any] + self, volume_file_manager: VolumeFileManager, mock_volume_info: Dict[str, Any] ) -> None: """Test successful size validation.""" - stage_file_manager.stage_info = mock_stage_info - stage_file_manager.total_bytes = 1000000 # 1MB + volume_file_manager.volume_info = mock_volume_info + volume_file_manager.total_bytes = 1000000 # 1MB # Should not raise any exception - stage_file_manager._validate_size() + volume_file_manager._validate_size() def test_validate_size_failure( - self, stage_file_manager: StageFileManager, mock_stage_info: Dict[str, Any] + self, volume_file_manager: VolumeFileManager, mock_volume_info: Dict[str, Any] ) -> None: """Test size validation failure.""" - stage_file_manager.stage_info = mock_stage_info - stage_file_manager.total_bytes = 2147483648 # 2GB + volume_file_manager.volume_info = mock_volume_info + volume_file_manager.total_bytes = 2147483648 # 2GB with pytest.raises(ValueError, match="exceeds the maximum contentLength limit"): - stage_file_manager._validate_size() + volume_file_manager._validate_size() - @patch("pymilvus.bulk_writer.stage_file_manager.FileUtils.process_local_path") - @patch.object(StageFileManager, "_refresh_stage_and_client") - @patch.object(StageFileManager, "_validate_size") - @patch.object(StageFileManager, "_put_object") - def test_upload_file_to_stage( + @patch("pymilvus.bulk_writer.volume_file_manager.FileUtils.process_local_path") + @patch.object(VolumeFileManager, "_refresh_volume_and_client") + @patch.object(VolumeFileManager, "_validate_size") + @patch.object(VolumeFileManager, "_put_object") + def test_upload_file_to_volume( self, mock_put_object: Mock, mock_validate: Mock, mock_refresh: Mock, mock_process: Mock, - stage_file_manager: StageFileManager, - mock_stage_info: Dict[str, Any], + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], ) -> None: - """Test uploading file to stage.""" + """Test uploading file to volume.""" with tempfile.TemporaryDirectory() as temp_dir: # Create test files test_file = Path(temp_dir) / "test.txt" test_file.write_text("test content") mock_process.return_value = ([str(test_file)], 12) - stage_file_manager.stage_info = mock_stage_info + volume_file_manager.volume_info = mock_volume_info - result = stage_file_manager.upload_file_to_stage(str(test_file), "data/") + result = volume_file_manager.upload_file_to_volume(str(test_file), "data/") - assert result["stageName"] == "test_stage" + assert result["volumeName"] == "test_volume" assert result["path"] == "data/" mock_refresh.assert_called_once_with("data/") mock_validate.assert_called_once() - @patch.object(StageFileManager, "_upload_with_retry") - @patch.object(StageFileManager, "_refresh_stage_and_client") + @patch.object(VolumeFileManager, "_upload_with_retry") + @patch.object(VolumeFileManager, "_refresh_volume_and_client") def test_put_object_refresh_on_expiry( self, mock_refresh: Mock, mock_upload: Mock, - stage_file_manager: StageFileManager, - mock_stage_info: Dict[str, Any], + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], ) -> None: """Test that credentials are refreshed when expired.""" # Set expired credentials - expired_info = mock_stage_info.copy() + expired_info = mock_volume_info.copy() 
expired_info["credentials"]["expireTime"] = "2020-01-01T00:00:00Z" - stage_file_manager.stage_info = expired_info + volume_file_manager.volume_info = expired_info - stage_file_manager._put_object("test.txt", "remote/test.txt", "data/") + volume_file_manager._put_object("test.txt", "remote/test.txt", "data/") mock_refresh.assert_called_once_with("data/") mock_upload.assert_called_once() - @patch("pymilvus.bulk_writer.stage_file_manager.Minio") + @patch("pymilvus.bulk_writer.volume_file_manager.Minio") def test_upload_with_retry_success( - self, mock_minio: Mock, stage_file_manager: StageFileManager, mock_stage_info: Dict[str, Any] + self, mock_minio: Mock, volume_file_manager: VolumeFileManager, mock_volume_info: Dict[str, Any] ) -> None: """Test successful upload with retry.""" - stage_file_manager.stage_info = mock_stage_info - stage_file_manager._client = mock_minio.return_value + volume_file_manager.volume_info = mock_volume_info + volume_file_manager._client = mock_minio.return_value - stage_file_manager._upload_with_retry("test.txt", "remote/test.txt", "data/") + volume_file_manager._upload_with_retry("test.txt", "remote/test.txt", "data/") - stage_file_manager._client.fput_object.assert_called_once_with( + volume_file_manager._client.fput_object.assert_called_once_with( bucket_name="test-bucket", object_name="remote/test.txt", file_path="test.txt", ) - @patch("pymilvus.bulk_writer.stage_file_manager.Minio") - @patch.object(StageFileManager, "_refresh_stage_and_client") + @patch("pymilvus.bulk_writer.volume_file_manager.Minio") + @patch.object(VolumeFileManager, "_refresh_volume_and_client") def test_upload_with_retry_failure( self, mock_refresh: Mock, mock_minio: Mock, - stage_file_manager: StageFileManager, - mock_stage_info: Dict[str, Any], + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], ) -> None: """Test upload failure after max retries.""" - stage_file_manager.stage_info = mock_stage_info + volume_file_manager.volume_info = mock_volume_info mock_client = mock_minio.return_value mock_client.fput_object.side_effect = Exception("Upload failed") - stage_file_manager._client = mock_client + volume_file_manager._client = mock_client with pytest.raises(RuntimeError, match="Upload failed after 2 attempts"): - stage_file_manager._upload_with_retry("test.txt", "remote/test.txt", "data/", max_retries=2) + volume_file_manager._upload_with_retry("test.txt", "remote/test.txt", "data/", max_retries=2) assert mock_client.fput_object.call_count == 2 assert mock_refresh.call_count == 2 # Refreshed on each retry -class TestStageBulkWriter: - """Test StageBulkWriter class.""" +class TestVolumeBulkWriter: + """Test VolumeBulkWriter class.""" @pytest.fixture def simple_schema(self) -> CollectionSchema: @@ -409,41 +409,41 @@ def simple_schema(self) -> CollectionSchema: return CollectionSchema(fields=fields) @pytest.fixture - def stage_bulk_writer(self, simple_schema: CollectionSchema) -> StageBulkWriter: - """Create a StageBulkWriter instance.""" - with patch("pymilvus.bulk_writer.stage_bulk_writer.StageFileManager"): - return StageBulkWriter( + def volume_bulk_writer(self, simple_schema: CollectionSchema) -> VolumeBulkWriter: + """Create a VolumeBulkWriter instance.""" + with patch("pymilvus.bulk_writer.volume_bulk_writer.VolumeFileManager"): + return VolumeBulkWriter( schema=simple_schema, remote_path="test/data", cloud_endpoint="https://api.cloud.zilliz.com", api_key="test_api_key", - stage_name="test_stage", + volume_name="test_volume", chunk_size=1024, 
file_type=BulkFileType.PARQUET, ) - def test_init(self, stage_bulk_writer: StageBulkWriter) -> None: - """Test StageBulkWriter initialization.""" - assert stage_bulk_writer._remote_path.endswith("/") - assert stage_bulk_writer._stage_name == "test_stage" - assert isinstance(stage_bulk_writer._stage_file_manager, MagicMock) + def test_init(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test VolumeBulkWriter initialization.""" + assert volume_bulk_writer._remote_path.endswith("/") + assert volume_bulk_writer._volume_name == "test_volume" + assert isinstance(volume_bulk_writer._volume_file_manager, MagicMock) - def test_append_row(self, stage_bulk_writer: StageBulkWriter) -> None: + def test_append_row(self, volume_bulk_writer: VolumeBulkWriter) -> None: """Test appending a row.""" row = { "id": 1, "vector": [1.0] * 128, "text": "test text", } - stage_bulk_writer.append_row(row) - assert stage_bulk_writer.total_row_count == 1 + volume_bulk_writer.append_row(row) + assert volume_bulk_writer.total_row_count == 1 - @patch.object(StageBulkWriter, "_upload") - def test_commit(self, mock_upload: Mock, stage_bulk_writer: StageBulkWriter) -> None: + @patch.object(VolumeBulkWriter, "_upload") + def test_commit(self, mock_upload: Mock, volume_bulk_writer: VolumeBulkWriter) -> None: """Test committing data.""" # Add some data for i in range(10): - stage_bulk_writer.append_row({ + volume_bulk_writer.append_row({ "id": i, "vector": [float(i)] * 128, "text": f"text_{i}", @@ -453,44 +453,44 @@ def test_commit(self, mock_upload: Mock, stage_bulk_writer: StageBulkWriter) -> mock_upload.return_value = ["file1.parquet", "file2.parquet"] # Commit the data - stage_bulk_writer.commit() + volume_bulk_writer.commit() # Upload should have been called during commit assert mock_upload.called - def test_data_path_property(self, stage_bulk_writer: StageBulkWriter) -> None: + def test_data_path_property(self, volume_bulk_writer: VolumeBulkWriter) -> None: """Test data_path property.""" - assert isinstance(stage_bulk_writer.data_path, str) - assert "/" in stage_bulk_writer.data_path + assert isinstance(volume_bulk_writer.data_path, str) + assert "/" in volume_bulk_writer.data_path - def test_batch_files_property(self, stage_bulk_writer: StageBulkWriter) -> None: + def test_batch_files_property(self, volume_bulk_writer: VolumeBulkWriter) -> None: """Test batch_files property.""" - assert stage_bulk_writer.batch_files == [] - stage_bulk_writer._remote_files = [["file1.parquet"], ["file2.parquet"]] - assert stage_bulk_writer.batch_files == [["file1.parquet"], ["file2.parquet"]] - - def test_get_stage_upload_result(self, stage_bulk_writer: StageBulkWriter) -> None: - """Test getting stage upload result.""" - result = stage_bulk_writer.get_stage_upload_result() - assert result["stage_name"] == "test_stage" + assert volume_bulk_writer.batch_files == [] + volume_bulk_writer._remote_files = [["file1.parquet"], ["file2.parquet"]] + assert volume_bulk_writer.batch_files == [["file1.parquet"], ["file2.parquet"]] + + def test_get_volume_upload_result(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test getting volume upload result.""" + result = volume_bulk_writer.get_volume_upload_result() + assert result["volume_name"] == "test_volume" assert "path" in result - @patch("pymilvus.bulk_writer.stage_bulk_writer.Path") - def test_local_rm(self, mock_path: Mock, stage_bulk_writer: StageBulkWriter) -> None: + @patch("pymilvus.bulk_writer.volume_bulk_writer.Path") + def test_local_rm(self, mock_path: Mock, 
volume_bulk_writer: VolumeBulkWriter) -> None: """Test local file removal.""" # Test successful removal mock_file = mock_path.return_value mock_file.parent.iterdir.return_value = [] - stage_bulk_writer._local_rm("test_file.parquet") + volume_bulk_writer._local_rm("test_file.parquet") mock_file.unlink.assert_called_once() - @patch.object(StageBulkWriter, "_upload_object") - @patch.object(StageBulkWriter, "_local_rm") - @patch("pymilvus.bulk_writer.stage_bulk_writer.Path") + @patch.object(VolumeBulkWriter, "_upload_object") + @patch.object(VolumeBulkWriter, "_local_rm") + @patch("pymilvus.bulk_writer.volume_bulk_writer.Path") def test_upload( - self, mock_path_class: Mock, mock_rm: Mock, mock_upload_object: Mock, stage_bulk_writer: StageBulkWriter + self, mock_path_class: Mock, mock_rm: Mock, mock_upload_object: Mock, volume_bulk_writer: VolumeBulkWriter ) -> None: """Test uploading files.""" # Mock Path behavior @@ -499,32 +499,32 @@ def test_upload( mock_path.relative_to.return_value = Path("test.parquet") file_list = ["test_file.parquet"] - result = stage_bulk_writer._upload(file_list) + result = volume_bulk_writer._upload(file_list) assert len(result) == 1 mock_upload_object.assert_called_once() # The actual call will be with the mock path object assert mock_rm.called - @patch.object(StageFileManager, "upload_file_to_stage") + @patch.object(VolumeFileManager, "upload_file_to_volume") def test_upload_object( - self, mock_upload_to_stage: Mock, stage_bulk_writer: StageBulkWriter + self, mock_upload_to_volume: Mock, volume_bulk_writer: VolumeBulkWriter ) -> None: """Test uploading a single object.""" - stage_bulk_writer._upload_object("local_file.parquet", "remote_file.parquet") + volume_bulk_writer._upload_object("local_file.parquet", "remote_file.parquet") - stage_bulk_writer._stage_file_manager.upload_file_to_stage.assert_called_once_with( - "local_file.parquet", stage_bulk_writer._remote_path + volume_bulk_writer._volume_file_manager.upload_file_to_volume.assert_called_once_with( + "local_file.parquet", volume_bulk_writer._remote_path ) def test_context_manager(self, simple_schema: CollectionSchema) -> None: - """Test StageBulkWriter as context manager.""" - with patch("pymilvus.bulk_writer.stage_bulk_writer.StageFileManager"), StageBulkWriter( + """Test VolumeBulkWriter as context manager.""" + with patch("pymilvus.bulk_writer.volume_bulk_writer.VolumeFileManager"), VolumeBulkWriter( schema=simple_schema, remote_path="test/data", cloud_endpoint="https://api.cloud.zilliz.com", api_key="test_api_key", - stage_name="test_stage", + volume_name="test_volume", ) as writer: assert writer is not None writer.append_row({ @@ -533,10 +533,10 @@ def test_context_manager(self, simple_schema: CollectionSchema) -> None: "text": "test", }) - @patch.object(StageBulkWriter, "_upload_object") - @patch("pymilvus.bulk_writer.stage_bulk_writer.Path") + @patch.object(VolumeBulkWriter, "_upload_object") + @patch("pymilvus.bulk_writer.volume_bulk_writer.Path") def test_upload_error_handling( - self, mock_path_class: Mock, mock_upload_object: Mock, stage_bulk_writer: StageBulkWriter + self, mock_path_class: Mock, mock_upload_object: Mock, volume_bulk_writer: VolumeBulkWriter ) -> None: """Test error handling during upload.""" mock_upload_object.side_effect = Exception("Upload error") @@ -547,22 +547,22 @@ def test_upload_error_handling( mock_path.relative_to.return_value = Path("test.parquet") with pytest.raises(MilvusException, match="Failed to upload file"): - stage_bulk_writer._upload(["test_file.parquet"]) 
+ volume_bulk_writer._upload(["test_file.parquet"]) class TestIntegration: - """Integration tests for stage operations.""" + """Integration tests for volume operations.""" @pytest.fixture def mock_server_responses(self) -> Dict[str, Any]: """Mock server responses for integration testing.""" return { - "apply_stage": { + "apply_volume": { "code": 0, "message": "success", "data": { - "stageName": "test_stage", - "stagePrefix": "prefix/", + "volumeName": "test_volume", + "volumePrefix": "prefix/", "endpoint": "s3.amazonaws.com", "bucketName": "test-bucket", "region": "us-west-2", @@ -576,51 +576,51 @@ def mock_server_responses(self) -> Dict[str, Any]: }, }, }, - "list_stages": { + "list_volumes": { "code": 0, "message": "success", - "data": {"stages": ["stage1", "stage2", "test_stage"]}, + "data": {"volumes": ["volume1", "volume2", "test_volume"]}, }, } - @patch("pymilvus.bulk_writer.stage_restful.requests.post") - @patch("pymilvus.bulk_writer.stage_restful.requests.get") - def test_full_stage_workflow( + @patch("pymilvus.bulk_writer.volume_restful.requests.post") + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_full_volume_workflow( self, mock_get: Mock, mock_post: Mock, mock_server_responses: Dict[str, Any], ) -> None: - """Test complete stage workflow from creation to upload.""" + """Test complete volume workflow from creation to upload.""" # Setup mock responses mock_post_response = Mock() mock_post_response.status_code = 200 - mock_post_response.json.return_value = mock_server_responses["apply_stage"] + mock_post_response.json.return_value = mock_server_responses["apply_volume"] mock_post.return_value = mock_post_response mock_get_response = Mock() mock_get_response.status_code = 200 - mock_get_response.json.return_value = mock_server_responses["list_stages"] + mock_get_response.json.return_value = mock_server_responses["list_volumes"] mock_get.return_value = mock_get_response - # Create stage manager - stage_manager = StageManager( + # Create volume manager + volume_manager = VolumeManager( cloud_endpoint="https://api.cloud.zilliz.com", api_key="test_api_key", ) - # List stages - result = stage_manager.list_stages(project_id="test_project") - assert "test_stage" in result.json()["data"]["stages"] + # List volumes + result = volume_manager.list_volumes(project_id="test_project") + assert "test_volume" in result.json()["data"]["volumes"] - # Create stage file manager - file_manager = StageFileManager( + # Create volume file manager + file_manager = VolumeFileManager( cloud_endpoint="https://api.cloud.zilliz.com", api_key="test_api_key", - stage_name="test_stage", + volume_name="test_volume", connect_type=ConnectType.AUTO, ) - # Verify stage info can be refreshed - file_manager._refresh_stage_and_client("data/") - assert file_manager.stage_info["stageName"] == "test_stage" + # Verify volume info can be refreshed + file_manager._refresh_volume_and_client("data/") + assert file_manager.volume_info["volumeName"] == "test_volume"