Migrating from Milvus to DashVector

DashVector 2024-09-05 16:43:00



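This document shows how to export all data from a Milvus collection and migrate it to DashVector. The main steps are:

1. Upgrade Milvus. Full export is only supported from Milvus v2.3.x (the latest release line at the time of writing).
2. Export the Milvus collection's schema and data to files.
3. Use the exported files as input to create a DashVector collection and import the data.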

The following sections describe each step in detail.

Upgrading Milvus to 2.3.x

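In this article, we use Milvus's [query_iterator](https://milvus.io/docs/with_iterators.md) to export the complete data set, because the [query](https://milvus.io/docs/query.md) interface limits how many results it can return and therefore cannot export a whole collection. Since query_iterator is only available from v2.3.x onward, upgrade Milvus to that version before exporting. For the detailed upgrade procedure, see the [Milvus user documentation](https://milvus.io/docs/upgrade_milvus_standalone-operator.md).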

:::info

Note: make sure your data is safely backed up before upgrading Milvus.

:::
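Before exporting, it can help to confirm that the server really is on 2.3.x or later. The sketch below is a minimal, hypothetical check: `parse_version`, `supports_query_iterator` and `check_milvus_server` are illustrative names, not part of pymilvus; the only pymilvus calls are `connections.connect` and `utility.get_server_version()`. It assumes the server reports its version in the usual `v2.3.3` form.

```python
def parse_version(version: str) -> tuple:
    """Turn a version string such as 'v2.3.3' or '2.4.0-rc.1' into an int tuple."""
    # drop a leading 'v' and any pre-release suffix after '-'
    core = version.strip().lstrip("vV").split("-")[0]
    return tuple(int(part) for part in core.split("."))


def supports_query_iterator(version: str) -> bool:
    """True if the Milvus server version is 2.3.0 or later (assumed requirement)."""
    return parse_version(version)[:2] >= (2, 3)


def check_milvus_server(host: str = "localhost", port: str = "19530") -> bool:
    # hypothetical helper; pymilvus is imported lazily so the pure
    # functions above can be used without it installed
    from pymilvus import connections, utility

    connections.connect("default", host=host, port=port)
    version = utility.get_server_version()
    print(f"Milvus server version: {version}")
    return supports_query_iterator(version)
```

Calling `check_milvus_server()` and stopping when it returns `False` keeps the export scripts from failing halfway through on an older server.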

Exporting all data from Milvus

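The export consists of the schema and the data records. The schema fully defines the collection, and the data records are the complete data stored in each partition; together they cover everything that needs to be exported. The following shows how to fully export a single Milvus collection.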

Exporting the schema

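DashVector and Milvus differ in schema design: DashVector exposes a very simple interface, whereas Milvus's is more detailed. Migrating from Milvus to DashVector therefore drops some schema parameters (for example, the collection's index_param) and keeps only those DashVector needs to create a collection. Concretely, the script below maps the Milvus metric type to a DashVector metric, takes the vector dimension from the indexed vector field, converts each scalar field to a DashVector field type, and skips the primary key field, which becomes the DashVector document id. The following is a simple schema conversion example (the collection's existing data was written with the [Milvus example code](https://raw.githubusercontent.com/zilliztech/milvus-backup/main/example/prepare_data.py)).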

```python
import json
import os
from pathlib import Path

from pymilvus import Collection, DataType, connections, utility

fmt = "\n=== {:30} ===\n"

print(fmt.format("start connecting to Milvus"))
host = os.environ.get('MILVUS_HOST', "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")

# Milvus metric type -> DashVector metric
metrics_map = {
    'COSINE': 'cosine',
    'L2': 'euclidean',
    'IP': 'dotproduct',
}

# Milvus scalar type -> DashVector field type
dtype_map = {
    DataType.BOOL: 'bool',
    DataType.INT8: 'int',
    DataType.INT16: 'int',
    DataType.INT32: 'int',
    DataType.INT64: 'int',
    DataType.FLOAT: 'float',
    DataType.DOUBLE: 'float',
    DataType.STRING: 'str',
    DataType.VARCHAR: 'str',
}


def load_collection(collection_name: str):
    has = utility.has_collection(collection_name)
    print(f"Does collection {collection_name} exist in Milvus: {has}")
    if not has:
        return None
    collection = Collection(collection_name)
    collection.load()
    return collection


def export_collection_schema(collection: Collection, file: str):
    schema = collection.schema.to_dict()
    # the vector field and metric type are taken from the first index
    index = collection.indexes[0].to_dict()
    export_schema = dict()

    milvus_metric_type = index['index_param']['metric_type']
    try:
        export_schema['metrics'] = metrics_map[milvus_metric_type]
    except KeyError:
        raise Exception(f"milvus metric_type {milvus_metric_type} not supported")

    export_schema['fields_schema'] = {}
    for field in schema['fields']:
        # the primary key becomes the DashVector doc id, not a field
        if field.get('is_primary'):
            continue
        if field['name'] == index['field']:
            # vector field
            if field['type'] == DataType.FLOAT_VECTOR:
                export_schema['dtype'] = 'float'
                export_schema['dimension'] = int(field['params']['dim'])
            else:
                raise Exception(f"milvus dtype {field['type']} not supported yet")
        else:
            # non-vector (scalar) field
            try:
                export_schema['fields_schema'][field['name']] = dtype_map[field['type']]
            except KeyError:
                raise Exception(f"milvus dtype {field['type']} not supported yet")

    with open(file, 'w') as f:
        json.dump(export_schema, f, indent=4)


if __name__ == "__main__":
    collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    collection = load_collection(collection_name)
    if collection is None:
        raise SystemExit(f"collection {collection_name} not found in Milvus")
    dump_path_str = collection_name + '.dump'
    Path(dump_path_str).mkdir(parents=True, exist_ok=True)
    schema_file = dump_path_str + "/schema.json"
    export_collection_schema(collection, schema_file)
```

Below is an example of an exported schema file that can be used to create a DashVector collection.

```json
{
    "metrics": "euclidean",
    "fields_schema": {
        "random": "float",
        "var": "str"
    },
    "dtype": "float",
    "dimension": 8
}
```
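Because the import steps read this file blindly, a quick sanity check can catch a bad export early. The sketch below is a hypothetical helper (`validate_schema` and `validate_schema_file` are illustrative names); it only checks against the values the export script above can emit, so it assumes those are the only metric, vector dtype and field-type names you need. DashVector itself may accept others.

```python
import json

# values the export script can produce (assumption: nothing else is expected)
ALLOWED_METRICS = {"cosine", "euclidean", "dotproduct"}
ALLOWED_FIELD_TYPES = {"int", "float", "bool", "str"}


def validate_schema(schema: dict) -> list:
    """Return a list of problems in an exported schema dict (empty if it looks OK)."""
    problems = []
    for key in ("metrics", "dtype", "dimension", "fields_schema"):
        if key not in schema:
            problems.append(f"missing key: {key}")
    if problems:
        return problems
    if schema["metrics"] not in ALLOWED_METRICS:
        problems.append(f"unsupported metric: {schema['metrics']}")
    if schema["dtype"] != "float":
        problems.append(f"unsupported vector dtype: {schema['dtype']}")
    dim = schema["dimension"]
    # bool is a subclass of int, so reject it explicitly
    if not isinstance(dim, int) or isinstance(dim, bool) or dim <= 0:
        problems.append(f"invalid dimension: {dim!r}")
    for name, ftype in schema["fields_schema"].items():
        if ftype not in ALLOWED_FIELD_TYPES:
            problems.append(f"field {name}: unsupported type {ftype}")
    return problems


def validate_schema_file(path: str) -> list:
    with open(path) as f:
        return validate_schema(json.load(f))
```

For the example file above, `validate_schema_file("hello_milvus.dump/schema.json")` would return an empty list.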

Exporting the data

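Both DashVector and Milvus have the concept of a partition, so vectors and other data should be exported partition by partition. In addition, DashVector's primary key type is str, whereas Milvus lets you choose the primary key type (INT64 or VARCHAR), so primary keys must be converted to strings during export. Below is a simple export example based on the [query_iterator](https://milvus.io/docs/with_iterators.md) interface: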

```python
import json
import os
from pathlib import Path

from pymilvus import Collection, connections, utility

fmt = "\n=== {:30} ===\n"

print(fmt.format("start connecting to Milvus"))
host = os.environ.get('MILVUS_HOST', "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")


def load_collection(collection_name: str):
    has = utility.has_collection(collection_name)
    print(f"Does collection {collection_name} exist in Milvus: {has}")
    if not has:
        return None
    collection = Collection(collection_name)
    collection.load()
    return collection


def export_partition_data(collection, partition_name, file: str,
                          pk: str, vector_field_name: str, batch_size: int = 10):
    # export every field in the schema (primary key, vector and scalar fields)
    output_fields = [field.name for field in collection.schema.fields]
    query_iter = collection.query_iterator(
        batch_size=batch_size,
        output_fields=output_fields,
        partition_names=[partition_name],
    )
    with open(file, 'w') as export_file:
        while True:
            docs = query_iter.next()
            if len(docs) == 0:
                # no more rows: close the iterator
                query_iter.close()
                break
            for doc in docs:
                new_doc = {}
                new_doc_fields = {}
                for k, v in doc.items():
                    if k == pk:
                        # DashVector primary keys are strings
                        new_doc['pk'] = str(v)
                    elif k == vector_field_name:
                        new_doc['vector'] = [float(x) for x in v]
                    else:
                        new_doc_fields[k] = v
                new_doc['fields'] = new_doc_fields
                # write one JSON document per line
                json.dump(new_doc, export_file)
                export_file.write('\n')


if __name__ == "__main__":
    collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    collection = load_collection(collection_name)
    if collection is None:
        raise SystemExit(f"collection {collection_name} not found in Milvus")
    pk = collection.schema.primary_field.name
    # the indexed vector field, as in the schema export
    vector_field_name = collection.indexes[0].field_name
    dump_path_str = collection_name + '.dump'
    Path(dump_path_str).mkdir(parents=True, exist_ok=True)

    for partition in collection.partitions:
        partition_name = partition.name
        # Milvus's '_default' partition goes to default.txt, which the
        # import step loads into DashVector's default partition
        if partition_name == '_default':
            export_path = dump_path_str + '/default.txt'
        else:
            export_path = dump_path_str + '/' + partition_name + '.txt'
        export_partition_data(collection, partition_name, export_path,
                              pk, vector_field_name)
```

The example above exports each partition of the Milvus collection to its own file. The resulting file layout looks like this:

```text
# collection_name = hello_milvus
hello_milvus.dump/
├── default.txt
└── schema.json
```
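Each line of a partition file such as `default.txt` is one JSON object with `pk`, `vector` and `fields` keys. The per-row conversion done inside the export loop can be pulled out into a pure function, which makes it easy to test without a Milvus server. `to_dashvector_record` is a hypothetical name, and the sample row below is made-up data that reuses the field names of the example collection (`pk`, `random`, `var`, `embeddings`).

```python
import json


def to_dashvector_record(doc: dict, pk_field: str, vector_field: str) -> dict:
    """Convert one row returned by Milvus into the exported JSON-lines record."""
    record = {"pk": None, "vector": None, "fields": {}}
    for key, value in doc.items():
        if key == pk_field:
            # DashVector ids are strings, so INT64 keys are stringified
            record["pk"] = str(value)
        elif key == vector_field:
            record["vector"] = [float(x) for x in value]
        else:
            # every other field is kept as a DashVector scalar field
            record["fields"][key] = value
    return record


row = {"pk": 42, "random": 0.5, "var": "abc", "embeddings": [0.1, 0.2]}
line = json.dumps(to_dashvector_record(row, "pk", "embeddings"))
print(line)  # {"pk": "42", "vector": [0.1, 0.2], "fields": {"random": 0.5, "var": "abc"}}
```

Fixing the key order up front also keeps every line in the same shape, regardless of the order in which Milvus returns the fields.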

Importing the data into DashVector

Creating a cluster

Create a cluster by following the official DashVector [user guide](https://help.aliyun.com/document_detail/2631966.html?spm=a2c4g.2631965.0.0.33485425aqhYvz).

Creating a collection

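Create the collection from the schema exported in section 2.1, following the official DashVector [user guide](https://help.aliyun.com/document_detail/2568085.html?spm=a2c4g.2631966.0.0.153c1afcNYc6rW). The example below reads the schema.json exported in section 2.1 and creates a DashVector collection from it.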

```python
import json
from typing import Dict, Type

from dashvector import Client, DashVectorException
from pydantic import BaseModel

# type names used in schema.json -> Python types expected by DashVector
dtype_convert = {
    'int': int,
    'float': float,
    'bool': bool,
    'str': str,
}


class Schema(BaseModel):
    metrics: str
    dtype: Type
    dimension: int
    fields_schema: Dict[str, Type]

    @classmethod
    def from_dict(cls, json_data):
        metrics = json_data['metrics']
        dtype = dtype_convert[json_data['dtype']]
        dimension = json_data['dimension']
        fields_schema = {k: dtype_convert[v] for k, v in json_data['fields_schema'].items()}
        return cls(metrics=metrics, dtype=dtype, dimension=dimension, fields_schema=fields_schema)


def read_schema(schema_path) -> Schema:
    with open(schema_path) as file:
        json_data = json.load(file)
    return Schema.from_dict(json_data)


if __name__ == "__main__":
    milvus_collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    milvus_dump_path = f"{milvus_collection_name}.dump"
    milvus_dump_schema_path = milvus_dump_path + "/schema.json"
    schema = read_schema(milvus_dump_schema_path)

    client = Client(
        api_key='YOUR_API_KEY',
        endpoint='YOUR_CLUSTER_ENDPOINT'
    )

    # create the DashVector collection from the exported schema
    rsp = client.create(
        name="YOUR_DASHVECTOR_COLLECTION_NAME",
        dimension=schema.dimension,
        metric=schema.metrics,
        dtype=schema.dtype,
        fields_schema=schema.fields_schema,
    )
    if not rsp:
        raise DashVectorException(rsp.code, reason=rsp.message)
```
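The pydantic model is not strictly required; the same conversion can be done with a plain dictionary that is unpacked straight into `client.create`. The sketch below assumes `create` takes `dimension`, `metric`, `dtype` and `fields_schema` keyword arguments, exactly as in the example above; `create_kwargs_from_schema` is a hypothetical helper name.

```python
import json

# schema.json type names -> Python types passed to DashVector
TYPE_NAMES = {"int": int, "float": float, "bool": bool, "str": str}


def create_kwargs_from_schema(schema: dict) -> dict:
    """Map an exported schema dict to keyword arguments for client.create()."""
    return {
        "dimension": schema["dimension"],
        "metric": schema["metrics"],
        "dtype": TYPE_NAMES[schema["dtype"]],
        "fields_schema": {
            name: TYPE_NAMES[type_name]
            for name, type_name in schema["fields_schema"].items()
        },
    }


def load_create_kwargs(schema_path: str) -> dict:
    with open(schema_path) as f:
        return create_kwargs_from_schema(json.load(f))
```

Usage would then be `client.create(name="YOUR_DASHVECTOR_COLLECTION_NAME", **load_create_kwargs(milvus_dump_schema_path))`, which drops the pydantic dependency at the cost of its type validation.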

Importing the data

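Using the data exported in section 2.2, insert the docs into DashVector by following the official DashVector [user guide](https://help.aliyun.com/document_detail/2510249.html?spm=a2c4g.2510248.0.0.49ef7738NuI0kM#aa59e950508ld). The example below reads each partition's exported file in turn, creates the matching DashVector partition (the default partition already exists), and inserts the docs into that partition one at a time.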

```python
import glob
import json
from pathlib import Path

from dashvector import Client, DashVectorException, Doc


def insert_data(collection, partition_name, partition_file):
    # DashVector already has a default partition; create every other one
    if partition_name != 'default':
        rsp = collection.create_partition(partition_name)
        if not rsp:
            raise DashVectorException(rsp.code, reason=rsp.message)
    # insert into the matching partition (None means the default partition)
    partition = None if partition_name == 'default' else partition_name

    with open(partition_file) as f:
        for line in f:
            if not line.strip():
                continue
            json_data = json.loads(line)
            rsp = collection.insert(
                [Doc(id=json_data['pk'], vector=json_data['vector'], fields=json_data['fields'])],
                partition=partition,
            )
            if not rsp:
                raise DashVectorException(rsp.code, reason=rsp.message)


if __name__ == "__main__":
    milvus_collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    milvus_dump_path = f"{milvus_collection_name}.dump"

    client = Client(
        api_key='YOUR_API_KEY',
        endpoint='YOUR_CLUSTER_ENDPOINT'
    )

    # get the collection created in the previous step
    collection = client.get("YOUR_DASHVECTOR_COLLECTION_NAME")
    if not collection:
        raise SystemExit("DashVector collection not found")

    partition_files = glob.glob(milvus_dump_path + '/*.txt')
    for partition_file in partition_files:
        # the file name without ".txt" is the partition name
        partition_name = Path(partition_file).stem
        insert_data(collection, partition_name, partition_file)
```
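Inserting one doc per request is simple but slow for large partitions. Since `insert` in the example above already takes a list of `Doc` objects, the lines can be grouped into batches first. The sketch below shows only the batching part in plain Python; `iter_records` and `batched` are hypothetical helper names, and the batch size you pick should stay within whatever per-request limit your DashVector cluster documents (not verified here).

```python
import json
from itertools import islice
from typing import Iterable, Iterator, List


def iter_records(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one parsed record per non-empty JSON line."""
    for line in lines:
        if line.strip():
            yield json.loads(line)


def batched(records: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group records into lists of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```

Inside `insert_data`, the per-line loop could then become `for batch in batched(iter_records(f), 100):`, building `[Doc(id=r['pk'], vector=r['vector'], fields=r['fields']) for r in batch]` and passing that list to `collection.insert(..., partition=partition)` with the same response check.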


