| 知乎专栏 |
neo@MacBook-Pro-M2 ~/w/c/milvus (master) [1]> /opt/homebrew/bin/pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymilvus
# Example: connect to the Milvus server at 127.0.0.1:19530 and create a
# database named "book".
from pymilvus import connections, db
conn = connections.connect(host="127.0.0.1", port=19530)
# NOTE(review): the return values of connect() / create_database() are kept in
# `conn` / `database` but never used here - confirm they carry anything useful.
database = db.create_database("book")
链接时打开数据库
# Example: select the "book" database at connection time by passing db_name.
# NOTE(review): the port is given as a string here but as an int in the
# previous example - presumably both forms are accepted; confirm.
conn = connections.connect(
host="127.0.0.1",
port="19530",
db_name="book"
)
链接后打开数据库
# Select the "book" database on an already-established connection.
db.using_database("book")
链接后同样可以使用 using_database 切换数据库
# Switch the current connection over to the "book" database.
db.using_database("book")
# Example: open the existing "book" collection and add a partition to it.
from pymilvus import Collection
collection = Collection("book") # Get an existing collection.
# Create a partition named "novel" inside that collection.
collection.create_partition("novel")
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
##############################################
try:
import os, sys, time
import logging, logging.handlers
import pandas
from transformers import AutoTokenizer, AutoModel
from pymilvus import (
connections,
utility,
FieldSchema,
CollectionSchema,
DataType,
Collection,
)
from config import MILVUS_HOST, MILVUS_PORT, METRIC_TYPE, MILVUS_DISTANCE
except ImportError as err:
print("Error: %s" % (err))
class MilvusHelper:
    """Helper around one Milvus question/answer collection with BERT embeddings.

    The helper connects to Milvus, loads a Hugging Face tokenizer/model pair
    and exposes create / index / drop / insert / search / delete / count
    operations on the collection selected through ``set_collection`` or
    ``create``. Every operation logs its outcome and reports failure through
    its return value instead of raising.
    """

    # Directory where the Hugging Face tokenizer/model files are cached.
    cache_dir = "/opt/milvus/transformers"
    # Name of the currently selected collection ("" until one is selected).
    collection_name = ""
    # Checkpoint name passed to from_pretrained(); an alternative that was
    # tried here is "hfl/chinese-macbert-base".
    # NOTE: __init__ replaces this attribute on the instance with the loaded
    # model object.
    model = "bert-base-chinese"
    # Collection schema: integer primary key, the question/answer texts and
    # the 768-dimensional embedding of the question.
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False, description="int64"),
        FieldSchema(
            name="question",
            dtype=DataType.VARCHAR,
            max_length=1024,
        ),
        FieldSchema(
            name="answer",
            dtype=DataType.VARCHAR,
            max_length=4096,
        ),
        FieldSchema(name="question_vector", dtype=DataType.FLOAT_VECTOR, dim=768),
    ]

    def __init__(self) -> None:
        """Connect to Milvus and load the tokenizer and model.

        Any failure is logged and terminates the process with exit code 1.
        """
        self.log = logging.getLogger(__class__.__name__)
        self.log.info(f"Start connecting to Milvus {MILVUS_HOST}:{MILVUS_PORT}")
        try:
            connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
            self.tokenizer = AutoTokenizer.from_pretrained(self.model, cache_dir=self.cache_dir)
            # From here on self.model is the loaded model, not its name.
            self.model = AutoModel.from_pretrained(self.model, cache_dir=self.cache_dir)
        except PermissionError as e:
            self.log.error(f"Milvus: {repr(e)}, {self.cache_dir}")
            sys.exit(1)
        except ConnectionError as e:
            self.log.error(f"Milvus download model: {repr(e)}")
            sys.exit(1)
        except Exception as e:
            self.log.error(f"Failed connect to Milvus: {repr(e)}")
            sys.exit(1)

    def set_collection(self, collection_name):
        """Select an existing collection as the target of later operations.

        Returns:
            True when the collection exists and was bound to
            ``self.collection``; False when it does not exist or the lookup
            failed.
        """
        try:
            self.collection_name = collection_name
            if not utility.has_collection(self.collection_name):
                self.log.info(f"Milvus doesn't have a collection named: {self.collection_name}")
                return False
            self.collection = Collection(name=collection_name)
            self.log.info(f"Milvus does have a collection named: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed to set collection in Milvus: {repr(e)}")
            return False

    def create(self, collection=None, description=None):
        """Create the collection using the class-level ``fields`` schema.

        Args:
            collection: Name of the collection; defaults to the currently
                selected ``collection_name``.
            description: Schema description; a generic one is generated when
                omitted.

        Returns:
            True when the collection was created; False when it already
            exists or creation failed.
        """
        if collection:
            self.collection_name = collection
        if not description:
            description = f"{self.collection_name} is the simplest demo to introduce the APIs"
        try:
            isExist = utility.has_collection(self.collection_name)
            if isExist:
                self.log.info(f"Does collection '{self.collection_name}' exist in Milvus: {isExist}")
                return False
            schema = CollectionSchema(self.fields, description=description)
            self.collection = Collection(name=self.collection_name, schema=schema, consistency_level="Strong")
            self.log.info(f"Create Milvus collection: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed create collection in Milvus: {repr(e)}")
            return False

    def index(self, collection=None):
        """Ensure the ``idx`` index exists on the ``question_vector`` field.

        Args:
            collection: Optional collection name to select first.

        Returns:
            The status object returned by ``create_index`` when a new index
            was built, True when the index already existed, False on failure.
        """
        if collection:
            self.set_collection(collection)
        index_params = {
            "index_type": "IVF_FLAT",
            "metric_type": METRIC_TYPE,  # "IP/L2"
            "params": {"nlist": 2048},
        }
        try:
            if self.collection.has_index(index_name="idx"):
                self.log.info("Does index exist in Milvus")
                # Report the already-indexed case explicitly instead of
                # falling through with an implicit None.
                return True
            status = self.collection.create_index(field_name="question_vector", index_params=index_params, index_name="idx")
            if status.code:
                raise Exception(status.message)
            self.log.info(f"Successfully create index in collection: {self.collection_name} with param:{index_params}")
            return status
        except Exception as e:
            self.log.error(f"Failed to create index: {repr(e)}")
            return False

    def drop(self, collection=None):
        """Drop a collection.

        Args:
            collection: Optional collection name to select first; the
                currently selected collection is dropped when omitted.

        Returns:
            True on success, False on failure.
        """
        if collection:
            self.set_collection(collection)
        try:
            self.collection.drop()
            # Log the name actually dropped; the argument may be None.
            self.log.info(f"Successfully drop collection: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed to drop collection: {repr(e)}")
            return False

    def insert(self, id, question, answer):
        """Embed ``question`` and insert one row into the selected collection.

        Returns:
            The primary keys reported by Milvus, or 0 when the insert failed.
        """
        try:
            data = pandas.DataFrame({"id": id, "question": question, "answer": answer, "question_vector": self.bert(question)})
            res = self.collection.insert(data)
            ids = res.primary_keys
            self.log.info(f"Insert vectors to Milvus in collection: {self.collection_name} with {res.insert_count} rows")
            return ids
        except Exception as e:
            self.log.error(f"Failed to insert data to Milvus: {repr(e)}")
            return 0

    def bert(self, sentences):
        """Return the model's pooled output for ``sentences`` as nested lists.

        Padding and truncation are enabled so that a batch of sentences with
        different lengths, or a text longer than the model's maximum input
        length, is encoded instead of raising. For a single short sentence
        neither option changes the tokenizer output.
        """
        inputs = self.tokenizer(sentences, return_tensors="pt", padding=True, truncation=True)
        outputs = self.model(**inputs)
        return outputs.pooler_output.tolist()

    def search(self, text, limit=10):
        """Search the selected collection for questions similar to ``text``.

        Args:
            text: Query text; it is embedded with ``bert`` before searching.
            limit: Maximum number of hits to return per query vector.

        Returns:
            A list of dicts with ``ids``, ``distance``, ``score`` and the
            stored ``entity`` fields, or None when the search failed.
        """
        try:
            param = {
                # Must match the metric the index was built with (see index()).
                "metric_type": METRIC_TYPE,
                "params": {"nprobe": 16},
            }
            start_time = time.time()
            data = self.bert(text)
            self.collection.load()
            result = self.collection.search(data=data, anns_field="question_vector", param=param, limit=limit, output_fields=["id", "question", "answer"])
            end_time = time.time()
            res: list = []
            for hits in result:
                for hit in hits:
                    row = {"ids": hit.id, "distance": hit.distance, "score": hit.score, "entity": {"id": hit.entity.id, "question": hit.entity.question, "answer": hit.entity.answer}}
                    res.append(row)
            latency = "latency = {:.4f}s".format(end_time - start_time)
            self.log.info(f"Successfully search in collection: {res}, {latency}")
            return res
        except Exception as e:
            self.log.error(f"Failed to search vectors in Milvus: {repr(e)}")
            return None

    def delete(self, id):
        """Delete entities by primary key.

        Args:
            id: A single id (int, or str such as ``"7"`` / ``"1,2,3"``) or an
                iterable of ids (ints or strs).

        Returns:
            The result object returned by Milvus, or 0 on failure.
        """
        try:
            if isinstance(id, (int, str)):
                # A scalar is used verbatim; joining a str would split it
                # into single characters.
                expr = f"id in [{id}]"
            else:
                # str() each element so that lists of ints can be joined.
                expr = f"id in [{','.join(str(item) for item in id)}]"
            res = self.collection.delete(expr)
            self.log.debug(f"Successfully delete vectors with expr `{expr}`")
            return res
        except Exception as e:
            self.log.error(f"Failed to delete vectors: {repr(e)}")
            return 0

    def count(self):
        """Return the entity count of the selected collection (0 on failure)."""
        try:
            num_entities = self.collection.num_entities
            self.log.info(f"Successfully get the num:{num_entities} of the collection:{self.collection_name}")
            return num_entities
        except Exception as e:
            self.log.error(f"Failed to count vectors in Milvus: {repr(e)}")
            return 0
class Milvus(MilvusHelper):
    """Service-style facade over ``MilvusHelper``.

    Every public method returns a dict with a boolean ``status`` and a
    human-readable ``msg`` and never raises; unexpected exceptions are logged
    and reported in the returned dict.
    """

    def __init__(self) -> None:
        """Initialise the helper and switch to a logger named after this class."""
        super().__init__()
        self.log = logging.getLogger(__class__.__name__)

    def setLogger(self, log):
        """Replace the logger used by this instance."""
        self.log = log

    def getBert(self, text: str):
        """Return the embedding of ``text`` under ``data[text]``."""
        try:
            return {"status": True, "msg": "请求成功", "data": {text: self.bert(text)}}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def createDatabase(self, collection: str, description: str):
        """Create ``collection`` and build its vector index.

        ``status`` is the result of the create step; the index step is
        attempted regardless of it.
        """
        try:
            status = self.create(collection, description)
            self.index(collection)
            return {"status": status, "msg": "创建向量数据库"}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def dropDatabase(self, collection: str):
        """Drop ``collection``; ``status`` is the result of the drop."""
        try:
            status = self.drop(collection)
            return {"status": status, "msg": "删除向量数据"}
        except Exception as e:
            self.log.error(e)
            # "msg" matches the other methods; "drop" is kept for callers
            # that read the original key.
            return {"status": False, "msg": repr(e), "drop": repr(e)}

    def insertVector(self, collection: str, id: int, question: str, answer: str):
        """Insert one question/answer pair into ``collection``.

        ``status`` is True only when the collection exists and the insert
        returned primary keys.
        """
        try:
            if not self.set_collection(collection):
                # Nothing was inserted, so there are no ids or count to report.
                status = {"status": False, "msg": f"Failed insert collection: {collection}, collection does not exist"}
            else:
                ids = self.insert(id, question, answer)
                num = self.count()
                if ids:
                    status = {"status": True, "msg": f"Successfully insert collection: {collection}, data: {ids}, total count: {num}"}
                else:
                    # insert() signals failure with a falsy value.
                    status = {"status": False, "msg": f"Failed insert collection: {collection}, data: {ids}, total count: {num}"}
            self.log.info(status)
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def searchVector(self, collection: str, question: str):
        """Search ``collection`` for entries similar to ``question``.

        ``data`` holds the hit list on success and None otherwise.
        """
        try:
            res = None
            if self.set_collection(collection):
                res = self.search(question)
            if res is not None:
                status = {"status": True, "msg": "Successfully searched similar question!", "question": question, "data": res}
            else:
                # Either the collection is missing or search() failed.
                status = {"status": False, "msg": "Failed searched similar question!", "question": question, "data": res}
            self.log.info(status)
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def deleteVector(self, collection: str, id: int):
        """Delete the entity with primary key ``id`` from ``collection``."""
        try:
            if self.set_collection(collection):
                num = self.delete(id)
                status = {"status": True, "msg": f"删除向量数据 {num}"}
            else:
                status = {"status": False, "msg": f"删除向量数据失败: {collection} 不存在"}
            return status
        except Exception as e:
            self.log.error(e)
            # repr() keeps the result serialisable; "delete" is kept for
            # callers that read the original key.
            return {"status": False, "msg": repr(e), "delete": repr(e)}

    def countVector(self, collection: str):
        """Report the number of entities stored in ``collection``."""
        try:
            if self.set_collection(collection):
                num = self.count()
                status = {"status": True, "msg": f"向量数据库: {collection},条数:{num}"}
            else:
                # This is a count failure, not a delete failure.
                status = {"status": False, "msg": f"统计向量数据失败: {collection} 不存在"}
            return status
        except Exception as e:
            self.log.error(e)
            # repr() keeps the result serialisable; "delete" is kept for
            # callers that read the original key.
            return {"status": False, "msg": repr(e), "delete": repr(e)}
数据类型 DataType.INT8 不支持 Python 的 int 类型,改为 DataType.INT64 可以解决
# Schema for a question collection: INT64 primary key (supplied by the caller,
# auto_id is off), the question text, the id of the matching answer and the
# 768-dimensional question embedding.
question_fields = [
    # "description" was misspelled "descrition", so the text never reached the
    # field's description.
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False, description="int64"),
    FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=1024, description="问题"),
    FieldSchema(name="answer_id", dtype=DataType.INT64, description="答案ID"),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
]