知乎专栏 |
neo@MacBook-Pro-M2 ~/w/c/milvus (master) [1]> /opt/homebrew/bin/pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymilvus
from pymilvus import connections, db

# Connect to the local Milvus server, then create a database named "book".
conn = connections.connect(host="127.0.0.1", port=19530)
database = db.create_database("book")
链接时打开数据库
# Select the "book" database as part of establishing the connection.
conn = connections.connect(
    host="127.0.0.1",
    port="19530",
    db_name="book",
)
链接后打开数据库
# Switch the already-established connection over to the "book" database.
db.using_database("book")
链接后同样可以使用 using_database 切换数据库
# using_database can be called again at any time to change the active database.
db.using_database("book")
from pymilvus import Collection

collection = Collection("book")  # Get an existing collection.
collection.create_partition("novel")
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home  : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
##############################################
"""Question/answer vector store helpers built on Milvus and a BERT encoder.

``MilvusHelper`` wraps the low-level collection operations (create, index,
insert, search, delete, count, drop).  ``Milvus`` layers status-dict
returning wrappers on top of it.
"""
try:
    import os, sys, time
    import logging, logging.handlers
    import pandas
    from transformers import AutoTokenizer, AutoModel
    from pymilvus import (
        connections,
        utility,
        FieldSchema,
        CollectionSchema,
        DataType,
        Collection,
    )
    from config import MILVUS_HOST, MILVUS_PORT, METRIC_TYPE, MILVUS_DISTANCE
except ImportError as err:
    # NOTE(review): the failure is only printed; names that failed to import
    # stay undefined, so later code will raise NameError. Behaviour kept as-is.
    print("Error: %s" % (err))


class MilvusHelper:
    """Low-level helper around one Milvus collection of question vectors.

    Most methods log failures and return a falsy sentinel instead of raising
    (see each method's "Returns" section).
    """

    # Directory handed to transformers as ``cache_dir``.
    cache_dir = "/opt/milvus/transformers"
    # Name of the collection currently selected; set by set_collection()/create().
    collection_name = ""
    # Checkpoint name passed to from_pretrained().
    # Alternative noted by the author: "hfl/chinese-macbert-base".
    model = "bert-base-chinese"
    # Schema of every collection created by create().
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False, description="int64"),
        FieldSchema(
            name="question",
            dtype=DataType.VARCHAR,
            max_length=1024,
        ),
        FieldSchema(
            name="answer",
            dtype=DataType.VARCHAR,
            max_length=4096,
        ),
        FieldSchema(name="question_vector", dtype=DataType.FLOAT_VECTOR, dim=768),
    ]

    def __init__(self) -> None:
        """Connect to Milvus and load the tokenizer and model.

        Any failure is logged and terminates the process with exit status 1.
        """
        self.log = logging.getLogger(__class__.__name__)
        self.log.info(f"Start connecting to Milvus {MILVUS_HOST}:{MILVUS_PORT}")
        try:
            connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
            self.tokenizer = AutoTokenizer.from_pretrained(self.model, cache_dir=self.cache_dir)
            # From here on the instance attribute ``model`` holds the loaded
            # model object and hides the class-level checkpoint name.
            self.model = AutoModel.from_pretrained(self.model, cache_dir=self.cache_dir)
        except PermissionError as e:
            self.log.error(f"Milvus: {repr(e)}, {self.cache_dir}")
            sys.exit(1)
        except ConnectionError as e:
            self.log.error(f"Milvus download model: {repr(e)}")
            sys.exit(1)
        except Exception as e:
            self.log.error(f"Failed connect to Milvus: {repr(e)}")
            sys.exit(1)

    def set_collection(self, collection_name):
        """Select an existing collection as the current one.

        Args:
            collection_name: Name of the collection to select.

        Returns:
            True when the collection exists and was bound to
            ``self.collection``; False when it does not exist or an error
            occurred (the error is logged).
        """
        try:
            self.collection_name = collection_name
            if not utility.has_collection(self.collection_name):
                self.log.info(f"Milvus doesn't have a collection named: {self.collection_name}")
                return False
            self.collection = Collection(name=collection_name)
            self.log.info(f"Milvus does have a collection named: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed to set collection in Milvus: {repr(e)}")
            return False

    def create(self, collection=None, description=None):
        """Create a collection using the class-level ``fields`` schema.

        Args:
            collection: Optional collection name; when given it replaces
                ``self.collection_name``.
            description: Optional schema description; a default sentence is
                generated when omitted.

        Returns:
            True when the collection was created; False when it already
            exists or creation failed (the error is logged).
        """
        if collection:
            self.collection_name = collection
        if not description:
            description = f"{self.collection_name} is the simplest demo to introduce the APIs"
        try:
            isExist = utility.has_collection(self.collection_name)
            if isExist:
                self.log.info(f"Does collection '{self.collection_name}' exist in Milvus: {isExist}")
                return False
            schema = CollectionSchema(self.fields, description=description)
            self.collection = Collection(name=self.collection_name, schema=schema, consistency_level="Strong")
            self.log.info(f"Create Milvus collection: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed create collection in Milvus: {repr(e)}")
            return False

    def index(self, collection=None):
        """Create the ``idx`` index on ``question_vector`` if it is missing.

        Args:
            collection: Optional collection name to select first.

        Returns:
            The status object returned by ``create_index`` on success, None
            when the index already exists, False on failure (logged).
        """
        if collection:
            self.set_collection(collection)
        index_params = {
            "index_type": "IVF_FLAT",
            "metric_type": METRIC_TYPE,  # "IP/L2"
            "params": {"nlist": 2048},
        }
        try:
            if self.collection.has_index(index_name="idx"):
                self.log.info("Does index exist in Milvus")
                return None
            status = self.collection.create_index(field_name="question_vector", index_params=index_params, index_name="idx")
            if status.code:
                raise Exception(status.message)
            self.log.info(f"Successfully create index in collection: {self.collection_name} with param:{index_params}")
            return status
        except Exception as e:
            self.log.error(f"Failed to create index: {repr(e)}")
            return False

    def drop(self, collection=None):
        """Drop the current (or the named) collection.

        Args:
            collection: Optional collection name to select first.

        Returns:
            True on success, False on failure (logged).
        """
        if collection:
            self.set_collection(collection)
        try:
            self.collection.drop()
            # Log the name actually dropped; ``collection`` may be None here.
            self.log.info(f"Successfully drop collection: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed to drop collection: {repr(e)}")
            return False

    def insert(self, id, question, answer):
        """Encode ``question`` and insert one row into the current collection.

        Args:
            id: Primary key of the row.
            question: Question text; also the text that gets encoded.
            answer: Answer text stored alongside the question.

        Returns:
            The primary keys reported by Milvus, or 0 on failure (logged).
        """
        try:
            data = pandas.DataFrame({"id": id, "question": question, "answer": answer, "question_vector": self.bert(question)})
            res = self.collection.insert(data)
            ids = res.primary_keys
            self.log.info(f"Insert vectors to Milvus in collection: {self.collection_name} with {res.insert_count} rows")
            return ids
        except Exception as e:
            self.log.error(f"Failed to insert data to Milvus: {repr(e)}")
            return 0

    def bert(self, sentences):
        """Return the model's ``pooler_output`` for ``sentences`` as nested lists."""
        inputs = self.tokenizer(sentences, return_tensors="pt")
        outputs = self.model(**inputs)
        return outputs.pooler_output.tolist()

    def search(self, text, limit=10):
        """Search the current collection for questions similar to ``text``.

        Args:
            text: Query text; encoded with bert() before searching.
            limit: Maximum number of hits requested from Milvus.

        Returns:
            A list of dicts with keys ``ids``, ``distance``, ``score`` and
            ``entity``; None on failure (logged).
        """
        try:
            param = {
                # Must agree with the metric the index was built with in index().
                "metric_type": METRIC_TYPE,
                "params": {"nprobe": 16},
            }
            start_time = time.time()
            data = self.bert(text)
            self.collection.load()
            result = self.collection.search(data=data, anns_field="question_vector", param=param, limit=limit, output_fields=["id", "question", "answer"])
            end_time = time.time()
            res = [
                {
                    "ids": hit.id,
                    "distance": hit.distance,
                    "score": hit.score,
                    "entity": {"id": hit.entity.id, "question": hit.entity.question, "answer": hit.entity.answer},
                }
                for hits in result
                for hit in hits
            ]
            latency = "latency = {:.4f}s".format(end_time - start_time)
            self.log.info(f"Successfully search in collection: {res}, {latency}")
            return res
        except Exception as e:
            self.log.error(f"Failed to search vectors in Milvus: {repr(e)}")
            return None

    def delete(self, id):
        """Delete rows by primary key.

        Args:
            id: A single int key, or an iterable of keys (ints or strings).

        Returns:
            The result object returned by ``Collection.delete``, or 0 on
            failure (logged).
        """
        try:
            if isinstance(id, int):
                expr = f"id in [{id}]"
            else:
                # str() each key so a list of ints does not break join().
                expr = f"id in [{','.join(str(key) for key in id)}]"
            res = self.collection.delete(expr)
            self.log.debug(f"Successfully delete vectors with expr `{expr}`")
            return res
        except Exception as e:
            self.log.error(f"Failed to delete vectors: {repr(e)}")
            return 0

    def count(self):
        """Return ``num_entities`` of the current collection, or 0 on failure (logged)."""
        try:
            num_entities = self.collection.num_entities
            self.log.info(f"Successfully get the num:{num_entities} of the collection:{self.collection_name}")
            return num_entities
        except Exception as e:
            self.log.error(f"Failed to count vectors in Milvus: {repr(e)}")
            return 0


class Milvus(MilvusHelper):
    """Wrappers over MilvusHelper that always return a status dict.

    Every result carries ``"status"`` (bool) and ``"msg"``; some add
    ``"data"`` / ``"question"``.
    """

    def __init__(self) -> None:
        """Initialise the helper and use a logger named after this class."""
        super().__init__()
        self.log = logging.getLogger(__class__.__name__)

    def setLogger(self, log):
        """Replace the logger used by this instance."""
        self.log = log

    def getBert(self, text: str):
        """Encode ``text`` and return it under ``data`` keyed by the text itself."""
        try:
            return {"status": True, "msg": "请求成功", "data": {text: self.bert(text)}}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def createDatabase(self, collection: str, description: str):
        """Create ``collection`` and its index; ``status`` is create()'s result."""
        try:
            status = self.create(collection, description)
            self.index(collection)
            return {"status": status, "msg": "创建向量数据库"}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def dropDatabase(self, collection: str):
        """Drop ``collection``; ``status`` is drop()'s result."""
        try:
            status = self.drop(collection)
            return {"status": status, "msg": "删除向量数据"}
        except Exception as e:
            self.log.error(e)
            # "msg" matches the other methods; "drop" is kept for old callers.
            return {"status": False, "msg": repr(e), "drop": repr(e)}

    def insertVector(self, collection: str, id: int, question: str, answer: str):
        """Insert one question/answer row into ``collection``.

        Returns:
            A status dict; ``status`` is False when the collection does not
            exist, the insert failed, or an exception was raised.
        """
        try:
            if not self.set_collection(collection):
                status = {"status": False, "msg": f"Failed insert collection: {collection}, collection does not exist"}
            else:
                ids = self.insert(id, question, answer)
                if not ids:
                    # insert() returns 0 when it failed; do not report success.
                    status = {"status": False, "msg": f"Failed insert collection: {collection}, data: {ids}"}
                else:
                    num = self.count()
                    status = {"status": True, "msg": f"Successfully insert collection: {collection}, data: {ids}, total count: {num}"}
            self.log.info(status)
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def searchVector(self, collection: str, question: str):
        """Search ``collection`` for questions similar to ``question``.

        Returns:
            A status dict whose ``data`` is search()'s result, or None when
            the collection does not exist.
        """
        try:
            if self.set_collection(collection):
                res = self.search(question)
                status = {"status": True, "msg": "Successfully searched similar question!", "question": question, "data": res}
            else:
                status = {"status": False, "msg": "Failed searched similar question!", "question": question, "data": None}
            self.log.info(status)
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def deleteVector(self, collection: str, id: int):
        """Delete the row(s) identified by ``id`` from ``collection``."""
        try:
            if self.set_collection(collection):
                num = self.delete(id)
                status = {"status": True, "msg": f"删除向量数据 {num}"}
            else:
                status = {"status": False, "msg": f"删除向量数据失败: {collection} 不存在"}
            return status
        except Exception as e:
            self.log.error(e)
            # repr() keeps the payload serialisable; "delete" kept for old callers.
            return {"status": False, "msg": repr(e), "delete": repr(e)}

    def countVector(self, collection: str):
        """Report the number of entities stored in ``collection``."""
        try:
            if self.set_collection(collection):
                num = self.count()
                status = {"status": True, "msg": f"向量数据库: {collection},条数:{num}"}
            else:
                status = {"status": False, "msg": f"向量数据库: {collection} 不存在"}
            return status
        except Exception as e:
            self.log.error(e)
            # repr() keeps the payload serialisable; "delete" kept for old callers.
            return {"status": False, "msg": repr(e), "delete": repr(e)}
数据类型 DataType.INT8 不支持 Python 的 int 类型,改为 DataType.INT64 可以解决
# Schema for a question collection: INT64 primary key supplied by the caller
# (auto_id=False), the question text, the id of its answer, and the vector.
question_fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False, description="int64"),
    FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=1024, description="问题"),
    FieldSchema(name="answer_id", dtype=DataType.INT64, description="答案ID"),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
]