Source code for water_packer.packer

"""
Optimized Water Packer using vectorized operations and spatial hash grid.

This version uses:
- Monte Carlo volume estimation (avoids overlap errors)
- cKDTree for vectorized substrate culling (50k candidates/batch)
- Spatial hash grid for O(1) collision checking
- 10-100x faster for large systems
"""

from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

from .constants import WATER_DENSITY_FACTOR
from .converters import (
    detect_input_type,
    from_arrays,
    from_ase,
    from_hyobj,
    to_arrays,
    to_ase,
    to_hyobj,
)
from .distance_manager import DistanceManager
from .water_geometry import get_water_template, rotate_water
from .volume_estimation import estimate_available_volume_mc, generate_candidate_batch
from .spatial_hash import SpatialHashGrid


class WaterPackerOptimized:
    """
    High-performance water packer using vectorized operations.
    
    Uses batch generation + spatial hash grid for O(1) collision checking.
    10-100× faster than basic implementation for large systems.
    
    Parameters
    ----------
    min_distance : float
        Default minimum distance between any atoms (Å).
    pairwise_distances : dict, optional
        Species-specific minimum distances.
    water_density : float
        Target water density in g/cm³.
    seed : int, optional
        Random seed for reproducibility.
    batch_size : int
        Number of candidates to generate per batch (default 50000).
    grid_spacing : float
        Spatial hash grid cell size in Å (default 3.0).
    n_mc_probes : int
        Number of Monte Carlo probes for volume estimation (default 100000).
    """
    
    def __init__(
        self,
        min_distance: float = 2.0,
        pairwise_distances: Optional[Dict[Tuple[str, str], float]] = None,
        water_density: float = 1.0,
        seed: Optional[int] = None,
        batch_size: int = 50000,
        grid_spacing: float = 3.0,
        n_mc_probes: int = 100000,
    ):
        self.distance_manager = DistanceManager(
            default_distance=min_distance,
            pairwise_distances=pairwise_distances,
        )
        self.water_density = water_density
        self.batch_size = batch_size
        self.grid_spacing = grid_spacing
        self.n_mc_probes = n_mc_probes
        
        # Create random generator (avoids polluting global state)
        self.rng = np.random.default_rng(seed)
        
        # Water template
        self.water_symbols, self.water_template = get_water_template()
    
    def pack(
        self,
        system: Any,
        n_waters: Optional[int] = None,
        region: Optional[Dict[str, float]] = None,
    ) -> Any:
        """
        Pack water molecules into the system.
        
        Parameters
        ----------
        system : ASE Atoms, hyobj PeriodicSystem, or dict
            Input system.
        n_waters : int, optional
            Number of waters to pack. If None, calculated from density.
        region : dict, optional
            Restrict packing region.
        
        Returns
        -------
        Same type as input, with water molecules added.
        """
        # Detect and convert input
        input_type = detect_input_type(system)
        
        if input_type == 'ase':
            data = from_ase(system)
        elif input_type == 'hyobj':
            data = from_hyobj(system)
        elif input_type == 'dict':
            data = system.copy()
        else:
            raise TypeError(
                f"Unsupported system type: {type(system)}. "
                "Use ASE Atoms, hyobj PeriodicSystem, or dict."
            )
        
        # Pack water
        result_data = self._pack_water_optimized(data, n_waters, region)
        
        # Convert back to original type
        if input_type == 'ase':
            return to_ase(result_data)
        elif input_type == 'hyobj':
            return to_hyobj(result_data, units='metal')
        else:
            return result_data
    
    def _pack_water_optimized(
        self,
        data: Dict,
        n_waters: Optional[int],
        region: Optional[Dict[str, float]],
    ) -> Dict:
        """Optimized packing implementation using batch + grid."""
        cell = np.array(data['cell'])
        positions = np.array(data['positions'])
        species = list(data['species'])
        
        # Monte Carlo volume estimation
        available_volume = estimate_available_volume_mc(
            positions, species, cell,
            self.distance_manager,
            n_probes=self.n_mc_probes,
            rng=self.rng,
        )
        
        if available_volume <= 0:
            print(f"Warning: No available volume for water packing.")
            return data
        
        # Physics safeguard: Check for impossible density requests
        if n_waters is not None:
            # Calculate implied density
            # implied_conc = n_waters / available_volume (molecules/ų)
            implied_conc = n_waters / available_volume
            
            # Theoretical max density for Random Close Packing (RCP) of spheres
            # RCP fraction ≈ 0.64
            # Sphere radius r = min_dist(O,O) / 2
            min_OO_dist = self.distance_manager.get_min_distance('O', 'O')
            r = min_OO_dist / 2.0
            sphere_vol = (4/3) * np.pi * (r**3)
            max_conc = 0.64 / sphere_vol
            
            if implied_conc > max_conc:
                implied_g_cm3 = implied_conc / WATER_DENSITY_FACTOR
                max_g_cm3 = max_conc / WATER_DENSITY_FACTOR
                print(f"\n⚠️  WARNING: Requested packing density is extremely high!")
                print(f"   - Implied density: {implied_g_cm3:.2f} g/cm³ ({n_waters} waters in {available_volume:.1f} ų)")
                print(f"   - Theoretical max (RCP): ~{max_g_cm3:.2f} g/cm³")
                print(f"   - This will likely fail or take a very long time.\n")
                
        # Calculate number of waters if not specified
        else:
            target_concentration = self.water_density * WATER_DENSITY_FACTOR
            n_waters = int(available_volume * target_concentration)
            n_waters = max(1, n_waters)
        
        # Initialize spatial hash grid for water-water collision checking
        grid = SpatialHashGrid(cell, grid_spacing=self.grid_spacing)
        
        # Add substrate atoms to grid
        grid.add_multiple(positions, species)
        
        # Pack waters using batch generation
        placed_waters = []
        placed_species = []
        max_batch_attempts = 100  # Prevent infinite loops
        batch_attempt = 0
        
        while len(placed_waters) < n_waters and batch_attempt < max_batch_attempts:
            # Phase A: Generate batch and cull against substrate
            candidates, rotations = generate_candidate_batch(
                cell, positions, species,
                self.distance_manager,
                batch_size=self.batch_size,
                region=region,
                rng=self.rng,
            )
            
            if len(candidates) == 0:
                batch_attempt += 1
                continue
            
            # Phase B: Check grid collisions and place waters
            for i, (center_pos, rotation) in enumerate(zip(candidates, rotations)):
                if len(placed_waters) >= n_waters:
                    break
                
                # Create water molecule at this position
                # water_template is (3, 3) where rows are atoms
                # rotation is (3, 3) matrix
                # We want to rotate each atom vector: v_new = R @ v_old
                # Equivalent to: M_new = M_old @ R.T
                water_coords = self.water_template @ rotation.T + center_pos
                
                # Check collision with all atoms (substrate + waters) using grid
                has_collision = False
                for j, (atom_pos, atom_species) in enumerate(zip(water_coords, self.water_symbols)):
                    if grid.check_collision(
                        atom_pos,
                        atom_species,
                        self.distance_manager.get_min_distance,
                    ):
                        has_collision = True
                        break
                
                if not has_collision:
                    # Place water
                    placed_waters.append(water_coords)
                    placed_species.extend(self.water_symbols)
                    
                    # Add to grid for future collision checking
                    for atom_pos, atom_species in zip(water_coords, self.water_symbols):
                        grid.add(atom_pos, atom_species)
            
            batch_attempt += 1
        
        if len(placed_waters) < n_waters:
            print(f"Warning: Could only place {len(placed_waters)}/{n_waters} water molecules.")
        
        # Combine substrate + waters
        if placed_waters:
            all_positions = np.vstack([positions, np.vstack(placed_waters)])
            all_species = species + placed_species
        else:
            all_positions = positions
            all_species = species
        
        result = {
            'cell': cell,
            'positions': all_positions,
            'species': all_species,
            'pbc': data.get('pbc', True),
        }
        
        return result


# Alias for backward compatibility
WaterPacker = WaterPackerOptimized


[docs] def pack_water( system: Any, n_waters: Optional[int] = None, min_distance: float = 2.0, pairwise_distances: Optional[Dict[Tuple[str, str], float]] = None, water_density: Optional[float] = None, seed: Optional[int] = None, ) -> Any: """ Convenience function to pack water into a system. Uses optimized implementation with batch generation and spatial hash grid. """ if n_waters is not None and water_density is not None: raise ValueError("Cannot specify both 'n_waters' and 'water_density'.") if n_waters is None and water_density is None: water_density = 1.0 packer = WaterPacker( min_distance=min_distance, pairwise_distances=pairwise_distances, water_density=water_density or 1.0, seed=seed, ) return packer.pack(system, n_waters=n_waters)