Implementing a New Database
This guide provides step-by-step instructions for adding a new database backend to Gofannon.
Overview
To add a new database backend, you need to:
- Create an implementation file
- Implement the DatabaseService interface
- Add configuration settings
- Update the factory function
- Add dependencies
- Write unit tests
- Add Docker support (optional)
- Update documentation
- Test integration
Step 1: Create Implementation File
Create a new Python file in the database service directory:
touch webapp/packages/api/user-service/services/database_service/mynewdb.py
Step 2: Implement the Interface
Implement all four abstract methods from DatabaseService:
"""
MyNewDB database service implementation.
"""
from typing import Any, Dict, List
from fastapi import HTTPException
from .base import DatabaseService
class MyNewDBService(DatabaseService):
"""Database service implementation for MyNewDB."""
def __init__(self, connection_string: str, **options):
"""
Initialize the MyNewDB connection.
Args:
connection_string: Database connection string
**options: Additional database-specific options
"""
# Initialize your database connection here
self.connection_string = connection_string
self.options = options
self.client = None # Your database client
# Example connection logic:
# self.client = mynewdb.connect(connection_string, **options)
def _ensure_collection_exists(self, db_name: str):
"""
Ensure the collection/table exists.
Create it if it doesn't exist (if your database requires this).
"""
# Example:
# if not self.client.collection_exists(db_name):
# self.client.create_collection(db_name)
pass
def get(self, db_name: str, doc_id: str) -> Dict[str, Any]:
"""
Retrieve a document by ID.
Args:
db_name: Collection/table name
doc_id: Document identifier
Returns:
Document as dictionary
Raises:
HTTPException(404): If document not found
"""
try:
# Example implementation:
# collection = self.client.collection(db_name)
# doc = collection.find_one({"_id": doc_id})
# if doc is None:
# raise HTTPException(status_code=404, detail=f"Document {doc_id} not found")
# return doc
raise NotImplementedError("Implement document retrieval")
except Exception as e:
# Handle database-specific exceptions
if "not found" in str(e).lower():
raise HTTPException(status_code=404, detail=f"Document {doc_id} not found")
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
def save(self, db_name: str, doc_id: str, doc: Dict[str, Any]) -> Dict[str, Any]:
"""
Save (create or update) a document.
Args:
db_name: Collection/table name
doc_id: Document identifier
doc: Document data
Returns:
Dictionary with {"id": doc_id, "rev": revision_id}
"""
try:
# Ensure collection exists (if needed)
self._ensure_collection_exists(db_name)
# Add document ID to the document
doc["_id"] = doc_id
# Example implementation:
# collection = self.client.collection(db_name)
# result = collection.replace_one(
# {"_id": doc_id},
# doc,
# upsert=True # Create if doesn't exist
# )
#
# # Get revision/version info if your database supports it
# rev = result.get("revision") or "v1"
#
# return {"id": doc_id, "rev": rev}
raise NotImplementedError("Implement document save")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save document: {str(e)}")
def delete(self, db_name: str, doc_id: str) -> None:
"""
Delete a document by ID.
Args:
db_name: Collection/table name
doc_id: Document identifier
Raises:
HTTPException(404): If document not found
"""
try:
# Example implementation:
# collection = self.client.collection(db_name)
# result = collection.delete_one({"_id": doc_id})
#
# if result.deleted_count == 0:
# raise HTTPException(status_code=404, detail=f"Document {doc_id} not found")
raise NotImplementedError("Implement document deletion")
except HTTPException:
raise # Re-raise HTTPException
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete document: {str(e)}")
def list_all(self, db_name: str) -> List[Dict[str, Any]]:
"""
List all documents in a collection.
Args:
db_name: Collection/table name
Returns:
List of documents
"""
try:
# Example implementation:
# collection = self.client.collection(db_name)
# documents = list(collection.find({}))
# return documents
# If collection doesn't exist, return empty list
# try:
# self._ensure_collection_exists(db_name)
# except:
# return []
raise NotImplementedError("Implement list all documents")
except Exception as e:
# Some databases may not have the collection yet
return []
Step 3: Add Configuration Settings
Update config/init.py to include your database settings:
class Settings:
# Existing settings...
# MyNewDB configuration
MYNEWDB_CONNECTION_STRING: str | None = os.getenv("MYNEWDB_CONNECTION_STRING")
MYNEWDB_DATABASE: str = os.getenv("MYNEWDB_DATABASE", "gofannon")
MYNEWDB_OPTION_1: str | None = os.getenv("MYNEWDB_OPTION_1")
# Add other configuration options as needed
Step 4: Update Factory Function
Update init.py to include your implementation:
from .mynewdb import MyNewDBService
def get_database_service(settings) -> DatabaseService:
"""Factory function to get the appropriate database service instance."""
global _db_instance
if _db_instance is not None:
return _db_instance
provider = settings.DATABASE_PROVIDER.lower()
# Existing implementations...
elif provider == "mynewdb":
# Validate required configuration
if not settings.MYNEWDB_CONNECTION_STRING:
raise ValueError("MyNewDB requires MYNEWDB_CONNECTION_STRING")
_db_instance = MyNewDBService(
connection_string=settings.MYNEWDB_CONNECTION_STRING,
database=settings.MYNEWDB_DATABASE,
option_1=settings.MYNEWDB_OPTION_1
)
else:
# Default to memory
_db_instance = MemoryDatabaseService()
return _db_instance
Step 5: Add Dependencies
If your database requires additional Python packages, update the requirements:
# Add to webapp/packages/api/user-service/requirements.txt
mynewdb-client>=1.0.0
Or in pyproject.toml:
[project]
dependencies = [
# ... existing dependencies
"mynewdb-client>=1.0.0",
]
Step 6: Write Unit Tests
Create comprehensive tests following the pattern in test_memory_database_service.py:
"""
Unit tests for MyNewDB database service implementation.
"""
import pytest
from fastapi import HTTPException
from services.database_service.mynewdb import MyNewDBService
@pytest.fixture
def db_service():
"""Create a MyNewDB service instance for testing."""
service = MyNewDBService(
connection_string="test://localhost:1234/testdb"
)
yield service
# Cleanup after tests
service.client.close()
class TestMyNewDBService:
"""Test suite for MyNewDB database service."""
def test_save_and_get(self, db_service):
"""Test saving and retrieving a document."""
doc = {"name": "Test Agent", "value": 42}
# Save document
result = db_service.save("agents", "test-id", doc)
assert result["id"] == "test-id"
assert "rev" in result
# Retrieve document
retrieved = db_service.get("agents", "test-id")
assert retrieved["_id"] == "test-id"
assert retrieved["name"] == "Test Agent"
assert retrieved["value"] == 42
def test_get_nonexistent_document(self, db_service):
"""Test retrieving a document that doesn't exist."""
with pytest.raises(HTTPException) as exc_info:
db_service.get("agents", "nonexistent-id")
assert exc_info.value.status_code == 404
def test_update_document(self, db_service):
"""Test updating an existing document."""
# Create initial document
doc = {"name": "Original", "value": 1}
db_service.save("agents", "test-id", doc)
# Update document
updated_doc = {"name": "Updated", "value": 2}
result = db_service.save("agents", "test-id", updated_doc)
assert result["id"] == "test-id"
# Verify update
retrieved = db_service.get("agents", "test-id")
assert retrieved["name"] == "Updated"
assert retrieved["value"] == 2
def test_delete(self, db_service):
"""Test deleting a document."""
# Create document
doc = {"name": "To Delete"}
db_service.save("agents", "test-id", doc)
# Delete document
db_service.delete("agents", "test-id")
# Verify deletion
with pytest.raises(HTTPException) as exc_info:
db_service.get("agents", "test-id")
assert exc_info.value.status_code == 404
def test_delete_nonexistent_document(self, db_service):
"""Test deleting a document that doesn't exist."""
with pytest.raises(HTTPException) as exc_info:
db_service.delete("agents", "nonexistent-id")
assert exc_info.value.status_code == 404
def test_list_all(self, db_service):
"""Test listing all documents in a collection."""
# Create multiple documents
db_service.save("agents", "id-1", {"name": "Agent 1"})
db_service.save("agents", "id-2", {"name": "Agent 2"})
db_service.save("agents", "id-3", {"name": "Agent 3"})
# List all documents
documents = db_service.list_all("agents")
assert len(documents) == 3
# Verify all documents are present
ids = {doc["_id"] for doc in documents}
assert ids == {"id-1", "id-2", "id-3"}
def test_list_all_empty_collection(self, db_service):
"""Test listing documents from an empty collection."""
documents = db_service.list_all("empty-collection")
assert documents == []
def test_multiple_collections(self, db_service):
"""Test that collections are isolated from each other."""
# Save to different collections
db_service.save("agents", "id-1", {"type": "agent"})
db_service.save("users", "id-1", {"type": "user"})
# Verify isolation
agent = db_service.get("agents", "id-1")
user = db_service.get("users", "id-1")
assert agent["type"] == "agent"
assert user["type"] == "user"
Run tests:
cd webapp/packages/api/user-service
pytest tests/unit/services/test_mynewdb_database_service.py -v
Step 7: Add Docker Support (Optional)
If your database requires a service, add it to docker-compose.yml:
services:
mynewdb:
image: mynewdb:latest
container_name: gofannon-mynewdb
ports:
- "9999:9999" # Adjust port as needed
environment:
- MYNEWDB_USER=${MYNEWDB_USER:-admin}
- MYNEWDB_PASSWORD=${MYNEWDB_PASSWORD:-password}
volumes:
- mynewdb-data:/var/lib/mynewdb/data
healthcheck:
test: ["CMD", "mynewdb-healthcheck"]
interval: 10s
timeout: 5s
retries: 5
volumes:
mynewdb-data:
Step 8: Update Documentation
Add your database to this documentation:
- Add a new section under "Existing Database Implementations"
- Document configuration requirements
- Add any special considerations
- Provide setup instructions
Step 9: Integration Testing
Test your implementation with the full application:
# Set environment variables
export DATABASE_PROVIDER=mynewdb
export MYNEWDB_CONNECTION_STRING=mynewdb://localhost:9999/gofannon
# Start the application
cd webapp/packages/api/user-service
uvicorn main:app --reload
# Test API endpoints
curl http://localhost:8000/agents
Common Implementation Patterns
Pattern 1: Auto-Create Collections
Some databases require explicit collection creation:
def _ensure_collection_exists(self, db_name: str):
"""Create collection if it doesn't exist."""
if not self.client.collection_exists(db_name):
self.client.create_collection(db_name)
# Add indexes if needed
self.client.create_index(db_name, "_id", unique=True)
Pattern 2: Connection Pooling
For production use, implement connection pooling:
def __init__(self, connection_string: str, pool_size: int = 10):
self.pool = mynewdb.ConnectionPool(
connection_string,
max_connections=pool_size,
min_connections=2
)
def get(self, db_name: str, doc_id: str) -> Dict[str, Any]:
with self.pool.get_connection() as conn:
# Use connection
return conn.get(db_name, doc_id)
Pattern 3: Type Conversion
Handle database-specific type requirements:
def _prepare_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
"""Convert Python types to database-compatible types."""
prepared = {}
for key, value in doc.items():
if isinstance(value, datetime):
prepared[key] = value.isoformat()
elif isinstance(value, Decimal):
prepared[key] = float(value)
else:
prepared[key] = value
return prepared
Pattern 4: Error Mapping
Map database-specific exceptions to HTTP exceptions:
def _handle_error(self, error: Exception, operation: str) -> HTTPException:
"""Map database errors to HTTP exceptions."""
error_str = str(error).lower()
if "not found" in error_str or "does not exist" in error_str:
return HTTPException(status_code=404, detail=f"{operation} failed: not found")
if "conflict" in error_str or "already exists" in error_str:
return HTTPException(status_code=409, detail=f"{operation} failed: conflict")
if "permission" in error_str or "unauthorized" in error_str:
return HTTPException(status_code=403, detail=f"{operation} failed: permission denied")
return HTTPException(status_code=500, detail=f"{operation} failed: {str(error)}")
Implementation Checklist
- Create implementation file in
database_service/ - Implement all abstract methods from
DatabaseService - Handle document ID storage (
_idfield) - Return proper format from
save()method - Raise
HTTPException(404)for missing documents - Add configuration settings to
config/__init__.py - Update factory function in
__init__.py - Add dependencies to
requirements.txt - Write comprehensive unit tests
- Test with actual database instance
- Add Docker Compose service (if applicable)
- Update this documentation
- Test integration with API endpoints
- Verify all six collections work correctly
- Test error handling and edge cases
- Document any special considerations
Example: Complete PostgreSQL Implementation
See the appendix in the main database-service.md for a complete PostgreSQL implementation example.
Related Documentation
- Database Interface - Abstract base class and method specifications
- Configuration - Database provider configuration
- Schema - Collection and document schemas
- Testing - Testing strategies
- Database Service README - Overview
Last Updated: 2026-01-11