Add the inspireface project to cpp-package.

This commit is contained in:
JingyuYan
2024-05-02 01:27:29 +08:00
parent e90dacb3cf
commit 08d7e96f79
431 changed files with 370534 additions and 0 deletions

View File

@@ -0,0 +1,73 @@
# PyInspireFace
## Setup Library
You need to compile the dynamic linking library in the main project and then place it in **inspireface/modules/core**.
```Bash
# copy or link
cp YOUR_BUILD_DIR/libInspireFace.so inspireface/modules/core
```
## Require
You need to install some dependencies beforehand.
```Bash
pip install loguru
pip install tqdm
pip install opencv-python
```
## Quick Start
You can easily call the api to implement a number of functions:
```Python
import cv2
import inspireface as ifac
from inspireface.param import *
# Step 1: Initialize the SDK and load the algorithm resource files.
resource_path = "pack/Pikachu"
ret = ifac.launch(resource_path)
assert ret, "Launch failure. Please ensure the resource path is correct."
# Optional features, loaded during session creation based on the modules specified.
opt = HF_ENABLE_NONE
session = ifac.InspireFaceSession(opt, HF_DETECT_MODE_IMAGE)
# Load the image using OpenCV.
image = cv2.imread(image_path)
assert image is not None, "Please check that the image path is correct."
# Perform face detection on the image.
faces = session.face_detection(image)
print(f"face detection: {len(faces)} found")
# Copy the image for drawing the bounding boxes.
draw = image.copy()
for idx, face in enumerate(faces):
print(f"{'==' * 20}")
print(f"idx: {idx}")
# Print Euler angles of the face.
print(f"roll: {face.roll}, yaw: {face.yaw}, pitch: {face.pitch}")
# Draw bounding box around the detected face.
x1, y1, x2, y2 = face.location
cv2.rectangle(draw, (x1, y1), (x2, y2), (0, 0, 255), 2)
```
You can also check out other sample files, which contain more diverse examples of functionality.
## Test
In the Python API, we have integrated a relatively simple unit test. You can adjust the content of the unit test by modifying the parameters in the configuration file **test/test_settings.py**.
```Bash
# Run total test
python -m unittest discover -s test
```

View File

@@ -0,0 +1,4 @@
from .modules import *
__version__ = version()

View File

@@ -0,0 +1,7 @@
from .inspire_face import ImageStream, FaceExtended, FaceInformation, SessionCustomParameter, InspireFaceSession, \
launch, FeatureHubConfiguration, feature_hub_enable, feature_hub_disable, feature_comparison, \
FaceIdentity, feature_hub_set_search_threshold, feature_hub_face_insert, SearchResult, \
feature_hub_face_search, feature_hub_face_search_top_k, feature_hub_face_update, feature_hub_face_remove, \
feature_hub_get_face_identity, feature_hub_get_face_count, view_table_in_terminal, version, \
set_logging_level, disable_logging

View File

@@ -0,0 +1 @@
from .native import *

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,894 @@
import cv2
import numpy as np
from .core import *
from typing import Tuple, List
from dataclasses import dataclass
from loguru import logger
class ImageStream(object):
"""
ImageStream class handles the conversion of image data from various sources into a format compatible with the InspireFace library.
It allows loading image data from numpy arrays, buffer objects, and directly from OpenCV images.
"""
@staticmethod
def load_from_cv_image(image: np.ndarray, stream_format=HF_STREAM_BGR, rotation=HF_CAMERA_ROTATION_0):
"""
Load image data from an OpenCV image (numpy ndarray).
Args:
image (np.ndarray): The image data as a numpy array.
stream_format (int): The format of the image data (e.g., BGR, RGB).
rotation (int): The rotation angle to be applied to the image data.
Returns:
ImageStream: An instance of the ImageStream class initialized with the provided image data.
Raises:
Exception: If the image does not have 3 or 4 channels.
"""
h, w, c = image.shape
if c != 3 and c != 4:
raise Exception("The channel must be 3 or 4.")
return ImageStream(image, w, h, stream_format, rotation)
@staticmethod
def load_from_ndarray(data: np.ndarray, width: int, height: int, stream_format: int, rotation: int):
"""
Load image data from a numpy array specifying width and height explicitly.
Args:
data (np.ndarray): The raw image data.
width (int): The width of the image.
height (int): The height of the image.
stream_format (int): The format of the image data.
rotation (int): The rotation angle to be applied to the image data.
Returns:
ImageStream: An instance of the ImageStream class.
"""
return ImageStream(data, width, height, stream_format, rotation)
@staticmethod
def load_from_buffer(data, width: int, height: int, stream_format: int, rotation: int):
"""
Load image data from a buffer (like bytes or bytearray).
Args:
data: The buffer containing the image data.
width (int): The width of the image.
height (int): The height of the image.
stream_format (int): The format of the image data.
rotation (int): The rotation angle to be applied to the image data.
Returns:
ImageStream: An instance of the ImageStream class.
"""
return ImageStream(data, width, height, stream_format, rotation)
def __init__(self, data, width: int, height: int, stream_format: int, rotation: int):
"""
Initialize the ImageStream object with provided data and configuration.
Args:
data: The image data (numpy array or buffer).
width (int): The width of the image.
height (int): The height of the image.
stream_format (int): The format of the image data.
rotation (int): The rotation applied to the image.
Raises:
Exception: If there is an error in creating the image stream.
"""
self.rotate = rotation
self.data_format = stream_format
if isinstance(data, np.ndarray):
data_ptr = ctypes.cast(data.ctypes.data, ctypes.POINTER(ctypes.c_uint8))
else:
data_ptr = ctypes.cast(data, ctypes.POINTER(ctypes.c_uint8))
image_struct = HFImageData()
image_struct.data = data_ptr
image_struct.width = width
image_struct.height = height
image_struct.format = self.data_format
image_struct.rotation = self.rotate
self._handle = HFImageStream()
ret = HFCreateImageStream(PHFImageData(image_struct), self._handle)
if ret != 0:
raise Exception("Error in creating ImageStream")
def release(self):
"""
Release the resources associated with the ImageStream.
Logs an error if the release fails.
"""
if self._handle is not None:
ret = HFReleaseImageStream(self._handle)
if ret != 0:
logger.error(f"Release ImageStream error: {ret}")
def __del__(self):
"""
Ensure that resources are released when the ImageStream object is garbage collected.
"""
self.release()
def debug_show(self):
"""
Display the image using a debug function provided by the library.
"""
HFDeBugImageStreamImShow(self._handle)
@property
def handle(self):
"""
Return the internal handle of the image stream.
Returns:
The handle to the internal image stream, used for interfacing with the underlying C/C++ library.
"""
return self._handle
# == Session API ==
@dataclass
class FaceExtended:
"""
A data class to hold extended face information with confidence levels for various attributes.
Attributes:
rgb_liveness_confidence (float): Confidence level of RGB-based liveness detection.
mask_confidence (float): Confidence level of mask detection on the face.
quality_confidence (float): Confidence level of the overall quality of the face capture.
"""
rgb_liveness_confidence: float
mask_confidence: float
quality_confidence: float
class FaceInformation:
"""
Holds detailed information about a detected face including location and orientation.
Attributes:
track_id (int): Unique identifier for tracking the face across frames.
location (Tuple): Coordinates of the face in the form (x, y, width, height).
roll (float): Roll angle of the face.
yaw (float): Yaw angle of the face.
pitch (float): Pitch angle of the face.
_token (HFFaceBasicToken): A token containing low-level details about the face.
_feature (np.array, optional): An optional numpy array holding the facial feature data.
Methods:
__init__: Initializes a new instance of FaceInformation.
"""
def __init__(self,
track_id: int,
location: Tuple,
roll: float,
yaw: float,
pitch: float,
_token: HFFaceBasicToken,
_feature: np.array = None):
self.track_id = track_id
self.location = location
self.roll = roll
self.yaw = yaw
self.pitch = pitch
# Calculate the required buffer size for the face token and copy it.
token_size = HInt32()
HFGetFaceBasicTokenSize(HPInt32(token_size))
buffer_size = token_size.value
self.buffer = create_string_buffer(buffer_size)
ret = HFCopyFaceBasicToken(_token, self.buffer, token_size)
if ret != 0:
logger.error("Failed to copy face basic token")
# Store the copied token.
self._token = HFFaceBasicToken()
self._token.size = buffer_size
self._token.data = cast(addressof(self.buffer), c_void_p)
@dataclass
class SessionCustomParameter:
"""
A data class for configuring the optional parameters in a face recognition session.
Attributes are set to False by default and can be enabled as needed.
Methods:
_c_struct: Converts the Python attributes to a C-compatible structure for session configuration.
"""
enable_recognition: bool = False
enable_liveness: bool = False
enable_ir_liveness: bool = False
enable_mask_detect: bool = False
enable_age: bool = False
enable_gender: bool = False
enable_face_quality: bool = False
enable_interaction_liveness: bool = False
def _c_struct(self):
"""
Creates a C structure from the current state of the instance.
Returns:
HFSessionCustomParameter: The corresponding C structure with proper type conversions.
"""
custom_param = HFSessionCustomParameter(
enable_recognition=int(self.enable_recognition),
enable_liveness=int(self.enable_liveness),
enable_ir_liveness=int(self.enable_ir_liveness),
enable_mask_detect=int(self.enable_mask_detect),
enable_age=int(self.enable_age),
enable_gender=int(self.enable_gender),
enable_face_quality=int(self.enable_face_quality),
enable_interaction_liveness=int(self.enable_interaction_liveness)
)
return custom_param
class InspireFaceSession(object):
"""
Manages a session for face detection and recognition processes using the InspireFace library.
Attributes:
multiple_faces (HFMultipleFaceData): Stores data about multiple detected faces during the session.
_sess (HFSession): The handle to the underlying library session.
param (int or SessionCustomParameter): Configuration parameters or flags for the session.
"""
def __init__(self, param, detect_mode: int = HF_DETECT_MODE_IMAGE,
max_detect_num: int = 10):
"""
Initializes a new session with the provided configuration parameters.
Args:
param (int or SessionCustomParameter): Configuration parameters or flags.
detect_mode (int): Detection mode to be used (e.g., image-based detection).
max_detect_num (int): Maximum number of faces to detect.
Raises:
Exception: If session creation fails.
"""
self.multiple_faces = None
self._sess = HFSession()
self.param = param
if isinstance(self.param, SessionCustomParameter):
ret = HFCreateInspireFaceSession(self.param._c_struct(), detect_mode, max_detect_num, self._sess)
elif isinstance(self.param, int):
ret = HFCreateInspireFaceSessionOptional(self.param, detect_mode, max_detect_num, self._sess)
else:
raise NotImplemented("")
if ret != 0:
st = f"Create session error: {ret}"
raise Exception(st)
def face_detection(self, image) -> List[FaceInformation]:
"""
Detects faces in the given image and returns a list of FaceInformation objects containing detailed face data.
Args:
image (np.ndarray or ImageStream): The image in which to detect faces.
Returns:
List[FaceInformation]: A list of detected face information.
"""
stream = self._get_image_stream(image)
self.multiple_faces = HFMultipleFaceData()
ret = HFExecuteFaceTrack(self._sess, stream.handle,
PHFMultipleFaceData(self.multiple_faces))
if ret != 0:
logger.error(f"Face detection error: ", {ret})
return []
if self.multiple_faces.detectedNum > 0:
boxes = self._get_faces_boundary_boxes()
track_ids = self._get_faces_track_ids()
euler_angle = self._get_faces_euler_angle()
tokens = self._get_faces_tokens()
infos = list()
for idx in range(self.multiple_faces.detectedNum):
top_left = (boxes[idx][0], boxes[idx][1])
bottom_right = (boxes[idx][0] + boxes[idx][2], boxes[idx][1] + boxes[idx][3])
roll = euler_angle[idx][0]
yaw = euler_angle[idx][1]
pitch = euler_angle[idx][2]
track_id = track_ids[idx]
_token = tokens[idx]
info = FaceInformation(
location=(top_left[0], top_left[1], bottom_right[0], bottom_right[1]),
roll=roll,
yaw=yaw,
pitch=pitch,
track_id=track_id,
_token=_token,
)
infos.append(info)
return infos
else:
return []
def set_track_mode(self, mode: int):
"""
Sets the tracking mode for the face detection session.
Args:
mode (int): An integer representing the tracking mode to be used.
Notes:
If setting the mode fails, an error is logged with the returned status code.
"""
ret = HFSessionSetFaceTrackMode(self._sess, mode)
if ret != 0:
logger.error(f"Set track mode error: {ret}")
def set_track_preview_size(self, size=192):
"""
Sets the preview size for the face tracking session.
Args:
size (int, optional): The size of the preview area for face tracking. Default is 192.
Notes:
If setting the preview size fails, an error is logged with the returned status code.
"""
ret = HFSessionSetTrackPreviewSize(self._sess, size)
if ret != 0:
logger.error(f"Set track preview size error: {ret}")
def face_pipeline(self, image, faces: List[FaceInformation], exec_param) -> List[FaceExtended]:
"""
Processes detected faces to extract additional attributes based on the provided execution parameters.
Args:
image (np.ndarray or ImageStream): The image from which faces are detected.
faces (List[FaceInformation]): A list of FaceInformation objects containing detected face data.
exec_param (SessionCustomParameter or int): Custom parameters for processing faces.
Returns:
List[FaceExtended]: A list of FaceExtended objects with updated attributes like mask confidence, liveness, etc.
Notes:
If the face pipeline processing fails, an error is logged and an empty list is returned.
"""
stream = self._get_image_stream(image)
fn, pm, flag = self._get_processing_function_and_param(exec_param)
tokens = [face._token for face in faces]
tokens_array = (HFFaceBasicToken * len(tokens))(*tokens)
tokens_ptr = cast(tokens_array, PHFFaceBasicToken)
multi_faces = HFMultipleFaceData()
multi_faces.detectedNum = len(tokens)
multi_faces.tokens = tokens_ptr
ret = fn(self._sess, stream.handle, PHFMultipleFaceData(multi_faces), pm)
if ret != 0:
logger.error(f"Face pipeline error: {ret}")
return []
extends = [FaceExtended(-1.0, -1.0, -1.0) for _ in range(len(faces))]
self._update_mask_confidence(exec_param, flag, extends)
self._update_rgb_liveness_confidence(exec_param, flag, extends)
self._update_face_quality_confidence(exec_param, flag, extends)
return extends
def face_feature_extract(self, image, face_information: FaceInformation):
"""
Extracts facial features from a specified face within an image for recognition or comparison purposes.
Args:
image (np.ndarray or ImageStream): The image from which the face features are to be extracted.
face_information (FaceInformation): The FaceInformation object containing the details of the face.
Returns:
np.ndarray: A numpy array containing the extracted facial features, or None if the extraction fails.
Notes:
If the feature extraction process fails, an error is logged and None is returned.
"""
stream = self._get_image_stream(image)
feature_length = HInt32()
HFGetFeatureLength(byref(feature_length))
feature = np.zeros((feature_length.value,), dtype=np.float32)
ret = HFFaceFeatureExtractCpy(self._sess, stream.handle, face_information._token,
feature.ctypes.data_as(ctypes.POINTER(HFloat)))
if ret != 0:
logger.error(f"Face feature extract error: {ret}")
return None
return feature
@staticmethod
def _get_image_stream(image):
if isinstance(image, np.ndarray):
return ImageStream.load_from_cv_image(image)
elif isinstance(image, ImageStream):
return image
else:
raise NotImplemented("Place check input type.")
@staticmethod
def _get_processing_function_and_param(exec_param):
if isinstance(exec_param, SessionCustomParameter):
return HFMultipleFacePipelineProcess, exec_param._c_struct(), "object"
elif isinstance(exec_param, int):
return HFMultipleFacePipelineProcessOptional, exec_param, "bitmask"
else:
raise NotImplemented("Unsupported parameter type")
def _update_mask_confidence(self, exec_param, flag, extends):
if (flag == "object" and exec_param.enable_mask_detect) or (
flag == "bitmask" and exec_param & HF_ENABLE_MASK_DETECT):
mask_results = HFFaceMaskConfidence()
ret = HFGetFaceMaskConfidence(self._sess, PHFFaceMaskConfidence(mask_results))
if ret == 0:
for i in range(mask_results.num):
extends[i].mask_confidence = mask_results.confidence[i]
else:
logger.error(f"Get mask result error: {ret}")
def _update_rgb_liveness_confidence(self, exec_param, flag, extends: List[FaceExtended]):
if (flag == "object" and exec_param.enable_liveness) or (
flag == "bitmask" and exec_param & HF_ENABLE_LIVENESS):
liveness_results = HFRGBLivenessConfidence()
ret = HFGetRGBLivenessConfidence(self._sess, PHFRGBLivenessConfidence(liveness_results))
if ret == 0:
for i in range(liveness_results.num):
extends[i].rgb_liveness_confidence = liveness_results.confidence[i]
else:
logger.error(f"Get rgb liveness result error: {ret}")
def _update_face_quality_confidence(self, exec_param, flag, extends: List[FaceExtended]):
if (flag == "object" and exec_param.enable_face_quality) or (
flag == "bitmask" and exec_param & HF_ENABLE_QUALITY):
quality_results = HFFaceQualityConfidence()
ret = HFGetFaceQualityConfidence(self._sess, PHFFaceQualityConfidence(quality_results))
if ret == 0:
for i in range(quality_results.num):
extends[i].quality_confidence = quality_results.confidence[i]
else:
logger.error(f"Get quality result error: {ret}")
def _get_faces_boundary_boxes(self) -> List:
num_of_faces = self.multiple_faces.detectedNum
rects_ptr = self.multiple_faces.rects
rects = [(rects_ptr[i].x, rects_ptr[i].y, rects_ptr[i].width, rects_ptr[i].height) for i in range(num_of_faces)]
return rects
def _get_faces_track_ids(self) -> List:
num_of_faces = self.multiple_faces.detectedNum
track_ids_ptr = self.multiple_faces.trackIds
track_ids = [track_ids_ptr[i] for i in range(num_of_faces)]
return track_ids
def _get_faces_euler_angle(self) -> List:
num_of_faces = self.multiple_faces.detectedNum
euler_angle = self.multiple_faces.angles
angles = [(euler_angle.roll[i], euler_angle.yaw[i], euler_angle.pitch[i]) for i in range(num_of_faces)]
return angles
def _get_faces_tokens(self) -> List[HFFaceBasicToken]:
num_of_faces = self.multiple_faces.detectedNum
tokens_ptr = self.multiple_faces.tokens
tokens = [tokens_ptr[i] for i in range(num_of_faces)]
return tokens
def release(self):
if self._sess is not None:
HFReleaseInspireFaceSession(self._sess)
self._sess = None
def __del__(self):
self.release()
# == Global API ==
def launch(resource_path: str) -> bool:
"""
Launches the InspireFace system with the specified resource directory.
Args:
resource_path (str): The file path to the resource directory necessary for operation.
Returns:
bool: True if the system was successfully launched, False otherwise.
Notes:
A specific error is logged if duplicate loading is detected or if there is any other launch failure.
"""
path_c = String(bytes(resource_path, encoding="utf8"))
ret = HFLaunchInspireFace(path_c)
if ret != 0:
if ret == 1363:
logger.warning("Duplicate loading was found")
return True
else:
logger.error(f"Launch InspireFace failure: {ret}")
return False
return True
@dataclass
class FeatureHubConfiguration:
"""
Configuration settings for managing the feature hub, including database and search settings.
Attributes:
feature_block_num (int): Number of features per block in the database.
enable_use_db (bool): Flag to indicate if the database should be used.
db_path (str): Path to the database file.
search_threshold (float): The threshold value for considering a match.
search_mode (int): The mode of searching in the database.
"""
feature_block_num: int
enable_use_db: bool
db_path: str
search_threshold: float
search_mode: int
def _c_struct(self):
"""
Converts the data class attributes to a C-compatible structure for use in the InspireFace SDK.
Returns:
HFFeatureHubConfiguration: A C-structure for feature hub configuration.
"""
return HFFeatureHubConfiguration(
enableUseDb=int(self.enable_use_db),
dbPath=String(bytes(self.db_path, encoding="utf8")),
featureBlockNum=self.feature_block_num,
searchThreshold=self.search_threshold,
searchMode=self.search_mode
)
def feature_hub_enable(config: FeatureHubConfiguration) -> bool:
"""
Enables the feature hub with the specified configuration.
Args:
config (FeatureHubConfiguration): Configuration settings for the feature hub.
Returns:
bool: True if successfully enabled, False otherwise.
Notes:
Logs an error if enabling the feature hub fails.
"""
ret = HFFeatureHubDataEnable(config._c_struct())
if ret != 0:
logger.error(f"FeatureHub enable failure: {ret}")
return False
return True
def feature_hub_disable() -> bool:
"""
Disables the feature hub.
Returns:
bool: True if successfully disabled, False otherwise.
Notes:
Logs an error if disabling the feature hub fails.
"""
ret = HFFeatureHubDataDisable()
if ret != 0:
logger.error(f"FeatureHub disable failure: {ret}")
return False
return True
def feature_comparison(feature1: np.ndarray, feature2: np.ndarray) -> float:
"""
Compares two facial feature arrays to determine their similarity.
Args:
feature1 (np.ndarray): The first feature array.
feature2 (np.ndarray): The second feature array.
Returns:
float: A similarity score, where -1.0 indicates an error during comparison.
Notes:
Logs an error if the comparison process fails.
"""
faces = [feature1, feature2]
feats = []
for face in faces:
feature = HFFaceFeature()
data_ptr = face.ctypes.data_as(HPFloat)
feature.size = HInt32(face.size)
feature.data = data_ptr
feats.append(feature)
comparison_result = HFloat()
ret = HFFaceComparison(feats[0], feats[1], HPFloat(comparison_result))
if ret != 0:
logger.error(f"Comparison error: {ret}")
return -1.0
return float(comparison_result.value)
class FaceIdentity(object):
"""
Represents an identity based on facial features, associating the features with a custom ID and a tag.
Attributes:
feature (np.ndarray): The facial features as a numpy array.
custom_id (int): A custom identifier for the face identity.
tag (str): A tag or label associated with the face identity.
Methods:
__init__: Initializes a new instance of FaceIdentity.
from_ctypes: Converts a C structure to a FaceIdentity instance.
_c_struct: Converts the instance back to a compatible C structure.
"""
def __init__(self, data: np.ndarray, custom_id: int, tag: str):
"""
Initializes a new FaceIdentity instance with facial feature data, a custom identifier, and a tag.
Args:
data (np.ndarray): The facial feature data.
custom_id (int): A custom identifier for tracking or referencing the face identity.
tag (str): A descriptive tag or label for the face identity.
"""
self.feature = data
self.custom_id = custom_id
self.tag = tag
@staticmethod
def from_ctypes(raw_identity: HFFaceFeatureIdentity):
"""
Converts a ctypes structure representing a face identity into a FaceIdentity object.
Args:
raw_identity (HFFaceFeatureIdentity): The ctypes structure containing the face identity data.
Returns:
FaceIdentity: An instance of FaceIdentity with data extracted from the ctypes structure.
"""
feature_size = raw_identity.feature.contents.size
feature_data_ptr = raw_identity.feature.contents.data
feature_data = np.ctypeslib.as_array(cast(feature_data_ptr, HPFloat), (feature_size,))
custom_id = raw_identity.customId
tag = raw_identity.tag.data.decode('utf-8')
return FaceIdentity(data=feature_data, custom_id=custom_id, tag=tag)
def _c_struct(self):
"""
Converts this FaceIdentity instance into a C-compatible structure for use with InspireFace APIs.
Returns:
HFFaceFeatureIdentity: A C structure representing this face identity.
"""
feature = HFFaceFeature()
data_ptr = self.feature.ctypes.data_as(HPFloat)
feature.size = HInt32(self.feature.size)
feature.data = data_ptr
return HFFaceFeatureIdentity(
customId=self.custom_id,
tag=String(bytes(self.tag, encoding="utf8")),
feature=PHFFaceFeature(feature)
)
def feature_hub_set_search_threshold(threshold: float):
"""
Sets the search threshold for face matching in the FeatureHub.
Args:
threshold (float): The similarity threshold for determining a match.
"""
HFFeatureHubFaceSearchThresholdSetting(threshold)
def feature_hub_face_insert(face_identity: FaceIdentity) -> bool:
"""
Inserts a face identity into the FeatureHub database.
Args:
face_identity (FaceIdentity): The face identity to insert.
Returns:
bool: True if the face identity was successfully inserted, False otherwise.
Notes:
Logs an error if the insertion process fails.
"""
ret = HFFeatureHubInsertFeature(face_identity._c_struct())
if ret != 0:
logger.error(f"Failed to insert face feature data into FeatureHub: {ret}")
return False
return True
@dataclass
class SearchResult:
"""
Represents the result of a face search operation with confidence level and the most similar face identity found.
Attributes:
confidence (float): The confidence score of the search result, indicating the similarity.
similar_identity (FaceIdentity): The face identity that most closely matches the search query.
"""
confidence: float
similar_identity: FaceIdentity
def feature_hub_face_search(data: np.ndarray) -> SearchResult:
"""
Searches for the most similar face identity in the feature hub based on provided facial features.
Args:
data (np.ndarray): The facial feature data to search for.
Returns:
SearchResult: The search result containing the confidence and the most similar identity found.
Notes:
If the search operation fails, logs an error and returns a SearchResult with a confidence of -1.
"""
feature = HFFaceFeature(size=HInt32(data.size), data=data.ctypes.data_as(HPFloat))
confidence = HFloat()
most_similar = HFFaceFeatureIdentity()
ret = HFFeatureHubFaceSearch(feature, HPFloat(confidence), PHFFaceFeatureIdentity(most_similar))
if ret != 0:
logger.error(f"Failed to search face: {ret}")
return SearchResult(confidence=-1, similar_identity=FaceIdentity(np.zeros(0), most_similar.customId, "None"))
if most_similar.customId != -1:
search_identity = FaceIdentity.from_ctypes(most_similar)
return SearchResult(confidence=confidence.value, similar_identity=search_identity)
else:
none = FaceIdentity(np.zeros(0), most_similar.customId, "None")
return SearchResult(confidence=confidence.value, similar_identity=none)
def feature_hub_face_search_top_k(data: np.ndarray, top_k: int) -> List[Tuple]:
"""
Searches for the top 'k' most similar face identities in the feature hub based on provided facial features.
Args:
data (np.ndarray): The facial feature data to search for.
top_k (int): The number of top results to retrieve.
Returns:
List[Tuple]: A list of tuples, each containing the confidence and custom ID of the top results.
Notes:
If the search operation fails, an empty list is returned.
"""
feature = HFFaceFeature(size=HInt32(data.size), data=data.ctypes.data_as(HPFloat))
results = HFSearchTopKResults()
ret = HFFeatureHubFaceSearchTopK(feature, top_k, PHFSearchTopKResults(results))
outputs = []
if ret == 0:
for idx in range(results.size):
confidence = results.confidence[idx]
customId = results.customIds[idx]
outputs.append((confidence, customId))
return outputs
def feature_hub_face_update(face_identity: FaceIdentity) -> bool:
"""
Updates an existing face identity in the feature hub.
Args:
face_identity (FaceIdentity): The face identity to update.
Returns:
bool: True if the update was successful, False otherwise.
Notes:
Logs an error if the update operation fails.
"""
ret = HFFeatureHubFaceUpdate(face_identity._c_struct())
if ret != 0:
logger.error(f"Failed to update face feature data in FeatureHub: {ret}")
return False
return True
def feature_hub_face_remove(custom_id: int) -> bool:
"""
Removes a face identity from the feature hub using its custom ID.
Args:
custom_id (int): The custom ID of the face identity to remove.
Returns:
bool: True if the face was successfully removed, False otherwise.
Notes:
Logs an error if the removal operation fails.
"""
ret = HFFeatureHubFaceRemove(custom_id)
if ret != 0:
logger.error(f"Failed to remove face feature data from FeatureHub: {ret}")
return False
return True
def feature_hub_get_face_identity(custom_id: int):
"""
Retrieves a face identity from the feature hub using its custom ID.
Args:
custom_id (int): The custom ID of the face identity to retrieve.
Returns:
FaceIdentity: The face identity retrieved, or None if the operation fails.
Notes:
Logs an error if retrieving the face identity fails.
"""
identify = HFFaceFeatureIdentity()
ret = HFFeatureHubGetFaceIdentity(custom_id, PHFFaceFeatureIdentity(identify))
if ret != 0:
logger.error("Get face identity errors from FeatureHub")
return None
return FaceIdentity.from_ctypes(identify)
def feature_hub_get_face_count() -> int:
"""
Retrieves the total count of face identities stored in the feature hub.
Returns:
int: The count of face identities.
Notes:
Logs an error if the operation to retrieve the count fails.
"""
count = HInt32()
ret = HFFeatureHubGetFaceCount(HPInt32(count))
if ret != 0:
logger.error(f"Failed to get count: {ret}")
return int(count.value)
def view_table_in_terminal():
"""
Displays the database table of face identities in the terminal.
Notes:
Logs an error if the operation to view the table fails.
"""
ret = HFFeatureHubViewDBTable()
if ret != 0:
logger.error(f"Failed to view DB: {ret}")
def version() -> str:
"""
Retrieves the version of the InspireFace library.
Returns:
str: The version string of the library.
"""
ver = HFInspireFaceVersion()
HFQueryInspireFaceVersion(PHFInspireFaceVersion(ver))
return f"{ver.major}.{ver.minor}.{ver.patch}"
def set_logging_level(level: int) -> None:
"""
Sets the logging level of the InspireFace library.
Args:
level (int): The level to set the logging to.
"""
HFSetLogLevel(level)
def disable_logging() -> None:
"""
Disables all logging from the InspireFace library.
"""
HFLogDisable()

View File

@@ -0,0 +1,20 @@
# Session option
from inspireface.modules.core.native import HF_ENABLE_NONE, HF_ENABLE_FACE_RECOGNITION, HF_ENABLE_LIVENESS, HF_ENABLE_IR_LIVENESS, \
HF_ENABLE_MASK_DETECT, HF_ENABLE_AGE_PREDICT, HF_ENABLE_GENDER_PREDICT, HF_ENABLE_QUALITY, HF_ENABLE_INTERACTION
# Face track mode
from inspireface.modules.core.native import HF_DETECT_MODE_IMAGE, HF_DETECT_MODE_VIDEO
# Image format
from inspireface.modules.core.native import HF_STREAM_RGB, HF_STREAM_BGR, HF_STREAM_RGBA, HF_STREAM_BGRA, HF_STREAM_YUV_NV12, HF_STREAM_YUV_NV21
# Image rotation
from inspireface.modules.core.native import HF_CAMERA_ROTATION_0, HF_CAMERA_ROTATION_90, HF_CAMERA_ROTATION_180, HF_CAMERA_ROTATION_270
# Search mode
from inspireface.modules.core.native import HF_SEARCH_MODE_EAGER, HF_SEARCH_MODE_EXHAUSTIVE
# Logger level
from inspireface.modules.core.native import HF_LOG_NONE, HF_LOG_DEBUG, HF_LOG_INFO, HF_LOG_WARN, HF_LOG_ERROR, HF_LOG_FATAL

View File

@@ -0,0 +1,62 @@
import os
import cv2
import inspireface as ifac
from inspireface.param import *
import click
@click.command()
@click.argument("resource_path")
@click.argument('image_path')
def case_face_detection_image(resource_path, image_path):
"""
This is a sample application for face detection and tracking using an image.
It also includes pipeline extensions such as RGB liveness, mask detection, and face quality evaluation.
"""
# Step 1: Initialize the SDK and load the algorithm resource files.
ret = ifac.launch(resource_path)
assert ret, "Launch failure. Please ensure the resource path is correct."
# Optional features, loaded during session creation based on the modules specified.
opt = HF_ENABLE_FACE_RECOGNITION | HF_ENABLE_QUALITY | HF_ENABLE_MASK_DETECT | HF_ENABLE_LIVENESS
session = ifac.InspireFaceSession(opt, HF_DETECT_MODE_IMAGE)
# Load the image using OpenCV.
image = cv2.imread(image_path)
assert image is not None, "Please check that the image path is correct."
# Perform face detection on the image.
faces = session.face_detection(image)
print(f"face detection: {len(faces)} found")
# Copy the image for drawing the bounding boxes.
draw = image.copy()
for idx, face in enumerate(faces):
print(f"{'==' * 20}")
print(f"idx: {idx}")
# Print Euler angles of the face.
print(f"roll: {face.roll}, yaw: {face.yaw}, pitch: {face.pitch}")
# Draw bounding box around the detected face.
x1, y1, x2, y2 = face.location
cv2.rectangle(draw, (x1, y1), (x2, y2), (0, 0, 255), 2)
# Features must be enabled during session creation to use them here.
select_exec_func = HF_ENABLE_QUALITY | HF_ENABLE_MASK_DETECT | HF_ENABLE_LIVENESS
# Execute the pipeline to obtain richer face information.
extends = session.face_pipeline(image, faces, select_exec_func)
for idx, ext in enumerate(extends):
print(f"{'==' * 20}")
print(f"idx: {idx}")
# For these pipeline results, you can set thresholds based on the specific scenario to make judgments.
print(f"quality: {ext.quality_confidence}")
print(f"rgb liveness: {ext.rgb_liveness_confidence}")
print(f"face mask: {ext.mask_confidence}")
# Save the annotated image to the 'tmp/' directory.
save_path = os.path.join("tmp/", "det.jpg")
cv2.imwrite(save_path, draw)
print(f"\nSave annotated image to {save_path}")
if __name__ == '__main__':
os.makedirs("tmp", exist_ok=True)
case_face_detection_image()

View File

@@ -0,0 +1,99 @@
import os
import cv2
import inspireface as ifac
from inspireface.param import *
import click
@click.command()
@click.argument("resource_path")
@click.argument('test_data_folder')
def case_face_recognition(resource_path, test_data_folder):
"""
Launches the face recognition system, inserts face features into a database, and performs searches.
Args:
resource_path (str): Path to the resource directory for face recognition algorithms.
test_data_folder (str): Path to the test data containing images for insertion and recognition tests.
"""
# Initialize the face recognition system with provided resources.
ret = ifac.launch(resource_path)
assert ret, "Launch failure. Please ensure the resource path is correct."
# Enable face recognition features.
opt = HF_ENABLE_FACE_RECOGNITION
session = ifac.InspireFaceSession(opt, HF_DETECT_MODE_IMAGE)
# Configure the feature management system.
feature_hub_config = ifac.FeatureHubConfiguration(
feature_block_num=10,
enable_use_db=False,
db_path="",
search_threshold=0.48,
search_mode=HF_SEARCH_MODE_EAGER,
)
ret = ifac.feature_hub_enable(feature_hub_config)
assert ret, "Failed to enable FeatureHub."
# Insert face features from 'bulk' directory.
bulk_path = os.path.join(test_data_folder, "bulk")
assert os.path.exists(bulk_path), "Bulk directory does not exist."
insert_images = [os.path.join(bulk_path, path) for path in os.listdir(bulk_path) if path.endswith(".jpg")]
for idx, image_path in enumerate(insert_images):
name = os.path.basename(image_path).replace(".jpg", "")
image = cv2.imread(image_path)
assert image is not None, f"Failed to load image {image_path}"
faces = session.face_detection(image)
if faces:
face = faces[0] # Assume the most prominent face is what we want.
feature = session.face_feature_extract(image, face)
identity = ifac.FaceIdentity(feature, custom_id=idx, tag=name)
ret = ifac.feature_hub_face_insert(identity)
assert ret, "Failed to insert face."
count = ifac.feature_hub_get_face_count()
print(f"Number of faces inserted: {count}")
# Process faces from 'RD' directory and insert them.
RD = os.path.join(test_data_folder, "RD")
assert os.path.exists(RD), "RD directory does not exist."
RD_images = [os.path.join(RD, path) for path in os.listdir(RD) if path.endswith(".jpeg")]
for idx, image_path in enumerate(RD_images[:-1]):
name = os.path.basename(image_path).replace(".jpeg", "")
image = cv2.imread(image_path)
assert image is not None, f"Failed to load image {image_path}"
faces = session.face_detection(image)
if faces:
face = faces[0]
feature = session.face_feature_extract(image, face)
identity = ifac.FaceIdentity(feature, custom_id=idx+count+1, tag=name)
ret = ifac.feature_hub_face_insert(identity)
assert ret, "Failed to insert face."
count = ifac.feature_hub_get_face_count()
print(f"Total number of faces after insertion: {count}")
# Search for a similar face using the last image in RD directory.
remain = cv2.imread(RD_images[-1])
assert remain is not None, f"Failed to load image {RD_images[-1]}"
faces = session.face_detection(remain)
assert faces, "No faces detected."
face = faces[0]
feature = session.face_feature_extract(remain, face)
search = ifac.feature_hub_face_search(feature)
if search.similar_identity.custom_id != -1:
print(f"Found similar identity with ID: {search.similar_identity.custom_id}, Tag: {search.similar_identity.tag}, Confidence: {search.confidence:.2f}")
else:
print("No similar identity found.")
# Display top-k similar face identities.
print("Top-k similar identities:")
search_top_k = ifac.feature_hub_face_search_top_k(feature, 10)
for idx, (conf, custom_id) in enumerate(search_top_k):
identity = ifac.feature_hub_get_face_identity(custom_id)
print(f"Top-{idx + 1}: {identity.tag}, ID: {custom_id}, Confidence: {conf:.2f}")
if __name__ == '__main__':
case_face_recognition()

View File

@@ -0,0 +1,69 @@
import click
import cv2
import inspireface as ifac
from inspireface.param import *
@click.command()
@click.argument("resource_path")
@click.argument('source')
@click.option('--show', is_flag=True, help='Display the video stream or video file in a window.')
def case_face_tracker_from_video(resource_path, source, show):
"""
Launch a face tracking process from a video source. The 'source' can either be a webcam index (0, 1, ...)
or a path to a video file. Use the --show option to display the video.
Args:
resource_path (str): Path to the resource directory for face tracking algorithms.
source (str): Webcam index or path to the video file.
show (bool): If set, the video will be displayed in a window.
"""
# Initialize the face tracker or other resources.
print(f"Initializing with resources from: {resource_path}")
# Step 1: Initialize the SDK and load the algorithm resource files.
ret = ifac.launch(resource_path)
assert ret, "Launch failure. Please ensure the resource path is correct."
# Optional features, loaded during session creation based on the modules specified.
opt = HF_ENABLE_NONE
session = ifac.InspireFaceSession(opt, HF_DETECT_MODE_VIDEO) # Use video mode
# Determine if the source is a digital webcam index or a video file path.
try:
source_index = int(source) # Try to convert source to an integer.
cap = cv2.VideoCapture(source_index)
print(f"Using webcam at index {source_index}.")
except ValueError:
# If conversion fails, treat source as a file path.
cap = cv2.VideoCapture(source)
print(f"Opening video file at {source}.")
if not cap.isOpened():
print("Error: Could not open video source.")
return
# Main loop to process video frames.
while True:
ret, frame = cap.read()
if not ret:
break # Exit loop if no more frames or error occurs.
# Process frame here (e.g., face detection/tracking).
faces = session.face_detection(frame)
for idx, face in enumerate(faces):
x1, y1, x2, y2 = face.location
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
if show:
cv2.imshow("Face Tracker", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break # Exit loop if 'q' is pressed.
# Cleanup: release video capture and close any open windows.
cap.release()
cv2.destroyAllWindows()
print("Released all resources and closed windows.")
if __name__ == '__main__':
case_face_tracker_from_video()

View File

@@ -0,0 +1 @@
python -m unittest discover -s test

View File

@@ -0,0 +1,7 @@
from .test_settings import *
from .test_utilis import *
# Unit module
from .unit import *
from .performance import *

Binary file not shown.

After

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 155 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 212 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 424 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 224 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 296 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 391 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 166 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 165 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 165 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

View File

@@ -0,0 +1,59 @@
from test import *
import unittest
import cv2
@optional(ENABLE_LFW_PRECISION_TEST, "LFW dataset precision tests have been closed.")
class LFWPrecisionTestCase(unittest.TestCase):
def setUp(self) -> None:
self.quick = QuickComparison()
def test_lfw_precision(self):
pairs_path = os.path.join(LFW_FUNNELED_DIR_PATH, 'pairs.txt')
pairs = read_pairs(pairs_path)
self.assertEqual(True, len(pairs) > 0)
if os.path.exists(LFW_PREDICT_DATA_CACHE_PATH):
print("Loading results from cache")
cache = np.load(LFW_PREDICT_DATA_CACHE_PATH, allow_pickle=True)
similarities = cache[0]
labels = cache[1]
else:
similarities = []
labels = []
for pair in tqdm(pairs):
if len(pair) == 3:
person, img_num1, img_num2 = pair
img_path1 = os.path.join(LFW_FUNNELED_DIR_PATH, person, f"{person}_{img_num1.zfill(4)}.jpg")
img_path2 = os.path.join(LFW_FUNNELED_DIR_PATH, person, f"{person}_{img_num2.zfill(4)}.jpg")
match = True
else:
person1, img_num1, person2, img_num2 = pair
img_path1 = os.path.join(LFW_FUNNELED_DIR_PATH, person1, f"{person1}_{img_num1.zfill(4)}.jpg")
img_path2 = os.path.join(LFW_FUNNELED_DIR_PATH, person2, f"{person2}_{img_num2.zfill(4)}.jpg")
match = False
img1 = cv2.imread(img_path1)
img2 = cv2.imread(img_path2)
if not self.quick.setup(img1, img2):
print("not detect face")
continue
cosine_similarity = self.quick.comp()
similarities.append(cosine_similarity)
labels.append(match)
similarities = np.array(similarities)
labels = np.array(labels)
# save cache file
np.save(LFW_PREDICT_DATA_CACHE_PATH, [similarities, labels])
# find best threshold
best_threshold, best_accuracy = find_best_threshold(similarities, labels)
print(f"Best Threshold: {best_threshold:.2f}, Best Accuracy: {best_accuracy:.3f}")
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,68 @@
import os
import sys
import inspireface as ifac
# ++ OPTIONAL ++
# Enabling will run all the benchmark tests, which takes time
ENABLE_BENCHMARK_TEST = True
# Enabling will run all the CRUD tests, which will take time
ENABLE_CRUD_TEST = True
# Enabling will run the face search benchmark, which takes time and must be configured with the correct
# 'LFW_FUNNELED_DIR_PATH' parameter
ENABLE_SEARCH_BENCHMARK_TEST = True
# Enabling will run the LFW dataset precision test, which will take time
ENABLE_LFW_PRECISION_TEST = True
# Testing model name
TEST_MODEL_NAME = "Pikachu"
# TEST_MODEL_NAME = "Megatron"
# Testing length of face feature
TEST_MODEL_FACE_FEATURE_LENGTH = 512
# Testing face comparison image threshold
TEST_FACE_COMPARISON_IMAGE_THRESHOLD = 0.45
# ++ END OPTIONAL ++
# Current project path
TEST_PROJECT_PATH = os.path.dirname(os.path.abspath(__file__))
# Current project path
CURRENT_PROJECT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Main project path
MAIN_PROJECT_PATH = os.path.dirname(CURRENT_PROJECT_PATH)
# Model zip path
MODEL_ZIP_PATH = os.path.join(MAIN_PROJECT_PATH, "test_res/pack/")
# Testing model full path
TEST_MODEL_PATH = os.path.join(MODEL_ZIP_PATH, TEST_MODEL_NAME)
# Python test data folder
PYTHON_TEST_DATA_FOLDER = os.path.join(TEST_PROJECT_PATH, "data/")
# Stores some temporary file data generated during testing
TMP_FOLDER = os.path.join(CURRENT_PROJECT_PATH, "tmp")
# Default db file path
DEFAULT_DB_PATH = os.path.join(TMP_FOLDER, ".E63520A95DD5B3892C56DA38C3B28E551D8173FD")
# Create tmp if not exist
os.makedirs(TMP_FOLDER, exist_ok=True)
# lfw_funneled Dataset dir path
LFW_FUNNELED_DIR_PATH = "/Users/tunm/datasets/lfw_funneled/"
# The LFW data predicted by the algorithm is used and cached to save time in the next prediction, and it can be
# re-predicted by manually deleting it
LFW_PREDICT_DATA_CACHE_PATH = os.path.join(TMP_FOLDER, "LFW_PRED.npy")
assert os.path.exists(LFW_FUNNELED_DIR_PATH), "'LFW_FUNNELED_DIR_PATH' is not found."
ifac.launch(TEST_MODEL_PATH)

View File

@@ -0,0 +1,280 @@
from test.test_settings import *
import inspireface as ifac
from inspireface.param import *
import numpy as np
import time
from functools import wraps
import cv2
from itertools import cycle
from tqdm import tqdm
from unittest import skipUnless as optional
def title(name: str = None):
print("--" * 35)
print(f" InspireFace Version: {ifac.__version__}")
if name is not None:
print(f" {name}")
print("--" * 35)
def get_test_data(path: str) -> str:
return os.path.join(PYTHON_TEST_DATA_FOLDER, path)
def calculate_overlap(box1, box2):
"""
Calculate the overlap ratio between two rectangular boxes.
Parameters:
- box1: The first rectangle, format ((x1, y1), (x2, y2)), where (x1, y1) is the top left coordinate, and (x2, y2) is the bottom right coordinate.
- box2: The second rectangle, format the same as box1.
Returns:
- The overlap ratio, 0 if the rectangles do not overlap.
"""
# Unpack rectangle coordinates
x1_box1, y1_box1, x2_box1, y2_box1 = box1
x1_box2, y1_box2, x2_box2, y2_box2 = box2
# Calculate the coordinates of the intersection rectangle
x_overlap = max(0, min(x2_box1, x2_box2) - max(x1_box1, x1_box2))
y_overlap = max(0, min(y2_box1, y2_box2) - max(y1_box1, y1_box2))
# Calculate the area of the intersection
overlap_area = x_overlap * y_overlap
# Calculate the area of each rectangle
box1_area = (x2_box1 - x1_box1) * (y2_box1 - y1_box1)
box2_area = (x2_box2 - x1_box2) * (y2_box2 - y1_box2)
# Calculate the total area
total_area = box1_area + box2_area - overlap_area
# Calculate the overlap ratio
overlap_ratio = overlap_area / total_area if total_area > 0 else 0
return overlap_ratio
def restore_rotated_box(original_width, original_height, box, rotation):
"""
Restore the coordinates of a rotated face box based on the original image width, height, and rotation angle.
Parameters:
- original_width: The width of the original image.
- original_height: The height of the original image.
- box: The coordinates of the rotated box, format ((x1, y1), (x2, y2)).
- rotation: The rotation angle, represented by 0, 1, 2, 3 for 0, 90, 180, 270 degrees respectively.
Returns:
- The restored box coordinates, format same as box.
"""
# For 90 or 270 degrees rotation, the image width and height are swapped
if rotation == 1 or rotation == 3:
width, height = original_height, original_width
else:
width, height = original_width, original_height
(x1, y1, x2, y2) = box
if rotation == 0: # No transformation needed for 0 degrees
restored_box = box
elif rotation == 1: # 90 degrees rotation
restored_box = (y1, width - x2, y2, width - x1)
elif rotation == 2: # 180 degrees rotation
restored_box = (width - x2, height - y2, width - x1, height - y1)
elif rotation == 3: # 270 degrees rotation
restored_box = (height - y2, x1, height - y1, x2)
else:
raise ValueError("Rotation must be 0, 1, 2, or 3 representing 0, 90, 180, 270 degrees.")
return restored_box
def read_binary_file_to_ndarray(file_path, width, height):
nv21_size = width * height * 3 // 2 # NV21 size calculation
try:
with open(file_path, 'rb') as file:
file_data = file.read() # Read the entire file
if len(file_data) != nv21_size:
print(f"Expected file size is {nv21_size}, but got {len(file_data)}")
return None
# Assuming the file data is a complete NV21 frame
data = np.frombuffer(file_data, dtype=np.uint8)
return data
except FileNotFoundError:
print(f"File '{file_path}' not found.")
return None
except Exception as e:
print(f"An error occurred while reading the file: {str(e)}")
return None
def print_benchmark_table(benchmark_results):
print("\n")
header_format = "{:<20} | {:<10} | {:<15} | {:<15}"
row_format = "{:<20} | {:<10} | {:>10.2f} ms | {:>10.4f} ms"
print(header_format.format('Benchmark', 'Loops', 'Total Time', 'Avg Time'))
print("-" * 70) # 调整分割线长度以匹配标题长度
for name, loops, total_time in benchmark_results:
avg_time = total_time / loops
print(row_format.format(name, loops, total_time * 1000, avg_time * 1000))
def benchmark(test_name, loop):
def benchmark_decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Set the loop property on the test object
setattr(self, 'loop', loop)
start_time = time.time()
try:
result = func(self, *args, **kwargs)
finally:
end_time = time.time()
cost_total = end_time - start_time
self.__class__.benchmark_results.append((test_name, loop, cost_total))
# After the test is complete, delete the loop property to prevent other tests from being affected
delattr(self, 'loop')
return result
return wrapper
return benchmark_decorator
def read_video_generator(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise IOError(f"Cannot open video {video_path}")
while True:
ret, frame = cap.read()
if not ret:
break
yield frame
cap.release()
def lfw_generator(directory_path):
while True:
for root, dirs, files in os.walk(directory_path):
for file_name in files:
# Be sure to only process JPG images that end in '0001.jpg'
if file_name.endswith('0001.jpg'):
# Extract the name of the person as the last part of the directory name
name = os.path.basename(root)
image_path = os.path.join(root, file_name)
image = cv2.imread(image_path)
assert image is not None, "Error of image data."
yield image, name
def batch_import_lfw_faces(lfw_path, engine: ifac.InspireFaceSession, num_of_faces: int):
engine.set_track_mode(HF_DETECT_MODE_IMAGE)
generator = lfw_generator(lfw_path)
registered_faces = 0
# With the tqdm wrapper generator, unknown totals are used with total=None, and tqdm will run in unknown total mode
for image, name in tqdm(generator, total=num_of_faces, desc="Registering faces"):
faces_info = engine.face_detection(image)
if len(faces_info) == 0:
continue
# Extract features from the first face detected
first_face_info = faces_info[0]
feature = engine.face_feature_extract(image, first_face_info)
# The extracted features are used for face registration
if feature is not None:
face_identity = ifac.FaceIdentity(data=feature, tag=name, custom_id=registered_faces)
ifac.feature_hub_face_insert(face_identity)
registered_faces += 1
if registered_faces >= num_of_faces:
break
print(f"Completed. Total faces registered: {registered_faces}")
class QuickComparison(object):
def __init__(self):
param = ifac.SessionCustomParameter()
param.enable_recognition = True
self.engine = ifac.InspireFaceSession(param)
self.faces_set_1 = None
self.faces_set_2 = None
def setup(self, image1: np.ndarray, image2: np.ndarray) -> bool:
images = [image1, image2]
self.faces_set_1 = list()
self.faces_set_2 = list()
for idx, img in enumerate(images):
results = self.engine.face_detection(img)
vector_list = list()
if len(results) > 0:
for info in results:
feature = self.engine.face_feature_extract(img, info)
vector_list.append(feature)
else:
return False
if idx == 0:
self.faces_set_1 = vector_list
else:
self.faces_set_2 = vector_list
return True
def comp(self) -> float:
"""
Cross-compare one by one, keep the value with the highest score and return it, calling self.recognition.face_comparison1v1(info1, info2)
:return: Maximum matching score
"""
max_score = 0.0
# Each face in faces_set_1 is traversed and compared with each face in faces_set_2
for face1 in self.faces_set_1:
for face2 in self.faces_set_2:
score = ifac.feature_comparison(face1, face2)
if score > max_score:
max_score = score
return max_score
def match(self, threshold) -> bool:
return self.comp() > threshold
def find_best_threshold(similarities, labels):
thresholds = np.arange(0, 1, 0.01)
best_threshold = best_accuracy = 0
for threshold in thresholds:
predictions = (similarities > threshold)
accuracy = np.mean((predictions == labels).astype(int))
if accuracy > best_accuracy:
best_accuracy = accuracy
best_threshold = threshold
return best_threshold, best_accuracy
def read_pairs(pairs_filename):
"""Read the pairs.txt file and return a list of image pairs"""
pairs = []
with open(pairs_filename, 'r') as f:
for line in f.readlines()[1:]:
pair = line.strip().split()
pairs.append(pair)
return pairs

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,51 @@
from test import *
import unittest
import inspireface as ifac
from inspireface.param import *
import cv2
class CameraStreamCase(unittest.TestCase):
def setUp(self) -> None:
"""Shared area for priority execution"""
pass
def test_image_codec(self) -> None:
image = cv2.imread(get_test_data("bulk/kun.jpg"))
self.assertIsNotNone(image)
def test_stream_rotation(self) -> None:
# Prepare material
engine = ifac.InspireFaceSession(HF_ENABLE_NONE, HF_DETECT_MODE_IMAGE)
# Prepare rotation images
rotation_images_filenames = ["rotate/rot_0.jpg", "rotate/rot_90.jpg", "rotate/rot_180.jpg","rotate/rot_270.jpg"]
rotation_images = [cv2.imread(get_test_data(path)) for path in rotation_images_filenames]
self.assertEqual(True, all(isinstance(item, np.ndarray) for item in rotation_images))
# Detecting face images without rotation
rot_0 = rotation_images[0]
h, w, _ = rot_0.shape
self.assertIsNotNone(rot_0, "Image is empty")
rot_0_faces = engine.face_detection(image=rot_0)
self.assertEqual(True, len(rot_0_faces) > 0)
rot_0_face_box = rot_0_faces[0].location
num_of_faces = len(rot_0_faces)
# Detect images with other rotation angles
rotation_tags = [HF_CAMERA_ROTATION_90, HF_CAMERA_ROTATION_180, HF_CAMERA_ROTATION_270]
streams = [ifac.ImageStream.load_from_cv_image(img, rotation=rotation_tags[idx]) for idx, img in enumerate(rotation_images[1:])]
results = [engine.face_detection(stream) for stream in streams]
# No matter how many degrees the image is rotated, the same number of faces should be detected
self.assertEqual(True, all(len(item) == num_of_faces for item in results))
# Select all the first face box
rot_other_faces_boxes = [face[0].location for face in results]
# We need to restore the rotated face box
restored_boxes = [restore_rotated_box(w, h, rot_other_faces_boxes[idx], rotation_tags[idx]) for idx, box in enumerate(rot_other_faces_boxes)]
# IoU is performed with the face box of the original image to calculate the overlap
iou_results = [calculate_overlap(box, rot_0_face_box) for box in restored_boxes]
# The face box position of all rotated images is detected to be consistent with that of the original image
self.assertEqual(all(0.95 < iou < 1.0 for iou in iou_results), True)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,226 @@
import unittest
from test import *
import inspireface as ifac
from inspireface.param import *
import cv2
class FaceRecognitionBaseCase(unittest.TestCase):
"""
This case is mainly used to test the basic functions of face recognition.
"""
def setUp(self) -> None:
# Prepare material
track_mode = HF_DETECT_MODE_IMAGE
param = ifac.SessionCustomParameter()
param.enable_recognition = True
self.engine = ifac.InspireFaceSession(param, track_mode, 10)
def test_face_feature_extraction(self):
self.engine.set_track_mode(mode=HF_DETECT_MODE_IMAGE)
# Prepare a image
image = cv2.imread(get_test_data("bulk/kun.jpg"))
self.assertIsNotNone(image)
# Face detection
faces = self.engine.face_detection(image)
# "kun.jpg" has only one face
self.assertEqual(len(faces), 1)
face = faces[0]
box = face.location
expect_box = (98, 146, 233, 272)
# Calculate the location of the detected box and the expected box
iou = calculate_overlap(box, expect_box)
self.assertAlmostEqual(iou, 1.0, places=3)
# Extract feature
feature = self.engine.face_feature_extract(image, face)
self.assertIsNotNone(feature)
#
def test_face_comparison(self):
self.engine.set_track_mode(mode=HF_DETECT_MODE_IMAGE)
# Prepare two pictures of someone
images_path_list = [get_test_data("bulk/kun.jpg"), get_test_data("bulk/jntm.jpg")]
self.assertEqual(len(images_path_list), 2, "Only 2 photos can be used for the 1v1 scene.")
images = [cv2.imread(pth) for pth in images_path_list]
faces_list = [self.engine.face_detection(img) for img in images]
# Check num of faces detection
self.assertEqual(len(faces_list[0]), 1)
self.assertEqual(len(faces_list[1]), 1)
# Extract features
features = [self.engine.face_feature_extract(images[idx], faces[0]) for idx, faces in enumerate(faces_list)]
self.assertEqual(features[0].size, TEST_MODEL_FACE_FEATURE_LENGTH)
self.assertEqual(features[1].size, TEST_MODEL_FACE_FEATURE_LENGTH)
# Comparison
similarity = ifac.feature_comparison(features[0], features[1])
self.assertEqual(True, similarity > TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
# Prepare a picture of a different person
woman = cv2.imread(get_test_data("bulk/woman.png"))
self.assertIsNotNone(woman)
woman_faces = self.engine.face_detection(woman)
self.assertEqual(len(woman_faces), 1)
face_3 = woman_faces[0]
feature = self.engine.face_feature_extract(woman, face_3)
self.assertEqual(feature.size, TEST_MODEL_FACE_FEATURE_LENGTH)
# Comparison
similarity = ifac.feature_comparison(features[0], feature)
self.assertEqual(True, similarity < TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
similarity = ifac.feature_comparison(features[1], feature)
self.assertEqual(True, similarity < TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
@optional(ENABLE_CRUD_TEST, "All CRUD related tests have been closed.")
class FaceRecognitionCRUDMemoryCase(unittest.TestCase):
"""
This case is mainly used to test the CRUD functions of face recognition.
"""
engine = None
default_faces_num = 10000
@classmethod
def setUpClass(cls):
config = ifac.FeatureHubConfiguration(
feature_block_num=20,
enable_use_db=False,
db_path="",
search_mode=HF_SEARCH_MODE_EAGER,
search_threshold=TEST_FACE_COMPARISON_IMAGE_THRESHOLD,
)
ifac.feature_hub_enable(config)
track_mode = HF_DETECT_MODE_IMAGE
param = ifac.SessionCustomParameter()
param.enable_recognition = True
cls.engine = ifac.InspireFaceSession(param, track_mode)
batch_import_lfw_faces(LFW_FUNNELED_DIR_PATH, cls.engine, cls.default_faces_num)
def test_face_search(self):
num_current = ifac.feature_hub_get_face_count()
registered = cv2.imread(get_test_data("bulk/kun.jpg"))
self.assertIsNotNone(registered)
faces = self.engine.face_detection(registered)
self.assertEqual(len(faces), 1)
face = faces[0]
feature = self.engine.face_feature_extract(registered, face)
self.assertEqual(feature.size, TEST_MODEL_FACE_FEATURE_LENGTH)
# Insert a new face
registered_identity = ifac.FaceIdentity(feature, custom_id=num_current + 1, tag="Kun")
ret = ifac.feature_hub_face_insert(registered_identity)
self.assertEqual(ret, True)
# Prepare a picture of searched face
searched = cv2.imread(get_test_data("bulk/jntm.jpg"))
self.assertIsNotNone(searched)
faces = self.engine.face_detection(searched)
self.assertEqual(len(faces), 1)
searched_face = faces[0]
feature = self.engine.face_feature_extract(searched, searched_face)
self.assertEqual(feature.size, TEST_MODEL_FACE_FEATURE_LENGTH)
searched_result = ifac.feature_hub_face_search(feature)
self.assertEqual(True, searched_result.confidence > TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
self.assertEqual(searched_result.similar_identity.tag, registered_identity.tag)
self.assertEqual(searched_result.similar_identity.custom_id, registered_identity.custom_id)
# Prepare a picture of a stranger's face
stranger = cv2.imread(get_test_data("bulk/woman.png"))
self.assertIsNotNone(stranger)
faces = self.engine.face_detection(stranger)
self.assertEqual(len(faces), 1)
stranger_face = faces[0]
feature = self.engine.face_feature_extract(stranger, stranger_face)
self.assertEqual(feature.size, TEST_MODEL_FACE_FEATURE_LENGTH)
stranger_result = ifac.feature_hub_face_search(feature)
self.assertEqual(True, stranger_result.confidence < TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
self.assertEqual(stranger_result.similar_identity.custom_id, -1)
#
def test_face_remove(self):
query_image = cv2.imread(get_test_data("bulk/Nathalie_Baye_0002.jpg"))
self.assertIsNotNone(query_image)
faces = self.engine.face_detection(query_image)
self.assertEqual(len(faces), 1)
query_face = faces[0]
feature = self.engine.face_feature_extract(query_image, query_face)
self.assertEqual(feature.size, TEST_MODEL_FACE_FEATURE_LENGTH)
# First search
result = ifac.feature_hub_face_search(feature)
self.assertEqual(True, result.confidence > TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
self.assertEqual("Nathalie_Baye", result.similar_identity.tag)
# Remove that
remove_id = result.similar_identity.custom_id
ret = ifac.feature_hub_face_remove(remove_id)
self.assertEqual(ret, True)
# Second search
result = ifac.feature_hub_face_search(feature)
self.assertEqual(True, result.confidence < TEST_FACE_COMPARISON_IMAGE_THRESHOLD)
self.assertEqual(result.similar_identity.custom_id, -1)
# Reusability testing
new_face_image = cv2.imread(get_test_data("bulk/yifei.jpg"))
self.assertIsNotNone(new_face_image)
faces = self.engine.face_detection(new_face_image)
self.assertEqual(len(faces), 1)
new_face = faces[0]
feature = self.engine.face_feature_extract(new_face_image, new_face)
# Insert that
registered_identity = ifac.FaceIdentity(feature, custom_id=remove_id, tag="YF")
ifac.feature_hub_face_insert(registered_identity)
def test_face_update(self):
pass
@optional(ENABLE_BENCHMARK_TEST, "All benchmark related tests have been closed.")
class FaceRecognitionFeatureExtractCase(unittest.TestCase):
benchmark_results = list()
loop = 1
@classmethod
def setUpClass(cls):
cls.benchmark_results = []
def setUp(self) -> None:
# Prepare image
image = cv2.imread(get_test_data("bulk/kun.jpg"))
self.stream = ifac.ImageStream.load_from_cv_image(image)
self.assertIsNotNone(self.stream)
# Prepare material
track_mode = HF_DETECT_MODE_IMAGE
param = ifac.SessionCustomParameter()
param.enable_recognition = True
self.engine = ifac.InspireFaceSession(param, track_mode)
# Prepare a face
faces = self.engine.face_detection(self.stream)
# "kun.jpg" has only one face
self.assertEqual(len(faces), 1)
self.face = faces[0]
box = self.face.location
expect_box = (98, 146, 233, 272)
# Calculate the location of the detected box and the expected box
iou = calculate_overlap(box, expect_box)
self.assertAlmostEqual(iou, 1.0, places=3)
self.feature = self.engine.face_feature_extract(self.stream, self.face)
@benchmark(test_name="Feature Extract", loop=1000)
def test_benchmark_feature_extract(self):
self.engine.set_track_mode(HF_DETECT_MODE_IMAGE)
for _ in range(self.loop):
feature = self.engine.face_feature_extract(self.stream, self.face)
self.assertEqual(TEST_MODEL_FACE_FEATURE_LENGTH, feature.size)
@benchmark(test_name="Face comparison 1v1", loop=1000)
def test_benchmark_face_comparison1v1(self):
for _ in range(self.loop):
ifac.feature_comparison(self.feature, self.feature)
@classmethod
def tearDownClass(cls):
print_benchmark_table(cls.benchmark_results)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,152 @@
import unittest
from test import *
import inspireface as ifac
from inspireface.param import *
import cv2
class FaceTrackerCase(unittest.TestCase):
def setUp(self) -> None:
# Prepare material
track_mode = HF_DETECT_MODE_IMAGE # Use video mode
self.engine = ifac.InspireFaceSession(param=ifac.SessionCustomParameter(),
detect_mode=track_mode)
def test_face_detection_from_image(self):
image = cv2.imread(get_test_data("bulk/kun.jpg"))
self.assertIsNotNone(image)
# Detection
faces = self.engine.face_detection(image)
# "kun.jpg" has only one face
self.assertEqual(len(faces), 1)
face = faces[0]
expect_box = (98, 146, 233, 272)
# Calculate the location of the detected box and the expected box
iou = calculate_overlap(face.location, expect_box)
self.assertAlmostEqual(iou, 1.0, places=3)
# Prepare non-face images
any_image = cv2.imread(get_test_data("bulk/view.jpg"))
self.assertIsNotNone(any_image)
self.assertEqual(len(self.engine.face_detection(any_image)), 0)
def test_face_pose(self):
self.engine.set_track_mode(HF_DETECT_MODE_IMAGE)
# Test yaw (shake one's head)
left_face = cv2.imread(get_test_data("pose/left_face.jpeg"))
self.assertIsNotNone(left_face)
faces = self.engine.face_detection(left_face)
self.assertEqual(len(faces), 1)
left_face_yaw = faces[0].yaw
# The expected value is not completely accurate, it is only a rough estimate
expect_left_shake_range = (-90, -10)
self.assertEqual(True, expect_left_shake_range[0] < left_face_yaw < expect_left_shake_range[1])
right_face = cv2.imread(get_test_data("pose/right_face.png"))
self.assertIsNotNone(right_face)
faces = self.engine.face_detection(right_face)
self.assertEqual(len(faces), 1)
right_face_yaw = faces[0].yaw
expect_right_shake_range = (10, 90)
self.assertEqual(True, expect_right_shake_range[0] < right_face_yaw < expect_right_shake_range[1])
# Test pitch (nod head)
rise_face = cv2.imread(get_test_data("pose/rise_face.jpeg"))
self.assertIsNotNone(rise_face)
faces = self.engine.face_detection(rise_face)
self.assertEqual(len(faces), 1)
left_face_pitch = faces[0].pitch
self.assertEqual(True, left_face_pitch > 5)
lower_face = cv2.imread(get_test_data("pose/lower_face.jpeg"))
self.assertIsNotNone(lower_face)
faces = self.engine.face_detection(lower_face)
self.assertEqual(len(faces), 1)
lower_face_pitch = faces[0].pitch
self.assertEqual(True, lower_face_pitch < -10)
# Test roll (wryneck head)
left_wryneck_face = cv2.imread(get_test_data("pose/left_wryneck.png"))
self.assertIsNotNone(left_wryneck_face)
faces = self.engine.face_detection(left_wryneck_face)
self.assertEqual(len(faces), 1)
left_face_roll = faces[0].roll
self.assertEqual(True, left_face_roll < -30)
right_wryneck_face = cv2.imread(get_test_data("pose/right_wryneck.png"))
self.assertIsNotNone(right_wryneck_face)
faces = self.engine.face_detection(right_wryneck_face)
self.assertEqual(len(faces), 1)
right_face_roll = faces[0].roll
self.assertEqual(True, right_face_roll > 30)
def test_face_track_from_video(self):
self.engine.set_track_mode(HF_DETECT_MODE_VIDEO)
# Read a video file
video_gen = read_video_generator(get_test_data("video/810_1684206192.mp4"))
results = [self.engine.face_detection(frame) for frame in video_gen]
num_of_frame = len(results)
num_of_track_loss = len([faces for faces in results if not faces])
total_track_ids = [faces[0].track_id for faces in results if faces]
num_of_id_switch = len([id_ for id_ in total_track_ids if id_ != 1])
# Calculate the loss rate of trace loss and switching id
track_loss = num_of_track_loss / num_of_frame
id_switch_loss = num_of_id_switch / len(total_track_ids)
# Not rigorous, only for the current test of this video file
self.assertEqual(True, track_loss < 0.05)
self.assertEqual(True, id_switch_loss < 0.1)
@optional(ENABLE_BENCHMARK_TEST, "All benchmark related tests have been closed.")
class FaceTrackerBenchmarkCase(unittest.TestCase):
benchmark_results = list()
loop = 1
@classmethod
def setUpClass(cls):
cls.benchmark_results = []
def setUp(self) -> None:
# Prepare image
self.image = cv2.imread(get_test_data("bulk/kun.jpg"))
self.assertIsNotNone(self.image)
# Prepare material
track_mode = HF_DETECT_MODE_VIDEO # Use video mode
self.engine = ifac.InspireFaceSession(HF_ENABLE_NONE, track_mode, )
# Prepare video data
self.video_gen = read_video_generator(get_test_data("video/810_1684206192.mp4"))
@benchmark(test_name="Face Detect", loop=1000)
def test_benchmark_face_detect(self):
self.engine.set_track_mode(HF_DETECT_MODE_IMAGE)
for _ in range(self.loop):
faces = self.engine.face_detection(self.image)
self.assertEqual(len(faces), 1, "No face detected may have an error, please check.")
@benchmark(test_name="Face Track", loop=1000)
def test_benchmark_face_track(self):
self.engine.set_track_mode(HF_DETECT_MODE_VIDEO)
for _ in range(self.loop):
faces = self.engine.face_detection(self.image)
self.assertEqual(len(faces), 1, "No face detected may have an error, please check.")
@benchmark(test_name="Face Track(Video)", loop=345)
def test_benchmark_face_track_video(self):
self.engine.set_track_mode(HF_DETECT_MODE_VIDEO)
for frame in self.video_gen:
faces = self.engine.face_detection(frame)
self.assertEqual(len(faces), 1, "No face detected may have an error, please check.")
@classmethod
def tearDownClass(cls):
print_benchmark_table(cls.benchmark_results)
if __name__ == '__main__':
unittest.main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 202 KiB