"""Helper for evaluation on the Labeled Faces in the Wild dataset """ # MIT License # # Copyright (c) 2016 David Sandberg # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import datetime import os import pickle import numpy as np import sklearn import oneflow as flow from scipy import interpolate from sklearn.decomposition import PCA from sklearn.model_selection import KFold import cv2 as cv import logging class LFold: def __init__(self, n_splits=2, shuffle=False): self.n_splits = n_splits if self.n_splits > 1: self.k_fold = KFold(n_splits=n_splits, shuffle=shuffle) def split(self, indices): if self.n_splits > 1: return self.k_fold.split(indices) else: return [(indices, indices)] def calculate_roc( thresholds, embeddings1, embeddings2, actual_issame, nrof_folds=10, pca=0 ): assert embeddings1.shape[0] == embeddings2.shape[0] assert embeddings1.shape[1] == embeddings2.shape[1] nrof_pairs = min(len(actual_issame), embeddings1.shape[0]) nrof_thresholds = len(thresholds) k_fold = LFold(n_splits=nrof_folds, shuffle=False) tprs = np.zeros((nrof_folds, nrof_thresholds)) fprs = np.zeros((nrof_folds, nrof_thresholds)) accuracy = np.zeros((nrof_folds)) indices = np.arange(nrof_pairs) if pca == 0: diff = np.subtract(embeddings1, embeddings2) dist = np.sum(np.square(diff), 1) for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)): if pca > 0: print("doing pca on", fold_idx) embed1_train = embeddings1[train_set] embed2_train = embeddings2[train_set] _embed_train = np.concatenate((embed1_train, embed2_train), axis=0) pca_model = PCA(n_components=pca) pca_model.fit(_embed_train) embed1 = pca_model.transform(embeddings1) embed2 = pca_model.transform(embeddings2) embed1 = sklearn.preprocessing.normalize(embed1) embed2 = sklearn.preprocessing.normalize(embed2) diff = np.subtract(embed1, embed2) dist = np.sum(np.square(diff), 1) # Find the best threshold for the fold acc_train = np.zeros((nrof_thresholds)) for threshold_idx, threshold in enumerate(thresholds): _, _, acc_train[threshold_idx] = calculate_accuracy( threshold, dist[train_set], actual_issame[train_set] ) best_threshold_index = np.argmax(acc_train) for threshold_idx, threshold in enumerate(thresholds): ( tprs[fold_idx, threshold_idx], fprs[fold_idx, threshold_idx], _, ) = calculate_accuracy(threshold, dist[test_set], actual_issame[test_set]) _, _, accuracy[fold_idx] = calculate_accuracy( thresholds[best_threshold_index], dist[test_set], actual_issame[test_set] ) tpr = np.mean(tprs, 0) fpr = np.mean(fprs, 0) return tpr, fpr, accuracy def 
def calculate_accuracy(threshold, dist, actual_issame):
    predict_issame = np.less(dist, threshold)
    tp = np.sum(np.logical_and(predict_issame, actual_issame))
    fp = np.sum(np.logical_and(predict_issame, np.logical_not(actual_issame)))
    tn = np.sum(
        np.logical_and(np.logical_not(predict_issame), np.logical_not(actual_issame))
    )
    fn = np.sum(np.logical_and(np.logical_not(predict_issame), actual_issame))

    tpr = 0 if (tp + fn == 0) else float(tp) / float(tp + fn)
    fpr = 0 if (fp + tn == 0) else float(fp) / float(fp + tn)
    acc = float(tp + tn) / dist.size
    return tpr, fpr, acc


def calculate_val(
    thresholds, embeddings1, embeddings2, actual_issame, far_target, nrof_folds=10
):
    assert embeddings1.shape[0] == embeddings2.shape[0]
    assert embeddings1.shape[1] == embeddings2.shape[1]
    nrof_pairs = min(len(actual_issame), embeddings1.shape[0])
    nrof_thresholds = len(thresholds)
    k_fold = LFold(n_splits=nrof_folds, shuffle=False)

    val = np.zeros(nrof_folds)
    far = np.zeros(nrof_folds)

    diff = np.subtract(embeddings1, embeddings2)
    dist = np.sum(np.square(diff), 1)
    indices = np.arange(nrof_pairs)

    for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)):
        # Find the threshold that gives FAR = far_target on the training split
        far_train = np.zeros(nrof_thresholds)
        for threshold_idx, threshold in enumerate(thresholds):
            _, far_train[threshold_idx] = calculate_val_far(
                threshold, dist[train_set], actual_issame[train_set]
            )
        if np.max(far_train) >= far_target:
            f = interpolate.interp1d(far_train, thresholds, kind="slinear")
            threshold = f(far_target)
        else:
            threshold = 0.0

        val[fold_idx], far[fold_idx] = calculate_val_far(
            threshold, dist[test_set], actual_issame[test_set]
        )

    val_mean = np.mean(val)
    far_mean = np.mean(far)
    val_std = np.std(val)
    return val_mean, val_std, far_mean


def calculate_val_far(threshold, dist, actual_issame):
    predict_issame = np.less(dist, threshold)
    true_accept = np.sum(np.logical_and(predict_issame, actual_issame))
    false_accept = np.sum(
        np.logical_and(predict_issame, np.logical_not(actual_issame))
    )
    n_same = np.sum(actual_issame)
    n_diff = np.sum(np.logical_not(actual_issame))
    val = float(true_accept) / float(n_same)
    far = float(false_accept) / float(n_diff)
    return val, far


def evaluate(embeddings, actual_issame, nrof_folds=10, pca=0):
    # Calculate evaluation metrics. Even-indexed and odd-indexed rows of
    # `embeddings` hold the first and second image of each pair, respectively.
    thresholds = np.arange(0, 4, 0.01)
    embeddings1 = embeddings[0::2]
    embeddings2 = embeddings[1::2]
    tpr, fpr, accuracy = calculate_roc(
        thresholds,
        embeddings1,
        embeddings2,
        np.asarray(actual_issame),
        nrof_folds=nrof_folds,
        pca=pca,
    )
    thresholds = np.arange(0, 4, 0.001)
    val, val_std, far = calculate_val(
        thresholds,
        embeddings1,
        embeddings2,
        np.asarray(actual_issame),
        1e-3,
        nrof_folds=nrof_folds,
    )
    return tpr, fpr, accuracy, val, val_std, far


def load_bin_cv(path, image_size):
    bins, issame_list = pickle.load(open(path, "rb"), encoding="bytes")
    data_list = []
    for flip in [0, 1]:
        data = flow.empty(len(issame_list) * 2, 3, image_size[0], image_size[1])
        data_list.append(data)
    for i in range(len(issame_list) * 2):
        _bin = bins[i]
        # Decode the raw JPEG buffer (stored as bytes) and convert BGR -> RGB
        img_ori = cv.imdecode(np.frombuffer(_bin, np.uint8), cv.IMREAD_COLOR)[:, :, ::-1]
        for flip in [0, 1]:
            img = img_ori.copy()
            if flip == 1:
                img = cv.flip(img, 1)  # horizontal flip for test-time augmentation
            img = np.array(img).transpose((2, 0, 1))  # HWC -> CHW
            img = (img - 127.5) * 0.00784313725  # scale to roughly [-1, 1]
            data_list[flip][i] = flow.tensor(img, dtype=flow.float)
        if i % 1000 == 0:
            logging.info("loading bin:%d", i)
    logging.info(data_list[0].shape)
    return data_list, issame_list


@flow.no_grad()
def test(data_set, backbone, batch_size, nfolds=10, is_consistent=False):
logging.info("testing verification..") data_list = data_set[0] issame_list = data_set[1] embeddings_list = [] time_consumed = 0.0 if is_consistent: placement = flow.env.all_device_placement("cpu") sbp = flow.sbp.split(0) for i in range(len(data_list)): data = data_list[i] embeddings = None ba = 0 while ba < data.shape[0]: bb = min(ba + batch_size, data.shape[0]) count = bb - ba img = data[bb - batch_size : bb] time0 = datetime.datetime.now() with flow.no_grad(): if is_consistent: img = img.to_consistent(placement=placement, sbp=sbp) net_out = backbone(img.to("cuda")) if is_consistent: _embeddings = net_out.to_local().numpy() else: _embeddings = net_out.detach().numpy() time_now = datetime.datetime.now() diff = time_now - time0 time_consumed += diff.total_seconds() if embeddings is None: embeddings = np.zeros((data.shape[0], _embeddings.shape[1])) embeddings[ba:bb, :] = _embeddings[(batch_size - count) :, :] ba = bb embeddings_list.append(embeddings) _xnorm = 0.0 _xnorm_cnt = 0 for embed in embeddings_list: for i in range(embed.shape[0]): _em = embed[i] _norm = np.linalg.norm(_em) _xnorm += _norm _xnorm_cnt += 1 _xnorm /= _xnorm_cnt embeddings = embeddings_list[0].copy() embeddings = sklearn.preprocessing.normalize(embeddings) acc1 = 0.0 std1 = 0.0 embeddings = embeddings_list[0] + embeddings_list[1] embeddings = sklearn.preprocessing.normalize(embeddings) logging.info(embeddings.shape) logging.info("infer time:%f" % time_consumed) _, _, accuracy, val, val_std, far = evaluate( embeddings, issame_list, nrof_folds=nfolds ) acc2, std2 = np.mean(accuracy), np.std(accuracy) return acc1, std1, acc2, std2, _xnorm, embeddings_list def dumpR(data_set, backbone, batch_size, name="", data_extra=None, label_shape=None): print("dump verification embedding..") data_list = data_set[0] issame_list = data_set[1] embeddings_list = [] time_consumed = 0.0 for i in range(len(data_list)): data = data_list[i] embeddings = None ba = 0 while ba < data.shape[0]: bb = min(ba + batch_size, data.shape[0]) count = bb - ba _data = nd.slice_axis(data, axis=0, begin=bb - batch_size, end=bb) time0 = datetime.datetime.now() if data_extra is None: db = mx.io.DataBatch(data=(_data,), label=(_label,)) else: db = mx.io.DataBatch(data=(_data, _data_extra), label=(_label,)) model.forward(db, is_train=False) net_out = model.get_outputs() _embeddings = net_out[0].asnumpy() time_now = datetime.datetime.now() diff = time_now - time0 time_consumed += diff.total_seconds() if embeddings is None: embeddings = np.zeros((data.shape[0], _embeddings.shape[1])) embeddings[ba:bb, :] = _embeddings[(batch_size - count) :, :] ba = bb embeddings_list.append(embeddings) embeddings = embeddings_list[0] + embeddings_list[1] embeddings = sklearn.preprocessing.normalize(embeddings) actual_issame = np.asarray(issame_list) outname = os.path.join("temp.bin") with open(outname, "wb") as f: pickle.dump((embeddings, issame_list), f, protocol=pickle.HIGHEST_PROTOCOL)