diff --git a/src/node.py b/src/node.py index 8ef07a9f..a1ed137c 100644 --- a/src/node.py +++ b/src/node.py @@ -1002,7 +1002,12 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem raise return - def start(self, params=[], wait=True, exec_env=None) -> PostgresNode: + def start( + self, + params: typing.Optional[typing.List[str]] = None, + wait: bool = True, + exec_env: typing.Optional[typing.Dict] = None, + ) -> PostgresNode: """ Starts the PostgreSQL node using pg_ctl and set flag 'is_started'. By default, it waits for the operation to complete before returning. @@ -1016,7 +1021,11 @@ def start(self, params=[], wait=True, exec_env=None) -> PostgresNode: Returns: This instance of :class:`.PostgresNode`. """ - self.start2(params, wait, exec_env) + assert params is None or type(params) == list # noqa: E721 + assert type(wait) == bool # noqa: E721 + assert exec_env is None or type(exec_env) == dict # noqa: E721 + + self._start(params, wait, exec_env) if not wait: # Postmaster process is starting in background @@ -1029,7 +1038,12 @@ def start(self, params=[], wait=True, exec_env=None) -> PostgresNode: assert type(self._manually_started_pm_pid) == int # noqa: E721 return self - def start2(self, params=[], wait=True, exec_env=None) -> None: + def start2( + self, + params: typing.Optional[typing.List[str]] = None, + wait: bool = True, + exec_env: typing.Optional[typing.Dict] = None, + ) -> None: """ Starts the PostgreSQL node using pg_ctl. By default, it waits for the operation to complete before returning. @@ -1041,9 +1055,25 @@ def start2(self, params=[], wait=True, exec_env=None) -> None: wait: wait until operation completes. Returns: - This instance of :class:`.PostgresNode`. + None. """ + assert params is None or type(params) == list # noqa: E721 + assert type(wait) == bool # noqa: E721 + assert exec_env is None or type(exec_env) == dict # noqa: E721 + + self._start(params, wait, exec_env) + return + + def _start( + self, + params: typing.Optional[typing.List[str]] = None, + wait: bool = True, + exec_env: typing.Optional[typing.Dict] = None, + ) -> None: + assert params is None or type(params) == list # noqa: E721 + assert type(wait) == bool # noqa: E721 assert exec_env is None or type(exec_env) == dict # noqa: E721 + assert __class__._C_MAX_START_ATEMPTS > 1 if self._port is None: @@ -1051,11 +1081,17 @@ def start2(self, params=[], wait=True, exec_env=None) -> None: assert type(self._port) == int # noqa: E721 - _params = [self._get_bin_path("pg_ctl"), - "-D", self.data_dir, - "-l", self.pg_log_file, - "-w" if wait else '-W', # --wait or --no-wait - "start"] + params # yapf: disable + _params = [ + self._get_bin_path("pg_ctl"), + "start", + "-D", self.data_dir, + "-l", self.pg_log_file, + "-w" if wait else '-W', # --wait or --no-wait + ] + + if params is not None: + assert type(params) == list # noqa: E721 + _params += params def LOCAL__start_node(): # 'error' will be None on Windows diff --git a/tests/helpers/global_data.py b/tests/helpers/global_data.py index 5c3f7a46..13c61e36 100644 --- a/tests/helpers/global_data.py +++ b/tests/helpers/global_data.py @@ -76,3 +76,9 @@ class PostgresNodeServices: OsOpsDescrs.sm_local_os_ops, PortManagers.sm_local2_port_manager ) + + sm_locals_and_remotes = [ + sm_local, + sm_local2, + sm_remote, + ] diff --git a/tests/helpers/pg_node_utils.py b/tests/helpers/pg_node_utils.py new file mode 100644 index 00000000..b9395e57 --- /dev/null +++ b/tests/helpers/pg_node_utils.py @@ -0,0 +1,189 @@ +from src import PostgresNode +from src import PortManager +from src import OsOperations +from src import NodeStatus +from src.node import PostgresNodeLogReader + +from tests.helpers.utils import Utils as HelperUtils +from tests.helpers.utils import T_WAIT_TIME + +from tests.helpers.global_data import PostgresNodeService + +import typing + + +class PostgresNodeUtils: + class PostgresNodeUtilsException(Exception): + pass + + class PortConflictNodeException(PostgresNodeUtilsException): + _data_dir: str + _port: int + + def __init__(self, data_dir: str, port: int): + assert type(data_dir) == str # noqa: E721 + assert type(port) == int # noqa: E721 + + super().__init__() + + self._data_dir = data_dir + self._port = port + return + + @property + def data_dir(self) -> str: + assert type(self._data_dir) == str # noqa: E721 + return self._data_dir + + @property + def port(self) -> int: + assert type(self._port) == int # noqa: E721 + return self._port + + @property + def message(self) -> str: + assert type(self._data_dir) == str # noqa: E721 + assert type(self._port) == int # noqa: E721 + + r = "PostgresNode [data:{}][port: {}] conflicts with port of another instance.".format( + self._data_dir, + self._port, + ) + assert type(r) == str # noqa: E721 + return r + + def __str__(self) -> str: + r = self.message + assert type(r) == str # noqa: E721 + return r + + def __repr__(self) -> str: + # It must be overrided! + assert type(self) == __class__ # noqa: E721 + r = "{}({}, {})".format( + __class__.__name__, + repr(self._data_dir), + repr(self._port), + ) + assert type(r) == str # noqa: E721 + return r + + # -------------------------------------------------------------------- + class StartNodeException(PostgresNodeUtilsException): + _data_dir: str + _files: typing.Optional[typing.Iterable] + + def __init__( + self, + data_dir: str, + files: typing.Optional[typing.Iterable] = None + ): + assert type(data_dir) == str # noqa: E721 + assert files is None or isinstance(files, typing.Iterable) + + super().__init__() + + self._data_dir = data_dir + self._files = files + return + + @property + def message(self) -> str: + assert self._data_dir is None or type(self._data_dir) == str # noqa: E721 + assert self._files is None or isinstance(self._files, typing.Iterable) + + msg_parts = [] + + msg_parts.append("PostgresNode [data_dir: {}] is not started.".format( + self._data_dir + )) + + for f, lines in self._files or []: + assert type(f) == str # noqa: E721 + assert type(lines) in [str, bytes] # noqa: E721 + msg_parts.append(u'{}\n----\n{}\n'.format(f, lines)) + + return "\n".join(msg_parts) + + @property + def data_dir(self) -> typing.Optional[str]: + assert type(self._data_dir) == str # noqa: E721 + return self._data_dir + + @property + def files(self) -> typing.Optional[typing.Iterable]: + assert self._files is None or isinstance(self._files, typing.Iterable) + return self._files + + def __repr__(self) -> str: + assert type(self._data_dir) == str # noqa: E721 + assert self._files is None or isinstance(self._files, typing.Iterable) + + r = "{}({}, {})".format( + __class__.__name__, + repr(self._data_dir), + repr(self._files), + ) + assert type(r) == str # noqa: E721 + return r + + # -------------------------------------------------------------------- + @staticmethod + def get_node( + node_svc: PostgresNodeService, + name: typing.Optional[str] = None, + port: typing.Optional[int] = None, + port_manager: typing.Optional[PortManager] = None + ) -> PostgresNode: + assert isinstance(node_svc, PostgresNodeService) + assert isinstance(node_svc.os_ops, OsOperations) + assert isinstance(node_svc.port_manager, PortManager) + + if port_manager is None: + port_manager = node_svc.port_manager + + return PostgresNode( + name, + port=port, + os_ops=node_svc.os_ops, + port_manager=port_manager if port is None else None + ) + + # -------------------------------------------------------------------- + @staticmethod + def wait_for_running_state( + node: PostgresNode, + node_log_reader: PostgresNodeLogReader, + timeout: T_WAIT_TIME, + ): + assert type(node) == PostgresNode # noqa: E721 + assert type(node_log_reader) == PostgresNodeLogReader # noqa: E721 + assert type(timeout) in [int, float] + assert node_log_reader._node is node + assert timeout > 0 + + for _ in HelperUtils.WaitUntil( + timeout=timeout + ): + s = node.status() + + if s == NodeStatus.Running: + return + + assert s == NodeStatus.Stopped + + blocks = node_log_reader.read() + assert type(blocks) == list # noqa: E721 + + for block in blocks: + assert type(block) == PostgresNodeLogReader.LogDataBlock # noqa: E721 + + if 'Is another postmaster already running on port' in block.data: + raise __class__.PortConflictNodeException(node.data_dir, node.port) + + if 'database system is shut down' in block.data: + raise __class__.StartNodeException( + node.data_dir, + node._collect_special_files(), + ) + continue diff --git a/tests/helpers/utils.py b/tests/helpers/utils.py new file mode 100644 index 00000000..50badf01 --- /dev/null +++ b/tests/helpers/utils.py @@ -0,0 +1,65 @@ +import typing +import time +import logging + + +T_WAIT_TIME = typing.Union[int, float] + + +class Utils: + @staticmethod + def PrintAndSleep(wait: T_WAIT_TIME): + assert type(wait) in [int, float] + logging.info("Wait for {} second(s)".format(wait)) + time.sleep(wait) + return + + @staticmethod + def WaitUntil( + error_message: str = "Did not complete", + timeout: T_WAIT_TIME = 30, + interval: T_WAIT_TIME = 1, + notification_interval: T_WAIT_TIME = 5, + ): + """ + Loop until the timeout is reached. If the timeout is reached, raise an + exception with the given error message. + + Source of idea: pgbouncer + """ + assert type(timeout) in [int, float] + assert type(interval) in [int, float] + assert type(notification_interval) in [int, float] + assert timeout >= 0 + assert interval >= 0 + assert notification_interval >= 0 + + start_ts = time.monotonic() + end_ts = start_ts + timeout + last_printed_progress = start_ts + last_iteration_ts = start_ts + + yield + attempt = 1 + + while end_ts > time.monotonic(): + if (timeout > 5 and time.monotonic() - last_printed_progress) > notification_interval: + last_printed_progress = time.monotonic() + + m = "{} in {} seconds and {} attempts - will retry".format( + error_message, + time.monotonic() - start_ts, + attempt, + ) + logging.info(m) + + interval_remaining = last_iteration_ts + interval - time.monotonic() + if interval_remaining > 0: + time.sleep(interval_remaining) + + last_iteration_ts = time.monotonic() + yield + attempt += 1 + continue + + raise TimeoutError(error_message + " in time") diff --git a/tests/units/node/PostgresNode/__init__.py b/tests/units/node/PostgresNode/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/units/node/PostgresNode/test_setM001__start.py b/tests/units/node/PostgresNode/test_setM001__start.py new file mode 100644 index 00000000..09dc7ab8 --- /dev/null +++ b/tests/units/node/PostgresNode/test_setM001__start.py @@ -0,0 +1,217 @@ +from __future__ import annotations + +from tests.helpers.global_data import PostgresNodeService +from tests.helpers.global_data import PostgresNodeServices +from tests.helpers.global_data import OsOperations +from tests.helpers.global_data import PortManager +from tests.helpers.utils import Utils as HelperUtils +from tests.helpers.pg_node_utils import PostgresNodeUtils as PostgresNodeTestUtils + +from src import PostgresNode +from src import NodeStatus +from src import NodeConnection + +from src.node import PostgresNodeLogReader + +import pytest +import typing +import logging + + +class TestSet001__start: + @pytest.fixture( + params=PostgresNodeServices.sm_locals_and_remotes, + ids=[descr.sign for descr in PostgresNodeServices.sm_locals_and_remotes] + ) + def node_svc(self, request: pytest.FixtureRequest) -> PostgresNodeService: + assert isinstance(request, pytest.FixtureRequest) + assert isinstance(request.param, PostgresNodeService) + assert isinstance(request.param.os_ops, OsOperations) + assert isinstance(request.param.port_manager, PortManager) + return request.param + + class tagData001: + wait: typing.Optional[bool] + + def __init__(self, wait: typing.Optional[bool]): + assert wait is None or type(wait) == bool # noqa: E721 + self.wait = wait + return + + sm_Data001: typing.List[tagData001] = [ + tagData001(None), + tagData001(True) + ] + + @pytest.fixture( + params=sm_Data001, + ids=["wait={}".format(x.wait) for x in sm_Data001] + ) + def data001(self, request: pytest.FixtureRequest) -> tagData001: + assert isinstance(request, pytest.FixtureRequest) + assert type(request.param).__name__ == "tagData001" + return request.param + + def test_001__wait_true( + self, + node_svc: PostgresNodeService, + data001: tagData001 + ): + assert isinstance(node_svc, PostgresNodeService) + assert type(data001) == __class__.tagData001 # noqa: E721 + assert data001.wait is None or type(data001.wait) == bool # noqa: E721 + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + kwargs = {} + + if data001.wait is not None: + assert data001.wait == True # noqa: E712 + kwargs["wait"] = data001.wait + + node.start(**kwargs) + assert node.is_started + assert node.status() == NodeStatus.Running + + # Internals + assert type(node._manually_started_pm_pid) == int # noqa: E721 + assert node._manually_started_pm_pid != 0 + assert node._manually_started_pm_pid != node._C_PM_PID__IS_NOT_DETECTED + assert node._manually_started_pm_pid == node.pid + return + + def test_002__wait_false(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + C_MAX_ATTEMPTS = 3 + + attempt = 0 + + while True: + assert type(attempt) == int # noqa: E721 + assert attempt >= 0 + assert attempt <= C_MAX_ATTEMPTS + + if attempt == C_MAX_ATTEMPTS: + raise RuntimeError("Node is not started") + + attempt += 1 + + logging.info("------------- attempt #{}".format(attempt)) + + if attempt > 1: + HelperUtils.PrintAndSleep(5) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + node_log_reader = PostgresNodeLogReader(node, from_beginnig=False) + node.start(wait=False) + assert node.is_started + assert node.status() in [NodeStatus.Stopped, NodeStatus.Running] + + # Internals + assert type(node._manually_started_pm_pid) == int # noqa: E721 + assert node._manually_started_pm_pid == node._C_PM_PID__IS_NOT_DETECTED + + logging.info("Wait for running state ...") + + try: + PostgresNodeTestUtils.wait_for_running_state( + node=node, + node_log_reader=node_log_reader, + timeout=60, + ) + except PostgresNodeTestUtils.PortConflictNodeException as e: + logging.warning("Exception {}: {}".format( + type(e).__name__, + e, + )) + continue + + logging.info("Node is running.") + assert node.status() == NodeStatus.Running + return + + def test_003__exec_env( + self, + node_svc: PostgresNodeService, + ): + assert isinstance(node_svc, PostgresNodeService) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + C_ENV_NAME = "MYTESTVAR" + C_ENV_VALUE = "abcdefg" + + envs = { + C_ENV_NAME: C_ENV_VALUE + } + + node.start(exec_env=envs) + assert node.is_started + assert node.status() == NodeStatus.Running + + with node.connect(dbname="postgres") as cn: + assert type(cn) == NodeConnection # noqa: E721 + + cn.execute("CREATE TEMP TABLE cmd_out(content text);") + cn.commit() + cn.execute("COPY cmd_out FROM PROGRAM 'bash -c \'\'echo ${}\'\'';".format( + C_ENV_NAME, + )) + cn.commit() + recs = cn.execute("select content from cmd_out;") + assert type(recs) == list # noqa: E721 + assert len(recs) == 1 + assert type(recs[0]) == tuple # noqa: E721 + rec = recs[0] + assert len(rec) == 1 + assert rec[0] == C_ENV_VALUE + logging.info("Env has value [{}]. It is OK!".find(rec[0])) + return + + def test_004__params_is_None( + self, + node_svc: PostgresNodeService, + ): + assert isinstance(node_svc, PostgresNodeService) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + node.start(params=None) + assert node.is_started + assert node.status() == NodeStatus.Running + return + + def test_005__params_is_empty( + self, + node_svc: PostgresNodeService, + ): + assert isinstance(node_svc, PostgresNodeService) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + node.start(params=[]) + assert node.is_started + assert node.status() == NodeStatus.Running + return diff --git a/tests/units/node/PostgresNode/test_setM002__start2.py b/tests/units/node/PostgresNode/test_setM002__start2.py new file mode 100644 index 00000000..fe50944c --- /dev/null +++ b/tests/units/node/PostgresNode/test_setM002__start2.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +from tests.helpers.global_data import PostgresNodeService +from tests.helpers.global_data import PostgresNodeServices +from tests.helpers.global_data import OsOperations +from tests.helpers.global_data import PortManager +from tests.helpers.utils import Utils as HelperUtils +from tests.helpers.pg_node_utils import PostgresNodeUtils as PostgresNodeTestUtils + +from src import PostgresNode +from src import NodeStatus +from src import NodeConnection + +from src.node import PostgresNodeLogReader + +import pytest +import typing +import logging + + +class TestSet002__start2: + @pytest.fixture( + params=PostgresNodeServices.sm_locals_and_remotes, + ids=[descr.sign for descr in PostgresNodeServices.sm_locals_and_remotes] + ) + def node_svc(self, request: pytest.FixtureRequest) -> PostgresNodeService: + assert isinstance(request, pytest.FixtureRequest) + assert isinstance(request.param, PostgresNodeService) + assert isinstance(request.param.os_ops, OsOperations) + assert isinstance(request.param.port_manager, PortManager) + return request.param + + class tagData001: + wait: typing.Optional[bool] + + def __init__(self, wait: typing.Optional[bool]): + assert wait is None or type(wait) == bool # noqa: E721 + self.wait = wait + return + + sm_Data001: typing.List[tagData001] = [ + tagData001(None), + tagData001(True) + ] + + @pytest.fixture( + params=sm_Data001, + ids=["wait={}".format(x.wait) for x in sm_Data001] + ) + def data001(self, request: pytest.FixtureRequest) -> tagData001: + assert isinstance(request, pytest.FixtureRequest) + assert type(request.param).__name__ == "tagData001" + return request.param + + def test_001__wait_true( + self, + node_svc: PostgresNodeService, + data001: tagData001 + ): + assert isinstance(node_svc, PostgresNodeService) + assert type(data001) == __class__.tagData001 # noqa: E721 + assert data001.wait is None or type(data001.wait) == bool # noqa: E721 + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + kwargs = {} + + if data001.wait is not None: + assert data001.wait == True # noqa: E712 + kwargs["wait"] = data001.wait + + node.start2(**kwargs) + assert not node.is_started + assert node.status() == NodeStatus.Running + + # Internals + assert node._manually_started_pm_pid is None + return + + def test_002__wait_false(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + C_MAX_ATTEMPTS = 3 + + attempt = 0 + + while True: + assert type(attempt) == int # noqa: E721 + assert attempt >= 0 + assert attempt <= C_MAX_ATTEMPTS + + if attempt == C_MAX_ATTEMPTS: + raise RuntimeError("Node is not started") + + logging.info("------------- attempt #{}".format(attempt)) + + if attempt > 1: + HelperUtils.PrintAndSleep(5) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + node_log_reader = PostgresNodeLogReader(node, from_beginnig=False) + node.start2(wait=False) + assert not node.is_started + assert node.status() in [NodeStatus.Stopped, NodeStatus.Running] + + # Internals + assert node._manually_started_pm_pid is None + + logging.info("Wait for running state ...") + + try: + PostgresNodeTestUtils.wait_for_running_state( + node=node, + node_log_reader=node_log_reader, + timeout=60, + ) + except PostgresNodeTestUtils.PortConflictNodeException as e: + logging.warning("Exception {}: {}".format( + type(e).__name__, + e, + )) + continue + + logging.info("Node is running.") + assert node.status() == NodeStatus.Running + return + + def test_003__exec_env( + self, + node_svc: PostgresNodeService, + ): + assert isinstance(node_svc, PostgresNodeService) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + C_ENV_NAME = "MYTESTVAR" + C_ENV_VALUE = "abcdefg" + + envs = { + C_ENV_NAME: C_ENV_VALUE + } + + node.start2(exec_env=envs) + assert not node.is_started + assert node.status() == NodeStatus.Running + + with node.connect(dbname="postgres") as cn: + assert type(cn) == NodeConnection # noqa: E721 + + cn.execute("CREATE TEMP TABLE cmd_out(content text);") + cn.commit() + cn.execute("COPY cmd_out FROM PROGRAM 'bash -c \'\'echo ${}\'\'';".format( + C_ENV_NAME, + )) + cn.commit() + recs = cn.execute("select content from cmd_out;") + assert type(recs) == list # noqa: E721 + assert len(recs) == 1 + assert type(recs[0]) == tuple # noqa: E721 + rec = recs[0] + assert len(rec) == 1 + assert rec[0] == C_ENV_VALUE + logging.info("Env has value [{}]. It is OK!".find(rec[0])) + return + + def test_004__params_is_None( + self, + node_svc: PostgresNodeService, + ): + assert isinstance(node_svc, PostgresNodeService) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + node.start(params=None) + assert node.is_started + assert node.status() == NodeStatus.Running + return + + def test_005__params_is_empty( + self, + node_svc: PostgresNodeService, + ): + assert isinstance(node_svc, PostgresNodeService) + + with PostgresNodeTestUtils.get_node(node_svc) as node: + assert type(node) == PostgresNode # noqa: E721 + node.init() + assert not node.is_started + assert node.status() == NodeStatus.Stopped + + node.start(params=[]) + assert node.is_started + assert node.status() == NodeStatus.Running + return diff --git a/tests/units/node/__init__.py b/tests/units/node/__init__.py new file mode 100644 index 00000000..e69de29b