diff --git a/uniface/attribute/age_gender.py b/uniface/attribute/age_gender.py
index 9067d63..d6af83a 100644
--- a/uniface/attribute/age_gender.py
+++ b/uniface/attribute/age_gender.py
@@ -1,6 +1,6 @@
 import cv2
 import numpy as np
-import onnxruntime
+import onnxruntime as ort
 from typing import Tuple
 
 from uniface.log import Logger
@@ -50,7 +50,7 @@ class AgeGender:
             model_path (str): Path to .onnx model.
         """
         try:
-            self.session = onnxruntime.InferenceSession(
+            self.session = ort.InferenceSession(
                 model_path,
                 providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
             )
diff --git a/uniface/constants.py b/uniface/constants.py
index 987eb76..bef205c 100644
--- a/uniface/constants.py
+++ b/uniface/constants.py
@@ -70,6 +70,14 @@ class AgeGenderWeights(str, Enum):
     """
     DEFAULT = "age_gender"
 
+
+class LandmarkWeights(str, Enum):
+    """
+    MobileNet 0.5 from Insightface
+    https://github.com/deepinsight/insightface/tree/master/alignment/coordinate_reg
+    """
+    DEFAULT = "2d_106"
+
 
 # fmt: on
 
@@ -106,9 +114,12 @@ MODEL_URLS: Dict[Enum, str] = {
     # DDAFM
     DDAMFNWeights.AFFECNET7: 'https://github.com/yakhyo/uniface/releases/download/v0.1.2/affecnet7.script',
     DDAMFNWeights.AFFECNET8: 'https://github.com/yakhyo/uniface/releases/download/v0.1.2/affecnet8.script',
-
+
     # AgeGender
     AgeGenderWeights.DEFAULT: 'https://github.com/yakhyo/uniface/releases/download/v0.1.2/genderage.onnx',
+
+    # Landmarks
+    LandmarkWeights.DEFAULT: 'https://github.com/yakhyo/uniface/releases/download/v0.1.2/2d106det.onnx',
 }
 
 MODEL_SHA256: Dict[Enum, str] = {
@@ -142,9 +153,12 @@ MODEL_SHA256: Dict[Enum, str] = {
     # DDAFM
     DDAMFNWeights.AFFECNET7: '10535bf8b6afe8e9d6ae26cea6c3add9a93036e9addb6adebfd4a972171d015d',
     DDAMFNWeights.AFFECNET8: '8c66963bc71db42796a14dfcbfcd181b268b65a3fc16e87147d6a3a3d7e0f487',
-
+
     # AgeGender
     AgeGenderWeights.DEFAULT: '4fde69b1c810857b88c64a335084f1c3fe8f01246c9a191b48c7bb756d6652fb',
+
+    # Landmark
+    LandmarkWeights.DEFAULT: 'f001b856447c413801ef5c42091ed0cd516fcd21f2d6b79635b1e733a7109dbf',
 }
 
 CHUNK_SIZE = 8192
diff --git a/uniface/landmark/model.py b/uniface/landmark/model.py
index 40b3ed6..ca5b01d 100644
--- a/uniface/landmark/model.py
+++ b/uniface/landmark/model.py
@@ -1,99 +1,190 @@
 import cv2
 import onnx
-import onnxruntime
+import onnxruntime as ort
 import numpy as np
+from typing import Tuple
 
-# from ..data import get_object
-
+from uniface.log import Logger
 from uniface.face_utils import bbox_center_alignment, trans_points
+from uniface.model_store import verify_model_weights
 
-__all__ = [
-    'Landmark',
-]
+from uniface.detection import RetinaFace
+from uniface.constants import RetinaFaceWeights, LandmarkWeights
+
+__all__ = ['Landmark']
 
 
 class Landmark:
-    def __init__(self, model_file=None, session=None):
-        assert model_file is not None
-        self.model_file = model_file
-        self.session = session
+    def __init__(self, model_name: LandmarkWeights = LandmarkWeights.DEFAULT, input_size: Tuple[int, int] = (192, 192)) -> None:
+        """
+        Initializes the Landmark model for inference.
 
-        model = onnx.load(self.model_file)
+        Args:
+            model_name (LandmarkWeights): Landmark model weights to load.
+            input_size (Tuple[int, int]): Model input size as (width, height).
+ """ - input_mean = 0.0 - input_std = 1.0 + Logger.info( + f"Initializing Landmark with model={model_name}, " + f"input_size={input_size}" + ) - self.input_mean = input_mean - self.input_std = input_std - # print('input mean and std:', model_file, self.input_mean, self.input_std) + self.input_size = input_size + self.input_std = 1.0 + self.input_mean = 0.0 - if self.session is None: - self.session = onnxruntime.InferenceSession(self.model_file, None) - input_cfg = self.session.get_inputs()[0] - input_shape = input_cfg.shape - input_name = input_cfg.name + # Get path to model weights + self._model_path = verify_model_weights(model_name) + Logger.info(f"Verfied model weights located at: {self._model_path}") - self.input_size = tuple(input_shape[2:4][::-1]) - self.input_shape = input_shape + # Initialize model + self._initialize_model(model_path=self._model_path) - outputs = self.session.get_outputs() - output_names = [] - for out in outputs: - output_names.append(out.name) - - self.input_name = input_name - self.output_names = output_names - - assert len(self.output_names) == 1 - - output_shape = outputs[0].shape - self.require_pose = False - - self.lmk_dim = 2 - self.lmk_num = output_shape[1]//self.lmk_dim - self.taskname = 'landmark_%dd_%d' % (self.lmk_dim, self.lmk_num) - - def prepare(self, ctx_id, **kwargs): - if ctx_id < 0: - self.session.set_providers(['CPUExecutionProvider']) - - def get(self, img, bbox): - w, h = (bbox[2] - bbox[0]), (bbox[3] - bbox[1]) - center = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2 - rotate = 0 - _scale = self.input_size[0] / (max(w, h)*1.5) - # print('param:', img.shape, bbox, center, self.input_size, _scale, rotate) - - aimg, M = bbox_center_alignment(img, center, self.input_size[0], _scale, rotate) - input_size = tuple(aimg.shape[0:2][::-1]) - - # assert input_size==self.input_size + def _initialize_model(self, model_path:str): + """ Initialize the model from the given path. + Args: + model_path (str): Path to .onnx model. + """ + try: + self.session = ort.InferenceSession( + model_path, + providers=["CUDAExecutionProvider", "CPUExecutionProvider"] + ) + + metadata = self.session.get_inputs()[0] + input_shape = metadata.shape + self.input_size = tuple(input_shape[2:4][::-1]) + + self.input_names = [x.name for x in self.session.get_inputs()] + self.output_names = [x.name for x in self.session.get_outputs()] + + outputs = self.session.get_outputs() + output_shape = outputs[0].shape + self.lmk_dim = 2 + self.lmk_num = output_shape[1] // self.lmk_dim + + except Exception as e: + print(f"Failed to load the model: {e}") + raise + + def preprocess(self, image: np.ndarray, bbox: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """ + Preprocess the input image and bbox for inference. + + Args: + image (np.ndarray): Input image. + bbox (np.ndarray): Bounding box [x1, y1, x2, y2]. + + Returns: + Tuple[np.ndarray, np.ndarray]: Preprocessed blob and transformation matrix. 
+ """ + width, height = bbox[2] - bbox[0], bbox[3] - bbox[1] + center = (bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2 + scale = self.input_size[0] / (max(width, height) * 1.5) + rotation = 0.0 + + transformed_image, M = bbox_center_alignment(image, center, self.input_size[0], scale, rotation) + input_size = tuple(transformed_image.shape[0:2][::-1]) + blob = cv2.dnn.blobFromImage( - aimg, + transformed_image, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True ) - pred = self.session.run(self.output_names, {self.input_name: blob})[0][0] - if pred.shape[0] >= 3000: - pred = pred.reshape((-1, 3)) - else: - pred = pred.reshape((-1, 2)) - if self.lmk_num < pred.shape[0]: - pred = pred[self.lmk_num*-1:, :] - pred[:, 0:2] += 1 - pred[:, 0:2] *= (self.input_size[0] // 2) - if pred.shape[1] == 3: - pred[:, 2] *= (self.input_size[0] // 2) + return blob, M + + def postprocess(self, preds: np.ndarray, M: np.ndarray) -> np.ndarray: + """ + Postprocess model outputs to get landmarks. + + Args: + preds (np.ndarray): Raw model predictions. + M (np.ndarray): Affine transformation matrix. + + Returns: + np.ndarray: Transformed landmarks. + """ + + preds = preds.reshape((-1, 2)) + + preds[:, 0:2] += 1 + preds[:, 0:2] *= (self.input_size[0] // 2) IM = cv2.invertAffineTransform(M) - pred = trans_points(pred, IM) + preds = trans_points(preds, IM) - return pred + return preds + + def predict(self, image: np.ndarray, bbox: np.ndarray) -> np.ndarray: + """ + Predict facial landmarks for the given image and bounding box. + Args: + image (np.ndarray): Input image. + bbox (np.ndarray): Bounding box [x1, y1, x2, y2]. + + Returns: + np.ndarray: Predicted landmarks. + """ + blob, M = self.preprocess(image, bbox) + preds = self.session.run(self.output_names, {self.input_names[0]: blob})[0][0] + landmarks = self.postprocess(preds, M) + + return landmarks + +# TODO: For testing purposes only, remote later if __name__ == "__main__": - model = Landmark("2d106det.onnx") + + face_detector = RetinaFace( + model_name=RetinaFaceWeights.MNET_V2, + conf_thresh=0.5, + pre_nms_topk=5000, + nms_thresh=0.4, + post_nms_topk=750, + dynamic_size=False, + input_size=(640, 640) + ) + + model = Landmark() + + cap = cv2.VideoCapture(0) + if not cap.isOpened(): + print("Webcam not available.") + exit() + + print("Press 'q' to quit.") + + while True: + ret, frame = cap.read() + if not ret: + print("Frame capture failed.") + break + + boxes, landmarks = face_detector.detect(frame) + + if boxes is None or len(boxes) == 0: + cv2.imshow("Facial Landmark Detection", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + continue + + for box in boxes: + x1, y1, x2, y2, score = box.astype(int) + + lmk = model.predict(frame, box[:4]) + + for (x, y) in lmk.astype(int): + cv2.circle(frame, (x, y), 2, (0, 255, 0), -1) + + cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2) + + cv2.imshow("Facial Landmark Detection", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + cap.release() + cv2.destroyAllWindows()