Direct Video Upload¶
Direct Video Uploads work under the hood by using pre-signed S3 multipart uploads. The process consists of several steps:
- Start a multipart upload
- Split the video input into parts; for each part:
    - Get the URL to directly upload the part to our S3 bucket
    - Upload the part directly to the bucket
    - Save the `parts_list` in a variable of type `[(PartNumber, ETag)]` (see the example right after this list)
- Complete the upload by sending the `parts_list`
- Optional: List parts
- Optional: Abort the upload
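For reference, the `parts_list` that is sent on completion is simply a list of part number / ETag pairs; the ETag values below are placeholders:

```python
parts_list = [
    {"PartNumber": 1, "ETag": "etag-of-part-1"},
    {"PartNumber": 2, "ETag": "etag-of-part-2"},
]
```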
Below we explain each step in detail and, at the end, provide a working Python script that runs against the live API.
The same implementation is used in our frontend to upload large videos.
1. Start direct upload¶
See api-reference ⧉ for full reference.
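A condensed sketch of this step, taken from the full script at the bottom of this page. The project id, API key and filename are placeholders for your own values:

```python
import requests
from requests.auth import HTTPBasicAuth

HOST = "https://app.heapstream.com"
PROJECT_ID = 123  # placeholder: use your own Project.id

session = requests.Session()
session.auth = HTTPBasicAuth("<apikey_id>", "<apikey_key>")  # placeholder credentials
base = f"{HOST}/api/v1/project/{PROJECT_ID}/direct_upload/s3/multipart"

# start the multipart upload for a given filename (and optional metadata)
r = session.post(base, json={"filename": "movie.mp4", "metadata": {}})
r.raise_for_status()
video_id = r.json()["video"]["id"]  # id of the video being created
key = r.json()["key"]               # S3 object key, passed to all later calls
upload_id = r.json()["uploadId"]    # S3 multipart upload id
```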
2. Get part URL¶
See api-reference ⧉ for full reference.
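Sketched below, continuing with `session`, `base`, `key` and `upload_id` from step 1, and assuming `chunk` holds the raw bytes of the current part (the full script below reads the file in fixed-size chunks):

```python
parts_list = []   # collected across all parts, sent on completion
part_number = 1   # parts are numbered starting from 1

# ask the API for a pre-signed URL for this part
r = session.get(f"{base}/{upload_id}/{part_number}", params={"key": key})
r.raise_for_status()
presigned_url = r.json()["url"]

# upload the raw bytes of the part directly to the S3 bucket
r2 = requests.put(presigned_url, data=chunk)
r2.raise_for_status()

# remember the ETag that S3 returned for this part
parts_list.append({"PartNumber": part_number, "ETag": r2.headers["ETag"]})
```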
3. Complete upload¶
See api-reference ⧉ for full reference.
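Once every part has been uploaded, completing the upload is a single request that carries the collected `parts_list` (same variables as in the previous steps):

```python
r = session.post(
    f"{base}/{upload_id}/complete",
    params={"key": key},
    json={"parts": parts_list},
)
r.raise_for_status()
```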
4. Optional: List Parts¶
See api-reference ⧉ for full reference.
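Listing the parts the server has already received is useful for resuming an interrupted upload, which is how the script below resumes. A sketch with the same variables as above:

```python
r = session.get(f"{base}/{upload_id}", params={"key": key})
r.raise_for_status()
existing_parts = r.json()  # list of {"PartNumber": ..., "ETag": ...} entries
```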
5. Optional: Abort Upload¶
See api-reference ⧉ for full reference.
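Aborting discards the multipart upload and all parts uploaded so far. A sketch with the same variables as above:

```python
r = session.delete(f"{base}/{upload_id}", params={"key": key})
r.raise_for_status()
```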
Python implementation¶
Below is an implementation in Python that is used in our integration tests before deploying to production:
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "requests==2.31.0",
# "pydantic-settings==2.4.0",
# "sqlalchemy>=2",
# ]
# ///
import dataclasses
from collections.abc import Generator, Iterator
from datetime import datetime
from functools import cached_property
from pathlib import Path
from typing import Annotated, Any, Optional, TypedDict
import pytz
import requests
import sqlalchemy as sa
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, FilePath, PlainSerializer
from pydantic_settings import BaseSettings, SettingsConfigDict
from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, Session, mapped_column
class Base(MappedAsDataclass, DeclarativeBase):
pass
class VideoUpload(Base):
__tablename__ = "video_upload"
project_id: Mapped[int] = mapped_column()
id: Mapped[int] = mapped_column()
path: Mapped[str] = mapped_column()
created_on: Mapped[datetime] = mapped_column(default_factory=lambda: datetime.now(pytz.UTC), init=False)
upload_id: Mapped[str] = mapped_column()
key: Mapped[str] = mapped_column()
data: Mapped[dict] = mapped_column(JSON(none_as_null=True), default_factory=dict)
__table_args__ = (
sa.PrimaryKeyConstraint("project_id", "id", name="video_upload_pk"),
sa.Index("path_unique_idx", "project_id", "path", unique=True),
)
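# Local SQLite database that remembers in-progress uploads so an interrupted upload can be resumed.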
engine = create_engine("sqlite:///upload.sqlite", echo=False)
Base.metadata.create_all(engine)
MB = 1024 * 1024
def read_file_in_chunks(file_path: Path, chunk_size: int) -> Iterator[bytes]:
"""Lazy function (generator) to read a file piece by piece."""
with open(file_path, "rb") as file_object:
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
class PartDict(TypedDict):
PartNumber: int
ETag: str
PartsList = list[PartDict]
class Metadata(BaseModel):
watermark_id: Optional[int] = Field(default=None, description="Watermark to apply to this video.")
normalize_audio: Optional[bool] = Field(default=None, description="Normalize audio of the video.")
auto_tt: Optional[list[str]] = Field(
default=None,
description="Languages to auto transcribe the video. `None` will inherit project.auto_tt settings.",
)
encoding_tier: Optional[str] = Field(
default=None,
description="Encoding tier to use. If set to `None` it will inherit project.encoding_tier.",
)
size: int = Field(default=0, description="Size of the video file in bytes.")
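# Accepts an int (or numeric string) on input and serializes it back to a string on output.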
InIntOutStr = Annotated[
int,
BeforeValidator(int),
PlainSerializer(func=str, return_type=str),
]
@dataclasses.dataclass(frozen=True)
class UploadPart:
id: int
data: bytes
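# Orchestrates the direct upload: start (or resume) the upload, upload all parts, then complete it.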
class VideoUploadManager(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
session: requests.Session
project_id: int
video_path: Path
host: str
video_id: InIntOutStr = Field(default=None)
key: str = Field(default=None)
upload_id: str = Field(default=None)
file_size: int = Field(default=None)
# chunk size must be exact on all parts (only the last part can be different)
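# note: S3 multipart uploads require every part except the last to be at least 5 MB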
chunk_size: int = 500 * MB
parts: PartsList = Field(default_factory=lambda: [])
metadata: Metadata = Field(default_factory=Metadata)
existing_part_ids: set[int] = Field(default_factory=set)
@property
def base_url(self) -> str:
return f"{self.host}/api/v1/project/{self.project_id}"
@property
def upload_url(self) -> str:
return f"{self.base_url}/direct_upload/s3/multipart"
def model_post_init(self, __context: Any) -> None:
self.set_video_path(self.video_path)
def set_video_path(self, video_path: Path) -> None:
"""
Change the video_path before starting to upload.
:param video_path: Path to video file.
:return: None
"""
self.video_path = video_path
self.metadata.size = video_path.stat().st_size
def start_video_upload(self) -> requests.Response | None:
"""
Start the direct upload.
:return: Response of the "start" request. Includes the video_id, S3 key, S3 `uploadId` to start
uploading parts and complete the upload.
"""
# check for existing videoUpload in local db!
query = (
sa.select(VideoUpload)
.filter(VideoUpload.project_id == self.project_id)
.filter(VideoUpload.path == self.video_path.as_posix())
)
with Session(engine) as session:
try:
existing_video = session.scalars(query).one()
except NoResultFound:
return self.create_new_video()
else:
# resume by listing parts and getting correct part!
self.video_id = existing_video.id
self.key = existing_video.key
self.upload_id = existing_video.upload_id
# add existing parts!
existing_parts = self.list_parts()
for part in existing_parts:
self.existing_part_ids.add(part["PartNumber"])
self.track_part(part)
def track_part(self, part: PartDict):
self.parts.append(part)
def create_new_video(self):
filename = self.video_path.name
metadata_dict = self.metadata.model_dump(exclude_unset=True)
data = {"filename": filename, "metadata": metadata_dict}
r0 = self.session.post(self.upload_url, json=data)
# create video and save it!
r0.raise_for_status()
self.video_id = int(r0.json()["video"]["id"])
self.key = r0.json()["key"]
self.upload_id = r0.json()["uploadId"]
new_video = VideoUpload(
path=self.video_path.as_posix(),
project_id=self.project_id,
id=self.video_id,
upload_id=self.upload_id,
key=self.key,
)
with Session(engine) as session:
session.add(new_video)
session.commit()
return r0
def do_video_upload(self) -> int:
"""
Start & complete a video upload.
:return: the `id` of the video that was uploaded.
"""
self.start_video_upload()
self.upload_parts()
self.complete_upload()
return self.video_id
def complete_upload(self) -> requests.Response:
"""
Complete the direct upload after uploading all parts.
:return: Response of the "complete" request.
"""
url3 = f"{self.upload_url}/{self.upload_id}/complete"
r3 = self.session.post(url3, params={"key": self.key}, json={"parts": self.parts})
r3.raise_for_status()
# delete the video in sql
query = (
sa.delete(VideoUpload)
.filter(VideoUpload.project_id == self.project_id)
.filter(VideoUpload.id == self.video_id)
)
with Session(engine) as session:
session.execute(query)
session.commit()
return r3
def upload_parts(self) -> None:
"""Upload all parts of the file."""
for part in self.parts_iterator():
if part.id in self.existing_part_ids:
print(f"skipping_part_id={part.id}")
continue
self.upload_part(part)
def parts_iterator(self) -> Generator[UploadPart, None, None]:
for part_id, chunk in enumerate(read_file_in_chunks(self.video_path, self.chunk_size), start=1):
yield UploadPart(part_id, chunk)
def upload_part(self, part: UploadPart) -> None:
"""
Upload a single part.
:param part: The part to upload: its `id` (numbered from 1) and its data in raw bytes.
:return: None
"""
url1 = f"{self.upload_url}/{self.upload_id}/{part.id}"
# get part url
r1 = self.session.get(url1, params={"key": self.key})
r1.raise_for_status()
upload_url = r1.json()["url"]
# upload part to s3
r2 = self.session.put(upload_url, data=part.data)
# print(r2.status_code, r2.content, r2.headers)
r2.raise_for_status()
# track ETags
e_tag = r2.headers["ETag"]
self.track_part({"PartNumber": part.id, "ETag": e_tag})
print(f"uploaded_part{part.id}")
def abort_upload(self) -> requests.Response:
"""
Abort a direct upload.
"""
url2 = f"{self.upload_url}/{self.upload_id}"
r0 = self.session.delete(url2, params={"key": self.key})
r0.raise_for_status()
return r0
def list_parts(self) -> list[PartDict]:
url2 = f"{self.upload_url}/{self.upload_id}"
r0 = self.session.get(url2, params={"key": self.key})
# print(r0.json())
r0.raise_for_status()
return r0.json()
class Settings(
BaseSettings,
cli_parse_args=True,
cli_enforce_required=True,
cli_hide_none_type=True,
cli_avoid_json=True,
):
model_config = SettingsConfigDict(nested_model_default_partial_update=True)
apikey_id: str = Field(description="Your ApiKey.id")
apikey_key: str = Field(description="Your ApiKey.key")
project_id: int = Field(description="Your Project.id")
video_path: FilePath = Field(description="Absolute path to a video file in your computer.")
host: str = Field(default="https://app.heapstream.com", description="Host of the api.")
metadata: Metadata = Field(default=Metadata(), description="Metadata to set for the video.")
@cached_property
def session(self) -> requests.Session:
session = requests.Session()
session.auth = HTTPBasicAuth(username=self.apikey_id, password=self.apikey_key)
return session
def main() -> None:
"""Direct upload a video to HeapStream."""
# https://github.com/pydantic/pydantic/pull/3972
settings = Settings() # pyright: ignore[reportCallIssue]
upload_manager = VideoUploadManager(
session=settings.session,
project_id=settings.project_id,
video_path=settings.video_path,
host=settings.host,
metadata=settings.metadata,
)
upload_manager.do_video_upload()
print(f"Video fully uploaded. video_id={upload_manager.video_id}")
if __name__ == "__main__":
main()
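To run the script, save it locally (any filename works; `direct_upload.py` is used here only for illustration) and invoke it with `uv run direct_upload.py --apikey_id <id> --apikey_key <key> --project_id <project_id> --video_path /path/to/video.mp4`. `uv` resolves the inline dependency block at the top of the script, and the flag names follow the `Settings` fields above.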