@@ -16,7 +16,7 @@
 import numpy as np
 
 from examples.bulk_import.data_gengerator import *
-from pymilvus.bulk_writer.stage_bulk_writer import StageBulkWriter
+from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter
 from pymilvus.orm import utility
 
 logging.basicConfig(level=logging.INFO)
@@ -46,7 +46,7 @@
 API_KEY = "_api_key_for_cluster_org_"
 
 # This is currently a private preview feature. If you need to use it, please submit a request and contact us.
-STAGE_NAME = "_stage_name_for_project_"
+VOLUME_NAME = "_volume_name_for_project_"
 
 CLUSTER_ID = "_your_cloud_cluster_id_"
 DB_NAME = "" # If db_name is not specified, use ""
@@ -93,12 +93,12 @@ def build_all_type_schema():
     return schema
 
 
-def example_collection_remote_stage(file_type: BulkFileType):
+def example_collection_remote_volume(file_type: BulkFileType):
     schema = build_all_type_schema()
     print(f"\n===================== all field types ({file_type.name}) ====================")
     create_collection(schema, False)
-    stage_upload_result = stage_remote_writer(file_type, schema)
-    call_stage_import(stage_upload_result['stage_name'], stage_upload_result['path'])
+    volume_upload_result = volume_remote_writer(file_type, schema)
+    call_volume_import(volume_upload_result['volume_name'], volume_upload_result['path'])
     retrieve_imported_data()
 
 
@@ -111,16 +111,16 @@ def create_collection(schema: CollectionSchema, drop_if_exist: bool):
     print(f"Collection '{collection.name}' created")
 
 
-def stage_remote_writer(file_type, schema):
-    with StageBulkWriter(
+def volume_remote_writer(file_type, schema):
+    with VolumeBulkWriter(
         schema=schema,
         remote_path="bulk_data",
         file_type=file_type,
         chunk_size=512 * 1024 * 1024,
         cloud_endpoint=CLOUD_ENDPOINT,
         api_key=API_KEY,
-        stage_name=STAGE_NAME,
-    ) as stage_bulk_writer:
+        volume_name=VOLUME_NAME,
+    ) as volume_bulk_writer:
         print("Append rows")
         batch_count = 10000
         for i in range(batch_count):
@@ -146,12 +146,12 @@ def stage_remote_writer(file_type, schema):
                 "array_int": [k for k in range(10)],
                 "sparse_vector": gen_sparse_vector(False),
             }
-            stage_bulk_writer.append_row(row)
+            volume_bulk_writer.append_row(row)
 
         # append rows by numpy type
         for i in range(batch_count):
             id = i + batch_count
-            stage_bulk_writer.append_row({
+            volume_bulk_writer.append_row({
                 "id": np.int64(id),
                 "bool": True if i % 3 == 0 else False,
                 "int8": np.int8(id % 128),
@@ -174,12 +174,12 @@ def stage_remote_writer(file_type, schema):
                 "sparse_vector": gen_sparse_vector(True),
             })
 
-        print(f"{stage_bulk_writer.total_row_count} rows appends")
-        print(f"{stage_bulk_writer.buffer_row_count} rows in buffer not flushed")
+        print(f"{volume_bulk_writer.total_row_count} rows appends")
+        print(f"{volume_bulk_writer.buffer_row_count} rows in buffer not flushed")
         print("Generate data files...")
-        stage_bulk_writer.commit()
-        print(f"Data files have been uploaded: {stage_bulk_writer.batch_files}")
-        return stage_bulk_writer.get_stage_upload_result()
+        volume_bulk_writer.commit()
+        print(f"Data files have been uploaded: {volume_bulk_writer.batch_files}")
+        return volume_bulk_writer.get_volume_upload_result()
 
 
 def retrieve_imported_data():
@@ -217,15 +217,15 @@ def retrieve_imported_data():
         print(item)
 
 
-def call_stage_import(stage_name: str, path: str):
+def call_volume_import(volume_name: str, path: str):
     print(f"\n===================== import files to cluster ====================")
     resp = bulk_import(
         url=CLOUD_ENDPOINT,
         api_key=API_KEY,
         cluster_id=CLUSTER_ID,
         db_name=DB_NAME,
         collection_name=COLLECTION_NAME,
-        stage_name=stage_name,
+        volume_name=volume_name,
         data_paths=[[path]]
     )
     print(resp.json())
@@ -270,4 +270,4 @@ def call_stage_import(stage_name: str, path: str):
 
 if __name__ == '__main__':
     create_connection()
-    example_collection_remote_stage(file_type=BulkFileType.PARQUET)
+    example_collection_remote_volume(file_type=BulkFileType.PARQUET)
12 changes: 0 additions & 12 deletions examples/bulk_import/example_stage_file_manager.py

This file was deleted.

20 changes: 0 additions & 20 deletions examples/bulk_import/example_stage_manager.py

This file was deleted.

12 changes: 12 additions & 0 deletions examples/bulk_import/example_volume_file_manager.py
@@ -0,0 +1,12 @@
+from pymilvus.bulk_writer.constants import ConnectType
+from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager
+
+if __name__ == "__main__":
+    volume_file_manager = VolumeFileManager(
+        cloud_endpoint='https://api.cloud.zilliz.com',
+        api_key='_api_key_for_cluster_org_',
+        volume_name='_volume_name_for_project_',
+        connect_type=ConnectType.AUTO,
+    )
+    result = volume_file_manager.upload_file_to_volume("/Users/zilliz/data/", "data/")
+    print(f"\nuploadFileToVolume results: {result}")
20 changes: 20 additions & 0 deletions examples/bulk_import/example_volume_manager.py
@@ -0,0 +1,20 @@
+from pymilvus.bulk_writer.volume_manager import VolumeManager
+
+PROJECT_ID = "_id_for_project_"
+REGION_ID = "_id_for_region_"
+VOLUME_NAME = "_volume_name_for_project_"
+
+if __name__ == "__main__":
+    volume_manager = VolumeManager(
+        cloud_endpoint="https://api.cloud.zilliz.com",
+        api_key="_api_key_for_cluster_org_",
+    )
+
+    volume_manager.create_volume(PROJECT_ID, REGION_ID, VOLUME_NAME)
+    print(f"\nVolume {VOLUME_NAME} created")
+
+    volume_list = volume_manager.list_volumes(PROJECT_ID, 1, 10)
+    print(f"\nlistVolumes results: ", volume_list.json()['data'])
+
+    volume_manager.delete_volume(VOLUME_NAME)
+    print(f"\nVolume {VOLUME_NAME} deleted")
10 changes: 5 additions & 5 deletions pymilvus/bulk_writer/bulk_import.py
@@ -114,7 +114,7 @@ def bulk_import(
     access_key: str = "",
     secret_key: str = "",
     token: str = "",
-    stage_name: str = "",
+    volume_name: str = "",
     data_paths: [List[List[str]]] = None,
     verify: Optional[Union[bool, str]] = True,
     cert: Optional[Union[str, tuple]] = None,
@@ -139,7 +139,7 @@
         secret_key (str): secret key to access the object storage(cloud)
         token (str): access token to access the object storage(cloud)
 
-        stage_name (str): name of the stage to import(cloud)
+        volume_name (str): name of the volume to import(cloud)
         data_paths (list of list of str): The paths of files that contain the data to import(cloud)
         verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
             or a string, which must be server's certificate path. Defaults to `True`.
@@ -181,15 +181,15 @@
         ...     token="your-token" # for short-term credentials, also include `token`
         ... )
 
-        >>> # 3. Import multiple files or folders from a Zilliz Stage into a Zilliz Cloud instance
+        >>> # 3. Import multiple files or folders from a Zilliz Volume into a Zilliz Cloud instance
         >>> bulk_import(
         ...     url="https://api.cloud.zilliz.com", # If regions in China, it is: https://api.cloud.zilliz.com.cn
         ...     api_key="YOUR_API_KEY",
         ...     cluster_id="in0x-xxx",
         ...     db_name="", # Only For Dedicated deployments: this parameter can be specified.
         ...     collection_name="my_collection",
         ...     partition_name="", # If Collection not enable partitionKey, can be specified.
-        ...     stage_name="my_stage",
+        ...     volume_name="my_volume",
         ...     data_paths=[
         ...         ["parquet-folder/1.parquet"],
         ...         ["parquet-folder-2/"]
@@ -210,7 +210,7 @@
         "accessKey": access_key,
         "secretKey": secret_key,
         "token": token,
-        "stageName": stage_name,
+        "volumeName": volume_name,
         "dataPaths": data_paths,
     }
 
39 changes: 0 additions & 39 deletions pymilvus/bulk_writer/stage_manager.py

This file was deleted.

pymilvus/bulk_writer/{stage_bulk_writer.py → volume_bulk_writer.py}
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
-from pymilvus.bulk_writer.stage_file_manager import StageFileManager
+from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager
 from pymilvus.orm.schema import CollectionSchema
 
 from .constants import MB, BulkFileType, ConnectType
@@ -12,16 +12,16 @@
 logger = logging.getLogger(__name__)
 
 
-class StageBulkWriter(LocalBulkWriter):
-    """StageBulkWriter handles writing local bulk files to a remote stage."""
+class VolumeBulkWriter(LocalBulkWriter):
+    """VolumeBulkWriter handles writing local bulk files to a remote volume."""
 
     def __init__(
         self,
         schema: CollectionSchema,
         remote_path: str,
         cloud_endpoint: str,
         api_key: str,
-        stage_name: str,
+        volume_name: str,
         chunk_size: int = 1024 * MB,
         file_type: BulkFileType = BulkFileType.PARQUET,
         config: Optional[dict] = None,
@@ -33,11 +33,11 @@ def __init__(
         remote_dir_path = Path(remote_path) / super().uuid
         self._remote_path = str(remote_dir_path) + "/"
         self._remote_files: List[List[str]] = []
-        self._stage_name = stage_name
-        self._stage_file_manager = StageFileManager(
+        self._volume_name = volume_name
+        self._volume_file_manager = VolumeFileManager(
             cloud_endpoint=cloud_endpoint,
             api_key=api_key,
-            stage_name=stage_name,
+            volume_name=volume_name,
             connect_type=ConnectType.AUTO,
         )
 
@@ -50,7 +50,7 @@ def append_row(self, row: Dict[str, Any], **kwargs):
         super().append_row(row, **kwargs)
 
     def commit(self, **kwargs):
-        """Commit local bulk files and upload to remote stage."""
+        """Commit local bulk files and upload to remote volume."""
         super().commit(call_back=self._upload)
 
     @property
@@ -61,8 +61,8 @@ def data_path(self) -> str:
     def batch_files(self) -> List[List[str]]:
         return self._remote_files
 
-    def get_stage_upload_result(self) -> Dict[str, str]:
-        return {"stage_name": self._stage_name, "path": str(self._remote_path)}
+    def get_volume_upload_result(self) -> Dict[str, str]:
+        return {"volume_name": self._volume_name, "path": str(self._remote_path)}
 
     def __exit__(self, exc_type: object, exc_val: object, exc_tb: object):
         super().__exit__(exc_type, exc_val, exc_tb)
@@ -84,7 +84,7 @@ def _local_rm(self, file_path: str):
             logger.warning(f"Failed to delete local file: {file_path}")
 
     def _upload(self, file_list: List[str]) -> List[str]:
-        """Upload files to remote stage and remove local copies."""
+        """Upload files to remote volume and remove local copies."""
         uploaded_files: List[str] = []
 
         for file_path in file_list:
@@ -105,5 +105,5 @@
 
     def _upload_object(self, file_path: str, object_name: str):
         logger.info(f"Prepare to upload '{file_path}' to '{object_name}'")
-        self._stage_file_manager.upload_file_to_stage(file_path, self._remote_path)
+        self._volume_file_manager.upload_file_to_volume(file_path, self._remote_path)
         logger.info(f"Uploaded file '{file_path}' to '{object_name}'")