First Commit

2026-02-09 10:07:56 +07:00
parent 93c695d283
commit 31ce985bf2
2 changed files with 418 additions and 0 deletions

function.yaml (new file, 46 lines)

@@ -0,0 +1,46 @@
# To build the function you need to adjust Docker's settings. Be sure that you
# have enough memory (more than 4GB). See here for how to do that:
# https://stackoverflow.com/questions/44417159/docker-process-killed-with-cryptic-killed-message
metadata:
name: openvino-yolov9-karung
namespace: cvat
annotations:
name: Yolov9 Karung Model
type: detector
spec: |
[
{ "id": 0, "name": "karung"}
]
spec:
description: Detect Karung via Intel OpenVINO with Yolov9
runtime: 'python:3.10'
handler: main:handler
eventTimeout: 60s
build:
baseImage: cvat.nuclio.base
triggers:
myHttpTrigger:
numWorkers: 2
kind: 'http'
workerAvailabilityTimeoutMilliseconds: 10000
attributes:
maxRequestBodySize: 33554432 # 32MB
platform:
attributes:
restartPolicy:
name: always
maximumRetryCount: 3
mountMode: volume
volumes:
- volumeMount:
name: nuclio
mountPath: /opt/nuclio/output
readOnly: false
volume:
name: nuclio
hostPath:
path: /tmp/nuclio
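
A quick way to exercise the model wrapper before deploying is to call it directly, bypassing Nuclio. A minimal sketch, assuming the Yolov9 class from main.py below is importable, the IR files exist at the configured paths, and "sample.jpg" is a hypothetical test image:

# Standalone smoke test for the Yolov9 class defined in main.py below.
import cv2
from main import Yolov9

model = Yolov9()                            # loads MODEL_XML / MODEL_BIN
image = cv2.imread("sample.jpg")            # hypothetical test image
img, dw, dh = model.resize_and_pad(image)   # letterbox to 320x320
for det in model.predict(img):
    print(det["class_index"], det["confidence"], det["box"])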

main.py (new file, 372 lines)

@@ -0,0 +1,372 @@
"""Nuclio handler for CVAT automatic annotation using OpenVINO 2025 IR (.xml/.bin).
This file combines YOLOv9 inference logic with the Nuclio serverless handler structure.
It loads an OpenVINO Intermediate Representation (IR) model consisting of a
``.xml`` file (network topology) and a ``.bin`` file (weights).
Adjust ``MODEL_XML`` and ``MODEL_BIN`` if your files are located elsewhere.
"""
import base64
import json
import os
from pathlib import Path
import cv2
import numpy as np
import openvino as ov
from openvino.preprocess import PrePostProcessor
from openvino.preprocess import ColorFormat
from openvino import Layout, Type
# Paths to the IR model files; change these if your model is stored elsewhere.
MODEL_XML = "/opt/nuclio/models/best-202602051700.xml"
MODEL_BIN = "/opt/nuclio/models/best-202602051700.bin"
coconame = [
"karung",
"bicycle",
"car",
"motorcycle",
"airplane",
"bus",
"train",
"truck",
"boat",
"traffic light",
"fire hydrant",
"stop sign",
"parking meter",
"bench",
"bird",
"cat",
"dog",
"horse",
"sheep",
"cow",
"elephant",
"bear",
"zebra",
"giraffe",
"backpack",
"umbrella",
"handbag",
"tie",
"suitcase",
"frisbee",
"skis",
"snowboard",
"sports ball",
"kite",
"baseball bat",
"baseball glove",
"skateboard",
"surfboard",
"tennis racket",
"bottle",
"wine glass",
"cup",
"fork",
"knife",
"spoon",
"bowl",
"banana",
"apple",
"sandwich",
"orange",
"broccoli",
"carrot",
"hot dog",
"pizza",
"donut",
"cake",
"chair",
"couch",
"potted plant",
"bed",
"dining table",
"toilet",
"tv",
"laptop",
"mouse",
"remote",
"keyboard",
"cell phone",
"microwave",
"oven",
"toaster",
"sink",
"refrigerator",
"book",
"clock",
"vase",
"scissors",
"teddy bear",
"hair drier",
"toothbrush",
]
class Yolov9:
def __init__(
self, xml_model_path=MODEL_XML, bin_model_path=MODEL_BIN, conf=0.1, nms=0.4
):
# Step 1. Initialize OpenVINO Runtime core
core = ov.Core()
# Step 2. Read a model
if bin_model_path:
model = core.read_model(
str(Path(xml_model_path)), str(Path(bin_model_path))
)
else:
model = core.read_model(str(Path(xml_model_path)))
# Step 3. Initialize Preprocessing for the model
ppp = PrePostProcessor(model)
# Specify input image format
ppp.input().tensor().set_element_type(Type.u8).set_layout(
Layout("NHWC")
).set_color_format(ColorFormat.BGR)
# Specify preprocess pipeline to input image without resizing
ppp.input().preprocess().convert_element_type(Type.f32).convert_color(
ColorFormat.RGB
).scale([255.0, 255.0, 255.0])
# Specify model's input layout
ppp.input().model().set_layout(Layout("NCHW"))
# Specify output results format
ppp.output().tensor().set_element_type(Type.f32)
# Embed above steps in the graph
model = ppp.build()
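        # The embedded preprocessing above is roughly what this NumPy code
        # would do per frame (shown for clarity, not executed):
        #   x = img[..., ::-1].astype(np.float32) / 255.0   # BGR->RGB, scale
        #   x = np.transpose(x, (2, 0, 1))[None]            # HWC -> NCHW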
self.compiled_model = core.compile_model(model, "CPU")
        # The input size is hard-coded to match the exported model; the
        # commented lines show how to read it from the compiled model instead.
        # self.input_shape = self.compiled_model.input(0).shape
        # _, _, self.input_height, self.input_width = self.input_shape
        self.input_width = 320
        self.input_height = 320
self.conf_thresh = conf
self.nms_thresh = nms
        # Create one random (but reproducible) color per class name
        np.random.seed(42)
        self.colors = [
            tuple(np.random.randint(100, 256, size=3)) for _ in coconame
        ]
def resize_and_pad(self, image):
old_h, old_w = image.shape[:2]
ratio = min(self.input_width / old_w, self.input_height / old_h)
new_w = int(old_w * ratio)
new_h = int(old_h * ratio)
image = cv2.resize(image, (new_w, new_h))
delta_w = self.input_width - new_w
delta_h = self.input_height - new_h
color = [100, 100, 100]
new_im = cv2.copyMakeBorder(
image, 0, delta_h, 0, delta_w, cv2.BORDER_CONSTANT, value=color
)
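        # Worked example: a 640x480 frame scales by min(320/640, 320/480) = 0.5
        # to 320x240, then delta_h = 80 rows of gray padding are added below,
        # producing a 320x320 input without distorting the aspect ratio.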
return new_im, delta_w, delta_h
def predict(self, img):
# Step 4. Create tensor from image
input_tensor = np.expand_dims(img, 0)
# Step 5. Create an infer request for model inference
infer_request = self.compiled_model.create_infer_request()
infer_request.infer({0: input_tensor})
# Step 6. Retrieve inference results
output = infer_request.get_output_tensor()
detections = output.data[0].T
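        # For a standard YOLOv9 export the raw output has shape
        # (1, 4 + num_classes, num_anchors); transposing gives one row per
        # candidate box: [cx, cy, w, h, score_0, ..., score_{n-1}].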
# Step 7. Postprocessing including NMS
boxes = []
class_ids = []
confidences = []
for prediction in detections:
classes_scores = prediction[4:]
_, _, _, max_indx = cv2.minMaxLoc(classes_scores)
class_id = max_indx[1]
if classes_scores[class_id] > self.conf_thresh:
confidences.append(classes_scores[class_id])
class_ids.append(class_id)
x, y, w, h = (
prediction[0].item(),
prediction[1].item(),
prediction[2].item(),
prediction[3].item(),
)
xmin = x - (w / 2)
ymin = y - (h / 2)
box = np.array([xmin, ymin, w, h])
boxes.append(box)
indexes = cv2.dnn.NMSBoxes(
boxes, confidences, self.conf_thresh, self.nms_thresh
)
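        # cv2.dnn.NMSBoxes expects [x, y, w, h] boxes with a top-left origin,
        # which is why the center coordinates were converted to xmin/ymin above.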
results = []
for i in indexes:
j = i.item()
results.append(
{
"class_index": class_ids[j],
"confidence": confidences[j],
"box": boxes[j],
}
)
return results
def draw(self, img, detections, dw, dh):
# Step 8. Print results and save Figure with detections
for detection in detections:
box = detection["box"]
classId = detection["class_index"]
confidence = detection["confidence"]
rx = img.shape[1] / (self.input_width - dw)
ry = img.shape[0] / (self.input_height - dh)
box[0] = rx * box[0]
box[1] = ry * box[1]
box[2] = rx * box[2]
box[3] = ry * box[3]
xmax = box[0] + box[2]
ymax = box[1] + box[3]
# Drawing detection box
cv2.rectangle(
img,
(int(box[0]), int(box[1])),
(int(xmax), int(ymax)),
tuple(map(int, self.colors[classId])),
3,
)
# Detection box text
            class_string = f"{coconame[classId]} {confidence:.2f}"
text_size, _ = cv2.getTextSize(class_string, cv2.FONT_HERSHEY_DUPLEX, 1, 2)
text_rect = (box[0], box[1] - 40, text_size[0] + 10, text_size[1] + 20)
cv2.rectangle(
img,
(int(text_rect[0]), int(text_rect[1])),
(int(text_rect[0] + text_rect[2]), int(text_rect[1] + text_rect[3])),
tuple(map(int, self.colors[classId])),
cv2.FILLED,
)
cv2.putText(
img,
class_string,
(int(box[0] + 5), int(box[1] - 10)),
cv2.FONT_HERSHEY_DUPLEX,
1,
(0, 0, 0),
2,
cv2.LINE_AA,
)
def init_context(context):
"""Nuclio init_context called once per container.
Loads the IR model and compiles it for the CPU.
"""
context.logger.info("Init context ----> 0%")
model = Yolov9(MODEL_XML, MODEL_BIN, conf=0.1, nms=0.4)
context.user_data.model = model
context.logger.info("Init context ----> 100%")
def handler(context, event):
"""Nuclio handler called for every request.
Expects a JSON body with a base64 encoded image under the key ``"image"``.
Returns CVAT-compatible JSON with the detected objects.
"""
context.logger.info("Run OpenVINO YOLOv9 model")
    # Parse request body (event.body may be a parsed dict or raw bytes/str)
    try:
        data = event.body
        if isinstance(data, (bytes, str)):
            data = json.loads(data)
        image_b64 = data["image"]
except Exception as exc:
context.logger.error(f"Invalid request body: {exc}")
return context.Response(
body=json.dumps({"error": "Invalid request body"}),
status_code=400,
content_type="application/json",
)
# Decode image
image_bytes = base64.b64decode(image_b64)
image = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
if image is None:
context.logger.error("Failed to decode image")
return context.Response(
body=json.dumps({"error": "Failed to decode image"}),
status_code=400,
content_type="application/json",
)
# Get model from context
model = context.user_data.model
print("Prepare Model")
# Preprocess: resize and pad
img_resized, dw, dh = model.resize_and_pad(image)
#print("Resize Image")
# Inference
detections = model.predict(img_resized)
#print("Detecion")
# Convert detections to CVAT-compatible format
shapes = []
for detection in detections:
class_id = detection["class_index"]
confidence = float(detection["confidence"])
box = detection["box"]
# Scale box coordinates back to original image size
rx = image.shape[1] / (model.input_width - dw)
ry = image.shape[0] / (model.input_height - dh)
xmin = box[0] * rx
ymin = box[1] * ry
xmax = (box[0] + box[2]) * rx
ymax = (box[1] + box[3]) * ry
# Convert to pixel coordinates
x_min_px = int(max(0, xmin))
y_min_px = int(max(0, ymin))
x_max_px = int(min(image.shape[1], xmax))
y_max_px = int(min(image.shape[0], ymax))
label = coconame[class_id] if class_id < len(coconame) else "unknown"
shapes.append(
{
"label": label,
"points": [x_min_px, y_min_px, x_max_px, y_max_px],
"type": "rectangle",
"confidence": str(confidence),
}
)
context.logger.info(f"Detected {len(shapes)} objects")
return context.Response(
body=json.dumps(shapes),
headers={},
content_type="application/json",
status_code=200,
)
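
Once deployed, the function can be invoked directly over HTTP. A minimal sketch; the port and image path are assumptions, so use whatever port Nuclio assigned to the function:

import base64
import requests

with open("sample.jpg", "rb") as f:               # hypothetical test image
    img_b64 = base64.b64encode(f.read()).decode()

# Port 32001 is illustrative; check the Nuclio dashboard for the real one.
resp = requests.post("http://localhost:32001", json={"image": img_b64})
print(resp.status_code, resp.json())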