Claude Code Plugins

Community-maintained marketplace

Feedback

file-upload-handling

@aj-geddes/useful-ai-prompts
4
0

Implement secure file upload handling with validation, virus scanning, storage management, and serving files efficiently. Use when building file upload features, managing file storage, and implementing file download systems.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name file-upload-handling
description Implement secure file upload handling with validation, virus scanning, storage management, and serving files efficiently. Use when building file upload features, managing file storage, and implementing file download systems.

File Upload Handling

Overview

Build secure and robust file upload systems with validation, sanitization, virus scanning, efficient storage management, CDN integration, and proper file serving mechanisms across different backend frameworks.

When to Use

  • Implementing file upload features
  • Managing user-uploaded documents
  • Storing and serving media files
  • Implementing profile picture uploads
  • Building document management systems
  • Handling bulk file imports

Instructions

1. Python/Flask File Upload

# config.py
import os

class Config:
    MAX_CONTENT_LENGTH = 50 * 1024 * 1024  # 50 MB
    UPLOAD_FOLDER = 'uploads'
    ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'docx', 'doc'}
    UPLOAD_DIRECTORY = os.path.join(os.path.dirname(__file__), UPLOAD_FOLDER)

# file_service.py
import os
import mimetypes
import hashlib
import secrets
from werkzeug.utils import secure_filename
from datetime import datetime
import magic
import aiofiles

class FileUploadService:
    def __init__(self, upload_dir, allowed_extensions, max_size=50*1024*1024):
        self.upload_dir = upload_dir
        self.allowed_extensions = allowed_extensions
        self.max_size = max_size
        self.mime = magic.Magic(mime=True)

    def validate_file(self, file):
        """Validate uploaded file"""
        errors = []

        # Check filename
        if not file.filename:
            errors.append('No filename provided')

        # Check file extension
        filename = secure_filename(file.filename)
        ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
        if ext not in self.allowed_extensions:
            errors.append(f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}')

        # Check file size
        file.seek(0, os.SEEK_END)
        file_size = file.tell()
        file.seek(0)
        if file_size > self.max_size:
            errors.append(f'File too large. Max size: {self.max_size / 1024 / 1024}MB')

        # Check MIME type
        file_content = file.read(8192)
        file.seek(0)
        detected_mime = self.mime.from_buffer(file_content)
        if not self._is_valid_mime(detected_mime, ext):
            errors.append('File MIME type does not match extension')

        return errors

    def _is_valid_mime(self, mime_type, ext):
        """Verify MIME type matches extension"""
        allowed_mimes = {
            'txt': ['text/plain'],
            'pdf': ['application/pdf'],
            'png': ['image/png'],
            'jpg': ['image/jpeg'],
            'jpeg': ['image/jpeg'],
            'gif': ['image/gif'],
            'doc': ['application/msword'],
            'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']
        }
        return mime_type in allowed_mimes.get(ext, [])

    def save_file(self, file, user_id):
        """Save uploaded file with sanitization"""
        errors = self.validate_file(file)
        if errors:
            return {'success': False, 'errors': errors}

        # Generate secure filename
        file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
        filename = secure_filename(file.filename)
        ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
        safe_filename = f"{file_hash}.{ext}"

        # Create user-specific directory
        user_upload_dir = os.path.join(self.upload_dir, user_id)
        os.makedirs(user_upload_dir, exist_ok=True)

        filepath = os.path.join(user_upload_dir, safe_filename)

        try:
            file.save(filepath)

            file_info = {
                'id': file_hash,
                'original_name': filename,
                'safe_name': safe_filename,
                'size': os.path.getsize(filepath),
                'user_id': user_id,
                'uploaded_at': datetime.utcnow().isoformat(),
                'mime_type': self.mime.from_file(filepath)
            }

            return {'success': True, 'file': file_info}

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def delete_file(self, user_id, file_id):
        """Delete file safely"""
        filepath = os.path.join(self.upload_dir, user_id, f"{file_id}.*")
        import glob
        files = glob.glob(filepath)

        for f in files:
            try:
                os.remove(f)
                return {'success': True}
            except Exception as e:
                return {'success': False, 'error': str(e)}

        return {'success': False, 'error': 'File not found'}

# routes.py
from flask import request, jsonify, send_file, safe_join
from functools import wraps
import os

file_service = FileUploadService(
    app.config['UPLOAD_DIRECTORY'],
    app.config['ALLOWED_EXTENSIONS']
)

@app.route('/api/upload', methods=['POST'])
@token_required
def upload_file():
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['file']

    result = file_service.save_file(file, current_user.id)

    if result['success']:
        # Save metadata to database
        file_record = FileRecord(
            file_id=result['file']['id'],
            original_name=result['file']['original_name'],
            user_id=current_user.id,
            size=result['file']['size'],
            mime_type=result['file']['mime_type']
        )
        db.session.add(file_record)
        db.session.commit()

        return jsonify(result['file']), 201
    else:
        return jsonify({'errors': result.get('errors') or [result['error']]}), 400

@app.route('/api/files/<file_id>', methods=['GET'])
@token_required
def download_file(file_id):
    file_record = FileRecord.query.filter_by(
        file_id=file_id,
        user_id=current_user.id
    ).first()

    if not file_record:
        return jsonify({'error': 'File not found'}), 404

    # Construct safe file path
    filepath = safe_join(
        app.config['UPLOAD_DIRECTORY'],
        current_user.id,
        file_record.file_id + '.' + file_record.original_name.rsplit('.', 1)[1]
    )

    if filepath and os.path.exists(filepath):
        return send_file(
            filepath,
            mimetype=file_record.mime_type,
            as_attachment=True,
            download_name=file_record.original_name
        )

    return jsonify({'error': 'File not found'}), 404

@app.route('/api/files/<file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
    file_record = FileRecord.query.filter_by(
        file_id=file_id,
        user_id=current_user.id
    ).first()

    if not file_record:
        return jsonify({'error': 'File not found'}), 404

    result = file_service.delete_file(current_user.id, file_id)

    if result['success']:
        db.session.delete(file_record)
        db.session.commit()
        return '', 204
    else:
        return jsonify({'error': result['error']}), 500

2. Node.js Express File Upload with Multer

// config.js
const multer = require('multer');
const path = require('path');
const crypto = require('crypto');
const fs = require('fs');

const storage = multer.diskStorage({
    destination: (req, file, cb) => {
        const uploadDir = path.join(__dirname, 'uploads', req.user.id);
        fs.mkdirSync(uploadDir, { recursive: true });
        cb(null, uploadDir);
    },
    filename: (req, file, cb) => {
        const hash = crypto.randomBytes(16).toString('hex');
        const ext = path.extname(file.originalname);
        cb(null, hash + ext);
    }
});

const fileFilter = (req, file, cb) => {
    const allowedMimes = [
        'image/jpeg',
        'image/png',
        'image/gif',
        'application/pdf',
        'text/plain'
    ];

    const allowedExts = ['.pdf', '.txt', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'];
    const ext = path.extname(file.originalname).toLowerCase();

    if (!allowedMimes.includes(file.mimetype) || !allowedExts.includes(ext)) {
        return cb(new Error('Invalid file type'));
    }

    cb(null, true);
};

const upload = multer({
    storage: storage,
    fileFilter: fileFilter,
    limits: {
        fileSize: 50 * 1024 * 1024 // 50 MB
    }
});

module.exports = upload;

// file-service.js
const fs = require('fs').promises;
const path = require('path');
const FileRecord = require('../models/FileRecord');

class FileService {
    async uploadFile(req) {
        if (!req.file) {
            throw new Error('No file provided');
        }

        const fileInfo = {
            id: path.basename(req.file.filename, path.extname(req.file.filename)),
            originalName: req.file.originalname,
            safeName: req.file.filename,
            size: req.file.size,
            mimeType: req.file.mimetype,
            userId: req.user.id,
            uploadedAt: new Date()
        };

        // Save to database
        const record = await FileRecord.create(fileInfo);
        return record;
    }

    async downloadFile(fileId, userId) {
        const record = await FileRecord.findOne({
            where: { id: fileId, userId }
        });

        if (!record) {
            throw new Error('File not found');
        }

        const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
        return { record, filepath };
    }

    async deleteFile(fileId, userId) {
        const record = await FileRecord.findOne({
            where: { id: fileId, userId }
        });

        if (!record) {
            throw new Error('File not found');
        }

        const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
        await fs.unlink(filepath);
        await record.destroy();

        return { success: true };
    }

    async listUserFiles(userId, limit = 20, offset = 0) {
        const { rows, count } = await FileRecord.findAndCountAll({
            where: { userId },
            limit,
            offset,
            order: [['uploadedAt', 'DESC']]
        });

        return { files: rows, total: count };
    }
}

module.exports = new FileService();

// routes.js
const express = require('express');
const upload = require('../config/multer');
const fileService = require('../services/file-service');
const { authenticate } = require('../middleware/auth');

const router = express.Router();

router.post('/upload', authenticate, upload.single('file'), async (req, res, next) => {
    try {
        const file = await fileService.uploadFile(req);
        res.status(201).json(file);
    } catch (error) {
        res.status(400).json({ error: error.message });
    }
});

router.get('/files/:fileId', authenticate, async (req, res, next) => {
    try {
        const { record, filepath } = await fileService.downloadFile(
            req.params.fileId,
            req.user.id
        );
        res.download(filepath, record.originalName);
    } catch (error) {
        res.status(404).json({ error: error.message });
    }
});

router.delete('/files/:fileId', authenticate, async (req, res, next) => {
    try {
        await fileService.deleteFile(req.params.fileId, req.user.id);
        res.status(204).send();
    } catch (error) {
        res.status(404).json({ error: error.message });
    }
});

router.get('/files', authenticate, async (req, res, next) => {
    try {
        const page = parseInt(req.query.page) || 1;
        const limit = parseInt(req.query.limit) || 20;
        const offset = (page - 1) * limit;

        const { files, total } = await fileService.listUserFiles(
            req.user.id,
            limit,
            offset
        );

        res.json({
            data: files,
            pagination: { page, limit, total, pages: Math.ceil(total / limit) }
        });
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

module.exports = router;

3. FastAPI File Upload

# main.py
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.responses import FileResponse
import aiofiles
import os
import hashlib
import secrets
from pathlib import Path

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 50 * 1024 * 1024
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'}

class FileUploadService:
    async def validate_file(self, file: UploadFile) -> list:
        """Validate uploaded file"""
        errors = []

        # Check extension
        file_ext = Path(file.filename).suffix.lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            errors.append(f'File type not allowed')

        # Check file size
        content = await file.read()
        await file.seek(0)

        if len(content) > MAX_FILE_SIZE:
            errors.append(f'File too large')

        return errors

    async def save_file(self, file: UploadFile, user_id: str):
        """Save uploaded file"""
        errors = await self.validate_file(file)
        if errors:
            return {'success': False, 'errors': errors}

        # Generate secure filename
        file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
        file_ext = Path(file.filename).suffix
        safe_filename = f"{file_hash}{file_ext}"

        # Create user directory
        user_dir = UPLOAD_DIR / user_id
        user_dir.mkdir(exist_ok=True)

        filepath = user_dir / safe_filename

        try:
            content = await file.read()
            async with aiofiles.open(filepath, 'wb') as f:
                await f.write(content)

            return {
                'success': True,
                'file': {
                    'id': file_hash,
                    'original_name': file.filename,
                    'safe_name': safe_filename,
                    'size': len(content),
                    'mime_type': file.content_type
                }
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

file_service = FileUploadService()

@app.post("/api/upload")
async def upload_file(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Upload file"""
    result = await file_service.save_file(file, current_user['id'])

    if result['success']:
        # Save to database
        file_record = FileRecord(
            file_id=result['file']['id'],
            original_name=result['file']['original_name'],
            user_id=current_user['id'],
            size=result['file']['size'],
            mime_type=result['file']['mime_type']
        )
        db.add(file_record)
        await db.commit()

        return result['file']
    else:
        raise HTTPException(status_code=400, detail=result.get('errors'))

@app.get("/api/files/{file_id}")
async def download_file(
    file_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Download file"""
    file_record = await db.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.user_id == current_user['id']
    ).first()

    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name

    return FileResponse(
        path=filepath,
        media_type=file_record.mime_type,
        filename=file_record.original_name
    )

@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Delete file"""
    file_record = await db.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.user_id == current_user['id']
    ).first()

    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name

    try:
        Path(filepath).unlink()
        await db.delete(file_record)
        await db.commit()
        return {'success': True}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

4. S3/Cloud Storage Integration

# s3_service.py
import boto3
from datetime import timedelta
import os

class S3FileService:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=os.getenv('AWS_REGION')
        )
        self.bucket_name = os.getenv('S3_BUCKET_NAME')

    def upload_file(self, file, user_id, file_key):
        """Upload file to S3"""
        try:
            self.s3_client.upload_fileobj(
                file,
                self.bucket_name,
                file_key,
                ExtraArgs={
                    'ContentType': file.content_type,
                    'Metadata': {'user_id': user_id}
                }
            )
            return {'success': True, 'key': file_key}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_signed_url(self, file_key, expires_in=3600):
        """Generate signed URL for download"""
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': file_key},
                ExpiresIn=expires_in
            )
            return {'success': True, 'url': url}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def delete_file(self, file_key):
        """Delete file from S3"""
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=file_key)
            return {'success': True}
        except Exception as e:
            return {'success': False, 'error': str(e)}

Best Practices

✅ DO

  • Validate file extensions and MIME types
  • Check file size before processing
  • Use secure filenames to prevent directory traversal
  • Store files outside web root
  • Implement virus scanning
  • Use CDN for file delivery
  • Generate signed URLs for direct access
  • Log file upload/download events
  • Implement access control checks
  • Clean up temporary files

❌ DON'T

  • Trust user-provided filenames
  • Store files in web-accessible directories
  • Allow arbitrary file types
  • Skip virus scanning for uploaded files
  • Expose absolute file paths
  • Allow unlimited file sizes
  • Ignore access control
  • Use predictable file paths
  • Store sensitive metadata in filenames
  • Forget to validate file content

Complete Example

@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    if file.size > 10 * 1024 * 1024:
        raise HTTPException(status_code=413, detail="File too large")

    allowed = ['.pdf', '.txt', '.jpg']
    ext = Path(file.filename).suffix
    if ext not in allowed:
        raise HTTPException(status_code=400, detail="Invalid file type")

    filename = f"{uuid4()}{ext}"
    async with aiofiles.open(f"uploads/{filename}", 'wb') as f:
        await f.write(await file.read())

    return {"filename": filename}