"""Nuclio handler for CVAT automatic annotation using OpenVINO 2025 IR (.xml/.bin).
|
||
|
||
This file combines YOLOv9 inference logic with Nuclio serverless handler structure.
|
||
It loads an OpenVINO Intermediate Representation (IR) model consisting of a
|
||
``.xml`` file (network topology) and a ``.bin`` file (weights).
|
||
|
||
Adjust ``MODEL_XML`` and ``MODEL_BIN`` if your files are located elsewhere.
|
||
"""
|
||
|
||
import base64
import json
import os
from pathlib import Path

import cv2
import numpy as np
import openvino as ov
from openvino import Layout, Type
from openvino.preprocess import ColorFormat, PrePostProcessor

# Paths to the IR model files – change if your model is in a different location.
MODEL_XML = os.getenv("MODEL_XML", "/models/best.xml")
MODEL_BIN = os.getenv("MODEL_BIN", "/models/best.bin")
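
# A minimal sketch of how these paths could be overridden from the Nuclio
# function.yaml (assumption: a typical CVAT serverless deployment; the values
# below are illustrative, not part of this project):
#
#   spec:
#     env:
#       - name: MODEL_XML
#         value: /opt/nuclio/models/best.xml
#       - name: MODEL_BIN
#         value: /opt/nuclio/models/best.bin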

coconame = [
    "karung",
]


class Yolov9:
    def __init__(
        self, xml_model_path=MODEL_XML, bin_model_path=MODEL_BIN, conf=0.1, nms=0.4
    ):
        # Step 1. Initialize the OpenVINO Runtime core
        core = ov.Core()
        # Step 2. Read the model (the .bin path is optional; OpenVINO finds a
        # same-named .bin next to the .xml when it is omitted)
        if bin_model_path:
            model = core.read_model(
                str(Path(xml_model_path)), str(Path(bin_model_path))
            )
        else:
            model = core.read_model(str(Path(xml_model_path)))

        # Step 3. Initialize preprocessing for the model
        ppp = PrePostProcessor(model)
        # Specify the input image format
        ppp.input().tensor().set_element_type(Type.u8).set_layout(
            Layout("NHWC")
        ).set_color_format(ColorFormat.BGR)
        # Specify the preprocessing pipeline (no resizing; the image is
        # letterboxed to the network size before inference)
        ppp.input().preprocess().convert_element_type(Type.f32).convert_color(
            ColorFormat.RGB
        ).scale([255.0, 255.0, 255.0])
        # Specify the model's input layout
        ppp.input().model().set_layout(Layout("NCHW"))
        # Specify the output results format
        ppp.output().tensor().set_element_type(Type.f32)
        # Embed the steps above into the graph
        model = ppp.build()

        self.compiled_model = core.compile_model(model, "CPU")
        # The network input size is fixed here; alternatively it can be read
        # from the model: _, _, h, w = self.compiled_model.input(0).shape
        self.input_width = 320
        self.input_height = 320
        self.conf_thresh = conf
        self.nms_thresh = nms
        self.colors = []

        # Create one random color per class
        np.random.seed(42)  # Set seed for reproducibility
        for _ in range(len(coconame)):
            color = tuple(np.random.randint(100, 256, size=3))
            self.colors.append(color)

    def resize_and_pad(self, image):
        """Letterbox the image to the network input size, padding bottom/right."""
        old_h, old_w = image.shape[:2]
        ratio = min(self.input_width / old_w, self.input_height / old_h)
        new_w = int(old_w * ratio)
        new_h = int(old_h * ratio)

        image = cv2.resize(image, (new_w, new_h))

        delta_w = self.input_width - new_w
        delta_h = self.input_height - new_h

        color = [100, 100, 100]
        new_im = cv2.copyMakeBorder(
            image, 0, delta_h, 0, delta_w, cv2.BORDER_CONSTANT, value=color
        )

        return new_im, delta_w, delta_h

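    # Worked example of the letterbox math above (illustrative numbers): a
    # 640x480 frame with a 320x320 network input gives
    # ratio = min(320/640, 320/480) = 0.5, so the frame is resized to 320x240
    # and padded with delta_w = 0 columns and delta_h = 80 gray rows at the
    # bottom; (delta_w, delta_h) are later used to undo this mapping.
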
    def predict(self, img):
        # Step 4. Create a tensor from the image
        input_tensor = np.expand_dims(img, 0)

        # Step 5. Create an infer request and run model inference
        infer_request = self.compiled_model.create_infer_request()
        infer_request.infer({0: input_tensor})

        # Step 6. Retrieve the inference results. This assumes the common
        # single-batch YOLOv9 export layout [1, 4 + num_classes, num_candidates];
        # the transpose yields one [cx, cy, w, h, score_0, ...] row per candidate.
        output = infer_request.get_output_tensor()
        detections = output.data[0].T

        # Step 7. Postprocessing, including NMS
        boxes = []
        class_ids = []
        confidences = []
        for prediction in detections:
            classes_scores = prediction[4:]
            _, _, _, max_indx = cv2.minMaxLoc(classes_scores)
            class_id = max_indx[1]
            if classes_scores[class_id] > self.conf_thresh:
                confidences.append(float(classes_scores[class_id]))
                class_ids.append(class_id)
                x, y, w, h = (
                    prediction[0].item(),
                    prediction[1].item(),
                    prediction[2].item(),
                    prediction[3].item(),
                )
                xmin = x - (w / 2)
                ymin = y - (h / 2)
                box = np.array([xmin, ymin, w, h])
                boxes.append(box)

        indexes = cv2.dnn.NMSBoxes(
            boxes, confidences, self.conf_thresh, self.nms_thresh
        )

        results = []
        for i in indexes:
            j = i.item()
            results.append(
                {
                    "class_index": class_ids[j],
                    "confidence": confidences[j],
                    "box": boxes[j],
                }
            )

        return results

    def draw(self, img, detections, dw, dh):
        # Step 8. Draw the detections on the image
        for detection in detections:
            box = detection["box"]
            class_id = detection["class_index"]
            confidence = detection["confidence"]

            # Map the box from network coordinates back to image coordinates
            rx = img.shape[1] / (self.input_width - dw)
            ry = img.shape[0] / (self.input_height - dh)
            box[0] = rx * box[0]
            box[1] = ry * box[1]
            box[2] = rx * box[2]
            box[3] = ry * box[3]

            xmax = box[0] + box[2]
            ymax = box[1] + box[3]

            # Draw the detection box
            cv2.rectangle(
                img,
                (int(box[0]), int(box[1])),
                (int(xmax), int(ymax)),
                tuple(map(int, self.colors[class_id])),
                3,
            )

            # Draw the label background and text
            class_string = coconame[class_id] + " " + str(confidence)[:4]
            text_size, _ = cv2.getTextSize(class_string, cv2.FONT_HERSHEY_DUPLEX, 1, 2)
            text_rect = (box[0], box[1] - 40, text_size[0] + 10, text_size[1] + 20)
            cv2.rectangle(
                img,
                (int(text_rect[0]), int(text_rect[1])),
                (int(text_rect[0] + text_rect[2]), int(text_rect[1] + text_rect[3])),
                tuple(map(int, self.colors[class_id])),
                cv2.FILLED,
            )
            cv2.putText(
                img,
                class_string,
                (int(box[0] + 5), int(box[1] - 10)),
                cv2.FONT_HERSHEY_DUPLEX,
                1,
                (0, 0, 0),
                2,
                cv2.LINE_AA,
            )


def init_context(context):
    """Nuclio init_context – called once per container.

    Loads the IR model and compiles it for the CPU.
    """
    context.logger.info("Init context ----> 0%")
    model = Yolov9(MODEL_XML, MODEL_BIN, conf=0.1, nms=0.4)
    context.user_data.model = model
    context.logger.info("Init context ----> 100%")


def handler(context, event):
    """Nuclio handler – called for every request.

    Expects a JSON body with a base64-encoded image under the key ``"image"``.
    Returns a CVAT-compatible JSON list of detected objects.
    """
    context.logger.info("Run OpenVINO YOLOv9 model")

    # Parse the request body
    try:
        data = event.body
        image_b64 = data["image"]
    except Exception as exc:
        context.logger.error(f"Invalid request body: {exc}")
        return context.Response(
            body=json.dumps({"error": "Invalid request body"}),
            status_code=400,
            content_type="application/json",
        )

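    # Example request body (illustrative; CVAT's automatic annotation sends
    # the frame in this shape, possibly with extra keys such as a threshold):
    # {"image": "<base64-encoded JPEG/PNG bytes>"}
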
    # Decode the image
    image_bytes = base64.b64decode(image_b64)
    image = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        context.logger.error("Failed to decode image")
        return context.Response(
            body=json.dumps({"error": "Failed to decode image"}),
            status_code=400,
            content_type="application/json",
        )

    # Get the model from the context
    model = context.user_data.model

    # Preprocess: letterbox to the network input size
    img_resized, dw, dh = model.resize_and_pad(image)

    # Inference
    detections = model.predict(img_resized)

    # Convert detections to the CVAT-compatible format
    shapes = []
    for detection in detections:
        class_id = detection["class_index"]
        confidence = float(detection["confidence"])
        box = detection["box"]

        # Scale box coordinates back to the original image size
        rx = image.shape[1] / (model.input_width - dw)
        ry = image.shape[0] / (model.input_height - dh)

        xmin = box[0] * rx
        ymin = box[1] * ry
        xmax = (box[0] + box[2]) * rx
        ymax = (box[1] + box[3]) * ry

        # Clamp to integer pixel coordinates within the image
        x_min_px = int(max(0, xmin))
        y_min_px = int(max(0, ymin))
        x_max_px = int(min(image.shape[1], xmax))
        y_max_px = int(min(image.shape[0], ymax))

        label = coconame[class_id] if class_id < len(coconame) else "unknown"

        shapes.append(
            {
                "label": label,
                "points": [x_min_px, y_min_px, x_max_px, y_max_px],
                "type": "rectangle",
                "confidence": str(confidence),
            }
        )

    context.logger.info(f"Detected {len(shapes)} objects")

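    # Example response body (illustrative values):
    # [{"label": "karung", "points": [12, 34, 160, 200],
    #   "type": "rectangle", "confidence": "0.87"}]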
    return context.Response(
        body=json.dumps(shapes),
        headers={},
        content_type="application/json",
        status_code=200,
    )
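

# A minimal local smoke test (not used by Nuclio), assuming the IR model and a
# sample image are available on disk; the image paths below are illustrative.
if __name__ == "__main__":
    model = Yolov9(MODEL_XML, MODEL_BIN, conf=0.1, nms=0.4)
    image = cv2.imread("sample.jpg")  # hypothetical sample image
    if image is None:
        raise SystemExit("sample.jpg not found")
    padded, dw, dh = model.resize_and_pad(image)
    detections = model.predict(padded)
    model.draw(image, detections, dw, dh)
    cv2.imwrite("sample_out.jpg", image)
    print(f"Wrote {len(detections)} detections to sample_out.jpg")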