| name | Directus Backend Architecture |
| description | Master Directus backend internals: API extensions, hooks, flows, services, and database operations |
| version | 1.0.0 |
| author | Directus Development System |
| tags | directus, backend, api, hooks, flows, typescript, node, database |
Directus Backend Architecture
Overview
This skill provides deep expertise in Directus backend architecture, covering API endpoint extensions, hook systems, service layers, flows and automation, database operations, authentication, and performance optimization. Master the TypeScript/Node.js backend to build scalable, secure, and efficient Directus applications.
When to Use This Skill
- Creating custom API endpoints and routes
- Implementing business logic with hooks
- Building automation workflows with Flows
- Extending authentication and permissions
- Optimizing database queries and performance
- Creating custom services and providers
- Implementing real-time features
- Building data migration and seeding scripts
- Integrating third-party services
Core Architecture
Directus Stack
- Runtime: Node.js (v18+)
- Framework: Express.js
- Language: TypeScript
- Database: Knex.js query builder
- ORM: Custom abstraction layer
- Cache: Redis/Memory
- Queue: Bull/Redis
- WebSockets: Socket.io
Directory Structure
directus/
├── api/ # Core API implementation
│ ├── src/
│ │ ├── services/ # Business logic layer
│ │ ├── controllers/ # Request handlers
│ │ ├── middleware/ # Express middleware
│ │ ├── database/ # Database abstraction
│ │ ├── utils/ # Helper functions
│ │ └── types/ # TypeScript definitions
├── extensions/
│ ├── endpoints/ # Custom API endpoints
│ ├── hooks/ # Event hooks
│ ├── operations/ # Flow operations
│ └── services/ # Custom services
└── shared/ # Shared utilities
Process: Creating Custom API Endpoints
Step 1: Initialize Endpoint Extension
npx create-directus-extension@latest
# Select:
# > endpoint
# > my-custom-api
# > typescript
Step 2: Implement Endpoint Logic
// src/index.ts
import { defineEndpoint } from '@directus/extensions-sdk';
import { Router } from 'express';
import Joi from 'joi';
export default defineEndpoint((router, context) => {
const { services, database, getSchema, env, logger, emitter } = context;
const { ItemsService, MailService, AssetsService } = services;
// Input validation schema
const createSchema = Joi.object({
title: Joi.string().required().min(3).max(255),
content: Joi.string().required(),
status: Joi.string().valid('draft', 'published').default('draft'),
tags: Joi.array().items(Joi.string()),
metadata: Joi.object(),
});
// GET /custom/analytics
router.get('/analytics', async (req, res, next) => {
try {
// Check authentication
if (!req.accountability?.user) {
return res.status(401).json({
error: 'Unauthorized',
});
}
const schema = await getSchema();
// Use Knex directly for complex queries
const results = await database
.select(
database.raw('DATE(created_at) as date'),
database.raw('COUNT(*) as count'),
database.raw('AVG(amount) as avg_amount')
)
.from('orders')
.where('status', 'completed')
.whereRaw('created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)')
.groupBy(database.raw('DATE(created_at)'))
.orderBy('date', 'desc');
// Transform results
const analytics = {
daily: results,
total: results.reduce((sum, day) => sum + day.count, 0),
average: results.reduce((sum, day) => sum + day.avg_amount, 0) / results.length,
period: '30_days',
};
return res.json({
data: analytics,
});
} catch (error) {
logger.error('Analytics endpoint error:', error);
return next(error);
}
});
// POST /custom/process
router.post('/process', async (req, res, next) => {
try {
// Validate input
const { error, value } = createSchema.validate(req.body);
if (error) {
return res.status(400).json({
error: 'Validation Error',
details: error.details,
});
}
// Create items service with user context
const itemsService = new ItemsService('articles', {
schema: await getSchema(),
accountability: req.accountability,
});
// Business logic transaction
const result = await database.transaction(async (trx) => {
// Create main item
const article = await itemsService.createOne({
...value,
author: req.accountability?.user,
}, { emitEvents: false });
// Create related items
if (value.tags && value.tags.length > 0) {
const tagsService = new ItemsService('article_tags', {
schema: await getSchema(),
accountability: req.accountability,
knex: trx,
});
await tagsService.createMany(
value.tags.map(tag => ({
article_id: article.id,
tag_name: tag,
}))
);
}
// Emit custom event
emitter.emitAction('custom.article.created', {
article,
user: req.accountability?.user,
});
return article;
});
// Send notification email
if (env.EMAIL_NOTIFICATIONS === 'true') {
const mailService = new MailService({ schema: await getSchema() });
await mailService.send({
to: env.ADMIN_EMAIL,
subject: `New Article: ${result.title}`,
html: `
<h2>New Article Published</h2>
<p><strong>Title:</strong> ${result.title}</p>
<p><strong>Author:</strong> ${req.accountability?.user}</p>
<p><strong>Status:</strong> ${result.status}</p>
`,
});
}
return res.status(201).json({
data: result,
});
} catch (error) {
logger.error('Process endpoint error:', error);
return next(error);
}
});
// DELETE /custom/cleanup
router.delete('/cleanup/:days', async (req, res, next) => {
try {
// Check admin permission
if (req.accountability?.role !== 'admin') {
return res.status(403).json({
error: 'Forbidden',
});
}
const days = parseInt(req.params.days, 10);
if (isNaN(days) || days < 1) {
return res.status(400).json({
error: 'Invalid days parameter',
});
}
// Perform cleanup
const deleted = await database('logs')
.where('created_at', '<', database.raw(`DATE_SUB(NOW(), INTERVAL ? DAY)`, [days]))
.delete();
logger.info(`Cleaned up ${deleted} old log entries`);
return res.json({
data: {
deleted,
message: `Removed ${deleted} log entries older than ${days} days`,
},
});
} catch (error) {
logger.error('Cleanup endpoint error:', error);
return next(error);
}
});
// WebSocket endpoint
router.get('/stream', (req, res) => {
// Server-Sent Events for real-time updates
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
// Send initial connection
res.write('data: {"type":"connected"}\n\n');
// Listen for events
const handler = (data: any) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
emitter.onAction('items.create', handler);
emitter.onAction('items.update', handler);
// Cleanup on disconnect
req.on('close', () => {
emitter.offAction('items.create', handler);
emitter.offAction('items.update', handler);
});
});
});
Process: Implementing Hooks
Hook Types
- Filter Hooks - Modify data before database operations
- Action Hooks - React to events after they occur
- Init Hooks - Run during startup
- Schedule Hooks - Run on cron schedules
Step 1: Create Hook Extension
// src/index.ts
import { defineHook } from '@directus/extensions-sdk';
import { createHash } from 'crypto';
import axios from 'axios';
export default defineHook(({ filter, action, init, schedule }, context) => {
const { services, database, getSchema, env, logger } = context;
const { ItemsService, ActivityService } = services;
// Filter hook - runs before database operations
filter('items.create', async (payload, meta, context) => {
// Auto-generate slugs for articles
if (meta.collection === 'articles') {
if (!payload.slug && payload.title) {
payload.slug = generateSlug(payload.title);
}
// Add metadata
payload.word_count = countWords(payload.content || '');
payload.reading_time = Math.ceil(payload.word_count / 200);
// Generate excerpt if not provided
if (!payload.excerpt && payload.content) {
payload.excerpt = generateExcerpt(payload.content, 160);
}
// Validate uniqueness
const schema = await getSchema();
const articlesService = new ItemsService('articles', {
schema,
knex: database,
});
const existing = await articlesService.readByQuery({
filter: { slug: { _eq: payload.slug } },
limit: 1,
});
if (existing.length > 0) {
payload.slug = `${payload.slug}-${Date.now()}`;
}
}
return payload;
});
// Action hook - runs after database operations
action('items.create', async ({ payload, key, collection }, context) => {
try {
if (collection === 'orders') {
// Send to external API
if (env.EXTERNAL_API_URL) {
await axios.post(`${env.EXTERNAL_API_URL}/webhook/order`, {
order_id: key,
...payload,
}, {
headers: {
'X-API-Key': env.EXTERNAL_API_KEY,
},
});
}
// Create audit log
const schema = await getSchema();
const activityService = new ActivityService({
schema,
accountability: context.accountability,
});
await activityService.createOne({
action: 'create',
collection: 'orders',
item: key,
comment: `Order #${key} created with total: ${payload.total}`,
});
// Update statistics
await updateStatistics('orders', 'created');
}
} catch (error) {
logger.error('Order creation hook error:', error);
// Don't throw - let the main operation succeed
}
});
// Filter hook for updates
filter('items.update', async (payload, meta, context) => {
const { collection, keys } = meta;
if (collection === 'users') {
// Hash sensitive data
if (payload.password) {
payload.password = await hashPassword(payload.password);
}
// Track changes
payload.last_modified = new Date().toISOString();
payload.modified_by = context.accountability?.user;
// Validate email changes
if (payload.email) {
const isValid = validateEmail(payload.email);
if (!isValid) {
throw new Error('Invalid email format');
}
}
}
return payload;
});
// Action hook for deletions
action('items.delete', async ({ collection, payload }, context) => {
if (collection === 'files') {
// Clean up associated resources
for (const fileId of payload) {
await cleanupFileResources(fileId);
}
// Log deletion
logger.info(`Files deleted: ${payload.join(', ')}`);
}
});
// Init hook - runs on startup
init('app.before', async () => {
logger.info('Initializing custom hooks...');
// Verify database tables
await verifyCustomTables();
// Load configuration
await loadConfiguration();
// Register custom validators
registerValidators();
logger.info('Custom hooks initialized successfully');
});
// Schedule hook - runs on cron schedule
schedule('0 0 * * *', async () => {
// Daily cleanup task
logger.info('Running daily cleanup...');
const schema = await getSchema();
// Clean expired sessions
const deleted = await database('directus_sessions')
.where('expires', '<', new Date())
.delete();
logger.info(`Cleaned ${deleted} expired sessions`);
// Generate daily report
await generateDailyReport(schema);
// Update search index
await updateSearchIndex();
});
// Schedule hook - every 5 minutes
schedule('*/5 * * * *', async () => {
// Check system health
const health = await checkSystemHealth();
if (!health.healthy) {
logger.error('System health check failed:', health.issues);
// Send alert
await sendHealthAlert(health);
}
});
// Helper functions
function generateSlug(title: string): string {
return title
.toLowerCase()
.replace(/[^\w\s-]/g, '')
.replace(/[\s_-]+/g, '-')
.replace(/^-+|-+$/g, '');
}
function countWords(text: string): number {
return text.trim().split(/\s+/).length;
}
function generateExcerpt(content: string, maxLength: number): string {
const stripped = content.replace(/<[^>]*>/g, '');
if (stripped.length <= maxLength) return stripped;
return stripped.substring(0, maxLength).trim() + '...';
}
async function hashPassword(password: string): Promise<string> {
const hash = createHash('sha256');
hash.update(password + env.KEY);
return hash.digest('hex');
}
function validateEmail(email: string): boolean {
const regex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
return regex.test(email);
}
async function cleanupFileResources(fileId: string): Promise<void> {
// Clean thumbnails
await database('directus_files')
.where('folder', fileId)
.delete();
// Clean CDN cache
if (env.CDN_URL) {
await axios.delete(`${env.CDN_URL}/purge/${fileId}`);
}
}
async function verifyCustomTables(): Promise<void> {
const requiredTables = ['custom_logs', 'custom_statistics'];
for (const table of requiredTables) {
const exists = await database.schema.hasTable(table);
if (!exists) {
logger.warn(`Creating missing table: ${table}`);
await database.schema.createTable(table, (t) => {
t.uuid('id').primary();
t.jsonb('data');
t.timestamps(true, true);
});
}
}
}
async function updateStatistics(type: string, action: string): Promise<void> {
await database('custom_statistics')
.insert({
id: database.raw('gen_random_uuid()'),
type,
action,
count: 1,
date: new Date(),
})
.onConflict(['type', 'action', database.raw('DATE(date)')])
.merge({
count: database.raw('custom_statistics.count + 1'),
});
}
async function checkSystemHealth(): Promise<any> {
const health = {
healthy: true,
issues: [],
metrics: {},
};
// Check database connection
try {
await database.raw('SELECT 1');
health.metrics.database = 'connected';
} catch (error) {
health.healthy = false;
health.issues.push('Database connection failed');
}
// Check memory usage
const memUsage = process.memoryUsage();
const heapUsedMB = Math.round(memUsage.heapUsed / 1024 / 1024);
health.metrics.memory = `${heapUsedMB}MB`;
if (heapUsedMB > 512) {
health.healthy = false;
health.issues.push(`High memory usage: ${heapUsedMB}MB`);
}
return health;
}
});
Process: Building Flows and Operations
Step 1: Create Custom Operation
// src/index.ts
import { defineOperationApi } from '@directus/extensions-sdk';
type Options = {
collection: string;
batchSize: number;
operation: 'archive' | 'delete' | 'export';
conditions: Record<string, any>;
};
export default defineOperationApi<Options>({
id: 'batch-processor',
handler: async ({ collection, batchSize, operation, conditions }, context) => {
const { services, database, getSchema, logger, data } = context;
const { ItemsService } = services;
const schema = await getSchema();
const itemsService = new ItemsService(collection, {
schema,
accountability: {
role: 'admin', // Use admin context for operations
},
});
let processed = 0;
let hasMore = true;
const results = [];
while (hasMore) {
// Fetch batch
const items = await itemsService.readByQuery({
filter: conditions,
limit: batchSize,
offset: processed,
});
if (items.length === 0) {
hasMore = false;
break;
}
// Process batch based on operation
for (const item of items) {
try {
switch (operation) {
case 'archive':
await itemsService.updateOne(item.id, {
status: 'archived',
archived_at: new Date(),
});
results.push({ id: item.id, status: 'archived' });
break;
case 'delete':
await itemsService.deleteOne(item.id);
results.push({ id: item.id, status: 'deleted' });
break;
case 'export':
// Transform and prepare for export
const exportData = transformForExport(item);
results.push(exportData);
break;
}
processed++;
} catch (error) {
logger.error(`Failed to process item ${item.id}:`, error);
results.push({ id: item.id, status: 'error', error: error.message });
}
}
// Check if we've processed all items
if (items.length < batchSize) {
hasMore = false;
}
// Add delay to prevent overload
await new Promise(resolve => setTimeout(resolve, 100));
}
return {
processed,
results,
summary: {
total: processed,
successful: results.filter(r => r.status !== 'error').length,
failed: results.filter(r => r.status === 'error').length,
},
};
},
});
function transformForExport(item: any): any {
return {
id: item.id,
title: item.title || 'Untitled',
created: new Date(item.created_at).toISOString(),
data: JSON.stringify(item),
};
}
Step 2: Create Flow Configuration
// src/flow-app.ts
import { defineOperationApp } from '@directus/extensions-sdk';
export default defineOperationApp({
id: 'batch-processor',
name: 'Batch Processor',
icon: 'library_books',
description: 'Process items in batches with various operations',
overview: ({ collection, operation, batchSize }) => [
{
label: 'Collection',
text: collection,
},
{
label: 'Operation',
text: operation,
},
{
label: 'Batch Size',
text: String(batchSize),
},
],
options: [
{
field: 'collection',
name: 'Collection',
type: 'string',
meta: {
width: 'half',
interface: 'system-collection',
},
},
{
field: 'operation',
name: 'Operation',
type: 'string',
meta: {
width: 'half',
interface: 'select-dropdown',
options: {
choices: [
{ text: 'Archive', value: 'archive' },
{ text: 'Delete', value: 'delete' },
{ text: 'Export', value: 'export' },
],
},
},
},
{
field: 'batchSize',
name: 'Batch Size',
type: 'integer',
meta: {
width: 'half',
interface: 'input',
},
schema: {
default_value: 100,
},
},
{
field: 'conditions',
name: 'Filter Conditions',
type: 'json',
meta: {
width: 'full',
interface: 'input-code',
options: {
language: 'json',
},
},
},
],
});
Service Layer Architecture
Custom Service Implementation
// src/services/analytics.service.ts
import { BaseService } from '@directus/api/services';
import { Knex } from 'knex';
export class AnalyticsService extends BaseService {
private knex: Knex;
private tableName = 'analytics_events';
constructor(options: any) {
super(options);
this.knex = options.knex || options.database;
}
async trackEvent(event: {
category: string;
action: string;
label?: string;
value?: number;
userId?: string;
metadata?: Record<string, any>;
}): Promise<void> {
await this.knex(this.tableName).insert({
id: this.knex.raw('gen_random_uuid()'),
category: event.category,
action: event.action,
label: event.label,
value: event.value,
user_id: event.userId,
metadata: JSON.stringify(event.metadata || {}),
created_at: new Date(),
session_id: this.accountability?.session,
ip_address: this.accountability?.ip,
});
// Update aggregates
await this.updateAggregates(event);
}
async getMetrics(options: {
startDate: Date;
endDate: Date;
groupBy: 'hour' | 'day' | 'week' | 'month';
category?: string;
}): Promise<any[]> {
const query = this.knex(this.tableName)
.select(
this.knex.raw(`DATE_TRUNC('${options.groupBy}', created_at) as period`),
'category',
'action',
this.knex.raw('COUNT(*) as count'),
this.knex.raw('COUNT(DISTINCT user_id) as unique_users'),
this.knex.raw('AVG(value) as avg_value')
)
.whereBetween('created_at', [options.startDate, options.endDate])
.groupBy('period', 'category', 'action')
.orderBy('period', 'desc');
if (options.category) {
query.where('category', options.category);
}
return await query;
}
async getUserJourney(userId: string): Promise<any[]> {
return await this.knex(this.tableName)
.where('user_id', userId)
.orderBy('created_at', 'asc')
.select('category', 'action', 'label', 'created_at', 'metadata');
}
async getFunnelAnalysis(steps: string[]): Promise<any> {
const results = {};
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
const query = this.knex(this.tableName)
.countDistinct('user_id as users')
.where('action', step);
if (i > 0) {
// Only count users who completed previous steps
query.whereIn('user_id', function() {
this.select('user_id')
.from('analytics_events')
.where('action', steps[i - 1]);
});
}
const result = await query.first();
results[step] = result.users;
}
return results;
}
async getCohortAnalysis(cohortDate: Date): Promise<any> {
const cohortQuery = `
WITH cohort_users AS (
SELECT DISTINCT user_id
FROM ${this.tableName}
WHERE DATE(created_at) = DATE(?)
),
retention_data AS (
SELECT
DATE_PART('day', ae.created_at - ?) as days_since_cohort,
COUNT(DISTINCT ae.user_id) as retained_users
FROM ${this.tableName} ae
INNER JOIN cohort_users cu ON ae.user_id = cu.user_id
WHERE ae.created_at >= ?
GROUP BY days_since_cohort
)
SELECT * FROM retention_data
ORDER BY days_since_cohort
`;
return await this.knex.raw(cohortQuery, [cohortDate, cohortDate, cohortDate]);
}
private async updateAggregates(event: any): Promise<void> {
const date = new Date().toISOString().split('T')[0];
await this.knex('analytics_aggregates')
.insert({
id: this.knex.raw('gen_random_uuid()'),
date,
category: event.category,
action: event.action,
count: 1,
sum_value: event.value || 0,
})
.onConflict(['date', 'category', 'action'])
.merge({
count: this.knex.raw('analytics_aggregates.count + 1'),
sum_value: this.knex.raw('analytics_aggregates.sum_value + ?', [event.value || 0]),
});
}
async cleanupOldEvents(daysToKeep: number): Promise<number> {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - daysToKeep);
return await this.knex(this.tableName)
.where('created_at', '<', cutoffDate)
.delete();
}
}
Database Operations
Advanced Query Patterns
// Complex database queries
import { Knex } from 'knex';
export class DatabaseOperations {
constructor(private database: Knex) {}
// Recursive CTE for hierarchical data
async getHierarchy(parentId: string | null): Promise<any[]> {
const query = `
WITH RECURSIVE category_tree AS (
-- Anchor: top-level categories
SELECT id, name, parent_id, 0 as level, ARRAY[id] as path
FROM categories
WHERE parent_id ${parentId ? '= ?' : 'IS NULL'}
UNION ALL
-- Recursive: child categories
SELECT c.id, c.name, c.parent_id, ct.level + 1, ct.path || c.id
FROM categories c
INNER JOIN category_tree ct ON c.parent_id = ct.id
WHERE ct.level < 10 -- Prevent infinite recursion
)
SELECT * FROM category_tree
ORDER BY path
`;
const bindings = parentId ? [parentId] : [];
const result = await this.database.raw(query, bindings);
return result.rows;
}
// Window functions for analytics
async getRunningTotals(startDate: Date, endDate: Date): Promise<any[]> {
const query = `
SELECT
date,
amount,
SUM(amount) OVER (ORDER BY date) as running_total,
AVG(amount) OVER (
ORDER BY date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7d,
ROW_NUMBER() OVER (ORDER BY amount DESC) as rank_by_amount
FROM daily_sales
WHERE date BETWEEN ? AND ?
ORDER BY date
`;
const result = await this.database.raw(query, [startDate, endDate]);
return result.rows;
}
// Full-text search with ranking
async searchContent(searchTerm: string): Promise<any[]> {
const query = `
SELECT
id,
title,
content,
ts_rank(search_vector, query) as relevance,
ts_headline(
'english',
content,
query,
'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=20'
) as excerpt
FROM articles,
plainto_tsquery('english', ?) as query
WHERE search_vector @@ query
ORDER BY relevance DESC
LIMIT 20
`;
const result = await this.database.raw(query, [searchTerm]);
return result.rows;
}
// Batch upsert with conflict resolution
async batchUpsert(table: string, records: any[]): Promise<void> {
const chunkSize = 1000;
for (let i = 0; i < records.length; i += chunkSize) {
const chunk = records.slice(i, i + chunkSize);
await this.database(table)
.insert(chunk)
.onConflict('id')
.merge({
updated_at: this.database.fn.now(),
// Merge only changed fields
...chunk.reduce((acc, record) => {
Object.keys(record).forEach(key => {
if (key !== 'id' && key !== 'created_at') {
acc[key] = this.database.raw('EXCLUDED.' + key);
}
});
return acc;
}, {}),
});
}
}
// Optimized pagination with cursor
async getCursorPagination(options: {
table: string;
cursor?: string;
limit: number;
orderBy: string;
}): Promise<{ data: any[]; nextCursor: string | null }> {
let query = this.database(options.table)
.orderBy(options.orderBy)
.limit(options.limit + 1);
if (options.cursor) {
const decodedCursor = Buffer.from(options.cursor, 'base64').toString();
query = query.where(options.orderBy, '>', decodedCursor);
}
const results = await query;
const hasMore = results.length > options.limit;
const data = hasMore ? results.slice(0, -1) : results;
const nextCursor = hasMore
? Buffer.from(data[data.length - 1][options.orderBy]).toString('base64')
: null;
return { data, nextCursor };
}
}
Authentication & Permissions
Custom Authentication Provider
// src/auth-provider.ts
import jwt from 'jsonwebtoken';
import bcrypt from 'bcrypt';
import { BaseAuthProvider } from '@directus/api/auth';
export class CustomAuthProvider extends BaseAuthProvider {
async login(credentials: { email: string; password: string }) {
const user = await this.knex('directus_users')
.where('email', credentials.email)
.first();
if (!user) {
throw new Error('Invalid credentials');
}
const validPassword = await bcrypt.compare(
credentials.password,
user.password
);
if (!validPassword) {
// Log failed attempt
await this.logFailedAttempt(user.id);
throw new Error('Invalid credentials');
}
// Check if account is locked
if (user.status !== 'active') {
throw new Error('Account is not active');
}
// Generate tokens
const accessToken = this.generateAccessToken(user);
const refreshToken = await this.generateRefreshToken(user);
// Update last login
await this.knex('directus_users')
.where('id', user.id)
.update({
last_access: new Date(),
last_page: '/dashboard',
});
return {
accessToken,
refreshToken,
user: this.sanitizeUser(user),
};
}
async verify(token: string) {
try {
const payload = jwt.verify(token, this.secret) as any;
// Check if token is still valid in database
const session = await this.knex('directus_sessions')
.where('token', token)
.where('expires', '>', new Date())
.first();
if (!session) {
throw new Error('Session expired');
}
return {
id: payload.id,
role: payload.role,
app_access: payload.app_access,
};
} catch (error) {
throw new Error('Invalid token');
}
}
private generateAccessToken(user: any): string {
return jwt.sign(
{
id: user.id,
role: user.role,
app_access: user.app_access,
email: user.email,
},
this.secret,
{
expiresIn: '15m',
issuer: 'directus',
}
);
}
private async generateRefreshToken(user: any): Promise<string> {
const token = this.generateRandomToken();
await this.knex('directus_sessions').insert({
token,
user: user.id,
expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000), // 7 days
ip: this.request?.ip,
user_agent: this.request?.headers['user-agent'],
});
return token;
}
private sanitizeUser(user: any): any {
const { password, ...sanitized } = user;
return sanitized;
}
}
Performance Optimization
Caching Strategy
// src/cache/cache.service.ts
import Redis from 'ioredis';
import { LRUCache } from 'lru-cache';
export class CacheService {
private redis: Redis;
private memoryCache: LRUCache<string, any>;
constructor() {
// Redis for distributed cache
this.redis = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
});
// Memory cache for hot data
this.memoryCache = new LRUCache({
max: 500,
ttl: 1000 * 60 * 5, // 5 minutes
});
}
async get(key: string): Promise<any | null> {
// Check memory cache first
const memResult = this.memoryCache.get(key);
if (memResult) return memResult;
// Check Redis
const redisResult = await this.redis.get(key);
if (redisResult) {
const data = JSON.parse(redisResult);
// Populate memory cache
this.memoryCache.set(key, data);
return data;
}
return null;
}
async set(key: string, value: any, ttl?: number): Promise<void> {
const serialized = JSON.stringify(value);
// Set in both caches
this.memoryCache.set(key, value);
if (ttl) {
await this.redis.setex(key, ttl, serialized);
} else {
await this.redis.set(key, serialized);
}
}
async invalidate(pattern: string): Promise<void> {
// Clear from memory cache
for (const key of this.memoryCache.keys()) {
if (key.includes(pattern)) {
this.memoryCache.delete(key);
}
}
// Clear from Redis
const keys = await this.redis.keys(`*${pattern}*`);
if (keys.length > 0) {
await this.redis.del(...keys);
}
}
// Cache wrapper for database queries
async remember<T>(
key: string,
ttl: number,
callback: () => Promise<T>
): Promise<T> {
const cached = await this.get(key);
if (cached) return cached;
const fresh = await callback();
await this.set(key, fresh, ttl);
return fresh;
}
}
Query Optimization
// src/optimization/query-optimizer.ts
export class QueryOptimizer {
constructor(private knex: Knex) {}
// Optimize N+1 queries with dataloader pattern
async batchLoadRelations<T>(
items: T[],
relation: string,
foreignKey: string
): Promise<Map<string, any[]>> {
const ids = items.map(item => item[foreignKey]);
const relations = await this.knex(relation)
.whereIn(foreignKey, ids)
.select();
// Group by foreign key
const grouped = new Map<string, any[]>();
for (const rel of relations) {
const key = rel[foreignKey];
if (!grouped.has(key)) {
grouped.set(key, []);
}
grouped.get(key)!.push(rel);
}
return grouped;
}
// Index hints for specific queries
async optimizedSearch(table: string, conditions: any): Promise<any[]> {
return await this.knex.raw(`
SELECT /*+ INDEX(${table} idx_search) */ *
FROM ${table}
WHERE ?
ORDER BY created_at DESC
LIMIT 100
`, [conditions]);
}
// Query result streaming for large datasets
streamLargeDataset(table: string, batchSize: number = 1000) {
return this.knex(table)
.select()
.stream({ highWaterMark: batchSize });
}
// Explain query execution plan
async explainQuery(query: Knex.QueryBuilder): Promise<any> {
const sql = query.toSQL();
const result = await this.knex.raw(`EXPLAIN ANALYZE ${sql.sql}`, sql.bindings);
return result.rows;
}
}
Testing Strategies
Integration Tests
// test/endpoints.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import request from 'supertest';
import { createDirectus } from '@directus/sdk';
describe('Custom Endpoints', () => {
let app;
let authToken;
beforeAll(async () => {
app = await createTestApp();
authToken = await getAuthToken();
});
afterAll(async () => {
await cleanupTestData();
});
describe('POST /custom/process', () => {
it('should create article with valid data', async () => {
const response = await request(app)
.post('/custom/process')
.set('Authorization', `Bearer ${authToken}`)
.send({
title: 'Test Article',
content: 'Test content',
status: 'draft',
tags: ['test', 'api'],
});
expect(response.status).toBe(201);
expect(response.body.data).toHaveProperty('id');
expect(response.body.data.title).toBe('Test Article');
});
it('should validate input data', async () => {
const response = await request(app)
.post('/custom/process')
.set('Authorization', `Bearer ${authToken}`)
.send({
// Missing required fields
status: 'invalid',
});
expect(response.status).toBe(400);
expect(response.body.error).toBe('Validation Error');
});
it('should handle unauthorized requests', async () => {
const response = await request(app)
.post('/custom/process')
.send({ title: 'Test' });
expect(response.status).toBe(401);
});
});
describe('GET /custom/analytics', () => {
it('should return analytics data', async () => {
const response = await request(app)
.get('/custom/analytics')
.set('Authorization', `Bearer ${authToken}`);
expect(response.status).toBe(200);
expect(response.body.data).toHaveProperty('daily');
expect(response.body.data).toHaveProperty('total');
expect(response.body.data).toHaveProperty('average');
});
it('should respect date filters', async () => {
const response = await request(app)
.get('/custom/analytics')
.query({
start_date: '2024-01-01',
end_date: '2024-01-31',
})
.set('Authorization', `Bearer ${authToken}`);
expect(response.status).toBe(200);
expect(response.body.data.period).toBe('30_days');
});
});
});
Migration Scripts
Database Migrations
// migrations/001_create_analytics_tables.ts
import { Knex } from 'knex';
export async function up(knex: Knex): Promise<void> {
// Create analytics events table
await knex.schema.createTable('analytics_events', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table.string('category', 50).notNullable();
table.string('action', 50).notNullable();
table.string('label', 100);
table.decimal('value', 10, 2);
table.uuid('user_id').references('id').inTable('directus_users');
table.jsonb('metadata');
table.string('session_id', 100);
table.string('ip_address', 45);
table.timestamp('created_at').defaultTo(knex.fn.now());
// Indexes for performance
table.index(['category', 'action']);
table.index('user_id');
table.index('created_at');
table.index(['category', 'created_at']);
});
// Create aggregates table
await knex.schema.createTable('analytics_aggregates', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table.date('date').notNullable();
table.string('category', 50).notNullable();
table.string('action', 50).notNullable();
table.integer('count').defaultTo(0);
table.decimal('sum_value', 12, 2).defaultTo(0);
table.integer('unique_users').defaultTo(0);
table.timestamps(true, true);
// Composite unique constraint
table.unique(['date', 'category', 'action']);
table.index('date');
});
// Add trigger for auto-aggregation
await knex.raw(`
CREATE OR REPLACE FUNCTION update_analytics_aggregates()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO analytics_aggregates (date, category, action, count, sum_value)
VALUES (DATE(NEW.created_at), NEW.category, NEW.action, 1, COALESCE(NEW.value, 0))
ON CONFLICT (date, category, action)
DO UPDATE SET
count = analytics_aggregates.count + 1,
sum_value = analytics_aggregates.sum_value + COALESCE(NEW.value, 0);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER analytics_events_aggregate
AFTER INSERT ON analytics_events
FOR EACH ROW
EXECUTE FUNCTION update_analytics_aggregates();
`);
}
export async function down(knex: Knex): Promise<void> {
await knex.raw('DROP TRIGGER IF EXISTS analytics_events_aggregate ON analytics_events');
await knex.raw('DROP FUNCTION IF EXISTS update_analytics_aggregates()');
await knex.schema.dropTableIfExists('analytics_aggregates');
await knex.schema.dropTableIfExists('analytics_events');
}
Monitoring & Logging
Custom Logger
// src/logger/custom-logger.ts
import winston from 'winston';
import { ElasticsearchTransport } from 'winston-elasticsearch';
export class CustomLogger {
private logger: winston.Logger;
constructor() {
this.logger = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: {
service: 'directus-backend',
environment: process.env.NODE_ENV,
},
transports: [
// Console transport
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
),
}),
// File transport
new winston.transports.File({
filename: 'logs/error.log',
level: 'error',
maxsize: 5242880, // 5MB
maxFiles: 5,
}),
new winston.transports.File({
filename: 'logs/combined.log',
maxsize: 5242880,
maxFiles: 5,
}),
// Elasticsearch transport for centralized logging
new ElasticsearchTransport({
level: 'info',
clientOpts: {
node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
},
index: 'directus-logs',
}),
],
});
}
info(message: string, meta?: any): void {
this.logger.info(message, meta);
}
error(message: string, error?: Error, meta?: any): void {
this.logger.error(message, {
...meta,
error: {
message: error?.message,
stack: error?.stack,
name: error?.name,
},
});
}
warn(message: string, meta?: any): void {
this.logger.warn(message, meta);
}
debug(message: string, meta?: any): void {
this.logger.debug(message, meta);
}
// Performance logging
logPerformance(operation: string, duration: number, meta?: any): void {
this.logger.info(`Performance: ${operation}`, {
...meta,
duration_ms: duration,
slow: duration > 1000,
});
}
// Audit logging
logAudit(action: string, userId: string, details: any): void {
this.logger.info(`Audit: ${action}`, {
audit: true,
user_id: userId,
action,
details,
timestamp: new Date().toISOString(),
});
}
}
Success Metrics
- ✅ API endpoints respond within 200ms for 95% of requests
- ✅ Database queries optimized with proper indexing
- ✅ Hooks execute without blocking main operations
- ✅ Flows process batches efficiently without memory leaks
- ✅ Authentication system secure with proper token management
- ✅ Caching reduces database load by 60%+
- ✅ Error handling prevents data corruption
- ✅ Logging provides complete audit trail
- ✅ Tests achieve 80%+ code coverage
- ✅ Migrations run without data loss
Resources
- Directus API Documentation
- Directus Extensions SDK
- Knex.js Query Builder
- Express.js Middleware
- Node.js Best Practices
- TypeScript Handbook
Version History
- 1.0.0 - Initial release with comprehensive backend architecture patterns