The contents of this notebook are © Zhengfang Duanmu and Wentao Liu. It is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License. If you build on it, please cite our paper at:
Z. Duanmu*, W. Liu*, Z. Li, K. Ma and Z. Wang, "Characterizing Generalized Rate-Distortion Performance of Video Coding: An Eigen Analysis Approach," in IEEE Transactions on Image Processing, vol. 29, pp. 6180-6193, 2020. (*Equal contribution)
or
@article{duanmu2020egrd, title={Characterizing Generalized Rate-Distortion Performance of Video Coding: An Eigen Analysis Approach}, author={Duanmu, Zhengfang and Liu, Wentao and Li, Zhuoran and Ma, Kede and Wang, Zhou}, journal={IEEE Transactions on Image Processing}, volume={29}, pages={6180--6193}, year={2020} }
To run this notebook, you will need:

- A working Python environment. We tested the source code on Anaconda 4.8.3, which includes Python 3.6.10, IPython 7.14.0, scipy 1.4.1, matplotlib 3.1.3, and osqp 0.5.0. A downloadable version of the code is available here.
- A data source, such as a set of real generalized rate-distortion functions, to run the algorithm on. In the paper and below, we use the Waterloo GRD Database (download it from here). You may need to adjust the paths to the files in the code below.
Please refer to our TIP paper for more details regarding the eGRD method.
This section loads the required Python modules and defines several helper functions, as well as the main class used for all the experiments.
import os
import matplotlib.pyplot as plt
import numpy as np
import simplejson as json
from IPython.display import HTML, display
# helper functions
def plot_grd_surface(grd, reps, fontsize=20, domain_x=None, domain_y=None, zlim=None):
"""Plot a single GRD function
Arguments:
grd {1-D numpy array} -- shape (num_reps,), the grd function to plot
reps {2-D numpy array} -- shape (num_reps, 2) where 1st column
and 2nd column represent bitrate and spatial resolution, respectively
Points at which to interpolate data.
Keyword Arguments:
fontsize {int} -- set the fontsize of the figure title, axis and tick labels. (default: {20})
domain_x {2-entry tuple} -- set the interpolation range of x-axis (default: {None})
domain_y {2-entry tuple} -- set the interpolation range of y-axis (default: {None})
zlim {2-entry tuple} -- set the limit of z-axis (default: {None})
"""
from scipy.interpolate import CloughTocher2DInterpolator
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
if domain_x is not None:
start_x, stop_x = domain_x
else:
start_x, stop_x = reps[:, 0].min(), reps[:, 0].max()
if domain_y is not None:
start_y, stop_y = domain_y
else:
start_y, stop_y = reps[:, 1].min(), reps[:, 1].max()
x = np.linspace(start=start_x,
stop=stop_x,
num=100)
y = np.linspace(start=start_y,
stop=stop_y,
num=100)
xp, yp = np.meshgrid(x, y)
coord = np.c_[xp.flatten(), yp.flatten()]
fig = plt.figure(figsize=2 * plt.figaspect(1/1.5))
interp = CloughTocher2DInterpolator(reps, grd)
z = interp(coord)
zp = z.reshape(xp.shape)
ax = fig.add_subplot(111, projection='3d')
ax.plot_surface(xp, yp, zp, linewidth=0, cmap=cm.jet)
ax.plot_wireframe(xp, yp, zp, colors='black', linewidths=0.5)
ax.view_init(35, -125)
if zlim is not None:
ax.set_zlim(zlim[0], zlim[1])
ax.set_yticks(np.arange(500,2100,500))
# set axis tick font size
for tick in ax.xaxis.get_major_ticks():
tick.label.set_fontsize(fontsize)
for tick in ax.yaxis.get_major_ticks():
tick.label.set_fontsize(fontsize)
for tick in ax.zaxis.get_major_ticks():
tick.label.set_fontsize(fontsize-8)
# set label
ax.set_xlabel('bitrate', labelpad=40, fontsize=fontsize)
ax.set_ylabel('resolution', labelpad=40, fontsize=fontsize)
ax.set_zlabel('SSIMplus', labelpad=18, fontsize=fontsize-4)
plt.show()
def plot_grd_surfaces(grds, reps, num_plt):
"""
Plot a set of GRD functions
:param grds: 2-D ndarray, shape(num_observations, num_variables)
:param reps: 2-D array, shape (num_variables, 2) where 1st column
and 2nd column represent bitrate and spatial resolution, respectively
Points at which to interpolate data.
:param num_plt: int, number of functions to plot
"""
from scipy.interpolate import CloughTocher2DInterpolator
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
grd_sel = grds[0:num_plt, :]
x = np.linspace(start=reps[:, 0].min(),
stop=reps[:, 0].max(),
num=100)
y = np.linspace(start=reps[:, 1].min(),
stop=reps[:, 1].max(),
num=100)
xp, yp = np.meshgrid(x, y)
coord = np.c_[xp.flatten(), yp.flatten()]
num_cols = 4
num_rows = int(np.ceil(num_plt / num_cols))
fig = plt.figure(figsize=2*plt.figaspect(num_rows / float(num_cols+3)))
for row, grd in enumerate(grd_sel):
interp = CloughTocher2DInterpolator(reps, grd)
z = interp(coord)
zp = z.reshape(xp.shape)
ax = fig.add_subplot(num_rows, num_cols, row+1, projection='3d')
ax.plot_surface(xp, yp, zp, linewidth=0, cmap=cm.jet)
ax.plot_wireframe(xp, yp, zp, colors='black', linewidths=0.5)
ax.view_init(35, -125)
ax.set_xlabel('bitrate')
ax.set_ylabel('resolution')
ax.set_zlabel('SSIMplus')
plt.show()
def compute_loss(f, f_hat):
"""
Compute the following loss functions:
1. RMSE averaged over all test GRD functions
2. worst case RMSE over all test GRD functions
3. L_infinity averaged over all test GRD functions
4. worst case L_infinity over all test GRD functions
:param f: 2-D ndarray, shape(num_functions, num_representations)
:param f_hat: 2-D ndarray, shape(num_functions, num_representations)
"""
rmse = np.sqrt(np.nanmean((f - f_hat) ** 2, axis=1))
l_inf = np.nanmax(np.abs(f - f_hat), axis=1)
rmse_avg = np.nanmean(rmse)
rmse_wst = np.nanmax(rmse)
linf_avg = np.nanmean(l_inf)
linf_wst = np.nanmax(l_inf)
return rmse_avg, rmse_wst, linf_avg, linf_wst
def isvalidgrd(grd_vec):
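"""Check the axial monotonicity of a GRD function: quality must be
non-decreasing in bitrate at every resolution, and non-decreasing in
resolution at the maximum bitrate. The hard-coded (6, 91) grid shape
matches the zero-prepended Waterloo GRD data used in this notebook."""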
grd_surface = np.reshape(grd_vec, (6, 91))
diff_x = np.diff(grd_surface)
diff_y = np.diff(grd_surface[:,-1])
return np.all(diff_x >= 0) and np.all(diff_y >= 0)
def data_clean(grd_data):
valid_mask = np.ones((np.shape(grd_data)[0],), dtype=bool)
for row, grd in enumerate(grd_data):
if not isvalidgrd(grd):
# print('The {:d}-th grd function is not valid!'.format(row))
valid_mask[row] = False
clean_data = grd_data[valid_mask,:]
return clean_data
# class to load the Waterloo GRD database
class GrdDatabase(object):
def __init__(self, path, measures):
"""Initialize a GRD function database
Arguments:
object {GrdDatabase} -- The GrdDatabase object to be constructed
path {str} -- A path to a GRD function database
measures {tuple of strs} -- A tuple consisting of names of quality measures
"""
self.database_path = path
# get info about the database
info_file = os.path.join(path, 'info.json')
with open(info_file, 'r') as fp:
info = json.load(fp)
self.name = info['name']
# get representations of the grd data base
self.attributes, self.reps = self._parse_reps(path)
self.num_reps = len(self.reps[self.attributes[0]])
self.gridded, self.grid_shape = self._get_grid_shape()
# get all grd functions for each measure
self.measures, self.grd_data, self.grd_files = self._get_grds(measures)
def _parse_reps(self, path):
"""
Media attributes are sorted in alphabetical order, with bitrate always placed last.
"""
rep_file = os.path.join(path, 'reps.json')
with open(rep_file, 'r') as fp:
reps_dict = json.load(fp)
attrs = sorted(reps_dict.keys())
attrs.remove('bitrate')
attrs.append('bitrate')
num_reps = len(reps_dict[attrs[0]])
assert all([len(reps_dict[attr]) == num_reps for attr in attrs])
return attrs, reps_dict
def _get_grid_shape(self):
grid_shape = []
rep_num = 1
for attr in self.attributes:
attr_size = np.unique(np.asarray(self.reps[attr])).size
rep_num *= attr_size
grid_shape.append(attr_size)
if rep_num == self.num_reps:
gridded = True
grid_shape = tuple(grid_shape)
else:
gridded = False
grid_shape = tuple([-1] * len(grid_shape))
return gridded, grid_shape
def _get_grds(self, measures):
valid_measures = []
grd_data = {}
grd_files = {}
for measure in measures:
subset_path = os.path.join(self.database_path, measure)
if os.path.isdir(subset_path):
grd_files_subset = [os.path.join(subset_path, f) for f in os.listdir(subset_path) if f.endswith('.json')]
grd_data_subset = []
grd_files_sub = []
for file in grd_files_subset:
with open(file, 'r') as fp:
grd_f = np.asarray(json.load(fp), dtype=float)
if len(grd_f) == self.num_reps:
grd_data_subset.append(grd_f)
grd_files_sub.append(file)
if len(grd_data_subset) == 0:
continue
grd_data_subset = np.stack(grd_data_subset)
grd_data[measure] = grd_data_subset
grd_files[measure] = grd_files_sub
valid_measures.append(measure)
return valid_measures, grd_data, grd_files
def grd_functions(self, measures=None, zero_prepend=False):
if measures is None:
measures = self.measures
grd_data = [self.grd_data[m] for m in measures if m in self.measures]
grd_data = np.concatenate(grd_data)
if zero_prepend:
grd_data = [self._pad_left(grd) for grd in grd_data]
grd_data = np.stack(grd_data)
return grd_data
def _pad_left(self, grd, mode='zero'):
"""Pad before each rd curve
Arguments:
grd {1d np array} -- [One grd function]
Keyword Arguments:
mode {str} -- ['zero' or 'copy'. 'zero' to pad with zeros. 'copy' to pad with the first column.] (default: {'zero'})
Raises:
NotImplementedError: [For non-gridded grd function, this functionality has not been implemented]
Returns:
1d np array -- padded grd function
"""
if self.gridded:
gridded_grd = np.reshape(grd, self.grid_shape)
if mode == 'zero':
gridded_grd = np.insert(gridded_grd, 0, 0, axis=-1)
elif mode == 'copy':
gridded_grd = np.insert(gridded_grd, 0, gridded_grd[..., 0], axis=-1)
else:
print('{} is not a valid mode for _pad_left! No padding is performed.'.format(mode))
padded_grd = gridded_grd.flatten()
else:
raise NotImplementedError('The _pad_left functionality for non-gridded grd function has not been implemented.')
return padded_grd
def compact_reps(self, bitrate_first=False, zero_prepend=False):
comp_reps = [np.asarray(self.reps[attr]) for attr in self.attributes]
comp_reps = np.stack(comp_reps)
if zero_prepend:
padded_comp_reps = [self._pad_left(attrs, mode='copy') for attrs in comp_reps[:-1]]
padded_comp_reps.append(self._pad_left(comp_reps[-1]))
comp_reps = np.stack(padded_comp_reps)
if bitrate_first:
comp_reps = np.roll(comp_reps, 1, axis=0)
return comp_reps
def get_single_grd(self, measure, idx, gridded=False):
grd = self.grd_data[measure][idx]
if gridded:
grd = np.reshape(grd, self.grid_shape)
return grd
class GRD_Experiment(object):
'''Implementation of the eGRD (eigensurface) model, with trigonometric and polynomial basis variants'''
def __init__(self, **parameters):
'''Initialize a new experiment.
Parameters:
data: array-like
contains the generalized rate-distortion functions as
a 2-dimensional data structure
first dimension: number of functions
second dimension: number of representations
rep: array-like
shape (num_variables, 2) where 1st column and 2nd column represent
bitrate and spatial resolution, respectively.
p_train: float
percentage of training samples
seed: integer
random seed for getting training samples
model: possible choice {'egrd', 'trigonometric', 'polynomial'}
type of grd function model
'''
from numpy.random import seed
self.__dict__.update(parameters)
assert 0 < self.p_train <= 1
assert self.rep.shape[0] == self.data.shape[1]
seed(self.seed)
self.num_rep = self.rep.shape[0]
self.num_resolution = np.unique(self.rep[:, 1]).size
self.num_bitrate = int(self.num_rep / self.num_resolution)
# segregate the test_database into training and testing sets
idx = np.arange(0, self.data.shape[0])
np.random.shuffle(idx)
train_idx = idx[0:round(self.data.shape[0] * self.p_train)]
test_idx = idx[round(self.data.shape[0] * self.p_train):]
self.train_data = self.data[train_idx, :]
self.test_data = self.data[test_idx, :]
self.indices = idx
self.train_idx = train_idx
self.test_idx = test_idx
if self.model.lower() == 'egrd':
self.f0, self.eig_values, self.H = self._pca(self.train_data)
elif self.model.lower() == 'trigonometric':
self.f0, self.eig_values, self.H = self._tri_basis(self.train_data)
elif self.model.lower() == 'polynomial':
self.f0, self.eig_values, self.H = self._poly_basis(self.train_data)
else:
raise ValueError('Invalid model type {}'.format(self.model))
def construct_diff_mat():
'''Construct discrete derivative matrices D in Eq.X'''
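# D1 takes first-order differences along the bitrate axis within each
# resolution (rows that would cross a resolution boundary are removed);
# D2 takes differences across adjacent resolutions at the maximum bitrate.
# Stacked, D f >= 0 expresses the monotonicity of a valid GRD function
# (the same property checked by isvalidgrd above).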
# Construct D1
D1_t1 = np.diag(np.ones(self.num_rep))
D1_t2 = np.ones(self.num_rep - 1) * -1 # entries for the subdiagonal
D1_t3 = np.diag(D1_t2, k=-1) # place the -1s on the first subdiagonal
D1_t4 = D1_t1 + D1_t3
idx_rmv = np.array(np.arange(0, self.num_rep))[np.s_[::self.num_bitrate]]
D1 = np.delete(D1_t4, idx_rmv, 0)
# Construct D2
D2 = np.zeros((self.num_resolution - 1, self.num_rep))
for idx, row in enumerate(D2):
row[(idx + 1) * self.num_bitrate - 1] = -1
row[(idx + 2) * self.num_bitrate - 1] = 1
D = np.concatenate((D1, D2), axis=0)
return D
self.D = construct_diff_mat()
def __call__(self, M, S=None, dataset='test'):
"""Run the grd reconstruction algorithm on the test set, and
evaluate.
Arguments:
M {integer} -- the number of basis functions
Keyword Arguments:
S {1D numpy array} -- list of indices (of sampled representation) for surface reconstruction with sparse samples. (default: {None})
dataset {str or 2D numpy array} -- 'test', 'train', 'all', or a 2D numpy array of shape (num_samples, num_reps) (default: {'test'})
Returns:
2-entry tuple of 2D numpy arrays -- (ground-truth of test samples, estimated test samples)
"""
if dataset == 'test':
self.f = self.test_data
indices = self.test_idx
elif dataset == 'train':
self.f = self.train_data
indices = self.train_idx
elif dataset == 'all':
self.f = self.data
indices = self.indices
else:
self.f = dataset
indices = np.arange(self.f.shape[0])
if S is None:
S = np.arange(self.num_rep)
if M != 0:
H = self.H[:, :M]
H_tilde = self.H[S, :M]
self.f_hat = np.zeros((self.f.shape[0], self.num_rep))
self.c_opt = np.zeros((self.f.shape[0], M))
for idx, f in enumerate(self.f):
f0f_tilde = self.f0[S] - f[S]
c_opt = self._solve_c(f0f_tilde, H_tilde, H)
self.c_opt[idx, :] = c_opt
if np.any(np.isnan(self.c_opt[idx, :])):
print('Video {:d} with {:d} basis vectors was not solved properly!'.format(indices[idx], M))
self.f_hat[idx, :] = np.nan
else:
self.f_hat[idx, :] = np.dot(H[:, :M], c_opt) + self.f0
else:
self.f_hat = np.tile(self.f0, (self.f.shape[0], 1))
return self.f, self.f_hat
def uncertainty_sampling(self):
'''Initialize uncertainty sampling indices'''
eps = 1
sigma = np.cov(self.train_data.T)
idx = [a for a in range(self.num_rep)]
evaluated_idx = []
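# Greedy active sampling: var[j] sums sigma[i, j]^2 / (sigma[j, j] + eps)
# over i, i.e. the total variance reduction obtained by observing
# representation j. The best j is selected, and the covariance of the
# remaining representations is then conditioned on it via a (regularized)
# Schur complement.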
for i in range(self.num_rep):
var = np.zeros(sigma.shape[0])
for j in range(sigma.shape[0]):
var[j] = np.sum(sigma[j, :] ** 2 / (sigma[j, j] + eps))
cur_idx = int(np.argmax(var))
rest_idx = [a for a in range(len(var))]
rest_idx.remove(cur_idx)
real_idx = idx[cur_idx]
evaluated_idx.append(real_idx)
idx.remove(real_idx)
sigma11_x, sigma11_y = np.ix_(rest_idx, rest_idx)
sigma11 = sigma[sigma11_x, sigma11_y]
sigma22_x, sigma22_y = np.ix_([cur_idx], [cur_idx])
sigma22 = sigma[sigma22_x, sigma22_y]
sigma12_x, sigma12_y = np.ix_(rest_idx, [cur_idx])
sigma12 = sigma[sigma12_x, sigma12_y]
sigma21 = sigma12.T
sigma_new = sigma11 - np.matmul(np.matmul(sigma12, np.linalg.inv(sigma22+eps)), sigma21)
sigma = sigma_new
return evaluated_idx
def _pca(self, data):
'''Perform PCA'''
f0 = data.mean(axis=0)
data = data - f0
C = np.cov(data.T)
eig_values, H = np.linalg.eigh(C)
idx = eig_values.argsort()[::-1]
eig_values = eig_values[idx]
H = H[:, idx]
return f0, eig_values, H
def _poly_basis(self, data):
'''Construct polynomial basis'''
f0 = data.mean(axis=0)
data = data - f0
eig_values = np.zeros(self.num_rep)
x = np.linspace(0, 1, num=self.num_bitrate, endpoint=False)
y = np.unique(self.rep[:, 1])
y = y / y.max()
# polynomial of x
poly_x = np.zeros((self.num_bitrate, self.num_bitrate))
for n in range(self.num_bitrate):
poly_x[:, n] = x ** (n+2) - x
# orthogonalization
phi_X, _ = np.linalg.qr(poly_x)
# polynomial of y
poly_y = np.zeros((self.num_resolution, self.num_resolution))
for m in range(self.num_resolution):
poly_y[:, m] = y ** (m+1)
# orthogonalization
phi_Y, _ = np.linalg.qr(poly_y)
H = np.zeros((self.num_rep, self.num_rep))
count = 0
for m in range(self.num_resolution):
phi_y = phi_Y[:, m]
for n in range(self.num_bitrate):
phi_x = phi_X[:, n]
H[:, count] = np.outer(phi_x, phi_y).flatten(order='F')
H[:, count] = H[:, count] / np.linalg.norm(H[:, count])
coef = np.dot(data, H[:, count])
eig_values[count] = np.var(coef)
count += 1
idx = eig_values.argsort()[::-1]
eig_values = eig_values[idx]
H = H[:, idx]
return f0, eig_values, H
def _tri_basis(self, data):
'''Construct trigonometric basis'''
f0 = data.mean(axis=0)
data = data - f0
eig_values = np.zeros(self.num_rep)
x = np.linspace(0, 1, num=self.num_bitrate, endpoint=False)
y = np.unique(self.rep[:, 1])
y = y / y.max()
H = np.zeros((self.num_rep, self.num_rep))
count = 0
for m in range(self.num_resolution):
phi_y = np.sin((m * np.pi + np.pi / 2) * y)
for n in range(1, self.num_bitrate+1):
phi_x = np.sin(n * np.pi * x)
H[:, count] = np.outer(phi_x, phi_y).flatten(order='F')
H[:, count] = H[:, count] / np.linalg.norm(H[:, count])
coef = np.dot(data, H[:, count])
eig_values[count] = np.var(coef)
count += 1
idx = eig_values.argsort()[::-1]
eig_values = eig_values[idx]
H = H[:, idx]
return f0, eig_values, H
def _osqp_solve_qp(self, P, q, G=None, h=None, A=None, b=None, initvals=None):
"""
Solve a Quadratic Program defined as:
minimize
(1/2) * x.T * P * x + q.T * x
subject to
G * x <= h
A * x == b
using OSQP <https://github.com/oxfordcontrol/osqp>.
Parameters
----------
P : scipy.sparse.csc_matrix
Symmetric quadratic-cost matrix.
q : numpy.array
Quadratic cost vector.
G : scipy.sparse.csc_matrix
Linear inequality constraint matrix.
h : numpy.array
Linear inequality constraint vector.
A : scipy.sparse.csc_matrix, optional
Linear equality constraint matrix.
b : numpy.array, optional
Linear equality constraint vector.
initvals : numpy.array, optional
Warm-start guess vector.
Returns
-------
x : array, shape=(n,)
Solution to the QP, if found, otherwise ``None``.
Note
----
OSQP requires `P` to be symmetric, and won't check for errors otherwise.
Watch out for this if you e.g. `get nan values
<https://github.com/oxfordcontrol/osqp/issues/10>`_ in your solutions.
"""
from osqp import OSQP
l = -np.inf * np.ones(len(h))
if A is not None:
from scipy.sparse import vstack as sparse_vstack
qp_A = sparse_vstack([G, A]).tocsc()  # np.vstack does not handle scipy sparse inputs
qp_l = np.hstack([l, b])
qp_u = np.hstack([h, b])
else: # no equality constraint
qp_A = G
qp_l = l
qp_u = h
osqp = OSQP()
osqp.setup(P=P, q=q, A=qp_A, l=qp_l, u=qp_u, eps_abs=1e-2, max_iter=1000000, verbose=False)
if initvals is not None:
osqp.warm_start(x=initvals)
res = osqp.solve()
if res.info.status_val != osqp.constant('OSQP_SOLVED'):
print("OSQP exited with status '%s'" % res.info.status)
return res.x
def _solve_c(self, f0f_tilde, H_tilde, H):
''' Construct input for quadratic programming and solve for c'''
from scipy.sparse import csc_matrix
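# The QP below encodes the constrained least-squares problem:
#   minimize ||H_tilde c - (f_tilde - f0_tilde)||^2
#     => P = 2 H~^T H~,  q = 2 (f0~ - f~)^T H~  (constant terms dropped),
#   subject to the monotonicity constraint D (f0 + H c) >= 0, rewritten in
#   OSQP's inequality form G c <= h with G = -D H and h = D f0.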
P = 2 * np.dot(H_tilde.T, H_tilde)
q = np.dot(2 * f0f_tilde, H_tilde)
G = np.dot(-self.D, H)
h = np.dot(self.D, self.f0)
P = csc_matrix(P)
G = csc_matrix(G)
c_opt = self._osqp_solve_qp(P, q, G=G, h=h)
return c_opt
def cum_energy(self, threshold):
'''Return the explained variance of the first M principal components, where M is the smallest number of components whose cumulative energy exceeds the threshold'''
exp_var = np.cumsum(self.eig_values) / np.sum(self.eig_values)
M = int(np.argmax(exp_var > threshold)) + 1
return exp_var[:M]
def f_avg(self):
'''Return the mean function'''
return self.f0
def H_tilde(self, M):
'''Return the first M principal components'''
return self.H[:, :M]
def f_reconstruct(self, idx):
'''Return the idx-th reconstructed surface'''
return self.f_hat[idx, :]
def f_groundtruth(self, idx):
'''Return the idx-th testing sample'''
return self.f[idx, :]
def c(self):
'''Return the coefficients for each eigen-surface'''
return self.c_opt
This section instantiates a number of experiments. First, we load the actual GRD functions.
Here, we use the actual GRD functions from the Waterloo GRD database (download it from [here][WGRD]), and assume that the database is stored in the path './WaterlooGRD' in the current working directory. You may need to adjust this path to your setup.
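Based on how `GrdDatabase` reads the files, each codec directory is expected to contain an `info.json`, a `reps.json` listing the bitrate and spatial resolution of every representation, and one subdirectory per quality measure holding the GRD functions as `.json` files, roughly:

    WaterlooGRD/
        H264/
            info.json
            reps.json
            SSIMplus_core/
                ... (one .json file per GRD function)
        VP9/
            info.json
            reps.json
            SSIMplus_core/
                ...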
database_path = 'WaterlooGRD'
h264_path = os.path.join(database_path, 'H264')
vp9_path = os.path.join(database_path, 'VP9')
measures = ('SSIMplus_core',)
wgrd2_h264 = GrdDatabase(h264_path, measures)
wgrd2_vp9 = GrdDatabase(vp9_path, measures)
# collect data
grd_data = np.concatenate((wgrd2_h264.grd_functions(measures=measures, zero_prepend=True),
wgrd2_vp9.grd_functions(measures=measures, zero_prepend=True)))
rep_data = wgrd2_h264.compact_reps(bitrate_first=True, zero_prepend=True).T
grd_data = data_clean(grd_data)
database_name = 'H264_VP9'
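# sanity check (illustrative): every GRD function must provide one quality
# value per representation listed in rep_data
assert grd_data.shape[1] == rep_data.shape[0]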
We start with training the eGRD model.
egrd_waterloo = GRD_Experiment(
data = grd_data, # a set of GRD functions
rep = rep_data, # representations
p_train = 0.80, # percent of training samples
seed = 1234, # seed for the random number generator
model = 'egrd' # type of grd function model
)
In fact, the first eight principal components explain more than 99.5% of the variance in real-world GRD functions, as shown in the figure below. This suggests that even an 8-parameter model should work reasonably well for most GRD functions found in practice.
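In notation, eGRD approximates each GRD function by the training mean plus a linear combination of the first $M$ eigenvectors (eigen surfaces) of the training covariance matrix,

$$\hat{f} = f_0 + \sum_{m=1}^{M} c_m h_m,$$

and the fraction of energy explained by the first $M$ components is $\sum_{m=1}^{M}\lambda_m \big/ \sum_{m=1}^{N}\lambda_m$, which is what `cum_energy` computes from the sorted eigenvalues.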
cum_energy = egrd_waterloo.cum_energy(0.995)
M = cum_energy.size
# plot the percentage of energy explained by the first M principal components
plt.plot(np.arange(1, M + 1), cum_energy * 100)
ax = plt.gca()
for tick in ax.xaxis.get_major_ticks():
tick.label.set_fontsize(14)
for tick in ax.yaxis.get_major_ticks():
tick.label.set_fontsize(14)
plt.xlabel('Principal Components', fontsize=18)
plt.ylabel('Percent of Energy', fontsize=18)
plt.ylim(80, 100)
plt.show()
To get a sense of the shapes of the eigen GRD functions, we visualize the mean GRD surface and the first seven eigen surfaces. This corresponds to Figure 5 in the paper.
f0 = egrd_waterloo.f_avg()
H_tilde = egrd_waterloo.H_tilde(7)
CP = np.concatenate((f0.reshape(1, -1), H_tilde.T), axis=0)
# plot mean function and eigen-surfaces
plot_grd_surfaces(CP, rep_data, CP.shape[0])
Next, we evaluate the performance of the eGRD model as a function of the number of principal components. We also compare the eGRD model with two baseline models: trigonometric (tGRD) and polynomial (pGRD). Both baselines can fit increasingly complicated functions at the cost of using more basis functions. As the tables show, the proposed eGRD model significantly outperforms the baseline models, even with fewer basis functions. These tables correspond to Tables I, II, and III in the paper, except that the results here are obtained from a single training-test split.
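For reference, tGRD uses separable trigonometric basis functions, $h_{m,n}(x, y) \propto \sin(n\pi x)\,\sin\big((m\pi + \tfrac{\pi}{2})\,y\big)$ on the normalized bitrate-resolution grid, and pGRD uses QR-orthogonalized separable polynomial bases, as constructed in `_tri_basis` and `_poly_basis` above. In both cases the basis functions are ranked by the variance of the training coefficients, mirroring the eigenvalue ordering of eGRD.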
result = np.zeros((M+1, 4))
for i in range(M+1):
f, f_hat = egrd_waterloo(i, dataset="train")
result[i, :] = compute_loss(f, f_hat)
result_show = np.concatenate((np.arange(0, M+1).reshape(M+1, 1), result), axis=1)
print("Performance of eGRD on the training set with increasing number of basis vectors")
display(HTML(
'<table><tr><th># of basis</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
result = np.zeros((10, 4))
num_basis = np.asarray([0, 1, 2, 3, 4, 6, 8, 10, 15, 20], dtype=int)
for row_id, i in enumerate(num_basis):
f, f_hat = egrd_waterloo(i, dataset="test")
result[row_id, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_basis.reshape(10, 1), result), axis=1)
print("Performance of eGRD on the testing set with increasing number of basis vectors")
display(HTML(
'<table><tr><th># of basis</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
# remove the egrd model
del egrd_waterloo
# create a trigonometric model and test its performance
tri_waterloo = GRD_Experiment(
data = grd_data, # a set of GRD functions
rep = rep_data, # representations
p_train = 0.80, # percent of training samples
seed = 1234, # seed for the random number generator
model = 'trigonometric' # type of grd function model
)
result = np.zeros((10, 4))
num_basis = np.asarray([0, 1, 2, 3, 4, 6, 8, 10, 15, 20], dtype=int)
for row_id, i in enumerate(num_basis):
f, f_hat = tri_waterloo(i, dataset="test")
result[row_id, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_basis.reshape(10, 1), result), axis=1)
print("Performance of tGRD on the testing set with increasing number of basis vectors")
display(HTML(
'<table><tr><th># of basis</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
# remove the trigonometric model
del tri_waterloo
# create a polynomial model and test its performance
poly_waterloo = GRD_Experiment(
data = grd_data, # a set of GRD functions
rep = rep_data, # representations
p_train = 0.90, # percent of training samples
seed = 1234, # seed for the random number generator
model = 'polynomial' # type of grd function model
)
result = np.zeros((10, 4))
num_basis = np.asarray([0, 1, 2, 3, 4, 6, 8, 10, 15, 20], dtype=int)
for row_id, i in enumerate(num_basis):
f, f_hat = poly_waterloo(i, dataset="test")
result[row_id, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_basis.reshape(10, 1), result), axis=1)
print("Performance of pGRD on the testing set with increasing number of basis vectors")
display(HTML(
'<table><tr><th># of basis</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
del poly_waterloo
Probing the quality of a single sample in the GRD space involves sophisticated video encoding and quality assessment, both of which are expensive processes. It is therefore desirable to accurately reconstruct the rate-distortion surface from a minimal number of samples. Here, we compare the proposed eGRD model with its two baseline variants (tGRD and pGRD), a 1D reciprocal model, and a 1D logarithmic model, whose implementations are listed below. For the eGRD, tGRD, and pGRD models, we use 8 basis functions.
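For the basis models, the coefficients are recovered from the sparse sample set $S$ by the constrained least-squares problem implemented in `_solve_c` above:

$$\min_{c}\ \big\lVert \tilde{H}c - (\tilde{f} - \tilde{f}_0) \big\rVert_2^2 \quad \text{subject to} \quad D\,(f_0 + Hc) \ge 0,$$

where $\tilde{H}$ and $\tilde{f}$ denote the rows of $H$ and the entries of $f$ indexed by $S$, and $D$ is the discrete derivative matrix that enforces monotonicity of the reconstructed surface.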
from scipy.optimize import curve_fit
from scipy.interpolate import LinearNDInterpolator
import warnings
from scipy.optimize import OptimizeWarning
warnings.simplefilter("ignore", OptimizeWarning)
class Baseline_Experiment(object):
'''Implementation of the baseline GRD models (2D linear interpolation, 1D logarithmic, and 1D reciprocal)'''
def __init__(self, **parameters):
'''Initialize a new experiment.
Parameters:
data: array-like
contains the generalized rate-distortion functions as
a 2-dimensional data structure
first dimension: number of functions
second dimension: number of representations
rep: array-like
shape (num_variables, 2) where 1st column and 2nd column represent
bitrate and spatial resolution, respectively.
p_train: float
percentage of training samples
seed: integer
random seed for getting training samples
'''
from numpy.random import seed
self.__dict__.update(parameters)
assert 0 < self.p_train < 1
assert self.rep.shape[0] == self.data.shape[1]
seed(self.seed)
self.num_rep = self.rep.shape[0]
self.num_resolution = np.unique(self.rep[:, 1]).size
self.num_bitrate = int(self.num_rep / self.num_resolution)
# segregate the test_database into training and testing sets
idx = np.arange(0, self.data.shape[0])
np.random.shuffle(idx)
train_idx = idx[0:round(self.data.shape[0] * self.p_train)]
test_idx = idx[round(self.data.shape[0] * self.p_train):]
self.train_data = self.data[train_idx, :]
self.test_data = self.data[test_idx, :]
def __call__(self, model, S=None, dataset='test'):
'''Run the grd reconstruction algorithm on the test set, and
evaluate.
Parameters:
model: possible choice {'2d_linear', '1d_logarithmic', '1d_reciprocal'}
S: list of indices (of sampled representation) for surface
reconstruction with sparse samples.
'''
if dataset == 'test':
self.f = self.test_data
elif dataset == 'train':
self.f = self.train_data
elif dataset == 'all':
self.f = self.data
else:
self.f = dataset
if model == '2d_linear':
self.model = self.LinearInterpModel
elif model == '1d_logarithmic':
self.model = self.LogarithmicModel
elif model == '1d_reciprocal':
self.model = self.ReciprocalModel
else:
raise ValueError('Invalid model type {}'.format(model))
if S is None:
S = np.arange(self.num_rep)
self.f_hat = np.zeros((self.f.shape[0], self.num_rep))
for idx, f in enumerate(self.f):
func_model = self.model(self.rep[S, :], f[S], self.rep)
self.f_hat[idx, :] = func_model(self.rep)
return self.f, self.f_hat
def uncertainty_sampling(self):
'''Initialize uncertainty sampling indices for 1D models
The sample indices are selected in a rotating manner across the resolutions;
the two endpoint bitrates of each resolution are selected first. Revised by Wentao, Apr. 24, 2019.
'''
eps = 1
sigma_all = np.zeros((self.num_resolution, self.num_bitrate - 2, self.num_bitrate - 2))
for j in range(self.num_resolution):
sigma_all[j, :, :] = np.cov(self.train_data[:, (j * self.num_bitrate + 1):(j + 1) * self.num_bitrate - 1].T)
evaluated_idx = np.zeros((self.num_resolution, self.num_bitrate), dtype=int)
for i in range(self.num_resolution):
evaluated_idx[i, 0] = self.num_bitrate * i
evaluated_idx[i, 1] = self.num_bitrate * (i + 1) - 1
idx = [a for a in range(self.num_bitrate * i + 1, self.num_bitrate * (i + 1) - 1)]
sigma = sigma_all[i, :, :]
for j in range(2, self.num_bitrate):
var = np.zeros(sigma.shape[0])
for k in range(sigma.shape[0]):
var[k] = np.sum(sigma[k, :] ** 2 / (sigma[k, k] + eps))
cur_idx = int(np.argmax(var))
rest_idx = [a for a in range(len(var))]
rest_idx.remove(cur_idx)
real_idx = idx[cur_idx]
evaluated_idx[i, j] = real_idx
idx.remove(real_idx)
sigma11_x, sigma11_y = np.ix_(rest_idx, rest_idx)
sigma11 = sigma[sigma11_x, sigma11_y]
sigma22_x, sigma22_y = np.ix_([cur_idx], [cur_idx])
sigma22 = sigma[sigma22_x, sigma22_y]
sigma12_x, sigma12_y = np.ix_(rest_idx, [cur_idx])
sigma12 = sigma[sigma12_x, sigma12_y]
sigma21 = sigma12.T
sigma_new = sigma11 - np.matmul(np.matmul(sigma12, np.linalg.inv(sigma22 + eps)), sigma21)
sigma = sigma_new
evaluated_idx = evaluated_idx.flatten(order='F')
return evaluated_idx
class LogarithmicModel(object):
'''1D-Logarithmic GRD model'''
def __init__(self, x, y, rep):
assert np.unique(x[:, 1]).size == np.unique(rep[:, 1]).size
resolutions = np.unique(x[:, 1])
self.num_rep = rep.shape[0]
self.num_resolution = resolutions.size
self.num_bitrate = int(self.num_rep / self.num_resolution)
self.a = np.zeros(self.num_resolution)
self.b = np.zeros(self.num_resolution)
self.c = np.zeros(self.num_resolution)
for s in range(self.num_resolution):
idx = (x[:, 1] == resolutions[s])
popt, _ = curve_fit(self.func, x[idx, 0], y[idx])
self.a[s] = popt[0]
self.b[s] = popt[1]
self.c[s] = popt[2]
def __call__(self, x):
y_hat = np.zeros((self.num_resolution, self.num_bitrate))
for s in range(self.num_resolution):
y_hat[s, :] = self.func(x[(s * self.num_bitrate):(s + 1) * self.num_bitrate, 0], self.a[s], self.b[s],
self.c[s])
y_hat[:, 0] = 0
y_hat = y_hat.flatten()
return y_hat
def func(self, x, a, b, c):
return a * np.log(x+1) + b * x + c
class ReciprocalModel(object):
'''1D-Reciprocal GRD model'''
def __init__(self, x, y, rep):
assert np.unique(x[:, 1]).size == np.unique(rep[:, 1]).size
resolutions = np.unique(x[:, 1])
self.num_rep = rep.shape[0]
self.num_resolution = resolutions.size
self.num_bitrate = int(self.num_rep / self.num_resolution)
self.a = np.zeros(self.num_resolution)
self.b = np.zeros(self.num_resolution)
self.c = np.zeros(self.num_resolution)
self.d = np.zeros(self.num_resolution)
for s in range(self.num_resolution):
idx = (x[:, 1] == resolutions[s])
popt, _ = curve_fit(self.func, x[idx, 0], y[idx], maxfev=500000,
bounds=([-np.inf, 1., -np.inf, -np.inf],
[np.inf, 5., np.inf, np.inf]),
method='trf')
self.a[s] = popt[0]
self.b[s] = popt[1]
self.c[s] = popt[2]
self.d[s] = popt[3]
def __call__(self, x):
y_hat = np.zeros((self.num_resolution, self.num_bitrate))
for s in range(self.num_resolution):
y_hat[s, :] = self.func(x[(s * self.num_bitrate):(s + 1) * self.num_bitrate, 0], self.a[s], self.b[s],
self.c[s], self.d[s])
y_hat[:, 0] = 0
y_hat = y_hat.flatten()
return y_hat
def func(self, x, a, b, c, d):
return c - a / (np.abs(x + d)**b + 0.01)
class LinearInterpModel(object):
'''2D Linear Interpolant GRD model'''
def __init__(self, x, y, rep):
self.interp_estimator = LinearNDInterpolator(x, y)
def __call__(self, x):
y_hat = self.interp_estimator(x)
return y_hat
M = 8
num_sample = np.asarray([8, 10, 20, 30, 40, 50], dtype=int)
# eGRD exp
egrd_waterloo = GRD_Experiment(
data = grd_data, # a set of GRD functions
rep = rep_data, # representations
p_train = 0.80, # percent of training samples
seed = 1234, # seed for the random number generator
model = 'egrd' # type of grd function model
)
twoD_order = egrd_waterloo.uncertainty_sampling()
result = np.zeros((num_sample.size, 4))
for i, n in enumerate(num_sample):
f, f_hat = egrd_waterloo(M, S=twoD_order[:n], dataset="test")
result[i, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_sample.reshape(num_sample.size, 1), result), axis=1)
print("Performance of eGRD with sparse samples")
display(HTML(
'<table><tr><th># of samples</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
# tGRD exp
tri_waterloo = GRD_Experiment(
data = grd_data, # a set of GRD functions
rep = rep_data, # representations
p_train = 0.80, # percent of training samples
seed = 1234, # seed for the random number generator
model = 'trigonometric' # type of grd function model
)
result = np.zeros((num_sample.size, 4))
for i, n in enumerate(num_sample):
f, f_hat = tri_waterloo(M, S=twoD_order[:n], dataset="test")
result[i, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_sample.reshape(num_sample.size, 1), result), axis=1)
print("Performance of tGRD with sparse samples")
display(HTML(
'<table><tr><th># of samples</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
# pGRD exp
poly_waterloo = GRD_Experiment(
data = grd_data, # a set of GRD functions
rep = rep_data, # representations
p_train = 0.90, # percent of training samples
seed = 1234, # seed for the random number generator
model = 'polynomial' # type of grd function model
)
result = np.zeros((num_sample.size, 4))
for i, n in enumerate(num_sample):
f, f_hat = poly_waterloo(M, S=twoD_order[:n], dataset="test")
result[i, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_sample.reshape(num_sample.size, 1), result), axis=1)
print("Performance of pGRD with sparse samples")
display(HTML(
'<table><tr><th># of samples</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
# 1D reciprocal and logarithmic models
base_waterloo = Baseline_Experiment(
data=grd_data, # a set of GRD functions
rep=rep_data, # representations
p_train=0.80, # percent of training samples
seed=1234 # seed for the random number generator
)
oneD_order = base_waterloo.uncertainty_sampling()
result = np.empty((num_sample.size, 4))
result.fill(np.nan)
for i in range(3, num_sample.size):
n = num_sample[i]
f, f_hat = base_waterloo(model='1d_reciprocal', S=oneD_order[0:n], dataset='test')
result[i, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_sample.reshape(num_sample.size, 1), result), axis=1)
print("Performance of reciprocal model with sparse samples")
display(HTML(
'<table><tr><th># of samples</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
result = np.empty((num_sample.size, 4))
result.fill(np.nan)
for i in range(2, num_sample.size):
n = num_sample[i]
f, f_hat = base_waterloo(model='1d_logarithmic', S=oneD_order[0:n], dataset='test')
result[i, :] = compute_loss(f, f_hat)
result_show = np.concatenate((num_sample.reshape(num_sample.size, 1), result), axis=1)
print("Performance of logarithmic model with sparse samples")
display(HTML(
'<table><tr><th># of samples</th><th>RMSE Average</th><th>RMSE Worst</th><th>l<sub>∞</sub> Average</th><th>l<sub>∞</sub> Worst</th></tr><tr>{}</tr></table>'.format(
'</tr><tr>'.join(
'<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in result_show)
))
)
These tables correspond to Tables IV and V in the paper. Clearly, the proposed eGRD model converges with only a very small number of samples.