Media Workflow Automation

Create automated media processing pipelines with status tracking and notifications

All recipes · advanced-patterns · 15 minutes · advanced

Media Workflow Automation

Objective

Build an automated media processing workflow system that manages processing pipelines, tracks job status, handles errors, and sends notifications on completion.

Step 1: Create Workflow Definitions Table

Define workflow templates.

-- Workflow templates: one row per reusable processing pipeline.
CREATE TABLE workflow_definitions (
    id               INTEGER,
    workflow_code    VARCHAR(50) NOT NULL,                -- short code, e.g. 'WF-VIDEO-TRANSCODE'
    workflow_name    VARCHAR(200),
    description      TEXT,
    workflow_type    VARCHAR(50),                         -- e.g. 'video', 'image', 'audio', 'document'
    input_types      TEXT,                                -- comma-separated MIME types in the sample data
    output_types     TEXT,                                -- comma-separated MIME types in the sample data
    steps_json       TEXT,                                -- not populated by this recipe's inserts
    default_priority INTEGER DEFAULT 5,
    timeout_minutes  INTEGER DEFAULT 60,
    retry_count      INTEGER DEFAULT 3,
    is_active        BOOLEAN DEFAULT TRUE,                -- the analytics query only reports active workflows
    created_by       VARCHAR(100),
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE (workflow_code)
);

Step 2: Create Workflow Steps Table

Define individual steps.

-- Ordered steps belonging to a workflow definition.
CREATE TABLE workflow_steps (
    id INTEGER PRIMARY KEY,
    workflow_id INTEGER NOT NULL,
    step_number INTEGER NOT NULL,            -- position of the step within its workflow (1, 2, 3, ... in the sample data)
    step_name VARCHAR(100),
    step_type VARCHAR(50),                   -- e.g. 'validation', 'transcode', 'thumbnail'
    action VARCHAR(100),                     -- e.g. 'validate_video', 'transcode_video'
    parameters TEXT,                         -- JSON text in the sample data, e.g. '{"resolution": "1080p"}'
    required_inputs TEXT,
    expected_outputs TEXT,
    timeout_seconds INTEGER DEFAULT 300,
    on_failure VARCHAR(50) DEFAULT 'stop',   -- sample data uses 'stop', 'continue', and 'retry'
    FOREIGN KEY (workflow_id) REFERENCES workflow_definitions(id),
    -- Two steps of the same workflow may not share a position; otherwise a
    -- job's current_step number would not identify a single step.
    UNIQUE (workflow_id, step_number)
);

Step 3: Create Workflow Jobs Table

Track job executions.

-- One row per submitted job, i.e. one run of a workflow against one input file.
CREATE TABLE workflow_jobs (
    id               INTEGER,
    job_code         VARCHAR(50) NOT NULL,                -- e.g. 'JOB-2024-001'
    workflow_id      INTEGER NOT NULL,
    input_file_path  TEXT,
    input_metadata   TEXT,                                -- JSON text in the sample data
    output_file_path TEXT,
    output_metadata  TEXT,
    priority         INTEGER DEFAULT 5,
    status           VARCHAR(50) DEFAULT 'pending',       -- 'pending', 'queued', 'processing', 'completed', 'failed'
    current_step     INTEGER DEFAULT 0,                   -- 0 for jobs that have not started in the sample data
    progress_percent DECIMAL(5, 2) DEFAULT 0,
    started_at       TIMESTAMP,
    completed_at     TIMESTAMP,
    error_message    TEXT,
    retry_count      INTEGER DEFAULT 0,
    submitted_by     VARCHAR(100),
    submitted_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE (job_code),
    FOREIGN KEY (workflow_id) REFERENCES workflow_definitions(id)
);

Step 4: Create Step Executions Table

Track step progress.

-- Execution log: one row per attempt to run a workflow step for a job.
CREATE TABLE step_executions (
    id               INTEGER,
    job_id           INTEGER NOT NULL,
    step_id          INTEGER NOT NULL,
    status           VARCHAR(50) DEFAULT 'pending',       -- sample data uses 'completed' and 'running'
    input_data       TEXT,
    output_data      TEXT,
    started_at       TIMESTAMP,
    completed_at     TIMESTAMP,                           -- NULL while the step is still running
    duration_seconds INTEGER,                             -- NULL while the step is still running
    error_message    TEXT,
    retry_attempt    INTEGER DEFAULT 0,
    PRIMARY KEY (id),
    FOREIGN KEY (job_id) REFERENCES workflow_jobs(id),
    FOREIGN KEY (step_id) REFERENCES workflow_steps(id)
);

Step 5: Create Media Queue Table

Manage processing queue.

-- Queue entries for jobs that are waiting for a worker.
CREATE TABLE media_queue (
    id             INTEGER,
    job_id         INTEGER NOT NULL,
    queue_name     VARCHAR(50),                           -- e.g. 'video_processing', 'video_analysis'
    priority       INTEGER DEFAULT 5,
    scheduled_time TIMESTAMP,
    picked_up_at   TIMESTAMP,
    worker_id      VARCHAR(100),
    status         VARCHAR(50) DEFAULT 'queued',          -- the queue monitor looks at 'queued' and 'processing'
    created_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    FOREIGN KEY (job_id) REFERENCES workflow_jobs(id)
);

Step 6: Create Notifications Table

Track workflow notifications.

-- Messages generated for a job, together with their delivery state.
CREATE TABLE workflow_notifications (
    id                INTEGER,
    job_id            INTEGER NOT NULL,
    notification_type VARCHAR(50),                        -- sample data uses 'completion' and 'failure'
    recipient         VARCHAR(200),
    channel           VARCHAR(50),                        -- sample data uses 'email'
    subject           VARCHAR(300),
    message           TEXT,
    status            VARCHAR(50) DEFAULT 'pending',      -- sample rows are all 'sent'
    sent_at           TIMESTAMP,
    error_message     TEXT,
    created_at        TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    FOREIGN KEY (job_id) REFERENCES workflow_jobs(id)
);

Step 7: Insert Workflow Definitions

Add workflow templates.

-- Seed five pipeline templates. Columns omitted from the list (steps_json,
-- retry_count, created_by, created_at) take their default or stay NULL.
INSERT INTO workflow_definitions (
    id, workflow_code, workflow_name, description, workflow_type,
    input_types, output_types, default_priority, timeout_minutes, is_active
)
VALUES
    (1, 'WF-VIDEO-TRANSCODE', 'Video Transcoding Pipeline',
        'Transcode video to multiple formats and resolutions', 'video',
        'video/mp4,video/avi,video/mov', 'video/mp4', 5, 120, TRUE),
    (2, 'WF-IMAGE-PROCESS', 'Image Processing Pipeline',
        'Resize, optimize, and generate thumbnails', 'image',
        'image/jpeg,image/png', 'image/jpeg,image/png,image/webp', 3, 30, TRUE),
    (3, 'WF-AUDIO-TRANSCODE', 'Audio Transcoding Pipeline',
        'Convert audio and generate waveforms', 'audio',
        'audio/wav,audio/flac', 'audio/mp3,audio/aac', 4, 45, TRUE),
    (4, 'WF-DOC-PROCESS', 'Document Processing Pipeline',
        'Extract text, generate thumbnails, convert formats', 'document',
        'application/pdf,application/docx', 'application/pdf,text/plain', 4, 60, TRUE),
    (5, 'WF-VIDEO-ANALYZE', 'Video Analysis Pipeline',
        'Extract metadata, generate thumbnails, transcribe audio', 'video_analysis',
        'video/mp4', 'application/json', 6, 180, TRUE);

Step 8: Insert Workflow Steps

Define step sequences.

-- Seed the step sequences for workflow 1 (video transcoding) and workflow 2
-- (image processing). Each row: ids and position, then name/type/action,
-- then the JSON parameters, timeout in seconds, and failure policy.
INSERT INTO workflow_steps (
    id, workflow_id, step_number, step_name, step_type, action,
    parameters, timeout_seconds, on_failure
)
VALUES
    -- Video Transcoding Pipeline (workflow_id = 1)
    (1, 1, 1, 'Validate Input', 'validation', 'validate_video',
        '{"check_codec": true, "check_duration": true}', 30, 'stop'),
    (2, 1, 2, 'Extract Metadata', 'extraction', 'extract_video_metadata',
        '{}', 60, 'continue'),
    (3, 1, 3, 'Transcode 1080p', 'transcode', 'transcode_video',
        '{"resolution": "1080p", "codec": "h264"}', 1800, 'retry'),
    (4, 1, 4, 'Transcode 720p', 'transcode', 'transcode_video',
        '{"resolution": "720p", "codec": "h264"}', 1200, 'retry'),
    (5, 1, 5, 'Transcode 480p', 'transcode', 'transcode_video',
        '{"resolution": "480p", "codec": "h264"}', 600, 'retry'),
    (6, 1, 6, 'Generate Thumbnails', 'thumbnail', 'generate_thumbnails',
        '{"count": 5, "format": "jpeg"}', 120, 'continue'),
    (7, 1, 7, 'Finalize Output', 'finalization', 'finalize_job',
        '{}', 60, 'stop'),
    -- Image Processing Pipeline (workflow_id = 2)
    (8, 2, 1, 'Validate Image', 'validation', 'validate_image',
        '{}', 30, 'stop'),
    (9, 2, 2, 'Resize Large', 'resize', 'resize_image',
        '{"max_dimension": 2000}', 60, 'retry'),
    (10, 2, 3, 'Generate Thumbnail', 'thumbnail', 'create_thumbnail',
        '{"size": 300}', 30, 'continue'),
    (11, 2, 4, 'Optimize', 'optimization', 'optimize_image',
        '{"quality": 85}', 60, 'retry'),
    (12, 2, 5, 'Convert WebP', 'conversion', 'convert_format',
        '{"format": "webp"}', 45, 'continue');

Step 9: Insert Sample Jobs

Add workflow jobs.

-- Seed six jobs covering every status: completed, processing, pending, failed, queued.
-- error_message is part of the INSERT, so no follow-up UPDATE is needed.
-- Job 5 fails in step 4 of the image pipeline (Optimize) after three of its
-- five steps finished, which matches its "Image optimization failed" message.
INSERT INTO workflow_jobs (id, job_code, workflow_id, input_file_path, input_metadata, priority, status, current_step, progress_percent, submitted_by, submitted_at, started_at, completed_at, error_message) VALUES
    (1, 'JOB-2024-001', 1, '/uploads/video1.mp4', '{"original_size_mb": 250, "duration_seconds": 600}', 5, 'completed', 7, 100.0, 'user_001', '2024-01-25 10:00:00', '2024-01-25 10:01:00', '2024-01-25 10:45:00', NULL),
    (2, 'JOB-2024-002', 1, '/uploads/video2.mov', '{"original_size_mb": 480, "duration_seconds": 1200}', 5, 'processing', 4, 57.0, 'user_002', '2024-01-25 11:00:00', '2024-01-25 11:02:00', NULL, NULL),
    (3, 'JOB-2024-003', 2, '/uploads/image1.jpg', '{"width": 4000, "height": 3000}', 3, 'completed', 5, 100.0, 'user_001', '2024-01-25 11:30:00', '2024-01-25 11:30:30', '2024-01-25 11:32:00', NULL),
    (4, 'JOB-2024-004', 1, '/uploads/video3.avi', '{"original_size_mb": 120, "duration_seconds": 300}', 8, 'pending', 0, 0.0, 'user_003', '2024-01-25 12:00:00', NULL, NULL, NULL),
    (5, 'JOB-2024-005', 2, '/uploads/image2.png', '{"width": 2000, "height": 1500}', 3, 'failed', 4, 60.0, 'user_002', '2024-01-25 12:30:00', '2024-01-25 12:30:15', NULL, 'Image optimization failed: corrupt file header'),
    (6, 'JOB-2024-006', 5, '/uploads/video4.mp4', '{"original_size_mb": 350, "duration_seconds": 900}', 6, 'queued', 0, 0.0, 'user_001', '2024-01-25 13:00:00', NULL, NULL, NULL);

Step 10: Insert Step Executions

Add execution records.

-- Execution history for the jobs that have started.
INSERT INTO step_executions (id, job_id, step_id, status, started_at, completed_at, duration_seconds) VALUES
    -- Job 1 (completed)
    (1, 1, 1, 'completed', '2024-01-25 10:01:00', '2024-01-25 10:01:15', 15),
    (2, 1, 2, 'completed', '2024-01-25 10:01:15', '2024-01-25 10:02:00', 45),
    (3, 1, 3, 'completed', '2024-01-25 10:02:00', '2024-01-25 10:22:00', 1200),
    (4, 1, 4, 'completed', '2024-01-25 10:22:00', '2024-01-25 10:35:00', 780),
    (5, 1, 5, 'completed', '2024-01-25 10:35:00', '2024-01-25 10:42:00', 420),
    (6, 1, 6, 'completed', '2024-01-25 10:42:00', '2024-01-25 10:44:00', 120),
    (7, 1, 7, 'completed', '2024-01-25 10:44:00', '2024-01-25 10:45:00', 60),
    -- Job 2 (processing): step 4 is still running, so it has no end time or duration yet
    (8, 2, 1, 'completed', '2024-01-25 11:02:00', '2024-01-25 11:02:20', 20),
    (9, 2, 2, 'completed', '2024-01-25 11:02:20', '2024-01-25 11:03:30', 70),
    (10, 2, 3, 'completed', '2024-01-25 11:03:30', '2024-01-25 11:33:30', 1800),
    (11, 2, 4, 'running', '2024-01-25 11:33:30', NULL, NULL),
    -- Job 3 (completed)
    (12, 3, 8, 'completed', '2024-01-25 11:30:30', '2024-01-25 11:30:45', 15),
    (13, 3, 9, 'completed', '2024-01-25 11:30:45', '2024-01-25 11:31:15', 30),
    (14, 3, 10, 'completed', '2024-01-25 11:31:15', '2024-01-25 11:31:30', 15),
    (15, 3, 11, 'completed', '2024-01-25 11:31:30', '2024-01-25 11:31:50', 20),
    (16, 3, 12, 'completed', '2024-01-25 11:31:50', '2024-01-25 11:32:00', 10);

-- Job 5 (failed): the first three image steps succeed, then Optimize (step id 11)
-- fails with the same error recorded on the job. Without these rows the
-- "failures" column of the step performance query is always 0.
INSERT INTO step_executions (id, job_id, step_id, status, started_at, completed_at, duration_seconds, error_message) VALUES
    (17, 5, 8, 'completed', '2024-01-25 12:30:15', '2024-01-25 12:30:25', 10, NULL),
    (18, 5, 9, 'completed', '2024-01-25 12:30:25', '2024-01-25 12:30:55', 30, NULL),
    (19, 5, 10, 'completed', '2024-01-25 12:30:55', '2024-01-25 12:31:10', 15, NULL),
    (20, 5, 11, 'failed', '2024-01-25 12:31:10', '2024-01-25 12:31:20', 10, 'Image optimization failed: corrupt file header');

Step 11: Insert Queue and Notifications

Add queue entries and alerts.

-- Queue entries for the two jobs that have not started yet (jobs 4 and 6).
INSERT INTO media_queue (
    id, job_id, queue_name, priority, status, scheduled_time
)
VALUES
    (1, 4, 'video_processing', 8, 'queued', '2024-01-25 12:00:00'),
    (2, 6, 'video_analysis', 6, 'queued', '2024-01-25 13:00:00');

-- Notifications already delivered: two completions (jobs 1 and 3) and one failure (job 5).
INSERT INTO workflow_notifications (
    id, job_id, notification_type, recipient, channel,
    subject, message, status, sent_at
)
VALUES
    (1, 1, 'completion', 'user_001@company.com', 'email',
        'Video Processing Complete', 'Your video video1.mp4 has been processed successfully.',
        'sent', '2024-01-25 10:45:30'),
    (2, 3, 'completion', 'user_001@company.com', 'email',
        'Image Processing Complete', 'Your image image1.jpg has been processed.',
        'sent', '2024-01-25 11:32:30'),
    (3, 5, 'failure', 'user_002@company.com', 'email',
        'Processing Failed', 'Image processing failed: corrupt file header',
        'sent', '2024-01-25 12:35:00');

Step 12: Job Status Dashboard

View the status of all jobs.

-- Job status dashboard: one row per job with its workflow name and progress.
-- Jobs needing attention come first (failed, then processing, queued,
-- pending, completed); within a status, higher priority values come first.
SELECT
    wj.job_code,
    wd.workflow_name,
    wj.status,
    wj.current_step,
    -- Number of steps defined for the job's workflow (0 when none are defined).
    (
        SELECT COUNT(*)
        FROM workflow_steps ws
        WHERE ws.workflow_id = wd.id
    ) AS total_steps,
    wj.progress_percent,
    wj.priority,
    wj.submitted_by,
    wj.submitted_at,
    wj.started_at,
    wj.completed_at
FROM workflow_jobs wj
INNER JOIN workflow_definitions wd ON wj.workflow_id = wd.id
ORDER BY
    CASE wj.status
        WHEN 'failed' THEN 1
        WHEN 'processing' THEN 2
        WHEN 'queued' THEN 3
        WHEN 'pending' THEN 4
        WHEN 'completed' THEN 5
        ELSE 6  -- any other status sorts last instead of at an engine-dependent NULL position
    END,
    wj.priority DESC,
    -- Tie-breakers so the row order is the same on every run.
    wj.submitted_at,
    wj.id;

Step 13: Processing Queue Monitor

View queue status.

-- Queue monitor: per-queue counts of entries that are still waiting or being processed.
-- avg_priority averages the job-level priority (workflow_jobs.priority),
-- which is why the jobs table is joined in.
SELECT
    mq.queue_name,
    COUNT(*) AS jobs_in_queue,
    COUNT(CASE WHEN mq.status = 'queued' THEN 1 END) AS waiting,
    COUNT(CASE WHEN mq.status = 'processing' THEN 1 END) AS processing,
    AVG(wj.priority) AS avg_priority,
    MIN(mq.scheduled_time) AS next_scheduled
FROM media_queue mq
INNER JOIN workflow_jobs wj ON mq.job_id = wj.id
WHERE mq.status IN ('queued', 'processing')
GROUP BY mq.queue_name
ORDER BY
    waiting DESC,
    mq.queue_name;  -- tie-breaker: queues with equal backlog keep a stable order

Step 14: Step Performance Analysis

Analyze step execution times.

-- Step performance: execution count, duration statistics, and failure count per step.
-- The LEFT JOIN keeps steps that have never run; their duration columns are NULL.
SELECT
    ws.step_name,
    ws.step_type,
    COUNT(se.id) AS executions,
    AVG(se.duration_seconds) AS avg_duration_sec,
    MIN(se.duration_seconds) AS min_duration_sec,
    MAX(se.duration_seconds) AS max_duration_sec,
    COUNT(CASE WHEN se.status = 'failed' THEN 1 END) AS failures
FROM workflow_steps ws
LEFT JOIN step_executions se ON ws.id = se.step_id
GROUP BY ws.id, ws.step_name, ws.step_type
ORDER BY
    -- Steps without a measured duration go last. A plain DESC sort would put
    -- their NULL averages first on engines that treat NULL as the largest value.
    CASE WHEN AVG(se.duration_seconds) IS NULL THEN 1 ELSE 0 END,
    avg_duration_sec DESC,
    ws.id;  -- tie-breaker for equal averages

Step 15: Workflow Analytics

Overall workflow statistics.

-- Workflow analytics: job counts by outcome and average run time per active workflow.
-- The LEFT JOIN keeps workflows that have no jobs (all counts 0, average NULL).
SELECT
    wd.workflow_name,
    wd.workflow_type,
    COUNT(wj.id) AS total_jobs,
    COUNT(CASE WHEN wj.status = 'completed' THEN 1 END) AS completed,
    COUNT(CASE WHEN wj.status = 'failed' THEN 1 END) AS failed,
    COUNT(CASE WHEN wj.status IN ('pending', 'queued', 'processing') THEN 1 END) AS in_progress,
    -- Only jobs with an end time contribute. EXTRACT(EPOCH FROM interval)
    -- yields seconds (PostgreSQL-style syntax), divided by 60 for minutes.
    AVG(CASE WHEN wj.completed_at IS NOT NULL
        THEN EXTRACT(EPOCH FROM (wj.completed_at - wj.started_at)) / 60
    END) AS avg_duration_minutes
FROM workflow_definitions wd
LEFT JOIN workflow_jobs wj ON wd.id = wj.workflow_id
WHERE wd.is_active = TRUE
GROUP BY wd.id, wd.workflow_name, wd.workflow_type
ORDER BY
    total_jobs DESC,
    wd.workflow_name;  -- tie-breaker: workflows with the same job count keep a stable order

Cleanup (Optional)

-- Drop tables child-first so no foreign key still points at a table being removed.

-- Tables that reference workflow_jobs (step_executions also references workflow_steps).
DROP TABLE IF EXISTS step_executions;
DROP TABLE IF EXISTS media_queue;
DROP TABLE IF EXISTS workflow_notifications;

-- Tables that reference workflow_definitions.
DROP TABLE IF EXISTS workflow_steps;
DROP TABLE IF EXISTS workflow_jobs;

-- Root table.
DROP TABLE IF EXISTS workflow_definitions;

Expected Outcomes

  • Workflows defined with steps
  • Jobs tracked through pipeline
  • Queue management works
  • Notifications sent
  • Analytics available

Job Status Flow

pending -> queued -> processing -> completed
                                -> failed

Key Concepts Learned

  • Workflow definition
  • Step sequencing
  • Job state management
  • Queue processing
  • Performance monitoring

Tags

sql · advanced · workflow · automation · pipeline · media-processing

Run this on your own machine

Install SynapCores Community Edition free, paste the SQL or Cypher above into the bundled web UI, and watch it run.

Download Free CE