Skip to content

Processors Module

This module provides classes and functions for processing images and videos using semantic segmentation models.

It includes processors for handling individual files (images or videos) and directories containing multiple video files. The module also manages caching of segmentation results, generation of output visualizations, and analysis of segmentation statistics.

ImageProcessor

ImageProcessor(config)

Processes individual images using semantic segmentation models.

This class handles the segmentation of single images, including saving results and analyzing the segmentation output.

ATTRIBUTE DESCRIPTION
config

Configuration object containing processing parameters.

TYPE: Config

pipeline

Segmentation pipeline for processing images.

file_handler

Handles file operations.

TYPE: FileHandler

visualizer

Handles visualization of segmentation results.

TYPE: VisualizationHandler

analyzer

Analyzes segmentation results.

TYPE: SegmentationAnalyzer

Initializes the ImageProcessor with the given configuration.

PARAMETER DESCRIPTION
config

Configuration object for the processor.

TYPE: Config

Source code in cityseg/processors.py
def __init__(self, config: Config):
    """
    Initializes the ImageProcessor with the given configuration.

    Args:
        config (Config): Configuration object for the processor.
    """
    self.config = config
    self.pipeline = create_segmentation_pipeline(config.model)
    self.file_handler = FileHandler()
    self.visualizer = VisualizationHandler()
    self.analyzer = SegmentationAnalyzer()

process

process()

Processes the input image according to the configuration.

This method handles the entire image processing pipeline, including segmentation, result saving, and analysis.

RAISES DESCRIPTION
ProcessingError

If an error occurs during image processing.

Source code in cityseg/processors.py
def process(self) -> None:
    """
    Processes the input image according to the configuration.

    This method handles the entire image processing pipeline, including
    segmentation, result saving, and analysis.

    Raises:
        ProcessingError: If an error occurs during image processing.
    """
    logger.info(f"Processing image: {self.config.input}")
    try:
        image = Image.open(self.config.input).convert("RGB")
        if self.config.model.max_size:
            image.thumbnail(
                (self.config.model.max_size, self.config.model.max_size)
            )

        result = self.pipeline([image])[0]

        self._save_results(image, result)
        self._analyze_results(result["seg_map"])

        logger.info("Image processing complete")
    except Exception as e:
        logger.exception(f"Error during image processing: {str(e)}")
        raise ProcessingError(f"Error during image processing: {str(e)}")

VideoProcessor

VideoProcessor(config)

Processes video files using semantic segmentation models.

This class handles the segmentation of video frames, including saving results, generating output videos, and analyzing the segmentation output.

ATTRIBUTE DESCRIPTION
config

Configuration object containing processing parameters.

TYPE: Config

pipeline

Segmentation pipeline for processing video frames.

processing_plan

Plan for video processing steps.

TYPE: ProcessingPlan

file_handler

Handles file operations.

TYPE: FileHandler

visualizer

Handles visualization of segmentation results.

TYPE: VisualizationHandler

analyzer

Analyzes segmentation results.

TYPE: SegmentationAnalyzer

Initializes the VideoProcessor with the given configuration.

PARAMETER DESCRIPTION
config

Configuration object for the processor.

TYPE: Config

Source code in cityseg/processors.py
def __init__(self, config: Config):
    """
    Initializes the VideoProcessor with the given configuration.

    Args:
        config (Config): Configuration object for the processor.
    """
    self.config = config
    self.pipeline = create_segmentation_pipeline(config.model)
    self.processing_plan = ProcessingPlan(config)
    self.file_handler = FileHandler()
    self.visualizer = VisualizationHandler()
    self.analyzer = SegmentationAnalyzer()
    logger.debug(f"VideoProcessor initialized with config: {config}")

process

process()

Processes the input video according to the configuration and processing plan.

This method handles the entire video processing pipeline, including frame segmentation, result saving, video generation, and analysis.

RAISES DESCRIPTION
ProcessingError

If an error occurs during video processing.

Source code in cityseg/processors.py
def process(self) -> None:
    """
    Processes the input video according to the configuration and processing plan.

    This method handles the entire video processing pipeline, including
    frame segmentation, result saving, video generation, and analysis.

    Raises:
        ProcessingError: If an error occurs during video processing.
    """
    logger.info(f"Processing video: {self.config.input.name}")
    try:
        output_path = self.config.get_output_path()
        hdf_path = output_path.with_name(f"{output_path.stem}_segmentation.h5")

        if self.processing_plan.plan["process_video"]:
            # logger.debug("Executing video frame processing")
            segmentation_data, metadata = self._process_video_frames()
            if self.processing_plan.plan["generate_hdf"]:
                logger.debug(f"Saving segmentation data to HDF file: {hdf_path}")
                self.file_handler.save_hdf_file(
                    hdf_path, segmentation_data, metadata
                )
        else:
            logger.info(
                f"Loading existing segmentation data from HDF file: {hdf_path.name}"
            )
            hdf_file, metadata = self.file_handler.load_hdf_file(hdf_path)
            segmentation_data = hdf_file[
                "segmentation"
            ]  # This is now a h5py.Dataset

        # Generate videos based on the processing plan
        if (
            self.processing_plan.plan["generate_colored_video"]
            or self.processing_plan.plan["generate_overlay_video"]
        ):
            self.generate_videos(segmentation_data, metadata)

        if self.processing_plan.plan["analyze_results"]:
            logger.debug("Analyzing segmentation results")
            SegmentationAnalyzer.analyze_results(
                segmentation_data, metadata, output_path
            )

        self._update_processing_history()

        logger.info("Video processing complete")
    except Exception as e:
        logger.exception(f"Error during video processing: {str(e)}")
        raise ProcessingError(f"Error during video processing: {str(e)}")
    finally:
        if hasattr(self, "hdf_file"):
            self.hdf_file.close()

generate_videos

generate_videos(segmentation_data, metadata)

Generates output videos based on the processing plan, using batched processing.

PARAMETER DESCRIPTION
segmentation_data

The segmentation data for all frames.

TYPE: Dataset

metadata

Metadata about the video and segmentation.

TYPE: Dict[str, Any]

Source code in cityseg/processors.py
def generate_videos(
    self, segmentation_data: h5py.Dataset, metadata: Dict[str, Any]
) -> None:
    """
    Generates output videos based on the processing plan, using batched processing.

    Args:
        segmentation_data (h5py.Dataset): The segmentation data for all frames.
        metadata (Dict[str, Any]): Metadata about the video and segmentation.
    """
    if not (
        self.processing_plan.plan.get("generate_colored_video", False)
        or self.processing_plan.plan.get("generate_overlay_video", False)
    ):
        logger.info("No video generation required according to the processing plan")
        return

    start_time = time.time()
    cap = cv2.VideoCapture(str(self.config.input))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = metadata["fps"] / metadata["frame_step"]

    output_base = self.config.get_output_path()
    video_writers = self._initialize_video_writers(width, height, fps)

    chunk_size = 100  # Adjust this value based on available memory
    for chunk_start in range(0, len(segmentation_data), chunk_size):
        chunk_end = min(chunk_start + chunk_size, len(segmentation_data))
        seg_chunk = get_segmentation_data_batch(
            segmentation_data, chunk_start, chunk_end
        )

        frames = self._get_video_frames_batch(
            cap, chunk_start, chunk_end, metadata["frame_step"]
        )

    if self.processing_plan.plan.get("generate_colored_video", False):
        colored_frames = self.visualizer.visualize_segmentation(
            frames, seg_chunk, metadata["palette"], colored_only=True
        )
        for colored_frame in colored_frames:
            video_writers["colored"].write(
                cv2.cvtColor(colored_frame, cv2.COLOR_RGB2BGR)
            )

    if self.processing_plan.plan.get("generate_overlay_video", False):
        overlay_frames = self.visualizer.visualize_segmentation(
            frames, seg_chunk, metadata["palette"], colored_only=False
        )
        for overlay_frame in overlay_frames:
            video_writers["overlay"].write(
                cv2.cvtColor(overlay_frame, cv2.COLOR_RGB2BGR)
            )

    for writer in video_writers.values():
        writer.release()

    cap.release()
    logger.debug(
        f"Video generation completed in {time.time() - start_time:.2f} seconds"
    )
    logger.debug(f"Videos saved to: {output_base}")

SegmentationProcessor

SegmentationProcessor(config)

Handles segmentation processing for both images and videos.

This class serves as a facade for ImageProcessor and VideoProcessor, delegating the processing based on the input type.

ATTRIBUTE DESCRIPTION
config

Configuration object containing processing parameters.

TYPE: Config

image_processor

Processor for handling image inputs.

TYPE: ImageProcessor

video_processor

Processor for handling video inputs.

TYPE: VideoProcessor

Initializes the SegmentationProcessor with the given configuration.

PARAMETER DESCRIPTION
config

Configuration object for the processor.

TYPE: Config

Source code in cityseg/processors.py
def __init__(self, config: Config):
    """
    Initializes the SegmentationProcessor with the given configuration.

    Args:
        config (Config): Configuration object for the processor.
    """
    self.config = config
    self.image_processor = ImageProcessor(config)
    self.video_processor = VideoProcessor(config)
    logger.debug(f"SegmentationProcessor initialized with config: {config}")

process

process()

Processes the input based on its type (image or video).

RAISES DESCRIPTION
ValueError

If the input type is not supported.

Source code in cityseg/processors.py
def process(self):
    """
    Processes the input based on its type (image or video).

    Raises:
        ValueError: If the input type is not supported.
    """
    if self.config.input_type == InputType.SINGLE_IMAGE:
        self.image_processor.process()
    elif self.config.input_type == InputType.SINGLE_VIDEO:
        self.video_processor.process()
    else:
        raise ValueError(f"Unsupported input type: {self.config.input_type}")

DirectoryProcessor

DirectoryProcessor(config)

Processes multiple video files in a directory.

This class handles the batch processing of video files found in a specified directory.

ATTRIBUTE DESCRIPTION
config

Configuration object containing processing parameters.

TYPE: Config

video_iterator

Iterator for video files in the directory.

TYPE: VideoFileIterator

logger

Logger instance for this processor.

Initializes the DirectoryProcessor with the given configuration.

PARAMETER DESCRIPTION
config

Configuration object for the processor.

TYPE: Config

Source code in cityseg/processors.py
def __init__(self, config: Config):
    """
    Initializes the DirectoryProcessor with the given configuration.

    Args:
        config (Config): Configuration object for the processor.
    """
    self.config = config
    self.video_iterator = VideoFileIterator(config.input)
    self.logger = logger.bind(
        processor_type=self.__class__.__name__,
        input_type=self.config.input_type.value,
        input_path=str(self.config.input),
        output_path=str(self.config.get_output_path()),
        frame_step=self.config.frame_step,
    )

process

process()

Processes all video files in the specified directory.

This method iterates through all video files, processing each one according to the configuration.

RAISES DESCRIPTION
InputError

If no video files are found in the directory.

Source code in cityseg/processors.py
def process(self) -> None:
    """
    Processes all video files in the specified directory.

    This method iterates through all video files, processing each one
    according to the configuration.

    Raises:
        InputError: If no video files are found in the directory.
    """
    self.logger.debug(
        "Starting directory processing", input_path=str(self.config.input)
    )

    if not self.video_iterator.video_files:
        self.logger.error("No video files found")
        raise InputError(f"No video files found in directory: {self.config.input}")

    output_dir = self.config.get_output_path()
    self.logger.info(
        f"Output directory set: {str(output_dir)}", output_dir=str(output_dir)
    )

    with tqdm_context(
        total=len(self.video_iterator.video_files),
        desc="Processing videos",
        disable=self.config.disable_tqdm,
    ) as pbar:
        for video_file in self.video_iterator:
            if video_file.name in self.config.ignore_files:
                self.logger.info(
                    f"Ignoring video file: {str(video_file.name)}",
                    video_file=str(video_file),
                )
                pbar.update(1)
                continue
            try:
                self._process_single_video(video_file, output_dir)
            except Exception as e:
                self.logger.error(
                    "Error processing video",
                    video_file=str(video_file),
                    error=str(e),
                )
                self.logger.debug("Error details", exc_info=True)
            finally:
                pbar.update(1)

    self.logger.info(
        "Finished processing all videos", input_directory=str(self.config.input)
    )

create_processor

create_processor(config)

Creates and returns the appropriate processor based on the input type.

PARAMETER DESCRIPTION
config

Configuration object containing processing parameters.

TYPE: Config

RETURNS DESCRIPTION
Union[SegmentationProcessor, DirectoryProcessor]

Union[SegmentationProcessor, DirectoryProcessor]: The appropriate processor instance.

Source code in cityseg/processors.py
def create_processor(
    config: Config,
) -> Union[SegmentationProcessor, DirectoryProcessor]:
    """
    Creates and returns the appropriate processor based on the input type.

    Args:
        config (Config): Configuration object containing processing parameters.

    Returns:
        Union[SegmentationProcessor, DirectoryProcessor]: The appropriate processor instance.
    """
    if config.input_type == InputType.DIRECTORY:
        return DirectoryProcessor(config)
    else:
        return SegmentationProcessor(config)

ProcessingPlan

ProcessingPlan(config)

A class to create and manage the processing plan for video segmentation tasks.

This class determines which processing steps need to be executed based on the configuration and the existence of previously generated outputs.

ATTRIBUTE DESCRIPTION
config

Configuration object containing processing parameters.

TYPE: Config

plan

A dictionary indicating which processing steps to execute.

TYPE: Dict[str, bool]

Initializes the ProcessingPlan with the given configuration.

PARAMETER DESCRIPTION
config

Configuration object for the processing plan.

TYPE: Config

Source code in cityseg/processing_plan.py
def __init__(self, config: Config):
    """
    Initializes the ProcessingPlan with the given configuration.

    Args:
        config (Config): Configuration object for the processing plan.
    """
    self.config = config
    self.plan = self._create_processing_plan()

VideoFileIterator

VideoFileIterator(input_path)

An iterator class for iterating over video files in a specified directory.

This class retrieves and stores video files from the given input path and provides an iterator interface to access these files.

ATTRIBUTE DESCRIPTION
input_path

The path to the directory containing video files.

TYPE: Path

video_files

A list of video file paths found in the input directory.

TYPE: List[Path]

Initializes the VideoFileIterator with the specified input path.

PARAMETER DESCRIPTION
input_path

The path to the directory containing video files.

TYPE: Path

Source code in cityseg/video_file_iterator.py
def __init__(self, input_path: Path):
    """
    Initializes the VideoFileIterator with the specified input path.

    Args:
        input_path (Path): The path to the directory containing video files.
    """
    self.input_path = input_path
    self.video_files = self._get_video_files()

__iter__

__iter__()

Returns an iterator over the video files.

This method allows the VideoFileIterator to be used in a for-loop or any context that requires an iterable.

RETURNS DESCRIPTION
Iterator[Path]

Iterator[Path]: An iterator over the video file paths.

Source code in cityseg/video_file_iterator.py
def __iter__(self) -> Iterator[Path]:
    """
    Returns an iterator over the video files.

    This method allows the VideoFileIterator to be used in a for-loop or any context
    that requires an iterable.

    Returns:
        Iterator[Path]: An iterator over the video file paths.
    """
    return iter(self.video_files)