Advanced · Tutorial 18

Advanced Architectures: Multi-vector and Temporal Search

NeuronDB Team
2/24/2025
30 min read

Advanced Architectures Overview

Advanced architectures handle requirements that basic retrieval pipelines cannot: they use multi-vector embeddings, support temporal search, employ ensemble methods, and optimize indexing strategies.

Together, these techniques improve performance, manage complexity, and enable sophisticated applications at scale.

Figure: Advanced Architectures

The diagram shows the main components of an advanced architecture: multi-vector embeddings handle document complexity, temporal search handles time-sensitive data, and ensembles improve overall performance.

Multi-vector Embeddings

Multi-vector embeddings represent each document with several vectors instead of one. Each vector captures a different aspect of the document, which improves retrieval coverage for long or complex documents.

Common strategies include sentence-level, chunk-level, and aspect-based embeddings. Each captures different information; combined, they improve retrieval.

# Multi-vector Embeddings
def create_multi_vectors(document):
    # Multiple embedding strategies
    sentence_embs = embed_sentences(document)
    chunk_embs = embed_chunks(document)
    aspect_embs = embed_aspects(document)
    return {
        'sentences': sentence_embs,
        'chunks': chunk_embs,
        'aspects': aspect_embs
    }

def multi_vector_search(query, multi_vectors):
    query_emb = embed_query(query)
    # Search across all vector types
    all_results = []
    for doc_id, vectors in multi_vectors.items():
        for vec_type, embs in vectors.items():
            scores = compute_similarity(query_emb, embs)
            all_results.append((doc_id, vec_type, max(scores)))
    # Aggregate and rank
    return aggregate_results(all_results)

Multi-vector embeddings improve coverage by capturing document complexity, enabling better retrieval for heterogeneous content.

Temporal Search Patterns

Temporal search handles time-sensitive information. It considers document timestamps, prioritizes recent content, and enables time-based queries.

Common patterns include recency boosting, time-weighted scoring, and temporal filtering; each handles time differently, and they can be combined to improve temporal relevance. The snippet below shows time-weighted scoring; a filtering example follows it.

# Temporal Search
def temporal_search(query, documents, timestamps, recency_weight=0.3):
    # Relevance scores
    relevance = compute_relevance(query, documents)
    # Recency scores: newer documents score near 1, older decay toward 0
    max_time = max(timestamps)
    recency = [1.0 / (1 + (max_time - t).days) for t in timestamps]
    recency = normalize(recency)
    # Combined scores: weighted blend of relevance and recency
    scores = (1 - recency_weight) * relevance + recency_weight * recency
    return rank_by_scores(scores)
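
Time-weighted scoring is shown above; temporal filtering is even simpler. The sketch below is a minimal illustration, not part of a specific NeuronDB API: the `documents` and `timestamps` structures and the `temporal_filter` helper are assumptions for the example. It restricts candidates to a time window before relevance scoring.

from datetime import datetime, timedelta

def temporal_filter(documents, timestamps, window_days=30):
    """Keep only documents newer than the cutoff (hypothetical helper)."""
    cutoff = datetime.now() - timedelta(days=window_days)
    return [(doc, ts) for doc, ts in zip(documents, timestamps) if ts >= cutoff]

# Example: filter to the last 90 days before running relevance scoring
# recent = temporal_filter(documents, timestamps, window_days=90)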

Temporal search improves relevance for time-sensitive queries by prioritizing recent information.

Ensemble Methods

Ensemble methods combine multiple models to improve performance, reduce variance, and increase robustness.

Common methods are voting, averaging, and stacking: voting combines discrete predictions, averaging combines probabilities, and stacking trains a meta-learner on the base models' outputs.

# Ensemble Methods
def ensemble_predict(models, input_data):
    predictions = [model.predict(input_data) for model in models]
    # Voting: majority class across models
    voted = majority_vote(predictions)
    # Averaging: mean of the per-model outputs
    averaged = np.mean(predictions, axis=0)
    # Stacking: a trained meta-learner combines the base predictions
    stacked = meta_learner.predict(predictions)
    return {'voted': voted, 'averaged': averaged, 'stacked': stacked}

Ensembles combine model strengths and compensate for individual weaknesses.

Detailed Ensemble Techniques

Voting ensembles combine predictions from multiple models. Hard voting takes the majority class; soft voting averages predicted probabilities. Voting works best when the models are diverse, since diverse errors tend to cancel out.

Averaging ensembles average predictions directly: numeric predictions for regression, probability distributions for classification. Averaging reduces variance and improves stability.

Stacking trains a meta-learner on the base models' predictions, learning the optimal combination. It often performs best but requires more data.

# Detailed Ensemble Implementation
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.utils import resample
import numpy as np

class EnsembleMethods:
    def hard_voting(self, models, X):
        """Hard voting: majority class across fitted models"""
        predictions = np.array([model.predict(X) for model in models])
        final_pred = []
        for i in range(X.shape[0]):
            votes = predictions[:, i]
            final_pred.append(np.bincount(votes.astype(int)).argmax())
        return np.array(final_pred)

    def soft_voting(self, models, X):
        """Soft voting: average predicted probabilities"""
        probabilities = [m.predict_proba(X) for m in models
                         if hasattr(m, 'predict_proba')]
        avg_proba = np.mean(probabilities, axis=0)
        return np.argmax(avg_proba, axis=1)

    def stacking_ensemble(self, base_models, meta_model, X_train, y_train, X_test):
        """Stacking: a meta-learner trained on base-model predictions.
        The meta-learner is fit on training-set predictions here;
        cross-validated predictions would be safer in practice."""
        train_preds, test_preds = [], []
        for model in base_models:
            model.fit(X_train, y_train)
            if hasattr(model, 'predict_proba'):
                train_preds.append(model.predict_proba(X_train))
                test_preds.append(model.predict_proba(X_test))
            else:
                train_preds.append(model.predict(X_train).reshape(-1, 1))
                test_preds.append(model.predict(X_test).reshape(-1, 1))
        stacked_train = np.column_stack(train_preds)
        stacked_test = np.column_stack(test_preds)
        meta_model.fit(stacked_train, y_train)
        return meta_model.predict(stacked_test)

    def bagging_ensemble(self, base_model, X_train, y_train, n_estimators=10):
        """Bootstrap aggregating: train clones on bootstrap samples"""
        models = []
        for i in range(n_estimators):
            X_boot, y_boot = resample(X_train, y_train, random_state=i)
            model = type(base_model)(**base_model.get_params())
            model.fit(X_boot, y_boot)
            models.append(model)
        return models

    def predict_bagging(self, models, X):
        """Average the predictions of a bagging ensemble"""
        predictions = [model.predict(X) for model in models]
        return np.mean(predictions, axis=0)

# Example
ensemble = EnsembleMethods()
base_models = [
    LogisticRegression(),
    DecisionTreeClassifier(),
    SVC(probability=True)
]
meta_model = LogisticRegression()
# hard_pred = ensemble.hard_voting(base_models, X_test)   # requires fitted models
# soft_pred = ensemble.soft_voting(base_models, X_test)   # requires fitted models
# stacked_pred = ensemble.stacking_ensemble(base_models, meta_model, X_train, y_train, X_test)

Ensemble Selection and Optimization

Select diverse base models. Different algorithms learn different patterns and different architectures capture different features; diversity is what makes an ensemble useful, since near-identical models add little.

Optimize ensemble size. More models improve performance but increase computation, with diminishing returns after a point. Typical ensembles use 5-20 models; test different sizes to find the optimum.

Weight ensemble members. Some models perform better than others, so assign them higher weights, learned from validation data. A weighted combination usually outperforms a uniform one.

# Ensemble Optimization
from scipy.optimize import minimize
import numpy as np

class EnsembleOptimizer:
    def __init__(self):
        self.model_weights = None

    def _collect_predictions(self, models, X):
        # Assumes all models expose the same output shape
        # (all predict_proba, or all predict)
        predictions = []
        for model in models:
            if hasattr(model, 'predict_proba'):
                predictions.append(model.predict_proba(X))
            else:
                predictions.append(model.predict(X))
        return np.array(predictions)

    def optimize_weights(self, models, X_val, y_val):
        """Optimize ensemble weights on validation data"""
        predictions = self._collect_predictions(models, X_val)

        # Objective: classification error of the weighted combination
        def objective(weights):
            weighted_pred = np.tensordot(weights, predictions, axes=1)
            if weighted_pred.ndim > 1:
                weighted_pred = np.argmax(weighted_pred, axis=1)
            return np.mean(weighted_pred != y_val)

        # Constraints: weights are non-negative and sum to 1
        constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
        bounds = [(0, 1) for _ in range(len(models))]
        initial_weights = np.ones(len(models)) / len(models)
        result = minimize(objective, initial_weights, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        self.model_weights = result.x
        return self.model_weights

    def weighted_ensemble_predict(self, models, X):
        """Predict using the optimized weights"""
        predictions = self._collect_predictions(models, X)
        weighted_pred = np.tensordot(self.model_weights, predictions, axes=1)
        if weighted_pred.ndim > 1:
            return np.argmax(weighted_pred, axis=1)
        return weighted_pred

# Example
optimizer = EnsembleOptimizer()
# weights = optimizer.optimize_weights(base_models, X_val, y_val)
# weighted_pred = optimizer.weighted_ensemble_predict(base_models, X_test)

Advanced Indexing Strategies

Advanced indexing optimizes search performance with specialized data structures that handle high-dimensional vectors and scale to large datasets.

Common strategies are HNSW, IVF, and product quantization. Each optimizes a different point in the speed/memory/recall tradeoff, and they can be combined.

# Advanced Indexing
import faiss
# `dimension` is the vector dimensionality; `vectors` is a float32
# numpy array of shape (n, dimension)

# HNSW index: graph-based, fast and accurate, higher memory
index_hnsw = faiss.IndexHNSWFlat(dimension, 16)  # 16 = connections per node (M)
index_hnsw.add(vectors)

# IVF index: clusters vectors, searches only nearby clusters
quantizer = faiss.IndexFlatL2(dimension)
index_ivf = faiss.IndexIVFFlat(quantizer, dimension, 100)  # 100 = nlist
index_ivf.train(vectors)
index_ivf.add(vectors)

# Product quantization: compresses vectors, trades recall for memory
index_pq = faiss.IndexPQ(dimension, 8, 8)  # 8 subquantizers, 8 bits each
index_pq.train(vectors)
index_pq.add(vectors)
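
The indexes above are built but never queried; searching is the same call on each. A minimal usage sketch (the `query_vectors` array is an assumption: float32, shape (num_queries, dimension)):

# Search any of the indexes: returns distances and ids of the k nearest vectors
k = 10
distances, ids = index_hnsw.search(query_vectors, k)

# IVF indexes expose nprobe: probing more clusters raises recall, slows search
index_ivf.nprobe = 10
distances, ids = index_ivf.search(query_vectors, k)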

Advanced indexing is what makes vector search practical at scale: it trades a little recall for large gains in speed and memory.

Complex Architecture Designs

Complex architectures combine multiple techniques, balancing tradeoffs to meet specific requirements.

Common designs include multi-stage retrieval, cascading models, and adaptive systems; each handles complexity differently, and they compose well.

# Complex Architecture
class AdvancedSearchSystem:
    def __init__(self):
        # Placeholder components: any retriever/reranker/generator
        # with these interfaces will work
        self.retrievers = [semantic_retriever, keyword_retriever, hybrid_retriever]
        self.reranker = cross_encoder_reranker
        self.generator = llm_generator

    def search(self, query):
        # Multi-stage retrieval: gather candidates from every retriever
        candidates = []
        for retriever in self.retrievers:
            results = retriever.retrieve(query, top_k=20)
            candidates.extend(results)
        # Deduplicate, rerank, then generate an answer over the top results
        candidates = deduplicate(candidates)
        reranked = self.reranker.rerank(query, candidates, top_k=10)
        context = format_context(reranked)
        answer = self.generator.generate(query, context)
        return answer

Complex architectures enable advanced applications by combining these techniques effectively and tuning them to the requirements at hand.

Detailed Architecture Design Patterns

Multi-stage retrieval uses multiple passes: a fast approximate first stage retrieves a large candidate set, and an accurate second stage reranks it to select the final results. This balances speed and accuracy.

Cascading models run in sequence: early, cheap models filter candidates quickly, and later, expensive models provide accurate predictions only where needed. Each stage sits at a different speed/accuracy tradeoff, which optimizes overall cost.

Adaptive systems adjust behavior dynamically: they monitor performance metrics and switch strategies based on current conditions, optimizing for the live workload.

# Detailed Architecture Patterns
import numpy as np

class MultiStageRetrieval:
    def __init__(self):
        self.fast_retriever = FastApproximateRetriever()  # fast, approximate (placeholder)
        self.accurate_reranker = AccurateReranker()       # slow, accurate (placeholder)

    def retrieve(self, query, top_k=10):
        # Stage 1: fast approximate retrieval over a wide candidate set
        candidates = self.fast_retriever.retrieve(query, top_k=100)
        # Stage 2: accurate reranking down to the final results
        return self.accurate_reranker.rerank(query, candidates, top_k=top_k)

class CascadingModels:
    def __init__(self):
        self.fast_model = FastModel()          # quick filtering (placeholder)
        self.accurate_model = AccurateModel()  # precise prediction (placeholder)

    def predict(self, input_data):
        # Stage 1: fast prediction; escalate only when confidence is low
        fast_prediction = self.fast_model.predict(input_data)
        if fast_prediction.confidence < 0.8:
            return self.accurate_model.predict(input_data)
        return fast_prediction

class AdaptiveSystem:
    def __init__(self):
        self.strategies = {
            'fast': FastStrategy(),
            'balanced': BalancedStrategy(),
            'accurate': AccurateStrategy()
        }
        self.current_strategy = 'balanced'
        self.metrics = {'latency': [], 'accuracy': []}

    def adapt(self):
        # Switch strategy based on a rolling window of recent metrics
        avg_latency = np.mean(self.metrics['latency'][-100:])
        avg_accuracy = np.mean(self.metrics['accuracy'][-100:])
        if avg_latency > 1.0:
            self.current_strategy = 'fast'
        elif avg_accuracy < 0.8:
            self.current_strategy = 'accurate'
        else:
            self.current_strategy = 'balanced'

    def process(self, input_data):
        result = self.strategies[self.current_strategy].process(input_data)
        self.metrics['latency'].append(result.latency)
        self.metrics['accuracy'].append(result.accuracy)
        self.adapt()
        return result

Attention Mechanisms

Attention mechanisms let models focus on relevant information: self-attention relates all positions within one sequence, cross-attention connects two different sequences, and multi-head attention captures several relationship patterns in parallel.

Figure: Attention Mechanisms

The diagram shows the attention types: self-attention connects all tokens in a sequence, cross-attention connects queries to keys from another sequence, and multi-head attention runs several heads in parallel, each capturing different relationships.
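
The detailed implementation below covers single-head attention; multi-head attention simply runs several heads in parallel and concatenates them. PyTorch ships this as nn.MultiheadAttention, so a minimal self-attention usage sketch looks like:

import torch
import torch.nn as nn

# 8 heads over a 512-dimensional model; batch_first puts batch on dim 0
mha = nn.MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True)
x = torch.randn(1, 10, 512)  # (batch, seq_len, d_model)
# Self-attention: query, key, and value are all the same sequence
output, attn_weights = mha(x, x, x)
print(output.shape)        # torch.Size([1, 10, 512])
print(attn_weights.shape)  # torch.Size([1, 10, 10]) -- averaged over heads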

Detailed Attention Mechanism Mathematics

Self-attention computes attention(Q, K, V) = softmax(QKᵀ / √dₖ) V, where Q, K, V are the query, key, and value matrices and each row represents a token position. QKᵀ computes the similarity between every pair of positions, division by √dₖ keeps the values from growing too large, softmax converts them to probabilities, and V supplies the content to attend to.

Scaled dot-product attention is computationally efficient and works well in practice, but it requires O(n²) computation for sequence length n, which limits the maximum sequence length.

Attention weights show what each position attends to. They are interpretable, reveal model focus, help debug models, and enable visualization.

# Detailed Attention Mathematics
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

class AttentionMechanismDetailed(nn.Module):
    def __init__(self, d_model, d_k=None, d_v=None):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_k if d_k else d_model
        self.d_v = d_v if d_v else d_model
        self.W_q = nn.Linear(d_model, self.d_k)
        self.W_k = nn.Linear(d_model, self.d_k)
        self.W_v = nn.Linear(d_model, self.d_v)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Compute scaled dot-product attention"""
        # Similarity between all query/key pairs, scaled by sqrt(d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        # Softmax over the key dimension gives the attention distribution
        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

    def forward(self, x, mask=None):
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)
        return self.scaled_dot_product_attention(Q, K, V, mask)

    def visualize_attention(self, attention_weights, tokens):
        """Visualize attention weights as a heatmap"""
        weights = attention_weights[0].detach().numpy()  # first sample in batch
        plt.figure(figsize=(10, 8))
        plt.imshow(weights, cmap='Blues')
        plt.xlabel('Key Position')
        plt.ylabel('Query Position')
        plt.title('Attention Weights Visualization')
        plt.colorbar()
        plt.show()

# Example
attention = AttentionMechanismDetailed(d_model=512)
x = torch.randn(1, 10, 512)  # batch=1, seq_len=10, d_model=512
output, weights = attention(x)
print("Output shape:", output.shape)
print("Attention weights shape:", weights.shape)
print("Attention weights sum (should be 1):", weights.sum(dim=-1)[0, 0].item())

Attention Variants and Optimizations

Sparse attention reduces computation by attending to only a subset of positions, using fixed patterns or learned sparsity. It scales to longer sequences while maintaining quality.

Linear attention uses kernel methods to reduce complexity from O(n²) to O(n), approximating softmax attention. It enables longer sequences at some cost in accuracy.

Flash attention optimizes memory usage by computing attention in blocks, reducing memory from O(n²) to O(n). It speeds up training and enables larger batch sizes (see the sketch after the code below).

# Attention Variants
class SparseAttention(nn.Module):
    """Local window attention: each position attends only to a window
    of neighbors, giving O(n*w) complexity instead of O(n^2)."""
    def __init__(self, d_model, window_size=3):
        super().__init__()
        self.d_model = d_model
        self.window_size = window_size

    def forward(self, x):
        batch_size, seq_len, d_model = x.shape
        output = torch.zeros_like(x)
        for i in range(seq_len):
            # Attend only to the local window around position i
            start = max(0, i - self.window_size)
            end = min(seq_len, i + self.window_size + 1)
            window = x[:, start:end, :]   # (batch, window, d_model)
            query = x[:, i:i+1, :]        # (batch, 1, d_model)
            scores = torch.matmul(query, window.transpose(-2, -1)) / np.sqrt(d_model)
            weights = torch.softmax(scores, dim=-1)
            output[:, i, :] = torch.matmul(weights, window).squeeze(1)
        return output

class LinearAttention(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        self.d_model = d_model
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

    def forward(self, x):
        """Linear attention using a kernel feature map"""
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)
        # ReLU as the kernel function (a common simple choice)
        Q_kernel = torch.relu(Q)
        K_kernel = torch.relu(K)
        # Associativity gives O(n): compute (K^T V) once, reuse per query
        KV = torch.matmul(K_kernel.transpose(-2, -1), V)  # (batch, d, d)
        Z = torch.matmul(Q_kernel, KV)                    # (batch, n, d)
        # Normalize by Q . sum(K): shape (batch, n, 1)
        normalizer = torch.matmul(
            Q_kernel, K_kernel.sum(dim=-2, keepdim=True).transpose(-2, -1))
        return Z / (normalizer + 1e-8)

# Compare complexities
print("Standard attention: O(n²) complexity")
print("Sparse attention: O(n×w) complexity where w is window size")
print("Linear attention: O(n) complexity")

Encoder-Decoder Architectures

Encoder-decoder architectures map input sequences to output sequences: the encoder processes the input, the decoder generates the output, and cross-attention connects them. A minimal sketch follows the figure.

Figure: Encoder-Decoder

The diagram shows the encoder-decoder structure: the encoder processes the source sequence, the decoder generates the target sequence, and cross-attention connects them, enabling sequence-to-sequence tasks.
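
As a minimal sketch, PyTorch's nn.Transformer wires up the encoder, decoder, and cross-attention for you; the shapes below are illustrative only, not tied to a specific task:

import torch
import torch.nn as nn

# Encoder-decoder transformer: 2 encoder and 2 decoder layers over d_model=512
model = nn.Transformer(d_model=512, nhead=8,
                       num_encoder_layers=2, num_decoder_layers=2,
                       batch_first=True)
src = torch.randn(1, 12, 512)  # source sequence (batch, src_len, d_model)
tgt = torch.randn(1, 7, 512)   # target prefix (batch, tgt_len, d_model)
# The encoder processes src; the decoder attends to its own prefix and,
# via cross-attention, to the encoder output
out = model(src, tgt)
print(out.shape)  # torch.Size([1, 7, 512])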

Graph Neural Networks

Graph neural networks process graph-structured data: they handle nodes and edges, capture relationships, and power applications such as social networks and knowledge graphs.

Figure: Graph Neural Networks

The diagram shows the graph structure: nodes represent entities, edges represent relationships, and the network propagates information along the edges to learn from those relationships.

Detailed Graph Neural Network Implementation

Graph neural networks aggregate information from each node's neighbors and update node representations accordingly, capturing the graph structure.

Message passing is the core mechanism: each node collects messages containing its neighbors' features, an aggregation function combines them, and an update function computes the new node representation.

# Detailed GNN Implementation
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing

class GCNLayer(MessagePassing):
    """Simplified graph convolution layer (degree normalization omitted)"""
    def __init__(self, in_channels, out_channels):
        super().__init__(aggr='add')
        self.lin = nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index):
        # Linear transformation, then message passing over the edges
        x = self.lin(x)
        return self.propagate(edge_index, x=x)

    def message(self, x_j):
        """Message from neighbor j to node i"""
        return x_j

    def update(self, aggr_out):
        """Update node representation with the aggregated messages"""
        return aggr_out

class GraphNeuralNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2):
        super().__init__()
        self.layers = nn.ModuleList()
        self.layers.append(GCNLayer(input_dim, hidden_dim))
        for _ in range(num_layers - 2):
            self.layers.append(GCNLayer(hidden_dim, hidden_dim))
        self.layers.append(GCNLayer(hidden_dim, output_dim))

    def forward(self, x, edge_index):
        for layer in self.layers:
            x = layer(x, edge_index)
            x = F.relu(x)
        return x

# Example: a small 5-node graph with bidirectional edges
num_nodes = 5
x = torch.randn(num_nodes, 10)  # node features
edge_index = torch.tensor([[0, 1, 1, 2, 2, 3, 3, 4],
                           [1, 0, 2, 1, 3, 2, 4, 3]], dtype=torch.long)
gnn = GraphNeuralNetwork(input_dim=10, hidden_dim=16, output_dim=2, num_layers=2)
output = gnn(x, edge_index)
print("GNN output shape:", output.shape)

GNN Variants and Applications

Graph Convolutional Networks come from spectral graph theory: they filter signals on the graph, work well for node classification, and scale to large graphs.

Graph Attention Networks use attention to learn how important each neighbor is, adapting to different graph structures and improving performance on many tasks.

GraphSAGE samples and aggregates neighbors, which works on large graphs and generalizes to unseen nodes, enabling inductive learning.

# GNN Variants
class GraphAttentionLayer(nn.Module):
    """Graph Attention Network layer (single head)"""
    def __init__(self, in_features, out_features, dropout=0.1, alpha=0.2):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.dropout = dropout
        self.alpha = alpha
        self.W = nn.Parameter(torch.empty(size=(in_features, out_features)))
        self.a = nn.Parameter(torch.empty(size=(2 * out_features, 1)))
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.W.data, gain=1.414)
        nn.init.xavier_uniform_(self.a.data, gain=1.414)

    def forward(self, h, adj):
        Wh = torch.mm(h, self.W)
        e = self._prepare_attentional_mechanism_input(Wh)
        e = F.leaky_relu(e, negative_slope=self.alpha)
        # Mask out non-edges so attention only covers actual neighbors
        e = e.masked_fill(adj == 0, -1e9)
        attention = F.softmax(e, dim=1)
        attention = F.dropout(attention, self.dropout, training=self.training)
        h_prime = torch.matmul(attention, Wh)
        return h_prime

    def _prepare_attentional_mechanism_input(self, Wh):
        # e_ij = a^T [Wh_i || Wh_j], computed as a broadcasted sum
        Wh1 = torch.matmul(Wh, self.a[:self.out_features, :])
        Wh2 = torch.matmul(Wh, self.a[self.out_features:, :])
        return Wh1 + Wh2.T

# GraphSAGE implementation
class GraphSAGELayer(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.linear = nn.Linear(in_features * 2, out_features)

    def forward(self, x, adj, sample_size=5):
        # Sample a fixed number of neighbors per node, then aggregate
        sampled_neighbors = self.sample_neighbors(adj, sample_size)
        neighbor_features = x[sampled_neighbors]
        aggregated = torch.mean(neighbor_features, dim=1)
        # Concatenate self and neighborhood features, then transform
        combined = torch.cat([x, aggregated], dim=1)
        return F.relu(self.linear(combined))

    def sample_neighbors(self, adj, sample_size):
        # Simplified: uniform random node ids stand in for true neighbor sampling
        return torch.randint(0, adj.size(0), (adj.size(0), sample_size))

# Example usage
# gat_layer = GraphAttentionLayer(10, 16)
# sage_layer = GraphSAGELayer(10, 16)

Diffusion Models

Diffusion models generate data through iterative denoising: a forward process gradually adds noise, and a learned reverse process removes it. They generate high-quality images and audio.

Figure: Diffusion Models

The diagram shows the diffusion process: the forward pass adds noise gradually, and the reverse pass removes it iteratively to generate new samples.

Detailed Diffusion Model Implementation

Diffusion models learn to reverse a noising process. The forward process adds Gaussian noise: q(x_t | x_{t-1}) = N(x_t; √(1-β_t) x_{t-1}, β_t I), where β_t is a noise schedule that increases over time until the data becomes pure noise.

The reverse process learns to denoise: p_θ(x_{t-1} | x_t) predicts the previous step. In practice the model learns to predict the noise, which is then subtracted to recover the data.

The training objective minimizes the noise-prediction error: L = E[||ε - ε_θ(x_t, t)||²], where ε is the actual noise added and ε_θ is the model's prediction at each timestep.

# Detailed Diffusion Model Implementation
import torch
import torch.nn as nn

class DiffusionModel(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, num_timesteps=1000):
        super().__init__()
        self.input_dim = input_dim
        self.num_timesteps = num_timesteps
        # Noise schedule (buffers move with the model between devices)
        self.register_buffer('betas', self.linear_beta_schedule(num_timesteps))
        self.register_buffer('alphas', 1.0 - self.betas)
        self.register_buffer('alphas_cumprod', torch.cumprod(self.alphas, dim=0))
        # Noise prediction network
        self.network = nn.Sequential(
            nn.Linear(input_dim + 1, hidden_dim),  # +1 for timestep
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )

    def linear_beta_schedule(self, timesteps, start=0.0001, end=0.02):
        """Linear noise schedule"""
        return torch.linspace(start, end, timesteps)

    def forward_process(self, x_0, t):
        """Forward diffusion: add noise to x_0 at timestep t"""
        # unsqueeze(-1) broadcasts the per-sample scalars over feature dims
        sqrt_ac = torch.sqrt(self.alphas_cumprod[t]).unsqueeze(-1)
        sqrt_om = torch.sqrt(1.0 - self.alphas_cumprod[t]).unsqueeze(-1)
        noise = torch.randn_like(x_0)
        x_t = sqrt_ac * x_0 + sqrt_om * noise
        return x_t, noise

    def reverse_process(self, x_t, t):
        """Reverse diffusion: one denoising step from x_t toward x_{t-1}"""
        # Predict the noise at this timestep
        t_tensor = t.float().unsqueeze(-1)
        network_input = torch.cat([x_t, t_tensor], dim=-1)
        predicted_noise = self.network(network_input)
        alpha_t = self.alphas[t].unsqueeze(-1)
        alpha_cumprod_t = self.alphas_cumprod[t].unsqueeze(-1)
        beta_t = self.betas[t].unsqueeze(-1)
        # Predict x_{t-1} from x_t and the predicted noise
        pred_x_prev = (1.0 / torch.sqrt(alpha_t)) * (
            x_t - beta_t / torch.sqrt(1.0 - alpha_cumprod_t) * predicted_noise)
        if t[0] > 0:  # add posterior noise on all but the final step
            prev_cumprod = self.alphas_cumprod[(t - 1).clamp(min=0)].unsqueeze(-1)
            posterior_variance = beta_t * (1.0 - prev_cumprod) / (1.0 - alpha_cumprod_t)
            noise = torch.randn_like(x_t)
            pred_x_prev = pred_x_prev + torch.sqrt(posterior_variance) * noise
        return pred_x_prev

    def sample(self, shape, device):
        """Generate samples by iterating the reverse process from pure noise"""
        x = torch.randn(shape, device=device)
        for t in range(self.num_timesteps - 1, -1, -1):
            t_tensor = torch.full((shape[0],), t, device=device, dtype=torch.long)
            x = self.reverse_process(x, t_tensor)
        return x

# Example
model = DiffusionModel(input_dim=784)  # e.g., flattened 28x28 images
x_0 = torch.randn(32, 784)             # batch of 32 samples
t = torch.randint(0, 1000, (32,))
x_t, noise = model.forward_process(x_0, t)
print("Noisy sample shape:", x_t.shape)
x_prev = model.reverse_process(x_t, t)
print("Denoised sample shape:", x_prev.shape)

Diffusion Model Training and Sampling

Training is straightforward: sample a random timestep, add noise to the data, predict the added noise, and minimize the prediction error.

Sampling starts from pure noise and iteratively denoises, gradually recovering data. Many steps are required for good quality.

DDPM uses the full number of steps; DDIM uses far fewer, accelerating sampling while maintaining quality.

# Diffusion Training and Sampling
def train_diffusion_model(model, dataloader, num_epochs=100, device='cuda'):
    """Train diffusion model by predicting the added noise"""
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    model = model.to(device)
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in dataloader:
            x_0 = batch.to(device)
            batch_size = x_0.shape[0]
            # Sample random timesteps and noise the batch
            t = torch.randint(0, model.num_timesteps, (batch_size,), device=device)
            x_t, noise = model.forward_process(x_0, t)
            # Predict the noise and minimize MSE against the true noise
            t_tensor = t.float().unsqueeze(-1)
            network_input = torch.cat([x_t, t_tensor], dim=-1)
            predicted_noise = model.network(network_input)
            loss = nn.functional.mse_loss(predicted_noise, noise)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")

def sample_ddpm(model, num_samples=10, device='cuda'):
    """Sample using the full DDPM reverse chain"""
    shape = (num_samples, model.input_dim)
    x = torch.randn(shape, device=device)
    for t in range(model.num_timesteps - 1, -1, -1):
        t_tensor = torch.full((num_samples,), t, device=device, dtype=torch.long)
        x = model.reverse_process(x, t_tensor)
    return x

def sample_ddim(model, num_samples=10, num_steps=50, device='cuda'):
    """Strided sampling in the spirit of DDIM (simplified: reuses the DDPM
    update on a subset of timesteps rather than the exact DDIM update)"""
    shape = (num_samples, model.input_dim)
    x = torch.randn(shape, device=device)
    step_size = model.num_timesteps // num_steps
    for i in range(num_steps - 1, -1, -1):
        t = i * step_size
        t_tensor = torch.full((num_samples,), t, device=device, dtype=torch.long)
        x = model.reverse_process(x, t_tensor)
    return x

# Example
# train_diffusion_model(model, dataloader)
# samples_ddpm = sample_ddpm(model, num_samples=10)
# samples_ddim = sample_ddim(model, num_samples=10, num_steps=50)

Reinforcement Learning

Reinforcement learning learns from interaction: an agent takes actions, the environment provides rewards, and the policy improves over time. It powers game playing and robotics. A minimal interaction loop appears after the figure.

Figure: Reinforcement Learning

The diagram shows the RL loop: the agent observes the state, takes an action, receives a reward from the environment, and updates its policy. The process repeats as learning progresses.
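
As a minimal sketch of that loop: the `env` object is assumed to follow the common Gymnasium-style reset/step interface, and `agent` is any object with select_action and update methods (both assumptions for illustration, matching the classes later in this section).

def run_episode(env, agent, max_steps=500):
    """One episode of the observe -> act -> reward -> update loop"""
    state, _ = env.reset()
    total_reward = 0.0
    for _ in range(max_steps):
        action = agent.select_action(state)                         # agent acts
        next_state, reward, done, truncated, _ = env.step(action)   # env responds
        agent.update(state, action, reward, next_state, done)       # policy improves
        state = next_state
        total_reward += reward
        if done or truncated:
            break
    return total_reward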

Detailed Reinforcement Learning Algorithms

Q-learning learns an action-value function: Q(s, a) estimates the expected return and is trained toward the Bellman target Q(s, a) = r + γ max Q(s', a'). It learns an optimal policy and works for discrete actions.

Policy gradient methods learn the policy directly, maximizing expected return by gradient ascent. They handle continuous actions but require more samples.

Actor-critic combines the two: the actor learns the policy while the critic learns a value function that guides the actor's updates, reducing variance and improving learning. A sketch follows the implementation block below.

# Detailed RL Implementation
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class QNetwork(nn.Module):
    """Q-value network: maps a state to one value per action"""
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class QLearning:
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon=1.0):
        self.q_network = QNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def select_action(self, state, training=True):
        """Epsilon-greedy action selection"""
        if training and np.random.random() < self.epsilon:
            return np.random.randint(self.q_network.fc3.out_features)
        with torch.no_grad():
            q_values = self.q_network(torch.FloatTensor(state))
        return q_values.argmax().item()

    def update(self, state, action, reward, next_state, done):
        """One TD update toward the Bellman target"""
        state_tensor = torch.FloatTensor(state)
        next_state_tensor = torch.FloatTensor(next_state)
        current_q = self.q_network(state_tensor)[action]
        with torch.no_grad():
            if done:
                target_q = torch.tensor(float(reward))
            else:
                next_q = self.q_network(next_state_tensor).max()
                target_q = reward + self.gamma * next_q
        loss = nn.functional.mse_loss(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return loss.item()

class PolicyGradient:
    """REINFORCE policy gradient"""
    def __init__(self, state_dim, action_dim, lr=0.001):
        self.policy_network = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim),
            nn.Softmax(dim=-1)
        )
        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=lr)

    def select_action(self, state):
        """Sample an action; return its log-probability for the update"""
        probs = self.policy_network(torch.FloatTensor(state))
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

    def update(self, rewards, log_probs, gamma=0.99):
        """REINFORCE: weight log-probs by normalized discounted returns"""
        returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + gamma * G
            returns.insert(0, G)
        returns = torch.FloatTensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        policy_loss = [-log_prob * G for log_prob, G in zip(log_probs, returns)]
        loss = torch.stack(policy_loss).sum()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

# Example
# q_learning = QLearning(state_dim=4, action_dim=2)
# policy_gradient = PolicyGradient(state_dim=4, action_dim=2)
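
Actor-critic is described above but not implemented in the block; a minimal one-step advantage actor-critic update, under the same assumptions (the `ActorCritic` class name and its interface are illustrative), could look like:

class ActorCritic:
    """Minimal one-step advantage actor-critic sketch"""
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 64), nn.ReLU(),
            nn.Linear(64, action_dim), nn.Softmax(dim=-1))
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64), nn.ReLU(),
            nn.Linear(64, 1))
        self.optimizer = optim.Adam(
            list(self.actor.parameters()) + list(self.critic.parameters()), lr=lr)
        self.gamma = gamma

    def update(self, state, action_log_prob, reward, next_state, done):
        state_v = self.critic(torch.FloatTensor(state))
        with torch.no_grad():
            next_v = 0.0 if done else self.critic(torch.FloatTensor(next_state))
        target = reward + self.gamma * next_v
        advantage = target - state_v
        # Critic regresses toward the TD target; actor follows the advantage
        critic_loss = advantage.pow(2)
        actor_loss = -action_log_prob * advantage.detach()
        loss = (actor_loss + critic_loss).sum()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()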

RL Training Strategies

Experience replay stores past experiences in a buffer and samples them randomly, breaking correlation between consecutive samples, improving sample efficiency, and enabling off-policy learning at the cost of a memory buffer.

Target networks stabilize learning: a separate, slowly updated network supplies the target values, which reduces training instability and improves convergence.

# RL Training Strategies
from collections import deque
import random
import numpy as np

class ExperienceReplay:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one transition"""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Sample a random batch of transitions"""
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

class DQNWithReplay:
    """Deep Q-Network with experience replay and a target network"""
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.replay_buffer = ExperienceReplay()
        self.update_target_frequency = 100
        self.steps = 0

    def update(self, batch_size=32):
        """Update the online network from a replayed batch"""
        if len(self.replay_buffer) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        states = torch.FloatTensor(np.stack(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.stack(next_states))
        dones = torch.BoolTensor(dones)
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
        # Target values come from the slowly updated target network
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (~dones)
        loss = nn.functional.mse_loss(current_q.squeeze(), target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Periodically sync the target network with the online network
        self.steps += 1
        if self.steps % self.update_target_frequency == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())
        return loss.item()

# Example
# dqn = DQNWithReplay(state_dim=4, action_dim=2)
# dqn.replay_buffer.push(state, action, reward, next_state, done)
# loss = dqn.update(batch_size=32)

Real-World Application Examples

E-commerce recommendation systems use several of these techniques together: multi-vector embeddings capture multiple aspects of each product, user embeddings capture preferences, temporal search prioritizes recent products, and ensembles combine multiple recommenders. Together they improve recommendation quality.

Search engines use hybrid retrieval: keyword search handles exact matches, semantic search handles meaning, reranking improves the final order, and multi-stage retrieval balances speed and accuracy. This provides comprehensive results.

# Real-World Application: E-commerce Recommendation
from datetime import datetime

class ECommerceRecommendation:
    def __init__(self):
        self.product_embeddings = {}  # multiple embeddings per product
        self.user_embeddings = {}
        self.retriever = HybridRetriever()      # placeholder component
        self.reranker = CrossEncoderReranker()  # placeholder component

    def get_recommendations(self, user_id, query=None, top_k=10):
        """Get product recommendations"""
        # Multi-vector product search, seeded by the query or the user profile
        if query:
            query_emb = self.embed_query(query)
            candidates = self.retriever.retrieve(query_emb, top_k=50)
        else:
            user_emb = self.user_embeddings[user_id]
            candidates = self.retriever.retrieve(user_emb, top_k=50)
        # Temporal boosting: prioritize recent products
        candidates = self.apply_temporal_boosting(candidates)
        # Rerank to the final list
        return self.reranker.rerank(query or user_emb, candidates, top_k=top_k)

    def apply_temporal_boosting(self, candidates, recency_weight=0.3):
        """Blend each candidate's score with a recency score"""
        for candidate in candidates:
            days_old = (datetime.now() - candidate.created_date).days
            recency_score = 1.0 / (1 + days_old)
            candidate.score = (1 - recency_weight) * candidate.score + recency_weight * recency_score
        return sorted(candidates, key=lambda x: x.score, reverse=True)

# Real-World Application: Enterprise Search
class EnterpriseSearchSystem:
    def __init__(self):
        self.semantic_retriever = SemanticRetriever()  # placeholder components
        self.keyword_retriever = KeywordRetriever()
        self.reranker = LearnedToRankReranker()
        self.generator = LLMGenerator()

    def search(self, query, filters=None, top_k=10):
        """Enterprise search with metadata filters"""
        # Hybrid retrieval: semantic and keyword candidates
        semantic_results = self.semantic_retriever.retrieve(query, top_k=50)
        keyword_results = self.keyword_retriever.retrieve(query, top_k=50)
        all_candidates = self.combine_results(semantic_results, keyword_results)
        # Apply filters (department, date, type, etc.), then rerank
        if filters:
            all_candidates = self.apply_filters(all_candidates, filters)
        return self.reranker.rerank(query, all_candidates, top_k=top_k)

    def apply_filters(self, candidates, filters):
        """Keep only candidates whose metadata matches every filter"""
        filtered = []
        for candidate in candidates:
            if all(candidate.metadata.get(k) == v for k, v in filters.items()):
                filtered.append(candidate)
        return filtered

Summary

Advanced architectures handle complex requirements: multi-vector embeddings improve coverage, temporal search handles time-sensitive data, ensemble methods improve performance, advanced indexing enables scale, and complex architectures combine these techniques into sophisticated applications.
