| Field | Value |
|---|---|
| name | dataflow |
| description | Kailash DataFlow - zero-config database framework with automatic model-to-node generation. Use when asking about 'database operations', 'DataFlow', 'database models', 'CRUD operations', 'bulk operations', 'database queries', 'database migrations', 'multi-tenancy', 'multi-instance', 'database transactions', 'PostgreSQL', 'MySQL', 'SQLite', 'MongoDB', 'pgvector', 'vector search', 'document database', 'RAG', 'semantic search', 'existing database', 'database performance', 'database deployment', 'database testing', or 'TDD with databases'. DataFlow is NOT an ORM - it generates 11 workflow nodes per SQL model, 8 nodes for MongoDB, and 3 nodes for vector operations. |
Kailash DataFlow - Zero-Config Database Framework
DataFlow is a zero-config database framework built on Kailash Core SDK that automatically generates workflow nodes from database models.
Overview
DataFlow transforms database models into workflow nodes automatically, providing:
- Automatic Node Generation: 11 nodes per model (@db.model decorator)
- Multi-Database Support: PostgreSQL, MySQL, SQLite (SQL) + MongoDB (Document) + pgvector (Vector Search)
- Enterprise Features: Multi-tenancy, multi-instance isolation, transactions
- Zero Configuration: String IDs preserved, deferred schema operations
- Integration Ready: Works with Nexus for multi-channel deployment
- Specialized Adapters: SQL (11 nodes/model), Document (8 nodes), Vector (3 nodes)
⚠️ Critical Updates & Bug Fixes
v0.7.11 Bulk Operations Parameter Handling (LATEST - 2025-10-31)
Bug Fix:
- ✅ Parameter Conflict Resolution: Fixed `TypeError: got multiple values for keyword argument 'model_name'` in all 4 bulk operations when workflows have global input parameters
What Was Fixed:
Bulk operations (BulkCreate, BulkUpdate, BulkDelete, BulkUpsert) now correctly filter model_name and db_instance from kwargs before passing to internal methods, preventing parameter conflicts when global workflow inputs are present.
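The conflict comes from ordinary Python keyword-argument semantics. A node might forward `**kwargs` to an internal method that also receives `model_name` explicitly. If a global workflow input with the same name lands in `kwargs`, Python rejects the call.

The sketch below reproduces the failure and the filtering fix. `BulkCreateSketch`, its methods, and `RESERVED_KEYS` are hypothetical names for illustration, not DataFlow's internals.

```python
from typing import Any, Dict

# Hypothetical: keys the node always passes to its internal method explicitly.
RESERVED_KEYS = {"model_name", "db_instance"}


class BulkCreateSketch:
    """Illustrative stand-in for a bulk node; not DataFlow's actual class."""

    def __init__(self, model_name: str, db_instance: str) -> None:
        self.model_name = model_name
        self.db_instance = db_instance

    def _bulk_create(self, model_name: str, db_instance: str, **kwargs: Any) -> Dict[str, Any]:
        records = kwargs.get("data", [])
        return {"model": model_name, "db": db_instance, "inserted": len(records)}

    def run_unfiltered(self, **kwargs: Any) -> Dict[str, Any]:
        # Pre-fix shape: if global inputs put "model_name" into kwargs, Python raises
        # TypeError: ... got multiple values for keyword argument 'model_name'.
        return self._bulk_create(
            model_name=self.model_name, db_instance=self.db_instance, **kwargs
        )

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        # Post-fix shape: strip reserved keys before forwarding the remaining kwargs.
        safe_kwargs = {k: v for k, v in kwargs.items() if k not in RESERVED_KEYS}
        return self._bulk_create(
            model_name=self.model_name, db_instance=self.db_instance, **safe_kwargs
        )


if __name__ == "__main__":
    node = BulkCreateSketch("User", "primary")
    global_inputs = {"model_name": "User", "tenant_id": "acme"}
    try:
        node.run_unfiltered(data=[{"name": "a"}], **global_inputs)
    except TypeError as exc:
        print("unfiltered:", exc)
    print("filtered:", node.run(data=[{"name": "a"}, {"name": "b"}], **global_inputs))
```

Unrelated global inputs such as `tenant_id` still pass through untouched. Only the keys the node supplies itself are dropped.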
Impact:
- All bulk operations work correctly with Nexus/AsyncLocalRuntime global parameters
- No breaking changes - existing workflows continue working unchanged
Upgrade Command:
```bash
pip install --upgrade "kailash-dataflow>=0.7.11"
```
v0.7.9 CountNode + PostgreSQL ARRAY + Auto-Query Caching (2025-10-30)
New Features:
- ✅ CountNode: 11th auto-generated node for efficient COUNT(*) queries (10-50x faster than ListNode)
- ✅ PostgreSQL Native Arrays: TEXT[], INTEGER[], REAL[] support with 2-10x performance gain
- ✅ Auto-Query Caching: Redis auto-detection with in-memory LRU fallback for 5-10x throughput
CountNode Usage:
```python
workflow.add_node("UserCountNode", "count_users", {"filter": {"active": True}})
# Returns: {"count": 42} in 1-5ms vs 20-50ms with ListNode
```
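CountNode's speedup comes from letting the database compute `COUNT(*)` and return a single row. The alternative is to materialize every matching record and count it client-side.

The minimal stdlib `sqlite3` sketch below contrasts the two strategies. It does not use DataFlow: the `users` table, its columns, and both helper functions are hypothetical. The count helper returns the same `{"count": n}` shape as the CountNode example above.

```python
import sqlite3
from typing import Dict


def seed(conn: sqlite3.Connection, n: int = 1000) -> None:
    # Hypothetical users table; every other row is active.
    conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, active INTEGER NOT NULL)")
    conn.executemany(
        "INSERT INTO users (id, active) VALUES (?, ?)",
        [(f"user-{i}", i % 2) for i in range(n)],
    )


def count_by_listing(conn: sqlite3.Connection, active: bool) -> int:
    # List-style: every matching row is materialized and sent to Python, then counted.
    rows = conn.execute("SELECT * FROM users WHERE active = ?", (int(active),)).fetchall()
    return len(rows)


def count_in_database(conn: sqlite3.Connection, active: bool) -> Dict[str, int]:
    # Count-style: the database computes COUNT(*) and returns a single row.
    (n,) = conn.execute(
        "SELECT COUNT(*) FROM users WHERE active = ?", (int(active),)
    ).fetchone()
    return {"count": n}


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    seed(conn)
    print(count_by_listing(conn, True), count_in_database(conn, True))
```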
PostgreSQL ARRAY Usage:
```python
from typing import List

@db.model
class AgentMemory:
    tags: List[str]  # Becomes TEXT[] on PostgreSQL
    __dataflow__ = {'use_native_arrays': True}  # Opt-in
```
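A `List[...]` annotation could map to a native PostgreSQL array column through `typing` introspection, as sketched below. `pg_array_type` and `native_array_columns` are hypothetical helpers, not DataFlow's type mapper. They cover only the three element types named above (`TEXT[]`, `INTEGER[]`, `REAL[]`) and return nothing unless the model opts in via `use_native_arrays`.

```python
from typing import Dict, List, Optional, get_args, get_origin, get_type_hints

# Element-type mapping taken from the array types listed above.
_PG_ARRAY_ELEMENT = {str: "TEXT", int: "INTEGER", float: "REAL"}


def pg_array_type(annotation: object) -> Optional[str]:
    """Return TEXT[]/INTEGER[]/REAL[] for List[str]/List[int]/List[float], else None."""
    if get_origin(annotation) is not list:
        return None
    args = get_args(annotation)
    if len(args) != 1 or args[0] not in _PG_ARRAY_ELEMENT:
        return None
    return _PG_ARRAY_ELEMENT[args[0]] + "[]"


def native_array_columns(model: type) -> Dict[str, str]:
    """Map annotated list fields to array types, only when the model opts in."""
    config = getattr(model, "__dataflow__", {})
    if not config.get("use_native_arrays", False):
        return {}
    columns: Dict[str, str] = {}
    for name, hint in get_type_hints(model).items():
        array_type = pg_array_type(hint)
        if array_type is not None:
            columns[name] = array_type
    return columns


class AgentMemory:  # plain class for illustration; no @db.model or database needed
    tags: List[str]
    scores: List[float]
    title: str
    __dataflow__ = {"use_native_arrays": True}
```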
Auto-Query Caching:
- Redis auto-detection on startup
- Automatic in-memory LRU fallback if Redis unavailable
- 5-10x throughput improvement for repeated queries
Upgrade Command:
```bash
pip install --upgrade "kailash-dataflow>=0.7.9"
```
v0.7.3 Schema Cache + Migration Fixes (2025-10-26)
Performance Improvement:
- ✅ Schema Cache: Thread-safe table existence cache for 91-99% performance improvement
- ✅ Cache Metrics: Observable metrics for monitoring cache performance
- ✅ Automatic Management: Configurable TTL, size limits, LRU eviction
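The sketch below shows one way to build a table-existence cache with the properties listed above: thread safety, TTL, a size limit with LRU eviction, and observable metrics. `TableExistenceCache`, its metric names, and the injectable `clock` are hypothetical. `probe` stands in for whatever query actually checks the database catalog.

```python
import threading
import time
from collections import OrderedDict
from typing import Callable, Dict, Optional, Tuple


class TableExistenceCache:
    """Thread-safe TTL + LRU cache of table-existence checks (illustrative only)."""

    def __init__(
        self,
        ttl_seconds: float = 300.0,
        max_size: int = 1000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.ttl = ttl_seconds
        self.max_size = max_size
        self._clock = clock
        self._lock = threading.Lock()
        # table name -> (exists, time stored)
        self._entries: "OrderedDict[str, Tuple[bool, float]]" = OrderedDict()
        self.metrics: Dict[str, int] = {"hits": 0, "misses": 0, "evictions": 0}

    def get(self, table: str) -> Optional[bool]:
        with self._lock:
            entry = self._entries.get(table)
            if entry is not None and self._clock() - entry[1] > self.ttl:
                del self._entries[table]  # expired: treat as a miss
                entry = None
            if entry is None:
                self.metrics["misses"] += 1
                return None
            self._entries.move_to_end(table)
            self.metrics["hits"] += 1
            return entry[0]

    def put(self, table: str, exists: bool) -> None:
        with self._lock:
            self._entries[table] = (exists, self._clock())
            self._entries.move_to_end(table)
            while len(self._entries) > self.max_size:
                self._entries.popitem(last=False)
                self.metrics["evictions"] += 1

    def table_exists(self, table: str, probe: Callable[[str], bool]) -> bool:
        # Two threads may both miss and probe concurrently; the race only costs
        # a duplicate probe, since each writes its own fresh answer.
        cached = self.get(table)
        if cached is not None:
            return cached
        exists = probe(table)
        self.put(table, exists)
        return exists
```

Caching negative answers is a deliberate trade-off in this sketch. A table created by another process stays "missing" until its entry expires or is overwritten with `put()`.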
Bug Fixes:
- ✅ Async-Safe Migration: Fixed migration recording in FastAPI/async contexts
- ✅ Error Messages: Enhanced error messages with contextual help
v0.7.0 Bulk Operations Fixes (2025-10-24)
8 critical bugs fixed in bulk operations:
- BUG-001: BulkUpsertNode silent INSERT failure (CRITICAL) - Fixed in v0.7.0
- BUG-002: Parameter serialization (conflict_fields) - Fixed in v0.7.0
- BUG-003: BulkCreateNode count reporting - Fixed in v0.7.0
- BUG-004: BulkUpsertNode UPDATE not working - Fixed in v0.7.0
- BUG-005: BulkDeleteNode $in operator not converting to SQL IN - Fixed in v0.7.0
- BUG-006: BulkUpdateNode $in operator not converting to SQL IN - Fixed in v0.7.0
- BUG-007: Empty $in list causes SQL syntax error - Fixed in v0.7.0
- BUG-008: Empty $nin list not handled - Fixed in v0.7.0
Key Fixes:
- ✅ UPDATE Operations: BulkUpsertNode now correctly updates existing records using PostgreSQL `xmax` detection
- ✅ MongoDB Operators: All bulk operations support `$in`, `$nin`, `$gt`, `$gte`, `$lt`, `$lte`, `$ne`
- ✅ Empty List Handling: `{"id": {"$in": []}}` now works correctly (matches nothing)
- ✅ Code Quality: 160 lines eliminated via shared helper function
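The operator handling above amounts to translating a MongoDB-style filter into a parameterized `WHERE` clause. Empty lists are special-cased so they never produce invalid SQL such as `IN ()`.

This minimal sketch assumes DB-API qmark placeholders (`?`) as used by `sqlite3`; PostgreSQL drivers such as asyncpg use `$1`, `$2`, and so on instead. `build_where` is a hypothetical helper, not the shared helper DataFlow uses.

```python
from typing import Any, Dict, List, Tuple

_COMPARISON = {"$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<=", "$ne": "!="}


def build_where(filter_dict: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Translate a MongoDB-style filter into a parameterized SQL WHERE clause.

    Field names are interpolated into the SQL, so they must come from the
    model's known columns, never from user input. Values are always bound.
    """
    clauses: List[str] = []
    params: List[Any] = []
    for field, condition in filter_dict.items():
        if not isinstance(condition, dict):
            clauses.append(f"{field} = ?")  # plain equality shorthand
            params.append(condition)
            continue
        for op, value in condition.items():
            if op in ("$in", "$nin"):
                values = list(value)
                if not values:
                    # Empty $in matches nothing; empty $nin excludes nothing.
                    clauses.append("1 = 0" if op == "$in" else "1 = 1")
                    continue
                placeholders = ", ".join("?" for _ in values)
                keyword = "IN" if op == "$in" else "NOT IN"
                clauses.append(f"{field} {keyword} ({placeholders})")
                params.extend(values)
            elif op in _COMPARISON:
                clauses.append(f"{field} {_COMPARISON[op]} ?")
                params.append(value)
            else:
                raise ValueError(f"unsupported operator: {op}")
    return (" AND ".join(clauses) if clauses else "1 = 1"), params
```

Mapping an empty `$in` to an always-false condition matches the documented behaviour (matches nothing). Treating an empty `$nin` as always true follows from set semantics and is this sketch's choice for the symmetric case.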
Test Coverage: 57/57 tests passing (100%)
Upgrade Command:
```bash
pip install --upgrade "kailash-dataflow>=0.7.0"
```
v0.6.2-v0.6.3 Truthiness Bug Pattern (FIXED)
Two critical bugs caused by Python truthiness checks on empty dicts:
v0.6.2 - ListNode Filter Operators:
- Bug: `if filter_dict:` at nodes.py:1810 evaluated to False for empty dict `{}`
- Impact: ALL MongoDB-style filter operators ($ne, $nin, $in, $not) were broken
- Fix: Changed to `if "filter" in kwargs:`
- Result: All filter operators now work correctly
v0.6.3 - BulkDeleteNode Safe Mode:
- Bug: `not filter_conditions` at bulk_delete.py:177 evaluated to True for empty dict `{}`
- Impact: Safe mode incorrectly rejected valid empty filter operations
- Fix: Changed to `"filter" not in validated_inputs`
- Result: Consistent validation logic
Pattern to Avoid
❌ NEVER use truthiness checks on filter/data parameters:
```python
if filter_dict:      # BAD - empty dict {} is falsy!
    ...
if not filter_dict:  # BAD - empty dict {} is falsy!
    ...
```
✅ ALWAYS use key existence checks:
```python
if "filter" in kwargs:                # GOOD
    ...
if "filter" not in validated_inputs:  # GOOD
    ...
```
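The two checks disagree only when a caller passes an explicit empty dict, which is exactly the case the v0.6.2 and v0.6.3 bugs hit. The hypothetical helpers `has_filter_truthy` and `has_filter_keyed` make the difference concrete.

```python
from typing import Any, Dict


def has_filter_truthy(kwargs: Dict[str, Any]) -> bool:
    # ❌ Truthiness check: an explicit empty filter {} looks like "no filter"
    return bool(kwargs.get("filter"))


def has_filter_keyed(kwargs: Dict[str, Any]) -> bool:
    # ✅ Key-existence check: {} still counts as a filter the caller supplied
    return "filter" in kwargs


if __name__ == "__main__":
    for params in ({}, {"filter": {}}, {"filter": {"status": {"$ne": "deleted"}}}):
        print(params, has_filter_truthy(params), has_filter_keyed(params))
```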
Affected Versions
- ❌ v0.5.4 - v0.6.1: Broken filter operators
- ✅ v0.6.2+: All filter operators work correctly
- ✅ v0.6.3+: BulkDelete safe mode fixed
- ✅ v0.7.0+: All bulk operations fully functional with MongoDB operators
Quick Start
```python
from dataflow import DataFlow
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

# Initialize DataFlow
db = DataFlow(connection_string="postgresql://user:pass@localhost/db")

# Define model (generates 11 nodes automatically)
@db.model
class User:
    id: str  # String IDs preserved
    name: str
    email: str

# Use generated nodes in workflows
workflow = WorkflowBuilder()
workflow.add_node("User_Create", "create_user", {
    "data": {"name": "John", "email": "john@example.com"}
})

# Execute
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow.build())
user_id = results["create_user"]["result"]  # Access pattern
```
Reference Documentation
Getting Started
- dataflow-quickstart - Quick start guide and core concepts
- dataflow-installation - Installation and setup
- dataflow-models - Defining models with @db.model decorator
- dataflow-connection-config - Database connection configuration
Core Operations
- dataflow-crud-operations - Create, Read, Update, Delete operations
- dataflow-queries - Query patterns and filtering
- dataflow-bulk-operations - Batch operations for performance
- dataflow-transactions - Transaction management
- dataflow-connection-isolation - ⚠️ CRITICAL: Connection isolation and ACID guarantees
- dataflow-result-access - Accessing results from nodes
Advanced Features
- dataflow-multi-instance - Multiple database instances
- dataflow-multi-tenancy - Multi-tenant architectures
- dataflow-existing-database - Working with existing databases
- dataflow-migrations-quick - Database migrations
- dataflow-custom-nodes - Creating custom database nodes
- dataflow-performance - Performance optimization
Integration & Deployment
- dataflow-nexus-integration - Deploying with Nexus platform
- dataflow-deployment - Production deployment patterns
- dataflow-dialects - Supported database dialects
- dataflow-monitoring - Monitoring and observability
Testing & Quality
- dataflow-tdd-mode - Test-driven development with DataFlow
- dataflow-tdd-api - Testing API for DataFlow
- dataflow-tdd-best-practices - Testing best practices
- dataflow-compliance - Compliance and standards
Troubleshooting
- dataflow-gotchas - Common pitfalls and solutions
Key Concepts
Not an ORM
DataFlow is NOT an ORM. It's a workflow framework that:
- Generates workflow nodes from models
- Operates within Kailash's workflow execution model
- Uses string-based result access patterns
- Integrates seamlessly with other workflow nodes
Automatic Node Generation
Each @db.model class generates 11 nodes:
- {Model}_Create - Create single record
- {Model}_Read - Read by ID
- {Model}_Update - Update record
- {Model}_Delete - Delete record
- {Model}_List - List with filters
- {Model}_Upsert - Insert or update (atomic)
- {Model}_Count - Efficient COUNT(*) queries
- {Model}_BulkCreate - Bulk insert
- {Model}_BulkUpdate - Bulk update
- {Model}_BulkDelete - Bulk delete
- {Model}_BulkUpsert - Bulk upsert
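Because the node names are derived mechanically from the model name, a workflow can be checked for typos before execution. This sketch assumes the `{Model}_{Operation}` pattern listed above. `generated_node_names` and `unknown_nodes` are hypothetical helpers, not part of DataFlow.

```python
from typing import Iterable, List

# Operation suffixes in the order listed above.
OPERATIONS = [
    "Create", "Read", "Update", "Delete", "List", "Upsert", "Count",
    "BulkCreate", "BulkUpdate", "BulkDelete", "BulkUpsert",
]


def generated_node_names(model_name: str) -> List[str]:
    """Node names following the {Model}_{Operation} pattern."""
    return [f"{model_name}_{op}" for op in OPERATIONS]


def unknown_nodes(model_name: str, used: Iterable[str]) -> List[str]:
    """Return node names a workflow references that the model would not generate."""
    known = set(generated_node_names(model_name))
    return [name for name in used if name not in known]
```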
Critical Rules
- ✅ String IDs preserved (no UUID conversion)
- ✅ Deferred schema operations (safe for Docker/FastAPI)
- ✅ Multi-instance isolation (one DataFlow per database)
- ✅ Result access: `results["node_id"]["result"]`
- ❌ NEVER use direct SQL when DataFlow nodes exist
- ❌ NEVER use SQLAlchemy/Django ORM alongside DataFlow
Database Support
- SQL Databases: PostgreSQL, MySQL, SQLite (11 nodes per @db.model)
- Document Database: MongoDB with flexible schema (8 specialized nodes)
- Vector Search: PostgreSQL pgvector for RAG/AI (3 vector nodes)
- 100% Feature Parity: SQL databases support identical workflows
When to Use This Skill
Use DataFlow when you need to:
- Perform database operations in workflows
- Generate CRUD APIs automatically (with Nexus)
- Implement multi-tenant systems
- Work with existing databases
- Build database-first applications
- Handle bulk data operations
- Implement enterprise data management
Integration Patterns
With Nexus (Multi-Channel)
```python
from dataflow import DataFlow
from nexus import Nexus

db = DataFlow(connection_string="...")

@db.model
class User:
    id: str
    name: str

# Auto-generates API + CLI + MCP
nexus = Nexus(db.get_workflows())
nexus.run()  # Instant multi-channel platform
```
With Core SDK (Custom Workflows)
```python
from dataflow import DataFlow
from kailash.workflow.builder import WorkflowBuilder

db = DataFlow(connection_string="...")

# Use db-generated nodes in custom workflows
workflow = WorkflowBuilder()
workflow.add_node("User_Create", "user1", {...})
```
Version Compatibility
- Current Version: 0.7.11 (Bulk operations parameter handling fix)
- Core SDK Version: 0.9.25+
- Python: 3.8+
- v0.7.11: Bulk operations parameter conflict fix (model_name/db_instance filtering)
- v0.7.9: CountNode (11th node) + PostgreSQL native arrays + auto-query caching
- v0.7.3: Schema cache (91-99% faster) + async-safe migrations
- v0.7.0: Bulk operations fixes (8 critical bugs)
- v0.6.3: BulkDeleteNode safe mode validation fix
- v0.6.2: ListNode filter operators fix ($ne, $nin, $in, $not)
- v0.6.0: MongoDB document database + PostgreSQL pgvector support
- Architecture: BaseAdapter hierarchy with SQL, Document, and Vector adapters
Multi-Database Support Matrix
SQL Databases (DatabaseAdapter)
- PostgreSQL: Full support with advanced features (asyncpg driver, pgvector extension, native arrays)
- MySQL: Full support with 100% feature parity (aiomysql driver)
- SQLite: Full support for development/testing/mobile (aiosqlite + custom pooling)
- Nodes Generated: 11 per @db.model (Create, Read, Update, Delete, List, Upsert, Count, BulkCreate, BulkUpdate, BulkDelete, BulkUpsert)
Document Databases (MongoDBAdapter)
- MongoDB: Complete NoSQL support (Motor async driver)
- Features: Flexible schema, aggregation pipelines, text search, geospatial queries
- Workflow Nodes: 8 specialized nodes (DocumentInsert, DocumentFind, DocumentUpdate, DocumentDelete, BulkDocumentInsert, Aggregate, CreateIndex, DocumentCount)
- Use Cases: E-commerce catalogs, content management, user profiles, event logs
Vector Databases (PostgreSQLVectorAdapter)
- PostgreSQL pgvector: Semantic similarity search for RAG/AI (pgvector extension)
- Features: Cosine/L2/inner product distance, HNSW/IVFFlat indexes
- Workflow Nodes: 3 vector nodes (VectorSearch, VectorInsert, VectorUpdate)
- Use Cases: RAG applications, semantic search, recommendation engines
Architecture
- BaseAdapter: Minimal interface for all adapter types (adapter_type, database_type, health_check)
- DatabaseAdapter: SQL-specific (inherits BaseAdapter)
- MongoDBAdapter: Document database (inherits BaseAdapter)
- PostgreSQLVectorAdapter: Vector operations (inherits DatabaseAdapter)
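The adapter hierarchy above can be sketched with `abc` to show what the inheritance implies. Code written against `DatabaseAdapter` also accepts the pgvector adapter, while the MongoDB adapter shares only the `BaseAdapter` interface. These classes only mirror the documented names. The `adapter_type` strings and placeholder health checks are assumptions, not DataFlow's implementation.

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class BaseAdapter(ABC):
    """Minimal interface shared by every adapter type."""

    adapter_type: str = "base"
    database_type: str = "unknown"

    @abstractmethod
    def health_check(self) -> bool:
        ...


class DatabaseAdapter(BaseAdapter):
    adapter_type = "sql"

    def __init__(self, database_type: str) -> None:
        self.database_type = database_type

    def health_check(self) -> bool:
        return True  # placeholder: a real adapter would run a trivial query


class MongoDBAdapter(BaseAdapter):
    adapter_type = "document"
    database_type = "mongodb"

    def health_check(self) -> bool:
        return True  # placeholder: a real adapter would ping the server


class PostgreSQLVectorAdapter(DatabaseAdapter):
    adapter_type = "vector"

    def __init__(self) -> None:
        super().__init__("postgresql")  # reuses all SQL adapter behaviour


def check_all(adapters: List[BaseAdapter]) -> Dict[str, bool]:
    # Only the BaseAdapter interface is needed to health-check any adapter.
    return {f"{a.adapter_type}:{a.database_type}": a.health_check() for a in adapters}
```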
Planned Extensions
- TimescaleDB: Time-series data optimization (PostgreSQL extension)
- Qdrant/Milvus: Dedicated vector databases with advanced filtering
- Redis: Caching and key-value operations
- Neo4j: Graph database with Cypher queries
Related Skills
- 01-core-sdk - Core workflow patterns
- 03-nexus - Multi-channel deployment
- 04-kaizen - AI agent integration
- 17-gold-standards - Best practices
Support
For DataFlow-specific questions, invoke:
- dataflow-specialist - DataFlow implementation and patterns
- testing-specialist - DataFlow testing strategies (NO MOCKING policy)
- framework-advisor - Choose between Core SDK and DataFlow