Skip to content

Client API Reference

pythorvision.client

Stream

Bases: BaseModel

Represents an active video stream and its associated resources.

Attributes:

Name Type Description
camera Camera

The camera being streamed.

capability Capability

The capability used for the stream.

port int

The network port used for the SRT stream.

video_path Path

The path to the recorded video file.

gstreamer_pipeline str

The GStreamer pipeline command used.

process Popen

The Popen object for the running GStreamer process.

gstreamer_log_file Optional[TextIOBase]

The file handle for GStreamer logs.

gstreamer_log_file_path Optional[Path]

The path to the GStreamer log file.

created_at datetime

The timestamp when the stream was created.

ThorVisionClient

Bases: BaseModel

Client for interacting with the ThorVision server to manage camera streams.

Attributes:

Name Type Description
host str

The hostname or IP address of the ThorVision server.

port int

The port number of the ThorVision server.

streams Dict[int, Stream]

A dictionary of active streams, keyed by camera ID.

__del__

__del__()

Ensure all streams are cleaned up when the client is destroyed.

Source code in pythorvision/client.py
def __del__(self):
    """Ensure all streams are cleaned up when the client is destroyed.

    Cleanup here is strictly best-effort. A finalizer may run on an
    instance that was never fully initialised, or while the interpreter is
    shutting down, and an exception escaping ``__del__`` cannot be handled
    by any caller (Python only reports it as "Exception ignored"). Call
    ``clean_streams()`` explicitly when a guaranteed shutdown is required.
    """
    try:
        self.clean_streams()
    except Exception:
        # Deliberately swallowed: there is no caller to propagate to, and
        # logging may itself be unavailable during interpreter teardown.
        pass

clean_streams

clean_streams()

Stop all active streams and clean up all resources.

This is useful for gracefully shutting down the client.

Source code in pythorvision/client.py
def clean_streams(self):
    """Stop every active stream and release its resources.

    Intended for gracefully shutting down the client. A failure while
    stopping one camera is logged and does not prevent the remaining
    cameras from being stopped.
    """
    logger.info("Starting cleanup of all streams")

    # Iterate over a snapshot of the keys: stop_stream() removes entries
    # from self.streams, which must not happen while iterating the dict.
    for cam_id in tuple(self.streams):
        try:
            self.stop_stream(cam_id)
        except Exception as err:
            logger.error(f"Error stopping stream for camera {cam_id}: {err}")

    logger.info("All resources cleaned up")

get_log staticmethod

get_log(log_name, host='192.168.177.100', port=8000)

Retrieve the content of a specific log file from the ThorVision server.

Parameters:

Name Type Description Default
log_name str

The name of the log file to retrieve.

required
host str

The hostname or IP address of the ThorVision server.

'192.168.177.100'
port int

The port number of the ThorVision server.

8000

Returns:

Name Type Description
str str

The content of the log file.

Raises:

Type Description
RequestException

If there is an issue communicating with the server.

Source code in pythorvision/client.py
@staticmethod
def get_log(log_name: str, host: str = "192.168.177.100", port: int = 8000) -> str:
    """Fetch the text of a single log file from the ThorVision server.

    Issues ``GET http://{host}:{port}/logs/{log_name}`` with a 5 second
    timeout.

    Args:
        log_name (str): The name of the log file to retrieve.
        host (str): The hostname or IP address of the ThorVision server.
        port (int): The port number of the ThorVision server.

    Returns:
        str: The body of the response, i.e. the content of the log file.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status. The error is logged and
            then re-raised unchanged.
    """
    log_url = f"http://{host}:{port}/logs/{log_name}"
    logger.debug(f"Requesting log content for '{log_name}' from {log_url}")

    try:
        reply = requests.get(log_url, timeout=5)
        reply.raise_for_status()
    except requests.exceptions.RequestException as err:
        logger.error(f"Failed to retrieve log file '{log_name}': {err}")
        raise
    else:
        logger.info(f"Successfully retrieved log file '{log_name}'")
        return reply.text

list_cameras

list_cameras()

Retrieve a list of available cameras from the ThorVision server.

Returns:

Type Description
List[Camera]

A list of Camera objects.

Raises:

Type Description
RequestException

If there is an issue communicating with the server.

Source code in pythorvision/client.py
def list_cameras(self) -> List[Camera]:
    """Query the ThorVision server for the cameras it currently exposes.

    Issues ``GET {base_url}/cameras`` with a 5 second timeout and builds
    one ``Camera`` from each element of the JSON response, passing the
    element's items as keyword arguments.

    Returns:
        List[Camera]: A list of Camera objects.

    Raises:
        requests.exceptions.RequestException: If there is an issue
            communicating with the server or it returns an error status.
    """
    reply = requests.get(f"{self._base_url}/cameras", timeout=5)
    reply.raise_for_status()
    return [Camera(**entry) for entry in reply.json()]

list_logs staticmethod

list_logs(host='192.168.177.100', port=8000)

Retrieve a list of available log files from the ThorVision server.

Parameters:

Name Type Description Default
host str

The hostname or IP address of the ThorVision server.

'192.168.177.100'
port int

The port number of the ThorVision server.

8000

Returns:

Type Description
List[str]

A list of log filenames.

Raises:

Type Description
RequestException

If there is an issue communicating with the server.

Source code in pythorvision/client.py
@staticmethod
def list_logs(host: str = "192.168.177.100", port: int = 8000) -> List[str]:
    """Fetch the names of the log files available on the ThorVision server.

    Issues ``GET http://{host}:{port}/logs`` with a 5 second timeout and
    returns the decoded JSON body as-is.

    Args:
        host (str): The hostname or IP address of the ThorVision server.
        port (int): The port number of the ThorVision server.

    Returns:
        List[str]: A list of log filenames.

    Raises:
        requests.exceptions.RequestException: If there is an issue
            communicating with the server. The error is logged and then
            re-raised unchanged.
    """
    logs_url = f"http://{host}:{port}/logs"
    logger.debug(f"Requesting log list from {logs_url}")

    try:
        reply = requests.get(logs_url, timeout=5)
        reply.raise_for_status()
        names = reply.json()
    except requests.exceptions.RequestException as err:
        logger.error(f"Failed to retrieve log list: {err}")
        raise
    else:
        logger.info(f"Successfully retrieved {len(names)} log file entries")
        return names

model_post_init

model_post_init(__context)

Initialize the client after the model is created.

This method sets up the base URL for the server and performs initial checks for server connectivity and GStreamer installation.

Parameters:

Name Type Description Default
__context Any

The context for model initialization.

required
Source code in pythorvision/client.py
def model_post_init(self, __context: Any) -> None:
    """Finish client setup once the model fields have been populated.

    Derives the server base URL from ``host`` and ``port``, stores it on
    the instance, and then runs the ``_check_host()`` and
    ``_check_gstreamer()`` start-up checks (server connectivity and
    GStreamer installation, respectively).

    Args:
        __context (Any): The context for model initialization. Unused here.
    """
    base_url = f"http://{self.host}:{self.port}"
    self._base_url = base_url
    logger.info(f"Initializing ThorVisionClient for {base_url}")

    # The base URL is assigned before the checks run, as in the original
    # statement order.
    self._check_host()
    self._check_gstreamer()

start_stream_with_recording

start_stream_with_recording(camera, capability, output_dir, split_max_files=0, split_max_time_sec=0, split_max_size_mb=0, gstreamer_debug=False)

Start a camera stream and record it to a file.

This method requests the server to start streaming a camera's feed, then launches a local GStreamer process to receive and record the stream. The recording can be split into multiple files based on time, size, or number of files.

Parameters:

Name Type Description Default
camera Camera

The camera to start streaming.

required
capability Capability

The desired stream capability (resolution, format, etc.).

required
output_dir str

The directory to save the recording files.

required
split_max_files Optional[int]

The maximum number of files to create before overwriting. 0 for no limit. Defaults to 0.

0
split_max_time_sec Optional[int]

The maximum duration of each file in seconds. 0 for no limit. Defaults to 0.

0
split_max_size_mb Optional[int]

The maximum size of each file in megabytes. 0 for no limit. Defaults to 0.

0
gstreamer_debug bool

If True, enables GStreamer debug logging. Defaults to False.

False

Returns:

Name Type Description
Stream Stream

A Stream object representing the active stream.

Raises:

Type Description
ValueError

If the selected capability is not a supported format.

RuntimeError

If the stream fails to start on the server or if the local GStreamer process fails.

Source code in pythorvision/client.py
def start_stream_with_recording(
    self,
    camera: Camera,
    capability: Capability,
    output_dir: str,
    split_max_files: Optional[int] = 0,
    split_max_time_sec: Optional[int] = 0,
    split_max_size_mb: Optional[int] = 0,
    gstreamer_debug: bool = False
) -> Stream:
    """Start a camera stream and record it to a file.

    This method requests the server to start streaming a camera's feed,
    then launches a local GStreamer process to receive and record the
    stream. The recording can be split into multiple files based on
    time, size, or number of files.

    If the camera already has an entry in ``self.streams``, that existing
    stream is returned and nothing new is started.

    Args:
        camera (Camera): The camera to start streaming.
        capability (Capability): The desired stream capability (resolution,
            format, etc.). Only ``image/jpeg`` capabilities are accepted.
        output_dir (str): The directory to save the recording files. It is
            created (including parents) if it does not exist.
        split_max_files (Optional[int]): The maximum number of files to
            create before overwriting. 0 or None for no limit.
            Defaults to 0.
        split_max_time_sec (Optional[int]): The maximum duration of each
            file in seconds. 0 or None for no limit. Defaults to 0.
        split_max_size_mb (Optional[int]): The maximum size of each file in
            megabytes (1 MB = 1,000,000 bytes). 0 or None for no limit.
            Defaults to 0.
        gstreamer_debug (bool): If True, enables GStreamer debug logging
            (``GST_DEBUG=3``) and writes the process output to a
            ``.gstreamer.log`` file next to the recording.
            Defaults to False.

    Returns:
        Stream: A Stream object representing the active stream.

    Raises:
        ValueError: If the selected capability is not a supported format.
        OSError: If the output directory cannot be created.
        RuntimeError: If the stream fails to start on the server or if the
            local GStreamer process fails.
    """

    camera_id = camera.id
    capability_str = capability.to_gstreamer_capability()

    logger.info(
        f"Starting stream for camera {camera_id} ({camera.name}) "
        f"with capability: {capability_str}"
    )

    if capability.media_type != "image/jpeg":
        error_msg = (
            f"Capability {capability_str} is not in a supported format. "
            "Only image/jpeg capabilities are supported"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    if camera_id in self.streams:
        existing_stream = self.streams[camera_id]
        logger.info(
            f"Camera {camera_id} is already streaming on port {existing_stream.port}. "
            "Returning existing stream."
        )
        return existing_stream

    # The signature permits None for the split limits; normalise it to 0
    # ("no limit") so the arithmetic below cannot fail and the pipeline
    # never contains the literal text "None".
    max_files = split_max_files or 0
    max_time_sec = split_max_time_sec or 0
    max_size_mb = split_max_size_mb or 0

    # Create the output directory before asking the server to stream, so a
    # filesystem error cannot leave a server-side stream running with no
    # local consumer and no cleanup.
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    port = self._get_available_port()
    logger.debug(f"Assigned port {port} for camera {camera_id}")

    payload = {"id": camera_id, "port": port, "capability": capability_str}

    try:
        response = requests.post(f"{self._base_url}/jpeg", json=payload, timeout=5)
        response.raise_for_status()
        logger.info(f"Started JPEG stream for camera {camera_id} on port {port}")
    except requests.RequestException as e:
        logger.error(f"Failed to start stream on server: {e}")
        raise RuntimeError(f"Failed to start stream on server for camera {camera_id}") from e

    # Replace every non-alphanumeric character so the camera name is safe
    # to embed in a file name.
    refined_camera_name = ''.join(c if c.isalnum() else '_' for c in camera.name)
    file_basename = f"{camera_id}_{refined_camera_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    # "%02d" is left for splitmuxsink to fill in with the fragment index.
    video_path = output_path / f"{file_basename}-%02d.mkv"
    gst_output_path = video_path.as_posix()

    # Initialised up front so the error handler can tell what was created.
    gstreamer_log_file: Optional[io.TextIOBase] = None
    gstreamer_log_file_path: Optional[Path] = None
    process: Optional[subprocess.Popen] = None
    try:
        if gstreamer_debug:
            gstreamer_log_file_path = (output_path / f"{file_basename}.gstreamer.log")
            logger.info(f"GStreamer debug logs will be saved to: {gstreamer_log_file_path}")
            gstreamer_log_file = open(gstreamer_log_file_path, 'w', buffering=1)
            stdout_dest = stderr_dest = gstreamer_log_file
        else:
            stdout_dest = stderr_dest = subprocess.DEVNULL

        # One branch of the tee records via splitmuxsink; the other feeds
        # fpsdisplaysink backed by a fakesink (no video window).
        pipeline_cmd = (
            f'"{self._gst_launch_path}" -e -v '
            f'srtclientsrc uri=srt://{self.host}:{port} latency=500 ! '
            'queue ! jpegparse ! tee name=t ! '
            f'queue ! splitmuxsink max-files={max_files} '
            f'max-size-time={max_time_sec * 1000000000} '
            f'max-size-bytes={max_size_mb * 1000000} '
            f'muxer-factory=matroskamux location="{gst_output_path}" '
            't. ! queue ! fpsdisplaysink fps-update-interval=30000 '
            'text-overlay=false video-sink=fakesink sync=false'
        )

        env = os.environ.copy()
        if gstreamer_debug:
            env['GST_DEBUG'] = '3'

        pipeline_args = shlex.split(pipeline_cmd)

        popen_kwargs = {
            "stdout": stdout_dest,
            "stderr": stderr_dest,
            "text": True,
            "bufsize": 1,
            "env": env
        }

        if os.name == 'nt':
            # A separate process group lets CTRL_BREAK_EVENT be delivered
            # to GStreamer without also hitting this Python process.
            popen_kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP

        logger.debug(f"Starting GStreamer with FPS monitoring: {pipeline_cmd}")
        process = subprocess.Popen(pipeline_args, **popen_kwargs)

        # Give GStreamer a moment to fail fast (bad pipeline, missing
        # plugin, ...) before reporting the stream as started.
        time.sleep(1)

        if process.poll() is not None:
            logger.error("GStreamer process failed to start")
            if gstreamer_debug:
                error_msg = (
                    "Failed to start GStreamer. Check debug log file at "
                    f"{gstreamer_log_file_path}."
                )
            else:
                error_msg = (
                    "Failed to start GStreamer. Enable gstreamer_debug=True for details."
                )
            raise RuntimeError(error_msg)

        logger.info(f"Started recording for camera {camera_id} to {video_path}")

        new_stream = Stream(
            camera=camera,
            capability=capability,
            port=port,
            video_path=video_path,
            gstreamer_pipeline=pipeline_cmd,
            process=process,
            gstreamer_log_file=gstreamer_log_file,
            gstreamer_log_file_path=gstreamer_log_file_path,
        )
        self.streams[camera_id] = new_stream

        return new_stream

    except Exception as e:
        logger.error(f"Failed to start GStreamer process: {e}")

        if process is not None and process.poll() is None:
            logger.warning(f"Cleaning up orphaned GStreamer process for camera {camera_id}")
            try:
                if os.name == 'nt':
                    # CTRL_C_EVENT is not delivered to a process created
                    # with CREATE_NEW_PROCESS_GROUP; CTRL_BREAK_EVENT is,
                    # and it matches what stop_stream() sends.
                    process.send_signal(signal.CTRL_BREAK_EVENT)
                else:
                    process.send_signal(signal.SIGINT)
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait(timeout=2)
            except Exception as cleanup_error:
                # Keep going: the log file and the server-side stream
                # below still need to be released.
                logger.error(f"Error cleaning up GStreamer process: {cleanup_error}")

        if gstreamer_log_file is not None:
            gstreamer_log_file.close()

        # Best effort: tell the server to stop the stream started above.
        try:
            requests.post(f"{self._base_url}/stop", json={"id": camera_id}, timeout=5)
        except requests.RequestException:
            pass
        raise RuntimeError(f"Failed to start GStreamer for camera {camera_id}") from e

stop_stream

stop_stream(camera_id)

Stop the stream for a specific camera.

This terminates the local GStreamer process by sending an interrupt signal, allowing it to finalize recordings. It then sends a request to the server to stop sending the stream.

Parameters:

Name Type Description Default
camera_id int

The ID of the camera to stop.

required

Raises:

Type Description
ValueError

If no active stream is found for the given camera ID.

Source code in pythorvision/client.py
def stop_stream(self, camera_id: int) -> None:
    """Stop recording and streaming for one camera.

    The stream is first removed from ``self.streams``. The local GStreamer
    process, if still running, is sent an interrupt (``CTRL_BREAK_EVENT``
    on Windows, ``SIGINT`` elsewhere) so it can finalize the recording; if
    it has not exited within 5 seconds it is killed. The GStreamer log
    file, if any, is closed. Finally the server is asked to stop sending
    the stream.

    Errors from the local process shutdown and from the server request are
    logged, not raised.

    Args:
        camera_id (int): The ID of the camera to stop.

    Raises:
        ValueError: If no active stream is found for the given camera ID.
    """
    logger.info(f"Stopping stream for camera {camera_id}")

    stream = self.streams.pop(camera_id, None)
    if not stream:
        raise ValueError(f"No active stream found for camera {camera_id}")

    try:
        gst_process = stream.process
        if gst_process and gst_process.poll() is None:
            logger.debug(f"Terminating GStreamer process for camera {camera_id}")

            # Only one side of the conditional is evaluated, so the
            # Windows-only constant is never touched on other platforms.
            interrupt = signal.CTRL_BREAK_EVENT if os.name == 'nt' else signal.SIGINT
            gst_process.send_signal(interrupt)

            try:
                gst_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("Process didn't terminate gracefully, forcing kill")
                gst_process.kill()
                gst_process.wait(timeout=2)

        logger.info(f"Successfully stopped local recording process for camera {camera_id}")
    except Exception as err:
        logger.error(f"Error stopping GStreamer process: {err}")
    finally:
        # Close the log handle whether or not the shutdown above succeeded.
        log_handle = stream.gstreamer_log_file
        if log_handle:
            log_handle.close()

    stop_request = {"id": camera_id}

    try:
        reply = requests.post(f"{self._base_url}/stop", json=stop_request, timeout=5)
        reply.raise_for_status()
        logger.info(f"Successfully stopped stream on server for camera {camera_id}")
    except requests.exceptions.RequestException as err:
        # HTTP status errors get a more specific message than transport
        # failures; both end with the same warning.
        if isinstance(err, requests.exceptions.HTTPError):
            logger.error(
                f"Server error stopping stream: {err.response.status_code} - {err.response.text}"
            )
        else:
            logger.error(f"Failed to communicate with server to stop stream: {err}")
        logger.warning(
            "Failed to stop stream on server, but local resources have been cleaned up."
        )