---
description: Generate production-ready gRPC services with Protocol Buffers
shortcut: grpc
---

# Generate gRPC Service

Automatically generate high-performance gRPC services with Protocol Buffer definitions, streaming support, load balancing, and comprehensive service implementations for multiple programming languages.

## When to Use This Command

Use `/generate-grpc-service` when you need to:

- Build high-performance microservices with a binary protocol
- Implement real-time bidirectional streaming communication
- Create strongly typed service contracts across languages
- Build internal services requiring minimal latency
- Support multiple programming languages from a single definition
- Implement efficient mobile/IoT communication protocols

DON'T use this when:

- Building browser-based web applications (limited browser support without a gRPC-Web proxy)
- A simple REST API suffices (gRPC adds complexity)
- Working with teams unfamiliar with Protocol Buffers
- Debugging tools are limited in your environment

## Design Decisions

This command implements **gRPC with Protocol Buffers v3** as the primary approach because:

- The binary protocol typically offers 20-30% better performance than JSON
- Built-in code generation for 10+ languages
- Native support for streaming in all RPC patterns
- Strong typing prevents runtime errors
- Backward compatibility through field numbering
- Client-side load balancing and pluggable name resolution

**Alternative considered: Apache Thrift**

- Similar performance characteristics
- Less ecosystem support
- Fewer language bindings
- Best suited to teams already invested in the Thrift ecosystem

**Alternative considered: GraphQL with subscriptions**

- Better for public APIs
- More flexible queries
- Higher overhead
- Recommended for client-facing APIs

## Prerequisites

Before running this command:

1. Protocol Buffer compiler (protoc) installed
2. Language-specific gRPC tools installed
3. Understanding of Protocol Buffer syntax
4. Service architecture defined
5. Authentication strategy determined

## Implementation Process

### Step 1: Define Service Contract

Create comprehensive .proto files with service definitions and message types.

### Step 2: Generate Language Bindings

Compile Protocol Buffers to target-language code with the gRPC plugins. A typical invocation is sketched below.
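The exact invocation depends on your project layout and target languages. The following `go:generate` sketch is only an illustration: it assumes `protoc`, `protoc-gen-go`, and `protoc-gen-go-grpc` are on your `PATH`, and the file paths are placeholders rather than part of the generated project.

```go
// tools/generate.go - run `go generate ./...` to regenerate the bindings.
package tools

// Produces ecommerce.pb.go (messages) and ecommerce_grpc.pb.go (service stubs)
// next to the .proto file.
//go:generate protoc --proto_path=../proto --go_out=../proto --go_opt=paths=source_relative --go-grpc_out=../proto --go-grpc_opt=paths=source_relative ecommerce.proto
```

For Python, the equivalent step uses `grpcio-tools` (`python -m grpc_tools.protoc ...`); the Node.js example below instead loads the .proto at runtime with `@grpc/proto-loader`, so it needs no code-generation step.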
### Step 3: Implement Service Logic

Build server-side implementations for all RPC methods.

### Step 4: Add Interceptors

Implement cross-cutting concerns like auth, logging, and error handling.

### Step 5: Configure Production Settings

Set up TLS, connection pooling, and load balancing.

## Output Format

The command generates:

- `proto/service.proto` - Protocol Buffer definitions
- `server/` - Server implementation with all RPC methods
- `client/` - Client library with connection management
- `interceptors/` - Authentication, logging, metrics interceptors
- `config/` - TLS certificates and configuration
- `docs/api.md` - Service documentation

## Code Examples

### Example 1: E-commerce Service with All RPC Patterns

```protobuf
// proto/ecommerce.proto
syntax = "proto3";

package ecommerce.v1;

import "google/protobuf/timestamp.proto";

// Service definition with all RPC patterns
service ProductService {
  // Unary RPC
  rpc GetProduct(GetProductRequest) returns (Product);

  // Server streaming
  rpc ListProducts(ListProductsRequest) returns (stream Product);

  // Client streaming
  rpc ImportProducts(stream Product) returns (ImportSummary);

  // Bidirectional streaming
  rpc WatchInventory(stream InventoryUpdate) returns (stream InventoryChange);

  // Batch operations
  rpc BatchGetProducts(BatchGetProductsRequest) returns (BatchGetProductsResponse);
}

// Message definitions
message Product {
  string id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 inventory = 5;
  repeated string categories = 6;
  map<string, string> metadata = 7;
  google.protobuf.Timestamp created_at = 8;
  google.protobuf.Timestamp updated_at = 9;

  enum Status {
    STATUS_UNSPECIFIED = 0;
    STATUS_ACTIVE = 1;
    STATUS_DISCONTINUED = 2;
    STATUS_OUT_OF_STOCK = 3;
  }
  Status status = 10;
}

message GetProductRequest {
  string product_id = 1;
  repeated string fields = 2;  // Field mask for partial responses
}

message ListProductsRequest {
  string category = 1;
  int32 page_size = 2;
  string page_token = 3;
  string order_by = 4;

  message Filter {
    double min_price = 1;
    double max_price = 2;
    repeated string tags = 3;
  }
  Filter filter = 5;
}

message ImportSummary {
  int32 total_received = 1;
  int32 successful = 2;
  int32 failed = 3;
  repeated ImportError errors = 4;
}

message ImportError {
  int32 index = 1;
  string product_id = 2;
  string error = 3;
}

message InventoryUpdate {
  string product_id = 1;
  int32 quantity_change = 2;
  string warehouse_id = 3;
}

message InventoryChange {
  string product_id = 1;
  int32 old_quantity = 2;
  int32 new_quantity = 3;
  google.protobuf.Timestamp timestamp = 4;
  string triggered_by = 5;
}

message BatchGetProductsRequest {
  repeated string product_ids = 1;
  repeated string fields = 2;
}

message BatchGetProductsResponse {
  repeated Product products = 1;
  repeated string not_found = 2;
}
```
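The "smaller than JSON" claim from the design notes is easy to sanity-check against your own payloads rather than quoting fixed numbers. This sketch assumes the generated Go package is importable under the same path used in the server example below; the `cmd/sizecheck` file name is purely illustrative.

```go
// cmd/sizecheck/main.go - compares wire sizes for one message; results vary with content.
package main

import (
    "fmt"
    "log"

    pb "github.com/company/ecommerce/proto" // import path assumed, matching the server example
    "google.golang.org/protobuf/encoding/protojson"
    "google.golang.org/protobuf/proto"
)

func main() {
    p := &pb.Product{
        Id:         "prod-123",
        Name:       "Mechanical Keyboard",
        Price:      99.99,
        Inventory:  42,
        Categories: []string{"electronics", "accessories"},
    }

    binary, err := proto.Marshal(p)
    if err != nil {
        log.Fatal(err)
    }
    jsonBytes, err := protojson.Marshal(p)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("protobuf: %d bytes, JSON: %d bytes\n", len(binary), len(jsonBytes))
}
```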
```go
// server/main.go - Go server implementation
package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "io"
    "log"
    "net"
    "sync"
    "time"

    pb "github.com/company/ecommerce/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"
)

type productServer struct {
    pb.UnimplementedProductServiceServer

    mu       sync.RWMutex
    products map[string]*pb.Product
    watchers map[string]chan *pb.InventoryChange
}

// Unary RPC implementation
func (s *productServer) GetProduct(
    ctx context.Context,
    req *pb.GetProductRequest,
) (*pb.Product, error) {
    // Extract metadata for tracing
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        if traceID := md.Get("trace-id"); len(traceID) > 0 {
            log.Printf("GetProduct request - trace: %s", traceID[0])
        }
    }

    s.mu.RLock()
    product, exists := s.products[req.ProductId]
    s.mu.RUnlock()

    if !exists {
        return nil, status.Errorf(
            codes.NotFound,
            "product %s not found", req.ProductId,
        )
    }

    // Apply field mask if specified
    if len(req.Fields) > 0 {
        return applyFieldMask(product, req.Fields), nil
    }

    return product, nil
}

// Server streaming implementation
func (s *productServer) ListProducts(
    req *pb.ListProductsRequest,
    stream pb.ProductService_ListProductsServer,
) error {
    s.mu.RLock()
    defer s.mu.RUnlock()

    count := 0
    for _, product := range s.products {
        // Apply filters
        if !matchesFilter(product, req) {
            continue
        }

        // Send product to stream
        if err := stream.Send(product); err != nil {
            return status.Errorf(
                codes.Internal,
                "failed to send product: %v", err,
            )
        }

        count++
        if req.PageSize > 0 && count >= int(req.PageSize) {
            break
        }

        // Simulate real-time processing
        time.Sleep(10 * time.Millisecond)
    }

    return nil
}

// Client streaming implementation
func (s *productServer) ImportProducts(
    stream pb.ProductService_ImportProductsServer,
) error {
    var summary pb.ImportSummary
    var errors []*pb.ImportError
    index := 0

    for {
        product, err := stream.Recv()
        if err == io.EOF {
            // Client finished sending
            summary.Errors = errors
            return stream.SendAndClose(&summary)
        }
        if err != nil {
            return status.Errorf(
                codes.Internal,
                "failed to receive product: %v", err,
            )
        }

        summary.TotalReceived++

        // Validate and store product
        if err := validateProduct(product); err != nil {
            summary.Failed++
            errors = append(errors, &pb.ImportError{
                Index:     int32(index),
                ProductId: product.Id,
                Error:     err.Error(),
            })
        } else {
            s.mu.Lock()
            s.products[product.Id] = product
            s.mu.Unlock()
            summary.Successful++
        }

        index++
    }
}

// Bidirectional streaming implementation
func (s *productServer) WatchInventory(
    stream pb.ProductService_WatchInventoryServer,
) error {
    // Create change channel for this client
    changeChan := make(chan *pb.InventoryChange, 100)
    clientID := generateClientID()

    s.mu.Lock()
    s.watchers[clientID] = changeChan
    s.mu.Unlock()

    defer func() {
        s.mu.Lock()
        delete(s.watchers, clientID)
        s.mu.Unlock()
        close(changeChan)
    }()

    // Handle bidirectional communication
    errChan := make(chan error, 2)

    // Goroutine to receive updates from client
    go func() {
        for {
            update, err := stream.Recv()
            if err == io.EOF {
                errChan <- nil
                return
            }
            if err != nil {
                errChan <- err
                return
            }

            // Process inventory update
            if err := s.processInventoryUpdate(update); err != nil {
                log.Printf("Failed to process update: %v", err)
                continue
            }

            // Notify all watchers
            change := &pb.InventoryChange{
                ProductId:   update.ProductId,
                NewQuantity: s.getInventory(update.ProductId),
                Timestamp:   timestamppb.Now(),
                TriggeredBy: clientID,
            }
            s.broadcastChange(change)
        }
    }()

    // Goroutine to send changes to client
    go func() {
        for change := range changeChan {
            if err := stream.Send(change); err != nil {
                errChan <- err
                return
            }
        }
    }()

    // Wait for error or completion
    return <-errChan
}

// Interceptor for authentication
func authInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    // Extract token from metadata
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }

    tokens := md.Get("authorization")
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing token")
    }

    // Validate token (implement your auth logic)
    if !isValidToken(tokens[0]) {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }

    // Continue to handler
    return handler(ctx, req)
}
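// Added sketch: the unary interceptor above does not cover the streaming RPCs
// (ListProducts, ImportProducts, WatchInventory), which would otherwise bypass
// authentication. A matching stream interceptor is sketched here under the same
// assumptions (token in the "authorization" metadata key, checked by isValidToken);
// register it alongside the unary one with grpc.StreamInterceptor(authStreamInterceptor).
func authStreamInterceptor(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if !ok {
        return status.Error(codes.Unauthenticated, "missing metadata")
    }

    tokens := md.Get("authorization")
    if len(tokens) == 0 || !isValidToken(tokens[0]) {
        return status.Error(codes.Unauthenticated, "missing or invalid token")
    }

    return handler(srv, ss)
}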
// Main server setup
func main() {
    // Load TLS credentials
    cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
    if err != nil {
        log.Fatalf("Failed to load certificates: %v", err)
    }

    config := &tls.Config{
        Certificates: []tls.Certificate{cert},
        ClientAuth:   tls.RequireAndVerifyClientCert,
    }
    creds := credentials.NewTLS(config)

    // Configure server options
    opts := []grpc.ServerOption{
        grpc.Creds(creds),
        grpc.UnaryInterceptor(authInterceptor),
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle: 5 * time.Minute,
            Time:              2 * time.Minute,
            Timeout:           20 * time.Second,
        }),
        grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             5 * time.Second,
            PermitWithoutStream: true,
        }),
        grpc.MaxConcurrentStreams(1000),
    }

    // Create gRPC server
    server := grpc.NewServer(opts...)

    // Register service
    pb.RegisterProductServiceServer(server, &productServer{
        products: make(map[string]*pb.Product),
        watchers: make(map[string]chan *pb.InventoryChange),
    })

    // Start listening
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    log.Println("gRPC server starting on :50051")
    if err := server.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

// Minimal helper implementations so the example compiles; replace with real logic.

func applyFieldMask(p *pb.Product, fields []string) *pb.Product {
    // Real code would copy only the requested fields; return the full product here.
    return p
}

func matchesFilter(p *pb.Product, req *pb.ListProductsRequest) bool {
    if req.Category != "" {
        found := false
        for _, c := range p.Categories {
            if c == req.Category {
                found = true
                break
            }
        }
        if !found {
            return false
        }
    }
    if f := req.GetFilter(); f != nil {
        if f.MinPrice > 0 && p.Price < f.MinPrice {
            return false
        }
        if f.MaxPrice > 0 && p.Price > f.MaxPrice {
            return false
        }
    }
    return true
}

func validateProduct(p *pb.Product) error {
    if p.Id == "" || p.Name == "" {
        return fmt.Errorf("product id and name are required")
    }
    return nil
}

func generateClientID() string {
    return fmt.Sprintf("client-%d", time.Now().UnixNano())
}

func isValidToken(token string) bool {
    // Placeholder check; integrate your real token validation here.
    return token != ""
}

func (s *productServer) processInventoryUpdate(u *pb.InventoryUpdate) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if p, ok := s.products[u.ProductId]; ok {
        p.Inventory += u.QuantityChange
    }
    return nil
}

func (s *productServer) getInventory(id string) int32 {
    s.mu.RLock()
    defer s.mu.RUnlock()
    if p, ok := s.products[id]; ok {
        return p.Inventory
    }
    return 0
}

func (s *productServer) broadcastChange(c *pb.InventoryChange) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    for _, ch := range s.watchers {
        select {
        case ch <- c:
        default: // drop the update if a watcher is not keeping up
        }
    }
}
```
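Example 2 below shows a full Python client; for a quick Go-side check of the server above, a minimal unary client might look like the following sketch. The file name, address, and certificate names mirror the examples and are assumptions, not generated output; it illustrates two practices called out later: attach a deadline to every RPC and send the auth token as metadata.

```go
// client/main.go - minimal Go client sketch for the server above.
package main

import (
    "context"
    "crypto/tls"
    "log"
    "time"

    pb "github.com/company/ecommerce/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
)

func main() {
    // Client certificate for the mutual-TLS setup used by the server example.
    // A real deployment would also set RootCAs for the private CA.
    cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
    if err != nil {
        log.Fatalf("Failed to load client certificate: %v", err)
    }
    creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}})

    conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(creds))
    if err != nil {
        log.Fatalf("Failed to dial: %v", err)
    }
    defer conn.Close()

    client := pb.NewProductServiceClient(conn)

    // Deadline for the whole call, plus the bearer token the auth interceptor expects.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer your-api-key")

    product, err := client.GetProduct(ctx, &pb.GetProductRequest{ProductId: "prod-123"})
    if err != nil {
        log.Fatalf("GetProduct failed: %v", err)
    }
    log.Printf("Got product: %s ($%.2f)", product.Name, product.Price)
}
```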
### Example 2: Python Client with Retry and Load Balancing

```python
# client/product_client.py
import grpc
import asyncio
import logging
from typing import List, Optional, AsyncIterator
from grpc import aio
import backoff

from proto import ecommerce_pb2 as pb
from proto import ecommerce_pb2_grpc as pb_grpc

logger = logging.getLogger(__name__)


class ProductClient:
    """Enhanced gRPC client with retry, load balancing, and connection pooling."""

    def __init__(
        self,
        servers: List[str],
        api_key: Optional[str] = None,
        use_tls: bool = True,
        pool_size: int = 10
    ):
        self.servers = servers
        self.api_key = api_key
        self.use_tls = use_tls
        self.pool_size = pool_size
        self.channels = []
        self.stubs = []
        self._round_robin_counter = 0
        self._setup_channels()

    def _setup_channels(self):
        """Set up connection pool with load balancing."""
        for server in self.servers:
            for _ in range(self.pool_size // len(self.servers)):
                if self.use_tls:
                    # Load client certificates
                    with open('client.crt', 'rb') as f:
                        client_cert = f.read()
                    with open('client.key', 'rb') as f:
                        client_key = f.read()
                    with open('ca.crt', 'rb') as f:
                        ca_cert = f.read()

                    credentials = grpc.ssl_channel_credentials(
                        root_certificates=ca_cert,
                        private_key=client_key,
                        certificate_chain=client_cert
                    )

                    channel = aio.secure_channel(
                        server,
                        credentials,
                        options=[
                            ('grpc.keepalive_time_ms', 120000),
                            ('grpc.keepalive_timeout_ms', 20000),
                            ('grpc.keepalive_permit_without_calls', True),
                            ('grpc.http2.max_pings_without_data', 0),
                        ]
                    )
                else:
                    channel = aio.insecure_channel(
                        server,
                        options=[
                            ('grpc.keepalive_time_ms', 120000),
                            ('grpc.keepalive_timeout_ms', 20000),
                        ]
                    )

                self.channels.append(channel)
                self.stubs.append(pb_grpc.ProductServiceStub(channel))

    def _get_stub(self) -> pb_grpc.ProductServiceStub:
        """Get next stub using round-robin load balancing."""
        stub = self.stubs[self._round_robin_counter]
        self._round_robin_counter = (self._round_robin_counter + 1) % len(self.stubs)
        return stub

    def _get_metadata(self) -> List[tuple]:
        """Generate request metadata."""
        metadata = []
        if self.api_key:
            metadata.append(('authorization', f'Bearer {self.api_key}'))
        metadata.append(('trace-id', self._generate_trace_id()))
        return metadata

    @backoff.on_exception(
        backoff.expo,
        grpc.RpcError,
        max_tries=3,
        giveup=lambda e: e.code() != grpc.StatusCode.UNAVAILABLE
    )
    async def get_product(
        self,
        product_id: str,
        fields: Optional[List[str]] = None
    ) -> pb.Product:
        """Get single product with retry logic."""
        request = pb.GetProductRequest(
            product_id=product_id,
            fields=fields or []
        )

        try:
            response = await self._get_stub().GetProduct(
                request,
                metadata=self._get_metadata(),
                timeout=5.0
            )
            return response
        except grpc.RpcError as e:
            logger.error(f"Failed to get product {product_id}: {e.details()}")
            raise

    async def list_products(
        self,
        category: Optional[str] = None,
        page_size: int = 100,
        min_price: Optional[float] = None,
        max_price: Optional[float] = None
    ) -> AsyncIterator[pb.Product]:
        """Stream products with server-side streaming."""
        request = pb.ListProductsRequest(
            category=category or "",
            page_size=page_size
        )

        if min_price is not None or max_price is not None:
            request.filter.CopyFrom(pb.ListProductsRequest.Filter(
                min_price=min_price or 0,
                max_price=max_price or float('inf')
            ))

        try:
            stream = self._get_stub().ListProducts(
                request,
                metadata=self._get_metadata(),
                timeout=30.0
            )

            async for product in stream:
                yield product
        except grpc.RpcError as e:
            logger.error(f"Failed to list products: {e.details()}")
            raise

    async def import_products(
        self,
        products: List[pb.Product]
    ) -> pb.ImportSummary:
        """Import products using client-side streaming."""
        async def generate_products():
            for product in products:
                yield product
                await asyncio.sleep(0.01)  # Rate limiting

        try:
            response = await self._get_stub().ImportProducts(
                generate_products(),
                metadata=self._get_metadata(),
                timeout=60.0
            )

            if response.failed > 0:
                logger.warning(
                    f"Import completed with {response.failed} failures: "
                    f"{[e.error for e in response.errors]}"
                )

            return response
        except grpc.RpcError as e:
            logger.error(f"Failed to import products: {e.details()}")
            raise

    async def watch_inventory(
        self,
        updates: AsyncIterator[pb.InventoryUpdate]
    ) -> AsyncIterator[pb.InventoryChange]:
        """Bidirectional streaming for inventory monitoring."""
        try:
            stream = self._get_stub().WatchInventory(
                metadata=self._get_metadata()
            )

            # Start sending updates
            send_task = asyncio.create_task(self._send_updates(stream, updates))

            # Receive changes
            try:
                async for change in stream:
                    yield change
            finally:
                send_task.cancel()
        except grpc.RpcError as e:
            logger.error(f"Failed in inventory watch: {e.details()}")
            raise

    async def _send_updates(
        self,
        stream,
        updates: AsyncIterator[pb.InventoryUpdate]
    ):
        """Send inventory updates to server."""
        try:
            async for update in updates:
                await stream.write(update)
            await stream.done_writing()
        except asyncio.CancelledError:
            pass

    async def close(self):
        """Close all channels gracefully."""
        close_tasks = [channel.close() for channel in self.channels]
        await asyncio.gather(*close_tasks)

    @staticmethod
    def _generate_trace_id() -> str:
        """Generate unique trace ID for request tracking."""
        import uuid
        return str(uuid.uuid4())


# Usage example
async def main():
    # Initialize client with load balancing
    client = ProductClient(
        servers=[
            'product-service-1:50051',
            'product-service-2:50051',
            'product-service-3:50051'
        ],
        api_key='your-api-key',
        use_tls=True
    )

    try:
        # Unary call
        product = await client.get_product('prod-123')
        print(f"Product: {product.name} - ${product.price}")

        # Server streaming
        async for product in client.list_products(
            category='electronics',
            min_price=100,
            max_price=1000
        ):
            print(f"Listed: {product.name}")

        # Client streaming
        products_to_import = [
            pb.Product(id=f'new-{i}', name=f'Product {i}', price=99.99)
            for i in range(100)
        ]
        summary = await client.import_products(products_to_import)
        print(f"Imported {summary.successful} products")

        # Bidirectional streaming
        async def generate_updates():
            for i in range(10):
                yield pb.InventoryUpdate(
                    product_id=f'prod-{i}',
                    quantity_change=5,
                    warehouse_id='warehouse-1'
                )
                await asyncio.sleep(1)

        async for change in client.watch_inventory(generate_updates()):
            print(f"Inventory change: {change.product_id} -> {change.new_quantity}")
    finally:
        await client.close()


if __name__ == '__main__':
    asyncio.run(main())
```

### Example 3: Node.js Implementation with Health Checking

```javascript
// server/index.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

// Load proto file
const PROTO_PATH = path.join(__dirname, '../proto/ecommerce.proto');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

const protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
const ecommerce = protoDescriptor.ecommerce.v1;

// Health check implementation
const health = require('grpc-health-check');
const healthImpl = new health.Implementation({
  '': 'SERVING',
  'ecommerce.v1.ProductService': 'SERVING'
});

// Service implementation
class ProductService {
  constructor() {
    this.products = new Map();
    this.watchers = new Map();
  }

  async getProduct(call, callback) {
    const { product_id } = call.request;
    const product = this.products.get(product_id);

    if (!product) {
      callback({
        code: grpc.status.NOT_FOUND,
        message: `Product ${product_id} not found`
      });
      return;
    }

    callback(null, product);
  }

  async listProducts(call) {
    const { category, page_size } = call.request;
    let count = 0;

    for (const [id, product] of this.products) {
      if (category && product.categories.indexOf(category) === -1) {
        continue;
      }

      call.write(product);
      count++;

      if (page_size > 0 && count >= page_size) {
        break;
      }

      // Simulate processing delay
      await new Promise(resolve => setTimeout(resolve, 10));
    }

    call.end();
  }

  // Add remaining methods...
}

// Server setup
function main() {
  const server = new grpc.Server({
    'grpc.max_concurrent_streams': 1000,
    'grpc.max_receive_message_length': 1024 * 1024 * 16
  });

  // Add services
  server.addService(
    ecommerce.ProductService.service,
    new ProductService()
  );

  // Add health check
  server.addService(health.service, healthImpl);

  // Start server
  server.bindAsync(
    '0.0.0.0:50051',
    grpc.ServerCredentials.createInsecure(),
    (err, port) => {
      if (err) {
        console.error('Failed to bind:', err);
        return;
      }
      console.log(`gRPC server running on port ${port}`);
      server.start();
    }
  );
}

main();
```
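Example 3 wires up health checking for the Node.js server; the Go server from Example 1 can expose the same standard gRPC health protocol using the stock `google.golang.org/grpc/health` package. The sketch below shows only the registration; `registerHealth` is an illustrative helper, not generated code.

```go
// Health checking for the Go server in Example 1 (registration snippet only).
package main

import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/health"
    healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// registerHealth wires the standard health service onto an existing *grpc.Server.
func registerHealth(server *grpc.Server) *health.Server {
    healthServer := health.NewServer()
    healthpb.RegisterHealthServer(server, healthServer)

    // Mark the overall server and the product service as serving; flip these to
    // NOT_SERVING during shutdown or when a dependency becomes unavailable.
    healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
    healthServer.SetServingStatus("ecommerce.v1.ProductService", healthpb.HealthCheckResponse_SERVING)
    return healthServer
}
```

Calling `registerHealth(server)` before `server.Serve(lis)` lets load balancers and Kubernetes probes query the standard `grpc.health.v1.Health/Check` method.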
## Error Handling

| Error | Cause | Solution |
|-------|-------|----------|
| "Failed to compile proto" | Invalid Protocol Buffer syntax | Compile with `protoc` to surface syntax errors, or lint with `buf lint` |
| "Connection refused" | Server not running or wrong port | Check server status and port |
| "Deadline exceeded" | Request timeout | Increase timeout or optimize operation |
| "Resource exhausted" | Rate limiting or quota exceeded | Implement backoff and retry |
| "Unavailable" | Server temporarily down | Implement circuit breaker pattern |

## Configuration Options

**Server Options**

- `MaxConcurrentStreams`: Limit concurrent streams per connection
- `MaxReceiveMessageSize`: Maximum message size (default 4MB)
- `KeepaliveParams`: Connection health monitoring
- `ConnectionTimeout`: Timeout for establishing new connections

**Client Options**

- `LoadBalancingPolicy`: `round_robin`, `pick_first` (`grpclb` is deprecated in favor of xDS)
- `WaitForReady`: Block until the server is available instead of failing fast
- `Retry`: Automatic retry configuration via the service config
- `Interceptors`: Add cross-cutting concerns

The sketch below shows how several of these client options map onto a Go dial call.
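As a concrete illustration of the client options above, load-balancing policy and retry behavior can be supplied as a default service config at dial time, and `WaitForReady` per call. This is a sketch, shown as an alternative dial setup to the earlier Go client: the DNS target and the insecure credentials are assumptions for brevity.

```go
// client/dial.go - sketch: round_robin, automatic retries, and WaitForReady.
package main

import (
    "context"
    "log"
    "time"

    pb "github.com/company/ecommerce/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// Service config applied by the client: round_robin across resolved addresses and
// automatic retries of UNAVAILABLE responses for ProductService methods.
const serviceConfig = `{
  "loadBalancingConfig": [{"round_robin": {}}],
  "methodConfig": [{
    "name": [{"service": "ecommerce.v1.ProductService"}],
    "retryPolicy": {
      "maxAttempts": 3,
      "initialBackoff": "0.1s",
      "maxBackoff": "1s",
      "backoffMultiplier": 2,
      "retryableStatusCodes": ["UNAVAILABLE"]
    }
  }]
}`

func main() {
    conn, err := grpc.Dial(
        "dns:///product-service:50051", // DNS name assumed; resolves to multiple backends
        grpc.WithTransportCredentials(insecure.NewCredentials()), // use TLS credentials in production
        grpc.WithDefaultServiceConfig(serviceConfig),
    )
    if err != nil {
        log.Fatalf("Failed to dial: %v", err)
    }
    defer conn.Close()

    client := pb.NewProductServiceClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // WaitForReady blocks until a connection is ready instead of failing fast.
    _, err = client.GetProduct(ctx, &pb.GetProductRequest{ProductId: "prod-123"},
        grpc.WaitForReady(true))
    if err != nil {
        log.Printf("GetProduct failed: %v", err)
    }
}
```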
## Best Practices

DO:

- Use field numbers consistently for backward compatibility
- Implement proper error codes and messages
- Add request deadlines for all RPCs
- Use streaming for large datasets
- Implement health checking endpoints (see the Go sketch after Example 3)
- Version your services properly

DON'T:

- Change field numbers in proto files
- Use gRPC for browser clients without a gRPC-Web proxy
- Ignore proper error handling
- Send large messages without streaming
- Skip TLS in production
- Use synchronous calls for long operations

## Performance Considerations

- The binary protocol typically reduces bandwidth by 20-30% compared with JSON
- HTTP/2 multiplexing avoids HTTP-level head-of-line blocking
- Connection pooling reduces handshake overhead
- Streaming prevents memory exhaustion with large datasets
- Protocol Buffers typically serialize 3-10x faster than JSON

## Security Considerations

- Always use TLS in production, with mutual authentication where possible
- Implement token-based authentication via metadata
- Use interceptors for consistent auth across services
- Validate all input according to proto definitions
- Implement rate limiting per client
- Use service accounts for service-to-service auth

## Related Commands

- `/rest-api-generator` - Generate REST APIs
- `/graphql-server-builder` - Build GraphQL servers
- `/api-gateway-builder` - Create API gateways
- `/webhook-handler-creator` - Handle webhooks
- `/websocket-server-builder` - WebSocket servers

## Version History

- v1.0.0 (2024-10): Initial implementation with Go, Python, Node.js support
- Planned v1.1.0: Add Rust and Java implementations with advanced load balancing