Skip to content

Direct Video Upload

Direct Video Uploads work under the hood by using pre-signed S3 multipart uploads. The process involves several steps:

  1. Start a multi-part upload
  2. Split the video input into parts, for each part:
    1. Get the URL to directly upload the part to our S3 bucket
    2. Upload the part directly to the bucket
    3. Save the parts_list in a variable of type [(PartNumber, ETag)]
  3. Complete the upload by sending the parts_list.
  4. Optional: List parts
  5. Optional: Abort the upload

Below we explain each step in detail and, at the end, provide a working Python script that runs against the live API.

The same implementation is used in our frontend to upload large videos.

1. Start direct upload

See api-reference ⧉ for full reference.

2. Get part url

See api-reference ⧉ for full reference.

3. Complete upload

See api-reference ⧉ for full reference.

4. Optional: List Parts

See api-reference ⧉ for full reference.

5. Optional: Abort Upload

See api-reference ⧉ for full reference.

Python implementation

Below is an implementation in Python that is used in our integration tests before deploying to production:

# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "requests==2.31.0",
#     "pydantic-settings==2.4.0",
#     "sqlalchemy>=2",
#     "pytz",
# ]
# ///
import dataclasses
from collections.abc import Generator, Iterator
from datetime import datetime
from functools import cached_property
from pathlib import Path
from typing import Annotated, Any, Optional, TypedDict

import pytz
import requests
import sqlalchemy as sa
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, FilePath, PlainSerializer
from pydantic_settings import BaseSettings, SettingsConfigDict
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, Session, mapped_column


class Base(MappedAsDataclass, DeclarativeBase):
    """Declarative base for the local bookkeeping tables (dataclass-style mapped models)."""


class VideoUpload(Base):
    """
    Bookkeeping row for one in-flight multipart upload.

    Persisted in the local sqlite db so an interrupted upload can be
    resumed: stores the remote video id plus the S3 key/uploadId needed
    to continue or abort the multipart upload.
    """

    __tablename__ = "video_upload"
    project_id: Mapped[int] = mapped_column()
    id: Mapped[int] = mapped_column()  # remote video id assigned by the API
    path: Mapped[str] = mapped_column()  # local file path, stored in POSIX form
    created_on: Mapped[datetime] = mapped_column(default_factory=lambda: datetime.now(pytz.UTC), init=False)
    upload_id: Mapped[str] = mapped_column()  # S3 multipart `uploadId`
    key: Mapped[str] = mapped_column()  # S3 object key
    # free-form extra data; none_as_null stores Python None as SQL NULL
    data: Mapped[dict] = mapped_column(JSON(none_as_null=True), default_factory=dict)
    __table_args__ = (
        sa.PrimaryKeyConstraint("project_id", "id", name="video_upload_pk"),
        # at most one in-flight upload per (project, local path)
        sa.Index("path_unique_idx", "project_id", "path", unique=True),
    )


# Local sqlite database that persists in-flight uploads so they can be resumed.
engine = create_engine("sqlite:///upload.sqlite", echo=False)

# Create the bookkeeping table on import if it does not exist yet.
Base.metadata.create_all(engine)

# One mebibyte in bytes; used to express the part/chunk size below.
MB = 1024 * 1024


def read_file_in_chunks(file_path: Path, chunk_size: int) -> Iterator[bytes]:
    """Lazily yield successive ``chunk_size``-byte chunks of *file_path*.

    Every chunk is exactly ``chunk_size`` bytes except possibly the last one;
    an empty file yields nothing.
    """
    with open(file_path, "rb") as handle:
        while chunk := handle.read(chunk_size):
            yield chunk


class PartDict(TypedDict):
    """One uploaded part, in the shape S3's CompleteMultipartUpload expects."""

    PartNumber: int  # 1-based part index
    ETag: str  # ETag returned by S3 when the part was PUT


# All parts accumulated for the final "complete" request.
PartsList = list[PartDict]


class Metadata(BaseModel):
    """Per-video metadata sent to the API when starting a direct upload.

    All optional fields default to ``None``, meaning "inherit the project
    setting" (see individual field descriptions).
    """

    watermark_id: Optional[int] = Field(default=None, description="Watermark to apply to this video.")
    normalize_audio: Optional[bool] = Field(default=None, description="Normalize audio of the video.")
    auto_tt: Optional[list[str]] = Field(
        default=None,
        description="Languages to auto transcribe the video. `None` will inherit project.auto_tt settings.",
    )
    encoding_tier: Optional[str] = Field(
        default=None,
        description="Encoding tier to use. If set to `None` it will inherit project.encoding_tier.",
    )
    # Filled in locally from the file's stat() before uploading (see set_video_path).
    size: int = Field(default=0, description="Size of the video file in bytes.")


# An int that coerces any int-convertible input on validation and serializes
# back out as a string (presumably the API expects string ids on the wire —
# TODO confirm against the api-reference).
InIntOutStr = Annotated[
    int,
    BeforeValidator(int),
    PlainSerializer(func=str, return_type=str),
]


@dataclasses.dataclass(frozen=True)
class UploadPart:
    """Immutable pairing of a 1-based part number with its raw chunk bytes."""

    id: int  # 1-based part number (S3 part numbers start at 1)
    data: bytes  # raw chunk contents read from the video file


class VideoUploadManager(BaseModel):
    """
    Drives a resumable direct (multipart) upload of one video file.

    Flow: ``start_video_upload`` registers the upload with the API (or resumes
    a previous attempt recorded in the local sqlite db), ``upload_parts``
    streams the file in fixed-size chunks to pre-signed S3 URLs, and
    ``complete_upload`` finalizes the multipart upload and removes the local
    bookkeeping row. ``do_video_upload`` runs the whole flow.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    session: requests.Session  # authenticated HTTP session used for all API calls
    project_id: int
    video_path: Path
    host: str
    # The three fields below are populated by start_video_upload()/create_new_video().
    video_id: InIntOutStr = Field(default=None)
    key: str = Field(default=None)  # S3 object key
    upload_id: str = Field(default=None)  # S3 multipart `uploadId`
    # NOTE(review): file_size is never assigned or read; metadata.size carries
    # the file size instead. Kept for backward compatibility.
    file_size: int = Field(default=None)
    # chunk size must be exact on all parts (only the last part can be different)
    chunk_size: int = 500 * MB
    parts: PartsList = Field(default_factory=lambda: [])
    metadata: Metadata = Field(default_factory=Metadata)
    # PartNumbers already present on S3 from a resumed upload; skipped when uploading.
    existing_part_ids: set[int] = Field(default_factory=set)

    @property
    def base_url(self) -> str:
        """Project-scoped API root."""
        return f"{self.host}/api/v1/project/{self.project_id}"

    @property
    def upload_url(self) -> str:
        """Base endpoint of the S3 multipart direct-upload API."""
        return f"{self.base_url}/direct_upload/s3/multipart"

    def model_post_init(self, __context: Any) -> None:
        # Ensure metadata.size is populated for the initial video_path too.
        self.set_video_path(self.video_path)

    def set_video_path(self, video_path: Path) -> None:
        """
        Change the video_path before starting to upload.
        :param video_path: Path to video file.
        :return: None
        """
        self.video_path = video_path
        self.metadata.size = video_path.stat().st_size

    def start_video_upload(self) -> requests.Response | None:
        """
        Start the direct upload, resuming a previous attempt if one is recorded.
        :return: Response of the "start" request for a fresh upload (includes
        the video_id, S3 key and S3 `uploadId`), or ``None`` when resuming an
        existing upload found in the local db.
        """
        # check for existing videoUpload in local db!
        query = (
            sa.select(VideoUpload)
            .filter(VideoUpload.project_id == self.project_id)
            .filter(VideoUpload.path == self.video_path.as_posix())
        )
        with Session(engine) as session:
            try:
                existing_video = session.scalars(query).one()
            except NoResultFound:
                return self.create_new_video()
            else:
                # resume: restore ids from the bookkeeping row
                self.video_id = existing_video.id
                self.key = existing_video.key
                self.upload_id = existing_video.upload_id
                # track parts already on S3 so they are skipped but still completed
                existing_parts = self.list_parts()
                for part in existing_parts:
                    self.existing_part_ids.add(part["PartNumber"])
                    self.track_part(part)

    def track_part(self, part: PartDict) -> None:
        """Remember a part for the final "complete" request."""
        self.parts.append(part)

    def create_new_video(self) -> requests.Response:
        """Register a new video with the API and persist resume info locally."""
        filename = self.video_path.name
        # exclude_unset: only send fields the caller explicitly provided
        metadata_dict = self.metadata.model_dump(exclude_unset=True)
        data = {"filename": filename, "metadata": metadata_dict}
        r0 = self.session.post(self.upload_url, json=data)
        # create video and save it!
        r0.raise_for_status()
        self.video_id = int(r0.json()["video"]["id"])
        self.key = r0.json()["key"]
        self.upload_id = r0.json()["uploadId"]
        new_video = VideoUpload(
            path=self.video_path.as_posix(),
            project_id=self.project_id,
            id=self.video_id,
            upload_id=self.upload_id,
            key=self.key,
        )
        with Session(engine) as session:
            session.add(new_video)
            session.commit()
        return r0

    def do_video_upload(self) -> int:
        """
        Start & complete a video upload.
        :return: the `id` of the video that was uploaded.
        """
        self.start_video_upload()
        self.upload_parts()
        self.complete_upload()
        return self.video_id

    def complete_upload(self) -> requests.Response:
        """
        Complete the direct upload after uploading all parts.
        :return: Response of the "complete" request.
        """
        url3 = f"{self.upload_url}/{self.upload_id}/complete"
        # S3's CompleteMultipartUpload requires parts in ascending PartNumber
        # order. On a resume the tracked list can be out of order (previously
        # uploaded parts first, then freshly filled gaps), so sort before sending.
        ordered_parts = sorted(self.parts, key=lambda p: p["PartNumber"])
        r3 = self.session.post(url3, params={"key": self.key}, json={"parts": ordered_parts})
        r3.raise_for_status()
        # upload is finished: drop the local bookkeeping row (nothing to resume)
        query = (
            sa.delete(VideoUpload)
            .filter(VideoUpload.project_id == self.project_id)
            .filter(VideoUpload.id == self.video_id)
        )
        with Session(engine) as session:
            session.execute(query)
            session.commit()
        return r3

    def upload_parts(self) -> None:
        """Upload all parts of the file, skipping parts S3 already has."""
        for part in self.parts_iterator():
            if part.id in self.existing_part_ids:
                print(f"skipping_part_id={part.id}")
                continue
            self.upload_part(part)

    def parts_iterator(self) -> Generator[UploadPart, None, None]:
        """Yield the file's chunks as UploadParts with 1-based part numbers."""
        for part_id, chunk in enumerate(read_file_in_chunks(self.video_path, self.chunk_size), start=1):
            yield UploadPart(part_id, chunk)

    def upload_part(self, part: UploadPart) -> None:
        """
        Upload a single part.
        :param part: The part to upload; ``part.id`` is the 1-based part
        number and ``part.data`` the raw chunk bytes.
        :return: None
        """
        url1 = f"{self.upload_url}/{self.upload_id}/{part.id}"
        # get pre-signed part url from the API
        r1 = self.session.get(url1, params={"key": self.key})
        r1.raise_for_status()
        upload_url = r1.json()["url"]
        # upload part directly to S3
        r2 = self.session.put(upload_url, data=part.data)
        r2.raise_for_status()
        # track the ETag — S3 needs (PartNumber, ETag) pairs to assemble the object
        e_tag = r2.headers["ETag"]
        self.track_part({"PartNumber": part.id, "ETag": e_tag})
        print(f"uploaded_part{part.id}")

    def abort_upload(self) -> requests.Response:
        """
        Abort a direct upload, discarding all uploaded parts.
        :return: Response of the "abort" request.
        """
        url2 = f"{self.upload_url}/{self.upload_id}"
        r0 = self.session.delete(url2, params={"key": self.key})
        r0.raise_for_status()
        return r0

    def list_parts(self) -> list[PartDict]:
        """List the parts S3 already holds for this upload (used to resume)."""
        url2 = f"{self.upload_url}/{self.upload_id}"
        r0 = self.session.get(url2, params={"key": self.key})
        r0.raise_for_status()
        return r0.json()


class Settings(
    BaseSettings,
    cli_parse_args=True,
    cli_enforce_required=True,
    cli_hide_none_type=True,
    cli_avoid_json=True,
):
    """CLI/environment configuration for the upload script.

    Field descriptions double as ``--help`` text via pydantic-settings'
    CLI support (enabled by the class keyword arguments above).
    """

    # allow e.g. --metadata.size on the CLI to update only part of the default
    model_config = SettingsConfigDict(nested_model_default_partial_update=True)

    apikey_id: str = Field(description="Your ApiKey.id")
    apikey_key: str = Field(description="Your ApiKey.key")
    project_id: int = Field(description="Your Project.id")
    video_path: FilePath = Field(description="Absolute path to a video file in your computer.")
    host: str = Field(default="https://app.heapstream.com", description="Host of the api.")
    # NOTE(review): default=Metadata() (an instance) rather than default_factory —
    # presumably required by nested_model_default_partial_update; confirm before changing.
    metadata: Metadata = Field(default=Metadata(), description="Metadata to set for the video.")

    @cached_property
    def session(self) -> requests.Session:
        """requests.Session pre-authenticated with HTTP Basic auth (ApiKey id/key)."""
        session = requests.Session()
        session.auth = HTTPBasicAuth(username=self.apikey_id, password=self.apikey_key)
        return session


def main() -> None:
    """Entry point: load settings from CLI/env and direct-upload one video to HeapStream."""
    # https://github.com/pydantic/pydantic/pull/3972
    settings = Settings()  # pyright: ignore[reportCallIssue]
    manager = VideoUploadManager(
        host=settings.host,
        session=settings.session,
        project_id=settings.project_id,
        video_path=settings.video_path,
        metadata=settings.metadata,
    )
    manager.do_video_upload()
    print(f"Video fully uploaded. video_id={manager.video_id}")


if __name__ == "__main__":
    main()