diff --git a/.env.example b/.env.example index 204d86228..71aa4a3d8 100644 --- a/.env.example +++ b/.env.example @@ -104,3 +104,10 @@ OPEN_CATALOG_WEBHOOK_KEY=changeme WAYBACK_MACHINE_ACCESS_KEY=changeme WAYBACK_MACHINE_SECRET_KEY=changeme ENABLE_WAYBACK_TASKS=false + +# Video transcoding settings +VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY=30 +TRANSCODE_RESULT_TEMPLATE=./test_videos_webhook/cloudwatch_sns_complete.json +TRANSCODE_ERROR_TEMPLATE=./test_videos_webhook/cloudwatch_sns_error.json +VIDEO_S3_TRANSCODE_BUCKET=changeme +POST_TRANSCODE_ACTIONS=videos.api.update_video_job diff --git a/.secrets.baseline b/.secrets.baseline index e722f470f..73a2ab997 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -113,9 +113,9 @@ "filename": "README.md", "hashed_secret": "be4fc4886bd949b369d5e092eb87494f12e57e5b", "is_verified": false, - "line_number": 247 + "line_number": 262 } ] }, - "generated_at": "2024-09-04T01:40:31Z" + "generated_at": "2025-06-23T07:34:17Z" } diff --git a/README.md b/README.md index ae6220d1c..acc078876 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,21 @@ OCW Studio manages deployments for OCW courses. +## Recent Updates + +### Video Transcoding Enhancements (December 2024) + +Recent improvements to the video transcoding system include: + +- **Enhanced error handling** in video processing and transcoding workflows +- **Local testing support** for video transcoding with mock AWS MediaConvert callbacks +- **Improved video job status tracking** with comprehensive unit test coverage +- **New API functions** for MediaConvert job management (`get_media_convert_job`, `prepare_job_results`) +- **Automated transcoding status updates** via periodic Celery tasks for development environments +- **Template-based result mocking** for testing transcoding workflows without AWS dependencies + +For detailed configuration, see the [Enabling AWS MediaConvert transcoding](#enabling-aws-mediaconvert-transcoding) section. + **SECTIONS** - [ocw_studio](#ocw_studio) @@ -480,14 +495,29 @@ AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_STORAGE_BUCKET_NAME VIDEO_S3_TRANSCODE_ENDPOINT +VIDEO_S3_TRANSCODE_PREFIX +VIDEO_S3_TRANSCODE_BUCKET AWS_ROLE_NAME DRIVE_SHARED_ID DRIVE_SERVICE_ACCOUNT_CREDS API_BEARER_TOKEN +POST_TRANSCODE_ACTIONS ``` This will allow for videos to be submitted for transcoding to the AWS MediaConvert service. This is done automatically once a video has been synced to Studio from Google Drive. +## Local Development and Testing + +For local development and testing, additional environment variables can be configured to mock transcoding behavior: + +``` +VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY=30 +TRANSCODE_RESULT_TEMPLATE=./test_videos_webhook/cloudwatch_sns_complete.json +TRANSCODE_ERROR_TEMPLATE=./test_videos_webhook/cloudwatch_sns_error.json +``` + +These settings enable a periodic task that simulates AWS MediaConvert callbacks for testing video transcoding workflows locally without requiring actual AWS MediaConvert services. + # Enabling 3Play integration The following environment variables need to be defined in your .env file (for a pre-configured 3Play account): diff --git a/app.json b/app.json index 0188f097e..a59b37516 100644 --- a/app.json +++ b/app.json @@ -751,6 +751,26 @@ "YT_UPLOAD_LIMIT": { "description": "Max Youtube uploads allowed per day", "required": false + }, + "VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY": { + "description": "Frequency in seconds for checking transcoding video statuses (dev only)", + "required": false + }, + "TRANSCODE_RESULT_TEMPLATE": { + "description": "Template file for mock transcoding results (dev only)", + "required": false + }, + "TRANSCODE_ERROR_TEMPLATE": { + "description": "Template file for mock transcoding error results (dev only)", + "required": false + }, + "VIDEO_S3_TRANSCODE_BUCKET": { + "description": "S3 bucket name for MediaConvert transcoded videos", + "required": false + }, + "POST_TRANSCODE_ACTIONS": { + "description": "Python function path for post-transcoding actions", + "required": false } }, "keywords": ["Django", "Python", "MIT", "Office of Digital Learning"], diff --git a/main/settings.py b/main/settings.py index d09e0f962..ab9225278 100644 --- a/main/settings.py +++ b/main/settings.py @@ -553,6 +553,29 @@ default=50, description="Max Youtube uploads allowed per day", ) + +# Transcoding settings for local testing +VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY = get_int( + name="VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY", + default=30, + dev_only=True, + description="Frequency in seconds for checking transcoding video statuses", +) + +TRANSCODE_RESULT_TEMPLATE = get_string( + name="TRANSCODE_RESULT_TEMPLATE", + default="./test_videos_webhook/cloudwatch_sns_complete.json", + dev_only=True, + description="Template file for mock transcoding results", +) + +TRANSCODE_ERROR_TEMPLATE = get_string( + name="TRANSCODE_ERROR_TEMPLATE", + default="./test_videos_webhook/cloudwatch_sns_error.json", + dev_only=True, + description="Template file for mock transcoding error results", +) + # OCW metadata fields FIELD_RESOURCETYPE = get_string( name="FIELD_RESOURCETYPE", @@ -764,6 +787,12 @@ "schedule": CHECK_EXTERNAL_RESOURCE_STATUS_FREQUENCY, } +if ENVIRONMENT.lower() == "staging": + CELERY_BEAT_SCHEDULE["update-video-transcoding-statuses"] = { + "task": "videos.tasks.update_video_transcoding_statuses", + "schedule": VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY, + } + # django cache back-ends CACHES = { "default": { @@ -1307,7 +1336,7 @@ name="PUBLISH_POSTHOG_FEATURE_FLAG_REQUEST_TIMEOUT_MS", default=3000, description=( - "Timeout (ms) for PostHog feature flag requests, " "published to pipelines" + "Timeout (ms) for PostHog feature flag requests, published to pipelines" ), required=False, ) diff --git a/test_videos_webhook/cloudwatch_sns_complete.json b/test_videos_webhook/cloudwatch_sns_complete.json index cd8e09e24..a0d44c15e 100644 --- a/test_videos_webhook/cloudwatch_sns_complete.json +++ b/test_videos_webhook/cloudwatch_sns_complete.json @@ -3,17 +3,17 @@ "id": "c120fe11-87db-c292-b3e5-1cc90740f6e1", "detail-type": "MediaConvert Job State Change", "source": "aws.mediaconvert", - "account": "AWS_ACCOUNT_ID", + "account": "", "time": "2021-08-05T16:52:33Z", - "region": "{aws_region}", + "region": "", "resources": [ - "arn:aws:mediaconvert:AWS_REGION:AWS_ACCOUNT_ID:jobs/26235173873033-qav1eq" + "arn:aws:mediaconvert:::jobs/" ], "detail": { "timestamp": 1628172900136, - "accountId": "AWS_ACCOUNT_ID", - "queue": "arn:aws:mediaconvert:AWS_REGION:AWS_ACCOUNT_ID:queues/Default", - "jobId": "VIDEO_JOB_ID", + "accountId": "", + "queue": "arn:aws:mediaconvert:::queues/", + "jobId": "", "status": "COMPLETE", "userMetadata": {}, "outputGroupDetails": [ @@ -21,7 +21,7 @@ "outputDetails": [ { "outputFilePaths": [ - "s3://AWS_BUCKET/TRANSCODE_PREFIX/SHORT_ID/DRIVE_FILE_ID/testvid_youtube.mp4" + "s3://////_youtube.mp4" ], "durationInMs": 132033, "videoDetails": { @@ -31,7 +31,7 @@ }, { "outputFilePaths": [ - "s3://AWS_BUCKET/TRANSCODE_PREFIX/SHORT_ID/DRIVE_FILE_ID/testvid_360p_16_9.mp4" + "s3://////_360p_16_9.mp4" ], "durationInMs": 132033, "videoDetails": { @@ -41,7 +41,7 @@ }, { "outputFilePaths": [ - "s3://AWS_BUCKET/TRANSCODE_PREFIX/SHORT_ID/DRIVE_FILE_ID/testvid_360p_4_3.mp4" + "s3://////_360p_4_3.mp4" ], "durationInMs": 132033, "videoDetails": { diff --git a/test_videos_webhook/cloudwatch_sns_error.json b/test_videos_webhook/cloudwatch_sns_error.json index ef069532d..ab020e20f 100644 --- a/test_videos_webhook/cloudwatch_sns_error.json +++ b/test_videos_webhook/cloudwatch_sns_error.json @@ -3,17 +3,17 @@ "id": "c8879bt5-730e-6a80-3340-80712099846e", "detail-type": "MediaConvert Job State Change", "source": "aws.mediaconvert", - "account": "AWS_ACCOUNT_ID", + "account": "", "time": "2021-08-05T19:15:37Z", - "region": "AWS_REGION", + "region": "", "resources": [ - "arn:aws:mediaconvert:AWS_REGION:AWS_ACCOUNT_ID:jobs/VIDEO_JOB_ID" + "arn:aws:mediaconvert:::jobs/" ], "detail": { "timestamp": 1628190937233, - "accountId": "919801701561", - "queue": "arn:aws:mediaconvert:AWS_REGION:AWS_ACCOUNT_ID:queues/Default", - "jobId": "VIDEO_JOB_ID", + "accountId": "", + "queue": "arn:aws:mediaconvert:::queues/", + "jobId": "", "status": "ERROR", "errorCode": 1030, "errorMessage": "Video codec [indeo4] is not a supported input video codec", diff --git a/videos/README.md b/videos/README.md index 6be8a5881..aa692ed69 100644 --- a/videos/README.md +++ b/videos/README.md @@ -14,7 +14,7 @@ This document describes the components of the video workflow for OCW. # Overview -This assumes that [Google Drive sync](/README.md#enabling-google-drive-integration), [YouTube integration](/README.md#enabling-youtube-integration), [AWS MediaConvert](/README.md#enabling-aws-transcoding), and [3Play submission](/README.md#enabling-3play-integration) are all enabled, which is required for the video workflow. +This assumes that [Google Drive sync](/README.md#enabling-google-drive-integration), [YouTube integration](/README.md#enabling-youtube-integration), [AWS MediaConvert](/README.md#enabling-aws-mediaconvert-transcoding), and [3Play submission](/README.md#enabling-3play-integration) are all enabled, which is required for the video workflow. The high-level description of the process is below, and each subsequent section contains additional details, including links to the relevant code. @@ -22,7 +22,8 @@ The high-level description of the process is below, and each subsequent section - Upload a video with the name `.` to the `videos_final` folder on Google Drive, where `` is a valid video extension, such as `mp4`. If there are pre-existing captions that should be uploaded with the video (as opposed to requesting captions/transcript from 3Play), then these should be named _exactly_ `_captions.vtt` and `_transcript.pdf`, and uploaded into the `files_final` folder on Google Drive. - Sync using the Studio UI. This uploads the video to S3. - As soon as the upload to S3 is complete, Studio initiates a celery task to submit the video to the AWS Media Convert service. -- Once trancoding is complete, the video is uploaded to YouTube (set as unlisted prior to the course being published). +- The enhanced transcoding system monitors job progress with automatic status updates and comprehensive error handling. +- Once transcoding is complete, the video is uploaded to YouTube (set as unlisted prior to the course being published). - After the video has been successfully uploaded to YouTube, and if there are no pre-existing captions, Studio sends a transcript request to 3Play. - Once 3Play completes the transcript job, the captions (`.vtt` format) and transcript (`.pdf` format) are fetched and associated with the video. - On any publish action, the video metadata and YouTube metadata are updated, assuming the information has been received from the external services. @@ -34,7 +35,17 @@ Users upload videos in a valid video format to the `videos_final` folder. Whethe The parameters of the AWS transcode request are defined through the AWS interface, and the role is defined [here](https://github.com/mitodl/ol-infrastructure/blob/main/src/ol_infrastructure/applications/ocw_studio/__main__.py). Some example JSONs used for triggering MediaConvert job are in [this folder](/test_videos_webhook/). -The [`TranscodeJobView` endpoint](/videos/views.py) listens for the webhook that is sent when the transcoding job is complete. +## Enhanced Transcoding Features + +The transcoding system has been enhanced with the following features: + +- **Enhanced error handling** in video processing workflows with comprehensive logging +- **Local testing support** for transcoding workflows using mock AWS MediaConvert callbacks +- **Automated status updates** via the [`update_video_transcoding_statuses`](/videos/tasks.py) Celery task +- **Template-based result processing** using [`prepare_job_results`](/videos/api.py) for flexible response handling +- **MediaConvert job management** via [`get_media_convert_job`](/videos/api.py) for real-time job status checking + +The [`TranscodeJobView` endpoint](/videos/views.py) listens for the webhook that is sent when the transcoding job is complete. For local development, the system can simulate these webhooks using template files and periodic status updates. # YouTube Submission @@ -59,6 +70,8 @@ In cases where something may have gone wrong with the data, often due to legacy # Testing PRs with Transcoding +## Production-like Testing + Before working on, testing, or reviewing any PR that requires a video to be uploaded to YouTube, make sure that AWS buckets (instead of local Minio storage) are being used for testing. To do that, set `OCW_STUDIO_ENVIRONMENT` to any value other than `dev`. Set the following variables to the same values as for RC: @@ -74,13 +87,44 @@ DRIVE_SERVICE_ACCOUNT_CREDS DRIVE_SHARED_ID VIDEO_S3_TRANSCODE_ENDPOINT VIDEO_S3_TRANSCODE_PREFIX +VIDEO_S3_TRANSCODE_BUCKET ``` Upload the video to the course's Google Drive folder, as described in the [Google Drive Sync and AWS Transcoding](#google-drive-sync-and-aws-transcoding) section above. Wait for the video transcoding job to complete, which requires an amount of time proportional to the length of the video; for a very short video, this should only take a few minutes. -Next, the response to the transcode request needs to be simulated. This is because the AWS MediaConvert service will not send a webhook notification to the local OCW Studio instance, but rather to the RC URL. +## Local Development Testing + +For local development and testing without AWS dependencies, you can use the enhanced mock transcoding system: + +### Configuration + +Add these environment variables to your `.env` file: + +``` +VIDEO_TRANSCODING_STATUS_UPDATE_FREQUENCY=30 +TRANSCODE_RESULT_TEMPLATE=./test_videos_webhook/cloudwatch_sns_complete.json +TRANSCODE_ERROR_TEMPLATE=./test_videos_webhook/cloudwatch_sns_error.json +POST_TRANSCODE_ACTIONS=videos.api.update_video_job +``` + +### Testing Workflow + +1. **Upload Video**: Upload a video to the course's Google Drive folder and sync it through the Studio UI +2. **Automatic Processing**: The system will automatically: -To simulate the response, use cURL, Postman, or an equivalent tool to POST a message to `https://localhost:8043/api/transcode-jobs/`, with the body as in the example below, updated to match the relevant environment variables, course name, and video name. + - Create a `VideoJob` with a mock job ID + - Start periodic status checking via the `update_video_transcoding_statuses` task + - Use template files to simulate AWS MediaConvert responses + - Process results using the enhanced `prepare_job_results` function + +3. **Monitor Progress**: Check the Django admin interface to see: + - `VideoJob` status updates + - `VideoFile` objects created from mock transcoding results + - Comprehensive error logging if issues occur + +### Manual Testing (Legacy Method) + +If you need to manually simulate transcoding responses, use cURL, Postman, or an equivalent tool to POST a message to `https://localhost:8043/api/transcode-jobs/`, with the body as in the example below, updated to match the relevant environment variables, course name, and video name. ```json { @@ -148,3 +192,5 @@ making sure to set the values in `<>`. In particular, set The `DriveFile` will be the one associated with the video: http://localhost:8043/admin/gdrive_sync/drivefile/. If this completes successfully, the `VideoJob` status in Django admin should be `COMPLETE`, and there should now be three new `VideoFile` objects populated with `status`, `destination`, and `s3_key` fields. + +**Note**: The enhanced transcoding system now uses the `prepare_job_results` function to process these responses, which supports template variables like ``, ``, ``, and various AWS settings, making manual testing more flexible and realistic. diff --git a/videos/api.py b/videos/api.py index 62fc98cea..d8e8ac4f9 100644 --- a/videos/api.py +++ b/videos/api.py @@ -1,8 +1,10 @@ """APi functions for video processing""" +import json import logging import os +import boto3 import botocore from django.conf import settings from mitol.transcoding.api import media_convert_job @@ -67,8 +69,11 @@ def create_media_convert_job(video: Video): video.save() -def process_video_outputs(video: Video, output_group_details: dict): +def process_video_outputs(video: Video, output_group_details: list): """Create video model objects for each output""" + if not output_group_details: + return + for group_detail in output_group_details: for output_detail in group_detail.get("outputDetails", []): for path in output_detail.get("outputFilePaths", []): @@ -96,15 +101,16 @@ def update_video_job(results: dict): video_job = VideoJob.objects.get(job_id=results.get("jobId")) video_job.job_output = results - status = results.get("status") + status = results.get("status", "") video = video_job.video - if status == "COMPLETE": + if status.lower() == VideoJobStatus.COMPLETE.lower(): video_job.status = VideoJobStatus.COMPLETE try: - process_video_outputs(video, results.get("outputGroupDetails")) + output_group_details = results.get("outputGroupDetails", []) + process_video_outputs(video, output_group_details) except: # pylint:disable=bare-except # noqa: E722 log.exception("Error processing video outputs for job %s", video_job.job_id) - elif status == "ERROR": + elif status.lower() == VideoJobStatus.ERROR.lower(): video.status = VideoStatus.FAILED video_job.status = VideoJobStatus.FAILED log.error( @@ -117,3 +123,68 @@ def update_video_job(results: dict): video_job.error_message = results.get("errorMessage") video_job.save() video.save() + + +def get_media_convert_job(job_id: str) -> dict: + """ + Get the MediaConvert job details. + Args: + job_id (str): The MediaConvert job ID. + Returns: + dict: The MediaConvert job details. + """ + client = boto3.client( + "mediaconvert", + region_name=settings.AWS_REGION, + aws_access_key_id=settings.AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, + endpoint_url=settings.VIDEO_S3_TRANSCODE_ENDPOINT, + ) + + return client.get_job(Id=job_id) + + +def prepare_job_results(video: Video, job: VideoJob, results: str) -> dict: + """ + Prepare the results from the MediaConvert job. + Args: + results (str): The MediaConvert job results. + Returns: + dict: The prepared results. + """ + # Load the results from the JSON file + + results = results.replace("", job.job_id).replace( + "", video.website.short_id + ) + + for key in [ + "AWS_ACCOUNT_ID", + "AWS_REGION", + "VIDEO_TRANSCODE_QUEUE", + "VIDEO_S3_TRANSCODE_BUCKET", + "VIDEO_S3_TRANSCODE_PREFIX", + ]: + results = results.replace(f"<{key}>", getattr(settings, key, "")) + + # Extract drive file ID from source_key structure + drive_file_id = "" + min_source_key_parts = 3 + try: + source_key_parts = video.source_key.split("/") + if len(source_key_parts) >= min_source_key_parts: + drive_file_id = source_key_parts[-2] # Second to last part + except (AttributeError, IndexError): + pass + + results = results.replace("", drive_file_id).replace( + "", "video" + ) + + # Decode the JSON string + try: + return json.loads(results) + + except json.JSONDecodeError: + log.exception("Failed to decode MediaConvert job results") + return {} diff --git a/videos/api_test.py b/videos/api_test.py index 90ea00b7f..8e859df00 100644 --- a/videos/api_test.py +++ b/videos/api_test.py @@ -4,10 +4,13 @@ from os import path import pytest +from botocore.exceptions import ClientError from gdrive_sync.factories import DriveFileFactory from videos.api import ( create_media_convert_job, + get_media_convert_job, + prepare_job_results, prepare_video_download_file, process_video_outputs, update_video_job, @@ -158,3 +161,381 @@ def test_update_video_job_error(mocker): assert video_job.status == VideoJobStatus.FAILED assert video_job.video.status == VideoStatus.FAILED mock_log.assert_called_once() + + +def test_update_video_job_unknown_status(mocker): + """The video job should handle unknown status gracefully""" + video_job = VideoJobFactory.create(status=VideoJobStatus.CREATED) + mock_job = mocker.patch("videos.api.VideoJob.objects.get") + mock_job.return_value = video_job + + # Create mock data with unknown status + data = { + "jobId": video_job.job_id, + "status": "UNKNOWN_STATUS", + "outputGroupDetails": [], + } + + update_video_job(data) + video_job.refresh_from_db() + + # Should update job_output but not change status since it's unknown + assert video_job.job_output == data + assert video_job.status == VideoJobStatus.CREATED # Should remain unchanged + + +def test_update_video_job_missing_output_group_details(mocker): + """The video job should handle missing outputGroupDetails gracefully""" + video_job = VideoJobFactory.create(status=VideoJobStatus.CREATED) + mock_job = mocker.patch("videos.api.VideoJob.objects.get") + mock_job.return_value = video_job + + # Create mock data without outputGroupDetails + data = {"jobId": video_job.job_id, "status": "COMPLETE"} + + mock_process_outputs = mocker.patch("videos.api.process_video_outputs") + + update_video_job(data) + video_job.refresh_from_db() + + # Should still call process_video_outputs even with empty outputGroupDetails + mock_process_outputs.assert_called_once_with(video_job.video, []) + assert video_job.status == VideoJobStatus.COMPLETE + + +def test_update_video_job_empty_error_details(mocker): + """The video job should handle missing error details in error status""" + video_job = VideoJobFactory.create(status=VideoJobStatus.CREATED) + mock_job = mocker.patch("videos.api.VideoJob.objects.get") + mock_job.return_value = video_job + mock_log = mocker.patch("videos.api.log.error") + + # Create mock data with error status but no error details + data = {"jobId": video_job.job_id, "status": "ERROR"} + + update_video_job(data) + video_job.refresh_from_db() + + assert video_job.status == VideoJobStatus.FAILED + assert video_job.video.status == VideoStatus.FAILED + assert video_job.error_code == "None" # str(None) + assert video_job.error_message is None + mock_log.assert_called_once() + + +def test_update_video_job_case_insensitive_status(mocker): + """The video job should handle case-insensitive status matching""" + video_job = VideoJobFactory.create(status=VideoJobStatus.CREATED) + mock_job = mocker.patch("videos.api.VideoJob.objects.get") + mock_job.return_value = video_job + + test_cases = ["complete", "COMPLETE", "Complete", "error", "ERROR", "Error"] + + for status in test_cases: + video_job.status = VideoJobStatus.CREATED + video_job.save() + + data = {"jobId": video_job.job_id, "status": status, "outputGroupDetails": []} + + update_video_job(data) + video_job.refresh_from_db() + + if status.lower() == "complete": + assert video_job.status == VideoJobStatus.COMPLETE + elif status.lower() == "error": + assert video_job.status == VideoJobStatus.FAILED + assert video_job.video.status == VideoStatus.FAILED + + +def test_get_media_convert_job(settings, mocker): + """get_media_convert_job should return MediaConvert job details""" + job_id = "test_job_id" + mock_boto = mocker.patch("videos.api.boto3") + mock_job_data = { + "Job": { + "Id": job_id, + "Status": "COMPLETE", + "Settings": {}, + } + } + mock_boto.client.return_value.get_job.return_value = mock_job_data + + result = get_media_convert_job(job_id) + + mock_boto.client.assert_called_once_with( + "mediaconvert", + region_name=settings.AWS_REGION, + aws_access_key_id=settings.AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, + endpoint_url=settings.VIDEO_S3_TRANSCODE_ENDPOINT, + ) + mock_boto.client.return_value.get_job.assert_called_once_with(Id=job_id) + assert result == mock_job_data + + +def test_get_media_convert_job_client_error(settings, mocker): + """get_media_convert_job should handle AWS client errors gracefully""" + job_id = "test_job_id" + mock_boto = mocker.patch("videos.api.boto3") + mock_boto.client.return_value.get_job.side_effect = ClientError( + {"Error": {"Code": "JobNotFound", "Message": "Job not found"}}, "GetJob" + ) + + with pytest.raises(ClientError): + get_media_convert_job(job_id) + + mock_boto.client.assert_called_once_with( + "mediaconvert", + region_name=settings.AWS_REGION, + aws_access_key_id=settings.AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, + endpoint_url=settings.VIDEO_S3_TRANSCODE_ENDPOINT, + ) + + +@pytest.mark.parametrize( + "missing_setting", + [ + "AWS_REGION", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "VIDEO_S3_TRANSCODE_ENDPOINT", + ], +) +def test_get_media_convert_job_missing_settings(settings, mocker, missing_setting): + """get_media_convert_job should handle missing AWS settings""" + job_id = "test_job_id" + + # Remove the setting + delattr(settings, missing_setting) + + mock_boto = mocker.patch("videos.api.boto3") + + # This should still work as boto3 client handles missing values gracefully + # by using None or defaults + get_media_convert_job(job_id) + + mock_boto.client.assert_called_once() + + +def test_prepare_job_results(settings): + """prepare_job_results should replace template placeholders with actual values""" + video = VideoFactory.create() + # Set a proper source_key format for testing DRIVE_FILE_ID extraction + video.source_key = "gdrive_uploads/test_short_id/test_drive_file_id/test_video.mp4" + video.save() + video_job = VideoJobFactory.create(video=video) + + # Set up required settings + settings.AWS_ACCOUNT_ID = "123456789" + settings.AWS_REGION = "us-east-1" + settings.VIDEO_TRANSCODE_QUEUE = "test-queue" + settings.VIDEO_S3_TRANSCODE_BUCKET = "test-bucket" + settings.VIDEO_S3_TRANSCODE_PREFIX = "test-prefix" + + template_results = """ + { + "jobId": "", + "status": "COMPLETE", + "accountId": "", + "region": "", + "queue": "", + "bucket": "", + "prefix": "", + "shortId": "", + "driveFileId": "", + "videoName": "" + } + """ + + result = prepare_job_results(video, video_job, template_results) + + assert result["jobId"] == video_job.job_id + assert result["accountId"] == settings.AWS_ACCOUNT_ID + assert result["region"] == settings.AWS_REGION + assert result["queue"] == settings.VIDEO_TRANSCODE_QUEUE + assert result["bucket"] == settings.VIDEO_S3_TRANSCODE_BUCKET + assert result["prefix"] == settings.VIDEO_S3_TRANSCODE_PREFIX + assert result["shortId"] == video.website.short_id + assert result["driveFileId"] == "test_drive_file_id" + assert result["videoName"] == "video" + + +def test_prepare_job_results_invalid_json(mocker): + """prepare_job_results should handle invalid JSON gracefully""" + mock_log = mocker.patch("videos.api.log.exception") + video = VideoFactory.create() + video_job = VideoJobFactory.create(video=video) + + invalid_json = "{ invalid json }" + + result = prepare_job_results(video, video_job, invalid_json) + + assert result == {} + mock_log.assert_called_once_with("Failed to decode MediaConvert job results") + + +def test_prepare_job_results_missing_settings(settings): + """prepare_job_results should handle missing settings gracefully""" + video = VideoFactory.create() + video_job = VideoJobFactory.create(video=video) + + # Remove some settings + delattr(settings, "AWS_ACCOUNT_ID") + delattr(settings, "VIDEO_TRANSCODE_QUEUE") + + template_results = """ + { + "jobId": "", + "accountId": "", + "queue": "" + } + """ + + result = prepare_job_results(video, video_job, template_results) + + assert result["jobId"] == video_job.job_id + # Missing settings should be replaced with empty strings + assert result["accountId"] == "" + assert result["queue"] == "" + + +def test_prepare_job_results_empty_template(): + """prepare_job_results should handle empty template gracefully""" + video = VideoFactory.create() + video_job = VideoJobFactory.create(video=video) + + empty_template = "" + result = prepare_job_results(video, video_job, empty_template) + + assert result == {} + + +def test_prepare_job_results_malformed_json(): + """prepare_job_results should handle malformed JSON beyond just invalid syntax""" + video = VideoFactory.create() + video_job = VideoJobFactory.create(video=video) + + # JSON with unmatched braces after template replacement + malformed_template = '{"jobId": "", "data": {' + + result = prepare_job_results(video, video_job, malformed_template) + + assert result == {} + + +@pytest.mark.parametrize( + "template_placeholders", + [ + ("", "job_id_value"), + ("", "short_id_value"), + ("", "drive_file_id_value"), + ("", "video"), + ], +) +def test_prepare_job_results_individual_placeholders(settings, template_placeholders): + """Test that individual placeholders are correctly replaced""" + video = VideoFactory.create() + video.website.short_id = "short_id_value" + video.website.save() + # Set a proper source_key format for testing DRIVE_FILE_ID extraction + video.source_key = ( + "gdrive_uploads/short_id_value/drive_file_id_value/test_video.mp4" + ) + video.save() + video_job = VideoJobFactory.create(video=video, job_id="job_id_value") + + placeholder, expected_value = template_placeholders + template = f'{{"test": "{placeholder}"}}' + + result = prepare_job_results(video, video_job, template) + + if placeholder == "": + # Extract drive file ID from source_key (second to last part) + try: + source_key_parts = video.source_key.split("/") + expected_value = source_key_parts[-2] if len(source_key_parts) >= 3 else "" + except (AttributeError, IndexError): + expected_value = "" + elif placeholder == "": + expected_value = video.website.short_id + elif placeholder == "": + expected_value = video_job.job_id + + assert result["test"] == expected_value + + +def test_process_video_outputs_empty_output_group(mocker): + """process_video_outputs should handle empty output group gracefully""" + mock_prepare_download = mocker.patch("videos.api.prepare_video_download_file") + video = VideoFactory.create() + + # Test with empty output group details list + process_video_outputs(video, []) + + # Should not call prepare_video_download_file when output group is empty + mock_prepare_download.assert_not_called() + assert video.videofiles.count() == 0 + + +def test_process_video_outputs_malformed_paths(mocker): + """process_video_outputs should handle malformed S3 paths gracefully""" + mock_prepare_download = mocker.patch("videos.api.prepare_video_download_file") + video = VideoFactory.create() + + # Test with malformed S3 paths + malformed_outputs = [ + { + "outputDetails": [ + { + "outputFilePaths": [ + "invalid-path-without-s3-prefix", + "s3://", # Empty path after s3:// + "s3://bucket/", # Path with only bucket + ] + } + ] + } + ] + + process_video_outputs(video, malformed_outputs) + + # Should create at least one VideoFile object despite malformed paths + # Due to unique constraint on s3_key, multiple empty/same keys result in only one object + assert video.videofiles.count() >= 1 + mock_prepare_download.assert_called_once_with(video) + + +@pytest.mark.parametrize("file_extension", [".mp4", ".mov", ".avi", ".webm", ""]) +def test_process_video_outputs_various_extensions(mocker, file_extension): + """process_video_outputs should handle various file extensions""" + mock_prepare_download = mocker.patch("videos.api.prepare_video_download_file") + video = VideoFactory.create() + + outputs = [ + { + "outputDetails": [ + { + "outputFilePaths": [ + f"s3://bucket/path/video_youtube{file_extension}", + f"s3://bucket/path/video_archive{file_extension}", + ] + } + ] + } + ] + + process_video_outputs(video, outputs) + + assert video.videofiles.count() == 2 + + # Check destinations are correctly assigned based on filename + youtube_files = video.videofiles.filter(destination=DESTINATION_YOUTUBE) + archive_files = video.videofiles.filter(destination=DESTINATION_ARCHIVE) + + assert youtube_files.count() == 1 + assert archive_files.count() == 1 + assert "youtube" in youtube_files.first().s3_key + assert "archive" in archive_files.first().s3_key + + mock_prepare_download.assert_called_once_with(video) diff --git a/videos/constants.py b/videos/constants.py index 801c89e5b..7afa8cc76 100644 --- a/videos/constants.py +++ b/videos/constants.py @@ -33,6 +33,7 @@ class VideoJobStatus: CREATED = STATUS_CREATED FAILED = STATUS_FAILED COMPLETE = STATUS_COMPLETE + ERROR = "Error" class VideoFileStatus: diff --git a/videos/tasks.py b/videos/tasks.py index 223782b5a..1baaf5e6f 100644 --- a/videos/tasks.py +++ b/videos/tasks.py @@ -1,12 +1,15 @@ """Video tasks""" import logging +from pathlib import Path from urllib.parse import urljoin import celery +from botocore.exceptions import ClientError from django.conf import settings from django.db import transaction from django.db.models import Q +from django.utils.module_loading import import_string from googleapiclient.errors import HttpError from mitol.common.utils import now_in_utc from mitol.mail.api import get_message_sender @@ -32,14 +35,16 @@ from main.constants import STATUS_CREATED from main.s3_utils import get_boto3_resource from videos import threeplay_api +from videos.api import get_media_convert_job, prepare_job_results from videos.constants import ( DESTINATION_YOUTUBE, YT_THUMBNAIL_IMG, VideoFileStatus, + VideoJobStatus, VideoStatus, YouTubeStatus, ) -from videos.models import Video, VideoFile +from videos.models import Video, VideoFile, VideoJob from videos.utils import create_new_content from videos.youtube import ( API_QUOTA_ERROR_MSG, @@ -134,7 +139,8 @@ def start_transcript_job(video_id: int): folder_name, youtube_id, video_resource.title ) - threeplay_file_id = response.get("data").get("id") + data = response.get("data") if response else {} + threeplay_file_id = data.get("id") if data else None if ( threeplay_file_id @@ -561,3 +567,70 @@ def copy_video_resource(source_course_id, destination_course_id, source_resource create_drivefile( new_gdrive_file, new_resource, destination_course, "videos" ) + + +@app.task +def update_video_transcoding_statuses(): + """ + Check on statuses of all transcoding videos and update their status if appropriate. + This task mocks AWS MediaConvert callbacks for local development/testing. + """ + transcoding_videos = Video.objects.filter(status=VideoStatus.TRANSCODING) + log.info("Checking status of %d transcoding videos", transcoding_videos.count()) + + for video in transcoding_videos: + log.debug("Checking video transcoding status", extra={"video_id": video.id}) + + try: + # Get the latest video job for this video + video_job = VideoJob.objects.filter(video=video).latest("created_on") + + media_convert_job = get_media_convert_job(video_job.job_id) + + results = None + # Check if the job should be marked as complete (mock logic) + # In a real scenario, this would query AWS MediaConvert + if ( + media_convert_job["Job"]["Status"].lower() + == VideoJobStatus.COMPLETE.lower() + ): + with Path(settings.TRANSCODE_RESULT_TEMPLATE).open( + encoding="utf-8" + ) as f: + results = prepare_job_results(video, video_job, f.read()) + + elif ( + media_convert_job["Job"]["Status"].lower() + == VideoJobStatus.ERROR.lower() + ): + with Path(settings.TRANSCODE_ERROR_TEMPLATE).open( + encoding="utf-8" + ) as f: + results = prepare_job_results(video, video_job, f.read()) + log.error( + "Transcoding failed", + extra={"video_id": video.id, "job_id": video_job.job_id}, + ) + + # Execute post-transcode actions + if results is not None: + for action in settings.POST_TRANSCODE_ACTIONS: + import_string(action)(results.get("detail", {})) + + except VideoJob.DoesNotExist: + log.exception( + "No VideoJob object exists for transcoding video", + extra={"video_id": video.id}, + ) + video.status = VideoStatus.FAILED + video.save() + except ClientError as exc: + log.exception( + "Error when checking video transcoding status", + extra={ + "video_id": video.id, + "response": str(exc) if exc.response else None, + }, + ) + video.status = VideoStatus.FAILED + video.save() diff --git a/videos/tasks_test.py b/videos/tasks_test.py index 0117adc72..a0b047d5a 100644 --- a/videos/tasks_test.py +++ b/videos/tasks_test.py @@ -7,6 +7,7 @@ import pytest import pytz +from botocore.exceptions import ClientError from django.conf import settings from googleapiclient.errors import HttpError, ResumableUploadError from moto import mock_aws @@ -36,7 +37,7 @@ VideoStatus, YouTubeStatus, ) -from videos.factories import VideoFactory, VideoFileFactory +from videos.factories import VideoFactory, VideoFileFactory, VideoJobFactory from videos.models import VideoFile from videos.tasks import ( attempt_to_update_missing_transcripts, @@ -893,3 +894,261 @@ def test_create_drivefile(mocker): "download_link": mock_gdrive_dl.get(DRIVE_FILE_DOWNLOAD_LINK), }, } + + +@pytest.mark.parametrize("job_status", ["COMPLETE", "ERROR"]) +def test_update_video_transcoding_statuses(settings, mocker, job_status): + """update_video_transcoding_statuses should check transcoding videos and update their status""" + # Set up test settings + settings.TRANSCODE_RESULT_TEMPLATE = ( + "./test_videos_webhook/cloudwatch_sns_complete.json" + ) + settings.TRANSCODE_ERROR_TEMPLATE = ( + "./test_videos_webhook/cloudwatch_sns_error.json" + ) + settings.POST_TRANSCODE_ACTIONS = "videos.api.update_video_job" + + # Create test data + video = VideoFactory.create(status=VideoStatus.TRANSCODING) + video_job = VideoJobFactory.create(video=video, job_id="test_job_id") + + # Mock dependencies + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.return_value = {"Job": {"Status": job_status}} + + mock_prepare_results = mocker.patch("videos.tasks.prepare_job_results") + mock_results = {"detail": {"jobId": video_job.job_id, "status": job_status}} + mock_prepare_results.return_value = mock_results + + mock_import_string = mocker.patch("videos.tasks.import_string") + mock_post_transcode_action = mocker.Mock() + mock_import_string.return_value = mock_post_transcode_action + + mock_path_open = mocker.patch("pathlib.Path.open") + mock_path_open.return_value.__enter__.return_value.read.return_value = ( + '{"test": "data"}' + ) + + # Import the task function + from videos.tasks import update_video_transcoding_statuses + + # Execute the task + update_video_transcoding_statuses() + + # Verify calls + mock_get_job.assert_called_once_with(video_job.job_id) + mock_prepare_results.assert_called_once() + mock_post_transcode_action.assert_called_once_with(mock_results["detail"]) + + +def test_update_video_transcoding_statuses_no_video_job(mocker): + """update_video_transcoding_statuses should handle missing VideoJob gracefully""" + video = VideoFactory.create(status=VideoStatus.TRANSCODING) + + from videos.tasks import update_video_transcoding_statuses + + update_video_transcoding_statuses() + + video.refresh_from_db() + assert video.status == VideoStatus.FAILED + + +def test_update_video_transcoding_statuses_client_error(mocker): + """update_video_transcoding_statuses should handle AWS client errors gracefully""" + video = VideoFactory.create(status=VideoStatus.TRANSCODING) + VideoJobFactory.create(video=video, job_id="test_job_id") + + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.side_effect = ClientError( + {"Error": {"Code": "BadRequest", "Message": "Test error"}}, "GetJob" + ) + + from videos.tasks import update_video_transcoding_statuses + + update_video_transcoding_statuses() + + video.refresh_from_db() + assert video.status == VideoStatus.FAILED + + +def test_update_video_transcoding_statuses_multiple_videos(settings, mocker): + """update_video_transcoding_statuses should handle multiple transcoding videos""" + # Set up test settings + settings.TRANSCODE_RESULT_TEMPLATE = ( + "./test_videos_webhook/cloudwatch_sns_complete.json" + ) + settings.POST_TRANSCODE_ACTIONS = "videos.api.update_video_job" + + # Create multiple test videos + videos = VideoFactory.create_batch(3, status=VideoStatus.TRANSCODING) + for i, video in enumerate(videos): + VideoJobFactory.create(video=video, job_id=f"job_{i}") + + # Mock dependencies + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.return_value = {"Job": {"Status": "COMPLETE"}} + + mock_prepare_results = mocker.patch("videos.tasks.prepare_job_results") + mock_prepare_results.return_value = {"detail": {"status": "COMPLETE"}} + + mock_import_string = mocker.patch("videos.tasks.import_string") + mock_post_transcode_action = mocker.Mock() + mock_import_string.return_value = mock_post_transcode_action + + mock_path_open = mocker.patch("pathlib.Path.open") + mock_path_open.return_value.__enter__.return_value.read.return_value = ( + '{"test": "data"}' + ) + + from videos.tasks import update_video_transcoding_statuses + + update_video_transcoding_statuses() + + # Verify calls for each video + assert mock_get_job.call_count == 3 + assert mock_prepare_results.call_count == 3 + assert mock_post_transcode_action.call_count == 3 + + +def test_update_video_transcoding_statuses_mixed_statuses(settings, mocker): + """update_video_transcoding_statuses should handle videos with different job statuses""" + # Set up test settings + settings.TRANSCODE_RESULT_TEMPLATE = ( + "./test_videos_webhook/cloudwatch_sns_complete.json" + ) + settings.TRANSCODE_ERROR_TEMPLATE = ( + "./test_videos_webhook/cloudwatch_sns_error.json" + ) + settings.POST_TRANSCODE_ACTIONS = "videos.api.update_video_job" + + # Create test videos + video1 = VideoFactory.create(status=VideoStatus.TRANSCODING) + video2 = VideoFactory.create(status=VideoStatus.TRANSCODING) + VideoJobFactory.create(video=video1, job_id="complete_job") + VideoJobFactory.create(video=video2, job_id="error_job") + + # Mock dependencies + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.side_effect = [ + {"Job": {"Status": "COMPLETE"}}, + {"Job": {"Status": "ERROR"}}, + ] + + mock_prepare_results = mocker.patch("videos.tasks.prepare_job_results") + mock_prepare_results.side_effect = [ + {"detail": {"status": "COMPLETE"}}, + {"detail": {"status": "ERROR"}}, + ] + + mock_import_string = mocker.patch("videos.tasks.import_string") + mock_post_transcode_action = mocker.Mock() + mock_import_string.return_value = mock_post_transcode_action + + mock_path_open = mocker.patch("pathlib.Path.open") + mock_path_open.return_value.__enter__.return_value.read.return_value = ( + '{"test": "data"}' + ) + + from videos.tasks import update_video_transcoding_statuses + + update_video_transcoding_statuses() + + # Verify both videos were processed + assert mock_get_job.call_count == 2 + assert mock_prepare_results.call_count == 2 + assert mock_post_transcode_action.call_count == 2 + + +def test_update_video_transcoding_statuses_no_transcoding_videos(mocker): + """update_video_transcoding_statuses should handle no transcoding videos gracefully""" + # Create videos with other statuses + VideoFactory.create(status=VideoStatus.COMPLETE) + VideoFactory.create(status=VideoStatus.FAILED) + + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + + from videos.tasks import update_video_transcoding_statuses + + update_video_transcoding_statuses() + + # No jobs should be checked + mock_get_job.assert_not_called() + + +def test_update_video_transcoding_statuses_invalid_post_action(settings, mocker): + """update_video_transcoding_statuses should handle invalid post-transcode action""" + # Set up test settings + settings.TRANSCODE_RESULT_TEMPLATE = ( + "./test_videos_webhook/cloudwatch_sns_complete.json" + ) + settings.POST_TRANSCODE_ACTIONS = "non.existent.function" + + video = VideoFactory.create(status=VideoStatus.TRANSCODING) + VideoJobFactory.create(video=video, job_id="test_job_id") + + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.return_value = {"Job": {"Status": "COMPLETE"}} + + mock_prepare_results = mocker.patch("videos.tasks.prepare_job_results") + mock_prepare_results.return_value = {"detail": {"status": "COMPLETE"}} + + mock_import_string = mocker.patch("videos.tasks.import_string") + mock_import_string.side_effect = ImportError("Module not found") + + mock_path_open = mocker.patch("pathlib.Path.open") + mock_path_open.return_value.__enter__.return_value.read.return_value = ( + '{"test": "data"}' + ) + + from videos.tasks import update_video_transcoding_statuses + + # This should not raise an exception even if the import fails + with pytest.raises(ImportError): + update_video_transcoding_statuses() + + +def test_update_video_transcoding_statuses_file_not_found(settings, mocker): + """update_video_transcoding_statuses should handle missing template files""" + # Set up test settings with non-existent template files + settings.TRANSCODE_RESULT_TEMPLATE = "./non_existent_file.json" + settings.POST_TRANSCODE_ACTIONS = "videos.api.update_video_job" + + video = VideoFactory.create(status=VideoStatus.TRANSCODING) + VideoJobFactory.create(video=video, job_id="test_job_id") + + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.return_value = {"Job": {"Status": "COMPLETE"}} + + mock_path_open = mocker.patch("pathlib.Path.open") + mock_path_open.side_effect = FileNotFoundError("File not found") + + from videos.tasks import update_video_transcoding_statuses + + # This should not raise an exception even if the template file is missing + with pytest.raises(FileNotFoundError): + update_video_transcoding_statuses() + + +@pytest.mark.parametrize("job_status", ["PROCESSING", "SUBMITTED", "PROGRESSING"]) +def test_update_video_transcoding_statuses_other_statuses(settings, mocker, job_status): + """update_video_transcoding_statuses should handle other MediaConvert job statuses""" + # Set up test settings + settings.POST_TRANSCODE_ACTIONS = "videos.api.update_video_job" + + video = VideoFactory.create(status=VideoStatus.TRANSCODING) + VideoJobFactory.create(video=video, job_id="test_job_id") + + mock_get_job = mocker.patch("videos.tasks.get_media_convert_job") + mock_get_job.return_value = {"Job": {"Status": job_status}} + + mock_prepare_results = mocker.patch("videos.tasks.prepare_job_results") + mock_import_string = mocker.patch("videos.tasks.import_string") + + from videos.tasks import update_video_transcoding_statuses + + update_video_transcoding_statuses() + + # Should check the job but not prepare results or call post-actions for intermediate statuses + mock_get_job.assert_called_once() + mock_prepare_results.assert_not_called() + mock_import_string.assert_not_called()