Media Workflow Automation
Objective
Build an automated media processing workflow system that manages processing pipelines, tracks job status, handles errors, and sends notifications on completion.
Step 1: Create Workflow Definitions Table
Define workflow templates.
-- Workflow templates: one row per reusable, named processing pipeline.
-- Concrete executions of a template are tracked in workflow_jobs.
CREATE TABLE workflow_definitions (
id INTEGER PRIMARY KEY,
workflow_code VARCHAR(50) NOT NULL UNIQUE, -- stable business key, e.g. 'WF-VIDEO-TRANSCODE'
workflow_name VARCHAR(200),
description TEXT,
workflow_type VARCHAR(50), -- category, e.g. 'video', 'image', 'audio' (see Step 7 data)
input_types TEXT, -- comma-separated MIME types accepted; NOTE(review): CSV-in-column antipattern, consider a junction table
output_types TEXT, -- comma-separated MIME types produced
steps_json TEXT, -- presumably a JSON snapshot of the steps; canonical per-step rows live in workflow_steps -- confirm
default_priority INTEGER DEFAULT 5, -- presumably the default for workflow_jobs.priority -- confirm against submitter
timeout_minutes INTEGER DEFAULT 60, -- whole-job timeout; per-step timeouts are in workflow_steps.timeout_seconds
retry_count INTEGER DEFAULT 3, -- maximum retries; see workflow_jobs.retry_count for the running counter
is_active BOOLEAN DEFAULT TRUE, -- inactive templates are excluded from the Step 15 analytics query
created_by VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Step 2: Create Workflow Steps Table
Define individual steps.
-- Individual ordered steps making up a workflow template.
-- Fixes over the original: a uniqueness constraint so a workflow cannot
-- define two steps at the same position, a CHECK restricting on_failure
-- to the three policies actually used, and a named FK for greppable errors.
CREATE TABLE workflow_steps (
id INTEGER PRIMARY KEY,
workflow_id INTEGER NOT NULL,
step_number INTEGER, -- 1-based position within the workflow
step_name VARCHAR(100),
step_type VARCHAR(50), -- e.g. 'validation', 'transcode', 'thumbnail'
action VARCHAR(100), -- worker action identifier, e.g. 'transcode_video'
parameters TEXT, -- JSON blob of action parameters
required_inputs TEXT,
expected_outputs TEXT,
timeout_seconds INTEGER DEFAULT 300,
on_failure VARCHAR(50) DEFAULT 'stop', -- failure policy: 'stop', 'continue', or 'retry'
-- Only the three failure policies used by the seed data (Step 8) are legal.
CONSTRAINT workflow_steps_on_failure_check
    CHECK (on_failure IN ('stop', 'continue', 'retry')),
-- A workflow cannot have two steps at the same position.
CONSTRAINT workflow_steps_workflow_step_uniq
    UNIQUE (workflow_id, step_number),
CONSTRAINT workflow_steps_workflow_id_fk
    FOREIGN KEY (workflow_id) REFERENCES workflow_definitions(id)
);
Step 3: Create Workflow Jobs Table
Track job executions.
-- One row per submitted job: a single execution of a workflow template
-- against one input file.
-- Fixes over the original: status restricted to the lifecycle values used
-- throughout this file, progress bounded to 0-100, and a named FK.
CREATE TABLE workflow_jobs (
id INTEGER PRIMARY KEY,
job_code VARCHAR(50) NOT NULL UNIQUE, -- business key, e.g. 'JOB-2024-001'
workflow_id INTEGER NOT NULL,
input_file_path TEXT,
input_metadata TEXT, -- JSON blob describing the input (size, duration, dimensions, ...)
output_file_path TEXT,
output_metadata TEXT,
priority INTEGER DEFAULT 5,
status VARCHAR(50) DEFAULT 'pending',
current_step INTEGER DEFAULT 0, -- step_number the job has reached; 0 = not started
progress_percent DECIMAL(5, 2) DEFAULT 0,
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT, -- populated only for failed jobs
retry_count INTEGER DEFAULT 0,
submitted_by VARCHAR(100),
submitted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Lifecycle states per the "Job Status Flow" notes at the bottom of the file.
CONSTRAINT workflow_jobs_status_check
    CHECK (status IN ('pending', 'queued', 'processing', 'completed', 'failed')),
-- A progress percentage outside 0-100 is always a bug.
CONSTRAINT workflow_jobs_progress_check
    CHECK (progress_percent BETWEEN 0 AND 100),
CONSTRAINT workflow_jobs_workflow_id_fk
    FOREIGN KEY (workflow_id) REFERENCES workflow_definitions(id)
);
Step 4: Create Step Executions Table
Track step progress.
-- One row per attempt of a workflow step within a job; presumably retries
-- create additional rows distinguished by retry_attempt -- confirm.
CREATE TABLE step_executions (
id INTEGER PRIMARY KEY,
job_id INTEGER NOT NULL,
step_id INTEGER NOT NULL,
status VARCHAR(50) DEFAULT 'pending', -- seed data uses 'pending', 'running', 'completed'; Step 14 also counts 'failed'
input_data TEXT,
output_data TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
duration_seconds INTEGER, -- denormalized; equals completed_at - started_at in the Step 10 seed data
error_message TEXT,
retry_attempt INTEGER DEFAULT 0, -- 0 = first attempt
FOREIGN KEY (job_id) REFERENCES workflow_jobs(id),
FOREIGN KEY (step_id) REFERENCES workflow_steps(id)
);
Step 5: Create Media Queue Table
Manage processing queue.
-- Work queue: jobs waiting to be claimed by a worker process.
CREATE TABLE media_queue (
id INTEGER PRIMARY KEY,
job_id INTEGER NOT NULL,
queue_name VARCHAR(50), -- logical queue, e.g. 'video_processing', 'video_analysis'
priority INTEGER DEFAULT 5,
scheduled_time TIMESTAMP, -- NOTE(review): presumably the earliest pickup time -- confirm semantics
picked_up_at TIMESTAMP, -- set when a worker claims the entry -- TODO confirm
worker_id VARCHAR(100), -- identifier of the claiming worker
status VARCHAR(50) DEFAULT 'queued', -- Step 13 query also expects 'processing'
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (job_id) REFERENCES workflow_jobs(id)
);
Step 6: Create Notifications Table
Track workflow notifications.
-- Outbound notifications triggered by job lifecycle events.
CREATE TABLE workflow_notifications (
id INTEGER PRIMARY KEY,
job_id INTEGER NOT NULL,
notification_type VARCHAR(50), -- e.g. 'completion', 'failure' (see Step 11 data)
recipient VARCHAR(200), -- e.g. an email address
channel VARCHAR(50), -- delivery channel; seed data uses 'email'
subject VARCHAR(300),
message TEXT,
status VARCHAR(50) DEFAULT 'pending', -- 'pending' until delivered; seed data shows 'sent'
sent_at TIMESTAMP,
error_message TEXT, -- presumably populated on delivery failure -- confirm
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (job_id) REFERENCES workflow_jobs(id)
);
Step 7: Insert Workflow Definitions
Add workflow templates.
-- Five workflow templates covering video, image, audio, document and
-- analysis pipelines. timeout_minutes scales with expected work
-- (image: 30 min; video analysis: 180 min).
INSERT INTO workflow_definitions (id, workflow_code, workflow_name, description, workflow_type, input_types, output_types, default_priority, timeout_minutes, is_active) VALUES
(1, 'WF-VIDEO-TRANSCODE', 'Video Transcoding Pipeline', 'Transcode video to multiple formats and resolutions', 'video', 'video/mp4,video/avi,video/mov', 'video/mp4', 5, 120, TRUE),
(2, 'WF-IMAGE-PROCESS', 'Image Processing Pipeline', 'Resize, optimize, and generate thumbnails', 'image', 'image/jpeg,image/png', 'image/jpeg,image/png,image/webp', 3, 30, TRUE),
(3, 'WF-AUDIO-TRANSCODE', 'Audio Transcoding Pipeline', 'Convert audio and generate waveforms', 'audio', 'audio/wav,audio/flac', 'audio/mp3,audio/aac', 4, 45, TRUE),
(4, 'WF-DOC-PROCESS', 'Document Processing Pipeline', 'Extract text, generate thumbnails, convert formats', 'document', 'application/pdf,application/docx', 'application/pdf,text/plain', 4, 60, TRUE),
(5, 'WF-VIDEO-ANALYZE', 'Video Analysis Pipeline', 'Extract metadata, generate thumbnails, transcribe audio', 'video_analysis', 'video/mp4', 'application/json', 6, 180, TRUE);
Step 8: Insert Workflow Steps
Define step sequences.
-- Step sequences for workflows 1 and 2 only; workflows 3-5 are defined
-- in Step 7 but have no step rows here, so their total_steps is 0 in the
-- Step 12 dashboard.
INSERT INTO workflow_steps (id, workflow_id, step_number, step_name, step_type, action, parameters, timeout_seconds, on_failure) VALUES
-- Video Transcoding Pipeline (workflow 1): validate, extract metadata,
-- transcode three renditions, thumbnail, finalize.
(1, 1, 1, 'Validate Input', 'validation', 'validate_video', '{"check_codec": true, "check_duration": true}', 30, 'stop'),
(2, 1, 2, 'Extract Metadata', 'extraction', 'extract_video_metadata', '{}', 60, 'continue'),
(3, 1, 3, 'Transcode 1080p', 'transcode', 'transcode_video', '{"resolution": "1080p", "codec": "h264"}', 1800, 'retry'),
(4, 1, 4, 'Transcode 720p', 'transcode', 'transcode_video', '{"resolution": "720p", "codec": "h264"}', 1200, 'retry'),
(5, 1, 5, 'Transcode 480p', 'transcode', 'transcode_video', '{"resolution": "480p", "codec": "h264"}', 600, 'retry'),
(6, 1, 6, 'Generate Thumbnails', 'thumbnail', 'generate_thumbnails', '{"count": 5, "format": "jpeg"}', 120, 'continue'),
(7, 1, 7, 'Finalize Output', 'finalization', 'finalize_job', '{}', 60, 'stop'),
-- Image Processing Pipeline (workflow 2): validate, resize, thumbnail,
-- optimize, convert to WebP.
(8, 2, 1, 'Validate Image', 'validation', 'validate_image', '{}', 30, 'stop'),
(9, 2, 2, 'Resize Large', 'resize', 'resize_image', '{"max_dimension": 2000}', 60, 'retry'),
(10, 2, 3, 'Generate Thumbnail', 'thumbnail', 'create_thumbnail', '{"size": 300}', 30, 'continue'),
(11, 2, 4, 'Optimize', 'optimization', 'optimize_image', '{"quality": 85}', 60, 'retry'),
(12, 2, 5, 'Convert WebP', 'conversion', 'convert_format', '{"format": "webp"}', 45, 'continue');
Step 9: Insert Sample Jobs
Add workflow jobs.
-- Seed jobs covering every lifecycle state. Fix over the original: the
-- failed job's error_message is written inline in the INSERT instead of
-- via a separate follow-up UPDATE, so each row is created in one atomic
-- statement. The final table contents are identical.
INSERT INTO workflow_jobs (id, job_code, workflow_id, input_file_path, input_metadata, priority, status, current_step, progress_percent, submitted_by, submitted_at, started_at, completed_at, error_message) VALUES
(1, 'JOB-2024-001', 1, '/uploads/video1.mp4', '{"original_size_mb": 250, "duration_seconds": 600}', 5, 'completed', 7, 100.0, 'user_001', '2024-01-25 10:00:00', '2024-01-25 10:01:00', '2024-01-25 10:45:00', NULL),
(2, 'JOB-2024-002', 1, '/uploads/video2.mov', '{"original_size_mb": 480, "duration_seconds": 1200}', 5, 'processing', 4, 57.0, 'user_002', '2024-01-25 11:00:00', '2024-01-25 11:02:00', NULL, NULL),
(3, 'JOB-2024-003', 2, '/uploads/image1.jpg', '{"width": 4000, "height": 3000}', 3, 'completed', 5, 100.0, 'user_001', '2024-01-25 11:30:00', '2024-01-25 11:30:30', '2024-01-25 11:32:00', NULL),
(4, 'JOB-2024-004', 1, '/uploads/video3.avi', '{"original_size_mb": 120, "duration_seconds": 300}', 8, 'pending', 0, 0.0, 'user_003', '2024-01-25 12:00:00', NULL, NULL, NULL),
-- NOTE(review): current_step = 3, but the error text refers to optimization,
-- which is step_number 4 of workflow 2 -- confirm whether this should be 4.
(5, 'JOB-2024-005', 2, '/uploads/image2.png', '{"width": 2000, "height": 1500}', 3, 'failed', 3, 40.0, 'user_002', '2024-01-25 12:30:00', '2024-01-25 12:30:15', NULL, 'Image optimization failed: corrupt file header'),
(6, 'JOB-2024-006', 5, '/uploads/video4.mp4', '{"original_size_mb": 350, "duration_seconds": 900}', 6, 'queued', 0, 0.0, 'user_001', '2024-01-25 13:00:00', NULL, NULL, NULL);
Step 10: Insert Step Executions
Add execution records.
-- Execution history for jobs 1-3. Jobs 4 and 6 have not started and job 5
-- has no rows here, so its failure appears only on workflow_jobs.
-- duration_seconds matches completed_at - started_at throughout.
INSERT INTO step_executions (id, job_id, step_id, status, started_at, completed_at, duration_seconds) VALUES
-- Job 1 (completed): all 7 video-pipeline steps ran to completion.
(1, 1, 1, 'completed', '2024-01-25 10:01:00', '2024-01-25 10:01:15', 15),
(2, 1, 2, 'completed', '2024-01-25 10:01:15', '2024-01-25 10:02:00', 45),
(3, 1, 3, 'completed', '2024-01-25 10:02:00', '2024-01-25 10:22:00', 1200),
(4, 1, 4, 'completed', '2024-01-25 10:22:00', '2024-01-25 10:35:00', 780),
(5, 1, 5, 'completed', '2024-01-25 10:35:00', '2024-01-25 10:42:00', 420),
(6, 1, 6, 'completed', '2024-01-25 10:42:00', '2024-01-25 10:44:00', 120),
(7, 1, 7, 'completed', '2024-01-25 10:44:00', '2024-01-25 10:45:00', 60),
-- Job 2 (processing): currently on step_id 4 (Transcode 720p), still running.
-- NOTE(review): step_id 3 took 1800 s, exactly its timeout_seconds -- verify
-- it finished rather than being cut off.
(8, 2, 1, 'completed', '2024-01-25 11:02:00', '2024-01-25 11:02:20', 20),
(9, 2, 2, 'completed', '2024-01-25 11:02:20', '2024-01-25 11:03:30', 70),
(10, 2, 3, 'completed', '2024-01-25 11:03:30', '2024-01-25 11:33:30', 1800),
(11, 2, 4, 'running', '2024-01-25 11:33:30', NULL, NULL),
-- Job 3 (completed): all 5 image-pipeline steps (step_ids 8-12).
(12, 3, 8, 'completed', '2024-01-25 11:30:30', '2024-01-25 11:30:45', 15),
(13, 3, 9, 'completed', '2024-01-25 11:30:45', '2024-01-25 11:31:15', 30),
(14, 3, 10, 'completed', '2024-01-25 11:31:15', '2024-01-25 11:31:30', 15),
(15, 3, 11, 'completed', '2024-01-25 11:31:30', '2024-01-25 11:31:50', 20),
(16, 3, 12, 'completed', '2024-01-25 11:31:50', '2024-01-25 11:32:00', 10);
Step 11: Insert Queue and Notifications
Add queue entries and alerts.
-- Enqueue the two not-yet-started jobs (4 and 6) for worker pickup.
INSERT INTO media_queue
    (id, job_id, scheduled_time, queue_name, status, priority)
VALUES
    (1, 4, '2024-01-25 12:00:00', 'video_processing', 'queued', 8),
    (2, 6, '2024-01-25 13:00:00', 'video_analysis', 'queued', 6);
-- Delivered notifications: completion emails for jobs 1 and 3, and a
-- failure email for job 5.
INSERT INTO workflow_notifications (id, job_id, notification_type, recipient, channel, subject, message, status, sent_at) VALUES
(1, 1, 'completion', 'user_001@company.com', 'email', 'Video Processing Complete', 'Your video video1.mp4 has been processed successfully.', 'sent', '2024-01-25 10:45:30'),
(2, 3, 'completion', 'user_001@company.com', 'email', 'Image Processing Complete', 'Your image image1.jpg has been processed.', 'sent', '2024-01-25 11:32:30'),
(3, 5, 'failure', 'user_002@company.com', 'email', 'Processing Failed', 'Image processing failed: corrupt file header', 'sent', '2024-01-25 12:35:00');
Step 12: Job Status Dashboard
View all jobs status.
-- Job status dashboard: every job with its template name, progress and
-- timing, sorted so failures and active work appear first.
-- Fix over the original: the per-row correlated scalar subquery counting
-- workflow steps is replaced by a pre-aggregated CTE joined once per
-- workflow; COALESCE keeps the original 0 result for workflows with no
-- step rows (e.g. workflow 5).
WITH step_counts AS (
    -- One row per workflow that has steps, with its step count.
    SELECT workflow_id, COUNT(*) AS total_steps
    FROM workflow_steps
    GROUP BY workflow_id
)
SELECT
    wj.job_code,
    wd.workflow_name,
    wj.status,
    wj.current_step,
    COALESCE(sc.total_steps, 0) AS total_steps,
    wj.progress_percent,
    wj.priority,
    wj.submitted_by,
    wj.submitted_at,
    wj.started_at,
    wj.completed_at
FROM workflow_jobs wj
INNER JOIN workflow_definitions wd ON wj.workflow_id = wd.id
LEFT JOIN step_counts sc ON sc.workflow_id = wd.id
ORDER BY
    CASE wj.status
        WHEN 'failed' THEN 1
        WHEN 'processing' THEN 2
        WHEN 'queued' THEN 3
        WHEN 'pending' THEN 4
        WHEN 'completed' THEN 5
        ELSE 6 -- deterministic placement for any unexpected status
    END,
    wj.priority DESC;
Step 13: Processing Queue Monitor
View queue status.
-- Per-queue snapshot of active work: totals, queued-vs-processing split,
-- average job priority, and the next scheduled pickup time.
SELECT
    q.queue_name,
    COUNT(*) AS jobs_in_queue,
    SUM(CASE WHEN q.status = 'queued' THEN 1 ELSE 0 END) AS waiting,
    SUM(CASE WHEN q.status = 'processing' THEN 1 ELSE 0 END) AS processing,
    AVG(j.priority) AS avg_priority,
    MIN(q.scheduled_time) AS next_scheduled
FROM media_queue q
INNER JOIN workflow_jobs j
    ON q.job_id = j.id
WHERE q.status IN ('queued', 'processing')
GROUP BY q.queue_name
ORDER BY waiting DESC;
Step 14: Step Performance Analysis
Analyze step execution times.
-- Step-level performance: execution counts, duration stats and failure
-- counts per workflow step. The LEFT JOIN keeps steps that have never
-- executed (their duration aggregates are NULL).
SELECT
    ws.step_name,
    ws.step_type,
    COUNT(se.id) as executions,
    AVG(se.duration_seconds) as avg_duration_sec,
    MIN(se.duration_seconds) as min_duration_sec,
    MAX(se.duration_seconds) as max_duration_sec,
    COUNT(CASE WHEN se.status = 'failed' THEN 1 END) as failures
FROM workflow_steps ws
LEFT JOIN step_executions se ON ws.id = se.step_id
GROUP BY ws.id, ws.step_name, ws.step_type
-- Fix: DESC sorts NULLs first by default in PostgreSQL, so never-executed
-- steps would top the report; NULLS LAST sinks them to the bottom.
ORDER BY avg_duration_sec DESC NULLS LAST;
Step 15: Workflow Analytics
Overall workflow statistics.
-- Per-workflow rollup over active templates: job counts by outcome and
-- the mean end-to-end wall-clock duration of completed jobs in minutes.
-- Uses the aggregate FILTER clause (PostgreSQL, which this file targets
-- via EXTRACT(EPOCH ...)) instead of CASE-in-aggregate; results are
-- identical because AVG/COUNT ignore the rows the filter excludes.
SELECT
    wd.workflow_name,
    wd.workflow_type,
    COUNT(wj.id) AS total_jobs,
    COUNT(wj.id) FILTER (WHERE wj.status = 'completed') AS completed,
    COUNT(wj.id) FILTER (WHERE wj.status = 'failed') AS failed,
    COUNT(wj.id) FILTER (WHERE wj.status IN ('pending', 'queued', 'processing')) AS in_progress,
    AVG(EXTRACT(EPOCH FROM (wj.completed_at - wj.started_at)) / 60)
        FILTER (WHERE wj.completed_at IS NOT NULL) AS avg_duration_minutes
FROM workflow_definitions wd
LEFT JOIN workflow_jobs wj ON wd.id = wj.workflow_id
WHERE wd.is_active = TRUE
GROUP BY wd.id, wd.workflow_name, wd.workflow_type
ORDER BY total_jobs DESC;
Cleanup (Optional)
-- Teardown in reverse dependency order: child tables first so no
-- FOREIGN KEY reference is left dangling.
DROP TABLE IF EXISTS workflow_notifications;
DROP TABLE IF EXISTS media_queue;
DROP TABLE IF EXISTS step_executions;
DROP TABLE IF EXISTS workflow_jobs;
DROP TABLE IF EXISTS workflow_steps;
DROP TABLE IF EXISTS workflow_definitions;
Expected Outcomes
- Workflows defined with steps
- Jobs tracked through pipeline
- Queue management works
- Notifications sent
- Analytics available
Job Status Flow
pending -> queued -> processing -> completed
Any state before completion may transition to failed when an error occurs.
Key Concepts Learned
- Workflow definition
- Step sequencing
- Job state management
- Queue processing
- Performance monitoring