A Step-by-Step Coding Tutorial on NVIDIA PhysicsNeMo: Darcy Flow, FNOs, PINNs, Surrogate Models, and Inference Benchmarking
In this tutorial, we implement NVIDIA PhysicsNeMo on Colab and build a practical workflow for physics-informed machine learning. We start by setting up the environment, generating data for the 2D Darcy Flow problem, and visualizing the physical fields to clearly understand the learning task. From there, we implement and train powerful models such as the Fourier Neural Operator and a convolutional surrogate baseline, while also exploring the ideas behind Physics-Informed Neural Networks. Also, we compare architectures, evaluate predictions, benchmark inference, and save trained models, providing a comprehensive hands-on view of how PhysicsNeMo can be used for scientific machine learning problems.
print("="*80)
print("SECTION 1: INSTALLATION AND SETUP")
print("="*80)
import subprocess
import sys
def install_packages():
"""Install required packages for the tutorial."""
packages = [
"nvidia-physicsnemo",
"matplotlib",
"numpy",
"h5py",
"scipy",
"tqdm",
]
for package in packages:
print(f"Installing {package}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
print("\n✓ All packages installed successfully!")
install_packages()
print("\n" + "="*80)
print("SECTION 2: IMPORTS AND CONFIGURATION")
print("="*80)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Optional
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
print(f"Using device: {device}")
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
try:
import physicsnemo
print(f"PhysicsNeMo version: {physicsnemo.__version__}")
PHYSICSNEMO_AVAILABLE = True
except ImportError:
print("PhysicsNeMo not installed. Using custom implementations.")
PHYSICSNEMO_AVAILABLE = False
def set_seed(seed: int = 42):
"""Set random seeds for reproducibility."""
torch.manual_seed(seed)
np.random.seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(42)
print("\n" + "="*80)
print("SECTION 3: DATA GENERATION - 2D DARCY FLOW")
print("="*80)
"""
The 2D Darcy Flow equation is a classic benchmark for neural operators:
-∇·(k(x,y)∇u(x,y)) = f(x,y) in Ω = [0,1]²
where:
- k(x,y) is the permeability field (input)
- u(x,y) is the pressure field (output)
- f(x,y) is the source term
This is a fundamental problem in subsurface flow modeling, heat conduction,
and other diffusion-dominated physics.
"""
class DarcyFlowDataGenerator:
"""
Generate synthetic 2D Darcy Flow data for training neural operators.
Uses Gaussian Random Fields for permeability and finite differences
to solve the Darcy equation.
"""
def __init__(
self,
resolution: int = 64,
length_scale: float = 0.1,
variance: float = 1.0
):
self.resolution = resolution
self.length_scale = length_scale
self.variance = variance
self.dx = 1.0 / (resolution - 1)
x = np.linspace(0, 1, resolution)
y = np.linspace(0, 1, resolution)
self.X, self.Y = np.meshgrid(x, y)
self._setup_grf()
def _setup_grf(self):
"""Setup Gaussian Random Field covariance."""
n = self.resolution
x_flat = self.X.flatten()
y_flat = self.Y.flatten()
dist_sq = (
(x_flat[:, None] - x_flat[None, :]) ** 2 +
(y_flat[:, None] - y_flat[None, :]) ** 2
)
self.cov_matrix = self.variance * np.exp(
-dist_sq / (2 * self.length_scale ** 2)
)
self.cov_matrix += 1e-6 * np.eye(n * n)
self.L = np.linalg.cholesky(self.cov_matrix)
def generate_permeability(self, n_samples: int = 1) -> np.ndarray:
"""Generate random permeability fields using GRF."""
n = self.resolution
z = np.random.randn(n * n, n_samples)
samples = self.L @ z
k = np.exp(samples.T.reshape(n_samples, n, n))
return k
def solve_darcy(
self,
permeability: np.ndarray,
source: Optional[np.ndarray] = None,
n_iterations: int = 5000,
tol: float = 1e-6
) -> np.ndarray:
"""
Solve Darcy equation using iterative Jacobi method.
-∇·(k∇u) = f with u=0 on boundary (Dirichlet BC)
"""
n = self.resolution
dx = self.dx
if source is None:
source = np.ones((n, n))
u = np.zeros((n, n))
u_new = np.zeros_like(u)
k = permeability
for iteration in range(n_iterations):
for i in range(1, n - 1):
for j in range(1, n - 1):
k_sum = k_e + k_w + k_n + k_s
u_new[i, j] = (
k_e * u[i, j + 1] + k_w * u[i, j - 1] +
k_n * u[i - 1, j] + k_s * u[i + 1, j] +
dx ** 2 * source[i, j]
) / (k_sum + 1e-10)
if np.max(np.abs(u_new - u)) < tol:
break
u = u_new.copy()
return u
def generate_dataset(
self,
n_samples: int = 100,
show_progress: bool = True
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate a dataset of (permeability, pressure) pairs."""
permeabilities = self.generate_permeability(n_samples)
pressures = np.zeros_like(permeabilities)
iterator = tqdm(range(n_samples), desc="Generating data") if show_progress else range(n_samples)
for i in iterator:
pressures[i] = self.solve_darcy(permeabilities[i])
return permeabilities, pressures
class DarcyDataset(Dataset):
"""PyTorch Dataset for Darcy Flow data."""
def __init__(
self,
permeability: np.ndarray,
pressure: np.ndarray,
normalize: bool = True
):
self.permeability = torch.FloatTensor(permeability)
self.pressure = torch.FloatTensor(pressure)
if normalize:
self.perm_mean = self.permeability.mean()
self.perm_std = self.permeability.std()
self.press_mean = self.pressure.mean()
self.press_std = self.pressure.std()
self.permeability = (self.permeability - self.perm_mean) / self.perm_std
self.pressure = (self.pressure - self.press_mean) / self.press_std
self.permeability = self.permeability.unsqueeze(1)
self.pressure = self.pressure.unsqueeze(1)
def __len__(self):
return len(self.permeability)
def __getitem__(self, idx):
return self.permeability[idx], self.pressure[idx]
print("\nGenerating Darcy Flow dataset...")
generator = DarcyFlowDataGenerator(resolution=32, length_scale=0.15)
n_train = 200
n_test = 50
print(f"Generating {n_train} training samples...")
perm_train, press_train = generator.generate_dataset(n_train)
print(f"Generating {n_test} test samples...")
perm_test, press_test = generator.generate_dataset(n_test)
train_dataset = DarcyDataset(perm_train, press_train)
test_dataset = DarcyDataset(perm_test, press_test, normalize=True)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
print(f"\n✓ Dataset created!")
print(f" Training samples: {len(train_dataset)}")
print(f" Test samples: {len(test_dataset)}")
print(f" Resolution: {generator.resolution}x{generator.resolution}")We set up the full Colab environment and installed all required to run the PhysicsNeMo tutorial smoothly. We import the core libraries, configure the device, check GPU availability, and make the workflow reproducible by setting a fixed random seeds. We also generate the Darcy Flow dataset, prepare the PyTorch datasets and dataloaders, and establish the foundation for everything we train later in the tutorial.
print("\n" + "="*80)
print("SECTION 4: DATA VISUALIZATION")
print("="*80)
def visualize_darcy_samples(
permeability: np.ndarray,
pressure: np.ndarray,
n_samples: int = 3
):
"""Visualize Darcy flow samples."""
fig, axes = plt.subplots(n_samples, 2, figsize=(10, 4 * n_samples))
for i in range(n_samples):
im1 = axes[i, 0].imshow(permeability[i], cmap='viridis', origin='lower')
axes[i, 0].set_title(f'Permeability Field (Sample {i+1})')
axes[i, 0].set_xlabel('x')
axes[i, 0].set_ylabel('y')
plt.colorbar(im1, ax=axes[i, 0], label='k(x,y)')
im2 = axes[i, 1].imshow(pressure[i], cmap='hot', origin='lower')
axes[i, 1].set_title(f'Pressure Field (Sample {i+1})')
axes[i, 1].set_xlabel('x')
axes[i, 1].set_ylabel('y')
plt.colorbar(im2, ax=axes[i, 1], label='u(x,y)')
plt.tight_layout()
plt.savefig('darcy_samples.png', dpi=150, bbox_inches='tight')
plt.show()
print("✓ Saved visualization to 'darcy_samples.png'")
visualize_darcy_samples(perm_train[:3], press_train[:3])
print("\n" + "="*80)
print("SECTION 5: FOURIER NEURAL OPERATOR (FNO)")
print("="*80)
"""
The Fourier Neural Operator (FNO) learns mappings between function spaces
by parameterizing the integral kernel in Fourier space.
Key insight: Convolution in physical space = multiplication in Fourier space
The FNO layer consists of:
1. FFT to transform to frequency domain
2. Multiplication with learnable weights (keeping only low-frequency modes)
3. Inverse FFT to transform back
4. Residual connection with a local linear transformation
"""
class SpectralConv2d(nn.Module):
"""
2D Spectral Convolution Layer for FNO.
Performs convolution in Fourier space by:
1. Computing FFT of input
2. Multiplying with complex learnable weights
3. Computing inverse FFT
"""
def __init__(
self,
in_channels: int,
out_channels: int,
modes1: int,
modes2: int
):
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.modes1 = modes1
self.modes2 = modes2
self.scale = 1 / (in_channels * out_channels)
self.weights1 = nn.Parameter(
self.scale * torch.rand(in_channels, out_channels, modes1, modes2, dtype=torch.cfloat)
)
self.weights2 = nn.Parameter(
self.scale * torch.rand(in_channels, out_channels, modes1, modes2, dtype=torch.cfloat)
)
def compl_mul2d(self, input: torch.Tensor, weights: torch.Tensor) -> torch.Tensor:
"""Complex multiplication for batch of 2D tensors."""
return torch.einsum("bixy,ioxy->boxy", input, weights)
def forward(self, x: torch.Tensor) -> torch.Tensor:
batch_size = x.shape[0]
x_ft = torch.fft.rfft2(x)
out_ft = torch.zeros(
batch_size, self.out_channels, x.size(-2), x.size(-1) // 2 + 1,
dtype=torch.cfloat, device=x.device
)
out_ft[:, :, :self.modes1, :self.modes2] = \
self.compl_mul2d(x_ft[:, :, :self.modes1, :self.modes2], self.weights1)
out_ft[:, :, -self.modes1:, :self.modes2] = \
self.compl_mul2d(x_ft[:, :, -self.modes1:, :self.modes2], self.weights2)
x = torch.fft.irfft2(out_ft, s=(x.size(-2), x.size(-1)))
return x
class FNOBlock(nn.Module):
"""
FNO Block combining spectral convolution with local linear transform.
output = σ(SpectralConv(x) + LocalLinear(x))
"""
def __init__(
self,
channels: int,
modes1: int,
modes2: int,
activation: str = 'gelu'
):
super().__init__()
self.spectral_conv = SpectralConv2d(channels, channels, modes1, modes2)
self.local_linear = nn.Conv2d(channels, channels, 1)
self.activation = nn.GELU() if activation == 'gelu' else nn.ReLU()
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.activation(self.spectral_conv(x) + self.local_linear(x))
class FourierNeuralOperator2D(nn.Module):
"""
Complete 2D Fourier Neural Operator for learning operators.
Architecture:
1. Lift input to higher dimensional channel space
2. Apply multiple FNO blocks (spectral convolutions + residuals)
3. Project back to output space
This learns the mapping: k(x,y) -> u(x,y) for Darcy flow
"""
def __init__(
self,
in_channels: int = 1,
out_channels: int = 1,
modes1: int = 12,
modes2: int = 12,
width: int = 32,
n_layers: int = 4,
padding: int = 9
):
super().__init__()
self.modes1 = modes1
self.modes2 = modes2
self.width = width
self.padding = padding
self.fc0 = nn.Linear(in_channels + 2, width)
self.fno_blocks = nn.ModuleList([
FNOBlock(width, modes1, modes2) for _ in range(n_layers)
])
self.fc1 = nn.Linear(width, 128)
self.fc2 = nn.Linear(128, out_channels)
def get_grid(self, shape: Tuple, device: torch.device) -> torch.Tensor:
"""Create normalized grid coordinates."""
batch_size, size_x, size_y = shape[0], shape[2], shape[3]
gridx = torch.linspace(0, 1, size_x, device=device)
gridy = torch.linspace(0, 1, size_y, device=device)
gridx, gridy = torch.meshgrid(gridx, gridy, indexing='ij')
grid = torch.stack([gridx, gridy], dim=-1)
grid = grid.unsqueeze(0).repeat(batch_size, 1, 1, 1)
return grid
def forward(self, x: torch.Tensor) -> torch.Tensor:
batch_size = x.shape[0]
grid = self.get_grid(x.shape, x.device)
x = x.permute(0, 2, 3, 1)
x = torch.cat([x, grid], dim=-1)
x = self.fc0(x)
x = x.permute(0, 3, 1, 2)
if self.padding > 0:
x = F.pad(x, [0, self.padding, 0, self.padding])
for block in self.fno_blocks:
x = block(x)
if self.padding > 0:
x = x[..., :-self.padding, :-self.padding]
x = x.permute(0, 2, 3, 1)
x = F.gelu(self.fc1(x))
x = self.fc2(x)
x = x.permute(0, 3, 1, 2)
return x
print("\nCreating Fourier Neural Operator model...")
fno_model = FourierNeuralOperator2D(
in_channels=1,
out_channels=1,
modes1=8,
modes2=8,
width=32,
n_layers=4,
padding=5
).to(device)
n_params = sum(p.numel() for p in fno_model.parameters() if p.requires_grad)
print(f"✓ FNO Model created with {n_params:,} trainable parameters")We first visualize the generated Darcy Flow samples to clearly see the relationship between the permeability field and the resulting pressure field. We then implement the core building blocks of the Fourier Neural Operator, including the spectral convolution layer, the FNO block, and the complete 2D FNO architecture.
print("\n" + "="*80)
print("SECTION 6: PHYSICS-INFORMED NEURAL NETWORK (PINN)")
print("="*80)
"""
Physics-Informed Neural Networks (PINNs) incorporate physical laws directly
into the loss function. For the Darcy equation:
-∇·(k∇u) = f
The PINN loss consists of:
1. Data loss: MSE between predictions and observed data
2. Physics loss: Residual of the PDE at collocation points
3. Boundary loss: Satisfaction of boundary conditions
"""
class PINN_MLP(nn.Module):
"""
Multi-Layer Perceptron for PINN.
Takes (x, y, k(x,y)) as input and outputs u(x,y).
Uses sine activation (Fourier features) for better convergence.
"""
def __init__(
self,
input_dim: int = 3,
output_dim: int = 1,
hidden_dims: List[int] = [64, 64, 64, 64],
use_fourier_features: bool = True,
n_frequencies: int = 32
):
super().__init__()
self.use_fourier_features = use_fourier_features
self.n_frequencies = n_frequencies
if use_fourier_features:
self.B = nn.Parameter(
torch.randn(2, n_frequencies) * 2 * np.pi,
requires_grad=False
)
actual_input_dim = 2 * n_frequencies + 1
else:
actual_input_dim = input_dim
layers = []
dims = [actual_input_dim] + hidden_dims
for i in range(len(dims) - 1):
layers.append(nn.Linear(dims[i], dims[i + 1]))
layers.append(nn.Tanh())
layers.append(nn.Linear(dims[-1], output_dim))
self.network = nn.Sequential(*layers)
self._init_weights()
def _init_weights(self):
"""Xavier initialization for better convergence."""
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_normal_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
def fourier_embedding(self, xy: torch.Tensor) -> torch.Tensor:
"""Apply Fourier feature embedding to positional inputs."""
proj = xy @ self.B
return torch.cat([torch.sin(proj), torch.cos(proj)], dim=-1)
def forward(
self,
x: torch.Tensor,
y: torch.Tensor,
k: torch.Tensor
) -> torch.Tensor:
"""
Forward pass.
Args:
x: x-coordinates (batch,)
y: y-coordinates (batch,)
k: permeability values at (x,y) (batch,)
Returns:
u: predicted pressure values (batch,)
"""
xy = torch.stack([x, y], dim=-1)
if self.use_fourier_features:
pos_features = self.fourier_embedding(xy)
inputs = torch.cat([pos_features, k.unsqueeze(-1)], dim=-1)
else:
inputs = torch.cat([xy, k.unsqueeze(-1)], dim=-1)
return self.network(inputs).squeeze(-1)
class DarcyPINNLoss:
"""
Physics-informed loss for Darcy equation.
Computes:
1. Data loss at observation points
2. PDE residual loss: -∇·(k∇u) - f = 0
3. Boundary loss: u = 0 on boundaries
"""
def __init__(
self,
lambda_data: float = 1.0,
lambda_pde: float = 1.0,
lambda_bc: float = 10.0
):
self.lambda_data = lambda_data
self.lambda_pde = lambda_pde
self.lambda_bc = lambda_bc
def compute_gradients(
self,
model: nn.Module,
x: torch.Tensor,
y: torch.Tensor,
k: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""Compute first and second derivatives using autograd."""
x = x.requires_grad_(True)
y = y.requires_grad_(True)
u = model(x, y, k)
grad_outputs = torch.ones_like(u)
u_x = torch.autograd.grad(
u, x, grad_outputs=grad_outputs, create_graph=True
)[0]
u_y = torch.autograd.grad(
u, y, grad_outputs=grad_outputs, create_graph=True
)[0]
u_xx = torch.autograd.grad(
u_x, x, grad_outputs=grad_outputs, create_graph=True
)[0]
u_yy = torch.autograd.grad(
u_y, y, grad_outputs=grad_outputs, create_graph=True
)[0]
return u, (u_x, u_y), (u_xx, u_yy)
def pde_residual(
self,
model: nn.Module,
x: torch.Tensor,
y: torch.Tensor,
k: torch.Tensor,
f: torch.Tensor = None
) -> torch.Tensor:
"""
Compute PDE residual: -∇·(k∇u) - f
Simplified form for constant k locally:
-k(u_xx + u_yy) - f ≈ 0
"""
u, (u_x, u_y), (u_xx, u_yy) = self.compute_gradients(model, x, y, k)
if f is None:
f = torch.ones_like(u)
residual = -k * (u_xx + u_yy) - f
return residual
def __call__(
self,
model: nn.Module,
x_data: torch.Tensor,
y_data: torch.Tensor,
k_data: torch.Tensor,
u_data: torch.Tensor,
x_pde: torch.Tensor,
y_pde: torch.Tensor,
k_pde: torch.Tensor,
x_bc: torch.Tensor,
y_bc: torch.Tensor,
k_bc: torch.Tensor
) -> Dict[str, torch.Tensor]:
"""Compute total PINN loss."""
u_pred = model(x_data, y_data, k_data)
loss_data = F.mse_loss(u_pred, u_data)
residual = self.pde_residual(model, x_pde, y_pde, k_pde)
loss_pde = torch.mean(residual ** 2)
u_bc_pred = model(x_bc, y_bc, k_bc)
loss_bc = torch.mean(u_bc_pred ** 2)
total_loss = (
self.lambda_data * loss_data +
self.lambda_pde * loss_pde +
self.lambda_bc * loss_bc
)
return {
'total': total_loss,
'data': loss_data,
'pde': loss_pde,
'bc': loss_bc
}
print("\nCreating Physics-Informed Neural Network...")
pinn_model = PINN_MLP(
input_dim=3,
output_dim=1,
hidden_dims=[128, 128, 128, 128],
use_fourier_features=True,
n_frequencies=64
).to(device)
n_params_pinn = sum(p.numel() for p in pinn_model.parameters() if p.requires_grad)
print(f"✓ PINN Model created with {n_params_pinn:,} trainable parameters")
print("\n" + "="*80)
print("SECTION 7: TRAINING UTILITIES")
print("="*80)
class Trainer:
"""Generic trainer for neural network models."""
def __init__(
self,
model: nn.Module,
optimizer: torch.optim.Optimizer,
scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None,
device: torch.device = device
):
self.model = model
self.optimizer = optimizer
self.scheduler = scheduler
self.device = device
self.history = {'train_loss': [], 'val_loss': []}
def train_epoch(
self,
train_loader: DataLoader,
loss_fn: nn.Module = nn.MSELoss()
) -> float:
"""Train for one epoch."""
self.model.train()
total_loss = 0.0
for batch_x, batch_y in train_loader:
batch_x = batch_x.to(self.device)
batch_y = batch_y.to(self.device)
self.optimizer.zero_grad()
pred = self.model(batch_x)
loss = loss_fn(pred, batch_y)
loss.backward()
self.optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)
@torch.no_grad()
def validate(
self,
val_loader: DataLoader,
loss_fn: nn.Module = nn.MSELoss()
) -> float:
"""Validate model."""
self.model.eval()
total_loss = 0.0
for batch_x, batch_y in val_loader:
batch_x = batch_x.to(self.device)
batch_y = batch_y.to(self.device)
pred = self.model(batch_x)
loss = loss_fn(pred, batch_y)
total_loss += loss.item()
return total_loss / len(val_loader)
def train(
self,
train_loader: DataLoader,
val_loader: DataLoader,
n_epochs: int = 100,
loss_fn: nn.Module = nn.MSELoss(),
verbose: bool = True
):
"""Full training loop."""
best_val_loss = float('inf')
pbar = tqdm(range(n_epochs), desc="Training") if verbose else range(n_epochs)
for epoch in pbar:
train_loss = self.train_epoch(train_loader, loss_fn)
val_loss = self.validate(val_loader, loss_fn)
self.history['train_loss'].append(train_loss)
self.history['val_loss'].append(val_loss)
if self.scheduler:
self.scheduler.step()
if val_loss < best_val_loss:
best_val_loss = val_loss
self.best_state = {k: v.cpu().clone() for k, v in self.model.state_dict().items()}
if verbose:
pbar.set_postfix({
'train': f'{train_loss:.6f}',
'val': f'{val_loss:.6f}',
'best': f'{best_val_loss:.6f}'
})
return self.history
def plot_training_history(history: Dict[str, List[float]], title: str = "Training History"):
"""Plot training curves."""
fig, ax = plt.subplots(figsize=(10, 4))
ax.semilogy(history['train_loss'], label='Train Loss', linewidth=2)
ax.semilogy(history['val_loss'], label='Validation Loss', linewidth=2)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss (log scale)')
ax.set_title(title)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('training_history.png', dpi=150, bbox_inches='tight')
plt.show()
print("✓ Saved training history to 'training_history.png'")We build the Physics-Informed Neural Network components and define how to incorporate physical laws directly into the learning process. We create the PINN-based MLP, implement the Darcy PDE loss with data, residual, and boundary terms, and initialize the PINN model for experimentation. We also define generic training utilities, including the trainer class and a loss-plotting function, so we have a reusable training framework for the models in the tutorial.
print("\n" + "="*80)
print("SECTION 8: TRAINING FNO MODEL")
print("="*80)
optimizer_fno = torch.optim.AdamW(fno_model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler_fno = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer_fno, T_max=100)
fno_trainer = Trainer(fno_model, optimizer_fno, scheduler_fno, device)
print("\nTraining FNO model...")
fno_history = fno_trainer.train(
train_loader,
test_loader,
n_epochs=100,
loss_fn=nn.MSELoss(),
verbose=True
)
plot_training_history(fno_history, "FNO Training History")
print("\n" + "="*80)
print("SECTION 9: EVALUATION AND VISUALIZATION")
print("="*80)
@torch.no_grad()
def evaluate_model(
model: nn.Module,
test_loader: DataLoader,
device: torch.device = device
) -> Dict[str, float]:
"""Evaluate model on test set."""
model.eval()
all_preds = []
all_targets = []
for batch_x, batch_y in test_loader:
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)
pred = model(batch_x)
all_preds.append(pred.cpu())
all_targets.append(batch_y.cpu())
preds = torch.cat(all_preds, dim=0)
targets = torch.cat(all_targets, dim=0)
mse = F.mse_loss(preds, targets).item()
rmse = np.sqrt(mse)
mae = F.l1_loss(preds, targets).item()
rel_l2 = torch.norm(preds - targets) / torch.norm(targets)
rel_l2 = rel_l2.item()
return {
'MSE': mse,
'RMSE': rmse,
'MAE': mae,
'Relative L2': rel_l2,
'predictions': preds,
'targets': targets
}
fno_model.load_state_dict({k: v.to(device) for k, v in fno_trainer.best_state.items()})
fno_metrics = evaluate_model(fno_model, test_loader)
print("\nFNO Model Evaluation Metrics:")
print(f" MSE: {fno_metrics['MSE']:.6f}")
print(f" RMSE: {fno_metrics['RMSE']:.6f}")
print(f" MAE: {fno_metrics['MAE']:.6f}")
print(f" Relative L2: {fno_metrics['Relative L2']:.4f} ({fno_metrics['Relative L2']*100:.2f}%)")
def visualize_predictions(
model: nn.Module,
test_dataset: Dataset,
n_samples: int = 3,
device: torch.device = device
):
"""Visualize model predictions vs ground truth."""
model.eval()
fig, axes = plt.subplots(n_samples, 4, figsize=(16, 4 * n_samples))
for i in range(n_samples):
x, y_true = test_dataset[i]
x = x.unsqueeze(0).to(device)
with torch.no_grad():
y_pred = model(x)
x = x.squeeze().cpu().numpy()
y_true = y_true.squeeze().cpu().numpy()
y_pred = y_pred.squeeze().cpu().numpy()
error = np.abs(y_true - y_pred)
im0 = axes[i, 0].imshow(x, cmap='viridis', origin='lower')
axes[i, 0].set_title(f'Input Permeability (Sample {i+1})')
plt.colorbar(im0, ax=axes[i, 0])
im1 = axes[i, 1].imshow(y_true, cmap='hot', origin='lower')
axes[i, 1].set_title('True Pressure')
plt.colorbar(im1, ax=axes[i, 1])
im2 = axes[i, 2].imshow(y_pred, cmap='hot', origin='lower')
axes[i, 2].set_title('Predicted Pressure')
plt.colorbar(im2, ax=axes[i, 2])
im3 = axes[i, 3].imshow(error, cmap='Reds', origin='lower')
axes[i, 3].set_title(f'Absolute Error (Max: {error.max():.4f})')
plt.colorbar(im3, ax=axes[i, 3])
for ax_row in axes:
for ax in ax_row:
ax.set_xticks([])
ax.set_yticks([])
plt.tight_layout()
plt.savefig('fno_predictions.png', dpi=150, bbox_inches='tight')
plt.show()
print("✓ Saved predictions to 'fno_predictions.png'")
visualize_predictions(fno_model, test_dataset, n_samples=3)
print("\n" + "="*80)
print("SECTION 10: PHYSICSNEMO BUILT-IN MODELS")
print("="*80)
if PHYSICSNEMO_AVAILABLE:
try:
from physicsnemo.models.fno import FNO
from physicsnemo.models.mlp.fully_connected import FullyConnected
print("\n✓ PhysicsNeMo models available!")
pn_fno = FNO(
in_channels=1,
out_channels=1,
decoder_layers=1,
decoder_layer_size=32,
dimension=2,
latent_channels=32,
num_fno_layers=4,
num_fno_modes=12,
padding=5
).to(device)
print(f" PhysicsNeMo FNO created with {sum(p.numel() for p in pn_fno.parameters()):,} parameters")
pn_mlp = FullyConnected(
in_features=32,
out_features=64,
num_layers=4,
layer_size=128
).to(device)
print(f" PhysicsNeMo MLP created with {sum(p.numel() for p in pn_mlp.parameters()):,} parameters")
except ImportError as e:
print(f"Some PhysicsNeMo models not available: {e}")
else:
print("\nPhysicsNeMo not installed. Install with: pip install nvidia-physicsnemo")
print("Using custom implementations demonstrated in this tutorial.")We train the Fourier Neural Operator using the prepared dataloaders, optimizer, and scheduler, and we track its learning progress over epochs. We then evaluate the trained FNO model on the test set, compute metrics, and visualize predictions against the ground truth to understand how well it solves the Darcy problem. We also check whether PhysicsNeMo’s built-in models are available and, if so, instantiate them to connect our custom implementation to the official framework components.
print("\n" + "="*80)
print("SECTION 11: CONVOLUTIONAL BASELINE MODEL")
print("="*80)
class ConvolutionalSurrogate(nn.Module):
"""
Simple convolutional neural network baseline.
A standard U-Net style encoder-decoder for comparison with FNO.
"""
def __init__(
self,
in_channels: int = 1,
out_channels: int = 1,
base_channels: int = 32
):
super().__init__()
self.enc1 = self._conv_block(in_channels, base_channels)
self.enc2 = self._conv_block(base_channels, base_channels * 2)
self.enc3 = self._conv_block(base_channels * 2, base_channels * 4)
self.bottleneck = self._conv_block(base_channels * 4, base_channels * 8)
self.up3 = nn.ConvTranspose2d(base_channels * 8, base_channels * 4, 2, stride=2)
self.dec3 = self._conv_block(base_channels * 8, base_channels * 4)
self.up2 = nn.ConvTranspose2d(base_channels * 4, base_channels * 2, 2, stride=2)
self.dec2 = self._conv_block(base_channels * 4, base_channels * 2)
self.up1 = nn.ConvTranspose2d(base_channels * 2, base_channels, 2, stride=2)
self.dec1 = self._conv_block(base_channels * 2, base_channels)
self.output = nn.Conv2d(base_channels, out_channels, 1)
self.pool = nn.MaxPool2d(2)
def _conv_block(self, in_ch: int, out_ch: int) -> nn.Sequential:
return nn.Sequential(
nn.Conv2d(in_ch, out_ch, 3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True),
nn.Conv2d(out_ch, out_ch, 3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True)
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
e1 = self.enc1(x)
e2 = self.enc2(self.pool(e1))
e3 = self.enc3(self.pool(e2))
b = self.bottleneck(self.pool(e3))
d3 = self.dec3(torch.cat([self.up3(b), e3], dim=1))
d2 = self.dec2(torch.cat([self.up2(d3), e2], dim=1))
d1 = self.dec1(torch.cat([self.up1(d2), e1], dim=1))
return self.output(d1)
print("\nCreating Convolutional baseline model...")
conv_model = ConvolutionalSurrogate(in_channels=1, out_channels=1, base_channels=16).to(device)
n_params_conv = sum(p.numel() for p in conv_model.parameters() if p.requires_grad)
print(f"✓ Conv Model created with {n_params_conv:,} trainable parameters")
optimizer_conv = torch.optim.AdamW(conv_model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler_conv = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer_conv, T_max=100)
conv_trainer = Trainer(conv_model, optimizer_conv, scheduler_conv, device)
print("\nTraining Convolutional model...")
conv_history = conv_trainer.train(
train_loader,
test_loader,
n_epochs=100,
loss_fn=nn.MSELoss(),
verbose=True
)
conv_model.load_state_dict({k: v.to(device) for k, v in conv_trainer.best_state.items()})
conv_metrics = evaluate_model(conv_model, test_loader)
print("\nConv Model Evaluation Metrics:")
print(f" MSE: {conv_metrics['MSE']:.6f}")
print(f" RMSE: {conv_metrics['RMSE']:.6f}")
print(f" MAE: {conv_metrics['MAE']:.6f}")
print(f" Relative L2: {conv_metrics['Relative L2']:.4f} ({conv_metrics['Relative L2']*100:.2f}%)")
print("\n" + "="*80)
print("SECTION 12: MODEL COMPARISON")
print("="*80)
def compare_models(
models: Dict[str, Tuple[nn.Module, Dict]],
test_loader: DataLoader,
device: torch.device = device
):
"""Compare multiple models."""
results = {}
print("\n" + "-"*60)
print(f"{'Model':<20} {'MSE':<12} {'RMSE':<12} {'Rel. L2':<12} {'Params':<12}")
print("-"*60)
for name, (model, trainer_state) in models.items():
model.load_state_dict({k: v.to(device) for k, v in trainer_state.items()})
metrics = evaluate_model(model, test_loader)
n_params = sum(p.numel() for p in model.parameters())
results[name] = metrics
print(f"{name:<20} {metrics['MSE']:<12.6f} {metrics['RMSE']:<12.6f} "
f"{metrics['Relative L2']:<12.4f} {n_params:<12,}")
print("-"*60)
return results
models_to_compare = {
'FNO': (fno_model, fno_trainer.best_state),
'Conv U-Net': (conv_model, conv_trainer.best_state)
}
comparison_results = compare_models(models_to_compare, test_loader)
def plot_comparison(results: Dict):
"""Plot model comparison."""
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
models = list(results.keys())
metrics = ['MSE', 'RMSE', 'Relative L2']
colors = plt.cm.Set2(np.linspace(0, 1, len(models)))
for i, metric in enumerate(metrics):
values = [results[m][metric] for m in models]
axes[i].bar(models, values, color=colors)
axes[i].set_title(metric)
axes[i].set_ylabel('Value')
axes[i].grid(axis='y', alpha=0.3)
for j, v in enumerate(values):
axes[i].text(j, v + 0.01 * max(values), f'{v:.4f}',
ha='center', va='bottom', fontsize=10)
plt.tight_layout()
plt.savefig('model_comparison.png', dpi=150, bbox_inches='tight')
plt.show()
print("✓ Saved comparison to 'model_comparison.png'")
plot_comparison(comparison_results)We implement a convolutional surrogate baseline model to compare a standard deep learning architecture with the Fourier Neural Operator. We train this convolutional model, evaluate its performance, and print the main error metrics to see how well it approximates the pressure field. We then compare the FNO and convolutional models side by side and visualize their metrics, which helps us understand the strengths of operator learning versus conventional convolutional learning.
print("\n" + "="*80)
print("SECTION 13: INFERENCE AND DEPLOYMENT")
print("="*80)
@torch.no_grad()
def inference_single(
model: nn.Module,
permeability: np.ndarray,
device: torch.device = device
) -> np.ndarray:
"""
Run inference on a single permeability field.
Args:
model: Trained model
permeability: 2D numpy array of permeability values
device: Device to run inference on
Returns:
Predicted pressure field as numpy array
"""
model.eval()
perm_mean = train_dataset.perm_mean
perm_std = train_dataset.perm_std
x = (torch.FloatTensor(permeability) - perm_mean) / perm_std
x = x.unsqueeze(0).unsqueeze(0).to(device)
pred = model(x)
press_mean = train_dataset.press_mean
press_std = train_dataset.press_std
pred = pred.squeeze().cpu().numpy()
pred = pred * press_std.numpy() + press_mean.numpy()
return pred
def benchmark_inference(
model: nn.Module,
n_samples: int = 100,
resolution: int = 32,
device: torch.device = device
) -> Dict[str, float]:
"""Benchmark inference speed."""
import time
model.eval()
x = torch.randn(n_samples, 1, resolution, resolution).to(device)
with torch.no_grad():
for _ in range(10):
_ = model(x[:1])
if torch.cuda.is_available():
torch.cuda.synchronize()
start = time.time()
with torch.no_grad():
for i in range(n_samples):
_ = model(x[i:i+1])
if torch.cuda.is_available():
torch.cuda.synchronize()
total_time = time.time() - start
return {
'total_time': total_time,
'time_per_sample': total_time / n_samples,
'samples_per_second': n_samples / total_time
}
print("\nBenchmarking inference speed...")
print("-" * 50)
for name, (model, _) in models_to_compare.items():
benchmark = benchmark_inference(model, n_samples=100)
print(f"{name}:")
print(f" Time per sample: {benchmark['time_per_sample']*1000:.3f} ms")
print(f" Throughput: {benchmark['samples_per_second']:.1f} samples/sec")
print("\n\nDemonstrating single sample inference...")
test_perm = perm_test[0]
predicted_pressure = inference_single(fno_model, test_perm)
print(f"Input permeability shape: {test_perm.shape}")
print(f"Output pressure shape: {predicted_pressure.shape}")
print(f"Pressure range: [{predicted_pressure.min():.4f}, {predicted_pressure.max():.4f}]")
print("\n" + "="*80)
print("SECTION 14: SAVING AND LOADING MODELS")
print("="*80)
def save_model(
model: nn.Module,
path: str,
optimizer: torch.optim.Optimizer = None,
metadata: Dict = None
):
"""Save model checkpoint."""
checkpoint = {
'model_state_dict': model.state_dict(),
'model_class': model.__class__.__name__,
}
if optimizer:
checkpoint['optimizer_state_dict'] = optimizer.state_dict()
if metadata:
checkpoint['metadata'] = metadata
torch.save(checkpoint, path)
print(f"✓ Model saved to {path}")
def load_model(
model: nn.Module,
path: str,
optimizer: torch.optim.Optimizer = None,
device: torch.device = device
) -> Dict:
"""Load model checkpoint."""
checkpoint = torch.load(path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
if optimizer and 'optimizer_state_dict' in checkpoint:
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
print(f"✓ Model loaded from {path}")
return checkpoint.get('metadata', {})
save_model(
fno_model,
'fno_darcy_best.pt',
optimizer_fno,
metadata={
'epochs_trained': 100,
'test_rmse': fno_metrics['RMSE'],
'resolution': 32
}
)
print("\n" + "="*80)
print("SECTION 15: SUMMARY AND NEXT STEPS")
print("="*80)
print("""
╔══════════════════════════════════════════════════════════════════════════════╗
║ TUTORIAL SUMMARY ║
╠══════════════════════════════════════════════════════════════════════════════╣
║ ║
║ This tutorial covered: ║
║ ║
║ 1. ✓ Installing and setting up PhysicsNeMo on Google Colab ║
║ 2. ✓ Understanding the 2D Darcy Flow problem (PDE for subsurface flow) ║
║ 3. ✓ Generating synthetic training data ║
║ 4. ✓ Implementing Fourier Neural Operators (FNO) from scratch ║
║ 5. ✓ Understanding Physics-Informed Neural Networks (PINNs) ║
║ 6. ✓ Training neural operators for operator learning ║
║ 7. ✓ Comparing different model architectures ║
║ 8. ✓ Benchmarking inference speed ║
║ 9. ✓ Saving and loading trained models ║
║ ║
╠══════════════════════════════════════════════════════════════════════════════╣
║ NEXT STEPS ║
╠══════════════════════════════════════════════════════════════════════════════╣
║ ║
║ → Try PhysicsNeMo's built-in models for better performance ║
║ → Explore other neural operators: DeepONet, GraphNet ║
║ → Apply to real-world datasets from CFD simulations ║
║ → Implement physics-informed training for better generalization ║
║ → Use distributed training for larger models ║
║ ║
║ Resources: ║
║ • PhysicsNeMo Docs: https://docs.nvidia.com/physicsnemo ║
║ • GitHub: https://github.com/NVIDIA/physicsnemo ║
║ • Examples: https://github.com/NVIDIA/physicsnemo/tree/main/examples ║
║ ║
╚══════════════════════════════════════════════════════════════════════════════╝
""")
print("\n✓ Tutorial completed successfully!")
print(f" Total GPU memory used: {torch.cuda.max_memory_allocated() / 1e9:.2f} GB" if torch.cuda.is_available() else " (No GPU used)")We focus on inference, deployment, and model persistence so the tutorial moves beyond training into practical usage. We benchmark inference speed, run single-sample prediction, and inspect the shape and range of the generated output to verify that the trained model behaves as expected. Also, we save the best model checkpoint and close the tutorial with a summary of what we accomplished and where we can go next with PhysicsNeMo and physics-informed machine learning.
In conclusion, we built a full PhysicsNeMo-inspired pipeline to solve a representative scientific machine learning problem in an interactive Colab environment. We generated synthetic PDE data, trained neural operator models, examined prediction quality, compared different approaches, and prepared the models for inference and reuse. Also, we strengthened our understanding of operator learning, surrogate modeling, and physics-informed deep learning, and we saw how these ideas connect to real-world simulation tasks. This tutorial provides a strong foundation for further exploration with PhysicsNeMo, whether we want to experiment with larger datasets, more advanced neural operators, or more realistic physics-driven applications.
Check out the Full Implementation Codes here. Also, feel free to follow us on Twitter and don’t forget to join our 130k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.? Connect with us
The post A Step-by-Step Coding Tutorial on NVIDIA PhysicsNeMo: Darcy Flow, FNOs, PINNs, Surrogate Models, and Inference Benchmarking appeared first on MarkTechPost.
from MarkTechPost https://ift.tt/J9Wxkyj
via IFTTT


Comments
Post a Comment