Merge branch 'gamemodel_imagepipeline' into gamemodel_basics

This commit is contained in:
John McCardle 2021-12-28 19:50:01 -05:00
commit 0c4bb4ee6b
3 changed files with 270 additions and 136 deletions

View File

@ -5,19 +5,20 @@ from functools import wraps
from utility import * from utility import *
import pointcluster import pointcluster
from imagepipeline import CVImage, ImagePipeline
class GameModel: class GameModel:
"""Platform-independent representation of the game's state.""" """Platform-independent representation of the game's state."""
def __init__(self, io:gameio.AbstractGameIO): def __init__(self, io:gameio.AbstractGameIO):
self.gameio = io self.gameio = io
self.asteroids = [ self.asteroids = [
("big", cv2.imread("images/game_assets/rock-big.png", 0)), CVImage("big", filename = "images/game_assets/rock-big.png"),
("normal", cv2.imread("images/game_assets/rock-normal.png", 0)), CVImage("normal", filename = "images/game_assets/rock-normal.png"),
("small", cv2.imread("images/game_assets/rock-small.png", 0)) CVImage("small", filename = "images/game_assets/rock-small.png")
] ]
self.ships = [ self.ships = [
("ship_off", cv2.imread("images/game_assets/spaceship-off.png", 0)), CVImage("ship_off", filename = "images/game_assets/spaceship-off.png"),
("ship_on", cv2.imread("images/game_assets/spaceship-on.png", 0)) CVImage("ship_on", filename = "images/game_assets/spaceship-on.png")
] ]
#self.missile = ("missile", cv2.imread("images/game_assets/missile.png", 0)) #self.missile = ("missile", cv2.imread("images/game_assets/missile.png", 0))
self.frame = None self.frame = None
@ -35,170 +36,81 @@ class GameModel:
@wraps(fn) @wraps(fn)
def inner(self, *args, **kwargs): def inner(self, *args, **kwargs):
if self.frame is None: if self.frame is None:
#print("Fetching frame.")
sshot = self.gameio.fetch_sshot() sshot = self.gameio.fetch_sshot()
open_cv_image = np.array(sshot) open_cv_image = np.array(sshot)
# Convert RGB to BGR # Convert RGB to BGR
self.frame = open_cv_image[:, :, ::-1].copy() array = open_cv_image[:, :, ::-1].copy()
self.color_frame = np.copy(self.frame) self.color_frame = CVImage("gameio frame", np.copy(array))
self.frame = cv2.cvtColor(self.frame, cv2.COLOR_BGR2GRAY) self.frame = CVImage("BW frame", self.color_frame.copy())
self.frame.image = self.frame.convert_color(False)
self.mask_frame() self.mask_frame()
print(self.frame)
return fn(self, *args, **kwargs) return fn(self, *args, **kwargs)
return inner return inner
## def with_masking(fn):
## """Decorator to cut lives and score into smaller subimages, and mask them out of self.frame."""
## @wraps(fn)
## def inner(self, *args, **kwargs):
## if self.score_img is None:
##
## return fn(self, *args, **kwargs)
## return inner
def mask_frame(self): def mask_frame(self):
self.lives_img = self.frame[self.lives_rect[0][0]:self.lives_rect[0][1], self.lives_img = CVImage("lives", self.frame.snip(self.lives_rect))
self.lives_rect[1][0]:self.lives_rect[1][1]] self.frame.image = self.frame.mask(self.lives_rect)
lives_mask = np.full(self.frame.shape, 255, dtype=np.uint8) self.score_img = CVImage("score", self.frame.snip(self.score_rect))
self.frame.image = self.frame.mask(self.score_rect)
cv2.rectangle(lives_mask,
*self.lives_rect,
color=0, thickness=cv2.FILLED)
self.score_img = self.frame[self.score_rect[0][0]:self.score_rect[0][1],
self.score_rect[1][0]:self.score_rect[1][1]]
score_mask = np.full(self.frame.shape, 255, dtype=np.uint8)
cv2.rectangle(score_mask,
*self.score_rect,
color = 0, thickness=cv2.FILLED)
self.frame = cv2.bitwise_and(self.frame, lives_mask)
self.frame = cv2.bitwise_and(self.frame, score_mask)
## print("Displaying images for testing purposes")
## cv2.imshow("Original", self.color_frame)
## cv2.waitKey(0)
## cv2.imshow("Masked", self.frame)
## cv2.waitKey(0)
def clear_frame(self): def clear_frame(self):
self.prev_frame = frame self.prev_frame = frame
self.frame = None self.frame = None
@with_frame @with_frame
def find_asteroids(self): def find_asteroids(self):
asteroid_rects = [] results = []
for label, a in self.asteroids: for a in self.asteroids:
h, w = a.shape r = self.frame.template_detect(a,
res = cv2.matchTemplate(self.frame, a, cv2.TM_CCOEFF_NORMED) self.cv_template_thresh,
loc = np.where( res >= self.cv_template_thresh) self.duplicate_dist_thresh)
for pt in zip(*loc[::-1]): results.extend(r)
if not asteroid_rects or squared_distance(asteroid_rects[-1][0], pt) > self.duplicate_dist_thresh: return results
asteroid_rects.append((pt, (pt[0] + w, pt[1] + h), label))
return asteroid_rects
@with_frame @with_frame
def display_results(self, rects = [], pointsets = [], circles = []): def display_results(self, rects = [], pointsets = [], circles = []):
"""Draws results on the current frame for test purposes.""" """Draws results on the current frame for test purposes."""
displayable = np.copy(self.color_frame) displayable = CVImage("GameModel results", self.color_frame.copy())
cv2.rectangle(displayable, *self.lives_rect, (255,255,255), 1) label_color = { "big": (255, 0, 0),
cv2.rectangle(displayable, *self.score_rect, (255,255,255), 1)
#else:
# displayable = np.copy(self.color_frame)
for pt, wh, label in rects:
color = { "big": (255, 0, 0),
"normal": (0, 255, 0), "normal": (0, 255, 0),
"small": (0, 0, 255), "small": (0, 0, 255),
"missile": (0, 255, 128), "missile": (0, 255, 128),
"ship_on": (0, 0, 128), "ship_on": (0, 0, 128),
"ship_off": (0, 64, 128)}[label] "ship_off": (0, 64, 128)}
cv2.rectangle(displayable, pt, wh, color, 1) for r in rects:
cv2.putText(displayable, label, pt, displayable.draw_rect(r, color=label_color[r.label])
cv2.FONT_HERSHEY_PLAIN,
1.0, color)
for ps in pointsets: for ps in pointsets:
color = (0, 255, 255) displayable.draw_poly(ps, color=(0, 255, 255))
cv2.polylines(displayable, np.int32([ps]), True, color)
for center, radius, label in circles: for center, radius, label in circles:
color = (255, 255, 0) displayable.draw_circle(center, radius)
cv2.circle(displayable, np.int32(center), int(radius), color, 1) displayable.draw_text(label, center, (255, 255, 0))
cv2.putText(displayable, label, np.int32(center),
cv2.FONT_HERSHEY_PLAIN, displayable.show()
1.0, color)
cv2.imshow("Results", displayable)
cv2.waitKey(0)
@with_frame @with_frame
def frame_sift(self): def frame_sift(self):
sift = cv2.SIFT_create() ship_r = sqrt(rect_radius_squared(*self.ships[0].image.shape[:2]) * 0.85)
kp_desc = {} # dict of (keypoints, descriptions) for all ship sprites return self.frame.sift_clusters(cluster_radius = ship_r)
kp_desc["frame"] = sift.detectAndCompute(self.frame, None)
frame_kp, frame_desc = kp_desc["frame"]
## for label, s in self.ships:
## kp_desc[label] = sift.detectAndCompute(s, None)
## bf = cv2.BFMatcher(cv2.NORM_L1, crossCheck=True)
## matchsets = {}
## for label in kp_desc:
## _, desc = kp_desc[label]
## matchsets[label] = bf.match(frame_desc, desc)
## #return { "matchsets": matchsets,
## # "kp_desc": kp_desc
## # }
ship_rsq = rect_radius_squared(*self.ships[0][1].shape) * 0.85
#print(f"max radius^2: {ship_rsq}")
clusters = pointcluster.cluster_set([k.pt for k in frame_kp], sqrt(ship_rsq))
return clusters
@with_frame @with_frame
def find_ships(self): def find_ships(self):
ship_rects = [] results = []
for label, a in self.ships: for a in self.ships:
h, w = a.shape r = self.frame.template_detect(a,
res = cv2.matchTemplate(self.frame, a, cv2.TM_CCOEFF_NORMED) self.cv_template_thresh,
loc = np.where( res >= self.cv_template_thresh) self.duplicate_dist_thresh)
for pt in zip(*loc[::-1]): results.extend(r)
if not ship_rects or squared_distance(ship_rects[-1][0], pt) > self.duplicate_dist_thresh: return results
ship_rects.append((pt, (pt[0] + w, pt[1] + h), label))
return ship_rects
@with_frame @with_frame
def find_missiles(self): def find_missiles(self):
# Setup SimpleBlobDetector parameters. p = CVImage.blob_params(minThreshold = 10, maxThreshold = 200,
params = cv2.SimpleBlobDetector_Params() maxArea = 100,
minConvexity = 0.95,
# Change thresholds minInertiaRatio = 0.4)
params.minThreshold = 10; return self.frame.blob_detect(size=9, params=p)
params.maxThreshold = 200;
# Filter by Area.
params.filterByArea = True
#params.minArea = 1500
params.maxArea = 100
# Filter by Circularity
#params.filterByCircularity = True
#params.minCircularity = 0.1
# Filter by Convexity
params.filterByConvexity = True
params.minConvexity = 0.95
# Filter by Inertia
params.filterByInertia = True
params.minInertiaRatio = 0.4
detector = cv2.SimpleBlobDetector_create(params)
keypoints = detector.detect(cv2.bitwise_not(self.frame)) # inverted black/white frame
#im_with_keypoints = cv2.drawKeypoints(self.frame, keypoints, np.array([]),
# (0,0,255), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
#cv2.imshow("keypoints", im_with_keypoints)
#cv2.waitKey(0)
s = 9 # pixels for the missile
rect_tuple = lambda pt: ((int(pt[0]-s/2),int(pt[1]-s/2)),
(int(pt[0]+s/2), int(pt[1]+s/2)),
"missile")
return [rect_tuple(k.pt) for k in keypoints]
def analyse_frame(self): def analyse_frame(self):
rocks = self.find_asteroids() rocks = self.find_asteroids()
@ -264,9 +176,12 @@ if __name__ == '__main__':
ship_results = gm.find_ships() ship_results = gm.find_ships()
polygons = [c.points for c in s_results] polygons = [c.points for c in s_results]
##circles = [(c.center, c.max_distance, f"cluster_{i}") for i, c in enumerate(s_results)] ##circles = [(c.center, c.max_distance, f"cluster_{i}") for i, c in enumerate(s_results)]
r_circles = [(c.center, sqrt(rect_radius_squared(*gm.ships[0][1].shape)), f"cluster_{i}") for i, c in enumerate(s_results)] r_circles = [(c.center, sqrt(rect_radius_squared(*gm.ships[0].image.shape[:2])), f"cluster_{i}") for i, c in enumerate(s_results)]
missile_results = gm.find_missiles() missile_results = gm.find_missiles()
##m_circles = [(pt, 10, f"missile_{i}") for i, pt in enumerate(missiles)] ##m_circles = [(pt, 10, f"missile_{i}") for i, pt in enumerate(missiles)]
##pprint(a_results+ship_results+missile_results) ##pprint(a_results+ship_results+missile_results)
gm.display_results(rects=a_results+ship_results+missile_results, pointsets=polygons, circles=r_circles) rects = a_results
if ship_results: rects.extend(ship_results)
if missile_results: rects.extend(missile_results)
gm.display_results(rects=rects, pointsets=polygons, circles=r_circles)
gm.analyse_frame() gm.analyse_frame()

165
imagepipeline.py Normal file
View File

@ -0,0 +1,165 @@
import cv2
import numpy as np
import typing
import pointcluster
from shapes import Rect
from utility import *
class CVImage:
"""Dummy definition to allow recursive type hints"""
pass
class CVImage:
def __init__(self, label="", img:np.ndarray=None, **kwargs):
"""You can provide a 'filename' keyword arg to automatically load a file."""
self.label = label
self.image = img
self._init_kwargs = kwargs
if kwargs:
load_kwargs = dict(kwargs) # copy
self.load(**load_kwargs)
@property
def is_color(self):
return len(self.image.shape) == 3
def load(self, filename:str, label:str=None):
"""Load an image from file. You can optionally set the 'label' keyword."""
self.image = cv2.imread(filename)
if label: self.label = label
return self
def convert_color(self, color:bool):
return cv2.cvtColor(self.image, cv2.COLOR_GRAY2BGR if color else cv2.COLOR_BGR2GRAY)
def __repr__(self):
if self._init_kwargs:
kwargstr = ", " + ", ".join([f"{k}={repr(self._init_kwargs[k])}" for k in self._init_kwargs])
else:
kwargstr = ''
return f"<CVImage label={repr(self.label)}, image={self.image.shape} px, is_color={self.is_color}{kwargstr}>"
def copy(self):
return np.copy(self.image)
def snip(self, rect):
assert all((len(rect)==2, len(rect[0])==2, len(rect[1])==2)) #((x,y),(w,h))
return self.image[rect[0][1]:rect[0][1]+rect[1][1],
rect[0][0]:rect[0][0]+rect[1][0]
]
def mask(self, rect, mask_color=None, nonmask_color=None):
assert all((len(rect)==2, len(rect[0])==2, len(rect[1])==2)) #((x,y),(w,h))
if mask_color is None:
mask_color = (0, 0, 0) if self.is_color else 0
if nonmask_color is None:
nonmask_color = (255, 255, 255) if self.is_color else 255
mask = np.full(self.image.shape, nonmask_color, dtype=np.uint8)
cv2.rectangle(mask, *rect, color=mask_color, thickness=cv2.FILLED)
return cv2.bitwise_and(self.image, mask)
def sift_clusters(self, cluster_radius) -> pointcluster.PointCluster:
sift = cv2.SIFT_create()
keypoints, descriptions = sift.detectAndCompute(self.image, None)
return pointcluster.cluster_set([k.pt for k in keypoints], cluster_radius)
@staticmethod
def blob_params(*, minThreshold = 10, maxThreshold = 200,
minArea = None, maxArea = None,
minCircularity = None, maxCircularity = None,
minConvexity = None, maxConvexity = None,
minInertiaRatio = None, maxInertiaRatio = None):
p = cv2.SimpleBlobDetector_Params()
p.minThreshold = minThreshold
p.maxThreshold = maxThreshold
if minArea or maxArea:
p.filterByArea = True
if minArea: p.minArea = minArea
if maxArea: p.maxArea = maxArea
if minConvexity or maxConvexity:
p.filterByConvexity = True
if minConvexity: p.minConvexity = minConvexity
if maxConvexity: p.maxConvexity = maxConvexity
if minInertiaRatio or maxInertiaRatio:
p.filterByInertia = True
if minInertiaRatio: p.minInertiaRatio = minInertiaRatio
if maxInertiaRatio: p.maxInertiaRatio = maxInertiaRatio
if minCircularity or maxCircularity:
p.filterByCircularity = True
if minCircularity: p.minCircularity = minCircularity
if maxCircularity: p.maxCircularity = maxCircularity
return p
def blob_detect(self, size:int, params=None, invert:bool=False, label:str=None) -> typing.List[Rect]:
if params is None: params = CVImage.blob_params()
detector = cv2.SimpleBlobDetector_create(params)
keypoints = detector.detect(cv2.bitwise_not(self.image) if invert else self.image)
rects = []
s = size / 2.0
for kp in keypoints:
rects.append(Rect(x=kp.pt[0] - s, y = kp.pt[1] - s,
w = size, h = size,
label = label or "blob"))
return rects
def template_detect(self, template:CVImage, threshold:int, dupe_spacing:int) -> typing.List[Rect]:
if template.is_color:
h, w, _ = template.image.shape
else:
h, w = template.image.shape
if template.is_color != self.is_color:
template = CVImage(template.label, template.convert_color(not template.is_color))
res = cv2.matchTemplate(self.image, template.image, cv2.TM_CCOEFF_NORMED)
loc = np.where(res >= threshold)
rects = []
for pt in zip(*loc[::-1]):
if len(rects) > 0:
if squared_distance(rects[-1][0], pt) < dupe_spacing: continue
rects.append(Rect(x=pt[0], y=pt[1], w=w, h=h, label=template.label))
return rects
def show(self, delay=0):
cv2.imshow(self.label, self.image)
cv2.waitKey(delay)
def draw_rect(self, rect:Rect, color=None, text_color=None, text:bool=True, thickness=1):
if color is None:
color = (255, 255, 255) if self.is_color else 255
cv2.rectangle(self.image, rect.point, rect.point2, color, thickness)
if text:
self.draw_text(rect.label, rect.point, text_color if text_color else color)
def draw_poly(self, points:typing.List[typing.Tuple], closed=True, color=None):
if color is None:
color = (255, 255, 255) if self.is_color else 255
cv2.polylines(self.image, np.int32([points]), closed, color)
def draw_circle(self, center, radius, color=None, thickness = 1):
if color is None:
color = (255, 255, 255) if self.is_color else 255
cv2.circle(self.image, np.int32(center), np.int32(radius), color, thickness)
def draw_text(self, text, point, color):
cv2.putText(self.image, text, np.int32(point), cv2.FONT_HERSHEY_PLAIN, 1.0, color)
class ImagePipeline:
def __init__(self):
pass
# running this module executes tests
if __name__ == '__main__':
# initializer for CVImage can load from file
img = CVImage("test frame", filename="/home/john/Desktop/Screenshot at 2021-12-19 20-55-22.png")
#img.show()
# initializer for CVImage can accept a numpy array
img_no_title = CVImage("test frame", img.snip( ((0,24),(800,600)) ))
#img_no_title.show()
#standard rectangle format used throughout the class, avoiding ugly splat operator
lives_rect = ((10,10), (190, 65))
lives = CVImage("lives", img_no_title.snip(lives_rect))
lives.show()

54
shapes.py Normal file
View File

@ -0,0 +1,54 @@
class Rect:
def __init__(self, *args, label=None, **kwargs):
if len(args) == 4 and all([type(i) is int or type(i) is float for i in args]):
self.x, self.y, self.w, self.h = args
elif len(args) == 2 and all([type(i) is tuple and len(i) == 2 and all([type(j) is int or type(j) is float for j in i]) for i in args]):
xy, wh = self.args
self.x, self.y = xy
self.w, self.h = wh
elif all([k in kwargs for k in ("x", "y", "w", "h")]):
self.x = kwargs["x"]
self.y = kwargs["y"]
self.w = kwargs["w"]
self.h = kwargs["h"]
elif all([k in kwargs for k in ("x", "y", "x2", "y2")]):
self.x = kwargs["x"]
self.y = kwargs["y"]
self.w = kwargs["x2"] - self.x
self.h = kwargs["y2"] - self.y
elif all([k in kwargs for k in ("x1", "y1", "x2", "y2")]):
self.x = kwargs["x1"]
self.y = kwargs["y1"]
self.w = kwargs["x2"] - self.x
self.h = kwargs["y2"] - self.y
else:
raise RuntimeError("Rect requires 4 values: two coordinates or a coordinate plus width and height.")
self.label = label
def __repr__(self):
return f"<Rect label={repr(self.label)}, (({self.x}, {self.y}), ({self.w}, {self.h}))>"
def __iter__(self):
yield (self.x, self.y)
yield (self.w, self.h)
def __len__(self):
return 2
def __getitem__(self, i):
if i == 0: return (self.x, self.y)
elif i == 1: return (self.w, self.h)
else: raise IndexError("Rect only supports index of 0 or 1.")
def __setitem__(self, i, value):
assert i in (0, 1) and len(value) == 2
if not i: self.x, self.y = value
else: self.w, self.h = value
@property
def point(self):
return (self.x, self.y)
@property
def point2(self):
return (self.x + self.w, self.y + self.h)