"""Connected component clustering algorithm for QR code modules.
This module implements Phase 2 of the multi-phase rendering pipeline,
which identifies and groups adjacent modules into connected components
for optimized rendering.
The clustering algorithm:
1. **Traverses** the QR matrix to find active modules
2. **Groups** adjacent modules of the same type into clusters
3. **Analyzes** cluster properties (size, density, shape)
4. **Filters** clusters based on configurable thresholds
5. **Optimizes** rendering by treating clusters as single shapes
This approach significantly reduces the number of SVG elements needed
for complex QR codes, improving both file size and rendering performance.
"""
from typing import Any, Dict, List, Literal, Optional, Set, Tuple
from ..core.detector import ModuleDetector
from ..core.interfaces import AlgorithmProcessor, Matrix
from .models import ClusteringConfig
[docs]
class ConnectedComponentAnalyzer(AlgorithmProcessor):
"""Analyzes connected components in QR code matrices.
This analyzer implements a depth-first search algorithm to identify
clusters of connected modules. It's used in Phase 2 processing to
group adjacent modules for optimized rendering.
Attributes:
min_cluster_size: Minimum modules required to form a valid cluster
density_threshold: Minimum density (filled ratio) for valid clusters
visited: Set tracking already-processed module positions
Example:
>>> analyzer = ConnectedComponentAnalyzer(min_cluster_size=5)
>>> clusters = analyzer.process(matrix, detector)
>>> print(f"Found {len(clusters)} clusters")
"""
[docs]
def __init__(
self,
min_cluster_size: int = 3,
density_threshold: float = 0.5,
connectivity_mode: Literal["4-way", "8-way"] = "4-way",
):
"""Initialize the connected component analyzer.
Args:
min_cluster_size: Minimum number of modules to form a cluster
density_threshold: Minimum density ratio (0.0-1.0) for valid clusters
connectivity_mode: '4-way' or '8-way' connectivity
Example:
>>> analyzer = ConnectedComponentAnalyzer(min_cluster_size=5)
>>> # Or using Pydantic model
>>> config = ClusteringConfig(min_cluster_size=5, density_threshold=0.7)
>>> analyzer = ConnectedComponentAnalyzer(**config.model_dump())
"""
# Validate inputs using Pydantic
config = ClusteringConfig(
min_cluster_size=min_cluster_size,
density_threshold=density_threshold,
connectivity_mode=connectivity_mode,
)
# Use validated values
self.min_cluster_size = config.min_cluster_size
self.density_threshold = config.density_threshold
self.connectivity_mode = config.connectivity_mode
self.visited: Set[Tuple[int, int]] = set()
self.config = config
[docs]
def process(self, matrix: Matrix, detector: ModuleDetector, **kwargs: Any) -> List[Dict[str, Any]]:
"""Process the QR matrix to find connected components.
Traverses the matrix to identify groups of connected modules,
analyzes their properties, and returns clusters that meet the
size and density requirements.
Args:
matrix: 2D boolean matrix representing QR code modules.
detector: Module detector for determining module types.
**kwargs: Additional parameters.
Keyword Args:
cluster_module_types (List[str]): List of module types to cluster.
Defaults to ``['data']``.
Returns:
List[Dict[str, Any]]: List of cluster dictionaries, each containing:
- positions: List of (row, col) tuples
- bounds: (min_row, min_col, max_row, max_col)
- module_count: Number of modules in cluster
- density: Ratio of filled modules in bounding box
- aspect_ratio: Width/height ratio
- is_rectangular: Whether cluster forms a rectangle
"""
cluster_module_types = kwargs.get("cluster_module_types", ["data"])
# Reset visited set to prevent memory accumulation
self.visited.clear()
clusters = []
for row in range(len(matrix)):
for col in range(len(matrix[0])):
if (row, col) not in self.visited and matrix[row][col]:
module_type = detector.get_module_type(row, col)
if module_type in cluster_module_types:
cluster = self._find_connected_component(
matrix, detector, row, col, cluster_module_types
)
if len(cluster["positions"]) >= self.min_cluster_size:
cluster_info = self._analyze_cluster(cluster, matrix, detector)
if cluster_info["density"] >= self.density_threshold:
clusters.append(cluster_info)
# Explicit cleanup of visited set after processing
self.visited.clear()
return clusters
[docs]
def cluster_modules(self, modules: Any, **kwargs: Any) -> List[Dict[str, Any]]:
"""Alias for process() method for backward compatibility.
Args:
modules: Either a 2D matrix or list of (row, col, active) tuples
**kwargs: Additional parameters passed to process()
Returns:
List of cluster dictionaries
"""
# Convert list of tuples to matrix format if needed
if isinstance(modules, list) and modules and len(modules[0]) == 3:
# Convert (row, col, active) tuples to matrix
max_row = max(pos[0] for pos in modules)
max_col = max(pos[1] for pos in modules)
matrix = [[False for _ in range(max_col + 1)] for _ in range(max_row + 1)]
for row, col, active in modules:
if active:
matrix[row][col] = True
else:
matrix = modules
# Use a mock detector for compatibility
from unittest.mock import Mock
from ..core.detector import ModuleDetector
mock_detector = Mock(spec=ModuleDetector)
mock_detector.get_module_type.return_value = "data"
mock_detector.get_neighbors.return_value = []
try:
result = self.process(matrix, mock_detector, **kwargs)
finally:
# Explicit cleanup to prevent memory leaks
del mock_detector
if isinstance(modules, list) and modules and len(modules[0]) == 3:
del matrix # Clean up converted matrix
return result
def _find_connected_component(
self,
matrix: Matrix,
detector: ModuleDetector,
start_row: int,
start_col: int,
target_types: List[str],
) -> Dict[str, Any]:
"""Find all connected modules starting from a given position.
Uses depth-first search to explore all modules connected to the
starting position. Only considers modules of the specified types
and uses 4-connectivity (von Neumann neighborhood).
Args:
matrix: QR code matrix
detector: Module detector instance
start_row: Starting row position
start_col: Starting column position
target_types: List of module types to include in cluster
Returns:
Dict[str, Any]: Dictionary containing:
- positions: List of (row, col) tuples in the component
- start_position: Original starting position
"""
stack = [(start_row, start_col)]
positions = []
while stack:
row, col = stack.pop()
if (row, col) in self.visited:
continue
if not (0 <= row < len(matrix) and 0 <= col < len(matrix[0])):
continue
if not matrix[row][col]:
continue
module_type = detector.get_module_type(row, col)
if module_type not in target_types:
continue
self.visited.add((row, col))
positions.append((row, col))
# Add neighbors to stack based on connectivity mode
if self.connectivity_mode == "8-way":
# 8-way connectivity (Moore neighborhood)
neighbors = detector.get_neighbors(row, col, "moore")
else:
# 4-way connectivity (von Neumann neighborhood)
neighbors = detector.get_neighbors(row, col, "von_neumann")
for nr, nc in neighbors:
if (nr, nc) not in self.visited:
stack.append((nr, nc))
return {
"positions": positions,
"start_position": (start_row, start_col),
}
def _analyze_cluster(
self, cluster: Dict[str, Any], matrix: Matrix, detector: ModuleDetector
) -> Dict[str, Any]:
"""Analyze properties of a cluster to determine rendering suitability.
Computes various metrics about the cluster including its bounding box,
density, aspect ratio, and whether it forms a perfect rectangle.
Args:
cluster: Cluster data with positions
matrix: QR code matrix
detector: Module detector instance
Returns:
Dict[str, Any]: Enhanced cluster data with computed properties:
- All original cluster data
- bounds: Bounding box coordinates
- module_count: Total modules in cluster
- density: Fill ratio within bounding box
- aspect_ratio: Width to height ratio
- is_rectangular: True if cluster fills entire bounding box
"""
positions = cluster["positions"]
if not positions:
return cluster
# Calculate bounding box
rows = [pos[0] for pos in positions]
cols = [pos[1] for pos in positions]
min_row, max_row = min(rows), max(rows)
min_col, max_col = min(cols), max(cols)
# Calculate cluster properties
width = max_col - min_col + 1
height = max_row - min_row + 1
area = width * height
density = len(positions) / area if area > 0 else 0
# Calculate aspect ratio
aspect_ratio = width / height if height > 0 else 1.0
# Determine cluster shape type
shape_type = self._determine_cluster_shape(positions, width, height, aspect_ratio)
# Calculate center of mass
center_row = sum(rows) / len(rows)
center_col = sum(cols) / len(cols)
# Analyze connectivity patterns
connectivity = self._analyze_connectivity(positions, matrix, detector)
# Generate rendering hints
rendering_hints = self._generate_rendering_hints(
positions, width, height, aspect_ratio, shape_type, connectivity
)
cluster.update(
{
"bounding_box": (min_row, min_col, max_row, max_col),
"width": width,
"height": height,
"area": area,
"density": density,
"aspect_ratio": aspect_ratio,
"shape_type": shape_type,
"center": (center_row, center_col),
"connectivity": connectivity,
"rendering_hints": rendering_hints,
"size": len(positions),
}
)
return cluster
def _determine_cluster_shape(
self,
positions: List[Tuple[int, int]],
width: int,
height: int,
aspect_ratio: float,
) -> str:
"""Determine the best shape representation for a cluster.
Classifies clusters based on their dimensions and aspect ratio
to select optimal rendering strategies.
Args:
positions: List of module positions in cluster
width: Cluster width in modules
height: Cluster height in modules
aspect_ratio: Width to height ratio
Returns:
str: Shape type classification
"""
# Simple heuristics for shape classification
if width == 1 and height > 2:
return "vertical_line"
elif height == 1 and width > 2:
return "horizontal_line"
elif abs(aspect_ratio - 1.0) < 0.3: # Nearly square
return "square_cluster"
elif aspect_ratio > 2.0:
return "horizontal_rectangle"
elif aspect_ratio < 0.5:
return "vertical_rectangle"
else:
return "rectangle_cluster"
def _analyze_connectivity(
self,
positions: List[Tuple[int, int]],
matrix: Matrix,
detector: ModuleDetector,
) -> Dict[str, Any]:
"""Analyze connectivity patterns within the cluster.
Examines how modules within the cluster are connected to each
other to determine rendering suitability.
Args:
positions: List of module positions in cluster
matrix: QR code matrix
detector: Module detector instance
Returns:
Dict[str, Any]: Connectivity metrics including:
- internal_connections: Number of cardinal connections
- corner_connections: Number of diagonal connections
- connectivity_ratio: Ratio of actual to possible connections
- avg_connections_per_module: Average connectivity
"""
position_set = set(positions)
# Count different types of connections
internal_connections = 0
corner_connections = 0
for row, col in positions:
# Get 8-connected neighbors
neighbors = detector.get_neighbors(row, col, "moore")
cardinal_neighbors = []
diagonal_neighbors = []
for nr, nc in neighbors:
if (nr, nc) in position_set:
dr, dc = nr - row, nc - col
if abs(dr) + abs(dc) == 1: # Cardinal neighbor
cardinal_neighbors.append((nr, nc))
internal_connections += 1
else: # Diagonal neighbor
diagonal_neighbors.append((nr, nc))
corner_connections += 1
# Calculate connectivity metrics
total_possible_connections = len(positions) * 4 # Max cardinal connections
connectivity_ratio = (
internal_connections / total_possible_connections if total_possible_connections > 0 else 0
)
return {
"internal_connections": internal_connections // 2, # Avoid double counting
"corner_connections": corner_connections // 2,
"connectivity_ratio": connectivity_ratio,
"avg_connections_per_module": (internal_connections / len(positions) if positions else 0),
}
def _generate_rendering_hints(
self,
positions: List[Tuple[int, int]],
width: int,
height: int,
aspect_ratio: float,
shape_type: str,
connectivity: Dict[str, Any],
) -> Dict[str, Any]:
"""Generate hints for optimal cluster rendering.
Creates recommendations for how to render the cluster based on
its properties, including shape selection and styling parameters.
Args:
positions: Cluster module positions
width: Cluster width
height: Cluster height
aspect_ratio: Width/height ratio
shape_type: Determined shape classification
connectivity: Connectivity analysis results
Returns:
Dict[str, Any]: Rendering hints including:
- render_as_single_shape: Whether to merge modules
- preferred_shape: Recommended shape type
- roundness: Corner rounding factor
- merge_strategy: How to combine modules
"""
hints: Dict[str, Any] = {
"render_as_single_shape": False,
"preferred_shape": "rounded_rectangle",
"roundness": 0.2,
"merge_strategy": "outline",
"stroke_width": 1,
}
# Determine if cluster should be rendered as single shape
if len(positions) >= 4 and connectivity["connectivity_ratio"] > 0.6:
hints["render_as_single_shape"] = True
# Adjust rendering based on shape type
if shape_type in ["vertical_line", "horizontal_line"]:
hints["preferred_shape"] = "pill"
hints["roundness"] = 0.5
elif shape_type == "square_cluster":
hints["preferred_shape"] = "rounded_square"
hints["roundness"] = 0.3
elif "rectangle" in shape_type:
hints["preferred_shape"] = "rounded_rectangle"
hints["roundness"] = min(0.4, 1.0 / max(width, height))
# Adjust roundness based on size
size_factor = min(1.0, len(positions) / 10.0)
hints["roundness"] *= size_factor
return hints
[docs]
def get_cluster_svg_path(
self,
cluster: Dict[str, Any],
scale: int = 8,
border: int = 0,
path_clipper: Optional[Any] = None,
) -> str:
"""Generate SVG path for rendering cluster as a single shape.
Creates an SVG path string that represents the entire cluster
as a merged shape, reducing the number of individual elements.
Args:
cluster: Cluster data with positions and rendering hints
scale: Module size in pixels
border: Border size in modules
path_clipper: Optional PathClipper instance for frame-aware clipping
Returns:
str: SVG path data string for the cluster shape
Note:
Currently uses a simple bounding box approach. Future versions
could implement more sophisticated contour tracing.
"""
positions = cluster["positions"]
if not positions:
return ""
# Simple outline generation using convex hull approach
# For now, use bounding box approach
min_row, min_col, max_row, max_col = cluster["bounding_box"]
x = (min_col + border) * scale
y = (min_row + border) * scale
width = (max_col - min_col + 1) * scale
height = (max_row - min_row + 1) * scale
# If path clipper is provided, check if cluster is within frame
if path_clipper:
clipped_path = path_clipper.clip_rectangle_to_frame(x, y, width, height)
if not clipped_path:
return "" # Cluster is entirely outside frame
# For non-square frames, we might need to adjust the path
# For now, continue with standard path generation
roundness = cluster["rendering_hints"]["roundness"]
rx = width * roundness
ry = height * roundness
# Generate rounded rectangle path
if roundness > 0:
path = (
f"M {x + rx} {y} "
f"L {x + width - rx} {y} "
f"Q {x + width} {y} {x + width} {y + ry} "
f"L {x + width} {y + height - ry} "
f"Q {x + width} {y + height} {x + width - rx} {y + height} "
f"L {x + rx} {y + height} "
f"Q {x} {y + height} {x} {y + height - ry} "
f"L {x} {y + ry} "
f"Q {x} {y} {x + rx} {y} "
f"Z"
)
else:
# Simple rectangle
path = f"M {x} {y} L {x + width} {y} " f"L {x + width} {y + height} L {x} {y + height} Z"
# Apply frame clipping if needed
if path_clipper and path_clipper.frame_shape != "square":
path = path_clipper.adjust_cluster_path(path, scale)
return path