Source code for pyngrok.process

__copyright__ = "Copyright (c) 2018-2024 Alex Laird"
__license__ = "MIT"

import atexit
import logging
import os
import subprocess
import threading
import time
from http import HTTPStatus
from typing import Dict, List, Optional, Any
from urllib.request import Request, urlopen

import yaml

from pyngrok import conf, installer
from pyngrok.conf import PyngrokConfig
from pyngrok.exception import PyngrokNgrokError, PyngrokSecurityError, PyngrokError
from pyngrok.installer import SUPPORTED_NGROK_VERSIONS
from pyngrok.log import NgrokLog

logger = logging.getLogger(__name__)
ngrok_logger = logging.getLogger(f"{__name__}.ngrok")


[docs] class NgrokProcess: """ An object containing information about the ``ngrok`` process. """ def __init__(self, proc: subprocess.Popen, # type: ignore pyngrok_config: PyngrokConfig) -> None: #: The child process that is running ``ngrok``. self.proc: subprocess.Popen = proc # type: ignore #: The ``pyngrok`` configuration to use with ``ngrok``. self.pyngrok_config: PyngrokConfig = pyngrok_config #: The API URL for the ``ngrok`` web interface. self.api_url: Optional[str] = None #: A list of the most recent logs from ``ngrok``, limited in size to ``max_logs``. self.logs: List[NgrokLog] = [] #: If ``ngrok`` startup fails, this will be the log of the failure. self.startup_error: Optional[str] = None self._tunnel_started = False self._client_connected = False self._monitor_thread: Optional[threading.Thread] = None self._monitor_thread_alive = False def __repr__(self) -> str: return f"<NgrokProcess: \"{self.api_url}\">" def __str__(self) -> str: # pragma: no cover return f"NgrokProcess: \"{self.api_url}\"" @staticmethod def _line_has_error(log: NgrokLog) -> bool: return log.lvl in ["ERROR", "CRITICAL"]
[docs] def _log_startup_line(self, line: str) -> Optional[NgrokLog]: """ Parse the given startup log line and use it to manage the startup state of the ``ngrok`` process. :param line: The line to be parsed and logged. :return: The parsed log. """ log = self._log_line(line) if log is None: return None elif self._line_has_error(log): self.startup_error = log.err elif log.msg: # Log ngrok startup states as they come in if "starting web service" in log.msg and log.addr is not None: self.api_url = f"http://{log.addr}" elif "tunnel session started" in log.msg: self._tunnel_started = True elif "client session established" in log.msg: self._client_connected = True return log
[docs] def _log_line(self, line: str) -> Optional[NgrokLog]: """ Parse, log, and emit (if ``log_event_callback`` in :class:`~pyngrok.conf.PyngrokConfig` is registered) the given log line. :param line: The line to be processed. :return: The parsed log. """ log = NgrokLog(line) if log.line == "": return None ngrok_logger.log(getattr(logging, log.lvl), log.line) self.logs.append(log) if len(self.logs) > self.pyngrok_config.max_logs: self.logs.pop(0) if self.pyngrok_config.log_event_callback is not None: self.pyngrok_config.log_event_callback(log) return log
[docs] def healthy(self) -> bool: """ Check whether the ``ngrok`` process has finished starting up and is in a running, healthy state. :return: ``True`` if the ``ngrok`` process is started, running, and healthy. :raises: :class:`~pyngrok.exception.PyngrokSecurityError`: When the ``url`` is not supported. """ if self.api_url is None or \ not self._tunnel_started or \ not self._client_connected: return False if not self.api_url.lower().startswith("http"): raise PyngrokSecurityError(f"URL must start with \"http\": {self.api_url}") # Ensure the process is available for requests before registering it as healthy request = Request(f"{self.api_url}/api/tunnels") response = urlopen(request) if response.getcode() != HTTPStatus.OK: return False return self.proc.poll() is None
def _monitor_process(self) -> None: self._monitor_thread_alive = True while self._monitor_thread_alive and self.proc.poll() is None: if self.proc.stdout is None: logger.debug("No stdout when monitoring the process, this may or may not be an issue") continue self._log_line(self.proc.stdout.readline()) self._monitor_thread = None
[docs] def start_monitor_thread(self) -> None: """ Start a thread that will monitor the ``ngrok`` process and its logs until it completes. If a monitor thread is already running, nothing will be done. """ if self._monitor_thread is None: logger.debug("Monitor thread will be started") self._monitor_thread = threading.Thread(target=self._monitor_process, daemon=True) self._monitor_thread.start()
[docs] def stop_monitor_thread(self) -> None: """ Set the monitor thread to stop monitoring the ``ngrok`` process after the next log event. This will not necessarily terminate the thread immediately, as the thread may currently be idle, rather it sets a flag on the thread telling it to terminate the next time it wakes up. This has no impact on the ``ngrok`` process itself, only ``pyngrok``'s monitor of the process and its logs. """ if self._monitor_thread is not None: logger.debug("Monitor thread will be stopped") self._monitor_thread_alive = False
[docs] def set_auth_token(pyngrok_config: PyngrokConfig, token: str) -> None: """ Set the ``ngrok`` auth token in the config file, enabling authenticated features (for instance, more concurrent tunnels, custom subdomains, etc.). :param pyngrok_config: The ``pyngrok`` configuration to use when interacting with the ``ngrok`` binary. :param token: The auth token to set. :raises: :class:`~pyngrok.exception.PyngrokError`: When the ``ngrok_version`` is not supported. :raises: :class:`~pyngrok.exception.PyngrokNgrokError`: When ``ngrok`` could not start. """ if pyngrok_config.ngrok_version == "v2": start = [pyngrok_config.ngrok_path, "authtoken", token, "--log=stdout"] elif pyngrok_config.ngrok_version == "v3": start = [pyngrok_config.ngrok_path, "config", "add-authtoken", token, "--log=stdout"] else: raise PyngrokError(f"\"ngrok_version\" must be a supported version: {SUPPORTED_NGROK_VERSIONS}") if pyngrok_config.config_path: logger.info(f"Updating authtoken for \"config_path\": {pyngrok_config.config_path}") start.append(f"--config={pyngrok_config.config_path}") else: logger.info( f"Updating authtoken for default \"config_path\" of \"ngrok_path\": {pyngrok_config.ngrok_path}") result = str(subprocess.check_output(start)) if "Authtoken saved" not in result: raise PyngrokNgrokError(f"An error occurred when saving the auth token: {result}")
[docs] def is_process_running(ngrok_path: str) -> bool: """ Check if the ``ngrok`` process is currently running. :param ngrok_path: The path to the ``ngrok`` binary. :return: ``True`` if ``ngrok`` is running from the given path. """ if ngrok_path in _current_processes: # Ensure the process is still running and hasn't been killed externally, otherwise cleanup if _current_processes[ngrok_path].proc.poll() is None: return True else: logger.debug( f"Removing stale process for \"ngrok_path\" {ngrok_path}") _current_processes.pop(ngrok_path, None) return False
[docs] def get_process(pyngrok_config: PyngrokConfig) -> NgrokProcess: """ Get the current ``ngrok`` process for the given config's ``ngrok_path``. If ``ngrok`` is not running, calling this method will first start a process with :class:`~pyngrok.conf.PyngrokConfig`. :param pyngrok_config: The ``pyngrok`` configuration to use when interacting with the ``ngrok`` binary. :return: The ``ngrok`` process. """ if is_process_running(pyngrok_config.ngrok_path): return _current_processes[pyngrok_config.ngrok_path] return _start_process(pyngrok_config)
[docs] def kill_process(ngrok_path: str) -> None: """ Terminate the ``ngrok`` processes, if running, for the given path. This method will not block, it will just issue a kill request. :param ngrok_path: The path to the ``ngrok`` binary. """ if is_process_running(ngrok_path): ngrok_process = _current_processes[ngrok_path] logger.info(f"Killing ngrok process: {ngrok_process.proc.pid}") try: ngrok_process.proc.kill() ngrok_process.proc.wait() except OSError as e: # pragma: no cover # If the process was already killed, nothing to do but cleanup state if e.errno != 3: raise e _current_processes.pop(ngrok_path, None) else: logger.debug(f"\"ngrok_path\" {ngrok_path} is not running a process")
[docs] def run_process(ngrok_path: str, args: List[str]) -> None: """ Start a blocking ``ngrok`` process with the binary at the given path and the passed args. This method is meant for invoking ``ngrok`` directly (for instance, from the command line) and is not necessarily compatible with non-blocking API methods. For that, use :func:`~pyngrok.process.get_process`. :param ngrok_path: The path to the ``ngrok`` binary. :param args: The args to pass to ``ngrok``. """ _validate_path(ngrok_path) start = [ngrok_path] + args subprocess.call(start)
[docs] def capture_run_process(ngrok_path: str, args: List[str]) -> str: """ Start a blocking ``ngrok`` process with the binary at the given path and the passed args. When the process returns, so will this method, and the captured output from the process along with it. This method is meant for invoking ``ngrok`` directly (for instance, from the command line) and is not necessarily compatible with non-blocking API methods. For that, use :func:`~pyngrok.process.get_process`. :param ngrok_path: The path to the ``ngrok`` binary. :param args: The args to pass to ``ngrok``. :return: The output from the process. """ _validate_path(ngrok_path) start = [ngrok_path] + args output = subprocess.check_output(start) return output.decode("utf-8").strip()
[docs] def _validate_path(ngrok_path: str) -> None: """ Validate the given path exists, is a ``ngrok`` binary, and is ready to be started, otherwise raise a relevant exception. :param ngrok_path: The path to the ``ngrok`` binary. :raises: :class:`~pyngrok.exception.PyngrokNgrokError`: When ``ngrok`` could not start. """ if not os.path.exists(ngrok_path): raise PyngrokNgrokError( f"ngrok binary was not found. Be sure to call \"ngrok.install_ngrok()\" first " f"for \"ngrok_path\": {ngrok_path}") if ngrok_path in _current_processes: raise PyngrokNgrokError(f"ngrok is already running for the \"ngrok_path\": {ngrok_path}")
def _validate_config(config_path: str) -> None: with open(config_path, "r") as config_file: config = yaml.safe_load(config_file) if config is not None: installer.validate_config(config) def _terminate_process(process: subprocess.Popen) -> None: # type: ignore if process is None: return try: process.terminate() except OSError: # pragma: no cover logger.debug(f"ngrok process already terminated: {process.pid}")
[docs] def _start_process(pyngrok_config: PyngrokConfig) -> NgrokProcess: """ Start a ``ngrok`` process with no tunnels. This will start the ``ngrok`` web interface, against which HTTP requests can be made to create, interact with, and destroy tunnels. :param pyngrok_config: The ``pyngrok`` configuration to use when interacting with the ``ngrok`` binary. :return: The ``ngrok`` process. :raises: :class:`~pyngrok.exception.PyngrokNgrokError`: When ``ngrok`` could not start. """ config_path = conf.get_config_path(pyngrok_config) _validate_path(pyngrok_config.ngrok_path) _validate_config(config_path) start = [pyngrok_config.ngrok_path, "start", "--none", "--log=stdout"] if pyngrok_config.config_path: logger.info(f"Starting ngrok with config file: {pyngrok_config.config_path}") start.append(f"--config={pyngrok_config.config_path}") if pyngrok_config.auth_token: logger.info("Overriding default auth token") start.append(f"--authtoken={pyngrok_config.auth_token}") if pyngrok_config.region: logger.info(f"Starting ngrok in region: {pyngrok_config.region}") start.append(f"--region={pyngrok_config.region}") popen_kwargs: Dict[str, Any] = {"stdout": subprocess.PIPE, "universal_newlines": True} if os.name == "posix": popen_kwargs.update(start_new_session=pyngrok_config.start_new_session) elif pyngrok_config.start_new_session: logger.warning("Ignoring start_new_session=True, which requires POSIX") proc = subprocess.Popen(start, **popen_kwargs) atexit.register(_terminate_process, proc) logger.debug(f"ngrok process starting with PID: {proc.pid}") ngrok_process = NgrokProcess(proc, pyngrok_config) _current_processes[pyngrok_config.ngrok_path] = ngrok_process timeout = time.time() + pyngrok_config.startup_timeout while time.time() < timeout: if proc.stdout is None: logger.debug("No stdout when starting the process, this may or may not be an issue") break line = proc.stdout.readline() ngrok_process._log_startup_line(line) if ngrok_process.healthy(): logger.debug(f"ngrok process has started with API URL: {ngrok_process.api_url}") ngrok_process.startup_error = None if pyngrok_config.monitor_thread: ngrok_process.start_monitor_thread() break elif ngrok_process.proc.poll() is not None: break if not ngrok_process.healthy(): # If the process did not come up in a healthy state, clean up the state kill_process(pyngrok_config.ngrok_path) if ngrok_process.startup_error is not None: raise PyngrokNgrokError(f"The ngrok process errored on start: {ngrok_process.startup_error}.", ngrok_process.logs, ngrok_process.startup_error) else: raise PyngrokNgrokError("The ngrok process was unable to start.", ngrok_process.logs) return ngrok_process
_current_processes: Dict[str, NgrokProcess] = {}