From 220c4db5e6bd5996da4b31abe35a43fc61abb71d Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <postgres@jeltef.nl>
Date: Sun, 19 Oct 2025 23:01:30 +0200
Subject: [PATCH v3 07/10] Refactor and improve pytest infrastructure

This change does a lot of refactoring and adding new features to the
pytest based test infrastructure.

The primary features it adds are:
- A `sql` method on `PGconn`: It takes a query and returns the results
  as native python types.
- A `conn` fixture: Which is a libpq based connection to the default
  Postgres server.
- Use the `pg_config` binary to find the libdir and bindir (can be
  overridden by setting PG_CONFIG). Otherwise I had to use
  LD_LIBRARY_PATH when running pytest manually.

The refactoring it does:
- Rename `pg_server` fixture to `pg` since it'll likely be one of the
  most commonly used ones.
- Rename `pg` module to `pypg` to avoid naming conflict/shadowing
  problems with the newly renamed `pg` fixture
- Move class definitions outside of fixtures to separate modules (either
  in the `pypg` module or the new `libpq` module)
- Move all "general" fixtures to the `pypg.fixtures` module, instead of
  having them be defined in the ssl module.
---
 src/test/pytest/libpq.py                  | 409 ++++++++++++++++++++++
 src/test/pytest/pg/fixtures.py            | 212 -----------
 src/test/pytest/plugins/pgtap.py          |   1 -
 src/test/pytest/{pg => pypg}/__init__.py  |   0
 src/test/pytest/{pg => pypg}/_env.py      |   1 -
 src/test/pytest/{pg => pypg}/_win32.py    |   0
 src/test/pytest/pypg/fixtures.py          | 175 +++++++++
 src/test/pytest/pypg/server.py            | 387 ++++++++++++++++++++
 src/test/pytest/pypg/util.py              |  42 +++
 src/test/pytest/pyt/conftest.py           |   3 +-
 src/test/pytest/pyt/test_libpq.py         |  23 +-
 src/test/pytest/pyt/test_query_helpers.py | 286 +++++++++++++++
 src/test/ssl/pyt/conftest.py              | 136 ++-----
 src/test/ssl/pyt/test_client.py           |  26 +-
 src/test/ssl/pyt/test_server.py           | 380 +-------------------
 15 files changed, 1370 insertions(+), 711 deletions(-)
 create mode 100644 src/test/pytest/libpq.py
 delete mode 100644 src/test/pytest/pg/fixtures.py
 rename src/test/pytest/{pg => pypg}/__init__.py (100%)
 rename src/test/pytest/{pg => pypg}/_env.py (97%)
 rename src/test/pytest/{pg => pypg}/_win32.py (100%)
 create mode 100644 src/test/pytest/pypg/fixtures.py
 create mode 100644 src/test/pytest/pypg/server.py
 create mode 100644 src/test/pytest/pypg/util.py
 create mode 100644 src/test/pytest/pyt/test_query_helpers.py

diff --git a/src/test/pytest/libpq.py b/src/test/pytest/libpq.py
new file mode 100644
index 00000000000..b851a117b66
--- /dev/null
+++ b/src/test/pytest/libpq.py
@@ -0,0 +1,409 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+libpq testing utilities - ctypes bindings and helpers for PostgreSQL's libpq library.
+
+This module provides Python wrappers around libpq for use in pytest tests.
+"""
+
+import contextlib
+import ctypes
+import datetime
+import decimal
+import enum
+import json
+import platform
+import os
+import uuid
+from typing import Any, Callable, Dict
+
+
+class LibpqError(RuntimeError):
+    """
+    Exception class for application-level errors that are encountered during libpq operations.
+    """
+
+    pass
+
+
+class ConnectionStatus(enum.IntEnum):
+    """PostgreSQL connection status codes from libpq."""
+
+    CONNECTION_OK = 0
+    CONNECTION_BAD = 1
+
+
+class ExecStatus(enum.IntEnum):
+    """PostgreSQL result status codes from PQresultStatus."""
+
+    PGRES_EMPTY_QUERY = 0
+    PGRES_COMMAND_OK = 1
+    PGRES_TUPLES_OK = 2
+    PGRES_COPY_OUT = 3
+    PGRES_COPY_IN = 4
+    PGRES_BAD_RESPONSE = 5
+    PGRES_NONFATAL_ERROR = 6
+    PGRES_FATAL_ERROR = 7
+    PGRES_COPY_BOTH = 8
+    PGRES_SINGLE_TUPLE = 9
+    PGRES_PIPELINE_SYNC = 10
+    PGRES_PIPELINE_ABORTED = 11
+
+
+class _PGconn(ctypes.Structure):
+    pass
+
+
+class _PGresult(ctypes.Structure):
+    pass
+
+
+_PGconn_p = ctypes.POINTER(_PGconn)
+_PGresult_p = ctypes.POINTER(_PGresult)
+
+
+def load_libpq_handle(libdir):
+    """
+    Loads a ctypes handle for libpq. Some common function prototypes are
+    initialized for general use.
+    """
+    system = platform.system()
+
+    if system in ("Linux", "FreeBSD", "NetBSD", "OpenBSD"):
+        name = "libpq.so.5"
+    elif system == "Darwin":
+        name = "libpq.5.dylib"
+    elif system == "Windows":
+        name = "libpq.dll"
+    else:
+        assert False, f"the libpq fixture must be updated for {system}"
+
+    libpq_path = os.path.join(libdir, name)
+
+    # XXX ctypes.CDLL() is a little stricter with load paths on Windows. The
+    # preferred way around that is to know the absolute path to libpq.dll, but
+    # that doesn't seem to mesh well with the current test infrastructure. For
+    # now, enable "standard" LoadLibrary behavior.
+    loadopts = {}
+    if system == "Windows":
+        loadopts["winmode"] = 0
+
+    lib = ctypes.CDLL(libpq_path, **loadopts)
+
+    #
+    # Function Prototypes
+    #
+
+    lib.PQconnectdb.restype = _PGconn_p
+    lib.PQconnectdb.argtypes = [ctypes.c_char_p]
+
+    lib.PQstatus.restype = ctypes.c_int
+    lib.PQstatus.argtypes = [_PGconn_p]
+
+    lib.PQexec.restype = _PGresult_p
+    lib.PQexec.argtypes = [_PGconn_p, ctypes.c_char_p]
+
+    lib.PQresultStatus.restype = ctypes.c_int
+    lib.PQresultStatus.argtypes = [_PGresult_p]
+
+    lib.PQclear.restype = None
+    lib.PQclear.argtypes = [_PGresult_p]
+
+    lib.PQerrorMessage.restype = ctypes.c_char_p
+    lib.PQerrorMessage.argtypes = [_PGconn_p]
+
+    lib.PQfinish.restype = None
+    lib.PQfinish.argtypes = [_PGconn_p]
+
+    lib.PQresultErrorMessage.restype = ctypes.c_char_p
+    lib.PQresultErrorMessage.argtypes = [_PGresult_p]
+
+    lib.PQntuples.restype = ctypes.c_int
+    lib.PQntuples.argtypes = [_PGresult_p]
+
+    lib.PQnfields.restype = ctypes.c_int
+    lib.PQnfields.argtypes = [_PGresult_p]
+
+    lib.PQgetvalue.restype = ctypes.c_char_p
+    lib.PQgetvalue.argtypes = [_PGresult_p, ctypes.c_int, ctypes.c_int]
+
+    lib.PQgetisnull.restype = ctypes.c_int
+    lib.PQgetisnull.argtypes = [_PGresult_p, ctypes.c_int, ctypes.c_int]
+
+    lib.PQftype.restype = ctypes.c_uint
+    lib.PQftype.argtypes = [_PGresult_p, ctypes.c_int]
+
+    return lib
+
+
+# PostgreSQL type OIDs and conversion system
+# Type registry - maps OID to converter function
+_type_converters: Dict[int, Callable[[str], Any]] = {}
+_array_to_elem_map: Dict[int, int] = {}
+
+
+def register_type_info(
+    name: str, oid: int, array_oid: int, converter: Callable[[str], Any]
+):
+    """
+    Register a PostgreSQL type with its OID, array OID, and conversion function.
+
+    Usage:
+        register_type_info("bool", 16, 1000, lambda v: v == "t")
+    """
+    _type_converters[oid] = converter
+    if array_oid is not None:
+        _array_to_elem_map[array_oid] = oid
+
+
+# Helper converters
+def _parse_array(value: str, elem_oid: int) -> list:
+    """Parse PostgreSQL array syntax: {elem1,elem2,elem3}"""
+    if not (value.startswith("{") and value.endswith("}")):
+        return value
+
+    inner = value[1:-1]
+    if not inner:
+        return []
+
+    elements = inner.split(",")
+    result = []
+    for elem in elements:
+        elem = elem.strip()
+        if elem == "NULL":
+            result.append(None)
+        else:
+            # Remove quotes if present
+            if elem.startswith('"') and elem.endswith('"'):
+                elem = elem[1:-1]
+            result.append(_convert_pg_value(elem, elem_oid))
+
+    return result
+
+
+# Register standard PostgreSQL types that we'll likely encounter in tests
+register_type_info("bool", 16, 1000, lambda v: v == "t")
+register_type_info("int2", 21, 1005, int)
+register_type_info("int4", 23, 1007, int)
+register_type_info("int8", 20, 1016, int)
+register_type_info("float4", 700, 1021, float)
+register_type_info("float8", 701, 1022, float)
+register_type_info("numeric", 1700, 1231, decimal.Decimal)
+register_type_info("text", 25, 1009, str)
+register_type_info("varchar", 1043, 1015, str)
+register_type_info("date", 1082, 1182, datetime.date.fromisoformat)
+register_type_info("time", 1083, 1183, datetime.time.fromisoformat)
+register_type_info("timestamp", 1114, 1115, datetime.datetime.fromisoformat)
+register_type_info("timestamptz", 1184, 1185, datetime.datetime.fromisoformat)
+register_type_info("uuid", 2950, 2951, uuid.UUID)
+register_type_info("json", 114, 199, json.loads)
+register_type_info("jsonb", 3802, 3807, json.loads)
+
+
+def _convert_pg_value(value: str, type_oid: int) -> Any:
+    """
+    Convert PostgreSQL string value to appropriate Python type based on OID.
+    Uses the registered type converters from register_type_info().
+    """
+    # Check if it's an array type
+    if type_oid in _array_to_elem_map:
+        elem_oid = _array_to_elem_map[type_oid]
+        return _parse_array(value, elem_oid)
+
+    # Use registered converter if available
+    converter = _type_converters.get(type_oid)
+    if converter:
+        return converter(value)
+
+    # Unknown types - return as string
+    return value
+
+
+def simplify_query_results(results) -> Any:
+    """
+    Simplify the results of a query so that the caller doesn't have to unpack
+    lists and tuples of length 1.
+    """
+    if len(results) == 1:
+        row = results[0]
+        if len(row) == 1:
+            # If there's only a single cell, just return the value
+            return row[0]
+        # If there's only a single row, just return that row
+        return row
+
+    if len(results) != 0 and len(results[0]) == 1:
+        # If there's only a single column, return an array of values
+        return [row[0] for row in results]
+
+    # if there are multiple rows and columns, return the results as is
+    return results
+
+
+class PGresult(contextlib.AbstractContextManager):
+    """Wraps a raw _PGresult_p with a more friendly interface."""
+
+    def __init__(self, lib: ctypes.CDLL, res: _PGresult_p):
+        self._lib = lib
+        self._res = res
+
+    def __exit__(self, *exc):
+        self._lib.PQclear(self._res)
+        self._res = None
+
+    def status(self) -> ExecStatus:
+        return ExecStatus(self._lib.PQresultStatus(self._res))
+
+    def error_message(self):
+        """Returns the error message associated with this result."""
+        msg = self._lib.PQresultErrorMessage(self._res)
+        return msg.decode() if msg else ""
+
+    def fetch_all(self):
+        """
+        Fetch all rows and convert to Python types.
+        Returns a list of tuples, with values converted based on their PostgreSQL type.
+        """
+        nrows = self._lib.PQntuples(self._res)
+        ncols = self._lib.PQnfields(self._res)
+
+        # Get type OIDs for each column
+        type_oids = [self._lib.PQftype(self._res, col) for col in range(ncols)]
+
+        results = []
+        for row in range(nrows):
+            row_data = []
+            for col in range(ncols):
+                if self._lib.PQgetisnull(self._res, row, col):
+                    row_data.append(None)
+                else:
+                    value = self._lib.PQgetvalue(self._res, row, col).decode()
+                    row_data.append(_convert_pg_value(value, type_oids[col]))
+            results.append(tuple(row_data))
+
+        return results
+
+
+class PGconn(contextlib.AbstractContextManager):
+    """
+    Wraps a raw _PGconn_p with a more friendly interface. This is just a
+    stub; it's expected to grow.
+    """
+
+    def __init__(
+        self,
+        lib: ctypes.CDLL,
+        handle: _PGconn_p,
+        stack: contextlib.ExitStack,
+    ):
+        self._lib = lib
+        self._handle = handle
+        self._stack = stack
+
+    def __exit__(self, *exc):
+        self._lib.PQfinish(self._handle)
+        self._handle = None
+
+    def exec(self, query: str):
+        """
+        Executes a query via PQexec() and returns a PGresult.
+        """
+        res = self._lib.PQexec(self._handle, query.encode())
+        return self._stack.enter_context(PGresult(self._lib, res))
+
+    def sql(self, query: str):
+        """
+        Executes a query and raises an exception if it fails.
+        Returns the query results with automatic type conversion and simplification.
+        For commands that don't return data (INSERT, UPDATE, etc.), returns None.
+
+        Examples:
+        - SELECT 1 -> 1
+        - SELECT 1, 2 -> (1, 2)
+        - SELECT * FROM generate_series(1, 3) -> [1, 2, 3]
+        - SELECT * FROM (VALUES (1, 'a'), (2, 'b')) t -> [(1, 'a'), (2, 'b')]
+        - CREATE TABLE ... -> None
+        - INSERT INTO ... -> None
+        """
+        res = self.exec(query)
+        status = res.status()
+
+        if status == ExecStatus.PGRES_FATAL_ERROR:
+            error_msg = res.error_message()
+            raise LibpqError(f"Query failed: {error_msg}\nQuery: {query}")
+        elif status == ExecStatus.PGRES_COMMAND_OK:
+            return None
+        elif status == ExecStatus.PGRES_TUPLES_OK:
+            results = res.fetch_all()
+            return simplify_query_results(results)
+        else:
+            error_msg = res.error_message() or f"Unexpected status: {status}"
+            raise LibpqError(f"Query failed: {error_msg}\nQuery: {query}")
+
+
+def connstr(opts: Dict[str, Any]) -> str:
+    """
+    Flattens the provided options into a libpq connection string. Values
+    are converted to str and quoted/escaped as necessary.
+    """
+    settings = []
+
+    for k, v in opts.items():
+        v = str(v)
+        if not v:
+            v = "''"
+        else:
+            v = v.replace("\\", "\\\\")
+            v = v.replace("'", "\\'")
+
+            if " " in v:
+                v = f"'{v}'"
+
+        settings.append(f"{k}={v}")
+
+    return " ".join(settings)
+
+
+def connect(
+    libpq_handle: ctypes.CDLL,
+    stack: contextlib.ExitStack,
+    remaining_timeout_fn: Callable[[], float],
+    **opts,
+) -> PGconn:
+    """
+    Connects to a server, using the given connection options, and
+    returns a PGconn object wrapping the connection handle. A
+    failure will raise LibpqError.
+
+    Connections honor PG_TEST_TIMEOUT_DEFAULT unless connect_timeout is
+    explicitly overridden in opts.
+
+    Args:
+        libpq_handle: ctypes.CDLL handle to libpq library
+        stack: ExitStack for managing connection cleanup
+        remaining_timeout_fn: Function that returns remaining timeout in seconds
+        **opts: Connection options (host, port, dbname, etc.)
+
+    Returns:
+        PGconn: Connected database connection
+
+    Raises:
+        LibpqError: If connection fails
+    """
+
+    if "connect_timeout" not in opts:
+        t = int(remaining_timeout_fn())
+        opts["connect_timeout"] = max(t, 1)
+
+    conn_p = libpq_handle.PQconnectdb(connstr(opts).encode())
+
+    # Check connection status before adding to stack
+    if libpq_handle.PQstatus(conn_p) != ConnectionStatus.CONNECTION_OK:
+        error_msg = libpq_handle.PQerrorMessage(conn_p).decode()
+        # Manually close the failed connection
+        libpq_handle.PQfinish(conn_p)
+        raise LibpqError(error_msg)
+
+    # Connection succeeded - add to stack for cleanup
+    conn = stack.enter_context(PGconn(libpq_handle, conn_p, stack=stack))
+    return conn
diff --git a/src/test/pytest/pg/fixtures.py b/src/test/pytest/pg/fixtures.py
deleted file mode 100644
index b5d3bff69a8..00000000000
--- a/src/test/pytest/pg/fixtures.py
+++ /dev/null
@@ -1,212 +0,0 @@
-# Copyright (c) 2025, PostgreSQL Global Development Group
-
-import contextlib
-import ctypes
-import platform
-import time
-from typing import Any, Callable, Dict
-
-import pytest
-
-from ._env import test_timeout_default
-
-
-@pytest.fixture
-def remaining_timeout():
-    """
-    This fixture provides a function that returns how much of the
-    PG_TEST_TIMEOUT_DEFAULT remains for the current test, in fractional seconds.
-    This value is never less than zero.
-
-    This fixture is per-test, so the deadline is also reset on a per-test basis.
-    """
-    now = time.monotonic()
-    deadline = now + test_timeout_default()
-
-    return lambda: max(deadline - time.monotonic(), 0)
-
-
-class _PGconn(ctypes.Structure):
-    pass
-
-
-class _PGresult(ctypes.Structure):
-    pass
-
-
-_PGconn_p = ctypes.POINTER(_PGconn)
-_PGresult_p = ctypes.POINTER(_PGresult)
-
-
-@pytest.fixture(scope="session")
-def libpq_handle():
-    """
-    Loads a ctypes handle for libpq. Some common function prototypes are
-    initialized for general use.
-    """
-    system = platform.system()
-
-    if system in ("Linux", "FreeBSD", "NetBSD", "OpenBSD"):
-        name = "libpq.so.5"
-    elif system == "Darwin":
-        name = "libpq.5.dylib"
-    elif system == "Windows":
-        name = "libpq.dll"
-    else:
-        assert False, f"the libpq fixture must be updated for {system}"
-
-    # XXX ctypes.CDLL() is a little stricter with load paths on Windows. The
-    # preferred way around that is to know the absolute path to libpq.dll, but
-    # that doesn't seem to mesh well with the current test infrastructure. For
-    # now, enable "standard" LoadLibrary behavior.
-    loadopts = {}
-    if system == "Windows":
-        loadopts["winmode"] = 0
-
-    lib = ctypes.CDLL(name, **loadopts)
-
-    #
-    # Function Prototypes
-    #
-
-    lib.PQconnectdb.restype = _PGconn_p
-    lib.PQconnectdb.argtypes = [ctypes.c_char_p]
-
-    lib.PQstatus.restype = ctypes.c_int
-    lib.PQstatus.argtypes = [_PGconn_p]
-
-    lib.PQexec.restype = _PGresult_p
-    lib.PQexec.argtypes = [_PGconn_p, ctypes.c_char_p]
-
-    lib.PQresultStatus.restype = ctypes.c_int
-    lib.PQresultStatus.argtypes = [_PGresult_p]
-
-    lib.PQclear.restype = None
-    lib.PQclear.argtypes = [_PGresult_p]
-
-    lib.PQerrorMessage.restype = ctypes.c_char_p
-    lib.PQerrorMessage.argtypes = [_PGconn_p]
-
-    lib.PQfinish.restype = None
-    lib.PQfinish.argtypes = [_PGconn_p]
-
-    return lib
-
-
-class PGresult(contextlib.AbstractContextManager):
-    """Wraps a raw _PGresult_p with a more friendly interface."""
-
-    def __init__(self, lib: ctypes.CDLL, res: _PGresult_p):
-        self._lib = lib
-        self._res = res
-
-    def __exit__(self, *exc):
-        self._lib.PQclear(self._res)
-        self._res = None
-
-    def status(self):
-        return self._lib.PQresultStatus(self._res)
-
-
-class PGconn(contextlib.AbstractContextManager):
-    """
-    Wraps a raw _PGconn_p with a more friendly interface. This is just a
-    stub; it's expected to grow.
-    """
-
-    def __init__(
-        self,
-        lib: ctypes.CDLL,
-        handle: _PGconn_p,
-        stack: contextlib.ExitStack,
-    ):
-        self._lib = lib
-        self._handle = handle
-        self._stack = stack
-
-    def __exit__(self, *exc):
-        self._lib.PQfinish(self._handle)
-        self._handle = None
-
-    def exec(self, query: str) -> PGresult:
-        """
-        Executes a query via PQexec() and returns a PGresult.
-        """
-        res = self._lib.PQexec(self._handle, query.encode())
-        return self._stack.enter_context(PGresult(self._lib, res))
-
-
-@pytest.fixture
-def libpq(libpq_handle, remaining_timeout):
-    """
-    Provides a ctypes-based API wrapped around libpq.so. This fixture keeps
-    track of allocated resources and cleans them up during teardown. See
-    _Libpq's public API for details.
-    """
-
-    class _Libpq(contextlib.ExitStack):
-        CONNECTION_OK = 0
-
-        PGRES_EMPTY_QUERY = 0
-
-        class Error(RuntimeError):
-            """
-            libpq.Error is the exception class for application-level errors that
-            are encountered during libpq operations.
-            """
-
-            pass
-
-        def __init__(self):
-            super().__init__()
-            self.lib = libpq_handle
-
-        def _connstr(self, opts: Dict[str, Any]) -> str:
-            """
-            Flattens the provided options into a libpq connection string. Values
-            are converted to str and quoted/escaped as necessary.
-            """
-            settings = []
-
-            for k, v in opts.items():
-                v = str(v)
-                if not v:
-                    v = "''"
-                else:
-                    v = v.replace("\\", "\\\\")
-                    v = v.replace("'", "\\'")
-
-                    if " " in v:
-                        v = f"'{v}'"
-
-                settings.append(f"{k}={v}")
-
-            return " ".join(settings)
-
-        def must_connect(self, **opts) -> PGconn:
-            """
-            Connects to a server, using the given connection options, and
-            returns a libpq.PGconn object wrapping the connection handle. A
-            failure will raise libpq.Error.
-
-            Connections honor PG_TEST_TIMEOUT_DEFAULT unless connect_timeout is
-            explicitly overridden in opts.
-            """
-
-            if "connect_timeout" not in opts:
-                t = int(remaining_timeout())
-                opts["connect_timeout"] = max(t, 1)
-
-            conn_p = self.lib.PQconnectdb(self._connstr(opts).encode())
-
-            # Ensure the connection handle is always closed at the end of the
-            # test.
-            conn = self.enter_context(PGconn(self.lib, conn_p, stack=self))
-
-            if self.lib.PQstatus(conn_p) != self.CONNECTION_OK:
-                raise self.Error(self.lib.PQerrorMessage(conn_p).decode())
-
-            return conn
-
-    with _Libpq() as lib:
-        yield lib
diff --git a/src/test/pytest/plugins/pgtap.py b/src/test/pytest/plugins/pgtap.py
index ef8291e291c..6a729d252e1 100644
--- a/src/test/pytest/plugins/pgtap.py
+++ b/src/test/pytest/plugins/pgtap.py
@@ -2,7 +2,6 @@
 
 import os
 import sys
-from typing import Optional
 
 import pytest
 
diff --git a/src/test/pytest/pg/__init__.py b/src/test/pytest/pypg/__init__.py
similarity index 100%
rename from src/test/pytest/pg/__init__.py
rename to src/test/pytest/pypg/__init__.py
diff --git a/src/test/pytest/pg/_env.py b/src/test/pytest/pypg/_env.py
similarity index 97%
rename from src/test/pytest/pg/_env.py
rename to src/test/pytest/pypg/_env.py
index 6f18af07844..154c986d73e 100644
--- a/src/test/pytest/pg/_env.py
+++ b/src/test/pytest/pypg/_env.py
@@ -2,7 +2,6 @@
 
 import logging
 import os
-from typing import List, Optional
 
 import pytest
 
diff --git a/src/test/pytest/pg/_win32.py b/src/test/pytest/pypg/_win32.py
similarity index 100%
rename from src/test/pytest/pg/_win32.py
rename to src/test/pytest/pypg/_win32.py
diff --git a/src/test/pytest/pypg/fixtures.py b/src/test/pytest/pypg/fixtures.py
new file mode 100644
index 00000000000..cf22c8ec436
--- /dev/null
+++ b/src/test/pytest/pypg/fixtures.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import os
+import contextlib
+import pathlib
+import secrets
+import time
+
+import pytest
+
+from ._env import test_timeout_default
+from .util import capture
+from .server import PostgresServer
+
+from libpq import load_libpq_handle, connect as libpq_connect
+
+
+@pytest.fixture
+def remaining_timeout():
+    """
+    This fixture provides a function that returns how much of the
+    PG_TEST_TIMEOUT_DEFAULT remains for the current test, in fractional seconds.
+    This value is never less than zero.
+
+    This fixture is per-test, so the deadline is also reset on a per-test basis.
+    """
+    now = time.monotonic()
+    deadline = now + test_timeout_default()
+
+    return lambda: max(deadline - time.monotonic(), 0)
+
+
+@pytest.fixture(scope="session")
+def libpq_handle(libdir):
+    """
+    Loads a ctypes handle for libpq. Some common function prototypes are
+    initialized for general use.
+    """
+    return load_libpq_handle(libdir)
+
+
+@pytest.fixture
+def connect(libpq_handle, remaining_timeout):
+    """
+    Returns a function to connect to PostgreSQL via libpq.
+
+    The returned function accepts connection options as keyword arguments
+    (host, port, dbname, etc.) and returns a PGconn object. Connections
+    are automatically cleaned up at the end of the test.
+
+    Example:
+        conn = connect(host='localhost', port=5432, dbname='postgres')
+        result = conn.sql("SELECT 1")
+    """
+    with contextlib.ExitStack() as stack:
+
+        def _connect(**opts):
+            return libpq_connect(libpq_handle, stack, remaining_timeout, **opts)
+
+        yield _connect
+
+
+@pytest.fixture(scope="session")
+def pg_config():
+    """
+    Returns the path to pg_config. Uses PG_CONFIG environment variable if set,
+    otherwise uses 'pg_config' from PATH.
+    """
+    return os.environ.get("PG_CONFIG", "pg_config")
+
+
+@pytest.fixture(scope="session")
+def bindir(pg_config):
+    """
+    Returns the PostgreSQL bin directory using pg_config --bindir.
+    """
+    return capture(pg_config, "--bindir")
+
+
+@pytest.fixture(scope="session")
+def libdir(pg_config):
+    """
+    Returns the PostgreSQL lib directory using pg_config --libdir.
+    """
+    return capture(pg_config, "--libdir")
+
+
+@pytest.fixture(scope="session")
+def datadir(tmp_path_factory):
+    """
+    Returns the directory name to use as the server data directory. If
+    TESTDATADIR is provided, that will be used; otherwise a new temporary
+    directory is created in the pytest temp root.
+    """
+    d = os.getenv("TESTDATADIR")
+    if d:
+        d = pathlib.Path(d)
+    else:
+        d = tmp_path_factory.mktemp("tmp_check")
+
+    return d
+
+
+@pytest.fixture(scope="session")
+def sockdir(tmp_path_factory):
+    """
+    Returns the directory name to use as the server's unix_socket_directories
+    setting. Local client connections use this as the PGHOST.
+
+    At the moment, this is always put under the pytest temp root.
+    """
+    return tmp_path_factory.mktemp("sockfiles")
+
+
+@pytest.fixture(scope="session")
+def winpassword():
+    """The per-session SCRAM password for the server admin on Windows."""
+    return secrets.token_urlsafe(16)
+
+
+@pytest.fixture(scope="session")
+def pg_server_global(bindir, datadir, sockdir, winpassword, libpq_handle):
+    """
+    Starts a running Postgres server listening on localhost. The HBA initially
+    allows only local UNIX connections from the same user.
+
+    Returns a PostgresServer instance with methods for server management, configuration,
+    and creating test databases/users.
+    """
+    server = PostgresServer(bindir, datadir, sockdir, winpassword, libpq_handle)
+
+    yield server
+
+    # Cleanup any test resources
+    server.cleanup()
+
+    # Stop the server
+    server.stop()
+
+
+@pytest.fixture(scope="module")
+def pg_server_module(pg_server_global):
+    """
+    Module-scoped server context. Which can be useful so that certain settings
+    can be overriden at the module level through autouse fixtures. An example
+    of this is in the SSL tests.
+    """
+    with pg_server_global.subcontext() as s:
+        yield s
+
+
+@pytest.fixture
+def pg(pg_server_module, remaining_timeout):
+    """
+    Per-test server context. Use this fixture to make changes to the server
+    which will be rolled back at the end of the test (e.g., creating test
+    users/databases).
+    """
+    pg_server_module.set_timeout(remaining_timeout)
+    with pg_server_module.subcontext() as s:
+        yield s
+
+
+@pytest.fixture
+def conn(pg):
+    """
+    Returns a connected PGconn instance to the test PostgreSQL server.
+    The connection is automatically cleaned up at the end of the test.
+
+    Example:
+        def test_something(conn):
+            result = conn.sql("SELECT 1")
+            assert result == 1
+    """
+    return pg.connect()
diff --git a/src/test/pytest/pypg/server.py b/src/test/pytest/pypg/server.py
new file mode 100644
index 00000000000..d6675cde93d
--- /dev/null
+++ b/src/test/pytest/pypg/server.py
@@ -0,0 +1,387 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import glob
+import os
+import pathlib
+import platform
+import socket
+import subprocess
+import tempfile
+import time
+from collections import namedtuple
+from typing import Callable, Optional
+
+from .util import run
+from libpq import PGconn
+
+
+class FileBackup(contextlib.AbstractContextManager):
+    """
+    A context manager which backs up a file's contents, restoring them on exit.
+    """
+
+    def __init__(self, file: pathlib.Path):
+        super().__init__()
+
+        self._file = file
+
+    def __enter__(self):
+        import shutil
+
+        with tempfile.NamedTemporaryFile(
+            prefix=self._file.name, dir=self._file.parent, delete=False
+        ) as f:
+            self._backup = pathlib.Path(f.name)
+
+        shutil.copyfile(self._file, self._backup)
+
+        return self
+
+    def __exit__(self, *exc):
+        import shutil
+
+        # Swap the backup and the original file, so that the modified contents
+        # can still be inspected in case of failure.
+        tmp = self._backup.parent / (self._backup.name + ".tmp")
+
+        shutil.copyfile(self._file, tmp)
+        shutil.copyfile(self._backup, self._file)
+        shutil.move(tmp, self._backup)
+
+
+class HBA(FileBackup):
+    """
+    Backs up a server's HBA configuration and provides means for temporarily
+    editing it.
+    """
+
+    def __init__(self, datadir: pathlib.Path):
+        super().__init__(datadir / "pg_hba.conf")
+
+    def prepend(self, *lines):
+        """
+        Temporarily prepends lines to the server's pg_hba.conf.
+
+        As sugar for aligning HBA columns in the tests, each line can be either
+        a string or a list of strings. List elements will be joined by single
+        spaces before they are written to file.
+        """
+        with open(self._file, "r") as f:
+            prior_data = f.read()
+
+        with open(self._file, "w") as f:
+            for line in lines:
+                if isinstance(line, list):
+                    print(*line, file=f)
+                else:
+                    print(line, file=f)
+
+            f.write(prior_data)
+
+
+class Config(FileBackup):
+    """
+    Backs up a server's postgresql.conf and provides means for temporarily
+    editing it.
+    """
+
+    def __init__(self, datadir: pathlib.Path):
+        super().__init__(datadir / "postgresql.conf")
+
+    def set(self, **gucs):
+        """
+        Temporarily appends GUC settings to the server's postgresql.conf.
+        """
+
+        with open(self._file, "a") as f:
+            print(file=f)
+
+            for n, v in gucs.items():
+                v = str(v)
+
+                # TODO: proper quoting
+                v = v.replace("\\", "\\\\")
+                v = v.replace("'", "\\'")
+                v = "'{}'".format(v)
+
+                print(n, "=", v, file=f)
+
+
+Backup = namedtuple("Backup", "conf, hba")
+
+
+class PostgresServer:
+    """
+    Represents a running PostgreSQL server instance with management utilities.
+    Provides methods for configuration, user/database creation, and server control.
+    """
+
+    def __init__(self, bindir, datadir, sockdir, winpassword, libpq_handle):
+        """
+        Initialize and start a PostgreSQL server instance.
+        """
+        self.datadir = datadir
+        self.sockdir = sockdir
+        self.libpq_handle = libpq_handle
+        self._remaining_timeout_fn: Optional[Callable[[], float]] = None
+        self._bindir = bindir
+        self._winpassword = winpassword
+        self._pg_ctl = os.path.join(bindir, "pg_ctl")
+        self._log = os.path.join(datadir, "postgresql.log")
+
+        initdb = os.path.join(bindir, "initdb")
+        pg_ctl = self._pg_ctl
+
+        # Lock down the HBA by default; tests can open it back up later.
+        if platform.system() == "Windows":
+            # On Windows, for admin connections, use SCRAM with a generated password
+            # over local sockets. This requires additional work during initdb.
+            method = "scram-sha-256"
+
+            # NamedTemporaryFile doesn't work very nicely on Windows until Python
+            # 3.12, which introduces NamedTemporaryFile(delete_on_close=False).
+            # Until then, specify delete=False and manually unlink after use.
+            with tempfile.NamedTemporaryFile("w", delete=False) as pwfile:
+                pwfile.write(winpassword)
+
+            run(initdb, "--auth=scram-sha-256", "--pwfile", pwfile.name, datadir)
+            os.unlink(pwfile.name)
+
+        else:
+            # For other OSes we can just use peer auth.
+            method = "peer"
+            run(pg_ctl, "-D", datadir, "init")
+
+        with open(datadir / "pg_hba.conf", "w") as f:
+            print(f"# default: local {method} connections only", file=f)
+            print(f"local all all {method}", file=f)
+
+        # Figure out a port to listen on. Attempt to reserve both IPv4 and IPv6
+        # addresses in one go.
+        #
+        # Note: socket.has_dualstack_ipv6/create_server are only in Python 3.8+.
+        if hasattr(socket, "has_dualstack_ipv6") and socket.has_dualstack_ipv6():
+            addr = ("::1", 0)
+            s = socket.create_server(addr, family=socket.AF_INET6, dualstack_ipv6=True)
+
+            hostaddr, port, _, _ = s.getsockname()
+            addrs = [hostaddr, "127.0.0.1"]
+
+        else:
+            addr = ("127.0.0.1", 0)
+
+            s = socket.socket()
+            s.bind(addr)
+
+            hostaddr, port = s.getsockname()
+            addrs = [hostaddr]
+
+        log = self._log
+
+        with s, open(os.path.join(datadir, "postgresql.conf"), "a") as f:
+            print(file=f)
+            print("unix_socket_directories = '{}'".format(sockdir.as_posix()), file=f)
+            print("listen_addresses = '{}'".format(",".join(addrs)), file=f)
+            print("port =", port, file=f)
+            print("log_connections = all", file=f)
+
+        # Between closing of the socket, s, and server start, we're racing against
+        # anything that wants to open up ephemeral ports, so try not to put any new
+        # work here.
+
+        run(pg_ctl, "-D", datadir, "-l", log, "start")
+
+        # Read the PID file to get the postmaster PID
+        with open(os.path.join(datadir, "postmaster.pid")) as f:
+            pid = int(f.readline().strip())
+
+        # Store the computed values
+        self.hostaddr = hostaddr
+        self.port = port
+        self.pid = pid
+
+        # ExitStack for cleanup callbacks
+        self._cleanup_stack = contextlib.ExitStack()
+
+    def psql(self, *args):
+        """Run psql with the given arguments."""
+        if platform.system() == "Windows":
+            pw = dict(PGPASSWORD=self._winpassword)
+        else:
+            pw = None
+        self._run(os.path.join(self._bindir, "psql"), "-w", *args, addenv=pw)
+
+    def pg_ctl(self, *args):
+        """Run pg_ctl with the given arguments."""
+        self._run(self._pg_ctl, "-l", self._log, *args)
+
+    def _run(self, cmd, *args, addenv: Optional[dict] = None):
+        """Run a command with PG* environment variables set."""
+        subenv = dict(os.environ)
+        subenv.update(
+            {
+                "PGHOST": str(self.sockdir),
+                "PGPORT": str(self.port),
+                "PGDATABASE": "postgres",
+                "PGDATA": str(self.datadir),
+            }
+        )
+        if addenv:
+            subenv.update(addenv)
+        run(cmd, *args, env=subenv)
+
+    def create_users(self, *userkeys: str):
+        """Create test users and register them for cleanup."""
+        usermap = {}
+        for u in userkeys:
+            name = u + "user"
+            usermap[u] = name
+            self.psql("-c", "CREATE USER " + name)
+            self._cleanup_stack.callback(self.psql, "-c", "DROP USER " + name)
+        return usermap
+
+    def create_dbs(self, *dbkeys: str):
+        """Create test databases and register them for cleanup."""
+        dbmap = {}
+        for d in dbkeys:
+            name = d + "db"
+            dbmap[d] = name
+            self.psql("-c", "CREATE DATABASE " + name)
+            self._cleanup_stack.callback(self.psql, "-c", "DROP DATABASE " + name)
+        return dbmap
+
+    @contextlib.contextmanager
+    def reloading(self):
+        """
+        Provides a context manager for making configuration changes.
+
+        If the context suite finishes successfully, the configuration will
+        be reloaded via pg_ctl. On teardown, the configuration changes will
+        be unwound, and the server will be signaled to reload again.
+
+        The context target contains the following attributes which can be
+        used to configure the server:
+        - .conf: modifies postgresql.conf
+        - .hba: modifies pg_hba.conf
+
+        For example:
+
+            with pg_server_session.reloading() as s:
+                s.conf.set(log_connections="on")
+                s.hba.prepend("local all all trust")
+        """
+        # Push a reload onto the stack before making any other
+        # unwindable changes. That way the order of operations will be
+        #
+        #  # test
+        #   - config change 1
+        #   - config change 2
+        #   - reload
+        #  # teardown
+        #   - undo config change 2
+        #   - undo config change 1
+        #   - reload
+        #
+        self._cleanup_stack.callback(self.pg_ctl, "reload")
+        yield self._backup_configuration()
+
+        # Now actually reload
+        self.pg_ctl("reload")
+
+    @contextlib.contextmanager
+    def restarting(self):
+        """Like .reloading(), but with a full server restart."""
+        self._cleanup_stack.callback(self.pg_ctl, "restart")
+        yield self._backup_configuration()
+        self.pg_ctl("restart")
+
+    def _backup_configuration(self):
+        # Wrap the existing HBA and configuration with FileBackups.
+        return Backup(
+            hba=self._cleanup_stack.enter_context(HBA(self.datadir)),
+            conf=self._cleanup_stack.enter_context(Config(self.datadir)),
+        )
+
+    @contextlib.contextmanager
+    def subcontext(self):
+        """
+        Create a new cleanup context for per-test isolation.
+
+        Temporarily replaces the cleanup stack so that any cleanup callbacks
+        registered within this context will be cleaned up when the context exits.
+        """
+        old_stack = self._cleanup_stack
+        self._cleanup_stack = contextlib.ExitStack()
+        try:
+            self._cleanup_stack.__enter__()
+            yield self
+        finally:
+            self._cleanup_stack.__exit__(None, None, None)
+            self._cleanup_stack = old_stack
+
+    def stop(self):
+        """
+        Stop the PostgreSQL server instance.
+
+        Ignores failures if the server is already stopped.
+        """
+        try:
+            run(self._pg_ctl, "-D", self.datadir, "-l", self._log, "stop")
+        except subprocess.CalledProcessError:
+            # Server may have already been stopped
+            pass
+
+    def cleanup(self):
+        """Run all registered cleanup callbacks."""
+        self._cleanup_stack.close()
+
+    def set_timeout(self, remaining_timeout_fn: Callable[[], float]) -> None:
+        """
+        Set the timeout function for connections.
+        This is typically called by pg fixture for each test.
+        """
+        self._remaining_timeout_fn = remaining_timeout_fn
+
+    def connect(self, **opts) -> PGconn:
+        """
+        Creates a connection to this PostgreSQL server instance.
+
+        This is a convenience method that automatically fills in the host, port,
+        and dbname (defaulting to 'postgres') for connecting to this server.
+
+        Args:
+            stack: ExitStack for managing connection cleanup (uses internal stack if not provided)
+            remaining_timeout_fn: Function that returns remaining timeout (uses stored timeout if not provided)
+            **opts: Additional connection options (can override defaults)
+
+        Returns:
+            PGconn: Connected database connection
+
+        Example:
+            conn = pg.connect()
+            conn = pg.connect(dbname='mydb')
+        """
+        from libpq import connect as libpq_connect
+
+        # Set default connection options for this server
+        defaults = {
+            "host": str(self.sockdir),
+            "port": self.port,
+            "dbname": "postgres",
+        }
+
+        # Merge with user-provided options (user options take precedence)
+        defaults.update(opts)
+
+        if self._remaining_timeout_fn is None:
+            raise RuntimeError(
+                "Timeout function not set. Use set_timeout() or pg fixture."
+            )
+
+        return libpq_connect(
+            self.libpq_handle,
+            self._cleanup_stack,
+            self._remaining_timeout_fn,
+            **defaults,
+        )
diff --git a/src/test/pytest/pypg/util.py b/src/test/pytest/pypg/util.py
new file mode 100644
index 00000000000..b2a1e627e4b
--- /dev/null
+++ b/src/test/pytest/pypg/util.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import shlex
+import subprocess
+import sys
+
+
+def eprint(*args, **kwargs):
+    """eprint prints to stderr"""
+    print(*args, file=sys.stderr, **kwargs)
+
+
+def run(*command, check=True, shell=None, silent=False, **kwargs):
+    """run runs the given command and prints it to stderr"""
+
+    if shell is None:
+        shell = len(command) == 1 and isinstance(command[0], str)
+
+    if shell:
+        command = command[0]
+    else:
+        command = list(map(str, command))
+
+    if not silent:
+        if shell:
+            eprint(f"+ {command}")
+        else:
+            # We could normally use shlex.join here, but it's not available in
+            # Python 3.6 which we still like to support
+            unsafe_string_cmd = " ".join(map(shlex.quote, command))
+            eprint(f"+ {unsafe_string_cmd}")
+
+    if silent:
+        kwargs.setdefault("stdout", subprocess.DEVNULL)
+
+    return subprocess.run(command, check=check, shell=shell, **kwargs)
+
+
+def capture(command, *args, stdout=subprocess.PIPE, encoding="utf-8", **kwargs):
+    return run(
+        command, *args, stdout=stdout, encoding=encoding, **kwargs
+    ).stdout.removesuffix("\n")
diff --git a/src/test/pytest/pyt/conftest.py b/src/test/pytest/pyt/conftest.py
index ecb72be26d7..641af0bbac5 100644
--- a/src/test/pytest/pyt/conftest.py
+++ b/src/test/pytest/pyt/conftest.py
@@ -1,3 +1,4 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
-from pg.fixtures import *
+
+from pypg.fixtures import *
diff --git a/src/test/pytest/pyt/test_libpq.py b/src/test/pytest/pyt/test_libpq.py
index 9f0857cc612..4fcf4056f41 100644
--- a/src/test/pytest/pyt/test_libpq.py
+++ b/src/test/pytest/pyt/test_libpq.py
@@ -9,6 +9,8 @@ from typing import Callable
 
 import pytest
 
+from libpq import connstr, LibpqError
+
 
 @pytest.mark.parametrize(
     "opts, expected",
@@ -22,15 +24,15 @@ import pytest
         (dict(keyword=" \\' "), r"keyword=' \\\' '"),
     ],
 )
-def test_connstr(libpq, opts, expected):
-    """Tests the escape behavior for libpq._connstr()."""
-    assert libpq._connstr(opts) == expected
+def test_connstr(opts, expected):
+    """Tests the escape behavior for connstr()."""
+    assert connstr(opts) == expected
 
 
-def test_must_connect_errors(libpq):
-    """Tests that must_connect() raises libpq.Error."""
-    with pytest.raises(libpq.Error, match="invalid connection option"):
-        libpq.must_connect(some_unknown_keyword="whatever")
+def test_must_connect_errors(connect):
+    """Tests that connect() raises LibpqError."""
+    with pytest.raises(LibpqError, match="invalid connection option"):
+        connect(some_unknown_keyword="whatever")
 
 
 @pytest.fixture
@@ -145,7 +147,7 @@ def local_server(tmp_path, remaining_timeout):
         yield s
 
 
-def test_connection_is_finished_on_error(libpq, local_server, remaining_timeout):
+def test_connection_is_finished_on_error(connect, local_server):
     """Tests that PQfinish() gets called at the end of testing."""
     expected_error = "something is wrong"
 
@@ -165,7 +167,6 @@ def test_connection_is_finished_on_error(libpq, local_server, remaining_timeout)
 
     local_server.background(serve_error)
 
-    with pytest.raises(libpq.Error, match=expected_error):
+    with pytest.raises(LibpqError, match=expected_error):
         # Exiting this context should result in PQfinish().
-        with libpq:
-            libpq.must_connect(host=local_server.host, port=local_server.port)
+        connect(host=local_server.host, port=local_server.port)
diff --git a/src/test/pytest/pyt/test_query_helpers.py b/src/test/pytest/pyt/test_query_helpers.py
new file mode 100644
index 00000000000..5a5a1ae1edf
--- /dev/null
+++ b/src/test/pytest/pyt/test_query_helpers.py
@@ -0,0 +1,286 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Tests for query helper functions with type conversion and result simplification.
+"""
+
+import pytest
+
+
+def test_single_cell_int(conn):
+    """Single cell integer query returns just the value."""
+    result = conn.sql("SELECT 1")
+    assert result == 1
+    assert isinstance(result, int)
+
+
+def test_single_cell_string(conn):
+    """Single cell string query returns just the value."""
+    result = conn.sql("SELECT 'hello'")
+    assert result == "hello"
+    assert isinstance(result, str)
+
+
+def test_single_cell_bool(conn):
+    """Single cell boolean query returns just the value."""
+
+    result = conn.sql("SELECT true")
+    assert result is True
+    assert isinstance(result, bool)
+
+    result = conn.sql("SELECT false")
+    assert result is False
+
+
+def test_single_cell_float(conn):
+    """Single cell float query returns just the value."""
+
+    result = conn.sql("SELECT 3.14::float4")
+    assert isinstance(result, float)
+    assert abs(result - 3.14) < 0.01
+
+
+def test_single_cell_null(conn):
+    """Single cell NULL query returns None."""
+
+    result = conn.sql("SELECT NULL")
+    assert result is None
+
+
+def test_single_row_multiple_columns(conn):
+    """Single row with multiple columns returns a tuple."""
+
+    result = conn.sql("SELECT 1, 'hello', true")
+    assert result == (1, "hello", True)
+    assert isinstance(result, tuple)
+
+
+def test_single_column_multiple_rows(conn):
+    """Single column with multiple rows returns a list of values."""
+
+    result = conn.sql("SELECT * FROM generate_series(1, 3)")
+    assert result == [1, 2, 3]
+    assert isinstance(result, list)
+
+
+def test_multiple_rows_and_columns(conn):
+    """Multiple rows and columns returns list of tuples."""
+
+    result = conn.sql("SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c')) AS t")
+    assert result == [(1, "a"), (2, "b"), (3, "c")]
+    assert isinstance(result, list)
+    assert all(isinstance(row, tuple) for row in result)
+
+
+def test_empty_result(conn):
+    """Empty result set returns empty list."""
+
+    result = conn.sql("SELECT 1 WHERE false")
+    assert result == []
+
+
+def test_query_error_handling(conn):
+    """Query errors raise RuntimeError with actual error message."""
+
+    with pytest.raises(RuntimeError) as exc_info:
+        conn.sql("SELECT * FROM nonexistent_table")
+
+    error_msg = str(exc_info.value)
+    assert "nonexistent_table" in error_msg or "does not exist" in error_msg
+
+
+def test_division_by_zero_error(conn):
+    """Division by zero raises RuntimeError."""
+
+    with pytest.raises(RuntimeError) as exc_info:
+        conn.sql("SELECT 1/0")
+
+    error_msg = str(exc_info.value)
+    assert "division by zero" in error_msg.lower()
+
+
+def test_simple_exec_create_table(conn):
+    """sql for CREATE TABLE returns None."""
+
+    result = conn.sql("CREATE TEMP TABLE test_table (id int, name text)")
+    assert result is None
+
+    # Verify table was created
+    count = conn.sql("SELECT COUNT(*) FROM test_table")
+    assert count == 0
+
+
+def test_simple_exec_insert(conn):
+    """sql for INSERT returns None."""
+
+    conn.sql("CREATE TEMP TABLE test_table (id int, name text)")
+    result = conn.sql("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')")
+    assert result is None
+
+    # Verify data was inserted
+    count = conn.sql("SELECT COUNT(*) FROM test_table")
+    assert count == 2
+
+
+def test_type_conversion_mixed(conn):
+    """Test mixed type conversion in a single row."""
+
+    result = conn.sql(
+        "SELECT 42::int4, 123::int8, 3.14::float8, 'text', true, NULL"
+    )
+    assert result == (42, 123, 3.14, "text", True, None)
+    assert isinstance(result[0], int)
+    assert isinstance(result[1], int)
+    assert isinstance(result[2], float)
+    assert isinstance(result[3], str)
+    assert isinstance(result[4], bool)
+    assert result[5] is None
+
+
+def test_multiple_queries_same_connection(conn):
+    """Test running multiple queries on the same connection."""
+
+    result1 = conn.sql("SELECT 1")
+    assert result1 == 1
+
+    result2 = conn.sql("SELECT 'hello', 'world'")
+    assert result2 == ("hello", "world")
+
+    result3 = conn.sql("SELECT * FROM generate_series(1, 5)")
+    assert result3 == [1, 2, 3, 4, 5]
+
+
+def test_date_type(conn):
+    """Test date type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '2025-10-20'::date")
+    assert result == datetime.date(2025, 10, 20)
+    assert isinstance(result, datetime.date)
+
+
+def test_timestamp_type(conn):
+    """Test timestamp type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '2025-10-20 15:30:45'::timestamp")
+    assert result == datetime.datetime(2025, 10, 20, 15, 30, 45)
+    assert isinstance(result, datetime.datetime)
+
+
+def test_time_type(conn):
+    """Test time type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '15:30:45'::time")
+    assert result == datetime.time(15, 30, 45)
+    assert isinstance(result, datetime.time)
+
+
+def test_numeric_type(conn):
+    """Test numeric/decimal type conversion."""
+    import decimal
+
+    result = conn.sql("SELECT 123.456::numeric")
+    assert result == decimal.Decimal("123.456")
+    assert isinstance(result, decimal.Decimal)
+
+
+def test_int_array(conn):
+    """Test integer array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[1, 2, 3, 4, 5]")
+    assert result == [1, 2, 3, 4, 5]
+    assert isinstance(result, list)
+    assert all(isinstance(x, int) for x in result)
+
+
+def test_text_array(conn):
+    """Test text array type conversion."""
+
+    result = conn.sql("SELECT ARRAY['hello', 'world', 'test']")
+    assert result == ["hello", "world", "test"]
+    assert isinstance(result, list)
+    assert all(isinstance(x, str) for x in result)
+
+
+def test_bool_array(conn):
+    """Test boolean array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[true, false, true]")
+    assert result == [True, False, True]
+    assert isinstance(result, list)
+    assert all(isinstance(x, bool) for x in result)
+
+
+def test_empty_array(conn):
+    """Test empty array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[]::int[]")
+    assert result == []
+    assert isinstance(result, list)
+
+
+def test_json_type(conn):
+    """Test JSON type (parsed to dict)."""
+
+    result = conn.sql('SELECT \'{"key": "value"}\'::json')
+    assert isinstance(result, dict)
+    assert result == {"key": "value"}
+
+
+def test_jsonb_type(conn):
+    """Test JSONB type (parsed to dict)."""
+
+    result = conn.sql('SELECT \'{"name": "test", "count": 42}\'::jsonb')
+    assert isinstance(result, dict)
+    assert result == {"name": "test", "count": 42}
+
+
+def test_json_array(conn):
+    """Test JSON array type."""
+
+    result = conn.sql("SELECT '[1, 2, 3, 4, 5]'::json")
+    assert isinstance(result, list)
+    assert result == [1, 2, 3, 4, 5]
+
+
+def test_json_nested(conn):
+    """Test nested JSON object."""
+
+    result = conn.sql(
+        'SELECT \'{"user": {"id": 1, "name": "Alice"}, "active": true}\'::json'
+    )
+    assert isinstance(result, dict)
+    assert result == {"user": {"id": 1, "name": "Alice"}, "active": True}
+
+
+def test_mixed_types_with_arrays(conn):
+    """Test mixed types including arrays in a single row."""
+
+    result = conn.sql("SELECT 42, 'text', ARRAY[1, 2, 3], true")
+    assert result == (42, "text", [1, 2, 3], True)
+    assert isinstance(result[0], int)
+    assert isinstance(result[1], str)
+    assert isinstance(result[2], list)
+    assert isinstance(result[3], bool)
+
+
+def test_uuid_type(conn):
+    """Test UUID type conversion."""
+    import uuid
+
+    test_uuid = "550e8400-e29b-41d4-a716-446655440000"
+    result = conn.sql(f"SELECT '{test_uuid}'::uuid")
+    assert result == uuid.UUID(test_uuid)
+    assert isinstance(result, uuid.UUID)
+
+
+def test_uuid_generation(conn):
+    """Test generated UUID type conversion."""
+    import uuid
+
+    result = conn.sql("SELECT uuidv4()")
+    assert isinstance(result, uuid.UUID)
+    # Check it's a valid UUID by ensuring it can be converted to string
+    assert len(str(result)) == 36  # UUID string format length
diff --git a/src/test/ssl/pyt/conftest.py b/src/test/ssl/pyt/conftest.py
index 85d2c994828..6e8699e0971 100644
--- a/src/test/ssl/pyt/conftest.py
+++ b/src/test/ssl/pyt/conftest.py
@@ -1,19 +1,14 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
 import datetime
-import os
-import pathlib
-import platform
-import secrets
-import socket
+import re
 import subprocess
 import tempfile
 from collections import namedtuple
 
 import pytest
 
-import pg
-from pg.fixtures import *
+from pypg.fixtures import *
 
 
 @pytest.fixture(scope="session")
@@ -135,108 +130,51 @@ def certs(cryptography, tmp_path_factory):
     return _Certs()
 
 
-@pytest.fixture(scope="session")
-def datadir(tmp_path_factory):
+@pytest.fixture(scope="module", autouse=True)
+def ssl_setup(pg_server_module, certs, datadir):
     """
-    Returns the directory name to use as the server data directory. If
-    TESTDATADIR is provided, that will be used; otherwise a new temporary
-    directory is created in the pytest temp root.
+    Sets up required server settings for all tests in this module.
     """
-    d = os.getenv("TESTDATADIR")
-    if d:
-        d = pathlib.Path(d)
-    else:
-        d = tmp_path_factory.mktemp("tmp_check")
+    try:
+        with pg_server_module.restarting() as s:
+            s.conf.set(
+                ssl="on",
+                ssl_ca_file=certs.ca.certpath,
+                ssl_cert_file=certs.server.certpath,
+                ssl_key_file=certs.server.keypath,
+            )
 
-    return d
+            # Reject by default.
+            s.hba.prepend("hostssl all all all reject")
 
+    except subprocess.CalledProcessError:
+        # This is a decent place to skip if the server isn't set up for SSL.
+        logpath = datadir / "postgresql.log"
+        unsupported = re.compile("SSL is not supported")
 
-@pytest.fixture(scope="session")
-def sockdir(tmp_path_factory):
-    """
-    Returns the directory name to use as the server's unix_socket_directories
-    setting. Local client connections use this as the PGHOST.
+        with open(logpath, "r") as log:
+            for line in log:
+                if unsupported.search(line):
+                    pytest.skip("the server does not support SSL")
 
-    At the moment, this is always put under the pytest temp root.
-    """
-    return tmp_path_factory.mktemp("sockfiles")
+        # Some other error happened.
+        raise
 
+    users = pg_server_module.create_users("ssl")
+    dbs = pg_server_module.create_dbs("ssl")
 
-@pytest.fixture(scope="session")
-def winpassword():
-    """The per-session SCRAM password for the server admin on Windows."""
-    return secrets.token_urlsafe(16)
+    return (users, dbs)
 
 
-@pytest.fixture(scope="session")
-def server_instance(certs, datadir, sockdir, winpassword):
+@pytest.fixture(scope="module")
+def client_cert(ssl_setup, certs):
     """
-    Starts a running Postgres server listening on localhost. The HBA initially
-    allows only local UNIX connections from the same user.
-
-    TODO: when installcheck is supported, this should optionally point to the
-    currently running server instead.
+    Creates a Cert for the "ssl" user.
     """
+    from cryptography import x509
+    from cryptography.x509.oid import NameOID
+
+    users, _ = ssl_setup
+    user = users["ssl"]
 
-    # Lock down the HBA by default; tests can open it back up later.
-    if platform.system() == "Windows":
-        # On Windows, for admin connections, use SCRAM with a generated password
-        # over local sockets. This requires additional work during initdb.
-        method = "scram-sha-256"
-
-        # NamedTemporaryFile doesn't work very nicely on Windows until Python
-        # 3.12, which introduces NamedTemporaryFile(delete_on_close=False).
-        # Until then, specify delete=False and manually unlink after use.
-        with tempfile.NamedTemporaryFile("w", delete=False) as pwfile:
-            pwfile.write(winpassword)
-
-        subprocess.check_call(
-            ["initdb", "--auth=scram-sha-256", "--pwfile", pwfile.name, datadir]
-        )
-        os.unlink(pwfile.name)
-
-    else:
-        # For other OSes we can just use peer auth.
-        method = "peer"
-        subprocess.check_call(["pg_ctl", "-D", datadir, "init"])
-
-    with open(datadir / "pg_hba.conf", "w") as f:
-        print(f"# default: local {method} connections only", file=f)
-        print(f"local all all {method}", file=f)
-
-    # Figure out a port to listen on. Attempt to reserve both IPv4 and IPv6
-    # addresses in one go.
-    #
-    # Note: socket.has_dualstack_ipv6/create_server are only in Python 3.8+.
-    if hasattr(socket, "has_dualstack_ipv6") and socket.has_dualstack_ipv6():
-        addr = ("::1", 0)
-        s = socket.create_server(addr, family=socket.AF_INET6, dualstack_ipv6=True)
-
-        hostaddr, port, _, _ = s.getsockname()
-        addrs = [hostaddr, "127.0.0.1"]
-
-    else:
-        addr = ("127.0.0.1", 0)
-
-        s = socket.socket()
-        s.bind(addr)
-
-        hostaddr, port = s.getsockname()
-        addrs = [hostaddr]
-
-    log = os.path.join(datadir, "postgresql.log")
-
-    with s, open(os.path.join(datadir, "postgresql.conf"), "a") as f:
-        print(file=f)
-        print("unix_socket_directories = '{}'".format(sockdir.as_posix()), file=f)
-        print("listen_addresses = '{}'".format(",".join(addrs)), file=f)
-        print("port =", port, file=f)
-        print("log_connections = all", file=f)
-
-    # Between closing of the socket, s, and server start, we're racing against
-    # anything that wants to open up ephemeral ports, so try not to put any new
-    # work here.
-
-    subprocess.check_call(["pg_ctl", "-D", datadir, "-l", log, "start"])
-    yield (hostaddr, port)
-    subprocess.check_call(["pg_ctl", "-D", datadir, "-l", log, "stop"])
+    return certs.new(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, user)]))
diff --git a/src/test/ssl/pyt/test_client.py b/src/test/ssl/pyt/test_client.py
index 28110ae0717..247681f93cb 100644
--- a/src/test/ssl/pyt/test_client.py
+++ b/src/test/ssl/pyt/test_client.py
@@ -10,10 +10,11 @@ from typing import Callable
 
 import pytest
 
-import pg
+import pypg
+from libpq import LibpqError, ExecStatus
 
 # This suite opens up local TCP ports and is hidden behind PG_TEST_EXTRA=ssl.
-pytestmark = pg.require_test_extra("ssl")
+pytestmark = pypg.require_test_extra("ssl")
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -192,7 +193,7 @@ def ssl_server(tcp_server_class, certs):
 
 
 @pytest.mark.parametrize("sslmode", ("require", "verify-ca", "verify-full"))
-def test_server_with_ssl_disabled(libpq, tcp_server, certs, sslmode):
+def test_server_with_ssl_disabled(connect, tcp_server, certs, sslmode):
     """
     Make sure client refuses to talk to non-SSL servers with stricter
     sslmodes.
@@ -214,16 +215,15 @@ def test_server_with_ssl_disabled(libpq, tcp_server, certs, sslmode):
 
     tcp_server.background(refuse_ssl)
 
-    with pytest.raises(libpq.Error, match="server does not support SSL"):
-        with libpq:  # XXX tests shouldn't need to do this
-            libpq.must_connect(
-                **tcp_server.conninfo,
-                sslrootcert=certs.ca.certpath,
-                sslmode=sslmode,
-            )
+    with pytest.raises(LibpqError, match="server does not support SSL"):
+        connect(
+            **tcp_server.conninfo,
+            sslrootcert=certs.ca.certpath,
+            sslmode=sslmode,
+        )
 
 
-def test_verify_full_connection(libpq, ssl_server, certs):
+def test_verify_full_connection(connect, ssl_server, certs):
     """Completes a verify-full connection and empty query."""
 
     def handle_empty_query(s: ssl.SSLSocket):
@@ -269,10 +269,10 @@ def test_verify_full_connection(libpq, ssl_server, certs):
 
     ssl_server.background_ssl(handle_empty_query)
 
-    conn = libpq.must_connect(
+    conn = connect(
         **ssl_server.conninfo,
         sslrootcert=certs.ca.certpath,
         sslmode="verify-full",
     )
     with conn:
-        assert conn.exec("").status() == libpq.PGRES_EMPTY_QUERY
+        assert conn.exec("").status() == ExecStatus.PGRES_EMPTY_QUERY
diff --git a/src/test/ssl/pyt/test_server.py b/src/test/ssl/pyt/test_server.py
index 2d0be735371..60628d0c067 100644
--- a/src/test/ssl/pyt/test_server.py
+++ b/src/test/ssl/pyt/test_server.py
@@ -1,25 +1,16 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
-import contextlib
-import os
-import pathlib
-import platform
 import re
-import shutil
 import socket
 import ssl
 import struct
-import subprocess
-import tempfile
-from collections import namedtuple
-from typing import Dict, List, Union
 
 import pytest
 
-import pg
+import pypg
 
 # This suite opens up local TCP ports and is hidden behind PG_TEST_EXTRA=ssl.
-pytestmark = pg.require_test_extra("ssl")
+pytestmark = pypg.require_test_extra("ssl")
 
 
 #
@@ -27,363 +18,6 @@ pytestmark = pg.require_test_extra("ssl")
 #
 
 
-@pytest.fixture(scope="session")
-def connenv(server_instance, sockdir, datadir):
-    """
-    Provides the values for several PG* environment variables needed for our
-    utility programs to connect to the server_instance.
-    """
-    return {
-        "PGHOST": str(sockdir),
-        "PGPORT": str(server_instance[1]),
-        "PGDATABASE": "postgres",
-        "PGDATA": str(datadir),
-    }
-
-
-class FileBackup(contextlib.AbstractContextManager):
-    """
-    A context manager which backs up a file's contents, restoring them on exit.
-    """
-
-    def __init__(self, file: pathlib.Path):
-        super().__init__()
-
-        self._file = file
-
-    def __enter__(self):
-        with tempfile.NamedTemporaryFile(
-            prefix=self._file.name, dir=self._file.parent, delete=False
-        ) as f:
-            self._backup = pathlib.Path(f.name)
-
-        shutil.copyfile(self._file, self._backup)
-
-        return self
-
-    def __exit__(self, *exc):
-        # Swap the backup and the original file, so that the modified contents
-        # can still be inspected in case of failure.
-        #
-        # TODO: this is less helpful if there are multiple layers, because it's
-        # not clear which backup to look at. Can the backup name be printed as
-        # part of the failed test output? Should we only swap on test failure?
-        tmp = self._backup.parent / (self._backup.name + ".tmp")
-
-        shutil.copyfile(self._file, tmp)
-        shutil.copyfile(self._backup, self._file)
-        shutil.move(tmp, self._backup)
-
-
-class HBA(FileBackup):
-    """
-    Backs up a server's HBA configuration and provides means for temporarily
-    editing it. See also pg_server, which provides an instance of this class and
-    context managers for enforcing the reload/restart order of operations.
-    """
-
-    def __init__(self, datadir: pathlib.Path):
-        super().__init__(datadir / "pg_hba.conf")
-
-    def prepend(self, *lines: Union[str, List[str]]):
-        """
-        Temporarily prepends lines to the server's pg_hba.conf.
-
-        As sugar for aligning HBA columns in the tests, each line can be either
-        a string or a list of strings. List elements will be joined by single
-        spaces before they are written to file.
-        """
-        with open(self._file, "r") as f:
-            prior_data = f.read()
-
-        with open(self._file, "w") as f:
-            for l in lines:
-                if isinstance(l, list):
-                    print(*l, file=f)
-                else:
-                    print(l, file=f)
-
-            f.write(prior_data)
-
-
-class Config(FileBackup):
-    """
-    Backs up a server's postgresql.conf and provides means for temporarily
-    editing it. See also pg_server, which provides an instance of this class and
-    context managers for enforcing the reload/restart order of operations.
-    """
-
-    def __init__(self, datadir: pathlib.Path):
-        super().__init__(datadir / "postgresql.conf")
-
-    def set(self, **gucs):
-        """
-        Temporarily appends GUC settings to the server's postgresql.conf.
-        """
-
-        with open(self._file, "a") as f:
-            print(file=f)
-
-            for n, v in gucs.items():
-                v = str(v)
-
-                # TODO: proper quoting
-                v = v.replace("\\", "\\\\")
-                v = v.replace("'", "\\'")
-                v = "'{}'".format(v)
-
-                print(n, "=", v, file=f)
-
-
-@pytest.fixture(scope="session")
-def pg_server_session(server_instance, connenv, datadir, winpassword):
-    """
-    Provides common routines for configuring and connecting to the
-    server_instance. For example:
-
-        users = pg_server_session.create_users("one", "two")
-        dbs = pg_server_session.create_dbs("default")
-
-        with pg_server_session.reloading() as s:
-            s.hba.prepend(["local", dbs["default"], users["two"], "peer"])
-
-        conn = connect_somehow(**pg_server_session.conninfo)
-        ...
-
-    Attributes of note are
-    - .conninfo: provides TCP connection info for the server
-
-    This fixture unwinds its configuration changes at the end of the pytest
-    session. For more granular changes, pg_server_session.subcontext() splits
-    off a "nested" context to allow smaller scopes.
-    """
-
-    class _Server(contextlib.ExitStack):
-        conninfo = dict(
-            hostaddr=server_instance[0],
-            port=server_instance[1],
-        )
-
-        # for _backup_configuration()
-        _Backup = namedtuple("Backup", "conf, hba")
-
-        def subcontext(self):
-            """
-            Creates a new server stack instance that can be tied to a smaller
-            scope than "session".
-            """
-            # So far, there doesn't seem to be a need to link the two objects,
-            # since HBA/Config/FileBackup operate directly on the filesystem and
-            # will appear to "nest" naturally.
-            return self.__class__()
-
-        def create_users(self, *userkeys: str) -> Dict[str, str]:
-            """
-            Creates new users which will be dropped at the end of the server
-            context.
-
-            For each provided key, a related user name will be selected and
-            stored in a map. This map is returned to let calling code look up
-            the selected usernames (instead of hardcoding them and potentially
-            stomping on an existing installation).
-            """
-            usermap = {}
-
-            for u in userkeys:
-                # TODO: use a uniquifier to support installcheck
-                name = u + "user"
-                usermap[u] = name
-
-                # TODO: proper escaping
-                self.psql("-c", "CREATE USER " + name)
-                self.callback(self.psql, "-c", "DROP USER " + name)
-
-            return usermap
-
-        def create_dbs(self, *dbkeys: str) -> Dict[str, str]:
-            """
-            Creates new databases which will be dropped at the end of the server
-            context. See create_users() for the meaning of the keys and returned
-            map.
-            """
-            dbmap = {}
-
-            for d in dbkeys:
-                # TODO: use a uniquifier to support installcheck
-                name = d + "db"
-                dbmap[d] = name
-
-                # TODO: proper escaping
-                self.psql("-c", "CREATE DATABASE " + name)
-                self.callback(self.psql, "-c", "DROP DATABASE " + name)
-
-            return dbmap
-
-        @contextlib.contextmanager
-        def reloading(self):
-            """
-            Provides a context manager for making configuration changes.
-
-            If the context suite finishes successfully, the configuration will
-            be reloaded via pg_ctl. On teardown, the configuration changes will
-            be unwound, and the server will be signaled to reload again.
-
-            The context target contains the following attributes which can be
-            used to configure the server:
-            - .conf: modifies postgresql.conf
-            - .hba: modifies pg_hba.conf
-
-            For example:
-
-                with pg_server_session.reloading() as s:
-                    s.conf.set(log_connections="on")
-                    s.hba.prepend("local all all trust")
-            """
-            try:
-                # Push a reload onto the stack before making any other
-                # unwindable changes. That way the order of operations will be
-                #
-                #  # test
-                #   - config change 1
-                #   - config change 2
-                #   - reload
-                #  # teardown
-                #   - undo config change 2
-                #   - undo config change 1
-                #   - reload
-                #
-                self.callback(self.pg_ctl, "reload")
-                yield self._backup_configuration()
-            except:
-                # We only want to reload at the end of the suite if there were
-                # no errors. During exceptions, the pushed callback handles
-                # things instead, so there's nothing to do here.
-                raise
-            else:
-                # Suite completed successfully.
-                self.pg_ctl("reload")
-
-        @contextlib.contextmanager
-        def restarting(self):
-            """Like .reloading(), but with a full server restart."""
-            try:
-                self.callback(self.pg_ctl, "restart")
-                yield self._backup_configuration()
-            except:
-                raise
-            else:
-                self.pg_ctl("restart")
-
-        def psql(self, *args):
-            """
-            Runs psql with the given arguments. Password prompts are always
-            disabled. On Windows, the admin password will be included in the
-            environment.
-            """
-            if platform.system() == "Windows":
-                pw = dict(PGPASSWORD=winpassword)
-            else:
-                pw = None
-
-            self._run("psql", "-w", *args, addenv=pw)
-
-        def pg_ctl(self, *args):
-            """
-            Runs pg_ctl with the given arguments. Log output will be placed in
-            postgresql.log in the server's data directory.
-
-            TODO: put the log in TESTLOGDIR
-            """
-            self._run("pg_ctl", "-l", str(datadir / "postgresql.log"), *args)
-
-        def _run(self, cmd, *args, addenv: dict = None):
-            # Override the existing environment with the connenv values and
-            # anything the caller wanted to add. (Python 3.9 gives us the
-            # less-ugly `os.environ | connenv` merge operator.)
-            subenv = dict(os.environ, **connenv)
-            if addenv:
-                subenv.update(addenv)
-
-            subprocess.check_call([cmd, *args], env=subenv)
-
-        def _backup_configuration(self):
-            # Wrap the existing HBA and configuration with FileBackups.
-            return self._Backup(
-                hba=self.enter_context(HBA(datadir)),
-                conf=self.enter_context(Config(datadir)),
-            )
-
-    with _Server() as s:
-        yield s
-
-
-@pytest.fixture(scope="module", autouse=True)
-def ssl_setup(pg_server_session, certs, datadir):
-    """
-    Sets up required server settings for all tests in this module. The fixture
-    variable is a tuple (users, dbs) containing the user and database names that
-    have been chosen for the test session.
-    """
-    try:
-        with pg_server_session.restarting() as s:
-            s.conf.set(
-                ssl="on",
-                ssl_ca_file=certs.ca.certpath,
-                ssl_cert_file=certs.server.certpath,
-                ssl_key_file=certs.server.keypath,
-            )
-
-            # Reject by default.
-            s.hba.prepend("hostssl all all all reject")
-
-    except subprocess.CalledProcessError:
-        # This is a decent place to skip if the server isn't set up for SSL.
-        logpath = datadir / "postgresql.log"
-        unsupported = re.compile("SSL is not supported")
-
-        with open(logpath, "r") as log:
-            for line in log:
-                if unsupported.search(line):
-                    pytest.skip("the server does not support SSL")
-
-        # Some other error happened.
-        raise
-
-    users = pg_server_session.create_users(
-        "ssl",
-    )
-
-    dbs = pg_server_session.create_dbs(
-        "ssl",
-    )
-
-    return (users, dbs)
-
-
-@pytest.fixture(scope="module")
-def client_cert(ssl_setup, certs):
-    """
-    Creates a Cert for the "ssl" user.
-    """
-    from cryptography import x509
-    from cryptography.x509.oid import NameOID
-
-    users, _ = ssl_setup
-    user = users["ssl"]
-
-    return certs.new(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, user)]))
-
-
-@pytest.fixture
-def pg_server(pg_server_session):
-    """
-    A per-test instance of pg_server_session. Use this fixture to make changes
-    to the server which will be rolled back at the end of every test.
-    """
-    with pg_server_session.subcontext() as s:
-        yield s
-
-
 #
 # Tests
 #
@@ -394,8 +28,8 @@ CLIENT = "client"
 SERVER = "server"
 
 
+# fmt: off
 @pytest.mark.parametrize(
-    # fmt: off
     "auth_method,                    creds,  expected_error",
 [
     # Trust allows anything.
@@ -416,10 +50,10 @@ SERVER = "server"
     ("cert",                         CLIENT, None),
     ("cert",                         SERVER, "authentication failed for user"),
 ],
-    # fmt: on
 )
+# fmt: on
 def test_direct_ssl_certificate_authentication(
-    pg_server,
+    pg,
     ssl_setup,
     certs,
     client_cert,
@@ -440,7 +74,7 @@ def test_direct_ssl_certificate_authentication(
     user = users["ssl"]
     db = dbs["ssl"]
 
-    with pg_server.reloading() as s:
+    with pg.reloading() as s:
         s.hba.prepend(
             ["hostssl", db, user, "127.0.0.1/32", auth_method],
             ["hostssl", db, user, "::1/128", auth_method],
@@ -461,7 +95,7 @@ def test_direct_ssl_certificate_authentication(
 
     # Make a direct SSL connection. There's no SSLRequest in the handshake; we
     # simply wrap a TCP connection with OpenSSL.
-    addr = (pg_server.conninfo["hostaddr"], pg_server.conninfo["port"])
+    addr = (pg.hostaddr, pg.port)
     with socket.create_connection(addr) as s:
         s.settimeout(remaining_timeout())  # XXX this resets every operation
 
-- 
2.51.1

