# ESM Workflows and Examples ## Overview This document provides complete, end-to-end examples of common workflows using ESM3 and ESM C. Each workflow includes setup, execution, and analysis code. ## Workflow 1: Novel GFP Design with Chain-of-Thought Design a novel fluorescent protein using ESM3's multimodal generation capabilities. ### Objective Generate a green fluorescent protein (GFP) with specific properties using chain-of-thought reasoning across sequence, structure, and function. ### Complete Implementation ```python from esm.models.esm3 import ESM3 from esm.sdk.api import ESMProtein, GenerationConfig, FunctionAnnotation import matplotlib.pyplot as plt # Setup model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda") # Step 1: Define target properties print("Step 1: Defining target GFP properties...") # Create protein with desired function target_length = 238 # Typical GFP length protein = ESMProtein( sequence="_" * target_length, function_annotations=[ FunctionAnnotation( label="green_fluorescent_protein", start=65, end=75 # Chromophore region ) ] ) # Step 2: Generate initial sequence with function conditioning print("Step 2: Generating initial sequence...") config = GenerationConfig( track="sequence", num_steps=target_length // 3, # Gradual generation temperature=0.7 # Moderate diversity ) protein = model.generate(protein, config) print(f"Generated sequence: {protein.sequence[:50]}...") # Step 3: Predict structure print("Step 3: Predicting structure...") config = GenerationConfig( track="structure", num_steps=target_length // 2 ) protein = model.generate(protein, config) print(f"Structure predicted, coordinates shape: {protein.coordinates.shape}") # Step 4: Refine sequence based on structure print("Step 4: Refining sequence based on structure...") # Mask regions for refinement (e.g., surface residues) sequence_list = list(protein.sequence) # Keep chromophore region, refine others for i in range(0, 65): if i % 3 == 0: # Refine every third position 
sequence_list[i] = '_' for i in range(75, target_length): if i % 3 == 0: sequence_list[i] = '_' protein.sequence = ''.join(sequence_list) config = GenerationConfig( track="sequence", num_steps=50, temperature=0.5 # Lower temperature for refinement ) protein = model.generate(protein, config) # Step 5: Final validation print("Step 5: Final validation...") # Predict final structure config = GenerationConfig(track="structure", num_steps=30) protein = model.generate(protein, config) # Save results with open("novel_gfp.pdb", "w") as f: f.write(protein.to_pdb()) with open("novel_gfp_sequence.txt", "w") as f: f.write(f">Novel_GFP\n{protein.sequence}\n") print(f"\nFinal GFP sequence:\n{protein.sequence}") print(f"\nFunction annotations: {protein.function_annotations}") print(f"Structure saved to: novel_gfp.pdb") ``` ### Validation Steps ```python # Analyze designed GFP def analyze_gfp(protein): """Analyze generated GFP properties.""" # Check chromophore region (should be around Ser65-Tyr66-Gly67) chromophore_region = protein.sequence[64:68] print(f"Chromophore region: {chromophore_region}") # Check barrel structure (GFPs have beta-barrel) # Analyze secondary structure if available if protein.secondary_structure: beta_content = protein.secondary_structure.count('E') / len(protein.sequence) print(f"Beta sheet content: {beta_content:.2%}") # Check sequence similarity to known GFPs # (Would require BLAST or alignment tool in practice) return { 'length': len(protein.sequence), 'chromophore': chromophore_region, 'coordinates_available': protein.coordinates is not None } analysis = analyze_gfp(protein) print(f"\nAnalysis results: {analysis}") ``` ## Workflow 2: Protein Variant Library Generation Generate and analyze a library of protein variants for directed evolution. ### Objective Create variants of a parent protein by targeted mutagenesis while maintaining structural integrity. 
### Complete Implementation ```python from esm.models.esm3 import ESM3 from esm.sdk.api import ESMProtein, GenerationConfig import numpy as np from sklearn.cluster import KMeans # Setup model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda") # Parent protein parent_sequence = "MPRTKEINDAGLIVHSPQWFYKARNDTESLGKIVHEFPM" parent_protein = ESMProtein(sequence=parent_sequence) # Define mutation parameters num_variants = 50 positions_to_mutate = 5 # Number of positions per variant # Step 1: Generate variant library print("Generating variant library...") variants = [] for i in range(num_variants): # Create masked sequence with random positions seq_list = list(parent_sequence) # Select random positions to mutate mutation_positions = np.random.choice( len(seq_list), size=positions_to_mutate, replace=False ) for pos in mutation_positions: seq_list[pos] = '_' # Generate variant variant_protein = ESMProtein(sequence=''.join(seq_list)) config = GenerationConfig( track="sequence", num_steps=positions_to_mutate * 2, temperature=0.8 # Higher diversity ) variant = model.generate(variant_protein, config) variants.append(variant.sequence) if (i + 1) % 10 == 0: print(f"Generated {i + 1}/{num_variants} variants") print(f"\nGenerated {len(variants)} variants") # Step 2: Predict structures for variants print("\nPredicting structures...") variant_proteins_with_structure = [] for i, seq in enumerate(variants): protein = ESMProtein(sequence=seq) config = GenerationConfig( track="structure", num_steps=len(seq) // 2 ) protein_with_structure = model.generate(protein, config) variant_proteins_with_structure.append(protein_with_structure) if (i + 1) % 10 == 0: print(f"Predicted structures for {i + 1}/{len(variants)} variants") # Step 3: Analyze variant diversity print("\nAnalyzing variant diversity...") # Calculate Hamming distances from parent def hamming_distance(seq1, seq2): """Calculate Hamming distance between sequences.""" return sum(c1 != c2 for c1, c2 in zip(seq1, seq2)) distances = 
[hamming_distance(parent_sequence, var) for var in variants] print(f"Average mutations per variant: {np.mean(distances):.1f}") print(f"Mutation range: {min(distances)}-{max(distances)}") # Step 4: Get embeddings for clustering print("\nGenerating embeddings for clustering...") from esm.models.esmc import ESMC embedding_model = ESMC.from_pretrained("esmc-300m").to("cuda") def get_embedding(sequence): """Get mean-pooled embedding for sequence.""" protein = ESMProtein(sequence=sequence) tensor = embedding_model.encode(protein) emb = embedding_model.forward(tensor) return emb.mean(dim=1).cpu().detach().numpy().flatten() variant_embeddings = np.array([get_embedding(seq) for seq in variants]) # Step 5: Cluster variants print("Clustering variants...") n_clusters = 5 kmeans = KMeans(n_clusters=n_clusters, random_state=42) cluster_labels = kmeans.fit_predict(variant_embeddings) # Analyze clusters print("\nCluster analysis:") for i in range(n_clusters): cluster_variants = [var for var, label in zip(variants, cluster_labels) if label == i] cluster_distances = [hamming_distance(parent_sequence, var) for var in cluster_variants] print(f"\nCluster {i}:") print(f" Size: {len(cluster_variants)}") print(f" Avg distance from parent: {np.mean(cluster_distances):.1f}") print(f" Representative: {cluster_variants[0][:40]}...") # Step 6: Select diverse representatives print("\nSelecting diverse representatives...") representatives = [] for i in range(n_clusters): # Get centroid cluster_indices = np.where(cluster_labels == i)[0] cluster_embs = variant_embeddings[cluster_indices] # Find closest to centroid centroid = cluster_embs.mean(axis=0) distances_to_centroid = np.linalg.norm(cluster_embs - centroid, axis=1) rep_idx = cluster_indices[np.argmin(distances_to_centroid)] representatives.append(variants[rep_idx]) # Save results print("\nSaving results...") with open("variant_library.fasta", "w") as f: f.write(f">Parent\n{parent_sequence}\n\n") for i, var in enumerate(variants): 
f.write(f">Variant_{i+1}_Cluster_{cluster_labels[i]}\n{var}\n") with open("representative_variants.fasta", "w") as f: for i, rep in enumerate(representatives): f.write(f">Representative_Cluster_{i}\n{rep}\n") print("Variant library saved to: variant_library.fasta") print("Representatives saved to: representative_variants.fasta") ``` ## Workflow 3: Structure-Based Sequence Optimization Optimize a protein sequence to improve stability while maintaining function. ### Objective Given a protein structure, design sequences that maintain the fold but have improved properties. ### Complete Implementation ```python from esm.models.esm3 import ESM3 from esm.sdk.api import ESMProtein, GenerationConfig import numpy as np # Setup model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda") # Load target structure (e.g., from PDB) target_protein = ESMProtein.from_pdb("target_structure.pdb") original_sequence = target_protein.sequence print(f"Original sequence: {original_sequence}") print(f"Structure loaded: {target_protein.coordinates.shape}") # Step 1: Generate multiple sequence designs print("\nGenerating optimized sequences...") num_designs = 20 optimized_sequences = [] for i in range(num_designs): # Start with structure, remove sequence design_protein = ESMProtein( coordinates=target_protein.coordinates.copy(), secondary_structure=target_protein.secondary_structure ) # Generate sequence for this structure config = GenerationConfig( track="sequence", num_steps=len(original_sequence), temperature=0.7, condition_on_coordinates_only=True ) designed = model.generate(design_protein, config) optimized_sequences.append(designed.sequence) if (i + 1) % 5 == 0: print(f"Generated {i + 1}/{num_designs} designs") # Step 2: Validate structural compatibility print("\nValidating structural compatibility...") validated_designs = [] for seq in optimized_sequences: # Predict structure for designed sequence test_protein = ESMProtein(sequence=seq) config = GenerationConfig( track="structure", 
num_steps=len(seq) // 2 ) predicted = model.generate(test_protein, config) # Calculate RMSD (simplified - in practice use proper alignment) # Here we just check if structure prediction succeeds if predicted.coordinates is not None: validated_designs.append(seq) print(f"Validated {len(validated_designs)}/{num_designs} designs") # Step 3: Analyze sequence properties print("\nAnalyzing sequence properties...") def calculate_properties(sequence): """Calculate basic sequence properties.""" # Hydrophobicity (simplified) hydrophobic = "AILMFWYV" hydrophobic_fraction = sum(1 for aa in sequence if aa in hydrophobic) / len(sequence) # Charge positive = "KR" negative = "DE" net_charge = sum(1 for aa in sequence if aa in positive) - sum(1 for aa in sequence if aa in negative) # Aromatic content aromatic = "FWY" aromatic_fraction = sum(1 for aa in sequence if aa in aromatic) / len(sequence) return { 'hydrophobic_fraction': hydrophobic_fraction, 'net_charge': net_charge, 'aromatic_fraction': aromatic_fraction } # Compare to original original_props = calculate_properties(original_sequence) print(f"\nOriginal properties:") print(f" Hydrophobic: {original_props['hydrophobic_fraction']:.2%}") print(f" Net charge: {original_props['net_charge']:+d}") print(f" Aromatic: {original_props['aromatic_fraction']:.2%}") # Analyze designs design_properties = [calculate_properties(seq) for seq in validated_designs] avg_hydrophobic = np.mean([p['hydrophobic_fraction'] for p in design_properties]) avg_charge = np.mean([p['net_charge'] for p in design_properties]) avg_aromatic = np.mean([p['aromatic_fraction'] for p in design_properties]) print(f"\nDesigned sequences (average):") print(f" Hydrophobic: {avg_hydrophobic:.2%}") print(f" Net charge: {avg_charge:+.1f}") print(f" Aromatic: {avg_aromatic:.2%}") # Step 4: Rank designs print("\nRanking designs...") def score_design(sequence, original_props): """Score design based on desired properties.""" props = calculate_properties(sequence) # Prefer 
higher hydrophobic content (for stability) hydrophobic_score = props['hydrophobic_fraction'] # Prefer similar charge to original charge_score = 1.0 / (1.0 + abs(props['net_charge'] - original_props['net_charge'])) # Combined score return hydrophobic_score * 0.6 + charge_score * 0.4 scores = [(seq, score_design(seq, original_props)) for seq in validated_designs] scores.sort(key=lambda x: x[1], reverse=True) print("\nTop 5 designs:") for i, (seq, score) in enumerate(scores[:5]): print(f"\n{i+1}. Score: {score:.3f}") print(f" Sequence: {seq[:40]}...") # Step 5: Save results print("\nSaving results...") with open("optimized_sequences.fasta", "w") as f: f.write(f">Original\n{original_sequence}\n\n") for i, (seq, score) in enumerate(scores): props = calculate_properties(seq) f.write(f">Design_{i+1}_Score_{score:.3f}\n") f.write(f"# Hydrophobic: {props['hydrophobic_fraction']:.2%}, ") f.write(f"Charge: {props['net_charge']:+d}, ") f.write(f"Aromatic: {props['aromatic_fraction']:.2%}\n") f.write(f"{seq}\n\n") print("Results saved to: optimized_sequences.fasta") ``` ## Workflow 4: Function Prediction Pipeline Predict protein function from sequence using ESM3 and ESM C. ### Objective Build a pipeline that predicts protein function using both generative (ESM3) and embedding (ESM C) approaches. 
### Complete Implementation ```python from esm.models.esm3 import ESM3 from esm.models.esmc import ESMC from esm.sdk.api import ESMProtein, GenerationConfig import numpy as np from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import cross_val_score # Setup models esm3_model = ESM3.from_pretrained("esm3-sm-open-v1").to("cuda") esmc_model = ESMC.from_pretrained("esmc-600m").to("cuda") # Example: Predict if protein is an enzyme # (In practice, you'd have a labeled training set) def predict_function_generative(sequence): """Predict function using ESM3 generative approach.""" protein = ESMProtein(sequence=sequence) # Generate function annotations config = GenerationConfig( track="function", num_steps=20, temperature=0.3 # Low temperature for confident predictions ) protein_with_function = esm3_model.generate(protein, config) return protein_with_function.function_annotations def predict_function_embedding(sequence, function_classifier): """Predict function using ESM C embeddings + classifier.""" # Get embedding protein = ESMProtein(sequence=sequence) tensor = esmc_model.encode(protein) embedding = esmc_model.forward(tensor) # Mean pool embedding_pooled = embedding.mean(dim=1).cpu().detach().numpy() # Predict with classifier prediction = function_classifier.predict(embedding_pooled) probability = function_classifier.predict_proba(embedding_pooled) return prediction[0], probability[0] # Example workflow with test sequences test_sequences = { "kinase": "MPRTKEINDAGLIVHSPQWFYKARNDTESLGKIVHEF", "protease": "AGLIVHSPQWFYKARNDTESLGKIVHEFPMCDEGH", "transporter": "KTEFLNDGRPMLIVHSPQWFYKARNDTESLGKIVH" } print("Predicting functions...\n") for name, sequence in test_sequences.items(): print(f"{name.upper()}:") print(f"Sequence: {sequence[:30]}...") # Method 1: Generative functions = predict_function_generative(sequence) print(f" Generative predictions: {functions}") # Method 2: Embedding-based would require trained classifier # (Skipped in this example 
as it needs training data) print() ``` ## Workflow 5: Embedding-Based Clustering and Analysis Cluster and analyze a large protein dataset using ESM C embeddings. ### Complete Implementation ```python from esm.models.esmc import ESMC from esm.sdk.api import ESMProtein import numpy as np from sklearn.cluster import DBSCAN from sklearn.decomposition import PCA from sklearn.manifold import TSNE import matplotlib.pyplot as plt # Setup model = ESMC.from_pretrained("esmc-600m").to("cuda") # Load protein dataset (example) sequences = [ # In practice, load from FASTA or database "MPRTKEINDAGLIVHSPQWFYK", "AGLIVHSPQWFYKARNDTESL", # ... more sequences ] print(f"Loaded {len(sequences)} sequences") # Step 1: Generate embeddings print("Generating embeddings...") embeddings = [] for i, seq in enumerate(sequences): protein = ESMProtein(sequence=seq) tensor = model.encode(protein) emb = model.forward(tensor) # Mean pooling emb_pooled = emb.mean(dim=1).cpu().detach().numpy().flatten() embeddings.append(emb_pooled) if (i + 1) % 100 == 0: print(f"Processed {i + 1}/{len(sequences)}") embeddings = np.array(embeddings) print(f"Embeddings shape: {embeddings.shape}") # Step 2: Dimensionality reduction for visualization print("\nReducing dimensionality...") # PCA for initial reduction pca = PCA(n_components=50) embeddings_pca = pca.fit_transform(embeddings) print(f"PCA explained variance: {pca.explained_variance_ratio_[:10].sum():.2%}") # t-SNE for visualization tsne = TSNE(n_components=2, random_state=42) embeddings_2d = tsne.fit_transform(embeddings_pca) # Step 3: Clustering print("\nClustering...") # DBSCAN for density-based clustering clustering = DBSCAN(eps=0.5, min_samples=5) cluster_labels = clustering.fit_predict(embeddings) n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0) n_noise = list(cluster_labels).count(-1) print(f"Number of clusters: {n_clusters}") print(f"Number of noise points: {n_noise}") # Step 4: Visualize print("\nGenerating visualization...") 
plt.figure(figsize=(12, 8)) scatter = plt.scatter( embeddings_2d[:, 0], embeddings_2d[:, 1], c=cluster_labels, cmap='viridis', alpha=0.6 ) plt.colorbar(scatter) plt.title("Protein Sequence Clustering (ESM C Embeddings)") plt.xlabel("t-SNE 1") plt.ylabel("t-SNE 2") plt.savefig("protein_clusters.png", dpi=300, bbox_inches='tight') print("Visualization saved to: protein_clusters.png") # Step 5: Analyze clusters print("\nCluster analysis:") for cluster_id in range(n_clusters): cluster_indices = np.where(cluster_labels == cluster_id)[0] cluster_seqs = [sequences[i] for i in cluster_indices] print(f"\nCluster {cluster_id}:") print(f" Size: {len(cluster_seqs)}") print(f" Avg length: {np.mean([len(s) for s in cluster_seqs]):.1f}") print(f" Example: {cluster_seqs[0][:40]}...") # Save cluster assignments with open("cluster_assignments.txt", "w") as f: for i, (seq, label) in enumerate(zip(sequences, cluster_labels)): f.write(f"Sequence_{i}\tCluster_{label}\t{seq}\n") print("\nCluster assignments saved to: cluster_assignments.txt") ``` ## Additional Workflow Tips ### Memory Management for Large Datasets ```python def process_large_dataset(sequences, batch_size=32): """Process large dataset with memory management.""" import gc import torch results = [] for i in range(0, len(sequences), batch_size): batch = sequences[i:i + batch_size] # Process batch batch_results = [process_sequence(seq) for seq in batch] results.extend(batch_results) # Clear memory torch.cuda.empty_cache() gc.collect() if (i + batch_size) % 100 == 0: print(f"Processed {min(i + batch_size, len(sequences))}/{len(sequences)}") return results ``` ### Parallel Processing ```python from concurrent.futures import ThreadPoolExecutor import asyncio def parallel_workflow(sequences, n_workers=4): """Process sequences in parallel.""" with ThreadPoolExecutor(max_workers=n_workers) as executor: results = list(executor.map(process_sequence, sequences)) return results ``` These workflows provide comprehensive examples for 
common ESM use cases. Adapt them to your specific needs, and always validate computational predictions with appropriate wet-lab experiments before drawing biological conclusions.