"""Nuclio handler for CVAT automatic annotation using OpenVINO 2025 IR (.xml/.bin).
This file combines YOLOv9 inference logic with Nuclio serverless handler structure.
It loads an OpenVINO Intermediate Representation (IR) model consisting of a
``.xml`` file (network topology) and a ``.bin`` file (weights).
Adjust ``MODEL_XML`` and ``MODEL_BIN`` if your files are located elsewhere.
"""

import base64
import json
import os
from pathlib import Path

import cv2
import numpy as np
import openvino as ov
from openvino import Layout, Type
from openvino.preprocess import ColorFormat, PrePostProcessor

# Paths to the IR model files; change these if your model is stored elsewhere.
MODEL_XML = os.getenv("MODEL_XML", "/models/best.xml")
MODEL_BIN = os.getenv("MODEL_BIN", "/models/best.bin")

coconame = [
    "karung",
]


class Yolov9:
    def __init__(
        self, xml_model_path=MODEL_XML, bin_model_path=MODEL_BIN, conf=0.1, nms=0.4
    ):
        # Step 1. Initialize OpenVINO Runtime core
        core = ov.Core()
        # Step 2. Read a model
        if bin_model_path:
            model = core.read_model(
                str(Path(xml_model_path)), str(Path(bin_model_path))
            )
        else:
            model = core.read_model(str(Path(xml_model_path)))
        # Step 3. Initialize preprocessing for the model
        ppp = PrePostProcessor(model)
        # Specify input image format
        ppp.input().tensor().set_element_type(Type.u8).set_layout(
            Layout("NHWC")
        ).set_color_format(ColorFormat.BGR)
        # Specify preprocess pipeline for the input image (no resizing)
        ppp.input().preprocess().convert_element_type(Type.f32).convert_color(
            ColorFormat.RGB
        ).scale([255.0, 255.0, 255.0])
        # Specify model's input layout
        ppp.input().model().set_layout(Layout("NCHW"))
        # Specify output results format
        ppp.output().tensor().set_element_type(Type.f32)
        # Embed above steps in the graph
        model = ppp.build()
        self.compiled_model = core.compile_model(model, "CPU")
        # self.input_shape = self.compiled_model.input(0).shape
        # _, _, self.input_height, self.input_width = self.input_shape
        self.input_width = 320
        self.input_height = 320
        self.conf_thresh = conf
        self.nms_thresh = nms
        self.colors = []
        # Create random colors
        np.random.seed(42)  # Setting seed for reproducibility
        for i in range(len(coconame)):
            color = tuple(np.random.randint(100, 256, size=3))
            self.colors.append(color)

    def resize_and_pad(self, image):
        old_h, old_w = image.shape[:2]
        ratio = min(self.input_width / old_w, self.input_height / old_h)
        new_w = int(old_w * ratio)
        new_h = int(old_h * ratio)
        image = cv2.resize(image, (new_w, new_h))
        delta_w = self.input_width - new_w
        delta_h = self.input_height - new_h
        color = [100, 100, 100]
        new_im = cv2.copyMakeBorder(
            image, 0, delta_h, 0, delta_w, cv2.BORDER_CONSTANT, value=color
        )
        return new_im, delta_w, delta_h

    def predict(self, img):
        # Step 4. Create tensor from image
        input_tensor = np.expand_dims(img, 0)
        # Step 5. Create an infer request for model inference
        infer_request = self.compiled_model.create_infer_request()
        infer_request.infer({0: input_tensor})
        # Step 6. Retrieve inference results
        # The output has shape (1, 4 + num_classes, N); transposing gives one row
        # per candidate detection: [cx, cy, w, h, class scores...].
        output = infer_request.get_output_tensor()
        detections = output.data[0].T
        # Step 7. Postprocessing including NMS
        boxes = []
        class_ids = []
        confidences = []
        for prediction in detections:
            classes_scores = prediction[4:]
            _, _, _, max_indx = cv2.minMaxLoc(classes_scores)
            class_id = max_indx[1]
            if classes_scores[class_id] > self.conf_thresh:
                confidences.append(classes_scores[class_id])
                class_ids.append(class_id)
                x, y, w, h = (
                    prediction[0].item(),
                    prediction[1].item(),
                    prediction[2].item(),
                    prediction[3].item(),
                )
                xmin = x - (w / 2)
                ymin = y - (h / 2)
                box = np.array([xmin, ymin, w, h])
                boxes.append(box)
        indexes = cv2.dnn.NMSBoxes(
            boxes, confidences, self.conf_thresh, self.nms_thresh
        )
        results = []
        for i in indexes:
            j = i.item()
            results.append(
                {
                    "class_index": class_ids[j],
                    "confidence": confidences[j],
                    "box": boxes[j],
                }
            )
        return results

    def draw(self, img, detections, dw, dh):
        # Step 8. Print results and save figure with detections
        for detection in detections:
            box = detection["box"]
            classId = detection["class_index"]
            confidence = detection["confidence"]
            rx = img.shape[1] / (self.input_width - dw)
            ry = img.shape[0] / (self.input_height - dh)
            box[0] = rx * box[0]
            box[1] = ry * box[1]
            box[2] = rx * box[2]
            box[3] = ry * box[3]
            xmax = box[0] + box[2]
            ymax = box[1] + box[3]
            # Drawing detection box
            cv2.rectangle(
                img,
                (int(box[0]), int(box[1])),
                (int(xmax), int(ymax)),
                tuple(map(int, self.colors[classId])),
                3,
            )
            # Detection box text
            class_string = coconame[classId] + " " + str(confidence)[:4]
            text_size, _ = cv2.getTextSize(class_string, cv2.FONT_HERSHEY_DUPLEX, 1, 2)
            text_rect = (box[0], box[1] - 40, text_size[0] + 10, text_size[1] + 20)
            cv2.rectangle(
                img,
                (int(text_rect[0]), int(text_rect[1])),
                (int(text_rect[0] + text_rect[2]), int(text_rect[1] + text_rect[3])),
                tuple(map(int, self.colors[classId])),
                cv2.FILLED,
            )
            cv2.putText(
                img,
                class_string,
                (int(box[0] + 5), int(box[1] - 10)),
                cv2.FONT_HERSHEY_DUPLEX,
                1,
                (0, 0, 0),
                2,
                cv2.LINE_AA,
            )


def init_context(context):
    """Nuclio init_context, called once per container.

    Loads the IR model and compiles it for the CPU.
    """
    context.logger.info("Init context ----> 0%")
    model = Yolov9(MODEL_XML, MODEL_BIN, conf=0.1, nms=0.4)
    context.user_data.model = model
    context.logger.info("Init context ----> 100%")


def handler(context, event):
    """Nuclio handler, called for every request.

    Expects a JSON body with a base64-encoded image under the key ``"image"``.
    Returns a CVAT-compatible JSON list of detected objects.
    """
    context.logger.info("Run OpenVINO YOLOv9 model")
    # Parse request body
    try:
        data = event.body
        image_b64 = data["image"]
    except Exception as exc:
        context.logger.error(f"Invalid request body: {exc}")
        return context.Response(
            body=json.dumps({"error": "Invalid request body"}),
            status_code=400,
            content_type="application/json",
        )
    # Decode image
    image_bytes = base64.b64decode(image_b64)
    image = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        context.logger.error("Failed to decode image")
        return context.Response(
            body=json.dumps({"error": "Failed to decode image"}),
            status_code=400,
            content_type="application/json",
        )
    # Get model from context
    model = context.user_data.model
    # Preprocess: resize and pad
    img_resized, dw, dh = model.resize_and_pad(image)
    # Inference
    detections = model.predict(img_resized)
    # Convert detections to CVAT-compatible format
    shapes = []
    for detection in detections:
        class_id = detection["class_index"]
        confidence = float(detection["confidence"])
        box = detection["box"]
        # Scale box coordinates back to original image size
        rx = image.shape[1] / (model.input_width - dw)
        ry = image.shape[0] / (model.input_height - dh)
        xmin = box[0] * rx
        ymin = box[1] * ry
        xmax = (box[0] + box[2]) * rx
        ymax = (box[1] + box[3]) * ry
        # Convert to pixel coordinates
        x_min_px = int(max(0, xmin))
        y_min_px = int(max(0, ymin))
        x_max_px = int(min(image.shape[1], xmax))
        y_max_px = int(min(image.shape[0], ymax))
        label = coconame[class_id] if class_id < len(coconame) else "unknown"
        shapes.append(
            {
                "label": label,
                "points": [x_min_px, y_min_px, x_max_px, y_max_px],
                "type": "rectangle",
                "confidence": str(confidence),
            }
        )
    context.logger.info(f"Detected {len(shapes)} objects")
    return context.Response(
        body=json.dumps(shapes),
        headers={},
        content_type="application/json",
        status_code=200,
    )
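

if __name__ == "__main__":
    # Minimal local smoke test, not used by Nuclio. This is an illustrative
    # sketch: it assumes a test image path (``sample.jpg`` by default, or the
    # first CLI argument) and the IR model at MODEL_XML/MODEL_BIN; adjust both
    # to your environment.
    import sys

    image_path = sys.argv[1] if len(sys.argv) > 1 else "sample.jpg"
    test_image = cv2.imread(image_path)
    if test_image is None:
        raise SystemExit(f"Could not read test image: {image_path}")
    detector = Yolov9(MODEL_XML, MODEL_BIN, conf=0.1, nms=0.4)
    padded, dw, dh = detector.resize_and_pad(test_image)
    dets = detector.predict(padded)
    detector.draw(test_image, dets, dw, dh)
    cv2.imwrite("output.jpg", test_image)
    print(f"Detected {len(dets)} objects; annotated image saved to output.jpg")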