__copyright__ = "Copyright (c) 2018-2024 Alex Laird"
__license__ = "MIT"
import atexit
import logging
import os
import subprocess
import threading
import time
from http import HTTPStatus
from typing import Dict, List, Optional, Any
from urllib.request import Request, urlopen
import yaml
from pyngrok import conf, installer
from pyngrok.conf import PyngrokConfig
from pyngrok.exception import PyngrokNgrokError, PyngrokSecurityError, PyngrokError
from pyngrok.installer import SUPPORTED_NGROK_VERSIONS
from pyngrok.log import NgrokLog
logger = logging.getLogger(__name__)
ngrok_logger = logging.getLogger(f"{__name__}.ngrok")
[docs]
class NgrokProcess:
"""
An object containing information about the ``ngrok`` process.
"""
def __init__(self,
proc: subprocess.Popen, # type: ignore
pyngrok_config: PyngrokConfig) -> None:
#: The child process that is running ``ngrok``.
self.proc: subprocess.Popen = proc # type: ignore
#: The ``pyngrok`` configuration to use with ``ngrok``.
self.pyngrok_config: PyngrokConfig = pyngrok_config
#: The API URL for the ``ngrok`` web interface.
self.api_url: Optional[str] = None
#: A list of the most recent logs from ``ngrok``, limited in size to ``max_logs``.
self.logs: List[NgrokLog] = []
#: If ``ngrok`` startup fails, this will be the log of the failure.
self.startup_error: Optional[str] = None
self._tunnel_started = False
self._client_connected = False
self._monitor_thread: Optional[threading.Thread] = None
self._monitor_thread_alive = False
def __repr__(self) -> str:
return f"<NgrokProcess: \"{self.api_url}\">"
def __str__(self) -> str: # pragma: no cover
return f"NgrokProcess: \"{self.api_url}\""
@staticmethod
def _line_has_error(log: NgrokLog) -> bool:
return log.lvl in ["ERROR", "CRITICAL"]
[docs]
def _log_startup_line(self, line: str) -> Optional[NgrokLog]:
"""
Parse the given startup log line and use it to manage the startup state
of the ``ngrok`` process.
:param line: The line to be parsed and logged.
:return: The parsed log.
"""
log = self._log_line(line)
if log is None:
return None
elif self._line_has_error(log):
self.startup_error = log.err
elif log.msg:
# Log ngrok startup states as they come in
if "starting web service" in log.msg and log.addr is not None:
self.api_url = f"http://{log.addr}"
elif "tunnel session started" in log.msg:
self._tunnel_started = True
elif "client session established" in log.msg:
self._client_connected = True
return log
[docs]
def _log_line(self, line: str) -> Optional[NgrokLog]:
"""
Parse, log, and emit (if ``log_event_callback`` in :class:`~pyngrok.conf.PyngrokConfig` is registered) the
given log line.
:param line: The line to be processed.
:return: The parsed log.
"""
log = NgrokLog(line)
if log.line == "":
return None
ngrok_logger.log(getattr(logging, log.lvl), log.line)
self.logs.append(log)
if len(self.logs) > self.pyngrok_config.max_logs:
self.logs.pop(0)
if self.pyngrok_config.log_event_callback is not None:
self.pyngrok_config.log_event_callback(log)
return log
[docs]
def healthy(self) -> bool:
"""
Check whether the ``ngrok`` process has finished starting up and is in a running, healthy state.
:return: ``True`` if the ``ngrok`` process is started, running, and healthy.
:raises: :class:`~pyngrok.exception.PyngrokSecurityError`: When the ``url`` is not supported.
"""
if self.api_url is None or \
not self._tunnel_started or \
not self._client_connected:
return False
if not self.api_url.lower().startswith("http"):
raise PyngrokSecurityError(f"URL must start with \"http\": {self.api_url}")
# Ensure the process is available for requests before registering it as healthy
request = Request(f"{self.api_url}/api/tunnels")
response = urlopen(request)
if response.getcode() != HTTPStatus.OK:
return False
return self.proc.poll() is None
def _monitor_process(self) -> None:
self._monitor_thread_alive = True
while self._monitor_thread_alive and self.proc.poll() is None:
if self.proc.stdout is None:
logger.debug("No stdout when monitoring the process, this may or may not be an issue")
continue
self._log_line(self.proc.stdout.readline())
self._monitor_thread = None
[docs]
def start_monitor_thread(self) -> None:
"""
Start a thread that will monitor the ``ngrok`` process and its logs until it completes.
If a monitor thread is already running, nothing will be done.
"""
if self._monitor_thread is None:
logger.debug("Monitor thread will be started")
self._monitor_thread = threading.Thread(target=self._monitor_process,
daemon=True)
self._monitor_thread.start()
[docs]
def stop_monitor_thread(self) -> None:
"""
Set the monitor thread to stop monitoring the ``ngrok`` process after the next log event. This will not
necessarily terminate the thread immediately, as the thread may currently be idle, rather it sets a flag
on the thread telling it to terminate the next time it wakes up.
This has no impact on the ``ngrok`` process itself, only ``pyngrok``'s monitor of the process and
its logs.
"""
if self._monitor_thread is not None:
logger.debug("Monitor thread will be stopped")
self._monitor_thread_alive = False
[docs]
def set_auth_token(pyngrok_config: PyngrokConfig,
token: str) -> None:
"""
Set the ``ngrok`` auth token in the config file, enabling authenticated features (for instance,
more concurrent tunnels, custom subdomains, etc.).
:param pyngrok_config: The ``pyngrok`` configuration to use when interacting with the ``ngrok`` binary.
:param token: The auth token to set.
:raises: :class:`~pyngrok.exception.PyngrokError`: When the ``ngrok_version`` is not supported.
:raises: :class:`~pyngrok.exception.PyngrokNgrokError`: When ``ngrok`` could not start.
"""
if pyngrok_config.ngrok_version == "v2":
start = [pyngrok_config.ngrok_path, "authtoken", token, "--log=stdout"]
elif pyngrok_config.ngrok_version == "v3":
start = [pyngrok_config.ngrok_path, "config", "add-authtoken", token, "--log=stdout"]
else:
raise PyngrokError(f"\"ngrok_version\" must be a supported version: {SUPPORTED_NGROK_VERSIONS}")
if pyngrok_config.config_path:
logger.info(f"Updating authtoken for \"config_path\": {pyngrok_config.config_path}")
start.append(f"--config={pyngrok_config.config_path}")
else:
logger.info(
f"Updating authtoken for default \"config_path\" of \"ngrok_path\": {pyngrok_config.ngrok_path}")
result = str(subprocess.check_output(start))
if "Authtoken saved" not in result:
raise PyngrokNgrokError(f"An error occurred when saving the auth token: {result}")
[docs]
def is_process_running(ngrok_path: str) -> bool:
"""
Check if the ``ngrok`` process is currently running.
:param ngrok_path: The path to the ``ngrok`` binary.
:return: ``True`` if ``ngrok`` is running from the given path.
"""
if ngrok_path in _current_processes:
# Ensure the process is still running and hasn't been killed externally, otherwise cleanup
if _current_processes[ngrok_path].proc.poll() is None:
return True
else:
logger.debug(
f"Removing stale process for \"ngrok_path\" {ngrok_path}")
_current_processes.pop(ngrok_path, None)
return False
[docs]
def get_process(pyngrok_config: PyngrokConfig) -> NgrokProcess:
"""
Get the current ``ngrok`` process for the given config's ``ngrok_path``.
If ``ngrok`` is not running, calling this method will first start a process with
:class:`~pyngrok.conf.PyngrokConfig`.
:param pyngrok_config: The ``pyngrok`` configuration to use when interacting with the ``ngrok`` binary.
:return: The ``ngrok`` process.
"""
if is_process_running(pyngrok_config.ngrok_path):
return _current_processes[pyngrok_config.ngrok_path]
return _start_process(pyngrok_config)
[docs]
def kill_process(ngrok_path: str) -> None:
"""
Terminate the ``ngrok`` processes, if running, for the given path. This method will not block, it will just
issue a kill request.
:param ngrok_path: The path to the ``ngrok`` binary.
"""
if is_process_running(ngrok_path):
ngrok_process = _current_processes[ngrok_path]
logger.info(f"Killing ngrok process: {ngrok_process.proc.pid}")
try:
ngrok_process.proc.kill()
ngrok_process.proc.wait()
except OSError as e: # pragma: no cover
# If the process was already killed, nothing to do but cleanup state
if e.errno != 3:
raise e
_current_processes.pop(ngrok_path, None)
else:
logger.debug(f"\"ngrok_path\" {ngrok_path} is not running a process")
[docs]
def run_process(ngrok_path: str, args: List[str]) -> None:
"""
Start a blocking ``ngrok`` process with the binary at the given path and the passed args.
This method is meant for invoking ``ngrok`` directly (for instance, from the command line) and is not
necessarily compatible with non-blocking API methods. For that, use :func:`~pyngrok.process.get_process`.
:param ngrok_path: The path to the ``ngrok`` binary.
:param args: The args to pass to ``ngrok``.
"""
_validate_path(ngrok_path)
start = [ngrok_path] + args
subprocess.call(start)
[docs]
def capture_run_process(ngrok_path: str, args: List[str]) -> str:
"""
Start a blocking ``ngrok`` process with the binary at the given path and the passed args. When the process
returns, so will this method, and the captured output from the process along with it.
This method is meant for invoking ``ngrok`` directly (for instance, from the command line) and is not
necessarily compatible with non-blocking API methods. For that, use :func:`~pyngrok.process.get_process`.
:param ngrok_path: The path to the ``ngrok`` binary.
:param args: The args to pass to ``ngrok``.
:return: The output from the process.
"""
_validate_path(ngrok_path)
start = [ngrok_path] + args
output = subprocess.check_output(start)
return output.decode("utf-8").strip()
[docs]
def _validate_path(ngrok_path: str) -> None:
"""
Validate the given path exists, is a ``ngrok`` binary, and is ready to be started, otherwise raise a
relevant exception.
:param ngrok_path: The path to the ``ngrok`` binary.
:raises: :class:`~pyngrok.exception.PyngrokNgrokError`: When ``ngrok`` could not start.
"""
if not os.path.exists(ngrok_path):
raise PyngrokNgrokError(
f"ngrok binary was not found. Be sure to call \"ngrok.install_ngrok()\" first "
f"for \"ngrok_path\": {ngrok_path}")
if ngrok_path in _current_processes:
raise PyngrokNgrokError(f"ngrok is already running for the \"ngrok_path\": {ngrok_path}")
def _validate_config(config_path: str) -> None:
with open(config_path, "r") as config_file:
config = yaml.safe_load(config_file)
if config is not None:
installer.validate_config(config)
def _terminate_process(process: subprocess.Popen) -> None: # type: ignore
if process is None:
return
try:
process.terminate()
except OSError: # pragma: no cover
logger.debug(f"ngrok process already terminated: {process.pid}")
[docs]
def _start_process(pyngrok_config: PyngrokConfig) -> NgrokProcess:
"""
Start a ``ngrok`` process with no tunnels. This will start the ``ngrok`` web interface, against
which HTTP requests can be made to create, interact with, and destroy tunnels.
:param pyngrok_config: The ``pyngrok`` configuration to use when interacting with the ``ngrok`` binary.
:return: The ``ngrok`` process.
:raises: :class:`~pyngrok.exception.PyngrokNgrokError`: When ``ngrok`` could not start.
"""
config_path = conf.get_config_path(pyngrok_config)
_validate_path(pyngrok_config.ngrok_path)
_validate_config(config_path)
start = [pyngrok_config.ngrok_path, "start", "--none", "--log=stdout"]
if pyngrok_config.config_path:
logger.info(f"Starting ngrok with config file: {pyngrok_config.config_path}")
start.append(f"--config={pyngrok_config.config_path}")
if pyngrok_config.auth_token:
logger.info("Overriding default auth token")
start.append(f"--authtoken={pyngrok_config.auth_token}")
if pyngrok_config.region:
logger.info(f"Starting ngrok in region: {pyngrok_config.region}")
start.append(f"--region={pyngrok_config.region}")
popen_kwargs: Dict[str, Any] = {"stdout": subprocess.PIPE, "universal_newlines": True}
if os.name == "posix":
popen_kwargs.update(start_new_session=pyngrok_config.start_new_session)
elif pyngrok_config.start_new_session:
logger.warning("Ignoring start_new_session=True, which requires POSIX")
proc = subprocess.Popen(start, **popen_kwargs)
atexit.register(_terminate_process, proc)
logger.debug(f"ngrok process starting with PID: {proc.pid}")
ngrok_process = NgrokProcess(proc, pyngrok_config)
_current_processes[pyngrok_config.ngrok_path] = ngrok_process
timeout = time.time() + pyngrok_config.startup_timeout
while time.time() < timeout:
if proc.stdout is None:
logger.debug("No stdout when starting the process, this may or may not be an issue")
break
line = proc.stdout.readline()
ngrok_process._log_startup_line(line)
if ngrok_process.healthy():
logger.debug(f"ngrok process has started with API URL: {ngrok_process.api_url}")
ngrok_process.startup_error = None
if pyngrok_config.monitor_thread:
ngrok_process.start_monitor_thread()
break
elif ngrok_process.proc.poll() is not None:
break
if not ngrok_process.healthy():
# If the process did not come up in a healthy state, clean up the state
kill_process(pyngrok_config.ngrok_path)
if ngrok_process.startup_error is not None:
raise PyngrokNgrokError(f"The ngrok process errored on start: {ngrok_process.startup_error}.",
ngrok_process.logs,
ngrok_process.startup_error)
else:
raise PyngrokNgrokError("The ngrok process was unable to start.", ngrok_process.logs)
return ngrok_process
_current_processes: Dict[str, NgrokProcess] = {}