Client API Reference

pythorvision.client

Stream

Bases: BaseModel

Represents an active video stream and its associated resources.

Attributes:

camera (Camera): The camera being streamed.
capability (Capability): The capability used for the stream.
port (int): The network port used for the SRT stream.
video_path (Path): The output path pattern for the recorded video files. It contains a %02d placeholder for the segment index.
gstreamer_pipeline (str): The GStreamer pipeline command used.
process (Popen): The Popen object for the running GStreamer process.
gstreamer_log_file (Optional[TextIOBase]): The file handle for GStreamer logs.
gstreamer_log_file_path (Optional[Path]): The path to the GStreamer log file.
created_at (datetime): The timestamp when the stream was created.

ThorVisionClient

Bases: BaseModel

Client for interacting with the ThorVision server to manage camera streams.

Attributes:

host (str): The hostname or IP address of the ThorVision server.
port (int): The port number of the ThorVision server.
streams (Dict[int, Stream]): A dictionary of active streams, keyed by camera ID.

__del__

__del__()

Ensure all streams are cleaned up when the client is destroyed.

Source code in pythorvision/client.py
def __del__(self):
    """Ensure all streams are cleaned up when the client is destroyed."""
    self.clean_streams()

clean_streams

clean_streams()

Stop all active streams and clean up all resources.

This is useful for gracefully shutting down the client.

Source code in pythorvision/client.py
def clean_streams(self):
    """Stop all active streams and clean up all resources.

    This is useful for gracefully shutting down the client.
    """
    logger.info("Starting cleanup of all streams")
    camera_ids = list(self.streams.keys())

    for camera_id in camera_ids:
        try:
            self.stop_stream(camera_id)
        except Exception as e:
            logger.error(f"Error stopping stream for camera {camera_id}: {e}")

    logger.info("All resources cleaned up")
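
clean_streams copies the keys before looping because stop_stream removes entries from the same dictionary; iterating over the live dictionary while it shrinks would raise RuntimeError. The pattern can be sketched in isolation as follows, where stop_all and demo_stop are hypothetical stand-ins:

```python
from typing import Callable, Dict, Hashable


def stop_all(
    streams: Dict[Hashable, object],
    stop: Callable[[Hashable], None],
) -> Dict[Hashable, Exception]:
    """Call stop(key) for every key, collecting errors instead of aborting."""
    errors: Dict[Hashable, Exception] = {}
    for key in list(streams):  # snapshot: stop() may remove keys from `streams`
        try:
            stop(key)
        except Exception as exc:  # best effort: keep going on failure
            errors[key] = exc
    return errors


# Demo data standing in for the client's `streams` dictionary.
streams = {0: "cam0", 1: "cam1", 2: "cam2"}


def demo_stop(camera_id):
    """Remove the entry, then simulate a failure for camera 1."""
    streams.pop(camera_id)
    if camera_id == 1:
        raise RuntimeError("simulated failure")


errors = stop_all(streams, demo_stop)
```

Unlike clean_streams, which only logs failures, this sketch returns them so the caller can inspect what went wrong.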

list_cameras

list_cameras()

Retrieve a list of available cameras from the ThorVision server.

Returns:

List[Camera]: A list of Camera objects.

Raises:

requests.exceptions.RequestException: If there is an issue communicating with the server.

Source code in pythorvision/client.py
def list_cameras(self) -> List[Camera]:
    """Retrieve a list of available cameras from the ThorVision server.

    Returns:
        List[Camera]: A list of Camera objects.

    Raises:
        requests.exceptions.RequestException: If there is an issue
            communicating with the server.
    """
    response = requests.get(f"{self._base_url}/cameras", timeout=5)
    response.raise_for_status()
    cameras_data = response.json()
    return [Camera(**cam_data) for cam_data in cameras_data]
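
Callers typically pick one camera out of the returned list. The sketch below assumes only the id and name attributes that the source code on this page reads from Camera; find_camera and the SimpleNamespace stand-ins are hypothetical:

```python
from types import SimpleNamespace


def find_camera(cameras, name_fragment):
    """Return the first camera whose name contains name_fragment, else None.

    The comparison is case-insensitive.
    """
    needle = name_fragment.lower()
    for cam in cameras:
        if needle in cam.name.lower():
            return cam
    return None


# Stand-ins for the objects list_cameras() would return.
cameras = [
    SimpleNamespace(id=0, name="Front USB Camera"),
    SimpleNamespace(id=1, name="Rear Camera"),
]
rear = find_camera(cameras, "rear")
```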

model_post_init

model_post_init(__context)

Initialize the client after the model is created.

This method sets up the base URL for the server and performs initial checks for server connectivity and GStreamer installation.

Parameters:

__context (Any): The context for model initialization. Required.

Source code in pythorvision/client.py
def model_post_init(self, __context: Any) -> None:
    """Initialize the client after the model is created.

    This method sets up the base URL for the server and performs initial
    checks for server connectivity and GStreamer installation.

    Args:
        __context (Any): The context for model initialization.
    """
    self._base_url = f"http://{self.host}:{self.port}"
    logger.info(f"Initializing ThorVisionClient for {self._base_url}")
    self._check_host()
    self._check_gstreamer()

start_stream_with_recording

start_stream_with_recording(camera, capability, output_dir, split_max_files=0, split_max_time_sec=0, split_max_size_mb=0, gstreamer_debug=False)

Start a camera stream and record it to a file.

This method requests the server to start streaming a camera's feed, then launches a local GStreamer process to receive and record the stream. The recording can be split into multiple files based on time, size, or number of files. If the camera already has an active stream, the existing Stream is returned and no new recording is started.

Parameters:

camera (Camera): The camera to start streaming. Required.
capability (Capability): The desired stream capability (resolution, format, etc.). Required.
output_dir (str): The directory to save the recording files. Required.
split_max_files (Optional[int]): The maximum number of files to create before overwriting. 0 for no limit. Defaults to 0.
split_max_time_sec (Optional[int]): The maximum duration of each file in seconds. 0 for no limit. Defaults to 0.
split_max_size_mb (Optional[int]): The maximum size of each file in megabytes. 0 for no limit. Defaults to 0.
gstreamer_debug (bool): If True, enables GStreamer debug logging. Defaults to False.
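
In the source code for this method, the split arguments are converted to splitmuxsink properties: seconds become nanoseconds for max-size-time, and megabytes become bytes (10^6 per megabyte) for max-size-bytes. The conversion can be sketched on its own; splitmux_properties is a hypothetical helper, not a library function:

```python
def splitmux_properties(split_max_files=0, split_max_time_sec=0, split_max_size_mb=0):
    """Map the method's split arguments to splitmuxsink property values."""
    return {
        "max-files": split_max_files,
        "max-size-time": split_max_time_sec * 1_000_000_000,  # seconds -> ns
        "max-size-bytes": split_max_size_mb * 1_000_000,      # MB -> bytes
    }


# Keep at most 10 files, each limited to 60 seconds or 500 MB.
props = splitmux_properties(
    split_max_files=10,
    split_max_time_sec=60,
    split_max_size_mb=500,
)
```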

Returns:

Stream: A Stream object representing the active stream.

Raises:

ValueError: If the capability's media type is not image/jpeg, the only supported format.
RuntimeError: If the stream fails to start on the server or if the local GStreamer process fails.
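
Because only image/jpeg capabilities are accepted, it can help to filter candidates before calling the method. The sketch assumes only the media_type attribute used in the source; how a list of capabilities is obtained is not covered on this page, and pick_jpeg_capability is a hypothetical helper:

```python
from types import SimpleNamespace


def pick_jpeg_capability(capabilities):
    """Return the first capability whose media_type is image/jpeg."""
    for cap in capabilities:
        if cap.media_type == "image/jpeg":
            return cap
    raise ValueError("No image/jpeg capability available")


# Stand-ins for Capability objects.
caps = [
    SimpleNamespace(media_type="video/x-raw"),
    SimpleNamespace(media_type="image/jpeg"),
]
chosen = pick_jpeg_capability(caps)
```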

Source code in pythorvision/client.py
def start_stream_with_recording(
    self,
    camera: Camera,
    capability: Capability,
    output_dir: str,
    split_max_files: Optional[int] = 0,
    split_max_time_sec: Optional[int] = 0,
    split_max_size_mb: Optional[int] = 0,
    gstreamer_debug: bool = False
) -> Stream:
    """Start a camera stream and record it to a file.

    This method requests the server to start streaming a camera's feed,
    then launches a local GStreamer process to receive and record the
    stream. The recording can be split into multiple files based on
    time, size, or number of files.

    Args:
        camera (Camera): The camera to start streaming.
        capability (Capability): The desired stream capability (resolution,
            format, etc.).
        output_dir (str): The directory to save the recording files.
        split_max_files (Optional[int]): The maximum number of files to
            create before overwriting. 0 for no limit. Defaults to 0.
        split_max_time_sec (Optional[int]): The maximum duration of each
            file in seconds. 0 for no limit. Defaults to 0.
        split_max_size_mb (Optional[int]): The maximum size of each file in
            megabytes. 0 for no limit. Defaults to 0.
        gstreamer_debug (bool): If True, enables GStreamer debug logging.
            Defaults to False.

    Returns:
        Stream: A Stream object representing the active stream.

    Raises:
        ValueError: If the selected capability is not a supported format.
        RuntimeError: If the stream fails to start on the server or if the
            local GStreamer process fails.
    """

    camera_id = camera.id
    capability_str = capability.to_gstreamer_capability()

    logger.info(
        f"Starting stream for camera {camera_id} ({camera.name}) "
        f"with capability: {capability_str}"
    )

    if capability.media_type != "image/jpeg":
        error_msg = (
            f"Capability {capability_str} is not in a supported format. "
            "Only image/jpeg capabilities are supported"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    if camera_id in self.streams:
        existing_stream = self.streams[camera_id]
        logger.info(
            f"Camera {camera_id} is already streaming on port {existing_stream.port}. "
            "Returning existing stream."
        )
        return existing_stream

    port = self._get_available_port()
    logger.debug(f"Assigned port {port} for camera {camera_id}")

    payload = {"id": camera_id, "port": port, "capability": capability_str}

    try:
        response = requests.post(f"{self._base_url}/jpeg", json=payload, timeout=5)
        response.raise_for_status()
        logger.info(f"Started JPEG stream for camera {camera_id} on port {port}")
    except requests.RequestException as e:
        logger.error(f"Failed to start stream on server: {e}")
        raise RuntimeError(f"Failed to start stream on server for camera {camera_id}") from e

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    refined_camera_name = ''.join(c if c.isalnum() else '_' for c in camera.name)
    file_basename = f"{camera_id}_{refined_camera_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    video_path = output_path / f"{file_basename}-%02d.mkv"
    gst_output_path = video_path.as_posix()

    gstreamer_log_file: Optional[io.TextIOBase] = None
    try:
        if gstreamer_debug:
            gstreamer_log_file_path = (output_path / f"{file_basename}.gstreamer.log")
            logger.info(f"GStreamer debug logs will be saved to: {gstreamer_log_file_path}")
            gstreamer_log_file = open(gstreamer_log_file_path, 'w', buffering=1)
            stdout_dest = stderr_dest = gstreamer_log_file
        else:
            gstreamer_log_file_path = None
            stdout_dest = stderr_dest = subprocess.DEVNULL

        pipeline_cmd = (
            f'"{self._gst_launch_path}" -e -v '
            f'srtclientsrc uri=srt://{self.host}:{port} latency=500 ! '
            'queue ! jpegparse ! tee name=t ! '
            f'queue ! splitmuxsink max-files={split_max_files} '
            f'max-size-time={split_max_time_sec * 1000000000} '
            f'max-size-bytes={split_max_size_mb * 1000000} '
            f'muxer-factory=matroskamux location="{gst_output_path}" '
            't. ! queue ! fpsdisplaysink fps-update-interval=30000 '
            'text-overlay=false video-sink=fakesink sync=false'
        )

        env = os.environ.copy()
        if gstreamer_debug:
            env['GST_DEBUG'] = '3'

        pipeline_args = shlex.split(pipeline_cmd)

        popen_kwargs = {
            "stdout": stdout_dest,
            "stderr": stderr_dest,
            "text": True,
            "bufsize": 1,
            "env": env
        }

        if os.name == 'nt':
            popen_kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP

        logger.debug(f"Starting GStreamer with FPS monitoring: {pipeline_cmd}")
        process = subprocess.Popen(pipeline_args, **popen_kwargs)

        time.sleep(1)

        if process.poll() is not None:
            logger.error("GStreamer process failed to start")
            if gstreamer_debug:
                error_msg = (
                    "Failed to start GStreamer. Check debug log file at "
                    f"{gstreamer_log_file_path}."
                )
            else:
                error_msg = (
                    "Failed to start GStreamer. Enable gstreamer_debug=True for details."
                )
            raise RuntimeError(error_msg)

        logger.info(f"Started recording for camera {camera_id} to {video_path}")

        new_stream = Stream(
            camera=camera,
            capability=capability,
            port=port,
            video_path=video_path,
            gstreamer_pipeline=pipeline_cmd,
            process=process,
            gstreamer_log_file=gstreamer_log_file,
            gstreamer_log_file_path=gstreamer_log_file_path,
        )
        self.streams[camera_id] = new_stream

        return new_stream

    except Exception as e:
        logger.error(f"Failed to start GStreamer process: {e}")

        if 'process' in locals() and process.poll() is None:
            logger.warning(f"Cleaning up orphaned GStreamer process for camera {camera_id}")
            if os.name == 'nt':
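                # The child runs in its own process group (CREATE_NEW_PROCESS_GROUP),
                # where CTRL_C_EVENT is not delivered; use CTRL_BREAK_EVENT as stop_stream does.
                process.send_signal(signal.CTRL_BREAK_EVENT)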
            else:
                process.send_signal(signal.SIGINT)
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait(timeout=2)

        if gstreamer_log_file:
            gstreamer_log_file.close()

        try:
            requests.post(f"{self._base_url}/stop", json={"id": camera_id}, timeout=5)
        except requests.RequestException:
            pass
        raise RuntimeError(f"Failed to start GStreamer for camera {camera_id}") from e
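
The output file name is built from the camera ID, a sanitized camera name, and a timestamp, followed by a printf-style %02d segment index that splitmuxsink fills in for each file. The sketch below reproduces that construction with a fixed timestamp so the result is predictable; build_location_pattern is a hypothetical name:

```python
from datetime import datetime
from pathlib import Path


def build_location_pattern(output_dir, camera_id, camera_name, now):
    """Build the location pattern the same way the source code above does."""
    # Replace every non-alphanumeric character with an underscore.
    safe_name = "".join(c if c.isalnum() else "_" for c in camera_name)
    basename = f"{camera_id}_{safe_name}_{now.strftime('%Y%m%d_%H%M%S')}"
    return (Path(output_dir) / f"{basename}-%02d.mkv").as_posix()


pattern = build_location_pattern(
    "recordings", 0, "USB Cam (HD)", datetime(2024, 1, 2, 3, 4, 5)
)
# Python's % operator is used here only to illustrate how the index expands.
first_segment = pattern % 0
```

Since the Stream's video_path holds the pattern rather than a concrete file, code that looks for recordings on disk needs to expand or glob the index portion.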

stop_stream

stop_stream(camera_id)

Stop the stream for a specific camera.

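This terminates the local GStreamer process by sending an interrupt signal, allowing it to finalize recordings; if the process has not exited within 5 seconds, it is killed. It then sends a request to the server to stop sending the stream. Failures of that request are logged rather than raised, since the local resources have already been released.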

Parameters:

camera_id (int): The ID of the camera to stop. Required.

Raises:

ValueError: If no active stream is found for the given camera ID.
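
A typical pairing of start_stream_with_recording and stop_stream is to record for a fixed time and always stop afterwards. The sketch uses only the method signatures documented on this page; record_for and _DemoClient are hypothetical, and the demo client stands in for a real ThorVisionClient so the example runs without a server:

```python
import time
from types import SimpleNamespace


def record_for(client, camera, capability, output_dir, seconds):
    """Record from `camera` for `seconds`, then stop the stream."""
    stream = client.start_stream_with_recording(
        camera, capability, output_dir, split_max_time_sec=60
    )
    try:
        time.sleep(seconds)
    finally:
        # Stop even if the wait is interrupted, so the recording is finalized.
        client.stop_stream(camera.id)
    return stream


class _DemoClient:
    """Records calls instead of talking to a server or GStreamer."""

    def __init__(self):
        self.calls = []

    def start_stream_with_recording(self, camera, capability, output_dir, **kwargs):
        self.calls.append(("start", camera.id))
        return SimpleNamespace(camera=camera, port=9000)

    def stop_stream(self, camera_id):
        self.calls.append(("stop", camera_id))


demo = _DemoClient()
demo_camera = SimpleNamespace(id=3, name="demo")
stream = record_for(demo, demo_camera, capability=None, output_dir="out", seconds=0)
```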

Source code in pythorvision/client.py
def stop_stream(self, camera_id: int) -> None:
    """Stop the stream for a specific camera.

    This terminates the local GStreamer process by sending an interrupt
    signal, allowing it to finalize recordings. It then sends a request to
    the server to stop sending the stream.

    Args:
        camera_id (int): The ID of the camera to stop.

    Raises:
        ValueError: If no active stream is found for the given camera ID.
    """
    logger.info(f"Stopping stream for camera {camera_id}")
    stream = self.streams.pop(camera_id, None)

    if not stream:
        raise ValueError(f"No active stream found for camera {camera_id}")

    try:
        if stream.process and stream.process.poll() is None:
            logger.debug(f"Terminating GStreamer process for camera {camera_id}")

            if os.name == 'nt':
                stream.process.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                stream.process.send_signal(signal.SIGINT)

            try:
                stream.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("Process didn't terminate gracefully, forcing kill")
                stream.process.kill()
                stream.process.wait(timeout=2)

        logger.info(f"Successfully stopped local recording process for camera {camera_id}")
    except Exception as e:
        logger.error(f"Error stopping GStreamer process: {e}")
    finally:
        if stream.gstreamer_log_file:
            stream.gstreamer_log_file.close()

    payload = {"id": camera_id}

    try:
        response = requests.post(f"{self._base_url}/stop", json=payload, timeout=5)
        response.raise_for_status()
        logger.info(f"Successfully stopped stream on server for camera {camera_id}")
    except requests.exceptions.HTTPError as e:
        logger.error(
            f"Server error stopping stream: {e.response.status_code} - {e.response.text}"
        )
        logger.warning(
            "Failed to stop stream on server, but local resources have been cleaned up."
        )
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to communicate with server to stop stream: {e}")
        logger.warning(
            "Failed to stop stream on server, but local resources have been cleaned up."
        )
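
The interrupt-then-kill sequence used by stop_stream is a general pattern for child processes that need a chance to flush their output. A self-contained sketch, using a sleeping Python child in place of GStreamer; stop_gracefully and start_demo_child are hypothetical names:

```python
import os
import signal
import subprocess
import sys


def stop_gracefully(process, timeout=5.0):
    """Interrupt the process; kill it if it does not exit within `timeout`.

    Returns True if the process exited on its own, False if it was killed.
    """
    if process.poll() is not None:
        return True  # already finished
    if os.name == "nt":
        # Requires the child to have been started in its own process group.
        process.send_signal(signal.CTRL_BREAK_EVENT)
    else:
        process.send_signal(signal.SIGINT)
    try:
        process.wait(timeout=timeout)
        return True
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait(timeout=2)
        return False


def start_demo_child():
    """Start a long-running child process to practice on."""
    kwargs = {}
    if os.name == "nt":
        kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    return subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"], **kwargs
    )
```

In the library's pipeline, gst-launch is started with the -e flag, which makes it send end-of-stream on shutdown; that is what lets an interrupted recording be closed cleanly rather than truncated.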