一、简介

文档地址

1. Milvus介绍

Milvus 于 2019 年开源,主要用于存储、索引和管理通过深度神经网络和机器学习模型产生的海量向量数据。

Milvus 向量数据库专为向量查询与检索设计,能够为万亿级向量数据建立索引。与传统关系型数据库不同,Milvus 主要用于自下而上地处理非结构化数据向量。非结构化数据没有统一的预定义模型,因此可以转化为向量。

随着互联网不断发展,电子邮件、论文、物联网传感数据、社交媒体照片、蛋白质分子结构等非结构化数据已经变得越来越普遍。如果想要使用计算机来处理这些数据,需要使用 embedding 技术将这些数据转化为向量。随后,Milvus 会存储这些向量,并为其建立索引。Milvus 能够根据两个向量之间的距离来分析他们的相关性。如果两个向量十分相似,这说明向量所代表的源数据也十分相似。

2. 特征向量是什么?

向量又称为 embedding vector,是指由embedding技术从离散变量(如xxx等各种非结构化数据)转变而来的连续向量。在数学表示上,向量是一个由浮点数或者二值型数据组成的 n 维数组。通过现代的向量转化技术,比如各种人工智能(AI)或者机器学习(ML)模型,可以将非结构化数据抽象为 n 维特征向量空间的向量。这样就可以采用最近邻算法(ANN)计算非结构化数据之间的相似度。

3. 术语表

  • Collection

    包含一组 entity,可以等价于关系型数据库系统(RDBMS)中的表。

  • Entity

    包含一组 field。field 与实际对象相对应。field 可以是代表对象属性的结构化数据,也可以是代表对象特征的向量。primary key 是用于指代一个 entity 的唯一值。

    你可以自定义 primary key,否则 Milvus 将会自动生成 primary key。请注意,目前 Milvus 不支持 primary key 去重,因此有可能在一个 collection 内出现 primary key 相同的 entity。

  • Field

    Entity 的组成部分。Field 可以是结构化数据,例如数字和字符串,也可以是向量。

    Milvus 2.0 现已支持标量字段过滤。

  • Segment

Milvus 在数据插入时通过合并数据自动创建的数据文件。一个 collection 可以包含多个 segment。一个 segment 可以包含多个 entity。在搜索中,Milvus 会搜索每个 segment,并返回合并后的结果。

  • Sharding

    Shard 是指将数据写入操作分散到不同节点上,使 Milvus 能充分利用集群的并行计算能力进行写入。默认情况下单个 collection 包含 2 个分片(shard)。目前 Milvus 采用基于主键哈希的分片方式,未来将支持随机分片、自定义分片等更加灵活的分片方式。

    Partition 的意义在于通过划定分区减少数据读取,而shard 的意义在于多台机器上并行写入操作。

  • Partition

    把 collection 中的数据根据一定规则在物理存储上分成多个部分。这种对 collection 数据的划分就叫分区(partitioning)。每个 partition 可包含多个segment。

  • 归一化

    归一化指的是通过数学变换将向量的模长变为 1 的过程。如需使用点积计算向量相似度,则必须对向量作归一化处理。处理后点积与余弦相似度等价。

  • 索引

    索引基于原始数据构建,可以提高对 collection 数据搜索的速度。Milvus 支持多种索引类型

  • 向量

    一种类型的 field,代表对象的特征。非结构化数据可以通过各种 AI 模型和 embedding 技术转化为向量。

    目前,一个实体最多只能包含一个向量。

4. 为什么选择使用 Milvus?

  • 高性能:性能高超,可对海量数据集进行向量相似度检索。
  • 高可用、高可靠:Milvus 支持在云上扩展,其容灾能力能够保证服务高可用。
  • 混合查询:Milvus 支持在向量相似度检索过程中进行标量字段过滤,实现混合查询。
  • 开发者友好:支持多语言、多工具的 Milvus 生态系统。

二、下载&安装

1. 安装前提

  • Milvus 在构建索引和查询向量时依赖 CPU 对 SIMD (Single Instruction Multiple Data) 扩展指令集合的支持。请确保运行 Milvus 的 CPU 至少支持以下一种 SIMD 指令集合:

    • SSE4.2
    • AVX
    • AVX2
    • AVX512

    使用 lscpu 命令以检查 CPU 是否支持特定 SIMD 指令集合:

    lscpu | grep -e sse4_2 -e avx -e avx2 -e avx512
    
  • 检查 Docker 及 Docker Compose 版本

    因为官网推荐使用docker-compose安装运行,所以需要检查版本是否合适

    • 运行 docker info 确认 Docker 版本。建议使用 19.03 或以上版本。
    • 运行 docker-compose version 确认 Docker Compose 版本。建议使用 1.25.1 或以上版本。

2. 安装

1. 单机版

官网教程

2. 分布式版

官网教程

3. 离线安装

因为我们生产环境是在专网,无法连接到互联网,所以需要离线安装

离线安装说白了就是在有网的机器上把需要的docker镜像下载下来,然后再把镜像导出,再上传到专网服务器

  • 根据docker-compose.yml里面内容,把用到的镜像导出

    docker save 镜像名:版本号 > xxx.tar #这里最好别用IMAGE ID 因为导入的时候还需要另外指定镜像名
    
  • 下载到本地

    下载下来后根据自己的方式把镜像包上传到专网服务器
    我们是开发机器可以同时连到两个网,所以通过FTP再上传到专网服务器就好了

  • 导入镜像

    docker load -i  xxx.tar #会自动加载镜像名称和版本号等内容
    
  • 导入docker-compose.yml

    把官网教程里下载的docker-compose.yml复制过来就行了

  • 启动

    docker-compose up -d 
    #查看启动状态
    docker ps 或 docker-compose ps
    #查看docker日志
    docker logs 容器id
    
    

三、版本管理

  • milvus-sdk-java版本2.0.0
  • SpringBoot版本2.3.0.RELEASE

四、前期准备

因为这里用的milvus版本是2.0.0的,但是Maven中央仓库中还没有这个版本的依赖,所以需要到GitHub上面把2.0版本的java-sdk下载到本地,然后编译到本地仓库

地址: https://github.com/milvus-io/milvus-sdk-java

下载下来后会有很多类找不到,这时候只需要clean install一下,有些类是编译之后才会有

如果嫌麻烦可以下我编译好了的:https://www.aliyundrive.com/s/3HZQ1VYaqKB·

五、POM引入

<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.0.0</version>
</dependency>

六、MilvusServiceClient

需要跟milvus交互都需要调用MilvusServiceClient,我这里的做法是把它定义成一个Bean,需要用到的地方依赖注入

@Configuration
public class MilvusConfig {

    @Value("${milvus.host}")
    private String host; //milvus所在服务器地址
    @Value("${milvus.port}")
    private Integer port; //milvus端口

    @Bean
    public MilvusServiceClient milvusServiceClient() {
        ConnectParam connectParam = ConnectParam.newBuilder()
                .withHost(host)
                .withPort(port)
                .build();
        return new MilvusServiceClient(connectParam);
    }
}

七、常用方法

有了MilvusServiceClient后就可以为所欲为了!
下面介绍几个常用方法

  • 常量类

用来存放这个集合需用到的参数

public class FaceArchive {
    /**
     * 集合名称(库名)
     */
    public static final String COLLECTION_NAME = "face_archive";
    /**
     * 分片数量
     */
    public static final Integer SHARDS_NUM = 8;
    /**
     * 分区数量
     */
    public static final Integer PARTITION_NUM = 16;

    /**
     * 分区前缀
     */
    public static final String PARTITION_PREFIX = "shards_";
    /**
     * 特征值长度
     */
    public static final Integer FEATURE_DIM = 256;

    /**
     * 字段
     */
    public static class Field {
        /**
         * 档案id
         */
        public static final String ARCHIVE_ID = "archive_id";
        /**
         * 小区id
         */
        public static final String ORG_ID = "org_id";
        /**
         * 档案特征值
         */
        public static final String ARCHIVE_FEATURE = "archive_feature";
    }

    /**
     * 通过组织id计算分区名称
     * @param orgId
     * @return
     */
    public static String getPartitionName(Integer orgId) {
        return PARTITION_PREFIX + (orgId % PARTITION_NUM);
    }
}
  • 1. hasCollection

判断集合是否已经存在

R<Boolean> response = milvusServiceClient.hasCollection(
        HasCollectionParam.newBuilder()
            .withCollectionName(collectionName)
            .build());

返回值boolean类型,有(true)/无(false)

  • 2. createCollection

创建集合

FieldType archiveId = FieldType.newBuilder()
        .withName(FaceArchive.Field.ARCHIVE_ID)
        .withDescription("主键id")
        .withDataType(DataType.Int64)
        .withPrimaryKey(true)
        .withAutoID(false)
        .build();
FieldType orgId = FieldType.newBuilder()
        .withName(FaceArchive.Field.ORG_ID)
        .withDescription("组织id")
        .withDataType(DataType.Int32)
        .build();
FieldType archiveFeature = FieldType.newBuilder()
        .withName(FaceArchive.Field.ARCHIVE_FEATURE)
        .withDescription("档案特征值")
        .withDataType(DataType.FloatVector)
        .withDimension(FaceArchive.FEATURE_DIM)
        .build();
CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder()
        .withCollectionName(FaceArchive.COLLECTION_NAME)
        .withDescription("档案集合")
        .withShardsNum(FaceArchive.SHARDS_NUM)
        .addFieldType(archiveId)
        .addFieldType(orgId)
        .addFieldType(archiveFeature)
        .build();
R<RpcStatus> response = milvusServiceClient.createCollection(createCollectionReq);
  • 3. createPartition

创建分区 ->这里我理解的意思就跟关系型数据库分表一样的,在插入数据时指定插入到哪个分区,查询的时候也一样,这样可以在查询的时候减少数据量

R<RpcStatus> response = milvusServiceClient.createPartition(CreatePartitionParam.newBuilder()
        .withCollectionName(collectionName) //集合名称
        .withPartitionName(partitionName) //分区名称
        .build());

我在这里的做法是先定义了分区总数PARTITION_NUM, 然后循环建立分区,在查询或者插入的时候根据里面的某个值进行取模,分到对应的分区里面去

/**
 * 创建分区
 */
private void createPartition() {
    for (int i = 0; i < FaceArchive.PARTITION_NUM; i++) {
        milvusService.createPartition(FaceArchive.COLLECTION_NAME, FaceArchive.PARTITION_PREFIX + i);
    }
}
  • 4. createIndex

创建索引

/**
 * 创建索引
 */
public R<RpcStatus> createIndex() {
    R<RpcStatus> response = milvusServiceClient.createIndex(CreateIndexParam.newBuilder()
            .withCollectionName(FaceArchive.COLLECTION_NAME)
            .withFieldName(FaceArchive.Field.ARCHIVE_FEATURE)
            .withIndexType(IndexType.IVF_FLAT)
            .withMetricType(MetricType.IP)
            //nlist 建议值为 4 × sqrt(n),其中 n 指 segment 最多包含的 entity 条数。
            .withExtraParam("{\"nlist\":16384}")
            .withSyncMode(Boolean.FALSE)
            .build());
    log.info("createIndex-------------------->{}", response.toString());
    R<GetIndexBuildProgressResponse> idnexResp = milvusServiceClient.getIndexBuildProgress(
            GetIndexBuildProgressParam.newBuilder()
                    .withCollectionName(FaceArchive.COLLECTION_NAME)
                    .build());
    log.info("getIndexBuildProgress---------------------------->{}", idnexResp.toString());
    return response;
}

调用 create_index() 方法后,Milvus 会为后续新增向量自动构建索引的任务。每当新增数据量达到一个完整的 segment 时即触发这一任务,Milvus 为新插入的向量构建索引。

新增向量的索引文件与前期构建的索引文件相互独立。

至于选择什么样的索引,见官方文档

  • 5. insert

数据插入

public boolean insert(List<MilvusArchiveDto> data) {
    Map<Integer, List<MilvusArchiveDto>> map =
            data.stream().filter(item -> ArrayUtil.isNotEmpty(item.getArcsoftFeature())).collect(Collectors.groupingBy(MilvusArchiveDto::getOrgId));
    map.forEach((orgId, list) -> {
        //插入数据
        List<InsertParam.Field> fields = new ArrayList<>();
        List<Long> archiveIds = Lists.newArrayList();
        List<Integer> orgIds = Lists.newArrayList();
        List<List<Float>> floatVectors = Lists.newArrayList();
        for (MilvusArchiveDto dto : list) {
            archiveIds.add(dto.getArchiveId());
            orgIds.add(dto.getOrgId());
            //虹软特征值转Float向量
            floatVectors.add(MilvusUtil.arcsoftToFloat(dto.getArcsoftFeature()));
        }
        //档案ID
        fields.add(new InsertParam.Field(FaceArchive.Field.ARCHIVE_ID, DataType.Int64, archiveIds));
        //小区id
        fields.add(new InsertParam.Field(FaceArchive.Field.ORG_ID, DataType.Int32, orgIds));
        //特征值
        fields.add(new InsertParam.Field(FaceArchive.Field.ARCHIVE_FEATURE, DataType.FloatVector, floatVectors));
        //插入
        InsertParam insertParam = InsertParam.newBuilder()
                .withCollectionName(FaceArchive.COLLECTION_NAME)
                .withPartitionName(FaceArchive.getPartitionName(orgId))
                .withFields(fields)
                .build();
        R<MutationResult> insert = milvusClient.insert(insertParam);
        log.info("插入:{}", insert);
    });
    return true;
}

这里就我自己用的插入代码,因为是按组织ID分区的,所以每个组织分一个组,然后再批量插入,其中的向量值是通过虹软人脸识别SDK计算出来的特征值转换成List,
因为虹软特征值本身就是归一化处理了的,只需要把字节转Float就行了

  • 6. loadCollection

把集合加载到内存中(milvus查询前必须把数据加载到内存中)

public void loadCollection(String collectionName) {
    R<RpcStatus> response = milvusServiceClient.loadCollection(LoadCollectionParam.newBuilder()
            //集合名称
            .withCollectionName(collectionName) 
            .build());
    log.info("loadCollection------------->{}", response);
}
  • 7. loadPartitions

加载分区数据

public void loadPartitions(String collectionName, String partitionsName) {
    R<RpcStatus> response = milvusServiceClient.loadPartitions(
            LoadPartitionsParam
                    .newBuilder()
                    //集合名称
                    .withCollectionName(collectionName)
                    //需要加载的分区名称
                    .withPartitionNames(Lists.newArrayList(partitionsName))
                    .build()
    );
    log.info("loadCollection------------->{}", response);
}
  • 8. releaseCollection

释放集合(从内存中释放)

public void releaseCollection(String collectionName) {
    R<RpcStatus> response = milvusServiceClient.releaseCollection(ReleaseCollectionParam.newBuilder()
            .withCollectionName(collectionName)
            .build());
    log.info("releaseCollection------------->{}", response);
}
  • 9. releasePartition

释放分区

public void releasePartition(String collectionName, String partitionsName) {
    R<RpcStatus> response = milvusServiceClient.releasePartitions(ReleasePartitionsParam.newBuilder()
            .withCollectionName(collectionName)
            .addPartitionName(partitionsName)
            .build());
    log.info("releasePartition------------->{}", response);
}
  • 10. deleteData

删除数据

public void deleteEntity(String collectionName, String partitionName, String expr) {
    R<MutationResult> response = milvusServiceClient.delete(
            DeleteParam.newBuilder()
                    //集合名称
                    .withCollectionName(collectionName)
                    //分区名称
                    .withPartitionName(partitionName)
                    //条件 如: id == 1
                    .withExpr(expr)
                    .build()
    );
    log.info("deleteEntity------------->{}", response);
}
  • 11、search

搜索

@Override
public SearchTallestSimilarityDto searchTallestSimilarity(byte[] arcsoftFeature, Integer orgId) {
    List<Float> arcsoftToFloat = MilvusUtil.arcsoftToFloat(arcsoftFeature);
    List<List<Float>> list = new ArrayList<>();
    list.add(arcsoftToFloat);
    SearchParam.Builder builder = SearchParam.newBuilder()
            //集合名称
            .withCollectionName(FaceArchive.COLLECTION_NAME)
            //计算方式
            // 欧氏距离 (L2)
            // 内积 (IP)
            .withMetricType(MetricType.IP)
            //返回多少条结果
            .withTopK(1)
            //搜索的向量值
            .withVectors(list)
            //搜索的Field
            .withVectorFieldName(FaceArchive.Field.ARCHIVE_FEATURE)
            //https://milvus.io/cn/docs/v2.0.0/performance_faq.md
            .withParams("{\"nprobe\":512}");
    if (orgId != null) {
        //如果只需要搜索某个分区的数据,则需要指定分区
        builder
                .withExpr(FaceArchive.Field.ORG_ID + " == " + orgId)
                .withPartitionNames(Lists.newArrayList(FaceArchive.getPartitionName(orgId)));
    }
    R<SearchResults> search = milvusClient.search(builder.build());
    if (search.getData() == null) return null;
    SearchResultsWrapper wrapper = new SearchResultsWrapper(search.getData().getResults());
    for (int i = 0; i < list.size(); ++i) {
        List<SearchResultsWrapper.IDScore> scores = wrapper.GetIDScore(i);
        if (scores.size() > 0) {
            System.err.println(scores);
            SearchResultsWrapper.IDScore idScore = scores.get(0);
            return new SearchTallestSimilarityDto(idScore.getLongID(), idScore.getScore());
        }
    }
    return null;
}

搜索是支持多个向量值一起搜的,但是我这里做的是搜索相似度最高的那一个,所以我只需要一个返回数据(返回数据是已经按相似度排序了的)


待补充…

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐