from pipelinex.utils import DictToDict, dict_io, list_of_dict_to_dict_of_list
import cv2
import numpy as np
class NpDictToDict(DictToDict):
    """``DictToDict`` base class whose ``module`` attribute is NumPy.

    Subclasses in this file only set ``fn`` to the name of a NumPy function;
    how ``module`` and ``fn`` are combined is defined by ``DictToDict`` in
    ``pipelinex.utils``.
    """

    module = np
class CvDictToDict(DictToDict):
    """``DictToDict`` base class whose ``module`` attribute is OpenCV (``cv2``).

    Subclasses in this file only set ``fn`` to the name of an OpenCV function;
    how ``module`` and ``fn`` are combined is defined by ``DictToDict`` in
    ``pipelinex.utils``.
    """

    module = cv2
class NpZerosLike(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``zeros_like``."""

    fn = "zeros_like"
class NpOnesLike(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``ones_like``."""

    fn = "ones_like"
class NpFullLike(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``full_like``."""

    fn = "full_like"
class NpConcat(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``concatenate``."""

    fn = "concatenate"
class NpStack(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``stack``."""

    fn = "stack"
class NpSum(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``sum``."""

    fn = "sum"
class NpMean(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``mean``."""

    fn = "mean"
class NpSquare(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``square``."""

    fn = "square"
class NpSqrt(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``sqrt``."""

    fn = "sqrt"
class NpAbs(NpDictToDict):
    """``NpDictToDict`` variant whose ``fn`` names NumPy's ``abs``."""

    fn = "abs"
def _fit_to_1(a):
    """Linearly rescale an array so that its values span the range [0, 1].

    Args:
        a: A ``numpy.ndarray``.

    Returns:
        An array with the shape of ``a`` in which the minimum of ``a`` maps
        to 0 and the maximum maps to 1. Integer and boolean inputs are
        converted to ``float64`` before the arithmetic; other dtypes are
        left as they are. A constant array (maximum equal to minimum)
        yields all zeros instead of NaN.

    Raises:
        AssertionError: If ``a`` is not a ``numpy.ndarray``.
        ValueError: If ``a`` is empty (raised by ``ndarray.min``).
    """
    assert isinstance(a, np.ndarray)
    if a.dtype.kind in "biu":
        # Work in floating point: in a narrow integer dtype ``max - min``
        # and ``a - min`` can wrap around (e.g. int8: 127 - (-128) -> -1),
        # and boolean arrays do not support subtraction at all.
        a = a.astype(np.float64)
    lowest = a.min()
    highest = a.max()
    span = highest - lowest
    if span == 0:
        # Constant input: dividing would give 0/0 (NaN) for every element.
        return np.zeros_like(a)
    return (a - lowest) / span
@dict_io
def fit_to_1(a):
    """Public, ``dict_io``-decorated entry point that delegates to ``_fit_to_1``.

    The effect of the decorator is defined by ``dict_io`` in
    ``pipelinex.utils``.
    """
    rescaled = _fit_to_1(a)
    return rescaled
def _fit_to_uint8(a):
    """Rescale ``a`` with ``_fit_to_1``, multiply by 255 and cast to ``uint8``.

    Args:
        a: Array accepted by ``_fit_to_1``.

    Returns:
        A ``numpy.uint8`` array with the shape of ``a``.
    """
    unit_scaled = _fit_to_1(a)
    byte_scaled = unit_scaled * 255
    return byte_scaled.astype(np.uint8)
@dict_io
def fit_to_uint8(a):
    """Public, ``dict_io``-decorated entry point that delegates to ``_fit_to_uint8``.

    The effect of the decorator is defined by ``dict_io`` in
    ``pipelinex.utils``.
    """
    as_uint8 = _fit_to_uint8(a)
    return as_uint8
def _expand_repeat(a, repeats=1, axis=None):
    """Insert a new axis into ``a`` at ``axis`` and repeat the data along it.

    Args:
        a: Input array.
        repeats: Number of repetitions along the inserted axis.
        axis: Position of the new axis; passed unchanged to both
            ``np.expand_dims`` and ``np.repeat``.

    Returns:
        The result of ``np.repeat`` applied to the expanded array.
    """
    # NOTE(review): the default ``axis=None`` is forwarded as-is to
    # ``np.expand_dims``; callers presumably always pass an integer - confirm.
    with_new_axis = np.expand_dims(a, axis=axis)
    return np.repeat(with_new_axis, repeats=repeats, axis=axis)
@dict_io
def expand_repeat(a, repeats=1, axis=None):
    """Public, ``dict_io``-decorated entry point that delegates to ``_expand_repeat``.

    ``repeats`` and ``axis`` are forwarded as keyword arguments. The effect
    of the decorator is defined by ``dict_io`` in ``pipelinex.utils``.
    """
    expanded = _expand_repeat(a, repeats=repeats, axis=axis)
    return expanded
def _sum_up(*imgs):
    """Normalize each image to [0, 1] and add the images element-wise.

    Processing steps:
      1. Any image with ``ndim == 2`` gets a third axis with 3 repeats of
         its data (via ``_expand_repeat``); other images pass through.
      2. Every image is rescaled independently with ``_fit_to_1``.
      3. The images are stacked and summed over the stacking axis.
      4. If the result has size 1 along axis 2, that axis is squeezed away.

    Args:
        *imgs: Arrays exposing ``ndim``.

    Returns:
        The element-wise sum of the normalized images.
    """
    expanded = []
    for img in imgs:
        if img.ndim == 2:
            img = _expand_repeat(img, repeats=3, axis=2)
        expanded.append(img)
    normalized = list(map(_fit_to_1, expanded))
    total = np.stack(normalized).sum(axis=0)
    if total.shape[2] == 1:
        total = np.squeeze(total, axis=2)
    return total
@dict_io
def sum_up(*imgs):
    """Public, ``dict_io``-decorated entry point that delegates to ``_sum_up``.

    The effect of the decorator is defined by ``dict_io`` in
    ``pipelinex.utils``.
    """
    summed = _sum_up(*imgs)
    return summed
def _mix_up(*imgs):
    """Average the normalized images and return the result as ``uint8``.

    The sum produced by ``_sum_up`` is divided by the number of images,
    multiplied by 255 and cast to ``numpy.uint8``.
    """
    averaged = _sum_up(*imgs) / len(imgs)
    scaled = averaged * 255
    return scaled.astype(np.uint8)
@dict_io
def mix_up(*imgs):
    """Public, ``dict_io``-decorated entry point that delegates to ``_mix_up``.

    The effect of the decorator is defined by ``dict_io`` in
    ``pipelinex.utils``.
    """
    mixed = _mix_up(*imgs)
    return mixed
def _overlay(*imgs):
    """Add the normalized images, clip to [0, 1] and return as ``uint8``.

    The sum produced by ``_sum_up`` is clipped to the range 0..1, multiplied
    by 255 and cast to ``numpy.uint8``.
    """
    summed = _sum_up(*imgs)
    saturated = np.clip(summed, 0, 1)
    scaled = saturated * 255
    return scaled.astype(np.uint8)
@dict_io
def overlay(*imgs):
    """Public, ``dict_io``-decorated entry point that delegates to ``_overlay``.

    The effect of the decorator is defined by ``dict_io`` in
    ``pipelinex.utils``.
    """
    overlaid = _overlay(*imgs)
    return overlaid
class CvModuleListMerge:
    """Apply several callables to the same dict and merge their results.

    Each callable receives the input dict; the list of their return values
    is converted with ``list_of_dict_to_dict_of_list`` from
    ``pipelinex.utils``.
    """

    def __init__(self, *modules):
        """Store ``modules`` after asserting that each one is callable."""
        assert all(callable(module) for module in modules)
        self.modules = modules

    def __call__(self, d):
        """Run every stored module on ``d`` and return the merged result.

        Args:
            d: Input ``dict`` handed unchanged to each module.

        Returns:
            The value of ``list_of_dict_to_dict_of_list`` applied to the
            list of module outputs (in module order).
        """
        assert isinstance(d, dict)
        outputs = []
        for module in self.modules:
            outputs.append(module(d))
        return list_of_dict_to_dict_of_list(outputs)
class CvModuleConcat(CvModuleListMerge):
    """Merge the module outputs, then pass them through ``NpConcat(axis=0)``."""

    def __call__(self, d):
        """Return ``NpConcat(axis=0)`` applied to the merged module outputs."""
        merged = super().__call__(d)
        concatenate = NpConcat(axis=0)
        return concatenate(merged)
class CvModuleStack(CvModuleListMerge):
    """Merge the module outputs, then pass them through ``NpStack(axis=0)``."""

    def __call__(self, d):
        """Return ``NpStack(axis=0)`` applied to the merged module outputs."""
        merged = super().__call__(d)
        return NpStack(axis=0)(merged)
class CvModuleSum(CvModuleStack):
    """Stack the module outputs, then reduce them with ``NpSum(axis=0)``."""

    def __call__(self, d):
        """Return ``NpSum(axis=0)`` applied to the stacked module outputs."""
        stacked = super().__call__(d)
        reduce_sum = NpSum(axis=0)
        return reduce_sum(stacked)
class CvModuleMean(CvModuleStack):
    """Stack the module outputs, then reduce them with ``NpMean(axis=0)``."""

    def __call__(self, d):
        """Return ``NpMean(axis=0)`` applied to the stacked module outputs."""
        stacked = super().__call__(d)
        reduce_mean = NpMean(axis=0)
        return reduce_mean(stacked)
class CvModuleL1(CvModuleStack):
    """Stack the module outputs and combine them as a sum of absolute values.

    The stacked outputs go through ``NpAbs()`` and then ``NpSum(axis=0)``.
    """

    def __call__(self, d):
        """Return ``NpSum(axis=0)`` of ``NpAbs()`` of the stacked outputs."""
        stacked = super().__call__(d)
        reduce_sum = NpSum(axis=0)
        magnitudes = NpAbs()(stacked)
        return reduce_sum(magnitudes)
class CvModuleL2(CvModuleStack):
    """Stack the module outputs and combine them as a root of summed squares.

    The stacked outputs go through ``NpSquare()``, then ``NpSum(axis=0)``,
    then ``NpSqrt()``.
    """

    def __call__(self, d):
        """Return ``NpSqrt()`` of ``NpSum(axis=0)`` of the squared stacked outputs."""
        stacked = super().__call__(d)
        square_root = NpSqrt()
        reduce_sum = NpSum(axis=0)
        squared = NpSquare()(stacked)
        return square_root(reduce_sum(squared))
class CvScale:
    """Resize an image with ``cv2.resize`` to an absolute or relative size.

    ``width`` and ``height`` are used as given unless they are ``float``
    instances, in which case they are treated as factors of the input
    image's ``shape[1]`` and ``shape[0]`` respectively.
    """

    def __init__(self, width, height):
        """Remember the requested ``width`` and ``height``."""
        self.width, self.height = width, height

    def __call__(self, img):
        """Return ``img`` resized to the configured target size.

        Args:
            img: Image array; ``img.shape`` is only read for a dimension
                whose configured value is a ``float``.

        Returns:
            The output of ``cv2.resize(img, (width, height))``.
        """
        target_width = (
            int(img.shape[1] * self.width)
            if isinstance(self.width, float)
            else self.width
        )
        target_height = (
            int(img.shape[0] * self.height)
            if isinstance(self.height, float)
            else self.height
        )
        return cv2.resize(img, (target_width, target_height))
class CvResize(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``resize``."""

    fn = "resize"
class CvCvtColor(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``cvtColor``."""

    fn = "cvtColor"
class CvDilate(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``dilate``."""

    fn = "dilate"
class CvErode(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``erode``."""

    fn = "erode"
class CvFilter2d(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``filter2D``."""

    fn = "filter2D"
class CvSobel(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``Sobel``.

    ``ddepth`` may be given either as the name of a ``cv2`` attribute
    (default ``"CV_64F"``) or as the value itself.
    """

    fn = "Sobel"

    def __init__(self, ddepth="CV_64F", **kwargs):
        """Resolve ``ddepth`` and forward it with ``kwargs`` to the parent.

        Args:
            ddepth: A string is looked up with ``getattr(cv2, ddepth)``;
                any other value is used unchanged.
            **kwargs: Further keyword arguments for the parent constructor.
        """
        resolved_depth = getattr(cv2, ddepth) if isinstance(ddepth, str) else ddepth
        kwargs["ddepth"] = resolved_depth
        super().__init__(**kwargs)
class CvBlur(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``blur``."""

    fn = "blur"
class CvBoxFilter(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``boxFilter``."""

    fn = "boxFilter"
class CvGaussianBlur(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``GaussianBlur``."""

    fn = "GaussianBlur"
class CvBilateralFilter(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``bilateralFilter``."""

    fn = "bilateralFilter"
class CvCanny(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``Canny``."""

    fn = "Canny"
class CvHoughLinesP(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``HoughLinesP``."""

    fn = "HoughLinesP"
class CvLine(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``line``."""

    fn = "line"
class CvEqualizeHist(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``equalizeHist``."""

    fn = "equalizeHist"
class CvThreshold(CvDictToDict):
    """``CvDictToDict`` variant whose ``fn`` names OpenCV's ``threshold``.

    ``type`` may be given either as the name of a ``cv2`` attribute
    (default ``"THRESH_BINARY"``) or as the value itself.
    """

    fn = "threshold"

    def __init__(self, type="THRESH_BINARY", **kwargs):
        """Resolve ``type`` and forward it with ``kwargs`` to the parent.

        Args:
            type: A string is looked up with ``getattr(cv2, type)``; any
                other value is used unchanged. (The name shadows the
                builtin but is part of the public signature.)
            **kwargs: Further keyword arguments for the parent constructor.
        """
        resolved_type = getattr(cv2, type) if isinstance(type, str) else type
        kwargs["type"] = resolved_type
        super().__init__(**kwargs)

    def __call__(self, *args, **kwargs):
        """Forward the call to the parent and return its result unchanged."""
        # NOTE(review): this override adds nothing to the parent call. It may
        # have been intended to post-process the result of cv2.threshold,
        # which returns a (retval, dst) pair - confirm against DictToDict.
        return super().__call__(*args, **kwargs)
class CvBGR2Gray(CvDictToDict):
    """``cvtColor`` wrapper with ``code`` fixed to ``cv2.COLOR_BGR2GRAY``."""

    fn = "cvtColor"

    def __init__(self, *args, **kwargs):
        """Set ``code`` (overriding any caller-supplied value) and delegate."""
        kwargs.update(code=cv2.COLOR_BGR2GRAY)
        super().__init__(*args, **kwargs)
class CvBGR2HSV(CvDictToDict):
    """``cvtColor`` wrapper with ``code`` fixed to ``cv2.COLOR_BGR2HSV``."""

    fn = "cvtColor"

    def __init__(self, *args, **kwargs):
        """Set ``code`` (overriding any caller-supplied value) and delegate."""
        kwargs.update(code=cv2.COLOR_BGR2HSV)
        super().__init__(*args, **kwargs)
# 3x3 kernels used by CvDiagonalEdgeFilter2d, keyed by kernel type.
# Each entry is a pair of kernels; the second kernel of a pair is the
# left-right mirror image of the first, so the pair covers both diagonals.
diagonal_edge_kernel_dict = {
    # Type 1: only the two opposite corners are weighted (+1 / -1).
    1: [
        np.array([[1, 0, 0], [0, 0, 0], [0, 0, -1]]),
        np.array([[0, 0, 1], [0, 0, 0], [-1, 0, 0]]),
    ],
    # Type 2: weights graded from +2 to -2 across the diagonal.
    2: [
        np.array([[2, 1, 0], [1, 0, -1], [0, -1, -2]]),
        np.array([[0, 1, 2], [-1, 0, 1], [-2, -1, 0]]),
    ],
}
class CvDiagonalEdgeFilter2d(CvModuleL2):
    """Combine the diagonal edge kernels of one type via ``CvModuleL2``.

    One ``CvFilter2d`` is created per kernel in
    ``diagonal_edge_kernel_dict[kernel_type]``; how their outputs are
    combined is inherited from ``CvModuleL2``.
    """

    def __init__(self, kernel_type=2, **kwargs):
        """Create the per-kernel filters and store them in ``self.modules``.

        Args:
            kernel_type: Key into ``diagonal_edge_kernel_dict``.
            **kwargs: Keyword arguments passed to every ``CvFilter2d``;
                ``ddepth`` defaults to ``-1`` when not supplied.

        Raises:
            KeyError: If ``kernel_type`` is not a key of
                ``diagonal_edge_kernel_dict``.
        """
        # The parent constructor is not invoked here; ``self.modules`` is
        # assigned directly as a list.
        if "ddepth" not in kwargs:
            kwargs["ddepth"] = -1
        filters = []
        for kernel in diagonal_edge_kernel_dict[kernel_type]:
            filters.append(CvFilter2d(kernel=kernel, **kwargs))
        self.modules = filters