Files
2025-11-30 08:30:10 +08:00

686 lines
20 KiB
Markdown

# ESM Workflows and Examples
## Overview
This document provides complete, end-to-end examples of common workflows using ESM3 and ESM C. Each workflow includes setup, execution, and analysis code.
## Workflow 1: Novel GFP Design with Chain-of-Thought
Design a novel fluorescent protein using ESM3's multimodal generation capabilities.
### Objective
Generate a green fluorescent protein (GFP) with specific properties using chain-of-thought reasoning across sequence, structure, and function.
### Complete Implementation
```python
from esm.models.esm3 import ESM3
from esm.sdk.api import ESMProtein, GenerationConfig, FunctionAnnotation
import matplotlib.pyplot as plt
# Setup
model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda")
# Step 1: Define target properties
print("Step 1: Defining target GFP properties...")
# Create protein with desired function
target_length = 238 # Typical GFP length
protein = ESMProtein(
sequence="_" * target_length,
function_annotations=[
FunctionAnnotation(
label="green_fluorescent_protein",
start=65,
end=75 # Chromophore region
)
]
)
# Step 2: Generate initial sequence with function conditioning
print("Step 2: Generating initial sequence...")
config = GenerationConfig(
track="sequence",
num_steps=target_length // 3, # Gradual generation
temperature=0.7 # Moderate diversity
)
protein = model.generate(protein, config)
print(f"Generated sequence: {protein.sequence[:50]}...")
# Step 3: Predict structure
print("Step 3: Predicting structure...")
config = GenerationConfig(
track="structure",
num_steps=target_length // 2
)
protein = model.generate(protein, config)
print(f"Structure predicted, coordinates shape: {protein.coordinates.shape}")
# Step 4: Refine sequence based on structure
print("Step 4: Refining sequence based on structure...")
# Mask regions for refinement (e.g., surface residues)
sequence_list = list(protein.sequence)
# Keep chromophore region, refine others
for i in range(0, 65):
if i % 3 == 0: # Refine every third position
sequence_list[i] = '_'
for i in range(75, target_length):
if i % 3 == 0:
sequence_list[i] = '_'
protein.sequence = ''.join(sequence_list)
config = GenerationConfig(
track="sequence",
num_steps=50,
temperature=0.5 # Lower temperature for refinement
)
protein = model.generate(protein, config)
# Step 5: Final validation
print("Step 5: Final validation...")
# Predict final structure
config = GenerationConfig(track="structure", num_steps=30)
protein = model.generate(protein, config)
# Save results
with open("novel_gfp.pdb", "w") as f:
f.write(protein.to_pdb())
with open("novel_gfp_sequence.txt", "w") as f:
f.write(f">Novel_GFP\n{protein.sequence}\n")
print(f"\nFinal GFP sequence:\n{protein.sequence}")
print(f"\nFunction annotations: {protein.function_annotations}")
print(f"Structure saved to: novel_gfp.pdb")
```
### Validation Steps
```python
# Analyze designed GFP
def analyze_gfp(protein):
"""Analyze generated GFP properties."""
# Check chromophore region (should be around Ser65-Tyr66-Gly67)
chromophore_region = protein.sequence[64:68]
print(f"Chromophore region: {chromophore_region}")
# Check barrel structure (GFPs have beta-barrel)
# Analyze secondary structure if available
if protein.secondary_structure:
beta_content = protein.secondary_structure.count('E') / len(protein.sequence)
print(f"Beta sheet content: {beta_content:.2%}")
# Check sequence similarity to known GFPs
# (Would require BLAST or alignment tool in practice)
return {
'length': len(protein.sequence),
'chromophore': chromophore_region,
'coordinates_available': protein.coordinates is not None
}
analysis = analyze_gfp(protein)
print(f"\nAnalysis results: {analysis}")
```
## Workflow 2: Protein Variant Library Generation
Generate and analyze a library of protein variants for directed evolution.
### Objective
Create variants of a parent protein by targeted mutagenesis while maintaining structural integrity.
### Complete Implementation
```python
from esm.models.esm3 import ESM3
from esm.sdk.api import ESMProtein, GenerationConfig
import numpy as np
from sklearn.cluster import KMeans
# Setup
model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda")
# Parent protein
parent_sequence = "MPRTKEINDAGLIVHSPQWFYKARNDTESLGKIVHEFPM"
parent_protein = ESMProtein(sequence=parent_sequence)
# Define mutation parameters
num_variants = 50
positions_to_mutate = 5 # Number of positions per variant
# Step 1: Generate variant library
print("Generating variant library...")
variants = []
for i in range(num_variants):
# Create masked sequence with random positions
seq_list = list(parent_sequence)
# Select random positions to mutate
mutation_positions = np.random.choice(
len(seq_list),
size=positions_to_mutate,
replace=False
)
for pos in mutation_positions:
seq_list[pos] = '_'
# Generate variant
variant_protein = ESMProtein(sequence=''.join(seq_list))
config = GenerationConfig(
track="sequence",
num_steps=positions_to_mutate * 2,
temperature=0.8 # Higher diversity
)
variant = model.generate(variant_protein, config)
variants.append(variant.sequence)
if (i + 1) % 10 == 0:
print(f"Generated {i + 1}/{num_variants} variants")
print(f"\nGenerated {len(variants)} variants")
# Step 2: Predict structures for variants
print("\nPredicting structures...")
variant_proteins_with_structure = []
for i, seq in enumerate(variants):
protein = ESMProtein(sequence=seq)
config = GenerationConfig(
track="structure",
num_steps=len(seq) // 2
)
protein_with_structure = model.generate(protein, config)
variant_proteins_with_structure.append(protein_with_structure)
if (i + 1) % 10 == 0:
print(f"Predicted structures for {i + 1}/{len(variants)} variants")
# Step 3: Analyze variant diversity
print("\nAnalyzing variant diversity...")
# Calculate Hamming distances from parent
def hamming_distance(seq1, seq2):
"""Calculate Hamming distance between sequences."""
return sum(c1 != c2 for c1, c2 in zip(seq1, seq2))
distances = [hamming_distance(parent_sequence, var) for var in variants]
print(f"Average mutations per variant: {np.mean(distances):.1f}")
print(f"Mutation range: {min(distances)}-{max(distances)}")
# Step 4: Get embeddings for clustering
print("\nGenerating embeddings for clustering...")
from esm.models.esmc import ESMC
embedding_model = ESMC.from_pretrained("esmc-300m").to("cuda")
def get_embedding(sequence):
"""Get mean-pooled embedding for sequence."""
protein = ESMProtein(sequence=sequence)
tensor = embedding_model.encode(protein)
emb = embedding_model.forward(tensor)
return emb.mean(dim=1).cpu().detach().numpy().flatten()
variant_embeddings = np.array([get_embedding(seq) for seq in variants])
# Step 5: Cluster variants
print("Clustering variants...")
n_clusters = 5
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
cluster_labels = kmeans.fit_predict(variant_embeddings)
# Analyze clusters
print("\nCluster analysis:")
for i in range(n_clusters):
cluster_variants = [var for var, label in zip(variants, cluster_labels) if label == i]
cluster_distances = [hamming_distance(parent_sequence, var) for var in cluster_variants]
print(f"\nCluster {i}:")
print(f" Size: {len(cluster_variants)}")
print(f" Avg distance from parent: {np.mean(cluster_distances):.1f}")
print(f" Representative: {cluster_variants[0][:40]}...")
# Step 6: Select diverse representatives
print("\nSelecting diverse representatives...")
representatives = []
for i in range(n_clusters):
# Get centroid
cluster_indices = np.where(cluster_labels == i)[0]
cluster_embs = variant_embeddings[cluster_indices]
# Find closest to centroid
centroid = cluster_embs.mean(axis=0)
distances_to_centroid = np.linalg.norm(cluster_embs - centroid, axis=1)
rep_idx = cluster_indices[np.argmin(distances_to_centroid)]
representatives.append(variants[rep_idx])
# Save results
print("\nSaving results...")
with open("variant_library.fasta", "w") as f:
f.write(f">Parent\n{parent_sequence}\n\n")
for i, var in enumerate(variants):
f.write(f">Variant_{i+1}_Cluster_{cluster_labels[i]}\n{var}\n")
with open("representative_variants.fasta", "w") as f:
for i, rep in enumerate(representatives):
f.write(f">Representative_Cluster_{i}\n{rep}\n")
print("Variant library saved to: variant_library.fasta")
print("Representatives saved to: representative_variants.fasta")
```
## Workflow 3: Structure-Based Sequence Optimization
Optimize a protein sequence to improve stability while maintaining function.
### Objective
Given a protein structure, design sequences that maintain the fold but have improved properties.
### Complete Implementation
```python
from esm.models.esm3 import ESM3
from esm.sdk.api import ESMProtein, GenerationConfig
import numpy as np
# Setup
model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda")
# Load target structure (e.g., from PDB)
target_protein = ESMProtein.from_pdb("target_structure.pdb")
original_sequence = target_protein.sequence
print(f"Original sequence: {original_sequence}")
print(f"Structure loaded: {target_protein.coordinates.shape}")
# Step 1: Generate multiple sequence designs
print("\nGenerating optimized sequences...")
num_designs = 20
optimized_sequences = []
for i in range(num_designs):
# Start with structure, remove sequence
design_protein = ESMProtein(
coordinates=target_protein.coordinates.copy(),
secondary_structure=target_protein.secondary_structure
)
# Generate sequence for this structure
config = GenerationConfig(
track="sequence",
num_steps=len(original_sequence),
temperature=0.7,
condition_on_coordinates_only=True
)
designed = model.generate(design_protein, config)
optimized_sequences.append(designed.sequence)
if (i + 1) % 5 == 0:
print(f"Generated {i + 1}/{num_designs} designs")
# Step 2: Validate structural compatibility
print("\nValidating structural compatibility...")
validated_designs = []
for seq in optimized_sequences:
# Predict structure for designed sequence
test_protein = ESMProtein(sequence=seq)
config = GenerationConfig(
track="structure",
num_steps=len(seq) // 2
)
predicted = model.generate(test_protein, config)
# Calculate RMSD (simplified - in practice use proper alignment)
# Here we just check if structure prediction succeeds
if predicted.coordinates is not None:
validated_designs.append(seq)
print(f"Validated {len(validated_designs)}/{num_designs} designs")
# Step 3: Analyze sequence properties
print("\nAnalyzing sequence properties...")
def calculate_properties(sequence):
"""Calculate basic sequence properties."""
# Hydrophobicity (simplified)
hydrophobic = "AILMFWYV"
hydrophobic_fraction = sum(1 for aa in sequence if aa in hydrophobic) / len(sequence)
# Charge
positive = "KR"
negative = "DE"
net_charge = sum(1 for aa in sequence if aa in positive) - sum(1 for aa in sequence if aa in negative)
# Aromatic content
aromatic = "FWY"
aromatic_fraction = sum(1 for aa in sequence if aa in aromatic) / len(sequence)
return {
'hydrophobic_fraction': hydrophobic_fraction,
'net_charge': net_charge,
'aromatic_fraction': aromatic_fraction
}
# Compare to original
original_props = calculate_properties(original_sequence)
print(f"\nOriginal properties:")
print(f" Hydrophobic: {original_props['hydrophobic_fraction']:.2%}")
print(f" Net charge: {original_props['net_charge']:+d}")
print(f" Aromatic: {original_props['aromatic_fraction']:.2%}")
# Analyze designs
design_properties = [calculate_properties(seq) for seq in validated_designs]
avg_hydrophobic = np.mean([p['hydrophobic_fraction'] for p in design_properties])
avg_charge = np.mean([p['net_charge'] for p in design_properties])
avg_aromatic = np.mean([p['aromatic_fraction'] for p in design_properties])
print(f"\nDesigned sequences (average):")
print(f" Hydrophobic: {avg_hydrophobic:.2%}")
print(f" Net charge: {avg_charge:+.1f}")
print(f" Aromatic: {avg_aromatic:.2%}")
# Step 4: Rank designs
print("\nRanking designs...")
def score_design(sequence, original_props):
"""Score design based on desired properties."""
props = calculate_properties(sequence)
# Prefer higher hydrophobic content (for stability)
hydrophobic_score = props['hydrophobic_fraction']
# Prefer similar charge to original
charge_score = 1.0 / (1.0 + abs(props['net_charge'] - original_props['net_charge']))
# Combined score
return hydrophobic_score * 0.6 + charge_score * 0.4
scores = [(seq, score_design(seq, original_props)) for seq in validated_designs]
scores.sort(key=lambda x: x[1], reverse=True)
print("\nTop 5 designs:")
for i, (seq, score) in enumerate(scores[:5]):
print(f"\n{i+1}. Score: {score:.3f}")
print(f" Sequence: {seq[:40]}...")
# Step 5: Save results
print("\nSaving results...")
with open("optimized_sequences.fasta", "w") as f:
f.write(f">Original\n{original_sequence}\n\n")
for i, (seq, score) in enumerate(scores):
props = calculate_properties(seq)
f.write(f">Design_{i+1}_Score_{score:.3f}\n")
f.write(f"# Hydrophobic: {props['hydrophobic_fraction']:.2%}, ")
f.write(f"Charge: {props['net_charge']:+d}, ")
f.write(f"Aromatic: {props['aromatic_fraction']:.2%}\n")
f.write(f"{seq}\n\n")
print("Results saved to: optimized_sequences.fasta")
```
## Workflow 4: Function Prediction Pipeline
Predict protein function from sequence using ESM3 and ESM C.
### Objective
Build a pipeline that predicts protein function using both generative (ESM3) and embedding (ESM C) approaches.
### Complete Implementation
```python
from esm.models.esm3 import ESM3
from esm.models.esmc import ESMC
from esm.sdk.api import ESMProtein, GenerationConfig
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
# Setup models
esm3_model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda")
esmc_model = ESMC.from_pretrained("esmc-600m").to("cuda")
# Example: Predict if protein is an enzyme
# (In practice, you'd have a labeled training set)
def predict_function_generative(sequence):
"""Predict function using ESM3 generative approach."""
protein = ESMProtein(sequence=sequence)
# Generate function annotations
config = GenerationConfig(
track="function",
num_steps=20,
temperature=0.3 # Low temperature for confident predictions
)
protein_with_function = esm3_model.generate(protein, config)
return protein_with_function.function_annotations
def predict_function_embedding(sequence, function_classifier):
"""Predict function using ESM C embeddings + classifier."""
# Get embedding
protein = ESMProtein(sequence=sequence)
tensor = esmc_model.encode(protein)
embedding = esmc_model.forward(tensor)
# Mean pool
embedding_pooled = embedding.mean(dim=1).cpu().detach().numpy()
# Predict with classifier
prediction = function_classifier.predict(embedding_pooled)
probability = function_classifier.predict_proba(embedding_pooled)
return prediction[0], probability[0]
# Example workflow with test sequences
test_sequences = {
"kinase": "MPRTKEINDAGLIVHSPQWFYKARNDTESLGKIVHEF",
"protease": "AGLIVHSPQWFYKARNDTESLGKIVHEFPMCDEGH",
"transporter": "KTEFLNDGRPMLIVHSPQWFYKARNDTESLGKIVH"
}
print("Predicting functions...\n")
for name, sequence in test_sequences.items():
print(f"{name.upper()}:")
print(f"Sequence: {sequence[:30]}...")
# Method 1: Generative
functions = predict_function_generative(sequence)
print(f" Generative predictions: {functions}")
# Method 2: Embedding-based would require trained classifier
# (Skipped in this example as it needs training data)
print()
```
## Workflow 5: Embedding-Based Clustering and Analysis
Cluster and analyze a large protein dataset using ESM C embeddings.
### Complete Implementation
```python
from esm.models.esmc import ESMC
from esm.sdk.api import ESMProtein
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
# Setup
model = ESMC.from_pretrained("esmc-600m").to("cuda")
# Load protein dataset (example)
sequences = [
# In practice, load from FASTA or database
"MPRTKEINDAGLIVHSPQWFYK",
"AGLIVHSPQWFYKARNDTESL",
# ... more sequences
]
print(f"Loaded {len(sequences)} sequences")
# Step 1: Generate embeddings
print("Generating embeddings...")
embeddings = []
for i, seq in enumerate(sequences):
protein = ESMProtein(sequence=seq)
tensor = model.encode(protein)
emb = model.forward(tensor)
# Mean pooling
emb_pooled = emb.mean(dim=1).cpu().detach().numpy().flatten()
embeddings.append(emb_pooled)
if (i + 1) % 100 == 0:
print(f"Processed {i + 1}/{len(sequences)}")
embeddings = np.array(embeddings)
print(f"Embeddings shape: {embeddings.shape}")
# Step 2: Dimensionality reduction for visualization
print("\nReducing dimensionality...")
# PCA for initial reduction
pca = PCA(n_components=50)
embeddings_pca = pca.fit_transform(embeddings)
print(f"PCA explained variance: {pca.explained_variance_ratio_[:10].sum():.2%}")
# t-SNE for visualization
tsne = TSNE(n_components=2, random_state=42)
embeddings_2d = tsne.fit_transform(embeddings_pca)
# Step 3: Clustering
print("\nClustering...")
# DBSCAN for density-based clustering
clustering = DBSCAN(eps=0.5, min_samples=5)
cluster_labels = clustering.fit_predict(embeddings)
n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
n_noise = list(cluster_labels).count(-1)
print(f"Number of clusters: {n_clusters}")
print(f"Number of noise points: {n_noise}")
# Step 4: Visualize
print("\nGenerating visualization...")
plt.figure(figsize=(12, 8))
scatter = plt.scatter(
embeddings_2d[:, 0],
embeddings_2d[:, 1],
c=cluster_labels,
cmap='viridis',
alpha=0.6
)
plt.colorbar(scatter)
plt.title("Protein Sequence Clustering (ESM C Embeddings)")
plt.xlabel("t-SNE 1")
plt.ylabel("t-SNE 2")
plt.savefig("protein_clusters.png", dpi=300, bbox_inches='tight')
print("Visualization saved to: protein_clusters.png")
# Step 5: Analyze clusters
print("\nCluster analysis:")
for cluster_id in range(n_clusters):
cluster_indices = np.where(cluster_labels == cluster_id)[0]
cluster_seqs = [sequences[i] for i in cluster_indices]
print(f"\nCluster {cluster_id}:")
print(f" Size: {len(cluster_seqs)}")
print(f" Avg length: {np.mean([len(s) for s in cluster_seqs]):.1f}")
print(f" Example: {cluster_seqs[0][:40]}...")
# Save cluster assignments
with open("cluster_assignments.txt", "w") as f:
for i, (seq, label) in enumerate(zip(sequences, cluster_labels)):
f.write(f"Sequence_{i}\tCluster_{label}\t{seq}\n")
print("\nCluster assignments saved to: cluster_assignments.txt")
```
## Additional Workflow Tips
### Memory Management for Large Datasets
```python
def process_large_dataset(sequences, batch_size=32):
"""Process large dataset with memory management."""
import gc
import torch
results = []
for i in range(0, len(sequences), batch_size):
batch = sequences[i:i + batch_size]
# Process batch
batch_results = [process_sequence(seq) for seq in batch]
results.extend(batch_results)
# Clear memory
torch.cuda.empty_cache()
gc.collect()
if (i + batch_size) % 100 == 0:
print(f"Processed {min(i + batch_size, len(sequences))}/{len(sequences)}")
return results
```
### Parallel Processing
```python
from concurrent.futures import ThreadPoolExecutor
import asyncio
def parallel_workflow(sequences, n_workers=4):
"""Process sequences in parallel."""
with ThreadPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(process_sequence, sequences))
return results
```
These workflows provide comprehensive examples for common ESM use cases. Adapt them to your specific needs and always validate results with appropriate biological experiments.