Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions src/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,12 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
raise
return

def start(self, params=[], wait=True, exec_env=None) -> PostgresNode:
def start(
self,
params: typing.Optional[typing.List[str]] = None,
wait: bool = True,
exec_env: typing.Optional[typing.Dict] = None,
) -> PostgresNode:
"""
Starts the PostgreSQL node using pg_ctl and set flag 'is_started'.
By default, it waits for the operation to complete before returning.
Expand All @@ -1016,7 +1021,11 @@ def start(self, params=[], wait=True, exec_env=None) -> PostgresNode:
Returns:
This instance of :class:`.PostgresNode`.
"""
self.start2(params, wait, exec_env)
assert params is None or type(params) == list # noqa: E721
assert type(wait) == bool # noqa: E721
assert exec_env is None or type(exec_env) == dict # noqa: E721

self._start(params, wait, exec_env)

if not wait:
# Postmaster process is starting in background
Expand All @@ -1029,7 +1038,12 @@ def start(self, params=[], wait=True, exec_env=None) -> PostgresNode:
assert type(self._manually_started_pm_pid) == int # noqa: E721
return self

def start2(self, params=[], wait=True, exec_env=None) -> None:
def start2(
self,
params: typing.Optional[typing.List[str]] = None,
wait: bool = True,
exec_env: typing.Optional[typing.Dict] = None,
) -> None:
"""
Starts the PostgreSQL node using pg_ctl.
By default, it waits for the operation to complete before returning.
Expand All @@ -1041,21 +1055,43 @@ def start2(self, params=[], wait=True, exec_env=None) -> None:
wait: wait until operation completes.

Returns:
This instance of :class:`.PostgresNode`.
None.
"""
assert params is None or type(params) == list # noqa: E721
assert type(wait) == bool # noqa: E721
assert exec_env is None or type(exec_env) == dict # noqa: E721

self._start(params, wait, exec_env)
return

def _start(
self,
params: typing.Optional[typing.List[str]] = None,
wait: bool = True,
exec_env: typing.Optional[typing.Dict] = None,
) -> None:
assert params is None or type(params) == list # noqa: E721
assert type(wait) == bool # noqa: E721
assert exec_env is None or type(exec_env) == dict # noqa: E721

assert __class__._C_MAX_START_ATEMPTS > 1

if self._port is None:
raise InvalidOperationException("Can't start PostgresNode. Port is not defined.")

assert type(self._port) == int # noqa: E721

_params = [self._get_bin_path("pg_ctl"),
"-D", self.data_dir,
"-l", self.pg_log_file,
"-w" if wait else '-W', # --wait or --no-wait
"start"] + params # yapf: disable
_params = [
self._get_bin_path("pg_ctl"),
"start",
"-D", self.data_dir,
"-l", self.pg_log_file,
"-w" if wait else '-W', # --wait or --no-wait
]

if params is not None:
assert type(params) == list # noqa: E721
_params += params

def LOCAL__start_node():
# 'error' will be None on Windows
Expand Down
6 changes: 6 additions & 0 deletions tests/helpers/global_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,9 @@ class PostgresNodeServices:
OsOpsDescrs.sm_local_os_ops,
PortManagers.sm_local2_port_manager
)

sm_locals_and_remotes = [
sm_local,
sm_local2,
sm_remote,
]
189 changes: 189 additions & 0 deletions tests/helpers/pg_node_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from src import PostgresNode
from src import PortManager
from src import OsOperations
from src import NodeStatus
from src.node import PostgresNodeLogReader

from tests.helpers.utils import Utils as HelperUtils
from tests.helpers.utils import T_WAIT_TIME

from tests.helpers.global_data import PostgresNodeService

import typing


class PostgresNodeUtils:
class PostgresNodeUtilsException(Exception):
pass

class PortConflictNodeException(PostgresNodeUtilsException):
_data_dir: str
_port: int

def __init__(self, data_dir: str, port: int):
assert type(data_dir) == str # noqa: E721
assert type(port) == int # noqa: E721

super().__init__()

self._data_dir = data_dir
self._port = port
return

@property
def data_dir(self) -> str:
assert type(self._data_dir) == str # noqa: E721
return self._data_dir

@property
def port(self) -> int:
assert type(self._port) == int # noqa: E721
return self._port

@property
def message(self) -> str:
assert type(self._data_dir) == str # noqa: E721
assert type(self._port) == int # noqa: E721

r = "PostgresNode [data:{}][port: {}] conflicts with port of another instance.".format(
self._data_dir,
self._port,
)
assert type(r) == str # noqa: E721
return r

def __str__(self) -> str:
r = self.message
assert type(r) == str # noqa: E721
return r

def __repr__(self) -> str:
# It must be overrided!
assert type(self) == __class__ # noqa: E721
r = "{}({}, {})".format(
__class__.__name__,
repr(self._data_dir),
repr(self._port),
)
assert type(r) == str # noqa: E721
return r

# --------------------------------------------------------------------
class StartNodeException(PostgresNodeUtilsException):
_data_dir: str
_files: typing.Optional[typing.Iterable]

def __init__(
self,
data_dir: str,
files: typing.Optional[typing.Iterable] = None
):
assert type(data_dir) == str # noqa: E721
assert files is None or isinstance(files, typing.Iterable)

super().__init__()

self._data_dir = data_dir
self._files = files
return

@property
def message(self) -> str:
assert self._data_dir is None or type(self._data_dir) == str # noqa: E721
assert self._files is None or isinstance(self._files, typing.Iterable)

msg_parts = []

msg_parts.append("PostgresNode [data_dir: {}] is not started.".format(
self._data_dir
))

for f, lines in self._files or []:
assert type(f) == str # noqa: E721
assert type(lines) in [str, bytes] # noqa: E721
msg_parts.append(u'{}\n----\n{}\n'.format(f, lines))

return "\n".join(msg_parts)

@property
def data_dir(self) -> typing.Optional[str]:
assert type(self._data_dir) == str # noqa: E721
return self._data_dir

@property
def files(self) -> typing.Optional[typing.Iterable]:
assert self._files is None or isinstance(self._files, typing.Iterable)
return self._files

def __repr__(self) -> str:
assert type(self._data_dir) == str # noqa: E721
assert self._files is None or isinstance(self._files, typing.Iterable)

r = "{}({}, {})".format(
__class__.__name__,
repr(self._data_dir),
repr(self._files),
)
assert type(r) == str # noqa: E721
return r

# --------------------------------------------------------------------
@staticmethod
def get_node(
node_svc: PostgresNodeService,
name: typing.Optional[str] = None,
port: typing.Optional[int] = None,
port_manager: typing.Optional[PortManager] = None
) -> PostgresNode:
assert isinstance(node_svc, PostgresNodeService)
assert isinstance(node_svc.os_ops, OsOperations)
assert isinstance(node_svc.port_manager, PortManager)

if port_manager is None:
port_manager = node_svc.port_manager

return PostgresNode(
name,
port=port,
os_ops=node_svc.os_ops,
port_manager=port_manager if port is None else None
)

# --------------------------------------------------------------------
@staticmethod
def wait_for_running_state(
node: PostgresNode,
node_log_reader: PostgresNodeLogReader,
timeout: T_WAIT_TIME,
):
assert type(node) == PostgresNode # noqa: E721
assert type(node_log_reader) == PostgresNodeLogReader # noqa: E721
assert type(timeout) in [int, float]
assert node_log_reader._node is node
assert timeout > 0

for _ in HelperUtils.WaitUntil(
timeout=timeout
):
s = node.status()

if s == NodeStatus.Running:
return

assert s == NodeStatus.Stopped

blocks = node_log_reader.read()
assert type(blocks) == list # noqa: E721

for block in blocks:
assert type(block) == PostgresNodeLogReader.LogDataBlock # noqa: E721

if 'Is another postmaster already running on port' in block.data:
raise __class__.PortConflictNodeException(node.data_dir, node.port)

if 'database system is shut down' in block.data:
raise __class__.StartNodeException(
node.data_dir,
node._collect_special_files(),
)
continue
65 changes: 65 additions & 0 deletions tests/helpers/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import typing
import time
import logging


T_WAIT_TIME = typing.Union[int, float]


class Utils:
@staticmethod
def PrintAndSleep(wait: T_WAIT_TIME):
assert type(wait) in [int, float]
logging.info("Wait for {} second(s)".format(wait))
time.sleep(wait)
return

@staticmethod
def WaitUntil(
error_message: str = "Did not complete",
timeout: T_WAIT_TIME = 30,
interval: T_WAIT_TIME = 1,
notification_interval: T_WAIT_TIME = 5,
):
"""
Loop until the timeout is reached. If the timeout is reached, raise an
exception with the given error message.

Source of idea: pgbouncer
"""
assert type(timeout) in [int, float]
assert type(interval) in [int, float]
assert type(notification_interval) in [int, float]
assert timeout >= 0
assert interval >= 0
assert notification_interval >= 0

start_ts = time.monotonic()
end_ts = start_ts + timeout
last_printed_progress = start_ts
last_iteration_ts = start_ts

yield
attempt = 1

while end_ts > time.monotonic():
if (timeout > 5 and time.monotonic() - last_printed_progress) > notification_interval:
last_printed_progress = time.monotonic()

m = "{} in {} seconds and {} attempts - will retry".format(
error_message,
time.monotonic() - start_ts,
attempt,
)
logging.info(m)

interval_remaining = last_iteration_ts + interval - time.monotonic()
if interval_remaining > 0:
time.sleep(interval_remaining)

last_iteration_ts = time.monotonic()
yield
attempt += 1
continue

raise TimeoutError(error_message + " in time")
Empty file.
Loading