# MXNet data iterators for face-recognition training. This is Python 2 code:
# it uses xrange, dict.iteritems and random.shuffle on the list returned by
# range().
#
# Definitions in this module:
#   pick_triplets_impl(q_in, q_out)
#       multiprocessing worker. Reads jobs
#       (embeddings, emb_start_idx, nrof_images, alpha) from q_in until it
#       receives None. For anchor/positive pairs inside the job's identity
#       range, it puts (a_idx, p_idx, n_idx) triplets on q_out, with n_idx
#       drawn at random from the candidate negatives. The exact candidate
#       condition is lost; see the damage note below.
#   FaceImageIter(io.DataIter)
#       Batches images and labels from an indexed RecordIO file. The index
#       path is path_imgrec with its last 4 characters replaced by ".idx".
#       If record 0 has header.flag > 0, then:
#         * its label holds (first identity record, end of identity records);
#         * image records are 1 .. label[0]-1;
#         * id2range maps every identity record to the (start, end) image
#           range stored in that record's own label.
#       Modes configured in __init__:
#         images_per_identity > 0 : reset() rebuilds self.seq identity by
#             identity, making self.repeat passes over id2range
#             (self.repeat = int(3000000 / (images_per_identity * #identities))).
#             The per-identity selection code is lost.
#         hard_mining=True : requires images_per_identity > 0 and mx_model.
#             Presumably served by hard_mining_reset() (it imports
#             annoy.AnnoyIndex), but most of that body is lost.
#         triplet_params=(bag_size, alpha, max_ap) : requires
#             images_per_identity > 0 and mx_model. select_triplets():
#               - runs mx_model forward on bags of bag_size images
#                 (presumably self.triplet_bag_size);
#               - normalises the outputs with sklearn.preprocessing.normalize;
#               - calls self.pick_triplets();
#               - appends to self.seq, per chunk of per_batch_size // 3
#                 triplets, first the anchors, then the positives, then the
#                 negatives.
#         coco_mode=True : the label of batch slot i becomes
#             (i % per_batch_size) // images_per_identity.
#       data_extra, when given, is exposed as a second input named 'extra'.
#   FaceImageIterList(io.DataIter)
#       next() picks a random sub-iterator and returns its next batch. On
#       StopIteration it resets that sub-iterator and retries.
#
# NOTE(review): this copy of the file is damaged and will not run as-is.
#   1. Newlines and indentation were collapsed, so each physical line is one
#      run of statements. Every inline '#' now comments out the rest of its
#      line (e.g. '#from . import _ndarray_internal ...' on the first code
#      line).
#   2. Text from a '<' up to the next '>' appears to have been stripped,
#      as HTML tags would be. Affected spots include:
#        'np.logical_and(neg_dists_sqr-pos_dist_sqr0:'
#            (pick_triplets_impl, pick_triplets)
#        'if idx=ba:' and 'if an_ap0:'        (__pick_triplets)
#        'while len(self.seq)len(self.oseq):'  (select_triplets)
#        'while ba0:'                          (hard_mining_reset .. reset)
#        'if len(_list)= len(self.seq):'       (reset .. next_sample)
#        'if i>> dataIter.read_image'          (next .. read_image)
#      Whole method bodies are missing across several of these gaps.
#      Restore the file from version control rather than re-typing it.
#
# NOTE(review): defects visible in the surviving code, to fix once restored:
#   * ____pick_triplets: 'triplets.append(triplets)' should append 'triplet'.
#     It also join()s the workers before draining q_out. The multiprocessing
#     docs warn that a process which has put items on a queue waits for them
#     to be flushed before it exits, so this join can deadlock.
#     Both ____pick_triplets and __pick_triplets are name-mangled by their
#     leading double underscore. Neither is referenced in the visible code;
#     select_triplets calls pick_triplets.
#   * __init__: ctx_num defaults to 0, but
#     per_batch_size = int(batch_size/ctx_num) then raises ZeroDivisionError.
#   * __init__: self.id2range is assigned only when record 0's
#     header.flag > 0. Yet the images_per_identity > 0 path uses
#     len(self.id2range), and triplet_oseq_reset() iterates over it.
#   * __init__: with shuffle=True, self.seq and self.oseq are the same list
#     object, so shuffling one reorders the other. Confirm this is intended.
#   * __init__: aug_list is accepted, but self.auglist (read by
#     augmentation_transform) is never set.
#     self.imglist and self.path_root are also never set. They are used by
#     the non-RecordIO branches of next_sample and read_image; since
#     path_imgrec is asserted, those branches look unreachable.
#   * next(): self.provide_label is always a list, so
#     'provide_label is not None' is always true. With label_name=''
#     (provide_label == []), provide_label[0] raises IndexError.
#     The "Returns the next batch of data." string is not a docstring,
#     because statements precede it.
#   * FaceImageIterList: reset() calls self.cur_iter.reset(), but cur_iter
#     stays None until the first next(). Its __init__ also never calls
#     super().__init__().
#   * np.NaN was removed in NumPy 2.0; np.nan works on all versions.
#   * _pairwise_dists hard-codes mx.gpu(0), whereas pairwise_dists spreads
#     work over mx.gpu(0) .. mx.gpu(ctx_num - 1).
from __future__ import absolute_import from __future__ import division from __future__ import print_function import os import random import logging import sys import sklearn import datetime import numpy as np import cv2 import mxnet as mx from mxnet import ndarray as nd #from . import _ndarray_internal as _internal #from mxnet._ndarray_internal import _cvimresize as imresize #from ._ndarray_internal import _cvcopyMakeBorder as copyMakeBorder from mxnet import io from mxnet import recordio sys.path.append(os.path.join(os.path.dirname(__file__), 'common')) import face_preprocess import multiprocessing logger = logging.getLogger() def pick_triplets_impl(q_in, q_out): more = True while more: deq = q_in.get() if deq is None: more = False else: embeddings, emb_start_idx, nrof_images, alpha = deq print('running', emb_start_idx, nrof_images, os.getpid()) for j in xrange(1,nrof_images): a_idx = emb_start_idx + j - 1 neg_dists_sqr = np.sum(np.square(embeddings[a_idx] - embeddings), 1) for pair in xrange(j, nrof_images): # For every possible positive pair. 
p_idx = emb_start_idx + pair pos_dist_sqr = np.sum(np.square(embeddings[a_idx]-embeddings[p_idx])) neg_dists_sqr[emb_start_idx:emb_start_idx+nrof_images] = np.NaN all_neg = np.where(np.logical_and(neg_dists_sqr-pos_dist_sqr0: rnd_idx = np.random.randint(nrof_random_negs) n_idx = all_neg[rnd_idx] #triplets.append( (a_idx, p_idx, n_idx) ) q_out.put( (a_idx, p_idx, n_idx) ) #emb_start_idx += nrof_images print('exit',os.getpid()) class FaceImageIter(io.DataIter): def __init__(self, batch_size, data_shape, path_imgrec = None, shuffle=False, aug_list=None, mean = None, rand_mirror = False, ctx_num = 0, images_per_identity = 0, data_extra = None, hard_mining = False, triplet_params = None, coco_mode = False, mx_model = None, data_name='data', label_name='softmax_label', **kwargs): super(FaceImageIter, self).__init__() assert path_imgrec if path_imgrec: logging.info('loading recordio %s...', path_imgrec) path_imgidx = path_imgrec[0:-4]+".idx" self.imgrec = recordio.MXIndexedRecordIO(path_imgidx, path_imgrec, 'r') # pylint: disable=redefined-variable-type s = self.imgrec.read_idx(0) header, _ = recordio.unpack(s) if header.flag>0: print('header0 label', header.label) self.header0 = (int(header.label[0]), int(header.label[1])) #assert(header.flag==1) self.imgidx = range(1, int(header.label[0])) self.id2range = {} self.seq_identity = range(int(header.label[0]), int(header.label[1])) for identity in self.seq_identity: s = self.imgrec.read_idx(identity) header, _ = recordio.unpack(s) #print('flag', header.flag) #print(header.label) #assert(header.flag==2) self.id2range[identity] = (int(header.label[0]), int(header.label[1])) print('id2range', len(self.id2range)) else: self.imgidx = list(self.imgrec.keys) if shuffle: self.seq = self.imgidx self.oseq = self.imgidx else: self.seq = None self.mean = mean self.nd_mean = None if self.mean: self.mean = np.array(self.mean, dtype=np.float32).reshape(1,1,3) self.nd_mean = mx.nd.array(self.mean).reshape((1,1,3)) 
self.check_data_shape(data_shape) self.provide_data = [(data_name, (batch_size,) + data_shape)] self.batch_size = batch_size self.data_shape = data_shape self.shuffle = shuffle self.image_size = '%d,%d'%(data_shape[1],data_shape[2]) self.rand_mirror = rand_mirror print('rand_mirror', rand_mirror) #self.cast_aug = mx.image.CastAug() #self.color_aug = mx.image.ColorJitterAug(0.4, 0.4, 0.4) self.ctx_num = ctx_num self.per_batch_size = int(self.batch_size/self.ctx_num) self.images_per_identity = images_per_identity if self.images_per_identity>0: self.identities = int(self.per_batch_size/self.images_per_identity) self.per_identities = self.identities self.repeat = 3000000.0/(self.images_per_identity*len(self.id2range)) self.repeat = int(self.repeat) print(self.images_per_identity, self.identities, self.repeat) self.data_extra = None if data_extra is not None: self.data_extra = nd.array(data_extra) self.provide_data = [(data_name, (batch_size,) + data_shape), ('extra', data_extra.shape)] self.hard_mining = hard_mining self.mx_model = mx_model if self.hard_mining: assert self.images_per_identity>0 assert self.mx_model is not None self.triplet_params = triplet_params self.triplet_mode = False self.coco_mode = coco_mode if len(label_name)>0: self.provide_label = [(label_name, (batch_size,))] else: self.provide_label = [] if self.coco_mode: assert self.triplet_params is None assert self.images_per_identity>0 if self.triplet_params is not None: assert self.images_per_identity>0 assert self.mx_model is not None self.triplet_bag_size = self.triplet_params[0] self.triplet_alpha = self.triplet_params[1] self.triplet_max_ap = self.triplet_params[2] assert self.triplet_bag_size>0 assert self.triplet_alpha>=0.0 assert self.triplet_alpha<=1.0 self.triplet_mode = True self.triplet_oseq_cur = 0 self.triplet_oseq_reset() self.seq_min_size = self.batch_size*2 self.cur = 0 self.is_init = False self.times = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] #self.reset() def ____pick_triplets(self, 
embeddings, nrof_images_per_class): emb_start_idx = 0 people_per_batch = len(nrof_images_per_class) nrof_threads = 8 q_in = multiprocessing.Queue() q_out = multiprocessing.Queue() processes = [multiprocessing.Process(target=pick_triplets_impl, args=(q_in, q_out)) \ for i in range(nrof_threads)] for p in processes: p.start() # VGG Face: Choosing good triplets is crucial and should strike a balance between # selecting informative (i.e. challenging) examples and swamping training with examples that # are too hard. This is achieve by extending each pair (a, p) to a triplet (a, p, n) by sampling # the image n at random, but only between the ones that violate the triplet loss margin. The # latter is a form of hard-negative mining, but it is not as aggressive (and much cheaper) than # choosing the maximally violating example, as often done in structured output learning. for i in xrange(people_per_batch): nrof_images = int(nrof_images_per_class[i]) job = (embeddings, emb_start_idx, nrof_images, self.triplet_alpha) emb_start_idx+=nrof_images q_in.put(job) for i in xrange(nrof_threads): q_in.put(None) print('joining') for p in processes: p.join() print('joined') q_out.put(None) triplets = [] more = True while more: triplet = q_out.get() if triplet is None: more = False else: triplets.append(triplets) np.random.shuffle(triplets) return triplets #cal pairwise dists on single gpu def _pairwise_dists(self, embeddings): nd_embedding = mx.nd.array(embeddings, mx.gpu(0)) pdists = [] for idx in xrange(embeddings.shape[0]): a_embedding = nd_embedding[idx] body = mx.nd.broadcast_sub(a_embedding, nd_embedding) body = body*body body = mx.nd.sum_axis(body, axis=1) ret = body.asnumpy() #print(ret.shape) pdists.append(ret) return pdists def pairwise_dists(self, embeddings): nd_embedding_list = [] for i in xrange(self.ctx_num): nd_embedding = mx.nd.array(embeddings, mx.gpu(i)) nd_embedding_list.append(nd_embedding) nd_pdists = [] pdists = [] for idx in xrange(embeddings.shape[0]): emb_idx = 
idx%self.ctx_num nd_embedding = nd_embedding_list[emb_idx] a_embedding = nd_embedding[idx] body = mx.nd.broadcast_sub(a_embedding, nd_embedding) body = body*body body = mx.nd.sum_axis(body, axis=1) nd_pdists.append(body) if len(nd_pdists)==self.ctx_num or idx==embeddings.shape[0]-1: for x in nd_pdists: pdists.append(x.asnumpy()) nd_pdists = [] return pdists def pick_triplets(self, embeddings, nrof_images_per_class): emb_start_idx = 0 triplets = [] people_per_batch = len(nrof_images_per_class) #self.time_reset() pdists = self.pairwise_dists(embeddings) #self.times[3] += self.time_elapsed() for i in xrange(people_per_batch): nrof_images = int(nrof_images_per_class[i]) for j in xrange(1,nrof_images): #self.time_reset() a_idx = emb_start_idx + j - 1 #neg_dists_sqr = np.sum(np.square(embeddings[a_idx] - embeddings), 1) neg_dists_sqr = pdists[a_idx] #self.times[3] += self.time_elapsed() for pair in xrange(j, nrof_images): # For every possible positive pair. p_idx = emb_start_idx + pair #self.time_reset() pos_dist_sqr = np.sum(np.square(embeddings[a_idx]-embeddings[p_idx])) #self.times[4] += self.time_elapsed() #self.time_reset() neg_dists_sqr[emb_start_idx:emb_start_idx+nrof_images] = np.NaN if self.triplet_max_ap>0.0: if pos_dist_sqr>self.triplet_max_ap: continue all_neg = np.where(np.logical_and(neg_dists_sqr-pos_dist_sqr0: rnd_idx = np.random.randint(nrof_random_negs) n_idx = all_neg[rnd_idx] triplets.append( (a_idx, p_idx, n_idx) ) emb_start_idx += nrof_images np.random.shuffle(triplets) return triplets def __pick_triplets(self, embeddings, nrof_images_per_class): emb_start_idx = 0 triplets = [] people_per_batch = len(nrof_images_per_class) for i in xrange(people_per_batch): nrof_images = int(nrof_images_per_class[i]) if nrof_images<2: continue for j in xrange(1,nrof_images): a_idx = emb_start_idx + j - 1 pcount = nrof_images-1 dists_a2all = np.sum(np.square(embeddings[a_idx] - embeddings), 1) #(N,) #print(a_idx, dists_a2all.shape) ba = emb_start_idx bb = 
emb_start_idx+nrof_images sorted_idx = np.argsort(dists_a2all) #print('assert', sorted_idx[0], a_idx) #assert sorted_idx[0]==a_idx #for idx in sorted_idx: # print(idx, dists_a2all[idx]) p2n_map = {} pfound = 0 for idx in sorted_idx: if idx==a_idx: #is anchor continue if idx=ba: #is pos p2n_map[idx] = [dists_a2all[idx], []] #ap, [neg_list] pfound+=1 else: # is neg an = dists_a2all[idx] if pfound==pcount and len(p2n_map)==0: break to_del = [] for p_idx in p2n_map: v = p2n_map[p_idx] an_ap = an - v[0] if an_ap0: n_idx = random.choice(v[1]) triplets.append( (a_idx, p_idx, n_idx) ) to_del.append(p_idx) for _del in to_del: del p2n_map[_del] for p_idx,v in p2n_map.iteritems(): if len(v[1])>0: n_idx = random.choice(v[1]) triplets.append( (a_idx, p_idx, n_idx) ) emb_start_idx += nrof_images np.random.shuffle(triplets) return triplets def triplet_oseq_reset(self): #reset self.oseq by identities seq self.triplet_oseq_cur = 0 ids = [] for k in self.id2range: ids.append(k) random.shuffle(ids) self.oseq = [] for _id in ids: v = self.id2range[_id] _list = range(*v) random.shuffle(_list) if len(_list)>self.images_per_identity: _list = _list[0:self.images_per_identity] self.oseq += _list print('oseq', len(self.oseq)) def time_reset(self): self.time_now = datetime.datetime.now() def time_elapsed(self): time_now = datetime.datetime.now() diff = time_now - self.time_now return diff.total_seconds() def select_triplets(self): self.seq = [] while len(self.seq)len(self.oseq): self.triplet_oseq_reset() print('eval %d images..'%bag_size, self.triplet_oseq_cur) self.times[0] += self.time_elapsed() self.time_reset() #print(data.shape) data = nd.zeros( self.provide_data[0][1] ) label = nd.zeros( self.provide_label[0][1] ) ba = 0 while True: bb = min(ba+batch_size, bag_size) if ba>=bb: break #_batch = self.data_iter.next() #_data = _batch.data[0].asnumpy() #print(_data.shape) #_label = _batch.label[0].asnumpy() #data[ba:bb,:,:,:] = _data #label[ba:bb] = _label for i in xrange(ba, bb): _idx = 
self.oseq[i+self.triplet_oseq_cur] s = self.imgrec.read_idx(_idx) header, img = recordio.unpack(s) img = self.imdecode(img) data[i-ba][:] = self.postprocess_data(img) label[i-ba][:] = header.label tag.append( ( int(header.label), _idx) ) #idx[i] = _idx db = mx.io.DataBatch(data=(data,), label=(label,)) self.mx_model.forward(db, is_train=False) net_out = self.mx_model.get_outputs() #print('eval for selecting triplets',ba,bb) #print(net_out) #print(len(net_out)) #print(net_out[0].asnumpy()) net_out = net_out[0].asnumpy() #print(net_out) #print('net_out', net_out.shape) if embeddings is None: embeddings = np.zeros( (bag_size, net_out.shape[1])) embeddings[ba:bb,:] = net_out ba = bb assert len(tag)==bag_size self.triplet_oseq_cur+=bag_size embeddings = sklearn.preprocessing.normalize(embeddings) self.times[1] += self.time_elapsed() self.time_reset() nrof_images_per_class = [1] for i in xrange(1, bag_size): if tag[i][0]==tag[i-1][0]: nrof_images_per_class[-1]+=1 else: nrof_images_per_class.append(1) triplets = self.pick_triplets(embeddings, nrof_images_per_class) # shape=(T,3) print('found triplets', len(triplets)) ba = 0 while True: bb = ba+self.per_batch_size//3 if bb>len(triplets): break _triplets = triplets[ba:bb] for i in xrange(3): for triplet in _triplets: _pos = triplet[i] _idx = tag[_pos][1] self.seq.append(_idx) ba = bb self.times[2] += self.time_elapsed() def triplet_reset(self): self.select_triplets() def hard_mining_reset(self): #import faiss from annoy import AnnoyIndex data = nd.zeros( self.provide_data[0][1] ) label = nd.zeros( self.provide_label[0][1] ) #label = np.zeros( self.provide_label[0][1] ) X = None ba = 0 batch_num = 0 while ba0: if self.triplet_mode: self.triplet_reset() elif not self.hard_mining: self.seq = [] idlist = [] for _id,v in self.id2range.iteritems(): idlist.append((_id,range(*v))) for r in xrange(self.repeat): if r%10==0: print('repeat', r) if self.shuffle: random.shuffle(idlist) for item in idlist: _id = item[0] _list = item[1] 
#random.shuffle(_list) if len(_list)= len(self.seq): raise StopIteration idx = self.seq[self.cur] self.cur += 1 if self.imgrec is not None: s = self.imgrec.read_idx(idx) header, img = recordio.unpack(s) return header.label, img, None, None else: label, fname, bbox, landmark = self.imglist[idx] return label, self.read_image(fname), bbox, landmark else: s = self.imgrec.read() if s is None: raise StopIteration header, img = recordio.unpack(s) return header.label, img, None, None def brightness_aug(self, src, x): alpha = 1.0 + random.uniform(-x, x) src *= alpha return src def contrast_aug(self, src, x): alpha = 1.0 + random.uniform(-x, x) coef = np.array([[[0.299, 0.587, 0.114]]]) gray = src * coef gray = (3.0 * (1.0 - alpha) / gray.size) * np.sum(gray) src *= alpha src += gray return src def saturation_aug(self, src, x): alpha = 1.0 + random.uniform(-x, x) coef = np.array([[[0.299, 0.587, 0.114]]]) gray = src * coef gray = np.sum(gray, axis=2, keepdims=True) gray *= (1.0 - alpha) src *= alpha src += gray return src def color_aug(self, img, x): augs = [self.brightness_aug, self.contrast_aug, self.saturation_aug] random.shuffle(augs) for aug in augs: #print(img.shape) img = aug(img, x) #print(img.shape) return img def mirror_aug(self, img): _rd = random.randint(0,1) if _rd==1: for c in xrange(img.shape[2]): img[:,:,c] = np.fliplr(img[:,:,c]) return img def next(self): if not self.is_init: self.reset() self.is_init = True """Returns the next batch of data.""" #print('in next', self.cur, self.labelcur) batch_size = self.batch_size c, h, w = self.data_shape batch_data = nd.empty((batch_size, c, h, w)) if self.provide_label is not None: batch_label = nd.empty(self.provide_label[0][1]) i = 0 try: while i < batch_size: label, s, bbox, landmark = self.next_sample() _data = self.imdecode(s) if self.rand_mirror: _rd = random.randint(0,1) if _rd==1: _data = mx.ndarray.flip(data=_data, axis=1) if self.nd_mean is not None: _data = _data.astype('float32') _data -= self.nd_mean _data 
*= 0.0078125 #_npdata = _data.asnumpy() #if landmark is not None: # _npdata = face_preprocess.preprocess(_npdata, bbox = bbox, landmark=landmark, image_size=self.image_size) #if self.rand_mirror: # _npdata = self.mirror_aug(_npdata) #if self.mean is not None: # _npdata = _npdata.astype(np.float32) # _npdata -= self.mean # _npdata *= 0.0078125 #nimg = np.zeros(_npdata.shape, dtype=np.float32) #nimg[self.patch[1]:self.patch[3],self.patch[0]:self.patch[2],:] = _npdata[self.patch[1]:self.patch[3], self.patch[0]:self.patch[2], :] #_data = mx.nd.array(nimg) data = [_data] try: self.check_valid_image(data) except RuntimeError as e: logging.debug('Invalid image, skipping: %s', str(e)) continue #print('aa',data[0].shape) #data = self.augmentation_transform(data) #print('bb',data[0].shape) for datum in data: assert i < batch_size, 'Batch size must be multiples of augmenter output length' #print(datum.shape) batch_data[i][:] = self.postprocess_data(datum) if self.provide_label is not None: if not self.coco_mode: batch_label[i][:] = label else: batch_label[i][:] = (i%self.per_batch_size)//self.images_per_identity i += 1 except StopIteration: if i>> dataIter.read_image('Face.jpg') # returns decoded raw bytes. 
""" with open(os.path.join(self.path_root, fname), 'rb') as fin: img = fin.read() return img def augmentation_transform(self, data): """Transforms input data with specified augmentation.""" for aug in self.auglist: data = [ret for src in data for ret in aug(src)] return data def postprocess_data(self, datum): """Final postprocessing step before image is loaded into the batch.""" return nd.transpose(datum, axes=(2, 0, 1)) class FaceImageIterList(io.DataIter): def __init__(self, iter_list): assert len(iter_list)>0 self.provide_data = iter_list[0].provide_data self.provide_label = iter_list[0].provide_label self.iter_list = iter_list self.cur_iter = None def reset(self): self.cur_iter.reset() def next(self): self.cur_iter = random.choice(self.iter_list) while True: try: ret = self.cur_iter.next() except StopIteration: self.cur_iter.reset() continue return ret