Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

84.5. Python SDK

		 
neo@MacBook-Pro-M2 ~/w/c/milvus (master) [1]> /opt/homebrew/bin/pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymilvus		
		
	

84.5.1. 数据库管理

84.5.1.1. 创建数据库

			
# Import the connection manager and the database-management module.
from pymilvus import connections, db

# Connect to the Milvus server listening on 127.0.0.1:19530.
# NOTE(review): connect() presumably returns None, so `conn` carries no
# usable handle -- confirm against the pymilvus API reference.
conn = connections.connect(host="127.0.0.1", port=19530)

# Create a database named "book" on the connected server.
database = db.create_database("book")			
			
			

84.5.1.2. 打开数据库

连接时打开数据库

			
# Connect and select the "book" database in the same call via db_name.
conn = connections.connect(
    host="127.0.0.1",
    port="19530",
    db_name="book"
)	
			
			

连接后打开数据库

			
# Select the "book" database on the already-established connection.
db.using_database("book")			
			
			

连接后同样可以使用 using_database 切换数据库

			
# Switch the active database to "book".
db.using_database("book")
			
			

84.5.1.3. 列出数据库

			
# List the databases available on the connected server.
db.list_database()

# Output
['default', 'book']		
			
			

84.5.1.4. 删除数据库

			
# Drop the "book" database.
db.drop_database("book")

# List the databases again; the output below no longer contains "book".
db.list_database()

# Output
['default']
			
			

84.5.2. 创建分区

			
# Import the collection wrapper class.
from pymilvus import Collection
collection = Collection("book")      # Get an existing collection.
# Create a partition named "novel" inside that collection.
collection.create_partition("novel")			
			
		

84.5.3. Example

		
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
##############################################
try:
    import os, sys, time
    import logging, logging.handlers
    import pandas
    from transformers import AutoTokenizer, AutoModel
    from pymilvus import (
        connections,
        utility,
        FieldSchema,
        CollectionSchema,
        DataType,
        Collection,
    )
    from config import MILVUS_HOST, MILVUS_PORT, METRIC_TYPE, MILVUS_DISTANCE
except ImportError as err:
    print("Error: %s" % (err))


class MilvusHelper:
    """Helper around a single Milvus collection of question/answer pairs.

    Questions are embedded with a Hugging Face BERT model (the model's
    ``pooler_output``) and stored in the 768-dimensional ``question_vector``
    field next to the raw question and answer text.

    Every public method logs its failures and returns a sentinel value
    (``False``, ``0`` or ``None``) instead of raising.
    """

    # Directory in which the Hugging Face tokenizer/model files are cached.
    cache_dir = "/opt/milvus/transformers"
    # Name of the currently selected collection (set by set_collection/create).
    collection_name = ""
    # Model name; replaced on the instance by the loaded model in __init__.
    # model = "hfl/chinese-macbert-base"
    model = "bert-base-chinese"
    # Schema of the question/answer collection.
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False, description="int64"),
        FieldSchema(
            name="question",
            dtype=DataType.VARCHAR,
            max_length=1024,
        ),
        FieldSchema(
            name="answer",
            dtype=DataType.VARCHAR,
            max_length=4096,
        ),
        FieldSchema(name="question_vector", dtype=DataType.FLOAT_VECTOR, dim=768),
    ]

    def __init__(self) -> None:
        """Connect to Milvus and load the tokenizer and embedding model.

        The process exits with status 1 when the connection or the model
        loading fails.
        """
        self.log = logging.getLogger(__class__.__name__)

        self.log.info(f"Start connecting to Milvus {MILVUS_HOST}:{MILVUS_PORT}")
        try:
            connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

            # ``self.model`` is the model *name* at class level and becomes the
            # loaded model object below, so capture the name before rebinding.
            model_name = self.model
            self.tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=self.cache_dir)
            self.model = AutoModel.from_pretrained(model_name, cache_dir=self.cache_dir)
        except PermissionError as e:
            self.log.error(f"Milvus: {repr(e)}, {self.cache_dir}")
            sys.exit(1)
        except ConnectionError as e:
            self.log.error(f"Milvus download model: {repr(e)}")
            sys.exit(1)
        except Exception as e:
            self.log.error(f"Failed connect to Milvus: {repr(e)}")
            sys.exit(1)

    def set_collection(self, collection_name):
        """Select ``collection_name`` as the collection later calls operate on.

        ``self.collection_name`` is updated even when the collection does not
        exist; ``self.collection`` is only rebound when it does.

        Returns:
            True if the collection exists and was selected, False otherwise
            (missing collection or any error while checking).
        """
        try:
            self.collection_name = collection_name
            if not utility.has_collection(self.collection_name):
                self.log.info(f"Milvus doesn't have a collection named: {self.collection_name}")
                return False
            self.collection = Collection(name=collection_name)
            self.log.info(f"Milvus does have a collection named: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed to set collection in Milvus: {repr(e)}")
            return False

    def create(self, collection=None, description=None):
        """Create the collection using the class-level ``fields`` schema.

        Args:
            collection: Collection name; defaults to ``self.collection_name``.
            description: Schema description; a generic one is generated when
                omitted.

        Returns:
            True if the collection was created, False if it already exists or
            creation failed.
        """
        if collection:
            self.collection_name = collection
        if not description:
            description = f"{self.collection_name} is the simplest demo to introduce the APIs"
        try:
            isExist = utility.has_collection(self.collection_name)
            if isExist:
                self.log.info(f"Does collection '{self.collection_name}' exist in Milvus: {isExist}")
                return False
            schema = CollectionSchema(self.fields, description=description)
            self.collection = Collection(name=self.collection_name, schema=schema, consistency_level="Strong")
            self.log.info(f"Create Milvus collection: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed create collection in Milvus: {repr(e)}")
            return False

    def index(self, collection=None):
        """Create the ``idx`` IVF_FLAT index on ``question_vector``.

        Args:
            collection: Optional collection name to select first.

        Returns:
            The status object returned by ``create_index`` on success, None if
            the index already exists, False on failure (including when the
            requested collection cannot be selected).
        """
        if collection and not self.set_collection(collection):
            # Never fall through to a previously selected (stale) collection.
            return False
        index_params = {
            "index_type": "IVF_FLAT",
            "metric_type": METRIC_TYPE,  # "IP/L2",
            "params": {"nlist": 2048},
        }
        try:
            if self.collection.has_index(index_name="idx"):
                self.log.info("Does index exist in Milvus")
                return None
            status = self.collection.create_index(field_name="question_vector", index_params=index_params, index_name="idx")
            if status.code:
                raise Exception(status.message)
            self.log.info(f"Successfully create index in collection: {self.collection_name} with param:{index_params}")
            return status
        except Exception as e:
            self.log.error(f"Failed to create index: {repr(e)}")
            return False

    def drop(self, collection=None):
        """Drop the selected collection.

        Args:
            collection: Optional collection name to select first.

        Returns:
            True on success, False on failure (including when the requested
            collection cannot be selected).
        """
        if collection and not self.set_collection(collection):
            # Refuse to drop whatever collection happened to be selected before.
            return False
        try:
            self.collection.drop()
            self.log.info(f"Successfully drop collection: {self.collection_name}")
            return True
        except Exception as e:
            self.log.error(f"Failed to drop collection: {repr(e)}")
            return False

    def insert(self, id, question, answer):
        """Embed ``question`` and insert one row into the selected collection.

        Returns:
            The primary keys reported by Milvus, or 0 on failure.
        """
        try:
            data = pandas.DataFrame({"id": id, "question": question, "answer": answer, "question_vector": self.bert(question)})
            res = self.collection.insert(data)
            ids = res.primary_keys
            self.log.info(f"Insert vectors to Milvus in collection: {self.collection_name} with {res.insert_count} rows")
            return ids
        except Exception as e:
            self.log.error(f"Failed to insert data to Milvus: {repr(e)}")
            return 0

    def bert(self, sentences):
        """Return the model's ``pooler_output`` for ``sentences`` as nested lists.

        Args:
            sentences: A string or a list of strings.

        Returns:
            A list with one embedding (list of floats) per input sentence.
        """
        # Local import: torch is already required by return_tensors="pt".
        import torch

        # Truncate over-long text to the model limit (the schema allows
        # questions of up to 1024 characters) and pad so batches also work.
        inputs = self.tokenizer(sentences, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():  # inference only; skip building the autograd graph
            outputs = self.model(**inputs)
        return outputs.pooler_output.tolist()

    def search(self, text, limit=10):
        """Search the selected collection for questions similar to ``text``.

        Args:
            text: Query text to embed and search with.
            limit: Maximum number of hits per query vector.

        Returns:
            A list of dicts with ``ids``, ``distance``, ``score`` and
            ``entity`` keys, or None on failure.
        """
        try:
            param = {
                # Use the same metric the index is built with (see index()).
                "metric_type": METRIC_TYPE,
                "params": {"nprobe": 16},
            }

            start_time = time.time()
            data = self.bert(text)
            self.collection.load()
            result = self.collection.search(data=data, anns_field="question_vector", param=param, limit=limit, output_fields=["id", "question", "answer"])
            end_time = time.time()
            res = []
            for hits in result:
                for hit in hits:
                    entity = {"id": hit.entity.id, "question": hit.entity.question, "answer": hit.entity.answer}
                    res.append({"ids": hit.id, "distance": hit.distance, "score": hit.score, "entity": entity})
            latency = "latency = {:.4f}s".format(end_time - start_time)
            self.log.info(f"Successfully search in collection: {res}, {latency}")
            return res

        except Exception as e:
            self.log.error(f"Failed to search vectors in Milvus: {repr(e)}")
            return None

    def delete(self, id):
        """Delete entities by primary key from the selected collection.

        Args:
            id: A single primary key or an iterable of primary keys.

        Returns:
            The result object returned by ``Collection.delete``, or 0 on
            failure.
        """
        try:
            # Treat a lone int/str as one key; stringify so int keys can be joined.
            keys = [id] if isinstance(id, (int, str)) else list(id)
            expr = f"id in [{','.join(str(key) for key in keys)}]"
            res = self.collection.delete(expr)
            self.log.debug(f"Successfully delete vectors with expr `{expr}`")
            return res
        except Exception as e:
            self.log.error(f"Failed to delete vectors: {repr(e)}")
            return 0

    def count(self):
        """Return ``num_entities`` of the selected collection, or 0 on failure."""
        try:
            num_entities = self.collection.num_entities
            self.log.info(f"Successfully get the num:{num_entities} of the collection:{self.collection_name}")
            return num_entities
        except Exception as e:
            self.log.error(f"Failed to count vectors in Milvus: {repr(e)}")
            return 0

class Milvus(MilvusHelper):
    """Facade over MilvusHelper that reports results as plain dictionaries.

    Every method returns a dict with a boolean ``status`` and a ``msg``;
    search and embedding calls add their payload under ``data``.
    """

    def __init__(self) -> None:
        super().__init__()
        self.log = logging.getLogger(__class__.__name__)

    def setLogger(self, log):
        """Replace the logger used by this instance."""
        self.log = log

    def getBert(self, text: str):
        """Return the embedding of ``text`` under ``data[text]``."""
        try:
            return {"status": True, "msg": "请求成功", "data": {text: self.bert(text)}}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def createDatabase(self, collection: str, description: str):
        """Create ``collection`` and make sure its vector index exists.

        ``status`` is the result of the creation step (False when the
        collection already exists or creation failed).
        """
        try:
            status = self.create(collection, description)
            # Only index when the collection can really be selected, so a
            # failed creation never indexes a previously selected collection.
            if self.set_collection(collection):
                self.index()
            return {"status": status, "msg": "创建向量数据库"}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def dropDatabase(self, collection: str):
        """Drop ``collection``; ``status`` is the result of the drop."""
        try:
            status = self.drop(collection)
            return {"status": status, "msg": "删除向量数据"}
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def insertVector(self, collection: str, id: int, question: str, answer: str):
        """Insert one question/answer pair into ``collection``.

        ``status`` is False when the collection cannot be selected or when
        the insert returns its failure sentinel.
        """
        try:
            if not self.set_collection(collection):
                status = {"status": False, "msg": f"Failed insert collection: {collection}, collection not found or unavailable"}
            else:
                ids = self.insert(id, question, answer)
                if not ids:
                    # insert() returns 0 when the write failed.
                    status = {"status": False, "msg": f"Failed insert collection: {collection}, data: {ids}"}
                else:
                    num = self.count()
                    status = {"status": True, "msg": f"Successfully insert collection: {collection}, data: {ids}, total count: {num}"}
            self.log.info(status)
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def searchVector(self, collection: str, question: str):
        """Search ``collection`` for entries similar to ``question``.

        ``data`` holds the list of hits, or None when the search failed or
        the collection cannot be selected.
        """
        try:
            res = None
            if self.set_collection(collection):
                res = self.search(question)
            if res is None:
                # search() returns None on failure; res is also None when the
                # collection could not be selected.
                status = {"status": False, "msg": "Failed searched similar question!", "question": question, "data": res}
            else:
                status = {"status": True, "msg": "Successfully searched similar question!", "question": question, "data": res}
            self.log.info(status)
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}

    def deleteVector(self, collection: str, id: int):
        """Delete the entity (or entities) with primary key ``id``."""
        try:
            if self.set_collection(collection):
                num = self.delete(id)
                status = {"status": True, "msg": f"删除向量数据 {num}"}
            else:
                status = {"status": False, "msg": f"删除向量数据失败: {collection} 不存在"}
            return status
        except Exception as e:
            self.log.error(e)
            # repr() keeps the payload a plain string, like the other methods.
            return {"status": False, "msg": repr(e)}

    def countVector(self, collection: str):
        """Report the number of entities stored in ``collection``."""
        try:
            if self.set_collection(collection):
                num = self.count()
                status = {"status": True, "msg": f"向量数据库: {collection},条数:{num}"}
            else:
                status = {"status": False, "msg": f"统计向量数据失败: {collection} 不存在"}
            return status
        except Exception as e:
            self.log.error(e)
            return {"status": False, "msg": repr(e)}
		
		
		

84.5.4. FAQ

84.5.4.1. DataNotMatchException()

数据类型 DataType.INT8 不支持 Python 的 int 类型,改为 DataType.INT64 可以解决

			
        # INT64 (not INT8) is used for the integer fields, as explained above.
        question_fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False, description="int64"),
            FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=1024, description="问题"),
            FieldSchema(name="answer_id", dtype=DataType.INT64, description="答案ID"),
            FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
        ]