retinaface test code

This commit is contained in:
nttstar
2019-04-18 15:28:38 +08:00
parent 73561c1d0e
commit 51e267a82d
14 changed files with 2037 additions and 0 deletions

6
retinaface/Makefile Normal file
View File

@@ -0,0 +1,6 @@
.PHONY: all clean

# Build the Cython extensions in place, then drop the temporary build tree.
# The steps are chained with && so that a failed `cd` or a failed build stops
# the recipe and is reported by make instead of being masked by a later
# command. Each recipe line runs in its own shell, so no `cd` back is needed.
all:
	cd rcnn/cython/ && python setup.py build_ext --inplace && rm -rf build
	cd rcnn/pycocotools/ && python setup.py build_ext --inplace && rm -rf build

# Remove generated extension artifacts. `rm -f` keeps `make clean` usable on
# an already-clean tree, where the globs match nothing.
clean:
	cd rcnn/cython/ && rm -f *.so *.c *.cpp
	cd rcnn/pycocotools/ && rm -f *.so

View File

@@ -0,0 +1,2 @@
from .imdb import IMDB
from .retinaface import retinaface

View File

@@ -0,0 +1,16 @@
import numpy as np
def unique_boxes(boxes, scale=1.0):
    """
    return indices of unique boxes
    :param boxes: [N, 4] array of box coordinates
    :param scale: factor applied to the coordinates before rounding
    :return: sorted indices of the first occurrence of every distinct box
    """
    # Each rounded coordinate gets its own decimal "slot" so that a box
    # collapses into a single integer hash.
    v = np.array([1, 1e3, 1e6, 1e9])
    # np.int was removed from NumPy; an explicit 64-bit type also keeps the
    # 1e9-weighted term from overflowing where the default int is 32-bit.
    hashes = np.round(boxes * scale).dot(v).astype(np.int64)
    _, index = np.unique(hashes, return_index=True)
    return np.sort(index)
def filter_small_boxes(boxes, min_size):
    """
    return indices of boxes whose width and height are both at least min_size
    :param boxes: [N, 4] array laid out as x1, y1, x2, y2
    :param min_size: minimum accepted side length (inclusive)
    :return: indices of the boxes to keep
    """
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]
    # Width and height use the same inclusive bound; the height test used to
    # be strict, which dropped boxes that were exactly min_size tall.
    keep = np.where((w >= min_size) & (h >= min_size))[0]
    return keep

View File

@@ -0,0 +1,318 @@
"""
General image database
An image database keeps a list of relative image paths called image_set_index and
transforms each index into an absolute image path. For training, ground truth
and proposals need to be mixed together.
roidb
basic format [image_index]
['image', 'height', 'width', 'flipped',
'boxes', 'gt_classes', 'gt_overlaps', 'max_classes', 'max_overlaps', 'bbox_targets']
"""
from ..logger import logger
import os
try:
import cPickle as pickle
except ImportError:
import pickle
import numpy as np
from ..processing.bbox_transform import bbox_overlaps
class IMDB(object):
    """
    Base class of an image database.

    Subclasses implement ``image_path_from_index``, ``gt_roidb`` and
    ``evaluate_detections``. This class provides the shared helpers: the
    cache directory, loading of RPN proposals, horizontal-flip augmentation,
    proposal recall evaluation and merging of roidbs.
    """

    def __init__(self, name, image_set, root_path, dataset_path):
        """
        basic information about an image database
        :param name: name of image database will be used for any output
        :param image_set: split name, appended to name as '<name>_<image_set>'
        :param root_path: root path store cache and proposal data
        :param dataset_path: dataset path store images and image lists
        """
        self.name = name + '_' + image_set
        self.image_set = image_set
        self.root_path = root_path
        self.data_path = dataset_path

        # abstract attributes, to be filled in by subclasses
        self.classes = []
        self.num_classes = 0
        self.image_set_index = []
        self.num_images = 0

        self.config = {}

    def image_path_from_index(self, index):
        raise NotImplementedError

    def gt_roidb(self):
        raise NotImplementedError

    def evaluate_detections(self, detections):
        raise NotImplementedError

    @property
    def cache_path(self):
        """
        make a directory to store all caches
        :return: cache path
        """
        cache_path = os.path.join(self.root_path, 'cache')
        if not os.path.isdir(cache_path):
            try:
                # makedirs also creates a missing root_path
                os.makedirs(cache_path)
            except OSError:
                # another process may have created it in the meantime;
                # only re-raise when the directory really is not there
                if not os.path.isdir(cache_path):
                    raise
        return cache_path

    def image_path_at(self, index):
        """
        access image at index in image database
        :param index: image index in image database
        :return: image path
        """
        return self.image_path_from_index(self.image_set_index[index])

    def load_rpn_data(self, full=False):
        """
        load pickled rpn proposals from <root_path>/rpn_data
        :param full: read '<name>_full_rpn.pkl' instead of '<name>_rpn.pkl'
        :return: unpickled box list
        """
        suffix = '_full_rpn.pkl' if full else '_rpn.pkl'
        rpn_file = os.path.join(self.root_path, 'rpn_data', self.name + suffix)
        assert os.path.exists(rpn_file), '%s rpn data not found at %s' % (self.name, rpn_file)
        logger.info('%s loading rpn data from %s' % (self.name, rpn_file))
        # NOTE: pickle executes code on load; only point this at trusted files
        with open(rpn_file, 'rb') as f:
            box_list = pickle.load(f)
        return box_list

    def load_rpn_roidb(self, gt_roidb):
        """
        turn rpn detection boxes into roidb
        :param gt_roidb: [image_index]['boxes', 'gt_classes', 'gt_overlaps', 'flipped']
        :return: roidb: [image_index]['boxes', 'gt_classes', 'gt_overlaps', 'flipped']
        """
        box_list = self.load_rpn_data()
        return self.create_roidb_from_box_list(box_list, gt_roidb)

    def rpn_roidb(self, gt_roidb, append_gt=False):
        """
        get rpn roidb and ground truth roidb
        :param gt_roidb: ground truth roidb
        :param append_gt: append ground truth
        :return: roidb of rpn
        """
        if append_gt:
            logger.info('%s appending ground truth annotations' % self.name)
            rpn_roidb = self.load_rpn_roidb(gt_roidb)
            return IMDB.merge_roidbs(gt_roidb, rpn_roidb)
        return self.load_rpn_roidb(gt_roidb)

    def create_roidb_from_box_list(self, box_list, gt_roidb):
        """
        given ground truth, prepare roidb
        :param box_list: [image_index] ndarray of [box_index][x1, y1, x2, y2],
            optionally with a fifth column that is dropped
        :param gt_roidb: [image_index]['boxes', 'gt_classes', 'gt_overlaps', 'flipped']
        :return: roidb: [image_index]['boxes', 'gt_classes', 'gt_overlaps', 'flipped']
        """
        assert len(box_list) == self.num_images, 'number of boxes matrix must match number of images'

        roidb = []
        for i in range(self.num_images):
            # image metadata always comes from the ground truth record, so
            # gt_roidb cannot be None here
            gt_rec = gt_roidb[i]
            roi_rec = dict()
            roi_rec['image'] = gt_rec['image']
            roi_rec['height'] = gt_rec['height']
            roi_rec['width'] = gt_rec['width']

            boxes = box_list[i]
            if boxes.shape[1] == 5:
                boxes = boxes[:, :4]
            num_boxes = boxes.shape[0]
            overlaps = np.zeros((num_boxes, self.num_classes), dtype=np.float32)
            if gt_rec['boxes'].size > 0:
                gt_boxes = gt_rec['boxes']
                gt_classes = gt_rec['gt_classes']
                # n boxes and k gt_boxes => n * k overlap
                gt_overlaps = bbox_overlaps(boxes.astype(np.float64), gt_boxes.astype(np.float64))
                # for each box in n boxes, select only maximum overlap (must be greater than zero)
                argmaxes = gt_overlaps.argmax(axis=1)
                maxes = gt_overlaps.max(axis=1)
                matched = np.where(maxes > 0)[0]
                overlaps[matched, gt_classes[argmaxes[matched]]] = maxes[matched]

            roi_rec.update({'boxes': boxes,
                            'gt_classes': np.zeros((num_boxes,), dtype=np.int32),
                            'gt_overlaps': overlaps,
                            'max_classes': overlaps.argmax(axis=1),
                            'max_overlaps': overlaps.max(axis=1),
                            'flipped': False})

            # background roi => background class
            zero_indexes = np.where(roi_rec['max_overlaps'] == 0)[0]
            assert all(roi_rec['max_classes'][zero_indexes] == 0)
            # foreground roi => foreground class
            nonzero_indexes = np.where(roi_rec['max_overlaps'] > 0)[0]
            assert all(roi_rec['max_classes'][nonzero_indexes] != 0)

            roidb.append(roi_rec)

        return roidb

    def append_flipped_images(self, roidb):
        """
        append flipped images to an roidb
        flip boxes coordinates, images will be actually flipped when loading into network
        Every key starting with 'boxes' is mirrored, 'landmarks' (if present)
        are mirrored and re-ordered, and image_set_index is doubled.
        :param roidb: [image_index]['boxes', 'gt_classes', 'gt_overlaps', 'flipped']
        :return: roidb: [image_index]['boxes', 'gt_classes', 'gt_overlaps', 'flipped']
        """
        logger.info('%s append flipped images to roidb' % self.name)
        assert self.num_images == len(roidb)
        for i in range(self.num_images):
            roi_rec = roidb[i]
            width = roi_rec['width']
            entry = {'image': roi_rec['image'],
                     'height': roi_rec['height'],
                     'width': width,
                     'gt_classes': roi_rec['gt_classes'],
                     'gt_overlaps': roi_rec['gt_overlaps'],
                     'max_classes': roi_rec['max_classes'],
                     'max_overlaps': roi_rec['max_overlaps'],
                     'flipped': True}
            # 'stream' and 'blur' exist only for some roidbs (records built by
            # create_roidb_from_box_list carry neither), so copy when present
            for key in ('stream', 'blur'):
                if key in roi_rec:
                    entry[key] = roi_rec[key]

            for k in roi_rec:
                if not k.startswith('boxes'):
                    continue
                boxes = roi_rec[k].copy()
                oldx1 = boxes[:, 0].copy()
                oldx2 = boxes[:, 2].copy()
                boxes[:, 0] = width - oldx2 - 1
                boxes[:, 2] = width - oldx1 - 1
                assert (boxes[:, 2] >= boxes[:, 0]).all()
                entry[k] = boxes

            if 'landmarks' in roi_rec:
                landmarks = roi_rec['landmarks'].copy()
                # mirror the x coordinate of every landmark
                # NOTE(review): entries that use negative placeholder
                # coordinates are mirrored as well - confirm consumers rely on
                # the third channel rather than on x to detect them
                landmarks[:, :, 0] *= -1
                landmarks[:, :, 0] += (width - 1)
                # swap the point pairs (0, 1) and (3, 4), presumably the
                # left/right landmarks, after mirroring
                order = [1, 0, 2, 4, 3]
                flandmarks = landmarks.copy()
                for idx, a in enumerate(order):
                    flandmarks[:, idx, :] = landmarks[:, a, :]
                entry['landmarks'] = flandmarks

            roidb.append(entry)

        self.image_set_index *= 2
        return roidb

    @staticmethod
    def _recall_candidates(roidb, candidate_boxes, i):
        """ boxes scored for image i: explicit candidates, else the roidb's non-gt boxes """
        if candidate_boxes is None:
            # default is use the non-gt boxes from roidb
            non_gt_inds = np.where(roidb[i]['gt_classes'] == 0)[0]
            return roidb[i]['boxes'][non_gt_inds, :]
        return candidate_boxes[i]

    def evaluate_recall(self, roidb, candidate_boxes=None, thresholds=None):
        """
        evaluate detection proposal recall metrics
        For every area range the average recall and the recall at each IoU
        threshold are printed; proposal size statistics are logged.
        :param roidb: used to evaluate
        :param candidate_boxes: if not given, use roidb's non-gt boxes
        :param thresholds: array-like recall threshold, defaults to 0.5:0.05:0.95
        :return: None
        """
        area_names = ['all', '0-25', '25-50', '50-100',
                      '100-200', '200-300', '300-inf']
        area_ranges = [[0**2, 1e5**2], [0**2, 25**2], [25**2, 50**2], [50**2, 100**2],
                       [100**2, 200**2], [200**2, 300**2], [300**2, 1e5**2]]

        # proposal size statistics; the catch-all 'all' range is skipped
        area_counts = []
        for area_range in area_ranges[1:]:
            area_count = 0
            for i in range(self.num_images):
                boxes = self._recall_candidates(roidb, candidate_boxes, i)
                boxes_areas = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
                in_range = (boxes_areas >= area_range[0]) & (boxes_areas < area_range[1])
                area_count += int(in_range.sum())
            area_counts.append(area_count)
        total_counts = float(sum(area_counts))
        for area_name, area_count in zip(area_names[1:], area_counts):
            # guard against ZeroDivisionError when there are no proposals
            percentage = area_count / total_counts if total_counts > 0 else 0.0
            logger.info('percentage of %s is %f' % (area_name, percentage))
        average = total_counts / self.num_images if self.num_images > 0 else 0.0
        logger.info('average number of proposal is %f' % average)

        if thresholds is None:
            step = 0.05
            thresholds = np.arange(0.5, 0.95 + 1e-5, step)

        for area_name, area_range in zip(area_names, area_ranges):
            gt_overlaps = np.zeros(0)
            num_pos = 0
            for i in range(self.num_images):
                # check for max_overlaps == 1 avoids including crowd annotations
                max_gt_overlaps = roidb[i]['gt_overlaps'].max(axis=1)
                gt_inds = np.where((roidb[i]['gt_classes'] > 0) & (max_gt_overlaps == 1))[0]
                gt_boxes = roidb[i]['boxes'][gt_inds, :]
                gt_areas = (gt_boxes[:, 2] - gt_boxes[:, 0] + 1) * (gt_boxes[:, 3] - gt_boxes[:, 1] + 1)
                valid_gt_inds = np.where((gt_areas >= area_range[0]) & (gt_areas < area_range[1]))[0]
                gt_boxes = gt_boxes[valid_gt_inds, :]
                num_pos += len(valid_gt_inds)

                boxes = self._recall_candidates(roidb, candidate_boxes, i)
                if boxes.shape[0] == 0:
                    continue

                overlaps = bbox_overlaps(boxes.astype(np.float64), gt_boxes.astype(np.float64))

                _gt_overlaps = np.zeros((gt_boxes.shape[0]))
                # choose whatever is smaller to iterate
                rounds = min(boxes.shape[0], gt_boxes.shape[0])
                for j in range(rounds):
                    # find which proposal maximally covers each gt box
                    argmax_overlaps = overlaps.argmax(axis=0)
                    # get the IoU amount of coverage for each gt box
                    max_overlaps = overlaps.max(axis=0)
                    # find which gt box is covered by most IoU
                    gt_ind = max_overlaps.argmax()
                    gt_ovr = max_overlaps.max()
                    assert (gt_ovr >= 0), '%s\n%s\n%s' % (boxes, gt_boxes, overlaps)
                    # find the proposal box that covers the best covered gt box
                    box_ind = argmax_overlaps[gt_ind]
                    # record the IoU coverage of this gt box
                    _gt_overlaps[j] = overlaps[box_ind, gt_ind]
                    assert (_gt_overlaps[j] == gt_ovr)
                    # mark the proposal box and the gt box as used
                    overlaps[box_ind, :] = -1
                    overlaps[:, gt_ind] = -1
                # append recorded IoU coverage level
                gt_overlaps = np.hstack((gt_overlaps, _gt_overlaps))

            gt_overlaps = np.sort(gt_overlaps)
            recalls = np.zeros_like(thresholds)

            # compute recall for each IoU threshold
            for i, t in enumerate(thresholds):
                recalls[i] = (gt_overlaps >= t).sum() / float(num_pos)
            ar = recalls.mean()

            # print results
            print('average recall for {}: {:.3f}, number:{}'.format(area_name, ar, num_pos))
            for threshold, recall in zip(thresholds, recalls):
                print('recall @{:.2f}: {:.3f}'.format(threshold, recall))

    @staticmethod
    def merge_roidbs(a, b):
        """
        merge roidbs into one
        The records of a are extended in place with the rows of b.
        :param a: roidb to be merged into
        :param b: roidb to be merged
        :return: merged imdb
        """
        assert len(a) == len(b)
        for i in range(len(a)):
            a[i]['boxes'] = np.vstack((a[i]['boxes'], b[i]['boxes']))
            a[i]['gt_classes'] = np.hstack((a[i]['gt_classes'], b[i]['gt_classes']))
            a[i]['gt_overlaps'] = np.vstack((a[i]['gt_overlaps'], b[i]['gt_overlaps']))
            a[i]['max_classes'] = np.hstack((a[i]['max_classes'], b[i]['max_classes']))
            a[i]['max_overlaps'] = np.hstack((a[i]['max_overlaps'], b[i]['max_overlaps']))
        return a

View File

@@ -0,0 +1,181 @@
from __future__ import print_function
try:
import cPickle as pickle
except ImportError:
import pickle
import cv2
import os
import numpy as np
import json
from PIL import Image
from ..logger import logger
from .imdb import IMDB
from .ds_utils import unique_boxes, filter_small_boxes
from ..config import config
class retinaface(IMDB):
    """
    Face dataset reader for the <data_path>/<image_set>/{images, label.txt} layout.

    label.txt is a sequence of '# <relative image path>' header lines, each
    followed by one annotation line per face. As parsed below, an annotation
    line holds x, y, w, h, then (for the 'train' split) five landmark
    triplets (x, y, flag) and a blur value.
    """

    def __init__(self, image_set, root_path, data_path):
        """
        read label.txt and group the annotation lines by image
        :param image_set: split name, also the sub directory of data_path
        :param root_path: root path store cache data
        :param data_path: dataset path store images and label files
        """
        super(retinaface, self).__init__('retinaface', image_set, root_path, data_path)
        self._split = image_set
        self._image_set = image_set
        self.root_path = root_path
        self.data_path = data_path

        self._dataset_path = self.data_path
        self._imgs_path = os.path.join(self._dataset_path, image_set, 'images')
        # relative image path -> list of raw annotation lines
        self._fp_bbox_map = {}
        label_file = os.path.join(self._dataset_path, image_set, 'label.txt')
        name = None
        # a context manager so the label file is closed deterministically
        with open(label_file, 'r') as fin:
            for line in fin:
                line = line.strip()
                if not line:
                    # blank lines carry no annotation and would break parsing later
                    continue
                if line.startswith('#'):
                    name = line[1:].strip()
                    self._fp_bbox_map[name] = []
                    continue
                assert name is not None
                assert name in self._fp_bbox_map
                self._fp_bbox_map[name].append(line)
        print('origin image size', len(self._fp_bbox_map))

        self.classes = ['bg', 'face']
        self.num_classes = len(self.classes)

    def gt_roidb(self):
        """
        build the ground truth roidb, or load it from the cache directory
        For the 'test' split every record only holds the image path. Otherwise
        a record holds the encoded image bytes ('stream'), the image size,
        boxes, landmarks, blur, class information and, when small boxes were
        set aside, 'boxes_mask'. The result is pickled into the cache.
        :return: roidb: list of per-image dicts
        """
        cache_file = os.path.join(self.cache_path, '{}_{}_gt_roidb.pkl'.format(self.name, self._split))
        if os.path.exists(cache_file):
            # NOTE: pickle executes code on load; the cache must be a trusted file
            with open(cache_file, 'rb') as fid:
                roidb = pickle.load(fid)
            print('{} gt roidb loaded from {}'.format(self.name, cache_file))
            self.num_images = len(roidb)
            return roidb

        roidb = []
        max_num_boxes = 0
        nonattr_box_num = 0
        landmark_num = 0

        for fp in self._fp_bbox_map:
            image_path = os.path.join(self._imgs_path, fp)
            if self._split == 'test':
                roidb.append({'image': image_path})
                continue

            annotations = self._fp_bbox_map[fp]
            if self._split == 'train' and not annotations:
                # nothing to learn from; skipped before touching the image
                continue

            # Read the size once per image (not once per annotation line) and
            # close the file; this also gives images without annotation lines
            # their own size instead of a stale one.
            with Image.open(image_path) as im:
                imsize = im.size

            num_ann = len(annotations)
            boxes = np.zeros([num_ann, 4], np.float64)
            landmarks = np.zeros([num_ann, 5, 3], np.float64)
            blur = np.zeros((num_ann,), np.float64)
            boxes_mask = []

            gt_classes = np.ones([num_ann], np.int32)
            overlaps = np.zeros([num_ann, 2], np.float64)

            ix = 0
            for aline in annotations:
                values = [float(x) for x in aline.strip().split()]
                # x, y, w, h -> x1, y1, x2, y2, with x2/y2 clipped to the image size
                x1 = values[0]
                y1 = values[1]
                x2 = min(imsize[0], values[0] + values[2])
                y2 = min(imsize[1], values[1] + values[3])

                if x1 >= x2 or y1 >= y2:
                    continue

                if config.BBOX_MASK_THRESH > 0:
                    # boxes below the mask threshold are set aside, not used as gt
                    if (x2 - x1) < config.BBOX_MASK_THRESH or (y2 - y1) < config.BBOX_MASK_THRESH:
                        boxes_mask.append(np.array([x1, y1, x2, y2], np.float64))
                        continue
                if (x2 - x1) < config.TRAIN.MIN_BOX_SIZE or (y2 - y1) < config.TRAIN.MIN_BOX_SIZE:
                    continue

                boxes[ix, :] = np.array([x1, y1, x2, y2], np.float64)
                if self._split == 'train':
                    landmark = np.array(values[4:19], dtype=np.float32).reshape((5, 3))
                    for li in range(5):
                        if landmark[li][0] == -1. and landmark[li][1] == -1.:  # missing landmark
                            assert landmark[li][2] == -1
                        else:
                            assert landmark[li][2] >= 0
                            if li == 0:
                                landmark_num += 1
                            # annotation flag 0 means visible; stored as 1.0 / 0.0
                            if landmark[li][2] == 0.0:
                                landmark[li][2] = 1.0
                            else:
                                landmark[li][2] = 0.0

                    landmarks[ix] = landmark

                    blur[ix] = values[19]
                    if blur[ix] < 0.0:
                        # negative blur marks a box without attributes; use a default
                        blur[ix] = 0.3
                        nonattr_box_num += 1

                cls = int(1)
                gt_classes[ix] = cls
                overlaps[ix, cls] = 1.0
                ix += 1
            max_num_boxes = max(max_num_boxes, ix)
            if self._split == 'train' and ix == 0:
                continue
            # drop the rows reserved for annotations that were filtered out
            boxes = boxes[:ix, :]
            landmarks = landmarks[:ix, :, :]
            blur = blur[:ix]
            gt_classes = gt_classes[:ix]
            overlaps = overlaps[:ix, :]
            with open(image_path, 'rb') as fin:
                raw = fin.read()
            # frombuffer replaces the deprecated binary mode of fromstring;
            # the copy keeps the array writable and independent of `raw`
            stream = np.frombuffer(raw, dtype=np.uint8).copy()

            roi = {
                'image': image_path,
                'stream': stream,
                'height': imsize[1],
                'width': imsize[0],
                'boxes': boxes,
                'landmarks': landmarks,
                'blur': blur,
                'gt_classes': gt_classes,
                'gt_overlaps': overlaps,
                'max_classes': overlaps.argmax(axis=1),
                'max_overlaps': overlaps.max(axis=1),
                'flipped': False,
            }
            if len(boxes_mask) > 0:
                roi['boxes_mask'] = np.array(boxes_mask)
            roidb.append(roi)
        for roi in roidb:
            roi['max_num_boxes'] = max_num_boxes
        self.num_images = len(roidb)
        print('roidb size', len(roidb))
        print('non attr box num', nonattr_box_num)
        print('landmark num', landmark_num)
        with open(cache_file, 'wb') as fid:
            pickle.dump(roidb, fid, pickle.HIGHEST_PROTOCOL)
        print('wrote gt roidb to {}'.format(cache_file))

        return roidb

    def write_detections(self, all_boxes, output_dir='./output/'):
        """ not implemented for this dataset; intentionally a no-op """
        pass

    def evaluate_detections(self, all_boxes, output_dir='./output/', method_name='insightdetection'):
        """ not implemented for this dataset; intentionally a no-op """
        pass

View File

@@ -0,0 +1,6 @@
import logging
# set up logger
# basicConfig() installs the default handler on the root logger when none is
# configured yet; getLogger() without a name returns that root logger, whose
# level is set to INFO so that logger.info() messages are emitted.
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

View File

View File

@@ -0,0 +1,37 @@
from rcnn.config import config
import numpy as np
def compute_assign_targets(rois, threshold):
    """
    assign every roi to a feature stride according to its size
    :param rois: [N, 4] boxes laid out as x1, y1, x2, y2
    :param threshold: one [upper, lower] pair per entry of config.RCNN_FEAT_STRIDE
    :return: [N] uint8 array holding the stride chosen for each roi
    """
    box_w = rois[:, 2] - rois[:, 0] + 1
    box_h = rois[:, 3] - rois[:, 1] + 1
    # square root of the area, i.e. the side of an equally sized square
    side = np.sqrt(box_w * box_h)
    levels = np.zeros(np.shape(rois)[0], dtype=np.uint8)
    for pos, stride in enumerate(config.RCNN_FEAT_STRIDE):
        bounds = threshold[pos]
        upper = bounds[0]
        lower = bounds[1]
        selected = np.logical_and(lower <= side, side < upper)
        levels[selected] = stride
    # a remaining 0 means some roi matched none of the ranges
    assert 0 not in levels, "All rois should assign to specify levels."
    return levels
def add_assign_targets(roidb):
    """
    given roidb, add ['assign_levels']
    :param roidb: roidb to be processed. must have gone through imdb.prepare_roidb
    """
    # print() call instead of the Python 2 print statement, which is a
    # SyntaxError on Python 3; the output is unchanged
    print('add assign targets')
    assert len(roidb) > 0
    assert 'boxes' in roidb[0]

    # [upper, lower) bound of the roi size for each entry of config.RCNN_FEAT_STRIDE
    area_threshold = [[np.inf, 448],
                      [448, 224],
                      [224, 112],
                      [112, 0]]

    assert len(config.RCNN_FEAT_STRIDE) == len(area_threshold)

    for rec in roidb:
        rec['assign_levels'] = compute_assign_targets(rec['boxes'], area_threshold)

View File

@@ -0,0 +1,255 @@
"""
This file has functions about generating bounding box regression targets
"""
import math
import threading
try:
    import Queue
except ImportError:  # Python 3 renamed the module to ``queue``
    import queue as Queue

import cv2
import numpy as np
import PIL.Image as Image

from rcnn.config import config
from ..logger import logger
from ..pycocotools.mask import encode
from .bbox_transform import bbox_overlaps, bbox_transform
def compute_bbox_regression_targets(rois, overlaps, labels):
    """
    given rois, overlaps, gt labels, compute bounding box regression targets
    :param rois: roidb[i]['boxes'] k * 4
    :param overlaps: roidb[i]['max_overlaps'] k * 1
    :param labels: roidb[i]['max_classes'] k * 1
    :return: targets[i][class, dx, dy, dw, dh] k * 5, all zeros for rois
        below the regression threshold or when there is no ground truth
    """
    # Ensure ROIs are floats (np.float was removed from NumPy)
    rois = rois.astype(np.float64, copy=False)

    # Sanity check
    if len(rois) != len(overlaps):
        logger.warning('bbox regression: len(rois) != len(overlaps)')

    targets = np.zeros((rois.shape[0], 5), dtype=np.float32)

    # Indices of ground-truth ROIs
    gt_inds = np.where(overlaps == 1)[0]
    if len(gt_inds) == 0:
        logger.warning('bbox regression: len(gt_inds) == 0')
        # without ground truth there is nothing to regress to; returning here
        # avoids argmax over an empty axis further down
        return targets

    # Indices of examples for which we try to make predictions
    ex_inds = np.where(overlaps >= config.TRAIN.BBOX_REGRESSION_THRESH)[0]

    # Get IoU overlap between each ex ROI and gt ROI
    ex_gt_overlaps = bbox_overlaps(rois[ex_inds, :], rois[gt_inds, :])

    # Find which gt ROI each ex ROI has max overlap with:
    # this will be the ex ROI's gt target
    gt_assignment = ex_gt_overlaps.argmax(axis=1)
    gt_rois = rois[gt_inds[gt_assignment], :]
    ex_rois = rois[ex_inds, :]

    targets[ex_inds, 0] = labels[ex_inds]
    targets[ex_inds, 1:] = bbox_transform(ex_rois, gt_rois)
    return targets
def add_bbox_regression_targets(roidb):
    """
    attach ['bbox_targets'] to every roidb record and normalize them per class
    :param roidb: roidb to be processed. must have gone through imdb.prepare_roidb
    :return: flattened means and standard deviations used for normalization
    """
    logger.info('bbox regression: add bounding box regression targets')
    assert len(roidb) > 0
    assert 'max_classes' in roidb[0]

    num_images = len(roidb)
    num_classes = roidb[0]['gt_overlaps'].shape[1]
    for im_i in range(num_images):
        rec = roidb[im_i]
        rec['bbox_targets'] = compute_bbox_regression_targets(
            rec['boxes'], rec['max_overlaps'], rec['max_classes'])

    if config.TRAIN.BBOX_NORMALIZATION_PRECOMPUTED:
        # fixed statistics from the configuration, repeated for every class
        means = np.tile(np.array(config.TRAIN.BBOX_MEANS), (num_classes, 1))
        stds = np.tile(np.array(config.TRAIN.BBOX_STDS), (num_classes, 1))
    else:
        # empirical statistics; the tiny offset avoids dividing by zero
        class_counts = np.zeros((num_classes, 1)) + 1e-14
        sums = np.zeros((num_classes, 4))
        squared_sums = np.zeros((num_classes, 4))
        for im_i in range(num_images):
            targets = roidb[im_i]['bbox_targets']
            for cls in range(1, num_classes):
                rows = np.where(targets[:, 0] == cls)[0]
                if rows.size == 0:
                    continue
                class_counts[cls] += rows.size
                sums[cls, :] += targets[rows, 1:].sum(axis=0)
                squared_sums[cls, :] += (targets[rows, 1:] ** 2).sum(axis=0)

        means = sums / class_counts
        # var(x) = E(x^2) - E(x)^2
        stds = np.sqrt(squared_sums / class_counts - means ** 2)

    # normalize the targets in place (background class 0 is left untouched)
    for im_i in range(num_images):
        targets = roidb[im_i]['bbox_targets']
        for cls in range(1, num_classes):
            rows = np.where(targets[:, 0] == cls)[0]
            targets[rows, 1:] -= means[cls, :]
            targets[rows, 1:] /= stds[cls, :]

    return means.ravel(), stds.ravel()
def expand_bbox_regression_targets(bbox_targets_data, num_classes):
    """
    spread compact [class, dx, dy, dw, dh] rows over 4 * num_classes columns;
    only the slot of the row's own class is filled
    :param bbox_targets_data: [k * 5]
    :param num_classes: number of classes
    :return: bbox targets [k * 4 num_classes] and bbox weights of the same
        shape; only foreground rows (class > 0) get non-zero entries
    """
    labels = bbox_targets_data[:, 0]
    expanded = np.zeros((labels.size, 4 * num_classes), dtype=np.float32)
    weights = np.zeros(expanded.shape, dtype=np.float32)
    for row in np.where(labels > 0)[0]:
        first = int(4 * labels[row])
        last = first + 4
        expanded[row, first:last] = bbox_targets_data[row, 1:]
        weights[row, first:last] = config.TRAIN.BBOX_WEIGHTS
    return expanded, weights
def compute_mask_and_label(ex_rois, ex_labels, seg, flipped):
    """
    build a 28 x 28 instance mask and a label for every roi
    :param ex_rois: [n, 4] rois laid out as x1, y1, x2, y2
    :param ex_labels: [n] class label per roi, looked up in config.CLASS_ID
    :param seg: instance segmentation image, anything PIL can open
    :param flipped: mirror the segmentation horizontally before use
    :return: mask_target [n, 28, 28] int8, mask_label [n] int8; rois without
        a matching instance keep an all-zero mask and label 0
    """
    seg_im = Image.open(seg)
    seg_w, seg_h = seg_im.size[0], seg_im.size[1]
    ins_seg = np.array(list(seg_im.getdata())).reshape([seg_h, seg_w])
    if flipped:
        ins_seg = ins_seg[:, ::-1]

    n_rois = ex_rois.shape[0]
    class_id = config.CLASS_ID
    mask_target = np.zeros((n_rois, 28, 28), dtype=np.int8)
    mask_label = np.zeros((n_rois), dtype=np.int8)
    for n in range(n_rois):
        roi = ex_rois[n]
        target = ins_seg[int(roi[1]): int(roi[3]), int(roi[0]): int(roi[2])]
        best_id = 0
        best_iou = 0
        for inst_id in np.unique(target):
            # pixel value // 1000 is compared with the configured class id
            if math.floor(inst_id / 1000) != class_id[int(ex_labels[int(n)])]:
                continue
            # bounding box of this instance over the whole image
            rows, cols = np.where(ins_seg == int(inst_id))
            x_min = np.min(cols)
            y_min = np.min(rows)
            x_max = np.max(cols)
            y_max = np.max(rows)
            x1 = max(roi[0], x_min)
            y1 = max(roi[1], y_min)
            x2 = min(roi[2], x_max)
            y2 = min(roi[3], y_max)
            inter = (x2 - x1) * (y2 - y1)
            roi_area = (roi[2] - roi[0]) * (roi[3] - roi[1])
            inst_area = (x_max - x_min) * (y_max - y_min)
            iou = inter / (roi_area + inst_area - inter)
            if iou > best_iou:
                best_id = inst_id
                best_iou = iou

        if best_iou == 0:
            continue
        # binary mask of the chosen instance inside the roi, resized to 28 x 28
        mask = np.zeros(target.shape)
        mask[np.where(target == best_id)] = 1
        mask = cv2.resize(mask, (28, 28), interpolation=cv2.INTER_NEAREST)

        mask_target[n] = mask
        mask_label[n] = ex_labels[int(n)]
    return mask_target, mask_label
def compute_bbox_mask_targets_and_label(rois, overlaps, labels, seg, flipped):
    """
    given rois, overlaps, gt labels, seg, compute bounding box mask targets
    :param rois: roidb[i]['boxes'] k * 4
    :param overlaps: roidb[i]['max_overlaps'] k * 1
    :param labels: roidb[i]['max_classes'] k * 1
    :param seg: instance segmentation image passed on to compute_mask_and_label
    :param flipped: whether the segmentation must be mirrored horizontally
    :return: mask_targets, mask_label and ex_inds, the indices of the rois
        (overlap >= config.TRAIN.BBOX_REGRESSION_THRESH) the masks belong to
    """
    # Ensure ROIs are floats (np.float was removed from NumPy)
    rois = rois.astype(np.float64, copy=False)

    # Sanity check
    if len(rois) != len(overlaps):
        print('bbox regression: this should not happen')

    # Indices of ground-truth ROIs, only used for the diagnostic below
    gt_inds = np.where(overlaps == 1)[0]
    if len(gt_inds) == 0:
        print('something wrong : zero ground truth rois')

    # Indices of examples for which we try to make predictions
    ex_inds = np.where(overlaps >= config.TRAIN.BBOX_REGRESSION_THRESH)[0]

    # The ex-roi to gt-roi assignment that used to be computed here was never
    # used and raised on images without ground truth, so it has been dropped.
    ex_rois = rois[ex_inds, :]

    mask_targets, mask_label = compute_mask_and_label(ex_rois, labels[ex_inds], seg, flipped)
    return mask_targets, mask_label, ex_inds
def add_mask_targets(roidb):
    """
    given roidb, add ['mask_targets'], ['mask_labels'] and ['mask_inds'] to
    every record, using a pool of 10 worker threads
    :param roidb: roidb to be processed. must have gone through imdb.prepare_roidb;
        every record needs 'boxes', 'max_overlaps', 'max_classes', 'ins_seg'
        and 'flipped'
    :return: None, roidb is updated in place
    """
    print('add bounding box mask targets')
    assert len(roidb) > 0
    assert 'max_classes' in roidb[0]

    num_images = len(roidb)

    # Multi threads processing
    im_queue = Queue.Queue(maxsize=0)
    for im_i in range(num_images):
        im_queue.put(im_i)

    def process():
        while True:
            # get_nowait + Empty instead of empty() followed by a blocking
            # get(): another worker may take the last item between the two
            # calls, which would block this thread forever
            try:
                im_i = im_queue.get_nowait()
            except Queue.Empty:
                return
            print("-----process img {}".format(im_i))
            rec = roidb[im_i]
            rec['mask_targets'], rec['mask_labels'], rec['mask_inds'] = \
                compute_bbox_mask_targets_and_label(rec['boxes'], rec['max_overlaps'],
                                                    rec['max_classes'], rec['ins_seg'],
                                                    rec['flipped'])

    threads = [threading.Thread(target=process, args=()) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
# Single thread
# for im_i in range(num_images):
# print "-----processing img {}".format(im_i)
# rois = roidb[im_i]['boxes']
# max_overlaps = roidb[im_i]['max_overlaps']
# max_classes = roidb[im_i]['max_classes']
# ins_seg = roidb[im_i]['ins_seg']
# # roidb[im_i]['mask_targets'] = compute_bbox_mask_targets(rois, max_overlaps, max_classes, ins_seg)
# roidb[im_i]['mask_targets'], roidb[im_i]['mask_labels'], roidb[im_i]['mask_inds'] = \
# compute_bbox_mask_targets_and_label(rois, max_overlaps, max_classes, ins_seg)

View File

@@ -0,0 +1,216 @@
import numpy as np
from ..cython.bbox import bbox_overlaps_cython
from rcnn.config import config
def bbox_overlaps(boxes, query_boxes):
    """
    overlaps between boxes and query_boxes, computed by the compiled
    bbox_overlaps_cython routine
    :param boxes: n * 4 bounding boxes
    :param query_boxes: k * 4 bounding boxes
    :return: whatever bbox_overlaps_cython returns for the two arrays
    """
    overlaps = bbox_overlaps_cython(boxes, query_boxes)
    return overlaps
def bbox_overlaps_py(boxes, query_boxes):
    """
    determine overlaps between boxes and query_boxes (pure Python version)
    Coordinates are inclusive pixel indices, hence the +1 in widths and heights.
    :param boxes: n * 4 bounding boxes
    :param query_boxes: k * 4 bounding boxes
    :return: overlaps: n * k overlaps
    """
    n_ = boxes.shape[0]
    k_ = query_boxes.shape[0]
    # np.float was removed from NumPy; float64 is the type it aliased
    overlaps = np.zeros((n_, k_), dtype=np.float64)
    for k in range(k_):
        query_box_area = (query_boxes[k, 2] - query_boxes[k, 0] + 1) * (query_boxes[k, 3] - query_boxes[k, 1] + 1)
        for n in range(n_):
            iw = min(boxes[n, 2], query_boxes[k, 2]) - max(boxes[n, 0], query_boxes[k, 0]) + 1
            if iw <= 0:
                continue
            ih = min(boxes[n, 3], query_boxes[k, 3]) - max(boxes[n, 1], query_boxes[k, 1]) + 1
            if ih <= 0:
                continue
            box_area = (boxes[n, 2] - boxes[n, 0] + 1) * (boxes[n, 3] - boxes[n, 1] + 1)
            # union = both areas minus the intersection counted twice
            all_area = float(box_area + query_box_area - iw * ih)
            overlaps[n, k] = iw * ih / all_area
    return overlaps
def clip_boxes(boxes, im_shape):
    """
    Clip boxes to image boundaries, in place.
    :param boxes: [N, 4* num_classes]
    :param im_shape: tuple of 2, (height, width)
    :return: [N, 4* num_classes], the same array object that was passed in
    """
    max_x = im_shape[1] - 1
    max_y = im_shape[0] - 1
    # columns repeat as x1, y1, x2, y2 for every class
    for offset, limit in enumerate((max_x, max_y, max_x, max_y)):
        coords = boxes[:, offset::4]
        boxes[:, offset::4] = np.maximum(np.minimum(coords, limit), 0)
    return boxes
def nonlinear_transform(ex_rois, gt_rois):
    """
    encode gt_rois relative to ex_rois as (dx, dy, dw, dh) regression targets
    :param ex_rois: [N, 4]
    :param gt_rois: [N, 4], or wider; the extra columns are appended to the
        targets unchanged when config.USE_BLUR is set
    :return: [N, 4], or [N, gt_rois.shape[1]] when extra columns are appended
    """
    assert ex_rois.shape[0] == gt_rois.shape[0], 'inconsistent rois number'

    def _size_and_center(rois):
        # inclusive pixel coordinates: width = x2 - x1 + 1
        w = rois[:, 2] - rois[:, 0] + 1.0
        h = rois[:, 3] - rois[:, 1] + 1.0
        cx = rois[:, 0] + 0.5 * (w - 1.0)
        cy = rois[:, 1] + 0.5 * (h - 1.0)
        return w, h, cx, cy

    ex_widths, ex_heights, ex_ctr_x, ex_ctr_y = _size_and_center(ex_rois)
    gt_widths, gt_heights, gt_ctr_x, gt_ctr_y = _size_and_center(gt_rois)

    columns = [
        (gt_ctr_x - ex_ctr_x) / (ex_widths + 1e-14),
        (gt_ctr_y - ex_ctr_y) / (ex_heights + 1e-14),
        np.log(gt_widths / ex_widths),
        np.log(gt_heights / ex_heights),
    ]
    n_cols = gt_rois.shape[1]
    if n_cols > 4 and config.USE_BLUR:
        for col in range(4, n_cols):
            columns.append(gt_rois[:, col])
    return np.vstack(columns).transpose()
def landmark_transform(ex_rois, gt_rois):
    """
    encode landmark coordinates relative to the center and size of ex_rois
    :param ex_rois: [N, 4] reference boxes
    :param gt_rois: [N, P, C] landmarks; channel 0 is normalized by the box
        width, channel 1 by the box height, other channels are copied as is
        (channel 2 is skipped unless config.USE_OCCLUSION is set)
    :return: [N, number of kept (point, channel) pairs]
    """
    assert ex_rois.shape[0] == gt_rois.shape[0], 'inconsistent rois number'

    box_w = ex_rois[:, 2] - ex_rois[:, 0] + 1.0
    box_h = ex_rois[:, 3] - ex_rois[:, 1] + 1.0
    center_x = ex_rois[:, 0] + 0.5 * (box_w - 1.0)
    center_y = ex_rois[:, 1] + 0.5 * (box_h - 1.0)

    n_points = gt_rois.shape[1]
    n_channels = gt_rois.shape[2]
    columns = []
    for point in range(n_points):
        for channel in range(n_channels):
            if channel == 2 and not config.USE_OCCLUSION:
                continue
            raw = gt_rois[:, point, channel]
            if channel == 0:
                encoded = (raw - center_x) / (box_w + 1e-14)
            elif channel == 1:
                encoded = (raw - center_y) / (box_h + 1e-14)
            else:
                # visibility flag (or any further channel) is passed through
                encoded = raw
            columns.append(encoded)
    return np.vstack(columns).transpose()
def nonlinear_pred(boxes, box_deltas):
    """
    Transform the set of class-agnostic boxes into class-specific boxes
    by applying the predicted offsets (box_deltas)
    :param boxes: !important [N 4]
    :param box_deltas: [N, 4 * num_classes]
    :return: [N 4 * num_classes]
    """
    if boxes.shape[0] == 0:
        return np.zeros((0, box_deltas.shape[1]))

    # np.float was removed from NumPy; float64 is the type it aliased
    boxes = boxes.astype(np.float64, copy=False)
    widths = boxes[:, 2] - boxes[:, 0] + 1.0
    heights = boxes[:, 3] - boxes[:, 1] + 1.0
    ctr_x = boxes[:, 0] + 0.5 * (widths - 1.0)
    ctr_y = boxes[:, 1] + 0.5 * (heights - 1.0)

    # every 4th column belongs to the same coordinate of successive classes
    dx = box_deltas[:, 0::4]
    dy = box_deltas[:, 1::4]
    dw = box_deltas[:, 2::4]
    dh = box_deltas[:, 3::4]

    pred_ctr_x = dx * widths[:, np.newaxis] + ctr_x[:, np.newaxis]
    pred_ctr_y = dy * heights[:, np.newaxis] + ctr_y[:, np.newaxis]
    pred_w = np.exp(dw) * widths[:, np.newaxis]
    pred_h = np.exp(dh) * heights[:, np.newaxis]

    pred_boxes = np.zeros(box_deltas.shape)
    # x1
    pred_boxes[:, 0::4] = pred_ctr_x - 0.5 * (pred_w - 1.0)
    # y1
    pred_boxes[:, 1::4] = pred_ctr_y - 0.5 * (pred_h - 1.0)
    # x2
    pred_boxes[:, 2::4] = pred_ctr_x + 0.5 * (pred_w - 1.0)
    # y2
    pred_boxes[:, 3::4] = pred_ctr_y + 0.5 * (pred_h - 1.0)

    return pred_boxes
def landmark_pred(boxes, landmark_deltas):
    """
    decode landmark offsets into absolute coordinates
    Even columns are treated as x offsets (scaled by the box width), odd
    columns as y offsets (scaled by the box height).
    :param boxes: [N, 4] reference boxes
    :param landmark_deltas: [N, 2 * num_points] alternating x / y offsets
    :return: [N, 2 * num_points] absolute landmark coordinates
    """
    if boxes.shape[0] == 0:
        return np.zeros((0, landmark_deltas.shape[1]))

    # np.float was removed from NumPy; float64 is the type it aliased
    boxes = boxes.astype(np.float64, copy=False)
    widths = boxes[:, 2] - boxes[:, 0] + 1.0
    heights = boxes[:, 3] - boxes[:, 1] + 1.0
    ctr_x = boxes[:, 0] + 0.5 * (widths - 1.0)
    ctr_y = boxes[:, 1] + 0.5 * (heights - 1.0)

    # all x columns and all y columns are decoded in one step each
    preds = np.zeros(landmark_deltas.shape, dtype=np.float64)
    preds[:, 0::2] = landmark_deltas[:, 0::2] * widths[:, np.newaxis] + ctr_x[:, np.newaxis]
    preds[:, 1::2] = landmark_deltas[:, 1::2] * heights[:, np.newaxis] + ctr_y[:, np.newaxis]
    return preds
def iou_transform(ex_rois, gt_rois):
    """ bbox targets for the IoU loss: the gt_rois themselves, returned unchanged """
    num_ex = ex_rois.shape[0]
    num_gt = gt_rois.shape[0]
    assert num_ex == num_gt, 'inconsistent rois number'
    return gt_rois
def iou_pred(boxes, box_deltas):
    """
    Decode additive corner offsets into corner-format boxes.

    Each delta is added to the matching coordinate of the reference box:
    (x1 + dx1, y1 + dy1, x2 + dx2, y2 + dy2), once per class.
    :param boxes: !important [N 4]
    :param box_deltas: [N, 4 * num_classes]
    :return: [N 4 * num_classes]
    """
    if boxes.shape[0] == 0:
        return np.zeros((0, box_deltas.shape[1]))

    # np.float was removed in NumPy 1.24; np.float64 is the dtype it aliased.
    boxes = boxes.astype(np.float64, copy=False)

    pred_boxes = np.zeros(box_deltas.shape)
    # Coordinate k of the reference box pairs with every 4th delta column
    # starting at k (x1, y1, x2, y2 in that order).
    for coord in range(4):
        pred_boxes[:, coord::4] = box_deltas[:, coord::4] + boxes[:, coord][:, np.newaxis]
    return pred_boxes
# Module-level aliases: the generic names bbox_transform / bbox_pred are bound
# to the "nonlinear" implementations, so callers that use the generic names
# get nonlinear_transform / nonlinear_pred.
bbox_transform = nonlinear_transform
bbox_pred = nonlinear_pred

View File

@@ -0,0 +1,127 @@
"""
Generate base anchors on index 0
"""
from __future__ import print_function
import sys
from builtins import range
import numpy as np
from ..cython.anchors import anchors_cython
from ..config import config
def anchors_plane(feat_h, feat_w, stride, base_anchor):
    """
    Forward all arguments unchanged to the compiled anchors_cython helper.

    NOTE(review): presumably this tiles base_anchor over a feat_h x feat_w
    grid spaced by stride -- confirm against rcnn/cython/anchors.
    """
    plane = anchors_cython(feat_h, feat_w, stride, base_anchor)
    return plane
def generate_anchors(base_size=16, ratios=(0.5, 1, 2),
                     scales=(8, 16, 32), stride=16, dense_anchor=False):
    """
    Generate anchor (reference) windows by enumerating aspect ratios X
    scales wrt a reference (0, 0, base_size - 1, base_size - 1) window.

    :param base_size: side length of the reference window
    :param ratios: aspect ratios to enumerate (any sequence or array)
    :param scales: multipliers applied to each ratio anchor (sequence or array)
    :param stride: feature stride; only read when dense_anchor is set
    :param dense_anchor: also emit a copy of every anchor shifted by half a stride
    :return: [num_anchors, 4] array of (x1, y1, x2, y2)
    """
    # The defaults are immutable tuples (the originals were a list and an
    # ndarray created once at definition time); convert so the helpers can
    # rely on array arithmetic.
    ratios = np.asarray(ratios)
    scales = np.asarray(scales)

    base_anchor = np.array([1, 1, base_size, base_size]) - 1
    ratio_anchors = _ratio_enum(base_anchor, ratios)
    anchors = np.vstack([_scale_enum(ratio_anchor, scales)
                         for ratio_anchor in ratio_anchors])
    if dense_anchor:
        assert stride % 2 == 0
        # Second set of anchors offset by half a stride in both x and y.
        anchors = np.vstack((anchors, anchors + stride // 2))
    return anchors
#def generate_anchors_fpn(base_size=[64,32,16,8,4], ratios=[0.5, 1, 2], scales=8):
# """
# Generate anchor (reference) windows by enumerating aspect ratios X
# scales wrt a reference (0, 0, 15, 15) window.
# """
# anchors = []
# _ratios = ratios.reshape( (len(base_size), -1) )
# _scales = scales.reshape( (len(base_size), -1) )
# for i,bs in enumerate(base_size):
# __ratios = _ratios[i]
# __scales = _scales[i]
# #print('anchors_fpn', bs, __ratios, __scales, file=sys.stderr)
# r = generate_anchors(bs, __ratios, __scales)
# #print('anchors_fpn', r.shape, file=sys.stderr)
# anchors.append(r)
# return anchors
def generate_anchors_fpn(dense_anchor=False, cfg = None):
    """
    Build one set of base anchors per pyramid level.

    cfg maps a stride (as a string key) to a dict holding 'BASE_SIZE',
    'RATIOS' and 'SCALES'; config.RPN_ANCHOR_CFG is used when cfg is None.
    The result is a list of anchor arrays ordered from the largest stride
    to the smallest.
    """
    if cfg is None:
        cfg = config.RPN_ANCHOR_CFG

    strides = sorted((int(name) for name in cfg), reverse=True)

    per_level = []
    for stride in strides:
        level_cfg = cfg[str(stride)]
        base_size = level_cfg['BASE_SIZE']
        level_ratios = np.array(level_cfg['RATIOS'])
        level_scales = np.array(level_cfg['SCALES'])
        per_level.append(
            generate_anchors(base_size, level_ratios, level_scales, stride, dense_anchor))
    return per_level
def _whctrs(anchor):
    """
    Convert an (x1, y1, x2, y2) window into (width, height, x centre, y centre),
    counting both end pixels in the width and height.
    """
    x1, y1, x2, y2 = anchor[0], anchor[1], anchor[2], anchor[3]
    width = x2 - x1 + 1
    height = y2 - y1 + 1
    return width, height, x1 + 0.5 * (width - 1), y1 + 0.5 * (height - 1)
def _mkanchors(ws, hs, x_ctr, y_ctr):
    """
    Build (x1, y1, x2, y2) windows of the given widths and heights, all
    centred on (x_ctr, y_ctr).  Returns one row per entry of ws / hs.
    """
    half_w = 0.5 * (ws[:, np.newaxis] - 1)
    half_h = 0.5 * (hs[:, np.newaxis] - 1)
    corners = (x_ctr - half_w,
               y_ctr - half_h,
               x_ctr + half_w,
               y_ctr + half_h)
    return np.hstack(corners)
def _ratio_enum(anchor, ratios):
    """
    Produce one anchor per aspect ratio, each sharing the centre of `anchor`
    and approximately preserving its area (widths/heights are rounded).
    """
    w, h, x_ctr, y_ctr = _whctrs(anchor)
    area_per_ratio = (w * h) / ratios
    widths = np.round(np.sqrt(area_per_ratio))
    heights = np.round(widths * ratios)
    return _mkanchors(widths, heights, x_ctr, y_ctr)
def _scale_enum(anchor, scales):
    """
    Produce one anchor per scale: the width and height of `anchor` multiplied
    by each scale, all sharing the centre of `anchor`.
    """
    w, h, x_ctr, y_ctr = _whctrs(anchor)
    return _mkanchors(w * scales, h * scales, x_ctr, y_ctr)

View File

@@ -0,0 +1,64 @@
import numpy as np
from ..cython.cpu_nms import cpu_nms
try:
from ..cython.gpu_nms import gpu_nms
except ImportError:
gpu_nms = None
def py_nms_wrapper(thresh):
    """Return a one-argument callable running the pure-Python nms with thresh bound."""
    return lambda dets: nms(dets, thresh)
def cpu_nms_wrapper(thresh):
    """Return a one-argument callable running the compiled cpu_nms with thresh bound."""
    return lambda dets: cpu_nms(dets, thresh)
def gpu_nms_wrapper(thresh, device_id):
    """
    Return a one-argument NMS callable bound to thresh and device_id.

    gpu_nms is None when its import failed at module load; in that case the
    CPU wrapper is returned instead and device_id is ignored.
    """
    if gpu_nms is None:
        return cpu_nms_wrapper(thresh)

    def _run_on_gpu(dets):
        return gpu_nms(dets, thresh, device_id)

    return _run_on_gpu
def nms(dets, thresh):
    """
    Greedy non-maximum suppression.

    Boxes are visited in descending score order.  Each visited box is kept,
    and every remaining box whose IoU with it is strictly greater than thresh
    is dropped (boxes with IoU <= thresh survive to the next round).
    :param dets: [[x1, y1, x2, y2 score]]
    :param thresh: maximum IoU a box may have with a kept box and still survive
    :return: indexes to keep, in descending score order
    """
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]

    # Inclusive pixel convention: a box from x1 to x2 is x2 - x1 + 1 wide.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        # Slice the remaining candidates once instead of once per coordinate.
        rest = order[1:]

        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[rest] - inter)

        order = rest[ovr <= thresh]

    return keep

610
retinaface/retinaface.py Normal file
View File

@@ -0,0 +1,610 @@
from __future__ import print_function
import sys
import os
import datetime
import time
import numpy as np
import mxnet as mx
from mxnet import ndarray as nd
import cv2
#from rcnn import config
from rcnn.logger import logger
#from rcnn.processing.bbox_transform import nonlinear_pred, clip_boxes, landmark_pred
from rcnn.processing.bbox_transform import clip_boxes
from rcnn.processing.generate_anchor import generate_anchors_fpn, anchors_plane
from rcnn.processing.nms import gpu_nms_wrapper, cpu_nms_wrapper
from rcnn.processing.bbox_transform import bbox_overlaps
class RetinaFace:
    """
    Multi-level face detector running an MXNet checkpoint.

    The constructor builds the per-stride anchor configuration for the chosen
    ``network`` variant, loads the checkpoint and binds an ``mx.mod.Module``
    for inference.  ``detect`` runs the network on one image (optionally at
    several scales and horizontally flipped) and merges the raw outputs with
    either NMS or box voting.
    """

    def __init__(self, prefix, epoch, ctx_id=0, network='net3', nms=0.4, nocrop=False, decay4 = 0.5, vote=False):
        """
        :param prefix: checkpoint prefix passed to mx.model.load_checkpoint
        :param epoch: checkpoint epoch passed to mx.model.load_checkpoint
        :param ctx_id: GPU id; a negative value selects the CPU
        :param network: 'ssh', 'vgg', 'net3', 'net3a', 'net4', 'net4a',
                        'net5', 'net5a' or 'net6'
        :param nms: overlap threshold used by NMS and by bbox_vote
        :param nocrop: zero-pad inputs up to a multiple of 32 in detect
        :param decay4: factor applied to stride-4 scores in detect when < 1.0
        :param vote: merge detections with bbox_vote instead of NMS
        :raises AssertionError: for an unknown network name
        """
        self.ctx_id = ctx_id
        self.network = network
        self.decay4 = decay4
        self.nms_threshold = nms
        self.vote = vote
        self.nocrop = nocrop
        self.debug = False
        self.fpn_keys = []
        self.anchor_cfg = None
        pixel_means = [0.0, 0.0, 0.0]
        pixel_stds = [1.0, 1.0, 1.0]
        pixel_scale = 1.0
        self.preprocess = False
        _ratio = (1.,)
        fmc = 3  # number of feature-map levels the network exposes
        if network == 'ssh' or network == 'vgg':
            pixel_means = [103.939, 116.779, 123.68]
            self.preprocess = True
        elif network == 'net3':
            _ratio = (1.,)
        elif network == 'net3a':
            _ratio = (1., 1.5)
        elif network == 'net6':  # like pyramidbox or s3fd
            fmc = 6
        elif network == 'net5':  # retinaface
            fmc = 5
        elif network == 'net5a':
            fmc = 5
            _ratio = (1., 1.5)
        elif network == 'net4':
            fmc = 4
        elif network == 'net4a':
            fmc = 4
            _ratio = (1., 1.5)
        else:
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError('network setting error %s' % network)

        if fmc == 3:
            self._feat_stride_fpn = [32, 16, 8]
            self.anchor_cfg = {
                '32': {'SCALES': (32, 16), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '16': {'SCALES': (8, 4), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '8': {'SCALES': (2, 1), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
            }
        elif fmc == 4:
            self._feat_stride_fpn = [32, 16, 8, 4]
            self.anchor_cfg = {
                '32': {'SCALES': (32, 16), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '16': {'SCALES': (8, 4), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '8': {'SCALES': (2, 1), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '4': {'SCALES': (2, 1), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
            }
        elif fmc == 6:
            self._feat_stride_fpn = [128, 64, 32, 16, 8, 4]
            self.anchor_cfg = {
                '128': {'SCALES': (32,), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '64': {'SCALES': (16,), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '32': {'SCALES': (8,), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '16': {'SCALES': (4,), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '8': {'SCALES': (2,), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
                '4': {'SCALES': (1,), 'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999},
            }
        elif fmc == 5:
            self._feat_stride_fpn = [64, 32, 16, 8, 4]
            self.anchor_cfg = {}
            # Three scales per level, each 2**(1/3) larger than the previous;
            # the running scale carries over from one stride to the next.
            _ass = 2.0 ** (1.0 / 3)
            _basescale = 1.0
            for _stride in [4, 8, 16, 32, 64]:
                key = str(_stride)
                value = {'BASE_SIZE': 16, 'RATIOS': _ratio, 'ALLOWED_BORDER': 9999}
                scales = []
                for _ in range(3):
                    scales.append(_basescale)
                    _basescale *= _ass
                value['SCALES'] = tuple(scales)
                self.anchor_cfg[key] = value

        print(self._feat_stride_fpn, self.anchor_cfg)
        for s in self._feat_stride_fpn:
            self.fpn_keys.append('stride%s' % s)

        dense_anchor = False
        self._anchors_fpn = dict(zip(self.fpn_keys, generate_anchors_fpn(dense_anchor=dense_anchor, cfg=self.anchor_cfg)))
        for k in self._anchors_fpn:
            self._anchors_fpn[k] = self._anchors_fpn[k].astype(np.float32)
        # Keyed directly from each entry rather than zipping against
        # dict.values(), whose order is not guaranteed on older interpreters.
        self._num_anchors = dict((k, v.shape[0]) for k, v in self._anchors_fpn.items())

        sym, arg_params, aux_params = mx.model.load_checkpoint(prefix, epoch)
        if self.ctx_id >= 0:
            self.ctx = mx.gpu(self.ctx_id)
            self.nms = gpu_nms_wrapper(self.nms_threshold, self.ctx_id)
        else:
            self.ctx = mx.cpu()
            self.nms = cpu_nms_wrapper(self.nms_threshold)
        self.pixel_means = np.array(pixel_means, dtype=np.float32)
        self.pixel_stds = np.array(pixel_stds, dtype=np.float32)
        self.pixel_scale = float(pixel_scale)
        print('means', self.pixel_means)

        # Three outputs per level means the symbol also emits landmark deltas
        # next to the score and box-delta outputs.
        self.use_landmarks = False
        if len(sym) // len(self._feat_stride_fpn) == 3:
            self.use_landmarks = True
        print('use_landmarks', self.use_landmarks)

        if self.debug:
            c = len(sym) // len(self._feat_stride_fpn)
            sym = sym[(c * 0):]
            self._feat_stride_fpn = [32, 16, 8]
        print('sym size:', len(sym))

        image_size = (640, 640)
        self.model = mx.mod.Module(symbol=sym, context=self.ctx, label_names = None)
        self.model.bind(data_shapes=[('data', (1, 3, image_size[0], image_size[1]))], for_training=False)
        self.model.set_params(arg_params, aux_params)

    def _debug_elapsed(self, label, start):
        """Print the seconds elapsed since `start`, tagged with `label`, when self.debug is set."""
        if self.debug:
            diff = datetime.datetime.now() - start
            print('%s uses' % label, diff.total_seconds(), 'seconds')

    def _to_tensor(self, im):
        """
        Convert an HxWx3 float image into a normalised 1x3xHxW array.

        The channel order is reversed (input channel 2 - i becomes output
        channel i) and each channel is mapped to
        (value / pixel_scale - mean) / std.
        """
        im_tensor = np.zeros((1, 3, im.shape[0], im.shape[1]))
        for i in range(3):
            im_tensor[0, i, :, :] = (im[:, :, 2 - i] / self.pixel_scale - self.pixel_means[2 - i]) / self.pixel_stds[2 - i]
        return im_tensor

    def get_input(self, img):
        """Return `img` as a normalised 1x3xHxW mxnet ndarray (see _to_tensor)."""
        im = img.astype(np.float32)
        return nd.array(self._to_tensor(im))

    def _prepare_image(self, img, im_scale, flip):
        """
        Resize, optionally mirror and optionally pad one network input.

        :return: (float32 image, width of the image content before padding)
        """
        if im_scale != 1.0:
            im = cv2.resize(img, None, None, fx=im_scale, fy=im_scale, interpolation=cv2.INTER_LINEAR)
        else:
            im = img.copy()
        if flip:
            im = im[:, ::-1, :]
        content_width = im.shape[1]
        if self.nocrop:
            # Zero-pad bottom/right so both sides are multiples of 32.
            h = ((im.shape[0] + 31) // 32) * 32
            w = ((im.shape[1] + 31) // 32) * 32
            _im = np.zeros((h, w, 3), dtype=np.float32)
            _im[0:im.shape[0], 0:im.shape[1], :] = im
            im = _im
        else:
            im = im.astype(np.float32)
        return im, content_width

    def detect(self, img, threshold=0.5, scales=(1.0,), do_flip=False):
        """
        Detect faces in one image.

        :param img: HxWx3 image array
        :param threshold: minimum score for a candidate to be kept
        :param scales: resize factors to run the network at
        :param do_flip: also run every scale on the horizontally mirrored image
        :return: (det, landmarks).  det is [M, 5+] with x1, y1, x2, y2, score
                 (plus any extra regression columns) in input-image
                 coordinates.  landmarks is [M, 5, k], or None when voting is
                 enabled or the model has no landmark outputs.
        """
        proposals_list = []
        scores_list = []
        landmarks_list = []
        timea = datetime.datetime.now()
        flips = [0, 1] if do_flip else [0]
        decode_landmarks = self.use_landmarks and not self.vote
        outputs_per_level = 3 if self.use_landmarks else 2

        for im_scale in scales:
            for flip in flips:
                im, content_width = self._prepare_image(img, im_scale, flip)
                self._debug_elapsed('X1', timea)
                im_info = [im.shape[0], im.shape[1]]
                im_tensor = self._to_tensor(im)
                self._debug_elapsed('X2', timea)
                data = nd.array(im_tensor)
                db = mx.io.DataBatch(data=(data,), provide_data=[('data', data.shape)])
                self._debug_elapsed('X3', timea)
                self.model.forward(db, is_train=False)
                net_out = self.model.get_outputs()

                for level, s in enumerate(self._feat_stride_fpn):
                    key = 'stride%s' % s
                    stride = int(s)
                    idx = level * outputs_per_level
                    A = self._num_anchors[key]

                    scores = net_out[idx].asnumpy()
                    self._debug_elapsed('A', timea)
                    # Only the second group of A channels is used as the score.
                    scores = scores[:, A:, :, :]
                    bbox_deltas = net_out[idx + 1].asnumpy()

                    height, width = bbox_deltas.shape[2], bbox_deltas.shape[3]
                    K = height * width
                    anchors = anchors_plane(height, width, stride, self._anchors_fpn[key])
                    anchors = anchors.reshape((K * A, 4))

                    scores = self._clip_pad(scores, (height, width))
                    scores = scores.transpose((0, 2, 3, 1)).reshape((-1, 1))

                    bbox_deltas = self._clip_pad(bbox_deltas, (height, width))
                    bbox_deltas = bbox_deltas.transpose((0, 2, 3, 1))
                    bbox_pred_len = bbox_deltas.shape[3] // A
                    bbox_deltas = bbox_deltas.reshape((-1, bbox_pred_len))

                    proposals = self.bbox_pred(anchors, bbox_deltas)
                    proposals = clip_boxes(proposals, im_info[:2])

                    selected = np.where(scores.ravel() >= threshold)[0]
                    proposals = proposals[selected, :]
                    scores = scores[selected]
                    if stride == 4 and self.decay4 < 1.0:
                        scores *= self.decay4
                    if flip:
                        # Mirror back using the unpadded width: with nocrop the
                        # padded width would shift every box by the pad amount.
                        oldx1 = proposals[:, 0].copy()
                        oldx2 = proposals[:, 2].copy()
                        proposals[:, 0] = content_width - oldx2 - 1
                        proposals[:, 2] = content_width - oldx1 - 1

                    proposals[:, 0:4] /= im_scale
                    proposals_list.append(proposals)
                    scores_list.append(scores)

                    if decode_landmarks:
                        landmark_deltas = net_out[idx + 2].asnumpy()
                        landmark_deltas = self._clip_pad(landmark_deltas, (height, width))
                        landmark_pred_len = landmark_deltas.shape[1] // A
                        landmark_deltas = landmark_deltas.transpose((0, 2, 3, 1)).reshape((-1, 5, landmark_pred_len // 5))
                        landmarks = self.landmark_pred(anchors, landmark_deltas)
                        landmarks = landmarks[selected, :]
                        if flip:
                            landmarks[:, :, 0] = content_width - landmarks[:, :, 0] - 1
                            # Mirroring swaps the paired points (0 <-> 1, 3 <-> 4).
                            landmarks = landmarks[:, [1, 0, 2, 4, 3], :]
                        landmarks[:, :, 0:2] /= im_scale
                        landmarks_list.append(landmarks)

        self._debug_elapsed('B', timea)
        proposals = np.vstack(proposals_list)
        landmarks = None
        if proposals.shape[0] == 0:
            if self.use_landmarks:
                landmarks = np.zeros((0, 5, 2))
            return np.zeros((0, 5)), landmarks

        scores = np.vstack(scores_list)
        ranking = scores.ravel().argsort()[::-1]
        proposals = proposals[ranking, :]
        scores = scores[ranking]
        if decode_landmarks:
            landmarks = np.vstack(landmarks_list)
            landmarks = landmarks[ranking].astype(np.float32, copy=False)

        pre_det = np.hstack((proposals[:, 0:4], scores)).astype(np.float32, copy=False)
        det = np.hstack((pre_det, proposals[:, 4:]))
        if not self.vote:
            keep = self.nms(pre_det)
            det = det[keep, :]
            if self.use_landmarks:
                landmarks = landmarks[keep]
        else:
            det = self.bbox_vote(det)

        self._debug_elapsed('C', timea)
        return det, landmarks

    def detect_center(self, img, threshold=0.5, scales=(1.0,), do_flip=False):
        """
        Run detect and return only the largest, most central face.

        :return: (bbox, landmark), or (None, None) when nothing is detected.
                 landmark is None when detect returns no landmarks.
        """
        det, landmarks = self.detect(img, threshold, scales, do_flip)
        if det.shape[0] == 0:
            return None, None
        bindex = 0
        if det.shape[0] > 1:
            img_size = np.asarray(img.shape)[0:2]
            bounding_box_size = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1])
            img_center = img_size / 2
            offsets = np.vstack([(det[:, 0] + det[:, 2]) / 2 - img_center[1], (det[:, 1] + det[:, 3]) / 2 - img_center[0]])
            offset_dist_squared = np.sum(np.power(offsets, 2.0), 0)
            bindex = np.argmax(bounding_box_size - offset_dist_squared * 2.0)  # some extra weight on the centering
        bbox = det[bindex, :]
        # detect returns landmarks=None in vote mode / for landmark-free models.
        landmark = landmarks[bindex, :, :] if landmarks is not None else None
        return bbox, landmark

    @staticmethod
    def check_large_pose(landmark, bbox):
        """
        Heuristic pose check based on the angles between the five landmarks.

        :param landmark: [5, 2] array of (x, y) points
        :param bbox: sequence of 4 values (x1, y1, x2, y2)
        :return: (ret, left_score, right_score, up_score, down_score); ret is
                 0 when no score crosses its threshold, otherwise 1 / 2 / 3 / 4
                 for the left / right / up / down score respectively.
        """
        assert landmark.shape == (5, 2)
        assert len(bbox) == 4

        def get_theta(base, x, y):
            # Signed angle in degrees from (x - base) to (y - base), wrapped
            # into [-180, 180]; the y axis is negated first.
            vx = x - base
            vy = y - base
            vx[1] *= -1
            vy[1] *= -1
            tx = np.arctan2(vx[1], vx[0])
            ty = np.arctan2(vy[1], vy[0])
            d = np.degrees(ty - tx)
            if d < -180.0:
                d += 360.
            elif d > 180.0:
                d -= 360.0
            return d

        landmark = landmark.astype(np.float32)

        theta1 = get_theta(landmark[0], landmark[3], landmark[2])
        theta2 = get_theta(landmark[1], landmark[2], landmark[4])
        theta3 = get_theta(landmark[0], landmark[2], landmark[1])
        theta4 = get_theta(landmark[1], landmark[0], landmark[2])
        theta5 = get_theta(landmark[3], landmark[4], landmark[2])
        theta6 = get_theta(landmark[4], landmark[2], landmark[3])
        theta7 = get_theta(landmark[3], landmark[2], landmark[0])
        theta8 = get_theta(landmark[4], landmark[1], landmark[2])

        left_score = 0.0
        right_score = 0.0
        up_score = 0.0
        down_score = 0.0
        if theta1 <= 0.0:
            left_score = 10.0
        elif theta2 <= 0.0:
            right_score = 10.0
        else:
            left_score = theta2 / theta1
            right_score = theta1 / theta2
        if theta3 <= 10.0 or theta4 <= 10.0:
            up_score = 10.0
        else:
            up_score = max(theta1 / theta3, theta2 / theta4)
        if theta5 <= 10.0 or theta6 <= 10.0:
            down_score = 10.0
        else:
            down_score = max(theta7 / theta5, theta8 / theta6)

        mleft = (landmark[0][0] + landmark[3][0]) / 2
        mright = (landmark[1][0] + landmark[4][0]) / 2
        box_center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)

        ret = 0
        if left_score >= 3.0:
            ret = 1
        if ret == 0 and left_score >= 2.0:
            if mright <= box_center[0]:
                ret = 1
        if ret == 0 and right_score >= 3.0:
            ret = 2
        if ret == 0 and right_score >= 2.0:
            if mleft >= box_center[0]:
                ret = 2
        if ret == 0 and up_score >= 2.0:
            ret = 3
        if ret == 0 and down_score >= 5.0:
            ret = 4
        return ret, left_score, right_score, up_score, down_score

    @staticmethod
    def _filter_boxes(boxes, min_size):
        """ Return indices of boxes whose width and height are both >= min_size """
        ws = boxes[:, 2] - boxes[:, 0] + 1
        hs = boxes[:, 3] - boxes[:, 1] + 1
        keep = np.where((ws >= min_size) & (hs >= min_size))[0]
        return keep

    @staticmethod
    def _filter_boxes2(boxes, max_size, min_size):
        """
        Return indices of boxes selected by one size limit.

        With max_size > 0, keep boxes whose shorter side is < max_size.
        Otherwise, with min_size > 0, keep boxes whose longer side is
        > min_size.  With neither limit positive every box is kept (this case
        used to raise UnboundLocalError).
        """
        ws = boxes[:, 2] - boxes[:, 0] + 1
        hs = boxes[:, 3] - boxes[:, 1] + 1
        if max_size > 0:
            keep = np.where(np.minimum(ws, hs) < max_size)[0]
        elif min_size > 0:
            keep = np.where(np.maximum(ws, hs) > min_size)[0]
        else:
            keep = np.arange(boxes.shape[0])
        return keep

    @staticmethod
    def _clip_pad(tensor, pad_shape):
        """
        Clip boxes of the pad area.
        :param tensor: [n, c, H, W]
        :param pad_shape: [h, w]
        :return: [n, c, h, w]
        """
        H, W = tensor.shape[2:]
        h, w = pad_shape

        if h < H or w < W:
            tensor = tensor[:, :, :h, :w].copy()

        return tensor

    @staticmethod
    def bbox_pred(boxes, box_deltas):
        """
        Decode (dx, dy, dw, dh) deltas against reference boxes.

        Columns past the fourth in box_deltas are copied to the output
        unchanged.
        :param boxes: !important [N 4]
        :param box_deltas: [N, 4 + extra]
        :return: [N, 4 + extra]
        """
        if boxes.shape[0] == 0:
            return np.zeros((0, box_deltas.shape[1]))

        # np.float was removed in NumPy 1.24; np.float64 is the dtype it aliased.
        boxes = boxes.astype(np.float64, copy=False)
        widths = boxes[:, 2] - boxes[:, 0] + 1.0
        heights = boxes[:, 3] - boxes[:, 1] + 1.0
        ctr_x = boxes[:, 0] + 0.5 * (widths - 1.0)
        ctr_y = boxes[:, 1] + 0.5 * (heights - 1.0)

        dx = box_deltas[:, 0:1]
        dy = box_deltas[:, 1:2]
        dw = box_deltas[:, 2:3]
        dh = box_deltas[:, 3:4]

        pred_ctr_x = dx * widths[:, np.newaxis] + ctr_x[:, np.newaxis]
        pred_ctr_y = dy * heights[:, np.newaxis] + ctr_y[:, np.newaxis]
        pred_w = np.exp(dw) * widths[:, np.newaxis]
        pred_h = np.exp(dh) * heights[:, np.newaxis]

        pred_boxes = np.zeros(box_deltas.shape)
        # x1
        pred_boxes[:, 0:1] = pred_ctr_x - 0.5 * (pred_w - 1.0)
        # y1
        pred_boxes[:, 1:2] = pred_ctr_y - 0.5 * (pred_h - 1.0)
        # x2
        pred_boxes[:, 2:3] = pred_ctr_x + 0.5 * (pred_w - 1.0)
        # y2
        pred_boxes[:, 3:4] = pred_ctr_y + 0.5 * (pred_h - 1.0)

        if box_deltas.shape[1] > 4:
            pred_boxes[:, 4:] = box_deltas[:, 4:]

        return pred_boxes

    @staticmethod
    def landmark_pred(boxes, landmark_deltas):
        """
        Decode per-point landmark offsets against reference boxes.

        :param boxes: [N, 4] reference boxes
        :param landmark_deltas: [N, P, k]; components 0 and 1 of every point
               are x / y offsets in units of box width / height from the box
               centre, and any further components are passed through
        :return: array shaped and typed like landmark_deltas
        """
        if boxes.shape[0] == 0:
            # Keep the trailing dimensions so callers can index it like a
            # non-empty result.
            return np.zeros((0,) + tuple(landmark_deltas.shape[1:]))

        # np.float was removed in NumPy 1.24; np.float64 is the dtype it aliased.
        boxes = boxes.astype(np.float64, copy=False)
        widths = boxes[:, 2] - boxes[:, 0] + 1.0
        heights = boxes[:, 3] - boxes[:, 1] + 1.0
        ctr_x = boxes[:, 0] + 0.5 * (widths - 1.0)
        ctr_y = boxes[:, 1] + 0.5 * (heights - 1.0)

        pred = landmark_deltas.copy()
        pred[:, :, 0] = landmark_deltas[:, :, 0] * widths[:, np.newaxis] + ctr_x[:, np.newaxis]
        pred[:, :, 1] = landmark_deltas[:, :, 1] * heights[:, np.newaxis] + ctr_y[:, np.newaxis]
        return pred

    def bbox_vote(self, det):
        """
        Merge overlapping detections by score-weighted averaging.

        The first remaining row is grouped with every row whose IoU with it is
        >= self.nms_threshold.  A group of two or more rows becomes one box
        whose coordinates are the score-weighted mean and whose score is the
        group maximum.  A group of one is discarded unless it is the very last
        row processed.  At most 750 rows are returned.

        :param det: [M, 5+] rows of x1, y1, x2, y2, score
        :return: [<=750, 5] merged detections; for empty input the single
                 placeholder row [10, 10, 20, 20, 0.002] is returned
        """
        if det.shape[0] == 0:
            return np.array([[10, 10, 20, 20, 0.002]])

        voted = []
        while det.shape[0] > 0:
            # IOU of the head row against every remaining row
            area = (det[:, 2] - det[:, 0] + 1) * (det[:, 3] - det[:, 1] + 1)
            xx1 = np.maximum(det[0, 0], det[:, 0])
            yy1 = np.maximum(det[0, 1], det[:, 1])
            xx2 = np.minimum(det[0, 2], det[:, 2])
            yy2 = np.minimum(det[0, 3], det[:, 3])
            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            o = inter / (area[0] + area[:] - inter)

            merge_index = np.where(o >= self.nms_threshold)[0]
            if merge_index.size == 0 or merge_index[0] != 0:
                # Always consume the head row so the loop is guaranteed to
                # terminate even for degenerate boxes or thresholds.
                merge_index = np.insert(merge_index, 0, 0)
            det_accu = det[merge_index, :]
            det = np.delete(det, merge_index, 0)

            if merge_index.shape[0] <= 1:
                if det.shape[0] == 0:
                    voted.append(det_accu[:, 0:5])
                continue

            weights = det_accu[:, 4:5]
            det_accu_sum = np.zeros((1, 5))
            det_accu_sum[:, 0:4] = np.sum(det_accu[:, 0:4] * weights, axis=0) / np.sum(weights)
            det_accu_sum[:, 4] = np.max(det_accu[:, 4])
            voted.append(det_accu_sum)

        dets = np.vstack(voted)
        return dets[0:750, :]

View File

@@ -0,0 +1,199 @@
from __future__ import print_function
import argparse
import sys
import os
import time
import numpy as np
import mxnet as mx
from mxnet import ndarray as nd
import cv2
from rcnn.logger import logger
#from rcnn.config import config, default, generate_config
#from rcnn.tools.test_rcnn import test_rcnn
#from rcnn.tools.test_rpn import test_rpn
from rcnn.processing.bbox_transform import nonlinear_pred, clip_boxes, landmark_pred
from rcnn.processing.generate_anchor import generate_anchors_fpn, anchors_plane
from rcnn.processing.nms import gpu_nms_wrapper
from rcnn.processing.bbox_transform import bbox_overlaps
from rcnn.dataset import retinaface
from retinaface import RetinaFace
def parse_args(argv=None):
    """
    Parse the command-line options of the WIDER FACE test script.

    :param argv: list of argument strings; None (the default) makes argparse
                 read sys.argv[1:], which is what the script entry point uses
    :return: argparse.Namespace with the parsed options
    """
    parser = argparse.ArgumentParser(description='Test widerface by retinaface detector')
    # general
    parser.add_argument('--network', help='network name', default='net3', type=str)
    parser.add_argument('--dataset', help='dataset name', default='retinaface', type=str)
    parser.add_argument('--image-set', help='image_set name', default='val', type=str)
    parser.add_argument('--root-path', help='output data folder', default='./data', type=str)
    parser.add_argument('--dataset-path', help='dataset path', default='./data/retinaface', type=str)
    parser.add_argument('--gpu', help='GPU device to test with', default=0, type=int)
    # testing
    parser.add_argument('--prefix', help='checkpoint prefix of the model to test with', default='', type=str)
    parser.add_argument('--epoch', help='checkpoint epoch of the model to test with', default=0, type=int)
    parser.add_argument('--output', help='output folder', default='./wout', type=str)
    parser.add_argument('--nocrop', help='zero-pad inputs to a multiple of 32 before detection', action='store_true')
    parser.add_argument('--thresh', help='valid detection threshold', default=0.02, type=float)
    parser.add_argument('--mode', help='test mode, 0 for fast, 1 for accurate', default=1, type=int)
    parser.add_argument('--part', help='index of the image shard to process (images with i %% parts == part)', default=0, type=int)
    parser.add_argument('--parts', help='total number of image shards', default=1, type=int)
    args = parser.parse_args(argv)
    return args
# Module-level state shared between the functions below.
# The detector instance; assigned in test() and used by get_boxes().
detector = None
# Parsed command-line arguments; assigned in main() and read by get_boxes().
args = None
# Counter for the debug image dump in get_boxes(); dumping only happens while
# 0 <= imgid < 100, so the initial value of -1 leaves it disabled.
imgid = -1
def _fit_scale(im_shape, target_size, max_size):
    """Scale bringing the short image side to target_size, capped so the long side stays <= max_size."""
    im_size_min = np.min(im_shape[0:2])
    im_size_max = np.max(im_shape[0:2])
    im_scale = float(target_size) / float(im_size_min)
    # prevent bigger axis from being more than max_size:
    if np.round(im_scale * im_size_max) > max_size:
        im_scale = float(max_size) / float(im_size_max)
    return im_scale


def get_boxes(roi, pyramid):
    """
    Run the global detector on the image referenced by roi['image'].

    :param roi: roidb entry; only the 'image' path is read here
    :param pyramid: False -> single scale (short side 1600, long side capped
                    at 2150), no flip.  True -> five scales derived from an
                    800/1200 base scale, each also run on the mirrored image.
    :return: detections array as returned by detector.detect
    :raises IOError: when the image cannot be read
    """
    global imgid
    im = cv2.imread(roi['image'])
    if im is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise IOError('cannot read image %s' % roi['image'])

    if not pyramid:
        do_flip = False
        scales = [_fit_scale(im.shape, 1600, 2150)]
    else:
        do_flip = True
        TEST_SCALES = [500, 800, 1100, 1400, 1700]
        target_size = 800
        max_size = 1200
        im_scale = _fit_scale(im.shape, target_size, max_size)
        scales = [float(scale) / target_size * im_scale for scale in TEST_SCALES]

    boxes, landmarks = detector.detect(im, threshold=args.thresh, scales=scales, do_flip=do_flip)

    # Debug dump of the first 100 annotated images; disabled while imgid < 0.
    if imgid >= 0 and imgid < 100:
        font = cv2.FONT_HERSHEY_SIMPLEX
        for i in range(boxes.shape[0]):
            box = boxes[i]
            # np.int was removed in NumPy 1.24; the builtin int is equivalent.
            ibox = box[0:4].copy().astype(int)
            cv2.rectangle(im, (ibox[0], ibox[1]), (ibox[2], ibox[3]), (255, 0, 0), 2)
            if box.shape[0] > 5:
                # Column 5 only exists when the model regresses extra values.
                blur = box[5]
                k = "%.3f" % blur
                cv2.putText(im, k, (ibox[0] + 2, ibox[1] + 14), font, 0.6, (0, 255, 0), 2)
            if landmarks is not None:
                for l in range(5):
                    color = (0, 255, 0)
                    landmark = landmarks[i][l]
                    pp = (int(landmark[0]), int(landmark[1]))
                    # A third landmark component, when present, below 0.5
                    # switches the marker colour.
                    if landmark.shape[0] > 2 and landmark[2] - 0.5 < 0.0:
                        color = (0, 0, 255)
                    cv2.circle(im, (pp[0], pp[1]), 1, color, 2)
        filename = './testimages/%d.jpg' % imgid
        cv2.imwrite(filename, im)
        print(filename, 'wrote')
        imgid += 1

    return boxes
def test(args):
    """
    Run the detector over the image database and write WIDER FACE result files.

    For every selected image (index i with i % args.parts == args.part) the
    detections are written to <args.output>/<image folder>/<image name>.txt.
    When the roidb entry carries ground-truth 'boxes', the per-image recall
    and the running recall at IoU > 0.5 are printed to stderr.

    :param args: namespace from parse_args with pyramid / bbox_vote set by main
    """
    print('test with', args)
    global detector
    output_folder = args.output
    if not os.path.exists(output_folder):
        os.mkdir(output_folder)
    detector = RetinaFace(args.prefix, args.epoch, args.gpu, network=args.network, nocrop=args.nocrop, vote=args.bbox_vote)
    # NOTE(review): eval() on a command-line string executes arbitrary code;
    # consider an explicit name -> class mapping if this is ever exposed to
    # untrusted input.
    imdb = eval(args.dataset)(args.image_set, args.root_path, args.dataset_path)
    roidb = imdb.gt_roidb()
    overall = [0.0, 0.0]  # [ground-truth boxes found, ground-truth boxes seen]
    print('roidb size', len(roidb))

    for i in range(len(roidb)):
        if i % args.parts != args.part:
            continue
        roi = roidb[i]
        boxes = get_boxes(roi, args.pyramid)
        if 'boxes' in roi:
            gt_boxes = roi['boxes'].copy()
            num_gt = gt_boxes.shape[0]
            # np.float was removed in NumPy 1.24; np.float64 is what it aliased.
            overlaps = bbox_overlaps(boxes.astype(np.float64), gt_boxes.astype(np.float64))
            _gt_overlaps = np.zeros((num_gt))
            if boxes.shape[0] > 0:
                _gt_overlaps = overlaps.max(axis=0)
            found = (_gt_overlaps > 0.5).sum()
            # Guard the ratios: an image (or a whole prefix of the run) may
            # carry no ground-truth boxes at all.
            recall = found / float(num_gt) if num_gt > 0 else float('nan')
            overall[0] += found
            overall[1] += num_gt
            recall_all = float(overall[0]) / overall[1] if overall[1] > 0 else float('nan')
            print('[%d]' % i, 'recall', recall, (num_gt, boxes.shape[0]), 'all:', recall_all, file=sys.stderr)
        else:
            print('[%d]' % i, 'detect %d faces' % boxes.shape[0])

        _vec = roi['image'].split('/')
        out_dir = os.path.join(output_folder, _vec[-2])
        if not os.path.exists(out_dir):
            os.mkdir(out_dir)
        # Swap only the extension; str.replace('jpg', 'txt') would also rewrite
        # a "jpg" occurring elsewhere in the file name.
        out_file = os.path.join(out_dir, os.path.splitext(_vec[-1])[0] + '.txt')
        with open(out_file, 'w') as f:
            name = '/'.join(_vec[-2:])
            f.write("%s\n" % (name))
            f.write("%d\n" % (boxes.shape[0]))
            for b in range(boxes.shape[0]):
                box = boxes[b]
                # x, y, width, height, score
                f.write("%d %d %d %d %g \n" % (box[0], box[1], box[2] - box[0], box[3] - box[1], box[4]))
def main():
    """
    Script entry point: parse the command line, derive the pyramid / bbox_vote
    switches from --mode (0 turns both off, any other value turns both on),
    log the arguments and run the test.
    """
    global args
    args = parse_args()
    accurate = args.mode != 0
    args.pyramid = accurate
    args.bbox_vote = accurate
    logger.info('Called with argument: %s' % args)
    test(args)


if __name__ == '__main__':
    main()