[toc]
背景
自動化測試需要繞過極驗驗證碼進行登錄,經過方案可行性驗證后,確認使用yolov5作為驗證碼滑塊的檢測工具;
但yolov5本身沒有提供服務化,需要對相關推理能力自行打包封裝成服務,本文記錄了服務化的相關過程。
目標
- 低門檻:需以低門檻形式提供服務
- 低維護成本:由于接入項目可能眾多,對訓練后的新模型適配需要降低維護成本
- 響應速度:由于是ui自動化腳本使用,因此需要盡可能快的響應速度(1s內)
實現過程
推理腳本改造
yolo本身自帶了一個推理腳本 detect.py,供模型訓練完畢后測試效果用,但該腳本并不全部符合使用需求,因此不能直接使用,主要原因是:
1、每次調用都會進行模型加載,耗時較久,如果直接用來服務化響應時間不可接受;
2、沒有返回值,執行后產物為驗證用圖片,不能直接提供檢測目標的定位信息。
基于以上原因,分析推理腳本代碼后,作出如下修改:
1、將推理腳本結構從純方法(method)改造為類(class),提供初始化方法,方便一次初始化后,后續無需再次初始化,節約了加載模型的時間;
2、在類(class)中直接提供返回檢測目標坐標值的方法,方便給后續web服務調用。
其余代碼不做更改(包括生成本地圖片,主要用于調試驗證),整體代碼如下:
import re
import threading
# YOLOv5 🚀 by Ultralytics, GPL-3.0 license
"""
Run inference on images, videos, directories, streams, etc.
Usage:
$ python path/to/detect.py --weights yolov5s.pt --source 0 # webcam
img.jpg # image
vid.mp4 # video
path/ # directory
path/*.jpg # glob
'https://youtu.be/Zgi9g1ksQHc' # YouTube
'rtsp://example.com/media.mp4' # RTSP, RTMP, HTTP stream
"""
import argparse
import os
import sys
from pathlib import Path
import cv2
import torch
import torch.backends.cudnn as cudnn
# Resolve this script's own location so the sibling packages imported right
# below (`models`, `utils`) can be found regardless of the working directory.
FILE = Path(__file__).resolve()  # absolute path of this file
ROOT = FILE.parents[0]  # YOLOv5 root directory
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))  # add ROOT to PATH
# Rebind ROOT to a path relative to the current working directory; this must
# happen after the sys.path insert above, which uses the absolute form.
ROOT = Path(os.path.relpath(ROOT, Path.cwd()))  # relative
from models.common import DetectMultiBackend
from utils.datasets import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
from utils.general import (LOGGER, check_file, check_img_size, check_imshow, check_requirements, colorstr,
increment_path, non_max_suppression, print_args, scale_coords, strip_optimizer, xyxy2xywh)
from utils.plots import Annotator, colors, save_one_box
from utils.torch_utils import select_device, time_sync
class yolo_detect():
    """Singleton wrapper around a YOLOv5 model for repeated inference.

    The model is loaded once, the first time ``yolo_detect()`` is called.
    Every later ``yolo_detect()`` call returns the same, already initialised
    object, so a long-running service does not pay the model-loading cost
    again.
    """

    _instance_lock = threading.Lock()  # guards instance creation and one-time setup

    def __new__(cls, *args, **kwargs) -> object:
        """Return the single shared instance, creating it on first use.

        The existence check is repeated after the lock is taken so that two
        threads which both pass the first check cannot both create an object.
        """
        if not hasattr(yolo_detect, "_instance"):
            with yolo_detect._instance_lock:
                if not hasattr(yolo_detect, "_instance"):
                    yolo_detect._instance = object.__new__(cls)
        return yolo_detect._instance

    def __init__(self):
        """Run the one-time setup unless this instance is already initialised.

        ``__new__`` always hands back the same object and Python calls
        ``__init__`` after every ``yolo_detect()`` expression, so without this
        guard each call would reload the weights and create a new run
        directory.
        """
        with yolo_detect._instance_lock:
            if getattr(self, '_initialized', False):
                return
            self._setup()
            # Set only after a successful setup so a failed load is retried
            # on the next construction attempt.
            self._initialized = True

    def _setup(self):
        """Set inference options, create the output directory and load the model."""
        print("yolo server now start initing.....")
        self.weights = ROOT / 'best.pt'  # model.pt path(s)
        self.imgsz = (640, 640)  # inference size (height, width)
        self.conf_thres = 0.25  # confidence threshold
        self.iou_thres = 0.45  # NMS IOU threshold
        self.max_det = 1000  # maximum detections per image
        self.device = 'cpu'  # cuda device, i.e. 0 or 0,1,2,3 or cpu
        self.view_img = False  # show results
        self.save_txt = False  # save results to *.txt
        self.save_conf = False  # save confidences in --save-txt labels
        self.save_crop = False  # save cropped prediction boxes
        self.nosave = False  # do not save images/videos
        self.classes = None  # filter by class: --class 0, or --class 0 2 3
        self.agnostic_nms = False  # class-agnostic NMS
        self.augment = False  # augmented inference
        self.visualize = False  # visualize features
        self.update = False  # update all models
        self.project = ROOT / 'runs/detect'  # save results to project/name
        self.name = 'exp'  # save results to project/name
        self.exist_ok = False  # existing project/name ok, do not increment
        self.line_thickness = 3  # bounding box thickness (pixels)
        self.hide_labels = False  # hide labels
        self.hide_conf = False  # hide confidences
        self.half = False  # use FP16 half-precision inference
        self.dnn = False  # use OpenCV DNN for ONNX inference

        # Directories
        self.save_dir = increment_path(Path(self.project) / self.name, exist_ok=self.exist_ok)  # increment run
        (self.save_dir / 'labels' if self.save_txt else self.save_dir).mkdir(parents=True, exist_ok=True)  # make dir

        # Load model
        device = select_device(self.device)
        # Keep the resolved device object: detect() moves input tensors to it,
        # which is safer than passing the raw config string to Tensor.to().
        self.torch_device = device
        self.model = DetectMultiBackend(self.weights, device=device, dnn=self.dnn)
        self.stride, self.names, self.pt = self.model.stride, self.model.names, self.model.pt
        jit, engine = self.model.jit, self.model.engine
        self.imgsz = check_img_size(self.imgsz, s=self.stride)  # check image size

        # Half
        self.half &= (self.pt or jit or engine) and device.type != 'cpu'  # half precision only supported by PyTorch on CUDA
        if self.pt or jit:
            self.model.model.half() if self.half else self.model.model.float()

    @staticmethod
    def _format_target(xyxy, conf):
        """Build one result entry: four integer box coordinates plus confidence.

        Args:
            xyxy: Four numeric values (plain numbers or 0-d tensors) giving the
                box corners x1, y1, x2, y2.
            conf: Detection confidence in the range 0..1.

        Returns:
            list[str]: ``[x1, y1, x2, y2, conf_percent]`` as decimal strings.
            The confidence is a percentage with at least two digits
            (``'05'``, ``'97'``, ``'100'``).
        """
        coords = [str(int(v)) for v in xyxy]
        return coords + [f"{round(float(conf) * 100):02d}"]

    def detect(self, source):
        """Run inference on ``source`` and return the detected boxes.

        Args:
            source: Image/video path, directory, glob, URL, ``.txt`` stream
                list or numeric webcam index; converted with ``str()``.

        Returns:
            dict: ``{'target_info_list': [[x1, y1, x2, y2, conf], ...]}``, one
            inner list per detection, every value a decimal string. The
            coordinates are in pixels of the original image; ``conf`` is the
            confidence in percent. Detections are reported regardless of
            whether annotated images are saved or shown.
        """
        detect_result = {
            'target_info_list': [],
        }
        source = str(source)
        save_img = not self.nosave and not source.endswith('.txt')  # save inference images
        is_file = Path(source).suffix[1:] in (IMG_FORMATS + VID_FORMATS)
        is_url = source.lower().startswith(('rtsp://', 'rtmp://', 'http://', 'https://'))
        webcam = source.isnumeric() or source.endswith('.txt') or (is_url and not is_file)
        if is_url and is_file:
            source = check_file(source)  # download

        # Dataloader
        view_img = self.view_img
        if webcam:
            # Only probe for a usable display when showing results was requested.
            view_img = view_img and check_imshow()
            cudnn.benchmark = True  # set True to speed up constant image size inference
            dataset = LoadStreams(source, img_size=self.imgsz, stride=self.stride, auto=self.pt)
            bs = len(dataset)  # batch_size
        else:
            dataset = LoadImages(source, img_size=self.imgsz, stride=self.stride, auto=self.pt)
            bs = 1  # batch_size
        vid_path, vid_writer = [None] * bs, [None] * bs

        # Run inference
        self.model.warmup(imgsz=(1, 3, *self.imgsz), half=self.half)  # warmup
        dt, seen = [0.0, 0.0, 0.0], 0
        for path, im, im0s, vid_cap, s in dataset:
            t1 = time_sync()
            im = torch.from_numpy(im).to(self.torch_device)
            im = im.half() if self.half else im.float()  # uint8 to fp16/32
            im /= 255  # 0 - 255 to 0.0 - 1.0
            if len(im.shape) == 3:
                im = im[None]  # expand for batch dim
            t2 = time_sync()
            dt[0] += t2 - t1

            # Inference
            # Use a local so the instance-level option is not overwritten per call.
            vis_dir = increment_path(self.save_dir / Path(path).stem, mkdir=True) if self.visualize else False
            pred = self.model(im, augment=self.augment, visualize=vis_dir)
            t3 = time_sync()
            dt[1] += t3 - t2

            # NMS
            pred = non_max_suppression(pred, self.conf_thres, self.iou_thres, self.classes, self.agnostic_nms,
                                       max_det=self.max_det)
            dt[2] += time_sync() - t3

            # Process predictions
            for i, det in enumerate(pred):  # per image
                seen += 1
                if webcam:  # batch_size >= 1
                    p, im0, frame = path[i], im0s[i].copy(), dataset.count
                    s += f'{i}: '
                else:
                    p, im0, frame = path, im0s.copy(), getattr(dataset, 'frame', 0)

                p = Path(p)  # to Path
                save_path = str(self.save_dir / p.name)  # im.jpg
                txt_path = str(self.save_dir / 'labels' / p.stem) + (
                    '' if dataset.mode == 'image' else f'_{frame}')  # im.txt
                s += '%gx%g ' % im.shape[2:]  # print string
                gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
                imc = im0.copy() if self.save_crop else im0  # for save_crop
                annotator = Annotator(im0, line_width=self.line_thickness, example=str(self.names))
                if len(det):
                    # Rescale boxes from img_size to im0 size
                    det[:, :4] = scale_coords(im.shape[2:], det[:, :4], im0.shape).round()

                    # Print results
                    for c in det[:, -1].unique():
                        n = (det[:, -1] == c).sum()  # detections per class
                        s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, "  # add to string

                    # Write results
                    for *xyxy, conf, cls in reversed(det):
                        # Take the values straight from the tensors instead of
                        # parsing them back out of the drawing label.
                        detect_result['target_info_list'].append(self._format_target(xyxy, conf))

                        if self.save_txt:  # Write to file
                            xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()  # normalized xywh
                            line = (cls, *xywh, conf) if self.save_conf else (cls, *xywh)  # label format
                            with open(txt_path + '.txt', 'a') as f:
                                f.write(('%g ' * len(line)).rstrip() % line + '\n')

                        if save_img or self.save_crop or view_img:  # Add bbox to image
                            c = int(cls)  # integer class
                            label = None if self.hide_labels else (
                                self.names[c] if self.hide_conf else f'{self.names[c]} {conf:.2f}')
                            annotator.box_label(xyxy, label, color=colors(c, True))
                            if self.save_crop:
                                save_one_box(xyxy, imc, file=self.save_dir / 'crops' / self.names[c] / f'{p.stem}.jpg',
                                             BGR=True)

                # Print time (inference-only)
                LOGGER.info(f'{s}Done. ({t3 - t2:.3f}s)')

                # Stream results
                im0 = annotator.result()
                if view_img:
                    cv2.imshow(str(p), im0)
                    cv2.waitKey(1)  # 1 millisecond

                # Save results (image with detections)
                if save_img:
                    if dataset.mode == 'image':
                        cv2.imwrite(save_path, im0)
                    else:  # 'video' or 'stream'
                        if vid_path[i] != save_path:  # new video
                            vid_path[i] = save_path
                            if isinstance(vid_writer[i], cv2.VideoWriter):
                                vid_writer[i].release()  # release previous video writer
                            if vid_cap:  # video
                                fps = vid_cap.get(cv2.CAP_PROP_FPS)
                                w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                                h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                            else:  # stream
                                fps, w, h = 30, im0.shape[1], im0.shape[0]
                                save_path += '.mp4'
                            vid_writer[i] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
                        vid_writer[i].write(im0)

        # Print results
        if seen:  # avoid dividing by zero when nothing was processed
            t = tuple(x / seen * 1E3 for x in dt)  # speeds per image
            LOGGER.info(
                f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {(1, 3, *self.imgsz)}' % t)
        if self.save_txt or save_img:
            s = f"\n{len(list(self.save_dir.glob('labels/*.txt')))} labels saved to {self.save_dir / 'labels'}" if self.save_txt else ''
            LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")
        if self.update:
            strip_optimizer(self.weights)  # update model (to fix SourceChangeWarning)
        return detect_result
if __name__ == '__main__':
    # Manual smoke test: build the detector and run it once on a local image.
    detector = yolo_detect()
    detector.detect("1640749331.png")
后臺服務搭建
基于以上修改后的腳本理論上已經可以直接本地使用了,但考慮到yolo基于深度學習,需要搭建一整套的環境后才能運行,如果全部本地使用的話,各項目復用成本較高;因此決定將該腳本以web服務的方式提供,考慮到腳本為py,因此web服務采用py棧的Django框架。
主要為以下幾步:
1、使用pycharm新建一個django項目
2、settings文件注釋 MIDDLEWARE 的'django.middleware.csrf.CsrfViewMiddleware',解決跨域請求被CSRF校驗攔截(403)的問題
3、url文件內增加路由映射關系
from yolo import views as yolo_view
# URL routing table: the Django admin site plus the image-upload endpoint.
urlpatterns = [
    path('admin/', admin.site.urls),
    # Requests to "up_file" are dispatched to the class-based view yolo_view.up_file.
    path(r'up_file', yolo_view.up_file.as_view()),
]
4、views文件內增加路由方法
class up_file(View):
    """Accept an uploaded image, run detection on it and reply with JSON."""

    def post(self, request):
        """Save the file sent under the form field ``file`` and detect on it.

        The upload is written to a ``tempdata`` directory next to this module
        and its path is passed to ``yolo.detect``.

        Returns:
            HttpResponse: JSON body. On success it is the dict returned by
            ``yolo.detect``; on any failure it is
            ``{'status': False, 'msg': <description>}``.
        """
        try:
            file = request.FILES.get('file')
            if file is None:
                # Report the real cause instead of failing later on `''.name`.
                return self._json({'status': False,
                                   'msg': u'錯誤:{}'.format("no file uploaded under form field 'file'")})

            upload_dir = os.path.join(os.path.dirname(__file__), 'tempdata')
            # exist_ok avoids the check-then-create race between concurrent requests.
            os.makedirs(upload_dir, exist_ok=True)
            # basename(): never let a client-supplied name escape upload_dir.
            save_file_path = os.path.join(upload_dir, os.path.basename(file.name))
            with open(save_file_path, 'wb') as fp:
                for chunk in file.chunks():
                    fp.write(chunk)

            _msg = yolo.detect(save_file_path)
            return self._json(_msg)
        except Exception as e:
            # Top-level boundary of the endpoint: always answer with JSON so
            # the calling test script can parse the reply.
            print(e)
            return self._json({'status': False, 'msg': u'錯誤:{}'.format(e)})

    @staticmethod
    def _json(payload):
        """Serialise ``payload`` into an HttpResponse labelled as JSON."""
        return HttpResponse(json.dumps(payload), content_type='application/json')
5、啟動服務
測試腳本
服務搭建完成后,寫一個測試腳本進行相關測試,內容如下:
import requests, os, time
def post_pic(url:str, path:str) -> dict:
    """Upload an image to the detection service and return its decoded reply.

    :param url: address of the upload endpoint
    :param path: local path of the image to send
    :return: the JSON body of the response, or ``{}`` when ``path`` is not an
        existing regular file
    """
    # isfile() also rejects directories, which would otherwise fail in open().
    if not os.path.isfile(path):
        print("文件不存在!")
        return {}
    # The context manager closes the file even when the request raises.
    with open(path, 'rb') as fp:
        result = requests.post(url, files={'file': fp})
    return result.json()
def parse_response(response:dict, min_conf:int=95, expected:int=2) -> list:
    """Filter a detection reply down to the confident targets.

    :param response: decoded reply of the upload request
    :param min_conf: lowest confidence (percent, inclusive) a target must
        have to be kept
    :param expected: number of confident targets a valid detection must
        contain
    :return: the kept targets when exactly ``expected`` of them pass the
        threshold; otherwise an empty list. An empty list is also returned
        for an empty or non-dict reply and for a reply without
        ``target_info_list`` (e.g. the service's error reply).
    """
    if not response or not isinstance(response, dict):
        return []
    # .get(): the service's error reply has no 'target_info_list' key.
    taglist = response.get('target_info_list') or []
    # The last element of every target is its confidence in percent.
    real_tag_list = [tag for tag in taglist if tag and int(tag[-1]) >= min_conf]
    # Any other count means this detection is unreliable and must be discarded.
    return real_tag_list if len(real_tag_list) == expected else []
if __name__ == '__main__':
    # Time one full round trip: upload the image, then filter the reply.
    server_url = 'http://172.31.183.153:8000/up_file'
    image_path = '333333.png'
    started = time.time()
    raw_reply = post_pic(server_url, image_path)
    sliders = parse_response(raw_reply)
    print(sliders)
    elapsed = time.time() - started
    print(elapsed)
效果
請求后,響應體如下:
{'target_info_list': [['120', '933', '295', '1082', '97'], ['631', '932', '800', '1078', '98']]}
target_info_list是識別結果的列表,每一個子列表代表一個被識別滑塊的信息,其中,前四位數字是位置信息,代表滑塊左上角的x軸位置、y軸位置和右下角的x軸位置、y軸位置;第五位數字為置信度(百分數),代表程序認為是滑塊的可能性。
根據兩個滑塊的x軸位置,即可計算出滑動距離。
實測上傳一張300kb左右的圖片,響應時間為600ms左右。