Loading CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -63,6 +63,7 @@ Please mark all change in change log and use the issue from GitHub - \#2252 Upgrade mishards APIs and requirements - \#2256 k-means clustering algorithm use only Euclidean distance metric - \#2300 Upgrade mishrads configuration to version 0.4 - \#2311 Update mishards methods ## Task Loading shards/mishards/service_handler.py +78 −78 Original line number Diff line number Diff line Loading @@ -142,7 +142,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): with self.tracer.start_span('search_{}'.format(addr), child_of=span): logger.warning("Search file ids is {}".format(file_ids)) future = conn.search_vectors_in_files(collection_name=collection_id, future = conn.search_in_segment(collection_name=collection_id, file_ids=file_ids, query_records=vectors, top_k=topk, Loading Loading @@ -223,7 +223,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def HasPartition(self, request, context): _collection_name, _tag = Parser.parse_proto_PartitionParam(request) _status, _ok = self.router.connection().has_partition(_collection_name, _tag) return milvus_pb2.BoolReply(status_pb2.Status(error_code=_status.code, return milvus_pb2.BoolReply(status=status_pb2.Status(error_code=_status.code, reason=_status.message), bool_reply=_ok) @mark_grpc_method Loading @@ -236,7 +236,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): logger.info('ShowPartitions {}'.format(_collection_name)) _status, partition_array = self.router.connection().show_partitions(_collection_name) _status, partition_array = self.router.connection().list_partitions(_collection_name) return milvus_pb2.PartitionList(status=status_pb2.Status( error_code=_status.code, reason=_status.message), Loading Loading @@ -323,7 +323,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): if not collection_meta: status, info = self.router.connection( metadata=metadata).describe_collection(collection_name) metadata=metadata).get_collection_info(collection_name) if not status.OK(): raise exceptions.CollectionNotFoundError(collection_name, metadata=metadata) Loading Loading @@ -365,74 +365,74 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def SearchInFiles(self, request, context): raise NotImplemented() @mark_grpc_method def SearchByID(self, request, context): metadata = {'resp_class': milvus_pb2.TopKQueryResult} collection_name = request.collection_name topk = request.topk if len(request.extra_params) == 0: raise exceptions.SearchParamError(message="Search param loss", metadata=metadata) params = ujson.loads(str(request.extra_params[0].value)) logger.info('Search {}: topk={} params={}'.format( collection_name, topk, params)) if topk > self.MAX_TOPK or topk <= 0: raise exceptions.InvalidTopKError( message='Invalid topk: {}'.format(topk), metadata=metadata) collection_meta = self.collection_meta.get(collection_name, None) if not collection_meta: status, info = self.router.connection( metadata=metadata).describe_collection(collection_name) if not status.OK(): raise exceptions.CollectionNotFoundError(collection_name, metadata=metadata) self.collection_meta[collection_name] = info collection_meta = info start = time.time() query_record_array = [] if int(collection_meta.metric_type) >= MetricType.HAMMING.value: for query_record in request.query_record_array: query_record_array.append(bytes(query_record.binary_data)) else: for query_record in request.query_record_array: query_record_array.append(list(query_record.float_data)) partition_tags = getattr(request, "partition_tag_array", []) ids = getattr(request, "id_array", []) search_result = self.router.connection(metadata=metadata).search_by_ids(collection_name, ids, topk, partition_tags, params) # status, id_results, dis_results = self._do_query(context, # collection_name, # collection_meta, # query_record_array, # topk, # params, # partition_tags=getattr(request, "partition_tag_array", []), # @mark_grpc_method # def SearchByID(self, request, context): # metadata = {'resp_class': milvus_pb2.TopKQueryResult} # # collection_name = request.collection_name # # topk = request.topk # # if len(request.extra_params) == 0: # raise exceptions.SearchParamError(message="Search param loss", metadata=metadata) # params = ujson.loads(str(request.extra_params[0].value)) # # logger.info('Search {}: topk={} params={}'.format( # collection_name, topk, params)) # # if topk > self.MAX_TOPK or topk <= 0: # raise exceptions.InvalidTopKError( # message='Invalid topk: {}'.format(topk), metadata=metadata) # # collection_meta = self.collection_meta.get(collection_name, None) # # if not collection_meta: # status, info = self.router.connection( # metadata=metadata).describe_collection(collection_name) # if not status.OK(): # raise exceptions.CollectionNotFoundError(collection_name, # metadata=metadata) now = time.time() logger.info('SearchVector takes: {}'.format(now - start)) return search_result # # topk_result_list = milvus_pb2.TopKQueryResult( # status=status_pb2.Status(error_code=status.error_code, # reason=status.reason), # row_num=len(request.query_record_array) if len(id_results) else 0, # ids=id_results, # distances=dis_results) # return topk_result_list # raise NotImplemented() # self.collection_meta[collection_name] = info # collection_meta = info # # start = time.time() # # query_record_array = [] # if int(collection_meta.metric_type) >= MetricType.HAMMING.value: # for query_record in request.query_record_array: # query_record_array.append(bytes(query_record.binary_data)) # else: # for query_record in request.query_record_array: # query_record_array.append(list(query_record.float_data)) # # partition_tags = getattr(request, "partition_tag_array", []) # ids = getattr(request, "id_array", []) # search_result = self.router.connection(metadata=metadata).search_by_ids(collection_name, ids, topk, partition_tags, params) # # status, id_results, dis_results = self._do_query(context, # # collection_name, # # collection_meta, # # query_record_array, # # topk, # # params, # # partition_tags=getattr(request, "partition_tag_array", []), # # metadata=metadata) # # now = time.time() # logger.info('SearchVector takes: {}'.format(now - start)) # return search_result # # # # topk_result_list = milvus_pb2.TopKQueryResult( # # status=status_pb2.Status(error_code=status.error_code, # # reason=status.reason), # # row_num=len(request.query_record_array) if len(id_results) else 0, # # ids=id_results, # # distances=dis_results) # # return topk_result_list # # raise NotImplemented() def _describe_collection(self, collection_name, metadata=None): return self.router.connection(metadata=metadata).describe_collection(collection_name) return self.router.connection(metadata=metadata).get_collection_info(collection_name) @mark_grpc_method def DescribeCollection(self, request, context): Loading Loading @@ -465,7 +465,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): ) def _collection_info(self, collection_name, metadata=None): return self.router.connection(metadata=metadata).collection_info(collection_name) return self.router.connection(metadata=metadata).get_collection_stats(collection_name) @mark_grpc_method def ShowCollectionInfo(self, request, context): Loading Loading @@ -494,7 +494,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def _count_collection(self, collection_name, metadata=None): return self.router.connection( metadata=metadata).count_collection(collection_name) metadata=metadata).count_entities(collection_name) @mark_grpc_method def CountCollection(self, request, context): Loading Loading @@ -551,7 +551,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): string_reply=_reply) def _show_collections(self, metadata=None): return self.router.connection(metadata=metadata).show_collections() return self.router.connection(metadata=metadata).list_collections() @mark_grpc_method def ShowCollections(self, request, context): Loading @@ -564,7 +564,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): collection_names=_results) def _preload_collection(self, collection_name): return self.router.connection().preload_collection(collection_name) return self.router.connection().load_collection(collection_name) @mark_grpc_method def PreloadCollection(self, request, context): Loading @@ -580,7 +580,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): reason=_status.message) def _describe_index(self, collection_name, metadata=None): return self.router.connection(metadata=metadata).describe_index(collection_name) return self.router.connection(metadata=metadata).get_index_info(collection_name) @mark_grpc_method def DescribeIndex(self, request, context): Loading Loading @@ -610,7 +610,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return grpc_index def _get_vectors_by_id(self, collection_name, ids, metadata): return self.router.connection(metadata=metadata).get_vectors_by_ids(collection_name, ids) return self.router.connection(metadata=metadata).get_entity_by_id(collection_name, ids) @mark_grpc_method def GetVectorsByID(self, request, context): Loading Loading @@ -640,7 +640,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return response def _get_vector_ids(self, collection_name, segment_name, metadata): return self.router.connection(metadata=metadata).get_vector_ids(collection_name, segment_name) return self.router.connection(metadata=metadata).list_id_in_segment(collection_name, segment_name) @mark_grpc_method def GetVectorIDs(self, request, context): Loading @@ -666,7 +666,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): ) def _delete_by_id(self, collection_name, id_array): return self.router.connection().delete_by_id(collection_name, id_array) return self.router.connection().delete_entity_by_id(collection_name, id_array) @mark_grpc_method def DeleteByID(self, request, context): Loading shards/mishards/settings.py +1 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ else: env.read_env() SERVER_VERSIONS = ['0.8.0'] SERVER_VERSIONS = ['0.9.0'] DEBUG = env.bool('DEBUG', False) MAX_RETRY = env.int('MAX_RETRY', 3) Loading shards/requirements.txt +1 −1 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ pyasn1==0.4.7 pyasn1-modules==0.2.6 pylint==2.5.0 #pymilvus==0.2.10 pymilvus-test==0.3.11 pymilvus-test==0.3.15 pyparsing==2.4.0 pytest==4.6.3 pytest-level==0.1.1 Loading Loading
CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -63,6 +63,7 @@ Please mark all change in change log and use the issue from GitHub - \#2252 Upgrade mishards APIs and requirements - \#2256 k-means clustering algorithm use only Euclidean distance metric - \#2300 Upgrade mishrads configuration to version 0.4 - \#2311 Update mishards methods ## Task Loading
shards/mishards/service_handler.py +78 −78 Original line number Diff line number Diff line Loading @@ -142,7 +142,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): with self.tracer.start_span('search_{}'.format(addr), child_of=span): logger.warning("Search file ids is {}".format(file_ids)) future = conn.search_vectors_in_files(collection_name=collection_id, future = conn.search_in_segment(collection_name=collection_id, file_ids=file_ids, query_records=vectors, top_k=topk, Loading Loading @@ -223,7 +223,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def HasPartition(self, request, context): _collection_name, _tag = Parser.parse_proto_PartitionParam(request) _status, _ok = self.router.connection().has_partition(_collection_name, _tag) return milvus_pb2.BoolReply(status_pb2.Status(error_code=_status.code, return milvus_pb2.BoolReply(status=status_pb2.Status(error_code=_status.code, reason=_status.message), bool_reply=_ok) @mark_grpc_method Loading @@ -236,7 +236,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): logger.info('ShowPartitions {}'.format(_collection_name)) _status, partition_array = self.router.connection().show_partitions(_collection_name) _status, partition_array = self.router.connection().list_partitions(_collection_name) return milvus_pb2.PartitionList(status=status_pb2.Status( error_code=_status.code, reason=_status.message), Loading Loading @@ -323,7 +323,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): if not collection_meta: status, info = self.router.connection( metadata=metadata).describe_collection(collection_name) metadata=metadata).get_collection_info(collection_name) if not status.OK(): raise exceptions.CollectionNotFoundError(collection_name, metadata=metadata) Loading Loading @@ -365,74 +365,74 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def SearchInFiles(self, request, context): raise NotImplemented() @mark_grpc_method def SearchByID(self, request, context): metadata = {'resp_class': milvus_pb2.TopKQueryResult} collection_name = request.collection_name topk = request.topk if len(request.extra_params) == 0: raise exceptions.SearchParamError(message="Search param loss", metadata=metadata) params = ujson.loads(str(request.extra_params[0].value)) logger.info('Search {}: topk={} params={}'.format( collection_name, topk, params)) if topk > self.MAX_TOPK or topk <= 0: raise exceptions.InvalidTopKError( message='Invalid topk: {}'.format(topk), metadata=metadata) collection_meta = self.collection_meta.get(collection_name, None) if not collection_meta: status, info = self.router.connection( metadata=metadata).describe_collection(collection_name) if not status.OK(): raise exceptions.CollectionNotFoundError(collection_name, metadata=metadata) self.collection_meta[collection_name] = info collection_meta = info start = time.time() query_record_array = [] if int(collection_meta.metric_type) >= MetricType.HAMMING.value: for query_record in request.query_record_array: query_record_array.append(bytes(query_record.binary_data)) else: for query_record in request.query_record_array: query_record_array.append(list(query_record.float_data)) partition_tags = getattr(request, "partition_tag_array", []) ids = getattr(request, "id_array", []) search_result = self.router.connection(metadata=metadata).search_by_ids(collection_name, ids, topk, partition_tags, params) # status, id_results, dis_results = self._do_query(context, # collection_name, # collection_meta, # query_record_array, # topk, # params, # partition_tags=getattr(request, "partition_tag_array", []), # @mark_grpc_method # def SearchByID(self, request, context): # metadata = {'resp_class': milvus_pb2.TopKQueryResult} # # collection_name = request.collection_name # # topk = request.topk # # if len(request.extra_params) == 0: # raise exceptions.SearchParamError(message="Search param loss", metadata=metadata) # params = ujson.loads(str(request.extra_params[0].value)) # # logger.info('Search {}: topk={} params={}'.format( # collection_name, topk, params)) # # if topk > self.MAX_TOPK or topk <= 0: # raise exceptions.InvalidTopKError( # message='Invalid topk: {}'.format(topk), metadata=metadata) # # collection_meta = self.collection_meta.get(collection_name, None) # # if not collection_meta: # status, info = self.router.connection( # metadata=metadata).describe_collection(collection_name) # if not status.OK(): # raise exceptions.CollectionNotFoundError(collection_name, # metadata=metadata) now = time.time() logger.info('SearchVector takes: {}'.format(now - start)) return search_result # # topk_result_list = milvus_pb2.TopKQueryResult( # status=status_pb2.Status(error_code=status.error_code, # reason=status.reason), # row_num=len(request.query_record_array) if len(id_results) else 0, # ids=id_results, # distances=dis_results) # return topk_result_list # raise NotImplemented() # self.collection_meta[collection_name] = info # collection_meta = info # # start = time.time() # # query_record_array = [] # if int(collection_meta.metric_type) >= MetricType.HAMMING.value: # for query_record in request.query_record_array: # query_record_array.append(bytes(query_record.binary_data)) # else: # for query_record in request.query_record_array: # query_record_array.append(list(query_record.float_data)) # # partition_tags = getattr(request, "partition_tag_array", []) # ids = getattr(request, "id_array", []) # search_result = self.router.connection(metadata=metadata).search_by_ids(collection_name, ids, topk, partition_tags, params) # # status, id_results, dis_results = self._do_query(context, # # collection_name, # # collection_meta, # # query_record_array, # # topk, # # params, # # partition_tags=getattr(request, "partition_tag_array", []), # # metadata=metadata) # # now = time.time() # logger.info('SearchVector takes: {}'.format(now - start)) # return search_result # # # # topk_result_list = milvus_pb2.TopKQueryResult( # # status=status_pb2.Status(error_code=status.error_code, # # reason=status.reason), # # row_num=len(request.query_record_array) if len(id_results) else 0, # # ids=id_results, # # distances=dis_results) # # return topk_result_list # # raise NotImplemented() def _describe_collection(self, collection_name, metadata=None): return self.router.connection(metadata=metadata).describe_collection(collection_name) return self.router.connection(metadata=metadata).get_collection_info(collection_name) @mark_grpc_method def DescribeCollection(self, request, context): Loading Loading @@ -465,7 +465,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): ) def _collection_info(self, collection_name, metadata=None): return self.router.connection(metadata=metadata).collection_info(collection_name) return self.router.connection(metadata=metadata).get_collection_stats(collection_name) @mark_grpc_method def ShowCollectionInfo(self, request, context): Loading Loading @@ -494,7 +494,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def _count_collection(self, collection_name, metadata=None): return self.router.connection( metadata=metadata).count_collection(collection_name) metadata=metadata).count_entities(collection_name) @mark_grpc_method def CountCollection(self, request, context): Loading Loading @@ -551,7 +551,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): string_reply=_reply) def _show_collections(self, metadata=None): return self.router.connection(metadata=metadata).show_collections() return self.router.connection(metadata=metadata).list_collections() @mark_grpc_method def ShowCollections(self, request, context): Loading @@ -564,7 +564,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): collection_names=_results) def _preload_collection(self, collection_name): return self.router.connection().preload_collection(collection_name) return self.router.connection().load_collection(collection_name) @mark_grpc_method def PreloadCollection(self, request, context): Loading @@ -580,7 +580,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): reason=_status.message) def _describe_index(self, collection_name, metadata=None): return self.router.connection(metadata=metadata).describe_index(collection_name) return self.router.connection(metadata=metadata).get_index_info(collection_name) @mark_grpc_method def DescribeIndex(self, request, context): Loading Loading @@ -610,7 +610,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return grpc_index def _get_vectors_by_id(self, collection_name, ids, metadata): return self.router.connection(metadata=metadata).get_vectors_by_ids(collection_name, ids) return self.router.connection(metadata=metadata).get_entity_by_id(collection_name, ids) @mark_grpc_method def GetVectorsByID(self, request, context): Loading Loading @@ -640,7 +640,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return response def _get_vector_ids(self, collection_name, segment_name, metadata): return self.router.connection(metadata=metadata).get_vector_ids(collection_name, segment_name) return self.router.connection(metadata=metadata).list_id_in_segment(collection_name, segment_name) @mark_grpc_method def GetVectorIDs(self, request, context): Loading @@ -666,7 +666,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): ) def _delete_by_id(self, collection_name, id_array): return self.router.connection().delete_by_id(collection_name, id_array) return self.router.connection().delete_entity_by_id(collection_name, id_array) @mark_grpc_method def DeleteByID(self, request, context): Loading
shards/mishards/settings.py +1 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ else: env.read_env() SERVER_VERSIONS = ['0.8.0'] SERVER_VERSIONS = ['0.9.0'] DEBUG = env.bool('DEBUG', False) MAX_RETRY = env.int('MAX_RETRY', 3) Loading
shards/requirements.txt +1 −1 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ pyasn1==0.4.7 pyasn1-modules==0.2.6 pylint==2.5.0 #pymilvus==0.2.10 pymilvus-test==0.3.11 pymilvus-test==0.3.15 pyparsing==2.4.0 pytest==4.6.3 pytest-level==0.1.1 Loading