Source code for yolort.v5.utils.general

# YOLOv5 🚀 by Ultralytics, GPL-3.0 license
"""
General utils
"""

import contextlib
import glob
import logging
import math
import os
import platform
import random
import re
import signal
import time
import urllib
from pathlib import Path

import numpy as np
import pandas as pd
import pkg_resources as pkg
import torch
import torchvision
import yaml

try:
    import cv2
except ImportError:
    cv2 = None

from .metrics import box_iou, fitness

# Settings
torch.set_printoptions(linewidth=320, precision=5, profile="long")
# format short g, %precision=5
np.set_printoptions(linewidth=320, formatter={"float_kind": "{:11.5g}".format})
pd.options.display.max_columns = 10
# prevent OpenCV from multithreading (incompatible with PyTorch DataLoader)
if cv2 is not None:
    cv2.setNumThreads(0)
os.environ["NUMEXPR_MAX_THREADS"] = str(min(os.cpu_count(), 8))  # NumExpr max threads

FILE = Path(__file__).resolve()
ROOT = FILE.parents[1]  # YOLOv5 root directory


[docs]def set_logging(name=None, verbose=True): # Sets level and returns logger rank = int(os.getenv("RANK", -1)) # rank in world for Multi-GPU trainings logging.basicConfig( format="%(message)s", level=logging.INFO if (verbose and rank in (-1, 0)) else logging.WARNING ) return logging.getLogger(name)
# define globally (used in train.py, val.py, detect.py, etc.) LOGGER = set_logging(__name__) class Profile(contextlib.ContextDecorator): # Usage: @Profile() decorator or 'with Profile():' context manager def __enter__(self): self.start = time.time() def __exit__(self, type, value, traceback): print(f"Profile results: {time.time() - self.start:.5f}s") class Timeout(contextlib.ContextDecorator): # Usage: @Timeout(seconds) decorator or 'with Timeout(seconds):' context manager def __init__(self, seconds, *, timeout_msg="", suppress_timeout_errors=True): self.seconds = int(seconds) self.timeout_message = timeout_msg self.suppress = bool(suppress_timeout_errors) def _timeout_handler(self, signum, frame): raise TimeoutError(self.timeout_message) def __enter__(self): signal.signal(signal.SIGALRM, self._timeout_handler) # Set handler for SIGALRM signal.alarm(self.seconds) # start countdown for SIGALRM to be raised def __exit__(self, exc_type, exc_val, exc_tb): signal.alarm(0) # Cancel SIGALRM if it's scheduled if self.suppress and exc_type is TimeoutError: # Suppress TimeoutError return True def try_except(func): # try-except function. Usage: @try_except decorator def handler(*args, **kwargs): try: func(*args, **kwargs) except Exception as e: print(e) return handler def methods(instance): # Get class/instance methods return [f for f in dir(instance) if callable(getattr(instance, f)) and not f.startswith("__")] def print_args(name, opt): # Print argparser arguments LOGGER.info(colorstr(f"{name}: ") + ", ".join(f"{k}={v}" for k, v in vars(opt).items())) def init_seeds(seed=0): """ Initialize random number generator (RNG) seeds https://pytorch.org/docs/stable/notes/randomness.html cudnn seed 0 settings are slower and more reproducible, else faster and less reproducible """ import torch.backends.cudnn as cudnn random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) cudnn.benchmark, cudnn.deterministic = (False, True) if seed == 0 else (True, False)
[docs]def intersect_dicts(dict1, dict2, exclude=()): """ Dictionary intersection of matching keys and shapes, omitting 'exclude' keys, using dict1 values """ return { k: v for k, v in dict1.items() if k in dict2 and not any(x in k for x in exclude) and v.shape == dict2[k].shape }
def get_latest_run(search_dir="."): # Return path to most recent 'last.pt' in /runs (i.e. to --resume from) last_list = glob.glob(f"{search_dir}/**/last*.pt", recursive=True) return max(last_list, key=os.path.getctime) if last_list else "" def user_config_dir(dir="Ultralytics", env_var="YOLOV5_CONFIG_DIR"): """ Return path of user configuration directory. Prefer environment variable if exists. Make dir if required. """ env = os.getenv(env_var) if env: path = Path(env) # use environment variable else: cfg = { "Windows": "AppData/Roaming", "Linux": ".config", "Darwin": "Library/Application Support", } # 3 OS dirs path = Path.home() / cfg.get(platform.system(), "") # OS-specific config dir path = ( path if is_writeable(path) else Path("/tmp") ) / dir # GCP and AWS lambda fix, only /tmp is writeable path.mkdir(exist_ok=True) # make if required return path def is_writeable(dir, test=False): """ Return True if directory has write permissions, test opening a file with write permissions if test=True """ if test: # method 1 file = Path(dir) / "tmp.txt" try: with open(file, "w"): # open file with write permissions pass file.unlink() # remove file return True except OSError: return False else: # method 2 return os.access(dir, os.R_OK) # possible issues on Windows def is_pip(): # Is file in a pip package? return "site-packages" in Path(__file__).resolve().parts def is_ascii(s=""): """ Is string composed of all ASCII (no UTF) characters? (note str().isascii() introduced in python 3.7) """ s = str(s) # convert list, tuple, None, etc. to str return len(s.encode().decode("ascii", "ignore")) == len(s) def is_chinese(s="人工智能"): # Is string composed of any Chinese characters? return re.search("[\u4e00-\u9fff]", s) def emojis(str=""): # Return platform-dependent emoji-safe version of string return str.encode().decode("ascii", "ignore") if platform.system() == "Windows" else str def file_size(path): # Return file/dir size (MB) path = Path(path) if path.is_file(): return path.stat().st_size / 1e6 elif path.is_dir(): return sum(f.stat().st_size for f in path.glob("**/*") if f.is_file()) / 1e6 else: return 0.0 def check_online(): # Check internet connectivity import socket try: socket.create_connection(("1.1.1.1", 443), 5) # check host accessibility return True except OSError: return False def check_python(minimum="3.6.2"): # Check current python version vs. required python version check_version(platform.python_version(), minimum, name="Python ", hard=True) def check_version(current="0.0.0", minimum="0.0.0", name="version ", pinned=False, hard=False): # Check version vs. required version current, minimum = (pkg.parse_version(x) for x in (current, minimum)) result = (current == minimum) if pinned else (current >= minimum) # bool if hard: # assert min requirements met assert result, f"{name}{minimum} required by YOLOv5, but {name}{current} is currently installed" else: return result def check_img_size(image_size, stride=32, floor=0): """ Verify image size is a multiple of stride stride in each dimension """ if isinstance(image_size, int): # integer i.e. img_size=640 new_size = max(make_divisible(image_size, int(stride)), floor) else: # list i.e. img_size=[640, 480] new_size = [max(make_divisible(x, int(stride)), floor) for x in image_size] if new_size != image_size: print( f"WARNING: --img-size {image_size} must be multiple of " f"max stride {stride}, updating to {new_size}" ) return new_size def check_suffix(file="yolov5s.pt", suffix=(".pt",), msg=""): # Check file(s) for acceptable suffix if file and suffix: if isinstance(suffix, str): suffix = [suffix] for f in file if isinstance(file, (list, tuple)) else [file]: s = Path(f).suffix.lower() # file suffix if len(s): assert s in suffix, f"{msg}{f} acceptable suffix is {suffix}" def check_yaml(file, suffix=(".yaml", ".yml")): # Search/download YAML file (if necessary) and return path, checking suffix return check_file(file, suffix) def check_file(file, suffix=""): """ Search/download file (if necessary) and return path """ check_suffix(file, suffix) # optional file = str(file) # convert to str() if Path(file).is_file() or file == "": # return the file if the file exists return file if file.startswith(("http:/", "https:/")): # download the file if the image source is a link url = str(Path(file)).replace(":/", "://") # Pathlib turns :// -> :/ # '%2F' to '/', split https://url.com/file.txt?auth file = Path(urllib.parse.unquote(file).split("?")[0]).name if Path(file).is_file(): print(f"Found {url} locally at {file}") # file already exists else: print(f"Downloading {url} to {file}...") torch.hub.download_url_to_file(url, file) assert Path(file).exists() and Path(file).stat().st_size > 0, f"File download failed: {url}" return file files = [] for d in "data", "models", "utils": # search the directories files.extend(glob.glob(str(ROOT / d / "**" / file), recursive=True)) # find file assert len(files), f"File not found: {file}" # assert file was found # assert unique assert len(files) == 1, f"Multiple files match '{file}', specify exact path: {files}" return files[0] # return file def url2file(url): # Convert URL to filename, i.e. https://url.com/file.txt?auth -> file.txt url = str(Path(url)).replace(":/", "://") # Pathlib turns :// -> :/ # '%2F' to '/', split https://url.com/file.txt?auth file = Path(urllib.parse.unquote(url)).name.split("?")[0] return file def make_divisible(x, divisor): # Returns x evenly divisible by divisor return math.ceil(x / divisor) * divisor def clean_str(s): # Cleans a string by replacing special characters with underscore _ return re.sub(pattern="[|@#!¡·$€%&()=?¿^*;:,¨´><+]", repl="_", string=s) def one_cycle(y1=0.0, y2=1.0, steps=100): # lambda function for sinusoidal ramp from y1 to y2 https://arxiv.org/pdf/1812.01187.pdf return lambda x: ((1 - math.cos(x * math.pi / steps)) / 2) * (y2 - y1) + y1 def colorstr(*input): """ Colors a string https://en.wikipedia.org/wiki/ANSI_escape_code, i.e. colorstr('blue', 'hello world') """ *args, string = input if len(input) > 1 else ("blue", "bold", input[0]) # color arguments, string colors = { "black": "\033[30m", # basic colors "red": "\033[31m", "green": "\033[32m", "yellow": "\033[33m", "blue": "\033[34m", "magenta": "\033[35m", "cyan": "\033[36m", "white": "\033[37m", "bright_black": "\033[90m", # bright colors "bright_red": "\033[91m", "bright_green": "\033[92m", "bright_yellow": "\033[93m", "bright_blue": "\033[94m", "bright_magenta": "\033[95m", "bright_cyan": "\033[96m", "bright_white": "\033[97m", "end": "\033[0m", # misc "bold": "\033[1m", "underline": "\033[4m", } return "".join(colors[x] for x in args) + f"{string}" + colors["end"] def labels_to_class_weights(labels, nc=80): # Get class weights (inverse frequency) from training labels if labels[0] is None: # no labels loaded return torch.Tensor() labels = np.concatenate(labels, 0) # labels.shape = (866643, 5) for COCO classes = labels[:, 0].astype(np.int) # labels = [class xywh] weights = np.bincount(classes, minlength=nc) # occurrences per class # Prepend gridpoint count (for uCE training) # gpi = ((320 / 32 * np.array([1, 2, 4])) ** 2 * 3).sum() # gridpoints per image # prepend gridpoints to start # weights = np.hstack([gpi * len(labels) - weights.sum() * 9, weights * 9]) ** 0.5 weights[weights == 0] = 1 # replace empty bins with 1 weights = 1 / weights # number of targets per class weights /= weights.sum() # normalize return torch.from_numpy(weights) def labels_to_image_weights(labels, nc=80, class_weights=np.ones(80)): # Produces image weights based on class_weights and image contents class_counts = np.array([np.bincount(x[:, 0].astype(np.int), minlength=nc) for x in labels]) image_weights = (class_weights.reshape(1, nc) * class_counts).sum(1) # index = random.choices(range(n), weights=image_weights, k=1) # weight image sample return image_weights def xyxy2xywh(x): """ Convert nx4 boxes from [x1, y1, x2, y2] to [x, y, w, h] where xy1=top-left, xy2=bottom-right """ y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = (x[:, 0] + x[:, 2]) / 2 # x center y[:, 1] = (x[:, 1] + x[:, 3]) / 2 # y center y[:, 2] = x[:, 2] - x[:, 0] # width y[:, 3] = x[:, 3] - x[:, 1] # height return y def xywh2xyxy(x): """ Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right """ y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y return y def xywhn2xyxy(x, w=640, h=640, padw=0, padh=0): """ Convert nx4 boxes from [x, y, w, h] normalized to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right """ y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = w * (x[:, 0] - x[:, 2] / 2) + padw # top left x y[:, 1] = h * (x[:, 1] - x[:, 3] / 2) + padh # top left y y[:, 2] = w * (x[:, 0] + x[:, 2] / 2) + padw # bottom right x y[:, 3] = h * (x[:, 1] + x[:, 3] / 2) + padh # bottom right y return y def xyxy2xywhn(x, w=640, h=640, clip=False, eps=0.0): """ Convert nx4 boxes from [x1, y1, x2, y2] to [x, y, w, h] normalized where xy1=top-left, xy2=bottom-right """ if clip: clip_coords(x, (h - eps, w - eps)) # warning: inplace clip y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = ((x[:, 0] + x[:, 2]) / 2) / w # x center y[:, 1] = ((x[:, 1] + x[:, 3]) / 2) / h # y center y[:, 2] = (x[:, 2] - x[:, 0]) / w # width y[:, 3] = (x[:, 3] - x[:, 1]) / h # height return y def xyn2xy(x, w=640, h=640, padw=0, padh=0): # Convert normalized segments into pixel segments, shape (n,2) y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = w * x[:, 0] + padw # top left x y[:, 1] = h * x[:, 1] + padh # top left y return y def segment2box(segment, width=640, height=640): """ Convert 1 segment label to 1 box label, applying inside-image constraint, i.e. (xy1, xy2, ...) to (xyxy) """ x, y = segment.T # segment xy inside = (x >= 0) & (y >= 0) & (x <= width) & (y <= height) x, y, = ( x[inside], y[inside], ) return np.array([x.min(), y.min(), x.max(), y.max()]) if any(x) else np.zeros((1, 4)) def segments2boxes(segments): # Convert segment labels to box labels, i.e. (cls, xy1, xy2, ...) to (cls, xywh) boxes = [] for s in segments: x, y = s.T # segment xy boxes.append([x.min(), y.min(), x.max(), y.max()]) # cls, xyxy return xyxy2xywh(np.array(boxes)) # cls, xywh def resample_segments(segments, n=1000): # Up-sample an (n,2) segment for i, s in enumerate(segments): x = np.linspace(0, len(s) - 1, n) xp = np.arange(len(s)) # segment xy segments[i] = np.concatenate([np.interp(x, xp, s[:, i]) for i in range(2)]).reshape(2, -1).T return segments
[docs]def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None): # Rescale coords (xyxy) from img1_shape to img0_shape if ratio_pad is None: # calculate from img0_shape # gain = old / new gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # wh padding pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 else: gain = ratio_pad[0][0] pad = ratio_pad[1] coords[:, [0, 2]] -= pad[0] # x padding coords[:, [1, 3]] -= pad[1] # y padding coords[:, :4] /= gain clip_coords(coords, img0_shape) return coords
def clip_coords(boxes, shape): # Clip bounding xyxy bounding boxes to image shape (height, width) if isinstance(boxes, torch.Tensor): # faster individually boxes[:, 0].clamp_(0, shape[1]) # x1 boxes[:, 1].clamp_(0, shape[0]) # y1 boxes[:, 2].clamp_(0, shape[1]) # x2 boxes[:, 3].clamp_(0, shape[0]) # y2 else: # np.array (faster grouped) boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1]) # x1, x2 boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0]) # y1, y2
[docs]def non_max_suppression( prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False, labels=(), max_det=300, ): """ Runs Non-Maximum Suppression (NMS) on inference results Returns: list of detections, on (n,6) tensor per image [xyxy, conf, cls] """ nc = prediction.shape[2] - 5 # number of classes xc = prediction[..., 4] > conf_thres # candidates # Checks assert ( 0 <= conf_thres <= 1 ), f"Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0" assert 0 <= iou_thres <= 1, f"Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0" # Settings # min_wh = 2 # (pixels) minimum box width and height max_wh = 4096 # (pixels) maximum box width and height max_nms = 30000 # maximum number of boxes into torchvision.ops.nms() time_limit = 10.0 # seconds to quit after redundant = True # require redundant detections multi_label &= nc > 1 # multiple labels per box (adds 0.5ms/img) merge = False # use merge-NMS t = time.time() output = [torch.zeros((0, 6), device=prediction.device)] * prediction.shape[0] for xi, x in enumerate(prediction): # image index, image inference # Apply constraints # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0 # width-height x = x[xc[xi]] # confidence # Cat apriori labels if autolabelling if labels and len(labels[xi]): l = labels[xi] v = torch.zeros((len(l), nc + 5), device=x.device) v[:, :4] = l[:, 1:5] # box v[:, 4] = 1.0 # conf v[range(len(l)), l[:, 0].long() + 5] = 1.0 # cls x = torch.cat((x, v), 0) # If none remain process next image if not x.shape[0]: continue # Compute conf x[:, 5:] *= x[:, 4:5] # conf = obj_conf * cls_conf # Box (center x, center y, width, height) to (x1, y1, x2, y2) box = xywh2xyxy(x[:, :4]) # Detections matrix nx6 (xyxy, conf, cls) if multi_label: i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1) else: # best class only conf, j = x[:, 5:].max(1, keepdim=True) x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres] # Filter by class if classes is not None: x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)] # Apply finite constraint # if not torch.isfinite(x).all(): # x = x[torch.isfinite(x).all(1)] # Check shape n = x.shape[0] # number of boxes if not n: # no boxes continue elif n > max_nms: # excess boxes x = x[x[:, 4].argsort(descending=True)[:max_nms]] # sort by confidence # Batched NMS c = x[:, 5:6] * (0 if agnostic else max_wh) # classes boxes, scores = x[:, :4] + c, x[:, 4] # boxes (offset by class), scores i = torchvision.ops.nms(boxes, scores, iou_thres) # NMS if i.shape[0] > max_det: # limit detections i = i[:max_det] if merge and (1 < n < 3e3): # Merge NMS (boxes merged using weighted mean) # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4) iou = box_iou(boxes[i], boxes) > iou_thres # iou matrix weights = iou * scores[None] # box weights # merged boxes x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True) if redundant: i = i[iou.sum(1) > 1] # require redundancy output[xi] = x[i] if (time.time() - t) > time_limit: print(f"WARNING: NMS time limit {time_limit}s exceeded") break # time limit exceeded return output
def strip_optimizer(checkpoint_path="best.pt", saved_path=""): """ Strip optimizer from 'checkpoint_path' to finalize training, optionally save as 'saved_path' """ x = torch.load(checkpoint_path, map_location=torch.device("cpu")) if x.get("ema"): x["model"] = x["ema"] # replace model with ema for k in "optimizer", "training_results", "wandb_id", "ema", "updates": # keys x[k] = None x["epoch"] = -1 x["model"].half() # to FP16 for p in x["model"].parameters(): p.requires_grad = False torch.save(x, saved_path or checkpoint_path) mb = os.path.getsize(saved_path or checkpoint_path) / 1e6 # filesize saved_info = f" saved as {saved_path}," if saved_path else "" print(f"Optimizer stripped from {checkpoint_path},{saved_info} {mb:.1f}MB") def print_mutation(results, hyp, save_dir, bucket): evolve_csv, evolve_yaml = save_dir / "evolve.csv", save_dir / "hyp_evolve.yaml" # [results + hyps] keys = ( "metrics/precision", "metrics/recall", "metrics/mAP_0.5", "metrics/mAP_0.5:0.95", "val/box_loss", "val/obj_loss", "val/cls_loss", ) + tuple(hyp.keys()) keys = tuple(x.strip() for x in keys) vals = results + tuple(hyp.values()) n = len(keys) # Log to evolve.csv s = "" if evolve_csv.exists() else (("%20s," * n % keys).rstrip(",") + "\n") # add header with open(evolve_csv, "a") as f: f.write(s + ("%20.5g," * n % vals).rstrip(",") + "\n") # Print to screen print(colorstr("evolve: ") + ", ".join(f"{x.strip():>20s}" for x in keys)) print(colorstr("evolve: ") + ", ".join(f"{x:20.5g}" for x in vals), end="\n\n\n") # Save yaml with open(evolve_yaml, "w") as f: data = pd.read_csv(evolve_csv) data = data.rename(columns=lambda x: x.strip()) # strip keys i = np.argmax(fitness(data.values[:, :7])) # f.write( "# YOLOv5 Hyperparameter Evolution Results\n" + f"# Best generation: {i}\n" + f"# Last generation: {len(data)}\n" + "# " + ", ".join(f"{x.strip():>20s}" for x in keys[:7]) + "\n" + "# " + ", ".join(f"{x:>20.5g}" for x in data.values[i, :7]) + "\n\n" ) yaml.safe_dump(hyp, f, sort_keys=False) if bucket: os.system(f"gsutil cp {evolve_csv} {evolve_yaml} gs://{bucket}") # upload def apply_classifier(x, model, img, im0): # Apply a second stage classifier to YOLO outputs im0 = [im0] if isinstance(im0, np.ndarray) else im0 for i, d in enumerate(x): # per image if d is not None and len(d): d = d.clone() # Reshape and pad cutouts b = xyxy2xywh(d[:, :4]) # boxes b[:, 2:] = b[:, 2:].max(1)[0].unsqueeze(1) # rectangle to square b[:, 2:] = b[:, 2:] * 1.3 + 30 # pad d[:, :4] = xywh2xyxy(b).long() # Rescale boxes from img_size to im0 size scale_coords(img.shape[2:], d[:, :4], im0[i].shape) # Classes pred_cls1 = d[:, 5].long() ims = [] for j, a in enumerate(d): # per item cutout = im0[i][int(a[1]) : int(a[3]), int(a[0]) : int(a[2])] im = cv2.resize(cutout, (224, 224)) # BGR # cv2.imwrite('example%i.jpg' % j, cutout) im = im[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, to 3x416x416 im = np.ascontiguousarray(im, dtype=np.float32) # uint8 to float32 im /= 255 # 0 - 255 to 0.0 - 1.0 ims.append(im) pred_cls2 = model(torch.Tensor(ims).to(d.device)).argmax(1) # classifier prediction x[i] = x[i][pred_cls1 == pred_cls2] # retain matching class detections return x def increment_path(path, exist_ok=False, sep="", mkdir=False): """ Increment file or directory path. i.e. runs/exp --> runs/exp{sep}2, runs/exp{sep}3, ... etc. """ path = Path(path) # os-agnostic if path.exists() and not exist_ok: path, suffix = (path.with_suffix(""), path.suffix) if path.is_file() else (path, "") dirs = glob.glob(f"{path}{sep}*") # similar paths matches = [re.search(rf"%s{sep}(\d+)" % path.stem, d) for d in dirs] i = [int(m.groups()[0]) for m in matches if m] # indices n = max(i) + 1 if i else 2 # increment number path = Path(f"{path}{sep}{n}{suffix}") # increment path if mkdir: path.mkdir(parents=True, exist_ok=True) # make directory return path