| name | file-upload-handling |
| description | Implement secure file upload handling with validation, virus scanning, storage management, and efficient file serving. Use when building file upload features, managing file storage, or implementing file download systems. |
File Upload Handling
Overview
Build secure and robust file upload systems with validation, sanitization, virus scanning, efficient storage management, CDN integration, and proper file serving mechanisms across different backend frameworks.
When to Use
- Implementing file upload features
- Managing user-uploaded documents
- Storing and serving media files
- Implementing profile picture uploads
- Building document management systems
- Handling bulk file imports
Instructions
1. Python/Flask File Upload
# config.py
import os
class Config:
    """Upload-related settings for the Flask application."""

    # Largest request body accepted, in bytes (50 MB).
    MAX_CONTENT_LENGTH = 50 * 1024 ** 2
    # Name of the folder, next to this module, that holds uploads.
    UPLOAD_FOLDER = 'uploads'
    # Extensions (lower-case, no leading dot) that may be uploaded.
    ALLOWED_EXTENSIONS = {
        'txt', 'pdf', 'doc', 'docx',
        'png', 'jpg', 'jpeg', 'gif',
    }
    # Upload folder resolved relative to the directory of this module.
    UPLOAD_DIRECTORY = os.path.join(os.path.dirname(__file__), UPLOAD_FOLDER)
# file_service.py
import os
import mimetypes
import hashlib
import secrets
from werkzeug.utils import secure_filename
from datetime import datetime
import magic
import aiofiles
class FileUploadService:
    """Validate, store and delete user uploads on the local filesystem.

    Files are written to ``<upload_dir>/<user_id>/<random-id>.<ext>``; the
    client-supplied name is only kept as metadata.
    """

    # MIME types accepted for each allowed extension.
    _ALLOWED_MIMES = {
        'txt': ['text/plain'],
        'pdf': ['application/pdf'],
        'png': ['image/png'],
        'jpg': ['image/jpeg'],
        'jpeg': ['image/jpeg'],
        'gif': ['image/gif'],
        'doc': ['application/msword'],
        'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
    }

    def __init__(self, upload_dir, allowed_extensions, max_size=50*1024*1024):
        """Create a service.

        Args:
            upload_dir: Root directory under which per-user folders live.
            allowed_extensions: Collection of lower-case extensions without
                the leading dot.
            max_size: Largest accepted file size in bytes.
        """
        self.upload_dir = upload_dir
        self.allowed_extensions = allowed_extensions
        self.max_size = max_size
        self.mime = magic.Magic(mime=True)

    @staticmethod
    def _extension(filename):
        """Return the lower-cased extension of *filename* without the dot."""
        return filename.rsplit('.', 1)[1].lower() if '.' in filename else ''

    @staticmethod
    def _is_safe_component(value):
        """Return True when *value* can be used as a single path component."""
        if not value or value in ('.', '..') or '\x00' in value:
            return False
        separators = [os.sep] + ([os.altsep] if os.altsep else [])
        return not any(sep in value for sep in separators)

    def validate_file(self, file):
        """Validate an uploaded file and return a list of error messages.

        Checks the filename, the extension, the size and whether the MIME
        type detected from the first 8192 bytes matches the extension. The
        stream position is reset to the start before returning.
        """
        errors = []

        if not file.filename:
            # Without a name none of the remaining checks are meaningful.
            errors.append('No filename provided')
            return errors

        filename = secure_filename(file.filename)
        ext = self._extension(filename)
        if ext not in self.allowed_extensions:
            # Sorted so the message is stable regardless of set ordering.
            allowed = ", ".join(sorted(self.allowed_extensions))
            errors.append(f'File type not allowed. Allowed: {allowed}')

        # Measure the size by seeking to the end of the stream.
        file.seek(0, os.SEEK_END)
        file_size = file.tell()
        file.seek(0)
        if file_size > self.max_size:
            errors.append(f'File too large. Max size: {self.max_size / 1024 / 1024}MB')

        # Sniff the content type from the head of the file.
        header = file.read(8192)
        file.seek(0)
        detected_mime = self.mime.from_buffer(header)
        if not self._is_valid_mime(detected_mime, ext):
            errors.append('File MIME type does not match extension')

        return errors

    def _is_valid_mime(self, mime_type, ext):
        """Return True when *mime_type* is accepted for extension *ext*."""
        return mime_type in self._ALLOWED_MIMES.get(ext, [])

    def save_file(self, file, user_id):
        """Validate *file* and store it under the folder of *user_id*.

        Returns:
            ``{'success': True, 'file': {...}}`` on success,
            ``{'success': False, 'errors': [...]}`` when validation fails, or
            ``{'success': False, 'error': str}`` when storing fails.
        """
        errors = self.validate_file(file)
        if errors:
            return {'success': False, 'errors': errors}

        # The user id becomes a directory name: coerce it to text and make
        # sure it cannot point outside the upload root.
        user_part = str(user_id)
        if not self._is_safe_component(user_part):
            return {'success': False, 'errors': ['Invalid user id']}

        # Random 64-character hex id; the stored name never contains user input
        # apart from the already validated extension.
        file_hash = secrets.token_hex(32)
        filename = secure_filename(file.filename)
        ext = self._extension(filename)
        safe_filename = f"{file_hash}.{ext}"

        user_upload_dir = os.path.join(self.upload_dir, user_part)
        filepath = os.path.join(user_upload_dir, safe_filename)

        try:
            os.makedirs(user_upload_dir, exist_ok=True)
            file.save(filepath)
            file_info = {
                'id': file_hash,
                'original_name': filename,
                'safe_name': safe_filename,
                'size': os.path.getsize(filepath),
                'user_id': user_id,
                'uploaded_at': datetime.utcnow().isoformat(),
                'mime_type': self.mime.from_file(filepath)
            }
        except Exception as e:
            # Do not leave a partially written upload behind.
            try:
                os.remove(filepath)
            except OSError:
                pass
            return {'success': False, 'error': str(e)}

        return {'success': True, 'file': file_info}

    def delete_file(self, user_id, file_id):
        """Delete the stored file(s) named ``<file_id>.*`` of *user_id*.

        Returns:
            ``{'success': True}`` when at least one file was removed,
            otherwise ``{'success': False, 'error': str}``.
        """
        import glob

        user_part = str(user_id)
        file_part = str(file_id)
        # Reject ids that could escape the user's folder.
        if not (self._is_safe_component(user_part)
                and self._is_safe_component(file_part)):
            return {'success': False, 'error': 'File not found'}

        # Escape everything except the trailing ".*" so that wildcard
        # characters inside the ids are matched literally.
        user_dir = os.path.join(self.upload_dir, user_part)
        pattern = os.path.join(glob.escape(user_dir), glob.escape(file_part) + '.*')
        matches = glob.glob(pattern)
        if not matches:
            return {'success': False, 'error': 'File not found'}

        try:
            for path in matches:
                os.remove(path)
        except OSError as e:
            return {'success': False, 'error': str(e)}
        return {'success': True}
# routes.py
from flask import request, jsonify, send_file, safe_join
from functools import wraps
import os
# Shared upload service, configured from the Flask application settings.
file_service = FileUploadService(
    upload_dir=app.config['UPLOAD_DIRECTORY'],
    allowed_extensions=app.config['ALLOWED_EXTENSIONS'],
)
@app.route('/api/upload', methods=['POST'])
@token_required
def upload_file():
    """Store the uploaded ``file`` form field and record its metadata.

    Responds 201 with the stored file description, or 400 with a list of
    errors when the field is missing or the file is rejected.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    result = file_service.save_file(request.files['file'], current_user.id)
    if not result['success']:
        # Validation failures arrive under 'errors', storage failures under 'error'.
        return jsonify({'errors': result.get('errors') or [result['error']]}), 400

    stored = result['file']
    file_record = FileRecord(
        file_id=stored['id'],
        original_name=stored['original_name'],
        user_id=current_user.id,
        size=stored['size'],
        mime_type=stored['mime_type']
    )
    try:
        db.session.add(file_record)
        db.session.commit()
    except Exception:
        # No metadata row was written, so the file on disk would be
        # unreachable: undo the session and remove it before re-raising.
        db.session.rollback()
        file_service.delete_file(current_user.id, stored['id'])
        raise

    return jsonify(stored), 201
@app.route('/api/files/<file_id>', methods=['GET'])
@token_required
def download_file(file_id):
    """Send a file owned by the current user as an attachment.

    Responds 404 when no matching record exists for this user or the file
    is missing on disk.
    """
    # Imported here from Werkzeug; newer Flask releases no longer export safe_join.
    from werkzeug.utils import safe_join

    file_record = FileRecord.query.filter_by(
        file_id=file_id,
        user_id=current_user.id
    ).first()
    if not file_record:
        return jsonify({'error': 'File not found'}), 404

    # Files are stored with a lower-cased extension, while original_name
    # keeps the client's casing; lower-case it so the names match.
    extension = file_record.original_name.rsplit('.', 1)[-1].lower()
    filepath = safe_join(
        app.config['UPLOAD_DIRECTORY'],
        str(current_user.id),
        f"{file_record.file_id}.{extension}"
    )
    # safe_join returns None when the joined path would leave the base directory.
    if filepath and os.path.isfile(filepath):
        return send_file(
            filepath,
            mimetype=file_record.mime_type,
            as_attachment=True,
            download_name=file_record.original_name
        )
    return jsonify({'error': 'File not found'}), 404
@app.route('/api/files/<file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
    """Remove a file owned by the current user together with its record.

    Responds 204 on success, 404 when the user has no such record and 500
    when the file service reports a failure.
    """
    owned = FileRecord.query.filter_by(
        file_id=file_id,
        user_id=current_user.id
    ).first()
    if not owned:
        return jsonify({'error': 'File not found'}), 404

    outcome = file_service.delete_file(current_user.id, file_id)
    if not outcome['success']:
        return jsonify({'error': outcome['error']}), 500

    # Only drop the metadata once the file itself has been removed.
    db.session.delete(owned)
    db.session.commit()
    return '', 204
2. Node.js Express File Upload with Multer
// config.js
const multer = require('multer');
const path = require('path');
const crypto = require('crypto');
const fs = require('fs');
// Disk storage: each user gets a folder under ./uploads and every file is
// stored under a random hex name plus the original (lower-cased) extension.
const storage = multer.diskStorage({
  destination: (req, file, cb) => {
    // String() lets numeric user ids through path.join, which only accepts strings.
    const uploadDir = path.join(__dirname, 'uploads', String(req.user.id));
    // Asynchronous mkdir reports a failure through multer's callback instead
    // of throwing from inside it, and does not block the event loop.
    fs.mkdir(uploadDir, { recursive: true }, (err) => cb(err, uploadDir));
  },
  filename: (req, file, cb) => {
    const hash = crypto.randomBytes(16).toString('hex');
    const ext = path.extname(file.originalname).toLowerCase();
    cb(null, hash + ext);
  }
});
// Accept a file only when its extension is allowed AND the MIME type sent
// with it is one registered for that extension.
// NOTE: file.mimetype is supplied by the client; this is a first-line check,
// not proof of the actual file content.
const fileFilter = (req, file, cb) => {
  const allowedTypes = {
    '.pdf': ['application/pdf'],
    '.txt': ['text/plain'],
    '.png': ['image/png'],
    '.jpg': ['image/jpeg'],
    '.jpeg': ['image/jpeg'],
    '.gif': ['image/gif'],
    '.doc': ['application/msword'],
    '.docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']
  };

  const ext = path.extname(file.originalname).toLowerCase();
  // hasOwnProperty guards against names such as ".constructor" hitting Object.prototype.
  const mimes = Object.prototype.hasOwnProperty.call(allowedTypes, ext)
    ? allowedTypes[ext]
    : [];

  if (!mimes.includes(file.mimetype)) {
    return cb(new Error('Invalid file type'));
  }
  cb(null, true);
};
// Upper bound for a single uploaded file: 50 MB.
const MAX_UPLOAD_BYTES = 50 * 1024 * 1024;

// Multer instance combining the disk storage, the type filter and the size limit.
const upload = multer({
  storage,
  fileFilter,
  limits: { fileSize: MAX_UPLOAD_BYTES }
});

module.exports = upload;
// file-service.js
const fs = require('fs').promises;
const path = require('path');
const FileRecord = require('../models/FileRecord');
// Persists upload metadata in FileRecord and resolves/removes stored files.
class FileService {
  // Build the on-disk path of a stored file.
  // NOTE(review): __dirname is this module's directory, while the multer
  // storage resolves 'uploads' relative to its own module — confirm both
  // point at the same folder.
  _resolvePath(userId, safeName) {
    // String() lets numeric ids through path.join; basename() keeps the
    // stored name from adding extra path segments.
    return path.join(__dirname, 'uploads', String(userId), path.basename(safeName));
  }

  // Create the database record for the file multer attached to the request.
  // Throws when the request carries no file.
  async uploadFile(req) {
    if (!req.file) {
      throw new Error('No file provided');
    }

    const fileInfo = {
      id: path.basename(req.file.filename, path.extname(req.file.filename)),
      originalName: req.file.originalname,
      safeName: req.file.filename,
      size: req.file.size,
      mimeType: req.file.mimetype,
      userId: req.user.id,
      uploadedAt: new Date()
    };

    try {
      return await FileRecord.create(fileInfo);
    } catch (error) {
      // Without a record the stored file is unreachable; remove it on a
      // best-effort basis and report the original failure.
      if (req.file.path) {
        await fs.unlink(req.file.path).catch(() => {});
      }
      throw error;
    }
  }

  // Look up a file owned by userId; returns the record and its disk path.
  async downloadFile(fileId, userId) {
    const record = await FileRecord.findOne({
      where: { id: fileId, userId }
    });

    if (!record) {
      throw new Error('File not found');
    }

    const filepath = this._resolvePath(userId, record.safeName);
    return { record, filepath };
  }

  // Remove a file owned by userId from disk and from the database.
  async deleteFile(fileId, userId) {
    const record = await FileRecord.findOne({
      where: { id: fileId, userId }
    });

    if (!record) {
      throw new Error('File not found');
    }

    try {
      await fs.unlink(this._resolvePath(userId, record.safeName));
    } catch (error) {
      // A file that is already gone must not block removal of its record.
      if (error.code !== 'ENOENT') {
        throw error;
      }
    }
    await record.destroy();

    return { success: true };
  }

  // Return one page of the user's files, newest first, plus the total count.
  async listUserFiles(userId, limit = 20, offset = 0) {
    const { rows, count } = await FileRecord.findAndCountAll({
      where: { userId },
      limit,
      offset,
      order: [['uploadedAt', 'DESC']]
    });

    return { files: rows, total: count };
  }
}

module.exports = new FileService();
// routes.js
const express = require('express');
const upload = require('../config/multer');
const fileService = require('../services/file-service');
const { authenticate } = require('../middleware/auth');
const router = express.Router();
// Run multer by hand so that its failures (rejected type, oversize file) are
// answered as JSON client errors instead of reaching Express's default
// error handler.
const handleUpload = (req, res, next) => {
  upload.single('file')(req, res, (err) => {
    if (!err) {
      return next();
    }
    // Multer reports an oversize file with the code LIMIT_FILE_SIZE.
    const status = err.code === 'LIMIT_FILE_SIZE' ? 413 : 400;
    res.status(status).json({ error: err.message });
  });
};

// POST /upload — store one file from the "file" form field and return its record.
router.post('/upload', authenticate, handleUpload, async (req, res, next) => {
  try {
    const file = await fileService.uploadFile(req);
    res.status(201).json(file);
  } catch (error) {
    res.status(400).json({ error: error.message });
  }
});
// GET /files/:fileId — send a file owned by the caller under its original name.
// Any lookup failure is answered with 404.
router.get('/files/:fileId', authenticate, async (req, res, next) => {
  try {
    const found = await fileService.downloadFile(req.params.fileId, req.user.id);
    res.download(found.filepath, found.record.originalName);
  } catch (err) {
    res.status(404).json({ error: err.message });
  }
});
// DELETE /files/:fileId — remove a file owned by the caller.
// Answers 204 on success and 404 when the service throws.
router.delete('/files/:fileId', authenticate, async (req, res, next) => {
  try {
    const ownerId = req.user.id;
    await fileService.deleteFile(req.params.fileId, ownerId);
    res.status(204).send();
  } catch (err) {
    res.status(404).json({ error: err.message });
  }
});
// GET /files — paginated list of the caller's files.
// Query: page (>= 1, default 1), limit (1..100, default 20).
router.get('/files', authenticate, async (req, res, next) => {
  try {
    const MAX_PAGE_SIZE = 100;
    // Clamp both values: a negative page would produce a negative offset and
    // an unbounded limit would let a client request every row at once.
    const page = Math.max(parseInt(req.query.page, 10) || 1, 1);
    const limit = Math.min(
      Math.max(parseInt(req.query.limit, 10) || 20, 1),
      MAX_PAGE_SIZE
    );
    const offset = (page - 1) * limit;

    const { files, total } = await fileService.listUserFiles(
      req.user.id,
      limit,
      offset
    );

    res.json({
      data: files,
      pagination: { page, limit, total, pages: Math.ceil(total / limit) }
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

module.exports = router;
3. FastAPI File Upload
# main.py
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.responses import FileResponse
import aiofiles
import os
import hashlib
import secrets
from pathlib import Path
app = FastAPI()

# Root folder for stored uploads; created at import time when absent.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Largest accepted upload in bytes (50 MB).
MAX_FILE_SIZE = 50 * 1024 ** 2
# Lower-case extensions, including the leading dot, that may be uploaded.
ALLOWED_EXTENSIONS = {
    '.txt', '.pdf', '.doc', '.docx',
    '.png', '.jpg', '.jpeg', '.gif',
}
class FileUploadService:
    """Validate uploads and store them under ``UPLOAD_DIR/<user_id>/``."""

    # Number of bytes read per step, so an upload is never held in memory whole.
    CHUNK_SIZE = 1024 * 1024

    async def validate_file(self, file: UploadFile) -> list:
        """Return a list of error messages for *file* (empty when valid).

        Checks the extension against ALLOWED_EXTENSIONS and the size against
        MAX_FILE_SIZE. The stream is rewound before returning.
        """
        errors = []

        # A missing filename is treated as having no extension.
        file_ext = Path(file.filename or '').suffix.lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            errors.append('File type not allowed')

        # Measure the size chunk by chunk and stop once the limit is exceeded.
        size = 0
        while size <= MAX_FILE_SIZE:
            chunk = await file.read(self.CHUNK_SIZE)
            if not chunk:
                break
            size += len(chunk)
        await file.seek(0)

        if size > MAX_FILE_SIZE:
            errors.append('File too large')

        return errors

    async def save_file(self, file: UploadFile, user_id: str):
        """Validate *file* and write it to the folder of *user_id*.

        Returns:
            ``{'success': True, 'file': {...}}`` on success,
            ``{'success': False, 'errors': [...]}`` when validation fails, or
            ``{'success': False, 'error': str}`` when writing fails.
        """
        errors = await self.validate_file(file)
        if errors:
            return {'success': False, 'errors': errors}

        # The user id becomes a directory name; it must be one plain path component.
        user_part = str(user_id)
        if user_part in ('', '.', '..') or Path(user_part).name != user_part:
            return {'success': False, 'errors': ['Invalid user id']}

        # Random 64-character hex id; only the validated extension comes from the client.
        file_hash = secrets.token_hex(32)
        file_ext = Path(file.filename).suffix.lower()
        safe_filename = f"{file_hash}{file_ext}"

        user_dir = UPLOAD_DIR / user_part
        filepath = user_dir / safe_filename

        written = 0
        try:
            user_dir.mkdir(parents=True, exist_ok=True)
            async with aiofiles.open(filepath, 'wb') as f:
                while True:
                    chunk = await file.read(self.CHUNK_SIZE)
                    if not chunk:
                        break
                    await f.write(chunk)
                    written += len(chunk)
        except Exception as e:
            # Do not leave a partially written upload behind.
            try:
                filepath.unlink()
            except OSError:
                pass
            return {'success': False, 'error': str(e)}

        return {
            'success': True,
            'file': {
                'id': file_hash,
                'original_name': file.filename,
                'safe_name': safe_filename,
                'size': written,
                'mime_type': file.content_type
            }
        }


file_service = FileUploadService()
@app.post("/api/upload")
async def upload_file(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Store an uploaded file for the current user and record its metadata.

    Raises:
        HTTPException: 400 with the list of errors when the file is rejected
            or cannot be stored.
    """
    result = await file_service.save_file(file, current_user['id'])

    if not result['success']:
        # Validation failures arrive under 'errors', storage failures under 'error'.
        detail = result.get('errors') or [result.get('error')]
        raise HTTPException(status_code=400, detail=detail)

    stored = result['file']
    file_record = FileRecord(
        file_id=stored['id'],
        original_name=stored['original_name'],
        # The download and delete endpoints locate the file through
        # file_record.safe_name, so it has to be stored with the record.
        safe_name=stored['safe_name'],
        user_id=current_user['id'],
        size=stored['size'],
        mime_type=stored['mime_type']
    )
    db.add(file_record)
    await db.commit()

    return stored
@app.get("/api/files/{file_id}")
async def download_file(
    file_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Return a file owned by the current user.

    Raises:
        HTTPException: 404 when the user has no such record or the file is
            missing on disk.
    """
    file_record = await db.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.user_id == current_user['id']
    ).first()

    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = UPLOAD_DIR / str(current_user['id']) / file_record.safe_name
    # Answer with a clean 404 instead of failing while the response is sent.
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(
        path=filepath,
        media_type=file_record.mime_type,
        filename=file_record.original_name
    )
@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Delete a file owned by the current user together with its record.

    Raises:
        HTTPException: 404 when the user has no such record, 500 when the
            file or the record cannot be removed.
    """
    file_record = await db.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.user_id == current_user['id']
    ).first()

    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = UPLOAD_DIR / str(current_user['id']) / file_record.safe_name

    try:
        filepath.unlink()
    except FileNotFoundError:
        # Already gone from disk: the record must still be removable.
        pass
    except OSError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    try:
        await db.delete(file_record)
        await db.commit()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {'success': True}
4. S3/Cloud Storage Integration
# s3_service.py
import boto3
from datetime import timedelta
import os
class S3FileService:
    """Thin wrapper around a boto3 S3 client for one bucket.

    Every method returns a dict with a ``success`` flag; failures carry the
    exception text under ``error`` instead of raising.
    """

    def __init__(self):
        """Create the client and read the bucket name from the environment."""
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=os.getenv('AWS_REGION')
        )
        self.bucket_name = os.getenv('S3_BUCKET_NAME')

    def upload_file(self, file, user_id, file_key):
        """Upload the file object *file* to the bucket under *file_key*.

        Args:
            file: Readable binary file object; its ``content_type`` attribute
                is used when present.
            user_id: Owner id, stored as object metadata.
            file_key: Object key to write.

        Returns:
            ``{'success': True, 'key': file_key}`` or
            ``{'success': False, 'error': str}``.
        """
        # Metadata values and the content type are sent as strings, so coerce
        # the id and fall back to a generic type when none is known.
        content_type = getattr(file, 'content_type', None) or 'application/octet-stream'
        extra_args = {
            'ContentType': content_type,
            'Metadata': {'user_id': str(user_id)}
        }
        try:
            self.s3_client.upload_fileobj(
                file,
                self.bucket_name,
                file_key,
                ExtraArgs=extra_args
            )
            return {'success': True, 'key': file_key}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_signed_url(self, file_key, expires_in=3600):
        """Generate a presigned download URL for *file_key*.

        Args:
            file_key: Object key to expose.
            expires_in: Value passed to the client as ``ExpiresIn``.

        Returns:
            ``{'success': True, 'url': str}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': file_key},
                ExpiresIn=expires_in
            )
            return {'success': True, 'url': url}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def delete_file(self, file_key):
        """Delete the object stored under *file_key*.

        Returns:
            ``{'success': True}`` or ``{'success': False, 'error': str}``.
        """
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=file_key)
            return {'success': True}
        except Exception as e:
            return {'success': False, 'error': str(e)}
Best Practices
✅ DO
- Validate file extensions and MIME types
- Check file size before processing
- Use secure filenames to prevent directory traversal
- Store files outside web root
- Implement virus scanning
- Use CDN for file delivery
- Generate signed URLs for direct access
- Log file upload/download events
- Implement access control checks
- Clean up temporary files
❌ DON'T
- Trust user-provided filenames
- Store files in web-accessible directories
- Allow arbitrary file types
- Skip virus scanning for uploaded files
- Expose absolute file paths
- Allow unlimited file sizes
- Ignore access control
- Use predictable file paths
- Store sensitive metadata in filenames
- Forget to validate file content
Complete Example
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    """Store an uploaded file under ``uploads/`` with a random name.

    Raises:
        HTTPException: 413 when the file exceeds 10 MB, 400 when its
            extension is not allowed.
    """
    max_size = 10 * 1024 * 1024
    chunk_size = 1024 * 1024

    # NOTE(review): file.size is assumed to be possibly None when the size
    # is not known up front — the limit is enforced again while writing.
    if file.size is not None and file.size > max_size:
        raise HTTPException(status_code=413, detail="File too large")

    allowed = {'.pdf', '.txt', '.jpg'}
    # Compare case-insensitively; a missing filename has no extension.
    ext = Path(file.filename or '').suffix.lower()
    if ext not in allowed:
        raise HTTPException(status_code=400, detail="Invalid file type")

    filename = f"{uuid4()}{ext}"
    target = Path("uploads") / filename
    target.parent.mkdir(parents=True, exist_ok=True)

    # Copy in chunks so the upload is never held in memory whole.
    written = 0
    too_large = False
    async with aiofiles.open(target, 'wb') as f:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            written += len(chunk)
            if written > max_size:
                too_large = True
                break
            await f.write(chunk)

    if too_large:
        # Remove the partial file before rejecting the request.
        target.unlink()
        raise HTTPException(status_code=413, detail="File too large")

    return {"filename": filename}