【Java整合Milvus】SpringBoot整合Milvus向量数据库以及虹软SDK实现以图搜图
Java框架SpringBoot整合Milvus向量数据库以及虹软SDK实现以图搜图
一、简介
1. Milvus介绍
Milvus 于 2019 年开源,主要用于存储、索引和管理通过深度神经网络和机器学习模型产生的海量向量数据。
Milvus 向量数据库专为向量查询与检索设计,能够为万亿级向量数据建立索引。与传统关系型数据库不同,Milvus 主要用于自下而上地处理非结构化数据向量。非结构化数据没有统一的预定义模型,因此可以转化为向量。
随着互联网不断发展,电子邮件、论文、物联网传感数据、社交媒体照片、蛋白质分子结构等非结构化数据已经变得越来越普遍。如果想要使用计算机来处理这些数据,需要使用 embedding 技术将这些数据转化为向量。随后,Milvus 会存储这些向量,并为其建立索引。Milvus 能够根据两个向量之间的距离来分析他们的相关性。如果两个向量十分相似,这说明向量所代表的源数据也十分相似。
2. 特征向量是什么?
向量又称为
embedding vector
,是指由embedding技术从离散变量(如xxx等各种非结构化数据)转变而来的连续向量。在数学表示上,向量是一个由浮点数或者二值型数据组成的 n 维数组。通过现代的向量转化技术,比如各种人工智能(AI)或者机器学习(ML)模型,可以将非结构化数据抽象为 n 维特征向量空间的向量。这样就可以采用最近邻算法(ANN)计算非结构化数据之间的相似度。
3. 术语表
-
Collection
包含一组 entity,可以等价于关系型数据库系统(RDBMS)中的表。
-
Entity
包含一组 field。field 与实际对象相对应。field 可以是代表对象属性的结构化数据,也可以是代表对象特征的向量。primary key 是用于指代一个 entity 的唯一值。
你可以自定义 primary key,否则 Milvus 将会自动生成 primary key。请注意,目前 Milvus 不支持 primary key 去重,因此有可能在一个 collection 内出现 primary key 相同的 entity。
-
Field
Entity 的组成部分。Field 可以是结构化数据,例如数字和字符串,也可以是向量。
Milvus 2.0 现已支持标量字段过滤。
-
Segment
Milvus 在数据插入时通过合并数据自动创建的数据文件。一个 collection 可以包含多个 segment。一个 segment 可以包含多个 entity。在搜索中,Milvus 会搜索每个 segment,并返回合并后的结果。
-
Sharding
Shard 是指将数据写入操作分散到不同节点上,使 Milvus 能充分利用集群的并行计算能力进行写入。默认情况下单个 collection 包含 2 个分片(shard)。目前 Milvus 采用基于主键哈希的分片方式,未来将支持随机分片、自定义分片等更加灵活的分片方式。
Partition 的意义在于通过划定分区减少数据读取,而shard 的意义在于多台机器上并行写入操作。
-
Partition
把 collection 中的数据根据一定规则在物理存储上分成多个部分。这种对 collection 数据的划分就叫分区(partitioning)。每个 partition 可包含多个segment。
-
归一化
归一化指的是通过数学变换将向量的模长变为 1 的过程。如需使用点积计算向量相似度,则必须对向量作归一化处理。处理后点积与余弦相似度等价。
-
索引
索引基于原始数据构建,可以提高对 collection 数据搜索的速度。Milvus 支持多种索引类型。
-
向量
一种类型的 field,代表对象的特征。非结构化数据可以通过各种 AI 模型和 embedding 技术转化为向量。
目前,一个实体最多只能包含一个向量。
4. 为什么选择使用 Milvus?
- 高性能:性能高超,可对海量数据集进行向量相似度检索。
- 高可用、高可靠:Milvus 支持在云上扩展,其容灾能力能够保证服务高可用。
- 混合查询:Milvus 支持在向量相似度检索过程中进行标量字段过滤,实现混合查询。
- 开发者友好:支持多语言、多工具的 Milvus 生态系统。
二、下载&安装
1. 安装前提
-
Milvus 在构建索引和查询向量时依赖 CPU 对 SIMD (Single Instruction Multiple Data) 扩展指令集合的支持。请确保运行 Milvus 的 CPU 至少支持以下一种 SIMD 指令集合:
- SSE4.2
- AVX
- AVX2
- AVX512
使用 lscpu 命令以检查 CPU 是否支持特定 SIMD 指令集合:
lscpu | grep -e sse4_2 -e avx -e avx2 -e avx512
-
检查 Docker 及 Docker Compose 版本
因为官网推荐使用docker-compose安装运行,所以需要检查版本是否合适
- 运行
docker info
确认 Docker 版本。建议使用 19.03 或以上版本。 - 运行
docker-compose version
确认 Docker Compose 版本。建议使用 1.25.1 或以上版本。
- 运行
2. 安装
1. 单机版
2. 分布式版
3. 离线安装
因为我们生产环境是在专网,无法连接到互联网,所以需要离线安装
离线安装说白了就是在有网的机器上把需要的docker镜像下载下来,然后再把镜像导出,再上传到专网服务器
-
根据docker-compose.yml里面内容,把用到的镜像导出
docker save 镜像名:版本号 > xxx.tar #这里最好别用IMAGE ID 因为导入的时候还需要另外指定镜像名
-
下载到本地
下载下来后根据自己的方式把镜像包上传到专网服务器
我们是开发机器可以同时连到两个网,所以通过FTP再上传到专网服务器就好了 -
导入镜像
docker load -i xxx.tar #会自动加载镜像名称和版本号等内容
-
导入docker-compose.yml
把官网教程里下载的docker-compose.yml复制过来就行了
-
启动
docker-compose up -d #查看启动状态 docker ps 或 docker-compose ps #查看docker日志 docker logs 容器id
三、版本管理
- milvus-sdk-java版本
2.0.0
- SpringBoot版本
2.3.0.RELEASE
四、前期准备
因为这里用的milvus版本是2.0.0的,但是Maven中央仓库中还没有这个版本的依赖,所以需要到GitHub上面把2.0版本的java-sdk下载到本地,然后编译到本地仓库
地址: https://github.com/milvus-io/milvus-sdk-java
下载下来后会有很多类找不到,这时候只需要clean install一下,有些类是编译之后才会有
如果嫌麻烦可以下我编译好了的:https://www.aliyundrive.com/s/3HZQ1VYaqKB·
五、POM引入
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.0.0</version>
</dependency>
六、MilvusServiceClient
需要跟milvus交互都需要调用MilvusServiceClient,我这里的做法是把它定义成一个Bean,需要用到的地方依赖注入
@Configuration
public class MilvusConfig {
@Value("${milvus.host}")
private String host; //milvus所在服务器地址
@Value("${milvus.port}")
private Integer port; //milvus端口
@Bean
public MilvusServiceClient milvusServiceClient() {
ConnectParam connectParam = ConnectParam.newBuilder()
.withHost(host)
.withPort(port)
.build();
return new MilvusServiceClient(connectParam);
}
}
七、常用方法
有了MilvusServiceClient后就可以为所欲为了!
下面介绍几个常用方法
用来存放这个集合需用到的参数
public class FaceArchive {
/**
* 集合名称(库名)
*/
public static final String COLLECTION_NAME = "face_archive";
/**
* 分片数量
*/
public static final Integer SHARDS_NUM = 8;
/**
* 分区数量
*/
public static final Integer PARTITION_NUM = 16;
/**
* 分区前缀
*/
public static final String PARTITION_PREFIX = "shards_";
/**
* 特征值长度
*/
public static final Integer FEATURE_DIM = 256;
/**
* 字段
*/
public static class Field {
/**
* 档案id
*/
public static final String ARCHIVE_ID = "archive_id";
/**
* 小区id
*/
public static final String ORG_ID = "org_id";
/**
* 档案特征值
*/
public static final String ARCHIVE_FEATURE = "archive_feature";
}
/**
* 通过组织id计算分区名称
* @param orgId
* @return
*/
public static String getPartitionName(Integer orgId) {
return PARTITION_PREFIX + (orgId % PARTITION_NUM);
}
}
判断集合是否已经存在
R<Boolean> response = milvusServiceClient.hasCollection(
HasCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build());
返回值boolean类型,有(true)/无(false)
创建集合
FieldType archiveId = FieldType.newBuilder()
.withName(FaceArchive.Field.ARCHIVE_ID)
.withDescription("主键id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType orgId = FieldType.newBuilder()
.withName(FaceArchive.Field.ORG_ID)
.withDescription("组织id")
.withDataType(DataType.Int32)
.build();
FieldType archiveFeature = FieldType.newBuilder()
.withName(FaceArchive.Field.ARCHIVE_FEATURE)
.withDescription("档案特征值")
.withDataType(DataType.FloatVector)
.withDimension(FaceArchive.FEATURE_DIM)
.build();
CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.withDescription("档案集合")
.withShardsNum(FaceArchive.SHARDS_NUM)
.addFieldType(archiveId)
.addFieldType(orgId)
.addFieldType(archiveFeature)
.build();
R<RpcStatus> response = milvusServiceClient.createCollection(createCollectionReq);
创建分区 ->
这里我理解的意思就跟关系型数据库分表一样的,在插入数据时指定插入到哪个分区,查询的时候也一样,这样可以在查询的时候减少数据量
R<RpcStatus> response = milvusServiceClient.createPartition(CreatePartitionParam.newBuilder()
.withCollectionName(collectionName) //集合名称
.withPartitionName(partitionName) //分区名称
.build());
我在这里的做法是先定义了分区总数PARTITION_NUM
, 然后循环建立分区,在查询或者插入的时候根据里面的某个值进行取模,分到对应的分区里面去
/**
* 创建分区
*/
private void createPartition() {
for (int i = 0; i < FaceArchive.PARTITION_NUM; i++) {
milvusService.createPartition(FaceArchive.COLLECTION_NAME, FaceArchive.PARTITION_PREFIX + i);
}
}
创建索引
/**
* 创建索引
*/
public R<RpcStatus> createIndex() {
R<RpcStatus> response = milvusServiceClient.createIndex(CreateIndexParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.withFieldName(FaceArchive.Field.ARCHIVE_FEATURE)
.withIndexType(IndexType.IVF_FLAT)
.withMetricType(MetricType.IP)
//nlist 建议值为 4 × sqrt(n),其中 n 指 segment 最多包含的 entity 条数。
.withExtraParam("{\"nlist\":16384}")
.withSyncMode(Boolean.FALSE)
.build());
log.info("createIndex-------------------->{}", response.toString());
R<GetIndexBuildProgressResponse> idnexResp = milvusServiceClient.getIndexBuildProgress(
GetIndexBuildProgressParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.build());
log.info("getIndexBuildProgress---------------------------->{}", idnexResp.toString());
return response;
}
调用 create_index() 方法后,Milvus 会为后续新增向量自动构建索引的任务。每当新增数据量达到一个完整的 segment 时即触发这一任务,Milvus 为新插入的向量构建索引。
新增向量的索引文件与前期构建的索引文件相互独立。
至于选择什么样的索引,见官方文档
数据插入
public boolean insert(List<MilvusArchiveDto> data) {
Map<Integer, List<MilvusArchiveDto>> map =
data.stream().filter(item -> ArrayUtil.isNotEmpty(item.getArcsoftFeature())).collect(Collectors.groupingBy(MilvusArchiveDto::getOrgId));
map.forEach((orgId, list) -> {
//插入数据
List<InsertParam.Field> fields = new ArrayList<>();
List<Long> archiveIds = Lists.newArrayList();
List<Integer> orgIds = Lists.newArrayList();
List<List<Float>> floatVectors = Lists.newArrayList();
for (MilvusArchiveDto dto : list) {
archiveIds.add(dto.getArchiveId());
orgIds.add(dto.getOrgId());
//虹软特征值转Float向量
floatVectors.add(MilvusUtil.arcsoftToFloat(dto.getArcsoftFeature()));
}
//档案ID
fields.add(new InsertParam.Field(FaceArchive.Field.ARCHIVE_ID, DataType.Int64, archiveIds));
//小区id
fields.add(new InsertParam.Field(FaceArchive.Field.ORG_ID, DataType.Int32, orgIds));
//特征值
fields.add(new InsertParam.Field(FaceArchive.Field.ARCHIVE_FEATURE, DataType.FloatVector, floatVectors));
//插入
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.withPartitionName(FaceArchive.getPartitionName(orgId))
.withFields(fields)
.build();
R<MutationResult> insert = milvusClient.insert(insertParam);
log.info("插入:{}", insert);
});
return true;
}
这里就我自己用的插入代码,因为是按组织ID分区的,所以每个组织分一个组,然后再批量插入,其中的向量值是通过虹软人脸识别SDK计算出来的特征值转换成List,
因为虹软特征值本身就是归一化处理了的,只需要把字节转Float就行了
把集合加载到内存中(milvus查询前必须把数据加载到内存中)
public void loadCollection(String collectionName) {
R<RpcStatus> response = milvusServiceClient.loadCollection(LoadCollectionParam.newBuilder()
//集合名称
.withCollectionName(collectionName)
.build());
log.info("loadCollection------------->{}", response);
}
加载分区数据
public void loadPartitions(String collectionName, String partitionsName) {
R<RpcStatus> response = milvusServiceClient.loadPartitions(
LoadPartitionsParam
.newBuilder()
//集合名称
.withCollectionName(collectionName)
//需要加载的分区名称
.withPartitionNames(Lists.newArrayList(partitionsName))
.build()
);
log.info("loadCollection------------->{}", response);
}
释放集合(从内存中释放)
public void releaseCollection(String collectionName) {
R<RpcStatus> response = milvusServiceClient.releaseCollection(ReleaseCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build());
log.info("releaseCollection------------->{}", response);
}
释放分区
public void releasePartition(String collectionName, String partitionsName) {
R<RpcStatus> response = milvusServiceClient.releasePartitions(ReleasePartitionsParam.newBuilder()
.withCollectionName(collectionName)
.addPartitionName(partitionsName)
.build());
log.info("releasePartition------------->{}", response);
}
删除数据
public void deleteEntity(String collectionName, String partitionName, String expr) {
R<MutationResult> response = milvusServiceClient.delete(
DeleteParam.newBuilder()
//集合名称
.withCollectionName(collectionName)
//分区名称
.withPartitionName(partitionName)
//条件 如: id == 1
.withExpr(expr)
.build()
);
log.info("deleteEntity------------->{}", response);
}
搜索
@Override
public SearchTallestSimilarityDto searchTallestSimilarity(byte[] arcsoftFeature, Integer orgId) {
List<Float> arcsoftToFloat = MilvusUtil.arcsoftToFloat(arcsoftFeature);
List<List<Float>> list = new ArrayList<>();
list.add(arcsoftToFloat);
SearchParam.Builder builder = SearchParam.newBuilder()
//集合名称
.withCollectionName(FaceArchive.COLLECTION_NAME)
//计算方式
// 欧氏距离 (L2)
// 内积 (IP)
.withMetricType(MetricType.IP)
//返回多少条结果
.withTopK(1)
//搜索的向量值
.withVectors(list)
//搜索的Field
.withVectorFieldName(FaceArchive.Field.ARCHIVE_FEATURE)
//https://milvus.io/cn/docs/v2.0.0/performance_faq.md
.withParams("{\"nprobe\":512}");
if (orgId != null) {
//如果只需要搜索某个分区的数据,则需要指定分区
builder
.withExpr(FaceArchive.Field.ORG_ID + " == " + orgId)
.withPartitionNames(Lists.newArrayList(FaceArchive.getPartitionName(orgId)));
}
R<SearchResults> search = milvusClient.search(builder.build());
if (search.getData() == null) return null;
SearchResultsWrapper wrapper = new SearchResultsWrapper(search.getData().getResults());
for (int i = 0; i < list.size(); ++i) {
List<SearchResultsWrapper.IDScore> scores = wrapper.GetIDScore(i);
if (scores.size() > 0) {
System.err.println(scores);
SearchResultsWrapper.IDScore idScore = scores.get(0);
return new SearchTallestSimilarityDto(idScore.getLongID(), idScore.getScore());
}
}
return null;
}
搜索是支持多个向量值一起搜的,但是我这里做的是搜索相似度最高的那一个,所以我只需要一个返回数据(返回数据是已经按相似度排序了的)
待补充…
更多推荐
所有评论(0)