gRPC Service Example
This is a complete example of building a gRPC microservice with Verb, covering Protocol Buffer definitions, streaming RPCs, authentication, and a client implementation.
Overview
This example demonstrates building a full-featured gRPC service for a product catalog system with:
- Protocol Buffer service definitions
- Unary, server streaming, client streaming, and bidirectional streaming RPCs
- Authentication and authorization
- Error handling and validation
- Client implementation
- Testing strategies
- Performance optimization
Project Setup
bash
# Create new project
mkdir product-service
cd product-service
bun init -y
# Install dependencies
bun install verb
bun install -D @types/bun typescript
# Install gRPC utilities
bun install @grpc/grpc-js @grpc/proto-loader
bun install protobufjs
# Install JWT verification, which src/middleware/auth.ts imports
bun install jsonwebtoken
bun install -D @types/jsonwebtoken
Protocol Buffer Definitions
// protos/product.proto
syntax = "proto3";
package product;
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// ProductService is the product catalog API. It declares unary, server-streaming,
// client-streaming and bidirectional-streaming RPCs.
service ProductService {
// Unary RPCs
// Single request, single response. DeleteProduct returns google.protobuf.Empty.
rpc GetProduct(GetProductRequest) returns (Product);
rpc CreateProduct(CreateProductRequest) returns (Product);
rpc UpdateProduct(UpdateProductRequest) returns (Product);
rpc DeleteProduct(DeleteProductRequest) returns (google.protobuf.Empty);
// Server streaming RPC
// One request; the server answers with a stream of Product messages.
rpc ListProducts(ListProductsRequest) returns (stream Product);
rpc SearchProducts(SearchProductsRequest) returns (stream Product);
// Client streaming RPC
// The client streams create requests and receives one summary when it finishes.
rpc BulkCreateProducts(stream CreateProductRequest) returns (BulkCreateResponse);
// Bidirectional streaming RPC
// Both sides stream: the client sends sync requests, the server sends sync responses.
rpc SyncProducts(stream ProductSyncRequest) returns (stream ProductSyncResponse);
}
// Product is the catalog entry returned by the read and write RPCs.
message Product {
string id = 1;
string name = 2;
string description = 3;
double price = 4;
string category = 5;
repeated string tags = 6;
int32 stock_quantity = 7;
string sku = 8;
ProductStatus status = 9;
google.protobuf.Timestamp created_at = 10;
google.protobuf.Timestamp updated_at = 11;
ProductMetadata metadata = 12;
}
// ProductMetadata carries the optional descriptive details attached to a product.
message ProductMetadata {
double weight = 1;
repeated string images = 2;
// Free-form key/value pairs.
map<string, string> attributes = 3;
string vendor = 4;
}
// ProductStatus is the lifecycle state of a product.
// UNKNOWN is the zero value, so it is what an unset status field decodes to.
enum ProductStatus {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
OUT_OF_STOCK = 3;
DISCONTINUED = 4;
}
// Request for GetProduct: the ID of the product to fetch.
message GetProductRequest {
string id = 1;
}
// Request for CreateProduct, and the element type of the BulkCreateProducts stream.
// It has no id, status or timestamp fields.
message CreateProductRequest {
string name = 1;
string description = 2;
double price = 3;
string category = 4;
repeated string tags = 5;
int32 stock_quantity = 6;
string sku = 7;
ProductMetadata metadata = 8;
}
// Request for UpdateProduct. Fields marked `optional` track presence, so an
// omitted field can be told apart from one set to its default value.
// NOTE(review): `tags` is repeated and has no presence, so an omitted list is
// indistinguishable from an empty one; confirm how the server should treat it.
message UpdateProductRequest {
string id = 1;
optional string name = 2;
optional string description = 3;
optional double price = 4;
optional string category = 5;
repeated string tags = 6;
optional int32 stock_quantity = 7;
optional ProductStatus status = 8;
optional ProductMetadata metadata = 9;
}
// Request for DeleteProduct: the ID of the product to remove.
message DeleteProductRequest {
string id = 1;
}
// Request for ListProducts: paging, filtering and ordering parameters.
message ListProductsRequest {
int32 page_size = 1;
string page_token = 2;
string category = 3;
ProductStatus status = 4;
string order_by = 5;
}
// Request for SearchProducts: text query plus category, price and tag filters.
// NOTE(review): min_price, max_price and limit have no presence, so 0 also
// means "not set"; handlers need to account for that.
message SearchProductsRequest {
string query = 1;
repeated string categories = 2;
double min_price = 3;
double max_price = 4;
repeated string tags = 5;
int32 limit = 6;
}
// Response for BulkCreateProducts: counts, the created IDs, and per-item errors.
message BulkCreateResponse {
int32 created_count = 1;
int32 failed_count = 2;
repeated string created_ids = 3;
repeated BulkError errors = 4;
}
// One failed item from a bulk create: its position in the request stream,
// the error text, and the product name that was submitted.
message BulkError {
int32 index = 1;
string error = 2;
string product_name = 3;
}
// One client-to-server message on the SyncProducts stream.
message ProductSyncRequest {
// Action requested by this message. SUBSCRIBE is the zero value.
enum SyncType {
SUBSCRIBE = 0;
UPDATE = 1;
DELETE = 2;
}
SyncType type = 1;
string product_id = 2;
// Product payload; the server handler reads it for UPDATE messages.
Product product = 3;
}
// One server-to-client message on the SyncProducts stream.
message ProductSyncResponse {
// Kind of response. INITIAL_DATA is the zero value.
enum ResponseType {
INITIAL_DATA = 0;
UPDATE = 1;
DELETE = 2;
ERROR = 3;
}
ResponseType type = 1;
Product product = 2;
// Error text; the server handler sets it on ERROR responses.
string error = 3;
google.protobuf.Timestamp timestamp = 4;
}
gRPC Server Implementation
typescript
// src/server.ts
import { createServer, GrpcError, GrpcStatus, ServerProtocol } from "verb";
import { ProductService } from "./services/ProductService";
import { authMiddleware } from "./middleware/auth";
import { loggingMiddleware } from "./middleware/logging";
import { validationMiddleware } from "./middleware/validation";
// NOTE: the handlers below also call hasPermission() and publishProductEvent().
// These are application helpers that this example does not define; import them
// from wherever your project implements authorization and event publishing.
// Largest message the server will accept or send: 4MB.
const maxMessageBytes = 4 * 1024 * 1024;

// Transport tuning passed to the gRPC server: message size caps and keepalive pings.
const grpcTransportOptions = {
  maxReceiveMessageLength: maxMessageBytes,
  maxSendMessageLength: maxMessageBytes,
  keepaliveTimeMs: 30000,
  keepaliveTimeoutMs: 5000,
  keepalivePermitWithoutCalls: true
};

const app = createServer(ServerProtocol.GRPC);
app.withOptions({ port: 50051, grpc: grpcTransportOptions });

// Global middleware, registered in this order: logging, authentication, validation.
[loggingMiddleware, authMiddleware, validationMiddleware].forEach((middleware) => {
  app.use(middleware);
});
// Add the product service
app.addService({
name: "ProductService",
protoFile: "./protos/product.proto",
methods: {
// Unary RPC: look up one product by its ID
GetProduct: {
  /**
   * Returns the product whose ID is given in the request.
   * Throws INVALID_ARGUMENT when the ID is missing and NOT_FOUND when no
   * product has that ID.
   */
  handler: async (call) => {
    const productId = call.request.id;

    // Reject an empty ID before touching the data store.
    if (!productId) {
      throw new GrpcError(GrpcStatus.INVALID_ARGUMENT, "Product ID is required");
    }

    const found = await ProductService.findById(productId);
    if (found) {
      return found;
    }

    throw new GrpcError(
      GrpcStatus.NOT_FOUND,
      `Product with ID ${productId} not found`
    );
  }
},
// Unary RPC: Create product
CreateProduct: {
  /**
   * Creates a product from the request payload and publishes "product.created".
   *
   * Throws INVALID_ARGUMENT when name or price is missing, PERMISSION_DENIED
   * when the caller lacks "product:create", ALREADY_EXISTS when the SKU is
   * taken, and INTERNAL for any other failure.
   */
  handler: async (call) => {
    const productData = call.request;
    const userId = call.metadata.get("user-id")?.[0];

    // Validate required fields
    if (!productData.name || !productData.price) {
      throw new GrpcError(
        GrpcStatus.INVALID_ARGUMENT,
        "Product name and price are required"
      );
    }

    // Check permissions
    if (!await hasPermission(userId, "product:create")) {
      throw new GrpcError(
        GrpcStatus.PERMISSION_DENIED,
        "Insufficient permissions to create products"
      );
    }

    try {
      const product = await ProductService.create(productData);
      // Publish event for real-time updates
      await publishProductEvent("product.created", product);
      return product;
    } catch (error: unknown) {
      // ProductService.create reports a SKU clash as Error("DUPLICATE_SKU"),
      // so the marker is in `message`. `code` is checked too, so an error
      // that carries the marker there is also recognised.
      const isDuplicateSku =
        error instanceof Error &&
        (error.message === "DUPLICATE_SKU" ||
          (error as { code?: unknown }).code === "DUPLICATE_SKU");

      if (isDuplicateSku) {
        throw new GrpcError(
          GrpcStatus.ALREADY_EXISTS,
          `Product with SKU ${productData.sku} already exists`
        );
      }
      throw new GrpcError(GrpcStatus.INTERNAL, "Failed to create product");
    }
  }
},
// Unary RPC: Update product
UpdateProduct: {
  /**
   * Applies the supplied fields to an existing product and publishes
   * "product.updated".
   *
   * Throws PERMISSION_DENIED when the caller lacks "product:update",
   * NOT_FOUND when the product does not exist, INVALID_ARGUMENT when the
   * request has nothing to change, and INTERNAL for any other failure.
   */
  handler: async (call) => {
    const { id, ...updateData } = call.request;
    const userId = call.metadata.get("user-id")?.[0];

    // Authorize before looking anything up, so a caller without the
    // permission cannot use NOT_FOUND responses to probe which IDs exist.
    if (!await hasPermission(userId, "product:update")) {
      throw new GrpcError(
        GrpcStatus.PERMISSION_DENIED,
        "Insufficient permissions to update products"
      );
    }

    // Check if product exists
    const existingProduct = await ProductService.findById(id);
    if (!existingProduct) {
      throw new GrpcError(
        GrpcStatus.NOT_FOUND,
        `Product with ID ${id} not found`
      );
    }

    try {
      const updatedProduct = await ProductService.update(id, updateData);
      // Publish event for real-time updates
      await publishProductEvent("product.updated", updatedProduct);
      return updatedProduct;
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : "";

      // ProductService.update throws these two messages; report them with
      // the matching status instead of a generic INTERNAL.
      if (reason === "No fields to update") {
        throw new GrpcError(GrpcStatus.INVALID_ARGUMENT, "No fields to update");
      }
      if (reason === "Product not found") {
        // The row disappeared between the lookup above and the update.
        throw new GrpcError(
          GrpcStatus.NOT_FOUND,
          `Product with ID ${id} not found`
        );
      }
      throw new GrpcError(GrpcStatus.INTERNAL, "Failed to update product");
    }
  }
},
// Unary RPC: Delete product
DeleteProduct: {
  /**
   * Deletes a product and publishes "product.deleted".
   *
   * Throws PERMISSION_DENIED when the caller lacks "product:delete" and
   * NOT_FOUND when the product does not exist. Returns an empty object,
   * matching the google.protobuf.Empty response type.
   */
  handler: async (call) => {
    const { id } = call.request;
    const userId = call.metadata.get("user-id")?.[0];

    // Authorize before looking anything up, so a caller without the
    // permission cannot use NOT_FOUND responses to probe which IDs exist.
    if (!await hasPermission(userId, "product:delete")) {
      throw new GrpcError(
        GrpcStatus.PERMISSION_DENIED,
        "Insufficient permissions to delete products"
      );
    }

    // Check if product exists
    const existingProduct = await ProductService.findById(id);
    if (!existingProduct) {
      throw new GrpcError(
        GrpcStatus.NOT_FOUND,
        `Product with ID ${id} not found`
      );
    }

    try {
      await ProductService.delete(id);
    } catch (error: unknown) {
      // ProductService.delete throws "Product not found" when no row was
      // removed, e.g. the product was deleted concurrently after the lookup.
      if (error instanceof Error && error.message === "Product not found") {
        throw new GrpcError(
          GrpcStatus.NOT_FOUND,
          `Product with ID ${id} not found`
        );
      }
      throw error;
    }

    // Publish event for real-time updates
    await publishProductEvent("product.deleted", { id });
    return {}; // Empty response
  }
},
// Server streaming RPC: stream every product matching the filters
ListProducts: {
  /**
   * Streams products page by page. Each page is fetched with the requested
   * page size, and paging continues until the service returns no next-page
   * token, so the stream contains every matching product.
   */
  handler: async function* (call) {
    const { pageSize = 20, pageToken, category, status, orderBy } = call.request;

    // Everything except the page token stays the same across pages.
    const baseQuery = {
      category,
      status,
      orderBy: orderBy || "created_at DESC",
      limit: pageSize
    };

    let cursor = pageToken;
    do {
      const page = await ProductService.findMany({ ...baseQuery, pageToken: cursor });

      for (const item of page.products) {
        yield item;
      }

      // An absent token means this was the last page.
      cursor = page.nextPageToken;
    } while (cursor);
  }
},
// Server streaming RPC: Search products
SearchProducts: {
  /**
   * Streams the products matching the text query and the category, price and
   * tag filters in the request.
   *
   * min_price, max_price and limit are plain proto3 scalars, so an unset field
   * may arrive as 0 rather than undefined. A bound or limit that is not
   * positive is therefore treated as "not set": no price bound is applied,
   * and the limit falls back to 100.
   */
  handler: async function* (call) {
    const { query, categories, minPrice, maxPrice, tags, limit } = call.request;

    // `undefined > 0` is false, so absent values also become undefined here.
    const lowerBound = minPrice > 0 ? minPrice : undefined;
    const upperBound = maxPrice > 0 ? maxPrice : undefined;
    const hasPriceBound = lowerBound !== undefined || upperBound !== undefined;

    const searchFilters = {
      query,
      categories: categories || [],
      priceRange: hasPriceBound ? { min: lowerBound, max: upperBound } : undefined,
      tags: tags || [],
      limit: limit > 0 ? limit : 100
    };

    // Stream search results
    const searchResults = ProductService.search(searchFilters);
    for await (const product of searchResults) {
      yield product;
    }
  }
},
// Client streaming RPC: Bulk create products
BulkCreateProducts: {
  /**
   * Creates one product per message on the request stream and returns a
   * summary once the client finishes sending.
   *
   * A failure on a single item is recorded in `errors` with the item's
   * position in the stream and does not stop the remaining items. A failure
   * of the stream itself is reported as INTERNAL.
   */
  handler: async (requestStream) => {
    // Explicit element types: a bare `[]` infers `never[]` under strict mode.
    const createdIds: string[] = [];
    const errors: Array<{ index: number; error: string; productName: string }> = [];
    let index = 0;

    try {
      for await (const productRequest of requestStream) {
        try {
          // Validate product data
          if (!productRequest.name || !productRequest.price) {
            throw new Error("Name and price are required");
          }

          const product = await ProductService.create(productRequest);
          createdIds.push(product.id);

          // Publish event for each created product. If this throws, the item
          // is listed in both createdIds and errors.
          await publishProductEvent("product.created", product);
        } catch (error: unknown) {
          errors.push({
            index,
            error: error instanceof Error ? error.message : String(error),
            productName: productRequest.name || "Unknown"
          });
        }
        index++;
      }

      return {
        createdCount: createdIds.length,
        failedCount: errors.length,
        createdIds,
        errors
      };
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new GrpcError(
        GrpcStatus.INTERNAL,
        `Bulk create failed: ${reason}`
      );
    }
  }
},
// Bidirectional streaming RPC: Sync products
SyncProducts: {
  /**
   * Sends an initial snapshot of up to 1000 products, then answers each
   * incoming sync request on the same stream.
   *
   * - SUBSCRIBE records the product ID in the subscription set.
   * - UPDATE applies the supplied product and replies with UPDATE or ERROR.
   * - DELETE removes the product and replies with DELETE or ERROR.
   *
   * Responses are yielded directly from this generator: `yield` is not
   * allowed inside a nested (non-generator) function, so the requests are
   * processed inline. The stream ends when the client finishes sending.
   */
  handler: async function* (requestStream) {
    // Recorded for a real implementation, which would listen for database
    // changes and push updates for subscribed products. Nothing reads it yet.
    const subscriptions = new Set<string>();

    try {
      // Send initial data. ProductService.findMany caps a page at 100 rows,
      // so follow the page tokens until 1000 products have been sent.
      let remaining = 1000;
      let pageToken: string | undefined;
      do {
        const page = await ProductService.findMany({
          limit: Math.min(remaining, 100),
          pageToken
        });
        for (const product of page.products) {
          yield {
            type: "INITIAL_DATA",
            product,
            timestamp: new Date()
          };
        }
        remaining -= page.products.length;
        pageToken = page.nextPageToken;
      } while (pageToken && remaining > 0);

      // Process incoming sync requests
      for await (const request of requestStream) {
        switch (request.type) {
          case "SUBSCRIBE":
            subscriptions.add(request.productId);
            break;

          case "UPDATE": {
            if (!request.product) {
              break;
            }
            try {
              // The payload is a full Product. Drop the identity and timestamp
              // fields so an update cannot rewrite the key or the audit columns.
              const { id: _id, createdAt: _createdAt, updatedAt: _updatedAt, ...changes } =
                request.product;
              const updatedProduct = await ProductService.update(
                request.productId,
                changes
              );
              yield {
                type: "UPDATE",
                product: updatedProduct,
                timestamp: new Date()
              };
            } catch (error: unknown) {
              yield {
                type: "ERROR",
                error: error instanceof Error ? error.message : String(error),
                timestamp: new Date()
              };
            }
            break;
          }

          case "DELETE":
            try {
              await ProductService.delete(request.productId);
              yield {
                type: "DELETE",
                product: { id: request.productId },
                timestamp: new Date()
              };
            } catch (error: unknown) {
              yield {
                type: "ERROR",
                error: error instanceof Error ? error.message : String(error),
                timestamp: new Date()
              };
            }
            break;
        }
      }
    } finally {
      subscriptions.clear();
    }
  }
}
}
});
// Start accepting connections. The port is the same one passed to withOptions above.
app.listen(50051);
console.log("🚀 gRPC Product Service running on port 50051");
Product Service Implementation
typescript
// src/services/ProductService.ts
import { Database } from "bun:sqlite";
import { Product, ProductStatus } from "../types";
// Single SQLite database handle shared by every ProductService method.
const db = new Database("products.db");
// Initialize database: create the products table and its indexes if they do
// not exist yet. `tags` and `metadata` are stored as JSON text, and `sku`
// carries a UNIQUE constraint.
db.exec(`
CREATE TABLE IF NOT EXISTS products (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
price REAL NOT NULL,
category TEXT,
tags TEXT, -- JSON array
stock_quantity INTEGER DEFAULT 0,
sku TEXT UNIQUE,
status TEXT DEFAULT 'ACTIVE',
metadata TEXT, -- JSON object
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_products_category ON products(category);
CREATE INDEX IF NOT EXISTS idx_products_status ON products(status);
CREATE INDEX IF NOT EXISTS idx_products_sku ON products(sku);
CREATE INDEX IF NOT EXISTS idx_products_created_at ON products(created_at);
`);
/** Values this module binds to SQL placeholders. */
type SqlValue = string | number | null;

/** Sort order used when the caller supplies none, or one that is not allowed. */
const DEFAULT_ORDER_BY = "created_at DESC";

/**
 * Columns a caller may sort by. ORDER BY cannot be bound as a parameter, so
 * caller input is checked against this list instead of being interpolated.
 */
const SORTABLE_COLUMNS: ReadonlySet<string> = new Set([
  "name",
  "price",
  "category",
  "stock_quantity",
  "sku",
  "status",
  "created_at",
  "updated_at"
]);

/**
 * API field name -> column name for every field update() may write.
 * `id`, `createdAt` and `updatedAt` are deliberately absent.
 */
const UPDATABLE_COLUMNS: ReadonlyMap<string, string> = new Map([
  ["name", "name"],
  ["description", "description"],
  ["price", "price"],
  ["category", "category"],
  ["tags", "tags"],
  ["stockQuantity", "stock_quantity"],
  ["sku", "sku"],
  ["status", "status"],
  ["metadata", "metadata"]
]);

/** Turns a caller-supplied "column [ASC|DESC]" into a safe ORDER BY fragment. */
function toSafeOrderBy(orderBy?: string): string {
  if (!orderBy) {
    return DEFAULT_ORDER_BY;
  }
  const match = /^\s*([A-Za-z_]+)(?:\s+(ASC|DESC))?\s*$/i.exec(orderBy);
  if (!match) {
    return DEFAULT_ORDER_BY;
  }
  const column = (match[1] ?? "").toLowerCase();
  if (!SORTABLE_COLUMNS.has(column)) {
    return DEFAULT_ORDER_BY;
  }
  const direction = (match[2] ?? "ASC").toUpperCase();
  return `${column} ${direction}`;
}

/** Decodes a page token (base64 of a row offset). Throws on a malformed token. */
function decodePageToken(pageToken?: string): number {
  if (!pageToken) {
    return 0;
  }
  let decoded: string;
  try {
    decoded = atob(pageToken);
  } catch {
    throw new Error("Invalid page token");
  }
  if (!/^\d+$/.test(decoded)) {
    throw new Error("Invalid page token");
  }
  return Number(decoded);
}

/** Escapes LIKE wildcards so user text matches literally (used with ESCAPE '\'). */
function escapeLike(term: string): string {
  return term.replace(/[\\%_]/g, "\\$&");
}

/** Builds the duplicate-SKU error, carrying the marker in both `message` and `code`. */
function duplicateSkuError(): Error {
  return Object.assign(new Error("DUPLICATE_SKU"), { code: "DUPLICATE_SKU" });
}

/**
 * Data-access layer for the `products` table.
 *
 * All methods are static and share the module-level `db` handle. Tags and
 * metadata are stored as JSON text and parsed back when rows are read.
 */
export class ProductService {
  /** Returns the product with the given ID, or null when there is none. */
  static async findById(id: string): Promise<Product | null> {
    const row = db.query("SELECT * FROM products WHERE id = ?").get(id);
    return row ? this.mapRowToProduct(row) : null;
  }

  /**
   * Returns one page of products plus a token for the next page.
   *
   * @param options.limit     Page size; defaults to 20 and is clamped to 1..100.
   * @param options.pageToken Token from a previous call; absent for the first page.
   * @param options.category  Exact category to match.
   * @param options.status    Exact status to match; UNKNOWN means no filter.
   * @param options.orderBy   "column [ASC|DESC]"; anything outside the allowed
   *                          columns falls back to "created_at DESC".
   * @returns `products` for this page and `nextPageToken`, which is undefined
   *          on the last page.
   * @throws Error("Invalid page token") when the token cannot be decoded.
   */
  static async findMany(options: {
    limit?: number;
    pageToken?: string;
    category?: string;
    status?: ProductStatus;
    orderBy?: string;
  }) {
    let query = "SELECT * FROM products WHERE 1=1";
    const params: SqlValue[] = [];

    if (options.category) {
      query += " AND category = ?";
      params.push(options.category);
    }

    // UNKNOWN is the enum's zero value, i.e. what an unset status decodes to,
    // so it is treated as "no status filter" rather than matched literally.
    if (options.status && String(options.status) !== "UNKNOWN") {
      query += " AND status = ?";
      params.push(options.status);
    }

    // Add ordering (validated: this fragment is the only interpolated input)
    query += ` ORDER BY ${toSafeOrderBy(options.orderBy)}`;

    // Add pagination. One extra row is fetched to learn whether a next page exists.
    const limit = Math.max(1, Math.min(options.limit || 20, 100));
    const offset = decodePageToken(options.pageToken);
    query += " LIMIT ? OFFSET ?";
    params.push(limit + 1, offset);

    const rows = db.query(query).all(...params);
    const hasMore = rows.length > limit;
    const products = rows.slice(0, limit).map((row: any) => this.mapRowToProduct(row));
    const nextPageToken = hasMore ? btoa(String(offset + limit)) : undefined;

    return {
      products,
      nextPageToken
    };
  }

  /**
   * Yields the products matching every supplied filter, newest first.
   *
   * @param filters.query      Text matched literally against name and description.
   * @param filters.categories Product must be in one of these categories.
   * @param filters.priceRange Inclusive bounds; either side may be omitted.
   * @param filters.tags       Product must carry every listed tag.
   * @param filters.limit      Maximum number of results; unlimited when not positive.
   */
  static async* search(filters: {
    query?: string;
    categories?: string[];
    priceRange?: { min?: number; max?: number };
    tags?: string[];
    limit?: number;
  }) {
    let query = "SELECT * FROM products WHERE 1=1";
    const params: SqlValue[] = [];

    if (filters.query) {
      query += " AND (name LIKE ? ESCAPE '\\' OR description LIKE ? ESCAPE '\\')";
      const searchTerm = `%${escapeLike(filters.query)}%`;
      params.push(searchTerm, searchTerm);
    }

    if (filters.categories && filters.categories.length > 0) {
      const placeholders = filters.categories.map(() => "?").join(",");
      query += ` AND category IN (${placeholders})`;
      params.push(...filters.categories);
    }

    if (filters.priceRange) {
      if (filters.priceRange.min !== undefined) {
        query += " AND price >= ?";
        params.push(filters.priceRange.min);
      }
      if (filters.priceRange.max !== undefined) {
        query += " AND price <= ?";
        params.push(filters.priceRange.max);
      }
    }

    if (filters.tags && filters.tags.length > 0) {
      for (const tag of filters.tags) {
        // Tags are stored as a JSON array, so search for the tag's JSON
        // string form (quotes included) inside that text.
        query += " AND tags LIKE ? ESCAPE '\\'";
        params.push(`%${escapeLike(JSON.stringify(tag))}%`);
      }
    }

    query += " ORDER BY created_at DESC";

    if (filters.limit && filters.limit > 0) {
      query += " LIMIT ?";
      params.push(filters.limit);
    }

    const rows = db.query(query).all(...params);
    for (const row of rows) {
      yield this.mapRowToProduct(row);
    }
  }

  /**
   * Inserts a new product with a generated ID and status ACTIVE.
   *
   * @returns The stored product, read back from the database.
   * @throws An Error whose `message` and `code` are both "DUPLICATE_SKU" when
   *         another product already uses the SKU.
   */
  static async create(productData: any): Promise<Product> {
    const id = crypto.randomUUID();
    const now = new Date().toISOString();

    // A missing SKU is stored as NULL. The column is UNIQUE, and SQLite lets
    // any number of rows hold NULL, whereas a second '' would be rejected.
    const sku: string | null = productData.sku ? String(productData.sku) : null;

    // Validate SKU uniqueness
    if (sku !== null) {
      const existing = db.query("SELECT id FROM products WHERE sku = ?").get(sku);
      if (existing) {
        throw duplicateSkuError();
      }
    }

    try {
      db.query(`
        INSERT INTO products (
          id, name, description, price, category, tags, stock_quantity,
          sku, status, metadata, created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
      `).run(
        id,
        productData.name,
        productData.description || "",
        productData.price,
        productData.category || "",
        JSON.stringify(productData.tags || []),
        productData.stockQuantity || 0,
        sku,
        "ACTIVE",
        JSON.stringify(productData.metadata || {}),
        now,
        now
      );
    } catch (error: unknown) {
      // A concurrent insert can pass the check above and still collide here.
      if (error instanceof Error && /UNIQUE constraint failed: products\.sku/.test(error.message)) {
        throw duplicateSkuError();
      }
      throw error;
    }

    const created = await this.findById(id);
    if (!created) {
      throw new Error("Product not found");
    }
    return created;
  }

  /**
   * Updates the given fields of a product and refreshes `updated_at`.
   *
   * Only known, writable fields are applied. Keys with an undefined value and
   * keys that are not writable (such as `id` or the timestamps) are ignored,
   * so a request key can never end up in the SQL text.
   *
   * @returns The product as stored after the update.
   * @throws Error("No fields to update") when nothing writable was supplied.
   * @throws Error("Product not found") when no row has the given ID.
   */
  static async update(id: string, updateData: any): Promise<Product> {
    const assignments: string[] = [];
    const params: SqlValue[] = [];

    // Build dynamic update query
    for (const [key, value] of Object.entries(updateData ?? {})) {
      if (value === undefined) {
        continue;
      }
      const column = UPDATABLE_COLUMNS.get(key);
      if (!column) {
        continue;
      }
      assignments.push(`${column} = ?`);
      if (key === "tags" || key === "metadata") {
        params.push(JSON.stringify(value));
      } else if (key === "sku" && !value) {
        // Same rule as create(): an empty SKU is stored as NULL.
        params.push(null);
      } else {
        params.push(value as SqlValue);
      }
    }

    if (assignments.length === 0) {
      throw new Error("No fields to update");
    }

    assignments.push("updated_at = ?");
    params.push(new Date().toISOString());
    params.push(id);

    const result = db.query(`
      UPDATE products SET ${assignments.join(", ")} WHERE id = ?
    `).run(...params);

    if (result.changes === 0) {
      throw new Error("Product not found");
    }

    const updated = await this.findById(id);
    if (!updated) {
      throw new Error("Product not found");
    }
    return updated;
  }

  /**
   * Deletes the product with the given ID.
   *
   * @throws Error("Product not found") when no row was removed.
   */
  static async delete(id: string): Promise<void> {
    const result = db.query("DELETE FROM products WHERE id = ?").run(id);
    if (result.changes === 0) {
      throw new Error("Product not found");
    }
  }

  /** Converts a database row to the API shape, parsing the JSON text columns. */
  private static mapRowToProduct(row: any): Product {
    return {
      id: row.id,
      name: row.name,
      description: row.description,
      price: row.price,
      category: row.category,
      tags: JSON.parse(row.tags || "[]"),
      stockQuantity: row.stock_quantity,
      // A NULL SKU is reported as an empty string.
      sku: row.sku ?? "",
      status: row.status,
      createdAt: row.created_at,
      updatedAt: row.updated_at,
      metadata: JSON.parse(row.metadata || "{}")
    };
  }
}
Middleware Implementation
typescript
// src/middleware/auth.ts
import { verify } from "jsonwebtoken";
import { GrpcError, GrpcStatus } from "verb";
/**
 * Authenticates a call from the JWT in its "authorization" metadata entry.
 *
 * GetProduct, ListProducts and SearchProducts pass through unauthenticated.
 * For every other method the token is verified with JWT_SECRET, and the
 * caller's identity is written to the "user-id" and "user-role" metadata
 * entries for the handlers to read.
 *
 * @throws GrpcError UNAUTHENTICATED when the token is missing, invalid,
 *         expired, or carries no userId.
 * @throws GrpcError INTERNAL when JWT_SECRET is not configured.
 */
export const authMiddleware = async (call: any, next: any) => {
  // Skip auth for certain methods
  const publicMethods = ["GetProduct", "ListProducts", "SearchProducts"];
  if (publicMethods.includes(call.method)) {
    return next();
  }

  const metadata = call.metadata;
  const token = metadata.get("authorization")?.[0];
  if (!token) {
    throw new GrpcError(
      GrpcStatus.UNAUTHENTICATED,
      "Authentication token required"
    );
  }

  // A missing secret is a server misconfiguration, not a bad client token.
  const secret = process.env.JWT_SECRET;
  if (!secret) {
    throw new GrpcError(
      GrpcStatus.INTERNAL,
      "Authentication is not configured"
    );
  }

  let decoded: any;
  try {
    decoded = verify(String(token).replace(/^Bearer\s+/i, ""), secret);
  } catch (error) {
    throw new GrpcError(
      GrpcStatus.UNAUTHENTICATED,
      "Invalid or expired token"
    );
  }

  if (typeof decoded !== "object" || decoded === null || !decoded.userId) {
    throw new GrpcError(
      GrpcStatus.UNAUTHENTICATED,
      "Invalid or expired token"
    );
  }

  // Add user info to metadata. Both entries are always overwritten, so a
  // value the client sent under the same key can never reach a handler.
  metadata.set("user-id", String(decoded.userId));
  metadata.set("user-role", String(decoded.role ?? ""));

  // Called outside the try block so a downstream failure is not reported as
  // an authentication error.
  return next();
};
// src/middleware/logging.ts
/**
 * Logs the start, completion and failure of every call, with its duration.
 *
 * @param call Call object; `call.method` and `call.getPeer()` are read for the log lines.
 * @param next Invokes the rest of the chain; its result is returned unchanged.
 * @returns Whatever `next()` resolves to.
 * @throws Rethrows exactly what `next()` rejected with, after logging it.
 */
export const loggingMiddleware = async (call: any, next: any) => {
  const start = Date.now();
  const method = call.method;
  const clientInfo = call.getPeer();
  console.log(`[${new Date().toISOString()}] gRPC ${method} started - Client: ${clientInfo}`);
  try {
    const result = await next();
    const duration = Date.now() - start;
    console.log(`[${new Date().toISOString()}] gRPC ${method} completed in ${duration}ms`);
    return result;
  } catch (error: unknown) {
    const duration = Date.now() - start;
    // The rejection may be any value, including null or undefined. Reading
    // `.message` directly would throw a TypeError and hide the real failure.
    const reason = error instanceof Error ? error.message : String(error);
    console.error(`[${new Date().toISOString()}] gRPC ${method} failed in ${duration}ms:`, reason);
    throw error;
  }
};
// src/middleware/validation.ts
// NOTE: GrpcError and GrpcStatus come from "verb" (see the import at the top
// of src/middleware/auth.ts); add the same import when this file stands alone.
/**
 * Rejects malformed requests before they reach a handler.
 *
 * - GetProduct, UpdateProduct, DeleteProduct: `id` must be a non-blank string.
 * - CreateProduct, UpdateProduct: `price` and `stockQuantity`, when present,
 *   must be numbers that are not negative.
 *
 * Other methods pass through unchecked.
 *
 * @throws GrpcError INVALID_ARGUMENT describing the first rule that failed.
 */
export const validationMiddleware = async (call: any, next: any) => {
  const method = call.method;
  const request = call.request;

  const needsId =
    method === "GetProduct" || method === "UpdateProduct" || method === "DeleteProduct";
  const hasAmounts = method === "CreateProduct" || method === "UpdateProduct";

  if (needsId) {
    // typeof guard: calling .trim() on a non-string would throw a TypeError
    // instead of producing a gRPC status.
    if (typeof request.id !== "string" || request.id.trim() === "") {
      throw new GrpcError(
        GrpcStatus.INVALID_ARGUMENT,
        "Product ID is required"
      );
    }
  }

  if (hasAmounts) {
    // NaN compares false with `< 0`, so it needs its own check.
    if (request.price !== undefined && (Number.isNaN(request.price) || request.price < 0)) {
      throw new GrpcError(
        GrpcStatus.INVALID_ARGUMENT,
        "Price cannot be negative"
      );
    }
    if (
      request.stockQuantity !== undefined &&
      (Number.isNaN(request.stockQuantity) || request.stockQuantity < 0)
    ) {
      throw new GrpcError(
        GrpcStatus.INVALID_ARGUMENT,
        "Stock quantity cannot be negative"
      );
    }
  }

  return next();
};
gRPC Client Implementation
typescript
// src/client/ProductClient.ts
import * as grpc from "@grpc/grpc-js";
import * as protoLoader from "@grpc/proto-loader";
const PROTO_PATH = "./protos/product.proto";

// Load the protobuf.
// keepCase is false so field names are converted to camelCase (page_size ->
// pageSize, created_count -> createdCount). The client and the tests read and
// write camelCase names throughout; with keepCase: true those fields would be
// exposed under their snake_case names and would not line up.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: false,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

// The `product` package from the .proto file; it holds the ProductService client constructor.
const productProto = grpc.loadPackageDefinition(packageDefinition).product as any;
/**
 * Promise- and async-iterator-based wrapper around the generated
 * ProductService gRPC client.
 */
export class ProductClient {
  private client: any;

  /**
   * @param serverAddress host:port of the server; defaults to "localhost:50051".
   * @param credentials   Channel credentials; defaults to an insecure channel.
   */
  constructor(serverAddress = "localhost:50051", credentials?: grpc.ChannelCredentials) {
    // The generated service client is a class and must be created with `new`.
    this.client = new productProto.ProductService(
      serverAddress,
      credentials || grpc.credentials.createInsecure()
    );
  }

  /** Builds call metadata, adding a Bearer authorization entry when a token is given. */
  private static buildMetadata(token?: string): grpc.Metadata {
    // grpc.Metadata is a class and must be created with `new`.
    const metadata = new grpc.Metadata();
    if (token !== undefined) {
      metadata.set("authorization", `Bearer ${token}`);
    }
    return metadata;
  }

  /** Wraps a callback-style unary call in a Promise. */
  private unary(method: string, request: any, metadata: grpc.Metadata): Promise<any> {
    return new Promise((resolve, reject) => {
      this.client[method](request, metadata, (error: any, response: any) => {
        if (error) {
          reject(error);
        } else {
          resolve(response);
        }
      });
    });
  }

  // Unary call example
  /** Fetches one product by ID. The token is optional and skipped when empty. */
  async getProduct(id: string, token?: string): Promise<any> {
    return this.unary("GetProduct", { id }, ProductClient.buildMetadata(token || undefined));
  }

  // Unary call with authentication
  /** Creates a product. The token is always sent, even when it is empty. */
  async createProduct(productData: any, token: string): Promise<any> {
    return this.unary("CreateProduct", productData, ProductClient.buildMetadata(token));
  }

  // Server streaming example
  /** Yields every product the server streams back for the request. */
  async* listProducts(request: any = {}): AsyncGenerator<any> {
    const call = this.client.ListProducts(request);
    for await (const product of call) {
      yield product;
    }
  }

  // Client streaming example
  /** Streams the products to the server and resolves with its summary response. */
  async bulkCreateProducts(products: any[], token: string): Promise<any> {
    return new Promise((resolve, reject) => {
      const metadata = ProductClient.buildMetadata(token);
      const call = this.client.BulkCreateProducts(metadata, (error: any, response: any) => {
        if (error) {
          reject(error);
        } else {
          resolve(response);
        }
      });

      // Send products
      for (const product of products) {
        call.write(product);
      }
      call.end();
    });
  }

  // Bidirectional streaming example
  /**
   * Opens the sync stream, subscribes to all products ("*"), and yields each
   * server response. The call is cancelled when the consumer stops iterating,
   * so the stream is not left open.
   */
  async* syncProducts(token: string): AsyncGenerator<any> {
    const call = this.client.SyncProducts(ProductClient.buildMetadata(token));
    try {
      // Subscribe to all products initially
      call.write({
        type: "SUBSCRIBE",
        productId: "*"
      });

      // Listen for responses
      for await (const response of call) {
        yield response;
      }
    } finally {
      // Has no effect if the call has already completed.
      call.cancel();
    }
  }

  /** Closes the underlying channel. */
  close() {
    this.client.close();
  }
}
// Usage example
/**
 * Walks through each call type against a server on the default address:
 * a unary get, a server-streamed listing, a client-streamed bulk create, and
 * the bidirectional sync stream. Any error is logged, and the client is
 * closed at the end.
 */
export const clientExample = async () => {
  const client = new ProductClient();
  const authToken = "your-auth-token";

  try {
    // Get a product
    const fetched = await client.getProduct("product-123");
    console.log("Product:", fetched);

    // List products (server streaming)
    console.log("Listing products:");
    const listing = client.listProducts({ pageSize: 10 });
    for await (const listed of listing) {
      console.log("-", listed.name);
    }

    // Bulk create products (client streaming)
    const bulkResult = await client.bulkCreateProducts(
      [
        { name: "Product 1", price: 19.99, category: "electronics" },
        { name: "Product 2", price: 29.99, category: "electronics" },
        { name: "Product 3", price: 39.99, category: "books" }
      ],
      authToken
    );
    console.log("Bulk create result:", bulkResult);

    // Sync products (bidirectional streaming)
    console.log("Starting product sync:");
    for await (const update of client.syncProducts(authToken)) {
      console.log("Sync update:", update.type, update.product?.name);
    }
  } catch (error) {
    console.error("Client error:", error);
  } finally {
    client.close();
  }
};
Testing gRPC Service
typescript
// tests/product-service.test.ts
import { test, expect, beforeAll, afterAll } from "bun:test";
import { ProductClient } from "../src/client/ProductClient";
import { startTestServer, stopTestServer } from "./test-utils";
let client: ProductClient;

// Start the test server once and connect a client to it.
beforeAll(async () => {
  await startTestServer();
  client = new ProductClient("localhost:50052"); // Test server port
});

// Close the client (if one was created), then stop the server.
afterAll(async () => {
  if (client) {
    client.close();
  }
  await stopTestServer();
});

test("GetProduct returns product by ID", async () => {
  // Create a product so there is a known ID to fetch.
  const fixture = {
    name: "Test Product",
    description: "A test product",
    price: 19.99,
    category: "test",
    sku: "TEST-001"
  };
  const created = await client.createProduct(fixture, "test-token");

  // Read it back and compare with what was sent.
  const fetched = await client.getProduct(created.id);
  expect(fetched.id).toBe(created.id);
  expect(fetched.name).toBe(fixture.name);
  expect(fetched.price).toBe(fixture.price);
});

test("ListProducts returns stream of products", async () => {
  // Drain the stream into an array.
  const received = [];
  const stream = client.listProducts({ pageSize: 5 });
  for await (const item of stream) {
    received.push(item);
  }

  expect(received.length).toBeGreaterThan(0);
  expect(received[0]).toHaveProperty("id");
  expect(received[0]).toHaveProperty("name");
});

test("BulkCreateProducts creates multiple products", async () => {
  const batch = [
    { name: "Bulk Product 1", price: 10.00, category: "bulk" },
    { name: "Bulk Product 2", price: 20.00, category: "bulk" },
    { name: "Bulk Product 3", price: 30.00, category: "bulk" }
  ];

  const summary = await client.bulkCreateProducts(batch, "test-token");

  expect(summary.createdCount).toBe(3);
  expect(summary.failedCount).toBe(0);
  expect(summary.createdIds).toHaveLength(3);
});
// Both tests assert on the rejection itself. With try / expect.fail / catch,
// the failure raised for "nothing was thrown" is caught by the same catch
// block and surfaces as a misleading assertion on `error.code`.
test("handles gRPC errors correctly", async () => {
  await expect(client.getProduct("non-existent-id")).rejects.toMatchObject({
    code: 5, // NOT_FOUND
    message: expect.stringContaining("not found")
  });
});

test("authentication is required for write operations", async () => {
  const attempt = client.createProduct({
    name: "Unauthorized Product",
    price: 10.00
  }, ""); // No token

  await expect(attempt).rejects.toMatchObject({
    code: 16 // UNAUTHENTICATED
  });
});
Performance Optimization
typescript
// src/performance/connection-pool.ts
import * as grpc from "@grpc/grpc-js";
/**
 * Keeps at most `maxConnections` gRPC clients, one per server address, and
 * hands back the existing client on repeated requests for the same address.
 *
 * Clients are keyed by address only, so a pool instance should be used with
 * a single service type.
 */
export class GrpcConnectionPool {
  private connections: Map<string, any> = new Map();
  private maxConnections = 10;

  /**
   * Returns the pooled client for `address`, creating it on first use.
   *
   * @param address     host:port of the server.
   * @param serviceType Generated client constructor for the service.
   */
  getClient(address: string, serviceType: any): any {
    const pooled = this.connections.get(address);
    if (pooled !== undefined) {
      return pooled;
    }

    if (this.connections.size >= this.maxConnections) {
      // Remove oldest connection. A Map iterates in insertion order, so the
      // first key is the one that was added earliest.
      const oldestAddress = this.connections.keys().next().value;
      if (oldestAddress !== undefined) {
        this.connections.get(oldestAddress)?.close();
        this.connections.delete(oldestAddress);
      }
    }

    // Generated gRPC clients are classes and must be created with `new`.
    const client = new serviceType(
      address,
      grpc.credentials.createInsecure(),
      {
        "grpc.keepalive_time_ms": 30000,
        "grpc.keepalive_timeout_ms": 5000,
        "grpc.keepalive_permit_without_calls": true,
        "grpc.http2.max_pings_without_data": 0,
        "grpc.http2.min_time_between_pings_ms": 10000,
        "grpc.http2.min_ping_interval_without_data_ms": 300000
      }
    );
    this.connections.set(address, client);
    return client;
  }

  /** Closes every pooled client and empties the pool. */
  closeAll() {
    for (const client of this.connections.values()) {
      client.close();
    }
    this.connections.clear();
  }
}
// src/performance/caching.ts
/**
 * In-memory cache of responses with a per-entry time to live.
 * Expired entries are dropped when they are next read.
 */
export class GrpcResponseCache {
  private entries = new Map<string, { data: any, expiry: number }>();
  private defaultTtlMs = 5 * 60 * 1000; // 5 minutes

  /** Returns the cached value for `key`, or null when it is absent or expired. */
  get(key: string): any | null {
    const entry = this.entries.get(key);
    if (entry === undefined) {
      return null;
    }
    if (Date.now() < entry.expiry) {
      return entry.data;
    }
    // Stale: evict it so the map does not hold on to it.
    this.entries.delete(key);
    return null;
  }

  /** Stores `data` under `key` for `ttl` milliseconds (default: 5 minutes). */
  set(key: string, data: any, ttl = this.defaultTtlMs) {
    const expiry = Date.now() + ttl;
    this.entries.set(key, { data, expiry });
  }

  /** Removes every entry. */
  clear() {
    this.entries.clear();
  }
}
Running the Application
bash
# Set environment variables
# JWT_SECRET is read by the auth middleware to verify tokens.
export JWT_SECRET="your-secret-key"
export NODE_ENV="development"
# Start the gRPC server
bun run src/server.ts
# In another terminal, run the client example
# NOTE(review): src/client/example.ts is not shown in this document; presumably
# it imports clientExample from ./ProductClient and calls it. Confirm or create it.
bun run src/client/example.ts
# Run tests
bun test
Key Features Demonstrated
This gRPC service example showcases:
- Complete gRPC Implementation: All four types of gRPC calls
- Protocol Buffers: Comprehensive schema definition
- Authentication & Authorization: JWT-based security
- Error Handling: Proper gRPC status codes and error messages
- Streaming: Server, client, and bidirectional streaming
- Database Integration: SQLite with proper data mapping
- Middleware: Logging, authentication, and validation
- Client Implementation: Full-featured gRPC client
- Testing: Comprehensive test coverage
- Performance: Connection pooling and caching strategies
See Also
- gRPC Server API - gRPC server configuration
- Protocol Gateway - Multi-protocol integration
- Authentication Example - Authentication patterns
- gRPC Protocol Guide - gRPC protocol configuration