public inbox for [email protected]  
From: Jelte Fennema-Nio <[email protected]>
To: Jacob Champion <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: Robert Haas <[email protected]>
Cc: Daniel Gustafsson <[email protected]>
Cc: Andres Freund <[email protected]>
Cc: Tom Lane <[email protected]>
Cc: Peter Eisentraut <[email protected]>
Cc: Nazir Bilal Yavuz <[email protected]>
Subject: Re: RFC: adding pytest as a supported test framework
Date: Wed, 22 Oct 2025 14:44:15 +0200
Message-ID: <[email protected]> (raw)
In-Reply-To: <CAOYmi+=CrBFBRX-foRRES2tx2wXBJJhbsJGjgFbWvPcmBJuK-Q@mail.gmail.com>
References: <CAOYmi+kThkM9Z87u=R_Wi7fCor2i+UZKAyq0UCyprzCwTQvqgA@mail.gmail.com>
	<[email protected]>
	<CAOYmi+niQdwFdX7srOiD8zdme_rxEp2m4JGdqzK=+dS6dpV2Og@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CA+Tgmobhsqh8c+ySXaPf5MdVwFYW6ceQ05N7pav6dzecxJGJoQ@mail.gmail.com>
	<CAGECzQTtGoPGQMZactCAytbPvNcA59Zk7RA7SEmzObyAm7K=5w@mail.gmail.com>
	<CA+TgmoZCQtUu4u7XJ1umD8k5pTuM3h2zLDyQ1uoVOP-==gJz_Q@mail.gmail.com>
	<CAGECzQTzjVaMrKi5SCqL+H63pOx8r1DP0JpJ0v2SfaE0fMaNmg@mail.gmail.com>
	<CA+TgmoYvLLAJthL9ev-qc3gVLYX5WC3y3T5hsObaEnVOAx+e6g@mail.gmail.com>
	<CAGECzQQ6nHqhwBOr3fW_+_nxn82TJxMPZGAeXD-WKKJPYHYQqw@mail.gmail.com>
	<CA+TgmoboC3zPe5uzY58_QxGaGexGGpSBTNSDMZ4_Tzo-nrne=w@mail.gmail.com>
	<CAOYmi+moGFuBaaAHq=kixGk0YuOZan09Wn5O4WAzA-xLTEumaA@mail.gmail.com>
	<CA+TgmoY4DRMVEV7CVo1D8p==ExsN69W89rF7-Ax4V=M9HGWL7g@mail.gmail.com>
	<CA+TgmoZV8B35Mvx8DqHtJjHDQxscJhwOWzNWiPj+dsUhwaUfSg@mail.gmail.com>
	<[email protected]>
	<CAOYmi+k214g1tgREvQkUcr=OCiVKyaTDu1ja+OGAygG1y=jhPQ@mail.gmail.com>
	<CAOYmi+nEqA2LmetcJKUDmctypPLLumkVwj3vQ3idYd8yAGza5Q@mail.gmail.com>
	<CAOYmi+kebAt6wSX7ee0c0kMzV7r0hp93bAt10V5a88yHHUKwog@mail.gmail.com>
	<CAOYmi+=CrBFBRX-foRRES2tx2wXBJJhbsJGjgFbWvPcmBJuK-Q@mail.gmail.com>

On Mon, 22 Sept 2025 at 22:30, Jacob Champion <[email protected]> wrote:
> Done this way in v2-0002

Okay, I finally managed to do some testing of this patchset while working
on a patchset of mine where I'm adding a GoAway message to the protocol
(it should be ready to be published soon).

First of all: THANK YOU! It's a great base to start from, and I hope we
can get something merged relatively soon that we can then gradually
improve.

I had some problems using it for my own tests though. The primary
reasons for that were:
1. It was missing functionality to send queries and get results.
2. A lot of the fixtures I wanted to use were located in the ssl tests
   directory instead of the shared fixtures module.
3. When running pytest manually I had to configure LD_LIBRARY_PATH.

So here's your patchset with an additional commit on top that does a
bunch of refactoring/renaming and adds features. I hope you like it. I
tried to make the most common actions easy to do.

The primary features it adds are:
- A `sql` method on `PGconn`: it takes a query and returns the results
    as native Python types.
- A `conn` fixture: a libpq-based connection to the default Postgres
    server.
- Use of the `pg_config` binary to find the libdir and bindir (can be
    overridden by setting PG_CONFIG). Otherwise I had to set
    LD_LIBRARY_PATH when running pytest manually.
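
To illustrate the `pg_config` lookup, here is a minimal sketch. It is not the
code from the attached commit: `find_pg_dirs` and its `run`/`environ`
parameters are hypothetical names, and the only things it relies on are the
standard `pg_config --bindir` and `pg_config --libdir` options plus the
PG_CONFIG override mentioned above.

```python
import os
import subprocess


def find_pg_dirs(run=subprocess.check_output, environ=os.environ):
    """Return (bindir, libdir) as reported by pg_config.

    Hypothetical helper for illustration only. PG_CONFIG, if set, names the
    pg_config binary to use; otherwise "pg_config" is looked up on PATH.
    `run` and `environ` are parameters purely so the sketch can be tested
    without a Postgres installation.
    """
    pg_config = environ.get("PG_CONFIG", "pg_config")
    # pg_config prints a single path followed by a newline for each option.
    bindir = run([pg_config, "--bindir"], text=True).strip()
    libdir = run([pg_config, "--libdir"], text=True).strip()
    return bindir, libdir
```

With the libdir known, a test harness could presumably load libpq by absolute
path instead of depending on LD_LIBRARY_PATH, and use the bindir to locate the
server binaries.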

The refactoring it does:
- Rename `pg_server` fixture to `pg` since it'll likely be one of the
    most commonly used ones.
- Rename `pg` module to `pypg` to avoid naming conflict/shadowing
    problems with the newly renamed `pg` fixture.
- Move class definitions outside of fixtures to separate modules (either
    in the `pypg` module or the new `libpq` module)
- Move all "general" fixtures to the `pypg.fixtures` module, instead of
    having them be defined in the ssl module.


Attachments:

  [text/x-patch] v3-0001-meson-Include-TAP-tests-in-the-configuration-summ.patch (1.0K, 2-v3-0001-meson-Include-TAP-tests-in-the-configuration-summ.patch)
From 6be9c11e14a7cba6877f6d8c0397cbb94fb62f6b Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Fri, 5 Sep 2025 16:39:08 -0700
Subject: [PATCH v3 01/10] meson: Include TAP tests in the configuration
 summary

...to make it obvious when they've been enabled. prove is added to the
executables list for good measure.

TODO: does Autoconf need something similar?

Per complaint by Peter Eisentraut.
---
 meson.build | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/meson.build b/meson.build
index 395416a6060..37ed68ceeb4 100644
--- a/meson.build
+++ b/meson.build
@@ -3952,6 +3952,7 @@ summary(
     'bison': '@0@ @1@'.format(bison.full_path(), bison_version),
     'dtrace': dtrace,
     'flex': '@0@ @1@'.format(flex.full_path(), flex_version),
+    'prove': prove,
   },
   section: 'Programs',
 )
@@ -3988,3 +3989,11 @@ summary(
   section: 'External libraries',
   list_sep: ' ',
 )
+
+summary(
+  {
+    'tap': tap_tests_enabled,
+  },
+  section: 'Other features',
+  list_sep: ' ',
+)
-- 
2.51.1



  [text/x-patch] v3-0002-Add-support-for-pytest-test-suites.patch (35.9K, 3-v3-0002-Add-support-for-pytest-test-suites.patch)
From 2a35a86f10914e95fd6e63e4224ab62a973a6a93 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 13 Aug 2025 10:58:56 -0700
Subject: [PATCH v3 02/10] Add support for pytest test suites

Specify --enable-pytest/-Dpytest=enabled at configure time. This
contains no Postgres test logic -- it is just a "vanilla" pytest
skeleton.

I've written a custom pgtap output plugin, used by the Meson mtest
runner, to fully control what we see during CI test failures. The
pytest-tap plugin would have been preferable, but it's now in
maintenance mode, and it has problems with accidentally suppressing
important collection failures.

test_something.py is intended to show a sample failure in the CI.

TODOs:
- OpenBSD has an ANSI-related terminal bug, but I'm not sure if the bug
  is in Cirrus, the image, pytest, Python, or readline. The TERM envvar
  is unset to work around it. If this workaround is removed, a bad ANSI
  escape is inserted into the pgtap output and mtest is unable to parse
  it.
- The Chocolatey CI setup is subpar. Need to find a way to bless the
  dependencies in use rather than pulling from pip... or maybe that will
  be done by the image baker.
---
 .cirrus.tasks.yml                     |  38 +++--
 .gitignore                            |   1 +
 config/check_pytest.py                | 150 ++++++++++++++++++++
 config/conftest.py                    |  18 +++
 config/pytest-requirements.txt        |  21 +++
 configure                             | 108 +++++++++++++-
 configure.ac                          |  25 +++-
 meson.build                           |  92 ++++++++++++
 meson_options.txt                     |   8 +-
 pytest.ini                            |   6 +
 src/Makefile.global.in                |  23 +++
 src/makefiles/meson.build             |   2 +
 src/test/Makefile                     |  11 +-
 src/test/meson.build                  |   1 +
 src/test/pytest/Makefile              |  20 +++
 src/test/pytest/README                |   1 +
 src/test/pytest/meson.build           |  16 +++
 src/test/pytest/plugins/pgtap.py      | 193 ++++++++++++++++++++++++++
 src/test/pytest/pyt/test_something.py |  17 +++
 19 files changed, 736 insertions(+), 15 deletions(-)
 create mode 100644 config/check_pytest.py
 create mode 100644 config/conftest.py
 create mode 100644 config/pytest-requirements.txt
 create mode 100644 pytest.ini
 create mode 100644 src/test/pytest/Makefile
 create mode 100644 src/test/pytest/README
 create mode 100644 src/test/pytest/meson.build
 create mode 100644 src/test/pytest/plugins/pgtap.py
 create mode 100644 src/test/pytest/pyt/test_something.py

diff --git a/.cirrus.tasks.yml b/.cirrus.tasks.yml
index eca9d62fc22..80f9b394bd2 100644
--- a/.cirrus.tasks.yml
+++ b/.cirrus.tasks.yml
@@ -21,7 +21,8 @@ env:
 
   # target to test, for all but windows
   CHECK: check-world PROVE_FLAGS=$PROVE_FLAGS
-  CHECKFLAGS: -Otarget
+  # TODO were we avoiding --keep-going on purpose?
+  CHECKFLAGS: -Otarget --keep-going
   PROVE_FLAGS: --timer
   # Build test dependencies as part of the build step, to see compiler
   # errors/warnings in one place.
@@ -44,6 +45,7 @@ env:
     -Dldap=enabled
     -Dssl=openssl
     -Dtap_tests=enabled
+    -Dpytest=enabled
     -Dplperl=enabled
     -Dplpython=enabled
     -Ddocs=enabled
@@ -222,7 +224,9 @@ task:
     chown root:postgres /tmp/cores
     sysctl kern.corefile='/tmp/cores/%N.%P.core'
   setup_additional_packages_script: |
-    #pkg install -y ...
+    pkg install -y \
+      py311-packaging \
+      py311-pytest
 
   # NB: Intentionally build without -Dllvm. The freebsd image size is already
   # large enough to make VM startup slow, and even without llvm freebsd
@@ -311,7 +315,10 @@ task:
           -Dpam=enabled
 
       setup_additional_packages_script: |
-        #pkgin -y install ...
+        pkgin -y install \
+          py312-packaging \
+          py312-test
+        ln -s /usr/pkg/bin/pytest-3.12 /usr/pkg/bin/pytest
       <<: *netbsd_task_template
 
     - name: OpenBSD - Meson
@@ -322,6 +329,7 @@ task:
         OS_NAME: openbsd
         IMAGE_FAMILY: pg-ci-openbsd-postgres
         PKGCONFIG_PATH: '/usr/lib/pkgconfig:/usr/local/lib/pkgconfig'
+        TERM: # TODO why does pytest print ANSI escapes on OpenBSD?
 
         MESON_FEATURES: >-
           -Dbsd_auth=enabled
@@ -330,7 +338,9 @@ task:
           -Duuid=e2fs
 
       setup_additional_packages_script: |
-        #pkg_add -I ...
+        pkg_add -I \
+          py3-test \
+          py3-packaging
       # Always core dump to ${CORE_DUMP_DIR}
       set_core_dump_script: sysctl -w kern.nosuidcoredump=2
       <<: *openbsd_task_template
@@ -489,8 +499,10 @@ task:
     EOF
 
   setup_additional_packages_script: |
-    #apt-get update
-    #DEBIAN_FRONTEND=noninteractive apt-get -y install ...
+    apt-get update
+    DEBIAN_FRONTEND=noninteractive apt-get -y install \
+      python3-pytest \
+      python3-packaging
 
   matrix:
     # SPECIAL:
@@ -513,14 +525,15 @@ task:
         su postgres <<-EOF
           ./configure \
             --enable-cassert --enable-injection-points --enable-debug \
-            --enable-tap-tests --enable-nls \
+            --enable-tap-tests --enable-pytest --enable-nls \
             --with-segsize-blocks=6 \
             --with-libnuma \
             --with-liburing \
             \
             ${LINUX_CONFIGURE_FEATURES} \
             \
-            CLANG="ccache clang-16"
+            CLANG="ccache clang-16" \
+            PYTEST="env LD_PRELOAD=/lib/x86_64-linux-gnu/libasan.so.8 pytest"
         EOF
       build_script: su postgres -c "make -s -j${BUILD_JOBS} world-bin"
       upload_caches: ccache
@@ -650,6 +663,8 @@ task:
       p5.34-io-tty
       p5.34-ipc-run
       python312
+      py312-packaging
+      py312-pytest
       tcl
       zstd
 
@@ -699,6 +714,7 @@ task:
     sh src/tools/ci/ci_macports_packages.sh $MACOS_PACKAGE_LIST
     # system python doesn't provide headers
     sudo /opt/local/bin/port select python3 python312
+    sudo /opt/local/bin/port select pytest pytest312
     # Make macports install visible for subsequent steps
     echo PATH=/opt/local/sbin/:/opt/local/bin/:$PATH >> $CIRRUS_ENV
   upload_caches: macports
@@ -772,6 +788,8 @@ task:
       -Dldap=enabled
       -Dssl=openssl
       -Dtap_tests=enabled
+      -Dpytest=enabled
+      -DPYTEST=c:\Windows\system32\config\systemprofile\AppData\Roaming\Python\Python310\Scripts\pytest.exe
       -Dplperl=enabled
       -Dplpython=enabled
 
@@ -780,8 +798,10 @@ task:
   depends_on: SanityCheck
   only_if: $CI_WINDOWS_ENABLED
 
+  # XXX Does Chocolatey really not have any Python package installers?
   setup_additional_packages_script: |
     REM choco install -y --no-progress ...
+    pip3 install --user packaging pytest
 
   setup_hosts_file_script: |
     echo 127.0.0.1 pg-loadbalancetest >> c:\Windows\System32\Drivers\etc\hosts
@@ -844,7 +864,7 @@ task:
     folder: ${CCACHE_DIR}
 
   setup_additional_packages_script: |
-    REM C:\msys64\usr\bin\pacman.exe -S --noconfirm ...
+    C:\msys64\usr\bin\pacman.exe -S --noconfirm mingw-w64-ucrt-x86_64-python-packaging mingw-w64-ucrt-x86_64-python-pytest
 
   mingw_info_script: |
     %BASH% -c "where gcc"
diff --git a/.gitignore b/.gitignore
index 4e911395fe3..268426003b1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,7 @@ win32ver.rc
 *.exe
 lib*dll.def
 lib*.pc
+__pycache__/
 
 # Local excludes in root directory
 /GNUmakefile
diff --git a/config/check_pytest.py b/config/check_pytest.py
new file mode 100644
index 00000000000..1562d16bcda
--- /dev/null
+++ b/config/check_pytest.py
@@ -0,0 +1,150 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# Verify that pytest-requirements.txt is satisfied. This would probably be
+# easier with pip, but requiring pip on build machines is a non-starter for
+# many.
+#
+# This is coded as a pytest suite in order to check the Python distribution in
+# use by pytest, as opposed to the Python distribution being linked against
+# Postgres. In some setups they are separate.
+#
+# The design philosophy of this script is to bend over backwards to help people
+# figure out what is missing. The target audience for error output is the
+# buildfarm operator who just wants to get the tests running, not the test
+# developer who presumably already knows how to solve these problems.
+
+import importlib
+import sys
+from typing import List, Union  # needed for earlier Python versions
+
+# importlib.metadata is part of the standard library from 3.8 onwards. Earlier
+# Python versions have an official backport called importlib_metadata, which can
+# generally be installed as a separate OS package (python3-importlib-metadata).
+# This complication can be removed once we stop supporting Python 3.7.
+try:
+    from importlib import metadata
+except ImportError:
+    try:
+        import importlib_metadata as metadata
+    except ImportError:
+        # package_version() will need to fall back. This is unlikely to happen
+        # in practice, because pytest 7.x depends on importlib_metadata itself.
+        metadata = None
+
+
+def report(*args):
+    """
+    Prints a configure-time message to the user. (The configure scripts will
+    display these messages and ignore the output from the pytest suite.) This
+    assumes --capture=no is in use, to avoid pytest's standard stream capture.
+    """
+    print(*args, file=sys.stderr)
+
+
+def package_version(pkg: str) -> Union[str, None]:
+    """
+    Returns the version of the named package, or None if the package is not
+    installed.
+
+    This function prefers to use the distribution package version, if we have
+    the necessary prerequisites. Otherwise it will fall back to the __version__
+    of the imported module, which aligns with pytest.importorskip().
+    """
+    if metadata is not None:
+        try:
+            return metadata.version(pkg)
+        except metadata.PackageNotFoundError:
+            return None
+
+    # This is an older Python and we don't have importlib_metadata. Fall back to
+    # __version__ instead.
+    try:
+        mod = importlib.import_module(pkg)
+    except ModuleNotFoundError:
+        return None
+
+    if hasattr(mod, "__version__"):
+        return mod.__version__
+
+    # We're out of options. If this turns out to cause problems in practice, we
+    # might need to require importlib_metadata on older buildfarm members. But
+    # since our top-level requirements list will be small, and this possibility
+    # will eventually age out with newer Pythons, don't spend more effort on
+    # this case for now.
+    report(f"Fix check_pytest.py! {pkg} has no __version__")
+    assert False, "internal error in package_version()"
+
+
+def packaging_check(requirements: List[str]) -> bool:
+    """
+    Reports the status of each required package to the configure program.
+    Returns True if all dependencies were found.
+    """
+    report()  # an opening newline makes the configure output easier to read
+
+    try:
+        # packaging contains the PyPA definitions of requirement specifiers.
+        # This is contained in a separate OS package (for example,
+        # python3-packaging), but it's extremely likely that the user has it
+        # installed already, because modern versions of pytest depend on it too.
+        import packaging
+        from packaging.requirements import Requirement
+
+    except ImportError as err:
+        # We don't even have enough prerequisites to check our prerequisites.
+        # Print the import error as-is.
+        report(err)
+        return False
+
+    # Strip extraneous whitespace, whole-line comments, and empty lines from our
+    # specifier list.
+    requirements = [r.strip() for r in requirements]
+    requirements = [r for r in requirements if r and r[0] != "#"]
+
+    found = True
+    for spec in requirements:
+        req = Requirement(spec)
+
+        # Skip any packages marked as unneeded for this particular Python env.
+        if req.marker and not req.marker.evaluate():
+            continue
+
+        # Make sure the package is installed...
+        version = package_version(req.name)
+        if version is None:
+            report(f"package '{req.name}': not installed")
+            found = False
+            continue
+
+        # ...and that it has a compatible version.
+        if not req.specifier.contains(version):
+            report(
+                "package '{}': has version {}, but '{}' is required".format(
+                    req.name, version, req.specifier
+                ),
+            )
+            found = False
+            continue
+
+        # Report installed packages too, to mirror check_modules.pl.
+        report(f"package '{req.name}': installed (version {version})")
+
+    return found
+
+
+def test_packages(requirements_file):
+    """
+    Entry point.
+    """
+    try:
+        with open(requirements_file, "r") as f:
+            requirements = f.readlines()
+
+        all_found = packaging_check(requirements)
+
+    except Exception as err:
+        # Surface any breakage to the configure script before failing the test.
+        report(err)
+        raise
+
+    assert all_found, "required packages are missing"
diff --git a/config/conftest.py b/config/conftest.py
new file mode 100644
index 00000000000..a9c2bc546e8
--- /dev/null
+++ b/config/conftest.py
@@ -0,0 +1,18 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# Support for check_pytest.py. The configure script provides the path to
+# pytest-requirements.txt via the --requirements option added here.
+
+import pytest
+
+
+def pytest_addoption(parser):
+    parser.addoption(
+        "--requirements",
+        help="path to pytest-requirements.txt",
+    )
+
+
+@pytest.fixture
+def requirements_file(request):
+    return request.config.getoption("--requirements")
diff --git a/config/pytest-requirements.txt b/config/pytest-requirements.txt
new file mode 100644
index 00000000000..b941624b2f3
--- /dev/null
+++ b/config/pytest-requirements.txt
@@ -0,0 +1,21 @@
+#
+# This file contains the Python packages which are required in order for us to
+# enable pytest.
+#
+# The syntax is a *subset* of pip's requirements.txt syntax, so that both pip
+# and check_pytest.py can use it. Only whole-line comments and standard Python
+# dependency specifiers are allowed. pip-specific goodies like includes and
+# environment substitutions are not supported; keep it simple.
+#
+# Packages belong here if their absence should cause a configuration failure. If
+# you'd like to make a package optional, consider using pytest.importorskip()
+# instead.
+#
+
+# pytest 7.0 was the last version which supported Python 3.6, but the BSDs have
+# started putting 8.x into ports, so we support both. (pytest 8 can be used
+# throughout once we drop support for Python 3.7.)
+pytest >= 7.0, < 9
+
+# packaging is used by check_pytest.py at configure time.
+packaging
diff --git a/configure b/configure
index 22cd866147b..aa93fa5f0aa 100755
--- a/configure
+++ b/configure
@@ -630,6 +630,7 @@ vpath_build
 PG_SYSROOT
 PG_VERSION_NUM
 LDFLAGS_EX_BE
+PYTEST
 PROVE
 DBTOEPUB
 FOP
@@ -771,6 +772,7 @@ CFLAGS
 CC
 enable_injection_points
 PG_TEST_EXTRA
+enable_pytest
 enable_tap_tests
 enable_dtrace
 DTRACEFLAGS
@@ -849,6 +851,7 @@ enable_profiling
 enable_coverage
 enable_dtrace
 enable_tap_tests
+enable_pytest
 enable_injection_points
 with_blocksize
 with_segsize
@@ -1549,7 +1552,10 @@ Optional Features:
   --enable-profiling      build with profiling enabled
   --enable-coverage       build with coverage testing instrumentation
   --enable-dtrace         build with DTrace support
-  --enable-tap-tests      enable TAP tests (requires Perl and IPC::Run)
+  --enable-tap-tests      enable (Perl-based) TAP tests (requires Perl and
+                          IPC::Run)
+  --enable-pytest         enable (Python-based) pytest suites (requires
+                          Python)
   --enable-injection-points
                           enable injection points (for testing)
   --enable-depend         turn on automatic dependency tracking
@@ -3631,7 +3637,7 @@ fi
 
 
 #
-# TAP tests
+# Test frameworks
 #
 
 
@@ -3659,6 +3665,32 @@ fi
 
 
 
+
+# Check whether --enable-pytest was given.
+if test "${enable_pytest+set}" = set; then :
+  enableval=$enable_pytest;
+  case $enableval in
+    yes)
+      :
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --enable-pytest option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  enable_pytest=no
+
+fi
+
+
+
+
+
+
 #
 # Injection points
 #
@@ -19074,6 +19106,78 @@ $as_echo "$modulestderr" >&6; }
   fi
 fi
 
+if test "$enable_pytest" = yes; then
+  if test -z "$PYTEST"; then
+  for ac_prog in pytest py.test
+do
+  # Extract the first word of "$ac_prog", so it can be a program name with args.
+set dummy $ac_prog; ac_word=$2
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for $ac_word" >&5
+$as_echo_n "checking for $ac_word... " >&6; }
+if ${ac_cv_path_PYTEST+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  case $PYTEST in
+  [\\/]* | ?:[\\/]*)
+  ac_cv_path_PYTEST="$PYTEST" # Let the user override the test with a path.
+  ;;
+  *)
+  as_save_IFS=$IFS; IFS=$PATH_SEPARATOR
+for as_dir in $PATH
+do
+  IFS=$as_save_IFS
+  test -z "$as_dir" && as_dir=.
+    for ac_exec_ext in '' $ac_executable_extensions; do
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
+    ac_cv_path_PYTEST="$as_dir/$ac_word$ac_exec_ext"
+    $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
+    break 2
+  fi
+done
+  done
+IFS=$as_save_IFS
+
+  ;;
+esac
+fi
+PYTEST=$ac_cv_path_PYTEST
+if test -n "$PYTEST"; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: $PYTEST" >&5
+$as_echo "$PYTEST" >&6; }
+else
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
+$as_echo "no" >&6; }
+fi
+
+
+  test -n "$PYTEST" && break
+done
+
+else
+  # Report the value of PYTEST in configure's output in all cases.
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for PYTEST" >&5
+$as_echo_n "checking for PYTEST... " >&6; }
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: $PYTEST" >&5
+$as_echo "$PYTEST" >&6; }
+fi
+
+  if test -z "$PYTEST"; then
+    as_fn_error $? "pytest not found" "$LINENO" 5
+  fi
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for Python packages required for pytest" >&5
+$as_echo_n "checking for Python packages required for pytest... " >&6; }
+  modulestderr=`$PYTEST -c "$srcdir/pytest.ini" --confcutdir="$srcdir/config" --capture=no "$srcdir/config/check_pytest.py" --requirements "$srcdir/config/pytest-requirements.txt" 2>&1 >/dev/null`
+  if test $? -eq 0; then
+    echo "$modulestderr" >&5
+    { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5
+$as_echo "yes" >&6; }
+  else
+    { $as_echo "$as_me:${as_lineno-$LINENO}: result: $modulestderr" >&5
+$as_echo "$modulestderr" >&6; }
+    as_fn_error $? "Additional Python packages are required to run the pytest suites" "$LINENO" 5
+  fi
+fi
+
 # If compiler will take -Wl,--as-needed (or various platform-specific
 # spellings thereof) then add that to LDFLAGS.  This is much easier than
 # trying to filter LIBS to the minimum for each executable.
diff --git a/configure.ac b/configure.ac
index e44943aa6fe..25442050f34 100644
--- a/configure.ac
+++ b/configure.ac
@@ -225,11 +225,16 @@ AC_SUBST(DTRACEFLAGS)])
 AC_SUBST(enable_dtrace)
 
 #
-# TAP tests
+# Test frameworks
 #
 PGAC_ARG_BOOL(enable, tap-tests, no,
-              [enable TAP tests (requires Perl and IPC::Run)])
+              [enable (Perl-based) TAP tests (requires Perl and IPC::Run)])
 AC_SUBST(enable_tap_tests)
+
+PGAC_ARG_BOOL(enable, pytest, no,
+              [enable (Python-based) pytest suites (requires Python)])
+AC_SUBST(enable_pytest)
+
 AC_ARG_VAR(PG_TEST_EXTRA,
            [enable selected extra tests (overridden at runtime by PG_TEST_EXTRA environment variable)])
 
@@ -2415,6 +2420,22 @@ if test "$enable_tap_tests" = yes; then
   fi
 fi
 
+if test "$enable_pytest" = yes; then
+  PGAC_PATH_PROGS(PYTEST, pytest py.test)
+  if test -z "$PYTEST"; then
+    AC_MSG_ERROR([pytest not found])
+  fi
+  AC_MSG_CHECKING(for Python packages required for pytest)
+  [modulestderr=`$PYTEST -c "$srcdir/pytest.ini" --confcutdir="$srcdir/config" --capture=no "$srcdir/config/check_pytest.py" --requirements "$srcdir/config/pytest-requirements.txt" 2>&1 >/dev/null`]
+  if test $? -eq 0; then
+    echo "$modulestderr" >&AS_MESSAGE_LOG_FD
+    AC_MSG_RESULT(yes)
+  else
+    AC_MSG_RESULT([$modulestderr])
+    AC_MSG_ERROR([Additional Python packages are required to run the pytest suites])
+  fi
+fi
+
 # If compiler will take -Wl,--as-needed (or various platform-specific
 # spellings thereof) then add that to LDFLAGS.  This is much easier than
 # trying to filter LIBS to the minimum for each executable.
diff --git a/meson.build b/meson.build
index 37ed68ceeb4..06eb7a19210 100644
--- a/meson.build
+++ b/meson.build
@@ -1702,6 +1702,39 @@ endif
 
 
 
+###############################################################
+# Library: pytest
+###############################################################
+
+pytest_enabled = false
+pytest = not_found_dep
+
+pytestopt = get_option('pytest')
+if not pytestopt.disabled()
+  pytest = find_program(get_option('PYTEST'), native: true, required: pytestopt)
+  if pytest.found()
+    pytest_check = run_command(pytest,
+                               '-c', 'pytest.ini',
+                               '--confcutdir=config',
+                               '--capture=no',
+                               'config/check_pytest.py',
+                               '--requirements', 'config/pytest-requirements.txt',
+                               check: false)
+    if pytest_check.returncode() != 0
+      message(pytest_check.stderr())
+      if pytestopt.enabled()
+        error('Additional Python packages are required to run the pytest suites.')
+      else
+        warning('Additional Python packages are required to run the pytest suites.')
+      endif
+    else
+      pytest_enabled = true
+    endif
+  endif
+endif
+
+
+
 ###############################################################
 # Library: zstd
 ###############################################################
@@ -3779,6 +3812,63 @@ foreach test_dir : tests
         )
       endforeach
       install_suites += test_group
+    elif kind == 'pytest'
+      testwrap_pytest = testwrap_base
+      if not pytest_enabled
+        testwrap_pytest += ['--skip', 'pytest not enabled']
+      endif
+
+      test_command = [
+        pytest.full_path(),
+        '-c', meson.project_source_root() / 'pytest.ini',
+        '--verbose',
+        '-p', 'pgtap',  # enable our test reporter plugin
+        '-ra',  # show skipped and xfailed tests too
+      ]
+
+      # Add temporary install, the build directory for non-installed binaries and
+      # also test/ for non-installed test binaries built separately.
+      env = test_env
+      env.prepend('PATH', temp_install_bindir, test_dir['bd'], test_dir['bd'] / 'test')
+      temp_install_datadir = '@0@@1@'.format(test_install_destdir, dir_prefix / dir_data)
+      env.set('share_contrib_dir', temp_install_datadir / 'contrib')
+      env.prepend('PYTHONPATH', meson.project_source_root() / 'src' / 'test' / 'pytest' / 'plugins')
+
+      foreach name, value : t.get('env', {})
+        env.set(name, value)
+      endforeach
+
+      test_group = test_dir['name']
+      test_kwargs = {
+        'protocol': 'tap',
+        'suite': test_group,
+        'timeout': 1000,
+        'depends': test_deps + t.get('deps', []),
+        'env': env,
+      } + t.get('test_kwargs', {})
+
+      foreach onetest : t['tests']
+        # Make test names prettier, remove pyt/ and .py
+        onetest_p = onetest
+        if onetest_p.startswith('pyt/')
+          onetest_p = onetest.split('pyt/')[1]
+        endif
+        if onetest_p.endswith('.py')
+          onetest_p = fs.stem(onetest_p)
+        endif
+
+        test(test_dir['name'] / onetest_p,
+          python,
+          kwargs: test_kwargs,
+          args: testwrap_pytest + [
+            '--testgroup', test_dir['name'],
+            '--testname', onetest_p,
+            '--', test_command,
+            test_dir['sd'] / onetest,
+          ],
+        )
+      endforeach
+      install_suites += test_group
     else
       error('unknown kind @0@ of test in @1@'.format(kind, test_dir['sd']))
     endif
@@ -3953,6 +4043,7 @@ summary(
     'dtrace': dtrace,
     'flex': '@0@ @1@'.format(flex.full_path(), flex_version),
     'prove': prove,
+    'pytest': pytest,
   },
   section: 'Programs',
 )
@@ -3993,6 +4084,7 @@ summary(
 summary(
   {
     'tap': tap_tests_enabled,
+    'pytest': pytest_enabled,
   },
   section: 'Other features',
   list_sep: ' ',
diff --git a/meson_options.txt b/meson_options.txt
index 06bf5627d3c..88f22e699d9 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -41,7 +41,10 @@ option('cassert', type: 'boolean', value: false,
   description: 'Enable assertion checks (for debugging)')
 
 option('tap_tests', type: 'feature', value: 'auto',
-  description: 'Enable TAP tests')
+  description: 'Enable (Perl-based) TAP tests')
+
+option('pytest', type: 'feature', value: 'auto',
+  description: 'Enable (Python-based) pytest suites')
 
 option('injection_points', type: 'boolean', value: false,
   description: 'Enable injection points')
@@ -195,6 +198,9 @@ option('PERL', type: 'string', value: 'perl',
 option('PROVE', type: 'string', value: 'prove',
   description: 'Path to prove binary')
 
+option('PYTEST', type: 'array', value: ['pytest', 'py.test'],
+  description: 'Path to pytest binary')
+
 option('PYTHON', type: 'array', value: ['python3', 'python'],
   description: 'Path to python binary')
 
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 00000000000..8e8388f3afc
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,6 @@
+[pytest]
+minversion = 7.0
+
+# Ignore ./config (which contains the configure-time check_pytest.py tests) by
+# default.
+addopts = --ignore ./config
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 0aa389bc710..8a6885206ce 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -211,6 +211,7 @@ enable_dtrace	= @enable_dtrace@
 enable_coverage	= @enable_coverage@
 enable_injection_points = @enable_injection_points@
 enable_tap_tests	= @enable_tap_tests@
+enable_pytest	= @enable_pytest@
 
 python_includespec	= @python_includespec@
 python_libdir		= @python_libdir@
@@ -353,6 +354,7 @@ MSGFMT  = @MSGFMT@
 MSGFMT_FLAGS = @MSGFMT_FLAGS@
 MSGMERGE = @MSGMERGE@
 OPENSSL	= @OPENSSL@
+PYTEST	= @PYTEST@
 PYTHON	= @PYTHON@
 TAR	= @TAR@
 XGETTEXT = @XGETTEXT@
@@ -507,6 +509,27 @@ prove_installcheck = @echo "TAP tests not enabled. Try configuring with --enable
 prove_check = $(prove_installcheck)
 endif
 
+ifeq ($(enable_pytest),yes)
+
+pytest_installcheck = @echo "Installcheck is not currently supported for pytest."
+
+define pytest_check
+echo "# +++ pytest check in $(subdir) +++" && \
+rm -rf '$(CURDIR)'/tmp_check && \
+$(MKDIR_P) '$(CURDIR)'/tmp_check && \
+cd $(srcdir) && \
+   TESTLOGDIR='$(CURDIR)/tmp_check/log' \
+   TESTDATADIR='$(CURDIR)/tmp_check' \
+   PYTHONPATH='$(abs_top_srcdir)/src/test/pytest/plugins:$$PYTHONPATH' \
+   $(with_temp_install) \
+   $(PYTEST) -c '$(abs_top_srcdir)/pytest.ini' --verbose -ra ./pyt/
+endef
+
+else
+pytest_installcheck = @echo "pytest is not enabled. Try configuring with --enable-pytest"
+pytest_check = $(pytest_installcheck)
+endif
+
 # Installation.
 
 install_bin = @install_bin@
diff --git a/src/makefiles/meson.build b/src/makefiles/meson.build
index 0def244c901..f68acd57bc4 100644
--- a/src/makefiles/meson.build
+++ b/src/makefiles/meson.build
@@ -56,6 +56,7 @@ pgxs_kv = {
   'enable_nls': libintl.found() ? 'yes' : 'no',
   'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
   'enable_tap_tests': tap_tests_enabled ? 'yes' : 'no',
+  'enable_pytest': pytest_enabled ? 'yes' : 'no',
   'enable_debug': get_option('debug') ? 'yes' : 'no',
   'enable_coverage': 'no',
   'enable_dtrace': dtrace.found() ? 'yes' : 'no',
@@ -145,6 +146,7 @@ pgxs_bins = {
   'OPENSSL': openssl,
   'PERL': perl,
   'PROVE': prove,
+  'PYTEST': pytest,
   'PYTHON': python,
   'TAR': tar,
   'ZSTD': program_zstd,
diff --git a/src/test/Makefile b/src/test/Makefile
index 511a72e6238..0be9771d71f 100644
--- a/src/test/Makefile
+++ b/src/test/Makefile
@@ -12,7 +12,16 @@ subdir = src/test
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS = perl postmaster regress isolation modules authentication recovery subscription
+SUBDIRS = \
+	authentication \
+	isolation \
+	modules \
+	perl \
+	postmaster \
+	pytest \
+	recovery \
+	regress \
+	subscription
 
 ifeq ($(with_icu),yes)
 SUBDIRS += icu
diff --git a/src/test/meson.build b/src/test/meson.build
index ccc31d6a86a..d08a6ef61c2 100644
--- a/src/test/meson.build
+++ b/src/test/meson.build
@@ -5,6 +5,7 @@ subdir('isolation')
 
 subdir('authentication')
 subdir('postmaster')
+subdir('pytest')
 subdir('recovery')
 subdir('subscription')
 subdir('modules')
diff --git a/src/test/pytest/Makefile b/src/test/pytest/Makefile
new file mode 100644
index 00000000000..2bdca96ccbe
--- /dev/null
+++ b/src/test/pytest/Makefile
@@ -0,0 +1,20 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for pytest
+#
+# Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/pytest/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/pytest
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+check:
+	$(pytest_check)
+
+clean distclean maintainer-clean:
+	rm -rf tmp_check
diff --git a/src/test/pytest/README b/src/test/pytest/README
new file mode 100644
index 00000000000..1333ed77b7e
--- /dev/null
+++ b/src/test/pytest/README
@@ -0,0 +1 @@
+TODO
diff --git a/src/test/pytest/meson.build b/src/test/pytest/meson.build
new file mode 100644
index 00000000000..abd128dfa24
--- /dev/null
+++ b/src/test/pytest/meson.build
@@ -0,0 +1,16 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+if not pytest_enabled
+  subdir_done()
+endif
+
+tests += {
+  'name': 'pytest',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'pytest': {
+    'tests': [
+      'pyt/test_something.py',
+    ],
+  },
+}
diff --git a/src/test/pytest/plugins/pgtap.py b/src/test/pytest/plugins/pgtap.py
new file mode 100644
index 00000000000..ef8291e291c
--- /dev/null
+++ b/src/test/pytest/plugins/pgtap.py
@@ -0,0 +1,193 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import os
+import sys
+from typing import Optional
+
+import pytest
+
+#
+# Helpers
+#
+
+
+class TAP:
+    """
+    A basic API for reporting via the TAP protocol.
+    """
+
+    def __init__(self):
+        self.count = 0
+
+        # XXX interacts poorly with testwrap's boilerplate diagnostics
+        # self.print("TAP version 13")
+
+    def expect(self, num: int):
+        self.print(f"1..{num}")
+
+    def print(self, *args):
+        print(*args, file=sys.__stdout__)
+
+    def ok(self, name: str):
+        self.count += 1
+        self.print("ok", self.count, "-", name)
+
+    def skip(self, name: str, reason: str):
+        self.count += 1
+        self.print("ok", self.count, "-", name, "# skip", reason)
+
+    def fail(self, name: str, details: str):
+        self.count += 1
+        self.print("not ok", self.count, "-", name)
+
+        # mtest has some odd behavior around TAP tests where it won't print
+        # diagnostics on failure if they're part of the stdout stream, so we
+        # might as well just dump the details directly to stderr instead.
+        print(details, file=sys.__stderr__)
+
+
+tap = TAP()
+
+
+class TestNotes:
+    """
+    Annotations for a single test. The existing pytest hooks keep interesting
+    information somewhat separated across the different stages
+    (setup/test/teardown), so this class is used to correlate them.
+    """
+
+    skipped = False
+    skip_reason = None
+
+    failed = False
+    details = ""
+
+
+# Register a custom key in the stash dictionary for keeping our TestNotes.
+notes_key = pytest.StashKey[TestNotes]()
+
+
+#
+# Hook Implementations
+#
+
+
[email protected](tryfirst=True)
+def pytest_configure(config):
+    """
+    Hijacks the standard streams as soon as possible during pytest startup. The
+    pytest-formatted output gets logged to file instead, and we'll use the
+    original sys.__stdout__/__stderr__ streams for the TAP protocol.
+    """
+    logdir = os.getenv("TESTLOGDIR")
+    if not logdir:
+        raise RuntimeError("pgtap requires the TESTLOGDIR envvar to be set")
+
+    os.makedirs(logdir, exist_ok=True)
+    logpath = os.path.join(logdir, "pytest.log")
+    sys.stdout = sys.stderr = open(logpath, "a", buffering=1)
+
+
[email protected](trylast=True)
+def pytest_sessionfinish(session, exitstatus):
+    """
+    Suppresses nonzero exit codes due to failed tests. (In that case, we want
+    Meson to report a failure count, not a generic ERROR.)
+    """
+    if exitstatus == pytest.ExitCode.TESTS_FAILED:
+        session.exitstatus = pytest.ExitCode.OK
+
+
[email protected]
+def pytest_collectreport(report):
+    # Include collection failures directly in Meson error output.
+    if report.failed:
+        print(report.longreprtext, file=sys.__stderr__)
+
+
[email protected]
+def pytest_internalerror(excrepr, excinfo):
+    # Include internal errors directly in Meson error output.
+    print(excrepr, file=sys.__stderr__)
+
+
+#
+# Hook Wrappers
+#
+# In pytest parlance, a "wrapper" for a hook can inspect and optionally modify
+# existing hooks' behavior, but it does not replace the hook chain. This is done
+# through a generator-style API which chains the hooks together (see the use of
+# `yield`).
+#
+
+
[email protected](hookwrapper=True)
+def pytest_collection(session):
+    """Reports the number of gathered tests after collection is finished."""
+    res = yield
+    tap.expect(session.testscollected)
+    return res
+
+
[email protected](hookwrapper=True)
+def pytest_runtest_makereport(item, call):
+    """
+    Annotates a test item with our TestNotes and grabs relevant information for
+    reporting.
+
+    This is called multiple times per test, so it's not correct to print the TAP
+    result here. (A test and its teardown stage can both fail, and we want to
+    see the details for both.) We instead combine all the information for use by
+    our pytest_runtest_protocol wrapper later on.
+    """
+    res = yield
+
+    if notes_key not in item.stash:
+        item.stash[notes_key] = TestNotes()
+    notes = item.stash[notes_key]
+
+    report = res.get_result()
+    if report.passed:
+        pass  # no annotation needed
+
+    elif report.skipped:
+        notes.skipped = True
+        _, _, notes.skip_reason = report.longrepr
+
+    elif report.failed:
+        notes.failed = True
+
+        if not notes.details:
+            notes.details += "{:_^72}\n\n".format(f" {report.head_line} ")
+
+        if report.when in ("setup", "teardown"):
+            notes.details += "\n{:_^72}\n\n".format(
+                f" Error during {report.when} of {report.head_line} "
+            )
+
+        notes.details += report.longreprtext + "\n"
+
+    else:
+        raise RuntimeError("pytest_runtest_makereport received unknown test status")
+
+    return res
+
+
[email protected](hookwrapper=True)
+def pytest_runtest_protocol(item, nextitem):
+    """
+    Reports the TAP result for this test item using our gathered TestNotes.
+    """
+    res = yield
+
+    assert notes_key in item.stash, "pgtap didn't annotate a test item?"
+    notes = item.stash[notes_key]
+
+    if notes.failed:
+        tap.fail(item.nodeid, notes.details)
+    elif notes.skipped:
+        tap.skip(item.nodeid, notes.skip_reason)
+    else:
+        tap.ok(item.nodeid)
+
+    return res
diff --git a/src/test/pytest/pyt/test_something.py b/src/test/pytest/pyt/test_something.py
new file mode 100644
index 00000000000..5bd45618512
--- /dev/null
+++ b/src/test/pytest/pyt/test_something.py
@@ -0,0 +1,17 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import pytest
+
+
[email protected]
+def hey():
+    yield
+    raise RuntimeError("uh-oh")
+
+
+def test_something(hey):
+    assert 2 == 4
+
+
+def test_something_else():
+    assert 2 == 2
-- 
2.51.1
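
For readers skimming the pgtap.py plugin above, here is a minimal standalone
sketch of the TAP stream it is meant to produce. It assumes nothing about
pytest itself: the report() helper, the injectable out/err streams, and the
test_ldap node ID are hypothetical and exist only for illustration. The plugin
gets the same information from pytest hooks and writes to
sys.__stdout__/__stderr__ directly.

```python
import io
import sys


class TAP:
    """Minimal TAP emitter, modeled on the TAP helper in pgtap.py."""

    def __init__(self, out=None, err=None):
        # Assumption: the streams are injectable so the sketch can be run
        # standalone; the plugin writes to sys.__stdout__/__stderr__ directly.
        self.out = sys.__stdout__ if out is None else out
        self.err = sys.__stderr__ if err is None else err
        self.count = 0

    def expect(self, num):
        # The plan line tells the harness how many results to expect.
        print(f"1..{num}", file=self.out)

    def ok(self, name):
        self.count += 1
        print("ok", self.count, "-", name, file=self.out)

    def skip(self, name, reason):
        # A skipped test is reported as "ok" with a "# skip" directive.
        self.count += 1
        print("ok", self.count, "-", name, "# skip", reason, file=self.out)

    def fail(self, name, details):
        self.count += 1
        print("not ok", self.count, "-", name, file=self.out)
        # Failure details go to the error stream, as in the plugin.
        print(details, file=self.err)


def report(tap, results):
    """Emits a plan plus one line per (nodeid, status, extra) tuple."""
    tap.expect(len(results))
    for nodeid, status, extra in results:
        if status == "failed":
            tap.fail(nodeid, extra)
        elif status == "skipped":
            tap.skip(nodeid, extra)
        else:
            tap.ok(nodeid)


out, err = io.StringIO(), io.StringIO()
report(
    TAP(out, err),
    [
        ("test_something.py::test_something", "failed", "assert 2 == 4"),
        ("test_something.py::test_ldap", "skipped", "requires ldap"),
        ("test_something.py::test_something_else", "passed", ""),
    ],
)
print(out.getvalue(), end="")
```

The plan line ("1..3") comes first, followed by one numbered result per test.
Only the failure details are routed to the error stream, which matches the
plugin's workaround for mtest not printing stdout diagnostics on failure.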



  [text/x-patch] v3-0003-WIP-pytest-Add-some-SSL-client-tests.patch (35.2K, 4-v3-0003-WIP-pytest-Add-some-SSL-client-tests.patch)
From f0cf8b502b183c113a82a113b8c0a75c9b0f7904 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Tue, 19 Aug 2025 12:56:45 -0700
Subject: [PATCH v3 03/10] WIP: pytest: Add some SSL client tests

This is a sample client-only test suite. It tests some handshake
failures against a mock server, as well as a full SSL handshake + empty
query + response.

pyca/cryptography is added as a new package dependency. Certificates for
testing are generated on the fly.

The `pg` test package contains some helpers and fixtures (as well as
some self-tests for more complicated behavior). Of note:

- pg.require_test_extra() lets you mark a test/class/module as skippable
  if PG_TEST_EXTRA does not contain the necessary strings.

- pg.remaining_timeout() is a function which can be repeatedly called to
  determine how much of the PG_TEST_TIMEOUT_DEFAULT remains for the
  current test item.

- pg.libpq is a fixture that wraps libpq.so in a more friendly, but
  still low-level, ctypes FFI. Allocated resources are unwound and
  released during test teardown.

The mock design is threaded: the server socket is listening on a
background thread, and the test provides the server logic via a
callback. There is some additional work still needed to make this
production-ready; see the notes for _TCPServer.background(). (Currently,
an exception in the wrong place could result in a hang-until-timeout
rather than an immediate failure.)

TODOs:
- local_server and tcp_server_class are nearly identical and should
  share code.
- fix exception-related timeouts for .background()
- figure out the proper use of "session" vs "module" scope
- ensure that pg.libpq unwinds (to close connections) before tcp_server;
  see comment in test_server_with_ssl_disabled()
---
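Not part of the commit message: the threaded mock design described above can
be sketched roughly as follows. This is a simplified illustration, not the
code in this patch. MockServer and recv_exact are hypothetical names, and the
sketch uses a loopback TCP socket with a fixed timeout instead of the
remaining_timeout fixture. It keeps the two properties the description relies
on: the test supplies the server logic as a callback, and an exception raised
on the background thread is saved and re-raised on the main thread when the
ExitStack unwinds.

```python
import contextlib
import socket
import threading


def recv_exact(sock, n):
    """Reads exactly n bytes, or fewer if the peer closes early."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            break
        buf += chunk
    return buf


class MockServer(contextlib.ExitStack):
    """Hypothetical stand-in for the mock server fixtures in this patch."""

    def __init__(self, timeout=5.0):
        super().__init__()
        self._timeout = timeout
        self._thread = None
        self._exc = None
        self._listener = self.enter_context(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        )
        self._listener.bind(("127.0.0.1", 0))  # port 0: the OS picks one
        self._listener.listen(1)
        self.host, self.port = self._listener.getsockname()

    def background(self, fn):
        """Accepts one client on a background thread and hands it to fn."""

        def _bg():
            try:
                self._listener.settimeout(self._timeout)
                sock, _ = self._listener.accept()
                with sock:
                    sock.settimeout(self._timeout)
                    fn(sock)
            except Exception as e:
                # Saved here, re-raised on the main thread in _join().
                self._exc = e

        self._thread = threading.Thread(target=_bg)
        # Registered after the listener, so it runs before the listener closes.
        self.callback(self._join)
        self._thread.start()

    def _join(self):
        self._thread.join(self._timeout + 1)
        if self._thread.is_alive():
            raise TimeoutError("background thread is still running")
        if self._exc is not None:
            raise self._exc


def serve(sock):
    assert recv_exact(sock, 5) == b"hello"
    sock.sendall(b"world")


with MockServer() as server:
    server.background(serve)
    with socket.create_connection((server.host, server.port), timeout=5.0) as c:
        c.sendall(b"hello")
        reply = recv_exact(c, 5)


def broken(sock):
    raise RuntimeError("server logic failed")


caught = None
try:
    with MockServer() as server:
        server.background(broken)
        with socket.create_connection((server.host, server.port), timeout=5.0):
            pass
except RuntimeError as e:
    # The server-side failure surfaces when the stack unwinds.
    caught = str(e)
```

The second half shows why the TODO about exception-related timeouts matters.
The failure only becomes visible at teardown, so a client still blocked on the
socket waits out its own timeout before the real cause is reported.
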
 .cirrus.tasks.yml                 |  18 +-
 config/pytest-requirements.txt    |  10 ++
 pytest.ini                        |   3 +
 src/test/pytest/meson.build       |   1 +
 src/test/pytest/pg/__init__.py    |   3 +
 src/test/pytest/pg/_env.py        |  55 ++++++
 src/test/pytest/pg/fixtures.py    | 212 +++++++++++++++++++++++
 src/test/pytest/pyt/conftest.py   |   3 +
 src/test/pytest/pyt/test_libpq.py | 171 ++++++++++++++++++
 src/test/ssl/Makefile             |   2 +
 src/test/ssl/meson.build          |   6 +
 src/test/ssl/pyt/conftest.py      | 129 ++++++++++++++
 src/test/ssl/pyt/test_client.py   | 278 ++++++++++++++++++++++++++++++
 13 files changed, 885 insertions(+), 6 deletions(-)
 create mode 100644 src/test/pytest/pg/__init__.py
 create mode 100644 src/test/pytest/pg/_env.py
 create mode 100644 src/test/pytest/pg/fixtures.py
 create mode 100644 src/test/pytest/pyt/conftest.py
 create mode 100644 src/test/pytest/pyt/test_libpq.py
 create mode 100644 src/test/ssl/pyt/conftest.py
 create mode 100644 src/test/ssl/pyt/test_client.py

diff --git a/.cirrus.tasks.yml b/.cirrus.tasks.yml
index 80f9b394bd2..4e744f1c105 100644
--- a/.cirrus.tasks.yml
+++ b/.cirrus.tasks.yml
@@ -225,6 +225,7 @@ task:
     sysctl kern.corefile='/tmp/cores/%N.%P.core'
   setup_additional_packages_script: |
     pkg install -y \
+      py311-cryptography \
       py311-packaging \
       py311-pytest
 
@@ -316,6 +317,7 @@ task:
 
       setup_additional_packages_script: |
         pkgin -y install \
+          py312-cryptography \
           py312-packaging \
           py312-test
         ln -s /usr/pkg/bin/pytest-3.12 /usr/pkg/bin/pytest
@@ -339,8 +341,9 @@ task:
 
       setup_additional_packages_script: |
         pkg_add -I \
-          py3-test \
-          py3-packaging
+          py3-cryptography \
+          py3-packaging \
+          py3-test
       # Always core dump to ${CORE_DUMP_DIR}
       set_core_dump_script: sysctl -w kern.nosuidcoredump=2
       <<: *openbsd_task_template
@@ -501,8 +504,9 @@ task:
   setup_additional_packages_script: |
     apt-get update
     DEBIAN_FRONTEND=noninteractive apt-get -y install \
-      python3-pytest \
-      python3-packaging
+      python3-cryptography \
+      python3-packaging \
+      python3-pytest
 
   matrix:
     # SPECIAL:
@@ -643,6 +647,7 @@ task:
     CIRRUS_WORKING_DIR: ${HOME}/pgsql/
     CCACHE_DIR: ${HOME}/ccache
     MACPORTS_CACHE: ${HOME}/macports-cache
+    PYTEST_DEBUG_TEMPROOT: /tmp  # default is too long for UNIX sockets on Mac
 
     MESON_FEATURES: >-
       -Dbonjour=enabled
@@ -663,6 +668,7 @@ task:
       p5.34-io-tty
       p5.34-ipc-run
       python312
+      py312-cryptography
       py312-packaging
       py312-pytest
       tcl
@@ -801,7 +807,7 @@ task:
   # XXX Does Chocolatey really not have any Python package installers?
   setup_additional_packages_script: |
     REM choco install -y --no-progress ...
-    pip3 install --user packaging pytest
+    pip3 install --user cryptography packaging pytest
 
   setup_hosts_file_script: |
     echo 127.0.0.1 pg-loadbalancetest >> c:\Windows\System32\Drivers\etc\hosts
@@ -864,7 +870,7 @@ task:
     folder: ${CCACHE_DIR}
 
   setup_additional_packages_script: |
-    C:\msys64\usr\bin\pacman.exe -S --noconfirm mingw-w64-ucrt-x86_64-python-packaging mingw-w64-ucrt-x86_64-python-pytest
+    C:\msys64\usr\bin\pacman.exe -S --noconfirm mingw-w64-ucrt-x86_64-python-cryptography mingw-w64-ucrt-x86_64-python-packaging mingw-w64-ucrt-x86_64-python-pytest
 
   mingw_info_script: |
     %BASH% -c "where gcc"
diff --git a/config/pytest-requirements.txt b/config/pytest-requirements.txt
index b941624b2f3..0bd6cadf608 100644
--- a/config/pytest-requirements.txt
+++ b/config/pytest-requirements.txt
@@ -19,3 +19,13 @@ pytest >= 7.0, < 9
 
 # packaging is used by check_pytest.py at configure time.
 packaging
+
+# Notes on the cryptography package:
+# - 3.3.2 is shipped on Debian bullseye.
+# - 3.4.x drops support for Python 2, making it a version of note for older LTS
+#   distros.
+# - 35.x switched versioning schemes and moved to Rust parsing.
+# - 40.x is the last version supporting Python 3.6.
+# XXX Is it appropriate to require cryptography, or should we simply skip
+# dependent tests?
+cryptography >= 3.3.2
diff --git a/pytest.ini b/pytest.ini
index 8e8388f3afc..e7aa84f3a84 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -4,3 +4,6 @@ minversion = 7.0
 # Ignore ./config (which contains the configure-time check_pytest.py tests) by
 # default.
 addopts = --ignore ./config
+
+# Common test code can be found here.
+pythonpath = src/test/pytest
diff --git a/src/test/pytest/meson.build b/src/test/pytest/meson.build
index abd128dfa24..f53193e8686 100644
--- a/src/test/pytest/meson.build
+++ b/src/test/pytest/meson.build
@@ -11,6 +11,7 @@ tests += {
   'pytest': {
     'tests': [
       'pyt/test_something.py',
+      'pyt/test_libpq.py',
     ],
   },
 }
diff --git a/src/test/pytest/pg/__init__.py b/src/test/pytest/pg/__init__.py
new file mode 100644
index 00000000000..ef8faf54ca4
--- /dev/null
+++ b/src/test/pytest/pg/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+from ._env import has_test_extra, require_test_extra
diff --git a/src/test/pytest/pg/_env.py b/src/test/pytest/pg/_env.py
new file mode 100644
index 00000000000..6f18af07844
--- /dev/null
+++ b/src/test/pytest/pg/_env.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import logging
+import os
+from typing import List, Optional
+
+import pytest
+
+logger = logging.getLogger(__name__)
+
+
+def has_test_extra(key: str) -> bool:
+    """
+    Returns True if the PG_TEST_EXTRA environment variable contains the given
+    key.
+    """
+    extra = os.getenv("PG_TEST_EXTRA", "")
+    return key in extra.split()
+
+
+def require_test_extra(*keys: str) -> pytest.MarkDecorator:
+    """
+    A convenience annotation which will skip tests unless all of the required
+    keys are present in PG_TEST_EXTRA.
+
+    To skip a particular test function or class:
+
+        @pg.require_test_extra("ldap")
+        def test_some_ldap_feature():
+            ...
+
+    To skip an entire module:
+
+        pytestmark = pg.require_test_extra("ssl", "kerberos")
+    """
+    return pytest.mark.skipif(
+        not all([has_test_extra(k) for k in keys]),
+        reason="requires {} to be set in PG_TEST_EXTRA".format(", ".join(keys)),
+    )
+
+
+def test_timeout_default() -> int:
+    """
+    Returns the value of the PG_TEST_TIMEOUT_DEFAULT environment variable, in
+    seconds, or 180 if one was not provided.
+    """
+    default = os.getenv("PG_TEST_TIMEOUT_DEFAULT", "")
+    if not default:
+        return 180
+
+    try:
+        return int(default)
+    except ValueError as v:
+        logger.warning("PG_TEST_TIMEOUT_DEFAULT could not be parsed: " + str(v))
+        return 180
diff --git a/src/test/pytest/pg/fixtures.py b/src/test/pytest/pg/fixtures.py
new file mode 100644
index 00000000000..b5d3bff69a8
--- /dev/null
+++ b/src/test/pytest/pg/fixtures.py
@@ -0,0 +1,212 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import ctypes
+import platform
+import time
+from typing import Any, Callable, Dict
+
+import pytest
+
+from ._env import test_timeout_default
+
+
[email protected]
+def remaining_timeout():
+    """
+    This fixture provides a function that returns how much of the
+    PG_TEST_TIMEOUT_DEFAULT remains for the current test, in fractional seconds.
+    This value is never less than zero.
+
+    This fixture is per-test, so the deadline is also reset on a per-test basis.
+    """
+    now = time.monotonic()
+    deadline = now + test_timeout_default()
+
+    return lambda: max(deadline - time.monotonic(), 0)
+
+
+class _PGconn(ctypes.Structure):
+    pass
+
+
+class _PGresult(ctypes.Structure):
+    pass
+
+
+_PGconn_p = ctypes.POINTER(_PGconn)
+_PGresult_p = ctypes.POINTER(_PGresult)
+
+
[email protected](scope="session")
+def libpq_handle():
+    """
+    Loads a ctypes handle for libpq. Some common function prototypes are
+    initialized for general use.
+    """
+    system = platform.system()
+
+    if system in ("Linux", "FreeBSD", "NetBSD", "OpenBSD"):
+        name = "libpq.so.5"
+    elif system == "Darwin":
+        name = "libpq.5.dylib"
+    elif system == "Windows":
+        name = "libpq.dll"
+    else:
+        assert False, f"the libpq fixture must be updated for {system}"
+
+    # XXX ctypes.CDLL() is a little stricter with load paths on Windows. The
+    # preferred way around that is to know the absolute path to libpq.dll, but
+    # that doesn't seem to mesh well with the current test infrastructure. For
+    # now, enable "standard" LoadLibrary behavior.
+    loadopts = {}
+    if system == "Windows":
+        loadopts["winmode"] = 0
+
+    lib = ctypes.CDLL(name, **loadopts)
+
+    #
+    # Function Prototypes
+    #
+
+    lib.PQconnectdb.restype = _PGconn_p
+    lib.PQconnectdb.argtypes = [ctypes.c_char_p]
+
+    lib.PQstatus.restype = ctypes.c_int
+    lib.PQstatus.argtypes = [_PGconn_p]
+
+    lib.PQexec.restype = _PGresult_p
+    lib.PQexec.argtypes = [_PGconn_p, ctypes.c_char_p]
+
+    lib.PQresultStatus.restype = ctypes.c_int
+    lib.PQresultStatus.argtypes = [_PGresult_p]
+
+    lib.PQclear.restype = None
+    lib.PQclear.argtypes = [_PGresult_p]
+
+    lib.PQerrorMessage.restype = ctypes.c_char_p
+    lib.PQerrorMessage.argtypes = [_PGconn_p]
+
+    lib.PQfinish.restype = None
+    lib.PQfinish.argtypes = [_PGconn_p]
+
+    return lib
+
+
+class PGresult(contextlib.AbstractContextManager):
+    """Wraps a raw _PGresult_p with a more friendly interface."""
+
+    def __init__(self, lib: ctypes.CDLL, res: _PGresult_p):
+        self._lib = lib
+        self._res = res
+
+    def __exit__(self, *exc):
+        self._lib.PQclear(self._res)
+        self._res = None
+
+    def status(self):
+        return self._lib.PQresultStatus(self._res)
+
+
+class PGconn(contextlib.AbstractContextManager):
+    """
+    Wraps a raw _PGconn_p with a more friendly interface. This is just a
+    stub; it's expected to grow.
+    """
+
+    def __init__(
+        self,
+        lib: ctypes.CDLL,
+        handle: _PGconn_p,
+        stack: contextlib.ExitStack,
+    ):
+        self._lib = lib
+        self._handle = handle
+        self._stack = stack
+
+    def __exit__(self, *exc):
+        self._lib.PQfinish(self._handle)
+        self._handle = None
+
+    def exec(self, query: str) -> PGresult:
+        """
+        Executes a query via PQexec() and returns a PGresult.
+        """
+        res = self._lib.PQexec(self._handle, query.encode())
+        return self._stack.enter_context(PGresult(self._lib, res))
+
+
[email protected]
+def libpq(libpq_handle, remaining_timeout):
+    """
+    Provides a ctypes-based API wrapped around libpq.so. This fixture keeps
+    track of allocated resources and cleans them up during teardown. See
+    _Libpq's public API for details.
+    """
+
+    class _Libpq(contextlib.ExitStack):
+        CONNECTION_OK = 0
+
+        PGRES_EMPTY_QUERY = 0
+
+        class Error(RuntimeError):
+            """
+            libpq.Error is the exception class for application-level errors that
+            are encountered during libpq operations.
+            """
+
+            pass
+
+        def __init__(self):
+            super().__init__()
+            self.lib = libpq_handle
+
+        def _connstr(self, opts: Dict[str, Any]) -> str:
+            """
+            Flattens the provided options into a libpq connection string. Values
+            are converted to str and quoted/escaped as necessary.
+            """
+            settings = []
+
+            for k, v in opts.items():
+                v = str(v)
+                if not v:
+                    v = "''"
+                else:
+                    v = v.replace("\\", "\\\\")
+                    v = v.replace("'", "\\'")
+
+                    if " " in v:
+                        v = f"'{v}'"
+
+                settings.append(f"{k}={v}")
+
+            return " ".join(settings)
+
+        def must_connect(self, **opts) -> PGconn:
+            """
+            Connects to a server, using the given connection options, and
+            returns a libpq.PGconn object wrapping the connection handle. A
+            failure will raise libpq.Error.
+
+            Connections honor PG_TEST_TIMEOUT_DEFAULT unless connect_timeout is
+            explicitly overridden in opts.
+            """
+
+            if "connect_timeout" not in opts:
+                t = int(remaining_timeout())
+                opts["connect_timeout"] = max(t, 1)
+
+            conn_p = self.lib.PQconnectdb(self._connstr(opts).encode())
+
+            # Ensure the connection handle is always closed at the end of the
+            # test.
+            conn = self.enter_context(PGconn(self.lib, conn_p, stack=self))
+
+            if self.lib.PQstatus(conn_p) != self.CONNECTION_OK:
+                raise self.Error(self.lib.PQerrorMessage(conn_p).decode())
+
+            return conn
+
+    with _Libpq() as lib:
+        yield lib
diff --git a/src/test/pytest/pyt/conftest.py b/src/test/pytest/pyt/conftest.py
new file mode 100644
index 00000000000..ecb72be26d7
--- /dev/null
+++ b/src/test/pytest/pyt/conftest.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+from pg.fixtures import *
diff --git a/src/test/pytest/pyt/test_libpq.py b/src/test/pytest/pyt/test_libpq.py
new file mode 100644
index 00000000000..9f0857cc612
--- /dev/null
+++ b/src/test/pytest/pyt/test_libpq.py
@@ -0,0 +1,171 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import os
+import socket
+import struct
+import threading
+from typing import Callable
+
+import pytest
+
+
[email protected](
+    "opts, expected",
+    [
+        (dict(), ""),
+        (dict(port=5432), "port=5432"),
+        (dict(port=5432, dbname="postgres"), "port=5432 dbname=postgres"),
+        (dict(host=""), "host=''"),
+        (dict(host=" "), r"host=' '"),
+        (dict(keyword="'"), r"keyword=\'"),
+        (dict(keyword=" \\' "), r"keyword=' \\\' '"),
+    ],
+)
+def test_connstr(libpq, opts, expected):
+    """Tests the escape behavior for libpq._connstr()."""
+    assert libpq._connstr(opts) == expected
+
+
+def test_must_connect_errors(libpq):
+    """Tests that must_connect() raises libpq.Error."""
+    with pytest.raises(libpq.Error, match="invalid connection option"):
+        libpq.must_connect(some_unknown_keyword="whatever")
+
+
[email protected]
+def local_server(tmp_path, remaining_timeout):
+    """
+    Opens up a local UNIX socket for mocking a Postgres server on a background
+    thread. See the _Server API for usage.
+
+    This fixture requires AF_UNIX support; dependent tests will be skipped on
+    platforms that don't provide it.
+    """
+
+    try:
+        from socket import AF_UNIX
+    except ImportError:
+        pytest.skip("AF_UNIX not supported on this platform")
+
+    class _Server(contextlib.ExitStack):
+        """
+        Implementation class for local_server. See .background() for the primary
+        entry point for tests. Postgres clients may connect to this server via
+        local_server.host/local_server.port.
+
+        _Server derives from contextlib.ExitStack to provide easy cleanup of
+        associated resources; see the documentation for that class for a full
+        explanation.
+        """
+
+        def __init__(self):
+            super().__init__()
+
+            self.host = tmp_path
+            self.port = 5432
+
+            self._thread = None
+            self._thread_exc = None
+            self._listener = self.enter_context(
+                socket.socket(AF_UNIX, socket.SOCK_STREAM),
+            )
+
+        def bind_and_listen(self):
+            """
+            Does the actual work of binding the UNIX socket using the Postgres
+            server conventions and listening for connections.
+
+            The listen backlog is currently hardcoded to one.
+            """
+            sockfile = self.host / ".s.PGSQL.{}".format(self.port)
+
+            # Lock down the permissions on the new socket.
+            prev_mask = os.umask(0o077)
+
+            # Bind (creating the socket file), and immediately register it for
+            # deletion from disk when the stack is cleaned up.
+            self._listener.bind(bytes(sockfile))
+            self.callback(os.unlink, sockfile)
+
+            os.umask(prev_mask)
+
+            self._listener.listen(1)
+
+        def background(self, fn: Callable[[socket.socket], None]) -> None:
+            """
+            Accepts a client connection on a background thread and passes it to
+            the provided callback. Any exceptions raised from the callback will
+            be re-raised on the main thread during fixture teardown.
+
+            Blocking operations on the connected socket default to using the
+            remaining_timeout(), though this can be changed by the test via the
+            socket's .settimeout().
+            """
+
+            def _bg():
+                try:
+                    self._listener.settimeout(remaining_timeout())
+                    sock, _ = self._listener.accept()
+
+                    with sock:
+                        sock.settimeout(remaining_timeout())
+                        fn(sock)
+
+                except Exception as e:
+                    # Save the exception for re-raising on the main thread.
+                    self._thread_exc = e
+
+            # TODO: rather than using callback(), consider explicitly signaling
+            # the fn() implementation to stop early if we get an exception.
+            # Otherwise we'll hang until the end of the timeout.
+            self._thread = threading.Thread(target=_bg)
+            self.callback(self._join)
+
+            self._thread.start()
+
+        def _join(self):
+            """
+            Waits for the background thread to finish and raises any thrown
+            exception. This is called during fixture teardown.
+            """
+            # Give a little bit of wiggle room on the join timeout, since we're
+            # racing against the test's own use of remaining_timeout(). (It's
+            # preferable to let tests report timeouts; the stack traces will
+            # help with debugging.)
+            self._thread.join(remaining_timeout() + 1)
+            if self._thread.is_alive():
+                raise TimeoutError("background thread is still running after timeout")
+
+            if self._thread_exc is not None:
+                raise self._thread_exc
+
+    with _Server() as s:
+        s.bind_and_listen()
+        yield s
+
+
+def test_connection_is_finished_on_error(libpq, local_server, remaining_timeout):
+    """Tests that PQfinish() gets called at the end of testing."""
+    expected_error = "something is wrong"
+
+    def serve_error(s: socket.socket) -> None:
+        pktlen = struct.unpack("!I", s.recv(4))[0]
+
+        # Quick check for the startup packet version.
+        version = struct.unpack("!HH", s.recv(4))
+        assert version == (3, 0)
+
+        # Discard the remainder of the startup packet and send a v2 error.
+        s.recv(pktlen - 8)
+        s.send(b"E" + expected_error.encode() + b"\0")
+
+        # And now the socket should be closed.
+        assert not s.recv(1), "client sent unexpected data"
+
+    local_server.background(serve_error)
+
+    with pytest.raises(libpq.Error, match=expected_error):
+        # Exiting this context should result in PQfinish().
+        with libpq:
+            libpq.must_connect(host=local_server.host, port=local_server.port)
diff --git a/src/test/ssl/Makefile b/src/test/ssl/Makefile
index e8a1639db2d..895ea5ea41c 100644
--- a/src/test/ssl/Makefile
+++ b/src/test/ssl/Makefile
@@ -30,6 +30,8 @@ clean distclean:
 # Doesn't depend on sslfiles because we don't rebuild them by default
 check:
 	$(prove_check)
+	# XXX these suites should run independently, not serially
+	$(pytest_check)
 
 installcheck:
 	$(prove_installcheck)
diff --git a/src/test/ssl/meson.build b/src/test/ssl/meson.build
index d8e0fb518e0..a0ee2af0899 100644
--- a/src/test/ssl/meson.build
+++ b/src/test/ssl/meson.build
@@ -15,4 +15,10 @@ tests += {
       't/003_sslinfo.pl',
     ],
   },
+  'pytest': {
+    'tests': [
+      'pyt/test_client.py',
+      'pyt/test_server.py',
+    ],
+  },
 }
diff --git a/src/test/ssl/pyt/conftest.py b/src/test/ssl/pyt/conftest.py
new file mode 100644
index 00000000000..fb4db372f03
--- /dev/null
+++ b/src/test/ssl/pyt/conftest.py
@@ -0,0 +1,129 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import datetime
+import tempfile
+from collections import namedtuple
+
+import pytest
+
+import pg
+from pg.fixtures import *
+
+
[email protected](scope="session")
+def cryptography():
+    return pytest.importorskip("cryptography", "3.3.2")
+
+
+Cert = namedtuple("Cert", "cert, certpath, key, keypath")
+
+
[email protected](scope="session")
+def certs(cryptography, tmp_path_factory):
+    """
+    Caches commonly used certificates at the session level, and provides a way
+    to create new ones.
+
+    - certs.ca: the root CA certificate
+
+    - certs.server: the "standard" server certificate, signed by certs.ca
+
+    - certs.server_host: the hostname of the certs.server certificate
+
+    - certs.new(): creates a custom certificate, signed by certs.ca
+    """
+
+    from cryptography import x509
+    from cryptography.hazmat.primitives import hashes, serialization
+    from cryptography.hazmat.primitives.asymmetric import rsa
+    from cryptography.x509.oid import NameOID
+
+    tmpdir = tmp_path_factory.mktemp("test-certs")
+
+    class _Certs:
+        def __init__(self):
+            self.ca = self.new(
+                x509.Name(
+                    [x509.NameAttribute(NameOID.COMMON_NAME, "PG pytest CA")],
+                ),
+                ca=True,
+            )
+
+            self.server_host = "example.org"
+            self.server = self.new(
+                x509.Name(
+                    [x509.NameAttribute(NameOID.COMMON_NAME, self.server_host)],
+                )
+            )
+
+        def new(self, subject: x509.Name, *, ca=False) -> Cert:
+            """
+            Creates and signs a new Cert with the given subject name. If ca is
+            True, the certificate will be self-signed; otherwise the certificate
+            is signed by self.ca.
+            """
+            key = rsa.generate_private_key(
+                public_exponent=65537,
+                key_size=2048,
+            )
+
+            builder = x509.CertificateBuilder()
+            now = datetime.datetime.now(datetime.timezone.utc)
+
+            builder = (
+                builder.subject_name(subject)
+                .public_key(key.public_key())
+                .serial_number(x509.random_serial_number())
+                .not_valid_before(now)
+                .not_valid_after(now + datetime.timedelta(hours=1))
+            )
+
+            if ca:
+                builder = builder.issuer_name(subject)
+            else:
+                builder = builder.issuer_name(self.ca.cert.subject)
+
+            builder = builder.add_extension(
+                x509.BasicConstraints(ca=ca, path_length=None),
+                critical=True,
+            )
+
+            cert = builder.sign(
+                private_key=key if ca else self.ca.key,
+                algorithm=hashes.SHA256(),
+            )
+
+            # Dump the certificate and key to file.
+            keypath = self._tofile(
+                key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.PKCS8,
+                    serialization.NoEncryption(),
+                ),
+                suffix=".key",
+            )
+            certpath = self._tofile(
+                cert.public_bytes(serialization.Encoding.PEM),
+                suffix="-ca.crt" if ca else ".crt",
+            )
+
+            return Cert(
+                cert=cert,
+                certpath=certpath,
+                key=key,
+                keypath=keypath,
+            )
+
+        def _tofile(self, data: bytes, *, suffix) -> str:
+            """
+            Dumps data to a file on disk with the requested suffix and returns
+            the path. The file is located somewhere in pytest's temporary
+            directory root.
+            """
+            f = tempfile.NamedTemporaryFile(suffix=suffix, dir=tmpdir, delete=False)
+            with f:
+                f.write(data)
+
+            return f.name
+
+    return _Certs()
diff --git a/src/test/ssl/pyt/test_client.py b/src/test/ssl/pyt/test_client.py
new file mode 100644
index 00000000000..28110ae0717
--- /dev/null
+++ b/src/test/ssl/pyt/test_client.py
@@ -0,0 +1,278 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import ctypes
+import socket
+import ssl
+import struct
+import threading
+from typing import Callable
+
+import pytest
+
+import pg
+
+# This suite opens up local TCP ports and is hidden behind PG_TEST_EXTRA=ssl.
+pytestmark = pg.require_test_extra("ssl")
+
+
[email protected](scope="session", autouse=True)
+def skip_if_no_ssl_support(libpq_handle):
+    """Skips tests if SSL support is not configured."""
+
+    # Declare PQsslAttribute().
+    PQsslAttribute = libpq_handle.PQsslAttribute
+    PQsslAttribute.restype = ctypes.c_char_p
+    PQsslAttribute.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
+
+    if not PQsslAttribute(None, b"library"):
+        pytest.skip("requires SSL support to be configured")
+
+
+#
+# Test Fixtures
+#
+
+
[email protected]
+def tcp_server_class(remaining_timeout):
+    """
+    Metafixture to combine related logic for tcp_server and ssl_server.
+
+    TODO: combine with test_libpq.local_server
+    """
+
+    class _TCPServer(contextlib.ExitStack):
+        """
+        Implementation class for tcp_server. See .background() for the primary
+        entry point for tests. Postgres clients may connect to this server via
+        **tcp_server.conninfo.
+
+        _TCPServer derives from contextlib.ExitStack to provide easy cleanup of
+        associated resources; see the documentation for that class for a full
+        explanation.
+        """
+
+        def __init__(self):
+            super().__init__()
+
+            self._thread = None
+            self._thread_exc = None
+            self._listener = self.enter_context(
+                socket.socket(socket.AF_INET, socket.SOCK_STREAM),
+            )
+
+            self._bind_and_listen()
+            sockname = self._listener.getsockname()
+            self.conninfo = dict(
+                hostaddr=sockname[0],
+                port=sockname[1],
+            )
+
+        def _bind_and_listen(self):
+            """
+            Does the actual work of binding the socket and listening for
+            connections.
+
+            The listen backlog is currently hardcoded to one.
+            """
+            self._listener.bind(("127.0.0.1", 0))
+            self._listener.listen(1)
+
+        def background(self, fn: Callable[[socket.socket], None]) -> None:
+            """
+            Accepts a client connection on a background thread and passes it to
+            the provided callback. Any exceptions raised from the callback will
+            be re-raised on the main thread during fixture teardown.
+
+            Blocking operations on the connected socket default to using the
+            remaining_timeout(), though this can be changed by the test via the
+            socket's .settimeout().
+            """
+
+            def _bg():
+                try:
+                    self._listener.settimeout(remaining_timeout())
+                    sock, _ = self._listener.accept()
+
+                    with sock:
+                        sock.settimeout(remaining_timeout())
+                        fn(sock)
+
+                except Exception as e:
+                    # Save the exception for re-raising on the main thread.
+                    self._thread_exc = e
+
+            # TODO: rather than using callback(), consider explicitly signaling
+            # the fn() implementation to stop early if we get an exception.
+            # Otherwise we'll hang until the end of the timeout.
+            self._thread = threading.Thread(target=_bg)
+            self.callback(self._join)
+
+            self._thread.start()
+
+        def _join(self):
+            """
+            Waits for the background thread to finish and raises any thrown
+            exception. This is called during fixture teardown.
+            """
+            # Give a little bit of wiggle room on the join timeout, since we're
+            # racing against the test's own use of remaining_timeout(). (It's
+            # preferable to let tests report timeouts; the stack traces will
+            # help with debugging.)
+            self._thread.join(remaining_timeout() + 1)
+            if self._thread.is_alive():
+                raise TimeoutError("background thread is still running after timeout")
+
+            if self._thread_exc is not None:
+                raise self._thread_exc
+
+    return _TCPServer
+
+
[email protected]
+def tcp_server(tcp_server_class):
+    """
+    Opens up a local TCP socket for mocking a Postgres server on a background
+    thread. See the _TCPServer API for usage.
+    """
+    with tcp_server_class() as s:
+        yield s
+
+
[email protected]
+def ssl_server(tcp_server_class, certs):
+    """
+    Like tcp_server, but with an additional .background_ssl() method which will
+    perform an SSLRequest handshake on the socket before handing the connection
+    to the test callback.
+
+    This server uses certs.server as its identity.
+    """
+
+    class _SSLServer(tcp_server_class):
+        def __init__(self):
+            super().__init__()
+
+            self.conninfo["host"] = certs.server_host
+
+            self._ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+            self._ctx.load_cert_chain(certs.server.certpath, certs.server.keypath)
+
+        def background_ssl(self, fn: Callable[[ssl.SSLSocket], None]) -> None:
+            """
+            Invokes a server callback as with .background(), but an SSLRequest
+            handshake is performed first, and the socket provided to the
+            callback has been wrapped in an OpenSSL layer.
+            """
+
+            def handshake(s: socket.socket):
+                pktlen = struct.unpack("!I", s.recv(4))[0]
+
+                # Make sure we get an SSLRequest.
+                version = struct.unpack("!HH", s.recv(4))
+                assert version == (1234, 5679)
+                assert pktlen == 8
+
+                # Accept the SSLRequest.
+                s.send(b"S")
+
+                with self._ctx.wrap_socket(s, server_side=True) as wrapped:
+                    fn(wrapped)
+
+            self.background(handshake)
+
+    with _SSLServer() as s:
+        yield s
+
+
+#
+# Tests
+#
+
+
[email protected]("sslmode", ("require", "verify-ca", "verify-full"))
+def test_server_with_ssl_disabled(libpq, tcp_server, certs, sslmode):
+    """
+    Make sure the client refuses to talk to non-SSL servers with stricter
+    sslmodes.
+    """
+
+    def refuse_ssl(s: socket.socket):
+        pktlen = struct.unpack("!I", s.recv(4))[0]
+
+        # Make sure we get an SSLRequest.
+        version = struct.unpack("!HH", s.recv(4))
+        assert version == (1234, 5679)
+        assert pktlen == 8
+
+        # Refuse the SSLRequest.
+        s.send(b"N")
+
+        # Wait for the client to close the connection.
+        assert not s.recv(1), "client sent unexpected data"
+
+    tcp_server.background(refuse_ssl)
+
+    with pytest.raises(libpq.Error, match="server does not support SSL"):
+        with libpq:  # XXX tests shouldn't need to do this
+            libpq.must_connect(
+                **tcp_server.conninfo,
+                sslrootcert=certs.ca.certpath,
+                sslmode=sslmode,
+            )
+
+
+def test_verify_full_connection(libpq, ssl_server, certs):
+    """Completes a verify-full connection and empty query."""
+
+    def handle_empty_query(s: ssl.SSLSocket):
+        pktlen = struct.unpack("!I", s.recv(4))[0]
+
+        # Check the startup packet version, then discard the remainder.
+        version = struct.unpack("!HH", s.recv(4))
+        assert version == (3, 0)
+        s.recv(pktlen - 8)
+
+        # Send the required litany of server messages.
+        s.send(struct.pack("!cII", b"R", 8, 0))  # AuthenticationOK
+
+        # ParameterStatus: client_encoding
+        key = b"client_encoding\0"
+        val = b"UTF-8\0"
+        s.send(struct.pack("!cI", b"S", 4 + len(key) + len(val)) + key + val)
+
+        # ParameterStatus: DateStyle
+        key = b"DateStyle\0"
+        val = b"ISO, MDY\0"
+        s.send(struct.pack("!cI", b"S", 4 + len(key) + len(val)) + key + val)
+
+        s.send(struct.pack("!cIII", b"K", 12, 1234, 1234))  # BackendKeyData
+        s.send(struct.pack("!cIc", b"Z", 5, b"I"))  # ReadyForQuery
+
+        # Expect an empty query.
+        pkttype = s.recv(1)
+        assert pkttype == b"Q"
+        pktlen = struct.unpack("!I", s.recv(4))[0]
+        assert s.recv(pktlen - 4) == b"\0"
+
+        # Send an EmptyQueryResponse+ReadyForQuery.
+        s.send(struct.pack("!cI", b"I", 4))
+        s.send(struct.pack("!cIc", b"Z", 5, b"I"))
+
+        # libpq should terminate and close the connection.
+        assert s.recv(1) == b"X"
+        pktlen = struct.unpack("!I", s.recv(4))[0]
+        assert pktlen == 4
+
+        assert not s.recv(1), "client sent unexpected data"
+
+    ssl_server.background_ssl(handle_empty_query)
+
+    conn = libpq.must_connect(
+        **ssl_server.conninfo,
+        sslrootcert=certs.ca.certpath,
+        sslmode="verify-full",
+    )
+    with conn:
+        assert conn.exec("").status() == libpq.PGRES_EMPTY_QUERY
-- 
2.51.1
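
The mock servers in the patch above recognize an SSLRequest by unpacking the first eight bytes of the connection and checking for a length of 8 and the "version" pair (1234, 5679). A standalone sketch of that check follows; build_ssl_request() and is_ssl_request() are hypothetical helper names used for illustration and are not part of the patch.

```python
import struct

# SSLRequest is a fixed 8-byte packet: a 4-byte length (always 8) followed by
# the request code 80877103, which reads as (1234, 5679) when unpacked as two
# 16-bit halves in network byte order.
SSL_REQUEST_CODE = (1234, 5679)


def build_ssl_request() -> bytes:
    """Builds the packet a client sends to ask for TLS (hypothetical helper)."""
    return struct.pack("!IHH", 8, *SSL_REQUEST_CODE)


def is_ssl_request(packet: bytes) -> bool:
    """Returns True if packet is exactly an SSLRequest (hypothetical helper)."""
    if len(packet) != 8:
        return False

    pktlen, major, minor = struct.unpack("!IHH", packet)
    return pktlen == 8 and (major, minor) == SSL_REQUEST_CODE
```

A v3 startup packet begins with the pair (3, 0) instead, which is why the tests can tell the two apart after reading the same eight bytes.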



  [text/x-patch] v3-0004-WIP-pytest-Add-some-server-side-SSL-tests.patch (29.3K, 5-v3-0004-WIP-pytest-Add-some-server-side-SSL-tests.patch)
From dc96eb0721074a522568f5ac608522ea30001b6b Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Fri, 22 Aug 2025 17:39:40 -0700
Subject: [PATCH v3 04/10] WIP: pytest: Add some server-side SSL tests

In the same vein as the previous commit, this is a server-only test
suite operating against a mock client. The test itself is a heavily
parameterized check for direct-SSL handshake behavior, using a
combination of "standard" and "custom" certificates via the certs
fixture.

installcheck is currently unsupported, but the architecture has some
extension points that should make it possible later. For now, a new
server is always started for the test session.

New session-level fixtures have been added which probably need to
migrate to the `pg` package. Of note:

- datadir points to the server's data directory
- sockdir points to the server's UNIX socket/lock directory
- server_instance actually inits and starts a server via the pg_ctl on
  PATH (and could eventually point at an installcheck target)

Wrapping these session-level fixtures is pg_server[_session], which
provides APIs for configuration changes that unwind themselves at the
end of fixture scopes. There's also an example of nested scopes, via
pg_server_session.subcontext(). Many TODOs remain before we're on par
with Test::Cluster, but this should illustrate my desired architecture
pretty well.

Windows currently uses SCRAM-over-UNIX for the admin account rather than
SSPI-over-TCP. There's some dead Win32 code in pg.current_windows_user,
but I've kept it as an illustration of how a developer might write such
code for SSPI. I'll probably remove it in a future patch version.

TODOs:
- port more server configuration behavior from PostgreSQL::Test::Cluster
- decide again on "session" vs. "module" scope for server fixtures
- improve remaining_timeout() integration with socket operations; at the
  moment, the timeout resets on every call rather than decrementing
---
 src/test/pytest/pg/__init__.py  |   1 +
 src/test/pytest/pg/_win32.py    | 145 +++++++++
 src/test/ssl/pyt/conftest.py    | 113 +++++++
 src/test/ssl/pyt/test_server.py | 538 ++++++++++++++++++++++++++++++++
 4 files changed, 797 insertions(+)
 create mode 100644 src/test/pytest/pg/_win32.py
 create mode 100644 src/test/ssl/pyt/test_server.py
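
The last TODO in the commit message says the timeout resets on every call rather than decrementing. One possible direction, sketched here under assumptions and not taken from the patch, is to fix a deadline once and derive each socket timeout from the time left. Deadline and recv_exact() are hypothetical names, and the injectable clock exists only to make the sketch testable.

```python
import socket
import time


class Deadline:
    """Tracks one fixed point in time and reports the budget left before it."""

    def __init__(self, seconds: float, clock=time.monotonic):
        self._clock = clock
        self._expires = clock() + seconds

    def remaining(self) -> float:
        """Returns seconds left; raises TimeoutError once the budget is spent."""
        left = self._expires - self._clock()
        if left <= 0:
            raise TimeoutError("deadline exceeded")
        return left


def recv_exact(sock: socket.socket, n: int, deadline: Deadline) -> bytes:
    """Reads exactly n bytes, shrinking the socket timeout before each recv()."""
    buf = b""
    while len(buf) < n:
        # Each blocking call only gets what is left of the overall budget.
        sock.settimeout(deadline.remaining())
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection early")
        buf += chunk
    return buf
```

With this shape, a callback that performs many small reads cannot exceed the overall budget, because every recv() is bounded by the same deadline instead of a fresh full-length timeout.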

diff --git a/src/test/pytest/pg/__init__.py b/src/test/pytest/pg/__init__.py
index ef8faf54ca4..5dae49b6406 100644
--- a/src/test/pytest/pg/__init__.py
+++ b/src/test/pytest/pg/__init__.py
@@ -1,3 +1,4 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
 from ._env import has_test_extra, require_test_extra
+from ._win32 import current_windows_user
diff --git a/src/test/pytest/pg/_win32.py b/src/test/pytest/pg/_win32.py
new file mode 100644
index 00000000000..3fd67b10191
--- /dev/null
+++ b/src/test/pytest/pg/_win32.py
@@ -0,0 +1,145 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import ctypes
+import platform
+
+
+def current_windows_user():
+    """
+    A port of pg_regress.c's current_windows_user() helper. Returns
+    (accountname, domainname).
+
+    XXX This is dead code now, but I'm keeping it as a motivating example of
+    Win32 interaction, and someone may find it useful in the future when writing
+    SSPI tests?
+    """
+    try:
+        advapi32 = ctypes.windll.advapi32
+        kernel32 = ctypes.windll.kernel32
+    except AttributeError:
+        raise RuntimeError(
+            f"current_windows_user() is not supported on {platform.system()}"
+        )
+
+    def raise_winerror_when_false(result, func, arguments):
+        """
+        A ctypes errcheck handler that raises WinError (which will contain the
+        result of GetLastError()) when the function's return value is false.
+        """
+        if not result:
+            raise ctypes.WinError()
+
+    #
+    # Function Prototypes
+    #
+
+    from ctypes import wintypes
+
+    # GetCurrentProcess
+    kernel32.GetCurrentProcess.restype = wintypes.HANDLE
+    kernel32.GetCurrentProcess.argtypes = []
+
+    # OpenProcessToken
+    TOKEN_READ = 0x00020008
+
+    advapi32.OpenProcessToken.restype = wintypes.BOOL
+    advapi32.OpenProcessToken.argtypes = [
+        wintypes.HANDLE,
+        wintypes.DWORD,
+        wintypes.PHANDLE,
+    ]
+    advapi32.OpenProcessToken.errcheck = raise_winerror_when_false
+
+    # GetTokenInformation
+    PSID = wintypes.LPVOID  # we don't need the internals
+    TOKEN_INFORMATION_CLASS = wintypes.INT
+    TokenUser = 1
+
+    class SID_AND_ATTRIBUTES(ctypes.Structure):
+        _fields_ = [
+            ("Sid", PSID),
+            ("Attributes", wintypes.DWORD),
+        ]
+
+    class TOKEN_USER(ctypes.Structure):
+        _fields_ = [
+            ("User", SID_AND_ATTRIBUTES),
+        ]
+
+    advapi32.GetTokenInformation.restype = wintypes.BOOL
+    advapi32.GetTokenInformation.argtypes = [
+        wintypes.HANDLE,
+        TOKEN_INFORMATION_CLASS,
+        wintypes.LPVOID,
+        wintypes.DWORD,
+        wintypes.PDWORD,
+    ]
+    advapi32.GetTokenInformation.errcheck = raise_winerror_when_false
+
+    # LookupAccountSid
+    SID_NAME_USE = wintypes.INT
+    PSID_NAME_USE = ctypes.POINTER(SID_NAME_USE)
+
+    advapi32.LookupAccountSidW.restype = wintypes.BOOL
+    advapi32.LookupAccountSidW.argtypes = [
+        wintypes.LPCWSTR,
+        PSID,
+        wintypes.LPWSTR,
+        wintypes.LPDWORD,
+        wintypes.LPWSTR,
+        wintypes.LPDWORD,
+        PSID_NAME_USE,
+    ]
+    advapi32.LookupAccountSidW.errcheck = raise_winerror_when_false
+
+    #
+    # Implementation (see pg_SSPI_recv_auth())
+    #
+
+    # Get the current process token...
+    token = wintypes.HANDLE()
+    proc = kernel32.GetCurrentProcess()
+    advapi32.OpenProcessToken(proc, TOKEN_READ, token)
+
+    # ...then read the TOKEN_USER struct for that token...
+    info = TOKEN_USER()
+    infolen = wintypes.DWORD()
+
+    try:
+        # (GetTokenInformation creates a buffer bigger than TOKEN_USER, so we
+        # have to query the correct length first.)
+        advapi32.GetTokenInformation(token, TokenUser, None, 0, ctypes.byref(infolen))
+        assert False, "GetTokenInformation succeeded unexpectedly"
+
+    except OSError as err:
+        assert err.winerror == 122  # insufficient buffer
+
+        ctypes.resize(info, infolen.value)
+        advapi32.GetTokenInformation(
+            token,
+            TokenUser,
+            ctypes.byref(info),
+            ctypes.sizeof(info),
+            ctypes.byref(infolen),
+        )
+
+    # ...then pull the account and domain names out of the user SID.
+    MAXPGPATH = 1024
+
+    account = ctypes.create_unicode_buffer(MAXPGPATH)
+    domain = ctypes.create_unicode_buffer(MAXPGPATH)
+    accountlen = wintypes.DWORD(len(account))  # size in characters, not bytes
+    domainlen = wintypes.DWORD(len(domain))  # size in characters, not bytes
+    use = SID_NAME_USE()
+
+    advapi32.LookupAccountSidW(
+        None,
+        info.User.Sid,
+        account,
+        ctypes.byref(accountlen),
+        domain,
+        ctypes.byref(domainlen),
+        ctypes.byref(use),
+    )
+
+    return (account.value, domain.value)
diff --git a/src/test/ssl/pyt/conftest.py b/src/test/ssl/pyt/conftest.py
index fb4db372f03..85d2c994828 100644
--- a/src/test/ssl/pyt/conftest.py
+++ b/src/test/ssl/pyt/conftest.py
@@ -1,6 +1,12 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
 import datetime
+import os
+import pathlib
+import platform
+import secrets
+import socket
+import subprocess
 import tempfile
 from collections import namedtuple
 
@@ -127,3 +133,110 @@ def certs(cryptography, tmp_path_factory):
             return f.name
 
     return _Certs()
+
+
[email protected](scope="session")
+def datadir(tmp_path_factory):
+    """
+    Returns the directory name to use as the server data directory. If
+    TESTDATADIR is provided, that will be used; otherwise a new temporary
+    directory is created in the pytest temp root.
+    """
+    d = os.getenv("TESTDATADIR")
+    if d:
+        d = pathlib.Path(d)
+    else:
+        d = tmp_path_factory.mktemp("tmp_check")
+
+    return d
+
+
[email protected](scope="session")
+def sockdir(tmp_path_factory):
+    """
+    Returns the directory name to use as the server's unix_socket_directories
+    setting. Local client connections use this as the PGHOST.
+
+    At the moment, this is always put under the pytest temp root.
+    """
+    return tmp_path_factory.mktemp("sockfiles")
+
+
[email protected](scope="session")
+def winpassword():
+    """The per-session SCRAM password for the server admin on Windows."""
+    return secrets.token_urlsafe(16)
+
+
[email protected](scope="session")
+def server_instance(certs, datadir, sockdir, winpassword):
+    """
+    Starts a running Postgres server listening on localhost. The HBA initially
+    allows only local UNIX connections from the same user.
+
+    TODO: when installcheck is supported, this should optionally point to the
+    currently running server instead.
+    """
+
+    # Lock down the HBA by default; tests can open it back up later.
+    if platform.system() == "Windows":
+        # On Windows, for admin connections, use SCRAM with a generated password
+        # over local sockets. This requires additional work during initdb.
+        method = "scram-sha-256"
+
+        # NamedTemporaryFile doesn't work very nicely on Windows until Python
+        # 3.12, which introduces NamedTemporaryFile(delete_on_close=False).
+        # Until then, specify delete=False and manually unlink after use.
+        with tempfile.NamedTemporaryFile("w", delete=False) as pwfile:
+            pwfile.write(winpassword)
+
+        subprocess.check_call(
+            ["initdb", "--auth=scram-sha-256", "--pwfile", pwfile.name, datadir]
+        )
+        os.unlink(pwfile.name)
+
+    else:
+        # For other OSes we can just use peer auth.
+        method = "peer"
+        subprocess.check_call(["pg_ctl", "-D", datadir, "init"])
+
+    with open(datadir / "pg_hba.conf", "w") as f:
+        print(f"# default: local {method} connections only", file=f)
+        print(f"local all all {method}", file=f)
+
+    # Figure out a port to listen on. Attempt to reserve both IPv4 and IPv6
+    # addresses in one go.
+    #
+    # Note: socket.has_dualstack_ipv6/create_server are only in Python 3.8+.
+    if hasattr(socket, "has_dualstack_ipv6") and socket.has_dualstack_ipv6():
+        addr = ("::1", 0)
+        s = socket.create_server(addr, family=socket.AF_INET6, dualstack_ipv6=True)
+
+        hostaddr, port, _, _ = s.getsockname()
+        addrs = [hostaddr, "127.0.0.1"]
+
+    else:
+        addr = ("127.0.0.1", 0)
+
+        s = socket.socket()
+        s.bind(addr)
+
+        hostaddr, port = s.getsockname()
+        addrs = [hostaddr]
+
+    log = os.path.join(datadir, "postgresql.log")
+
+    with s, open(os.path.join(datadir, "postgresql.conf"), "a") as f:
+        print(file=f)
+        print("unix_socket_directories = '{}'".format(sockdir.as_posix()), file=f)
+        print("listen_addresses = '{}'".format(",".join(addrs)), file=f)
+        print("port =", port, file=f)
+        print("log_connections = all", file=f)
+
+    # Between closing of the socket, s, and server start, we're racing against
+    # anything that wants to open up ephemeral ports, so try not to put any new
+    # work here.
+
+    subprocess.check_call(["pg_ctl", "-D", datadir, "-l", log, "start"])
+    yield (hostaddr, port)
+    subprocess.check_call(["pg_ctl", "-D", datadir, "-l", log, "stop"])
diff --git a/src/test/ssl/pyt/test_server.py b/src/test/ssl/pyt/test_server.py
new file mode 100644
index 00000000000..2d0be735371
--- /dev/null
+++ b/src/test/ssl/pyt/test_server.py
@@ -0,0 +1,538 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import os
+import pathlib
+import platform
+import re
+import shutil
+import socket
+import ssl
+import struct
+import subprocess
+import tempfile
+from collections import namedtuple
+from typing import Dict, List, Union
+
+import pytest
+
+import pg
+
+# This suite opens up local TCP ports and is hidden behind PG_TEST_EXTRA=ssl.
+pytestmark = pg.require_test_extra("ssl")
+
+
+#
+# Test Fixtures
+#
+
+
[email protected](scope="session")
+def connenv(server_instance, sockdir, datadir):
+    """
+    Provides the values for several PG* environment variables needed for our
+    utility programs to connect to the server_instance.
+    """
+    return {
+        "PGHOST": str(sockdir),
+        "PGPORT": str(server_instance[1]),
+        "PGDATABASE": "postgres",
+        "PGDATA": str(datadir),
+    }
+
+
+class FileBackup(contextlib.AbstractContextManager):
+    """
+    A context manager which backs up a file's contents, restoring them on exit.
+    """
+
+    def __init__(self, file: pathlib.Path):
+        super().__init__()
+
+        self._file = file
+
+    def __enter__(self):
+        with tempfile.NamedTemporaryFile(
+            prefix=self._file.name, dir=self._file.parent, delete=False
+        ) as f:
+            self._backup = pathlib.Path(f.name)
+
+        shutil.copyfile(self._file, self._backup)
+
+        return self
+
+    def __exit__(self, *exc):
+        # Swap the backup and the original file, so that the modified contents
+        # can still be inspected in case of failure.
+        #
+        # TODO: this is less helpful if there are multiple layers, because it's
+        # not clear which backup to look at. Can the backup name be printed as
+        # part of the failed test output? Should we only swap on test failure?
+        tmp = self._backup.parent / (self._backup.name + ".tmp")
+
+        shutil.copyfile(self._file, tmp)
+        shutil.copyfile(self._backup, self._file)
+        shutil.move(tmp, self._backup)
+
+
+class HBA(FileBackup):
+    """
+    Backs up a server's HBA configuration and provides means for temporarily
+    editing it. See also pg_server, which provides an instance of this class and
+    context managers for enforcing the reload/restart order of operations.
+    """
+
+    def __init__(self, datadir: pathlib.Path):
+        super().__init__(datadir / "pg_hba.conf")
+
+    def prepend(self, *lines: Union[str, List[str]]):
+        """
+        Temporarily prepends lines to the server's pg_hba.conf.
+
+        As sugar for aligning HBA columns in the tests, each line can be either
+        a string or a list of strings. List elements will be joined by single
+        spaces before they are written to file.
+        """
+        with open(self._file, "r") as f:
+            prior_data = f.read()
+
+        with open(self._file, "w") as f:
+            for l in lines:
+                if isinstance(l, list):
+                    print(*l, file=f)
+                else:
+                    print(l, file=f)
+
+            f.write(prior_data)
+
+
+class Config(FileBackup):
+    """
+    Backs up a server's postgresql.conf and provides means for temporarily
+    editing it. See also pg_server, which provides an instance of this class and
+    context managers for enforcing the reload/restart order of operations.
+    """
+
+    def __init__(self, datadir: pathlib.Path):
+        super().__init__(datadir / "postgresql.conf")
+
+    def set(self, **gucs):
+        """
+        Temporarily appends GUC settings to the server's postgresql.conf.
+        """
+
+        with open(self._file, "a") as f:
+            print(file=f)
+
+            for n, v in gucs.items():
+                v = str(v)
+
+                # TODO: proper quoting
+                v = v.replace("\\", "\\\\")
+                v = v.replace("'", "\\'")
+                v = "'{}'".format(v)
+
+                print(n, "=", v, file=f)
+
+
[email protected](scope="session")
+def pg_server_session(server_instance, connenv, datadir, winpassword):
+    """
+    Provides common routines for configuring and connecting to the
+    server_instance. For example:
+
+        users = pg_server_session.create_users("one", "two")
+        dbs = pg_server_session.create_dbs("default")
+
+        with pg_server_session.reloading() as s:
+            s.hba.prepend(["local", dbs["default"], users["two"], "peer"])
+
+        conn = connect_somehow(**pg_server_session.conninfo)
+        ...
+
+    Attributes of note are
+    - .conninfo: provides TCP connection info for the server
+
+    This fixture unwinds its configuration changes at the end of the pytest
+    session. For more granular changes, pg_server_session.subcontext() splits
+    off a "nested" context to allow smaller scopes.
+    """
+
+    class _Server(contextlib.ExitStack):
+        conninfo = dict(
+            hostaddr=server_instance[0],
+            port=server_instance[1],
+        )
+
+        # for _backup_configuration()
+        _Backup = namedtuple("Backup", "conf, hba")
+
+        def subcontext(self):
+            """
+            Creates a new server stack instance that can be tied to a smaller
+            scope than "session".
+            """
+            # So far, there doesn't seem to be a need to link the two objects,
+            # since HBA/Config/FileBackup operate directly on the filesystem and
+            # will appear to "nest" naturally.
+            return self.__class__()
+
+        def create_users(self, *userkeys: str) -> Dict[str, str]:
+            """
+            Creates new users which will be dropped at the end of the server
+            context.
+
+            For each provided key, a related user name will be selected and
+            stored in a map. This map is returned to let calling code look up
+            the selected usernames (instead of hardcoding them and potentially
+            stomping on an existing installation).
+            """
+            usermap = {}
+
+            for u in userkeys:
+                # TODO: use a uniquifier to support installcheck
+                name = u + "user"
+                usermap[u] = name
+
+                # TODO: proper escaping
+                self.psql("-c", "CREATE USER " + name)
+                self.callback(self.psql, "-c", "DROP USER " + name)
+
+            return usermap
+
+        def create_dbs(self, *dbkeys: str) -> Dict[str, str]:
+            """
+            Creates new databases which will be dropped at the end of the server
+            context. See create_users() for the meaning of the keys and returned
+            map.
+            """
+            dbmap = {}
+
+            for d in dbkeys:
+                # TODO: use a uniquifier to support installcheck
+                name = d + "db"
+                dbmap[d] = name
+
+                # TODO: proper escaping
+                self.psql("-c", "CREATE DATABASE " + name)
+                self.callback(self.psql, "-c", "DROP DATABASE " + name)
+
+            return dbmap
+
+        @contextlib.contextmanager
+        def reloading(self):
+            """
+            Provides a context manager for making configuration changes.
+
+            If the context suite finishes successfully, the configuration will
+            be reloaded via pg_ctl. On teardown, the configuration changes will
+            be unwound, and the server will be signaled to reload again.
+
+            The context target contains the following attributes which can be
+            used to configure the server:
+            - .conf: modifies postgresql.conf
+            - .hba: modifies pg_hba.conf
+
+            For example:
+
+                with pg_server_session.reloading() as s:
+                    s.conf.set(log_connections="on")
+                    s.hba.prepend("local all all trust")
+            """
+            try:
+                # Push a reload onto the stack before making any other
+                # unwindable changes. That way the order of operations will be
+                #
+                #  # test
+                #   - config change 1
+                #   - config change 2
+                #   - reload
+                #  # teardown
+                #   - undo config change 2
+                #   - undo config change 1
+                #   - reload
+                #
+                self.callback(self.pg_ctl, "reload")
+                yield self._backup_configuration()
+            except:
+                # We only want to reload at the end of the suite if there were
+                # no errors. During exceptions, the pushed callback handles
+                # things instead, so there's nothing to do here.
+                raise
+            else:
+                # Suite completed successfully.
+                self.pg_ctl("reload")
+
+        @contextlib.contextmanager
+        def restarting(self):
+            """Like .reloading(), but with a full server restart."""
+            try:
+                self.callback(self.pg_ctl, "restart")
+                yield self._backup_configuration()
+            except:
+                raise
+            else:
+                self.pg_ctl("restart")
+
+        def psql(self, *args):
+            """
+            Runs psql with the given arguments. Password prompts are always
+            disabled. On Windows, the admin password will be included in the
+            environment.
+            """
+            if platform.system() == "Windows":
+                pw = dict(PGPASSWORD=winpassword)
+            else:
+                pw = None
+
+            self._run("psql", "-w", *args, addenv=pw)
+
+        def pg_ctl(self, *args):
+            """
+            Runs pg_ctl with the given arguments. Log output will be placed in
+            postgresql.log in the server's data directory.
+
+            TODO: put the log in TESTLOGDIR
+            """
+            self._run("pg_ctl", "-l", str(datadir / "postgresql.log"), *args)
+
+        def _run(self, cmd, *args, addenv: dict = None):
+            # Override the existing environment with the connenv values and
+            # anything the caller wanted to add. (Python 3.9 gives us the
+            # less-ugly `os.environ | connenv` merge operator.)
+            subenv = dict(os.environ, **connenv)
+            if addenv:
+                subenv.update(addenv)
+
+            subprocess.check_call([cmd, *args], env=subenv)
+
+        def _backup_configuration(self):
+            # Wrap the existing HBA and configuration with FileBackups.
+            return self._Backup(
+                hba=self.enter_context(HBA(datadir)),
+                conf=self.enter_context(Config(datadir)),
+            )
+
+    with _Server() as s:
+        yield s
+
+
[email protected](scope="module", autouse=True)
+def ssl_setup(pg_server_session, certs, datadir):
+    """
+    Sets up required server settings for all tests in this module. The fixture
+    variable is a tuple (users, dbs) containing the user and database names that
+    have been chosen for the test session.
+    """
+    try:
+        with pg_server_session.restarting() as s:
+            s.conf.set(
+                ssl="on",
+                ssl_ca_file=certs.ca.certpath,
+                ssl_cert_file=certs.server.certpath,
+                ssl_key_file=certs.server.keypath,
+            )
+
+            # Reject by default.
+            s.hba.prepend("hostssl all all all reject")
+
+    except subprocess.CalledProcessError:
+        # This is a decent place to skip if the server isn't set up for SSL.
+        logpath = datadir / "postgresql.log"
+        unsupported = re.compile("SSL is not supported")
+
+        with open(logpath, "r") as log:
+            for line in log:
+                if unsupported.search(line):
+                    pytest.skip("the server does not support SSL")
+
+        # Some other error happened.
+        raise
+
+    users = pg_server_session.create_users(
+        "ssl",
+    )
+
+    dbs = pg_server_session.create_dbs(
+        "ssl",
+    )
+
+    return (users, dbs)
+
+
[email protected](scope="module")
+def client_cert(ssl_setup, certs):
+    """
+    Creates a Cert for the "ssl" user.
+    """
+    from cryptography import x509
+    from cryptography.x509.oid import NameOID
+
+    users, _ = ssl_setup
+    user = users["ssl"]
+
+    return certs.new(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, user)]))
+
+
[email protected]
+def pg_server(pg_server_session):
+    """
+    A per-test instance of pg_server_session. Use this fixture to make changes
+    to the server which will be rolled back at the end of every test.
+    """
+    with pg_server_session.subcontext() as s:
+        yield s
+
+
+#
+# Tests
+#
+
+
+# For use with the `creds` parameter below.
+CLIENT = "client"
+SERVER = "server"
+
+
[email protected](
+    # fmt: off
+    "auth_method,                    creds,  expected_error",
+[
+    # Trust allows anything.
+    ("trust",                        None,   None),
+    ("trust",                        CLIENT, None),
+    ("trust",                        SERVER, None),
+
+    # verify-ca allows any CA-signed certificate.
+    ("trust clientcert=verify-ca",   None,   "requires a valid client certificate"),
+    ("trust clientcert=verify-ca",   CLIENT, None),
+    ("trust clientcert=verify-ca",   SERVER, None),
+
+    # cert and verify-full allow only the correct certificate.
+    ("trust clientcert=verify-full", None,   "requires a valid client certificate"),
+    ("trust clientcert=verify-full", CLIENT, None),
+    ("trust clientcert=verify-full", SERVER, "authentication failed for user"),
+    ("cert",                         None,   "requires a valid client certificate"),
+    ("cert",                         CLIENT, None),
+    ("cert",                         SERVER, "authentication failed for user"),
+],
+    # fmt: on
+)
+def test_direct_ssl_certificate_authentication(
+    pg_server,
+    ssl_setup,
+    certs,
+    client_cert,
+    remaining_timeout,
+    # test parameters
+    auth_method,
+    creds,
+    expected_error,
+):
+    """
+    Tests direct SSL connections with various client-certificate/HBA
+    combinations.
+    """
+
+    # Set up the HBA as desired by the test.
+    users, dbs = ssl_setup
+
+    user = users["ssl"]
+    db = dbs["ssl"]
+
+    with pg_server.reloading() as s:
+        s.hba.prepend(
+            ["hostssl", db, user, "127.0.0.1/32", auth_method],
+            ["hostssl", db, user, "::1/128", auth_method],
+        )
+
+    # Configure the SSL settings for the client.
+    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+    ctx.load_verify_locations(cafile=certs.ca.certpath)
+    ctx.set_alpn_protocols(["postgresql"])  # for direct SSL
+
+    # Load up a client certificate if required by the test.
+    if creds == CLIENT:
+        ctx.load_cert_chain(client_cert.certpath, client_cert.keypath)
+    elif creds == SERVER:
+        # Using a server certificate as the client credential is expected to
+        # work only for clientcert=verify-ca (and `trust`, naturally).
+        ctx.load_cert_chain(certs.server.certpath, certs.server.keypath)
+
+    # Make a direct SSL connection. There's no SSLRequest in the handshake; we
+    # simply wrap a TCP connection with OpenSSL.
+    addr = (pg_server.conninfo["hostaddr"], pg_server.conninfo["port"])
+    with socket.create_connection(addr) as s:
+        s.settimeout(remaining_timeout())  # XXX this resets every operation
+
+        with ctx.wrap_socket(s, server_hostname=certs.server_host) as conn:
+            # Build and send the startup packet.
+            startup_options = dict(
+                user=user,
+                database=db,
+                application_name="pytest",
+            )
+
+            payload = b""
+            for k, v in startup_options.items():
+                payload += k.encode() + b"\0"
+                payload += str(v).encode() + b"\0"
+            payload += b"\0"  # null terminator
+
+            pktlen = 4 + 4 + len(payload)
+            conn.send(struct.pack("!IHH", pktlen, 3, 0) + payload)
+
+            if not expected_error:
+                # Expect an AuthenticationOK to come back.
+                pkttype, pktlen = struct.unpack("!cI", conn.recv(5))
+                assert pkttype == b"R"
+                assert pktlen == 8
+
+                authn_result = struct.unpack("!I", conn.recv(4))[0]
+                assert authn_result == 0
+
+                # Read and discard to ReadyForQuery.
+                while True:
+                    pkttype, pktlen = struct.unpack("!cI", conn.recv(5))
+                    payload = conn.recv(pktlen - 4)
+
+                    if pkttype == b"Z":
+                        assert payload == b"I"
+                        break
+
+                # Send an empty query.
+                conn.send(struct.pack("!cI", b"Q", 5) + b"\0")
+
+                # Expect EmptyQueryResponse+ReadyForQuery.
+                pkttype, pktlen = struct.unpack("!cI", conn.recv(5))
+                assert pkttype == b"I"
+                assert pktlen == 4
+
+                pkttype, pktlen = struct.unpack("!cI", conn.recv(5))
+                assert pkttype == b"Z"
+
+                payload = conn.recv(pktlen - 4)
+                assert payload == b"I"
+
+            else:
+                # Match the expected authentication error.
+                pkttype, pktlen = struct.unpack("!cI", conn.recv(5))
+                assert pkttype == b"E"
+
+                payload = conn.recv(pktlen - 4)
+                msg = None
+
+                for component in payload.split(b"\0"):
+                    if not component:
+                        break  # end of message
+
+                    key, val = component[:1], component[1:]
+                    if key == b"S":
+                        assert val == b"FATAL"
+                    elif key == b"M":
+                        msg = val.decode()
+
+                assert msg and re.search(expected_error, msg), "server error did not match"
+
+            # Terminate.
+            conn.send(struct.pack("!cI", b"X", 4))
-- 
2.51.1
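
To illustrate the teardown ordering that reloading() relies on in the fixture
above, here is a minimal standalone sketch. It assumes nothing beyond
contextlib.ExitStack; the Server class, its change() helper, and the log list
are hypothetical stand-ins for _Server, the HBA/Config backups, and pg_ctl,
and they only record the order of events rather than touching a real server.

```python
import contextlib

log = []


class Server(contextlib.ExitStack):
    """Hypothetical stand-in for _Server; records events instead of running pg_ctl."""

    @contextlib.contextmanager
    def reloading(self):
        # Pushed before any other unwindable change, so it runs last on teardown.
        self.callback(log.append, "teardown: reload")
        yield self
        # Reached only when the with-block body did not raise.
        log.append("test: reload")

    def change(self, name):
        # Apply a (pretend) configuration change and register its undo step.
        log.append(f"test: apply {name}")
        self.callback(log.append, f"teardown: undo {name}")


with Server() as server:
    with server.reloading() as s:
        s.change("change 1")
        s.change("change 2")
    log.append("test: body runs")

print("\n".join(log))
```

Running it prints the apply steps, the reload, the test body, and then the undo
steps in reverse order with the final reload last, which matches the order
described in the comment inside reloading().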



  [text/x-patch] v3-0005-ci-Add-MTEST_SUITES-for-optional-test-tailoring.patch (3.1K, 6-v3-0005-ci-Add-MTEST_SUITES-for-optional-test-tailoring.patch)
From 09b5c7d9d04966b842ac19f66aac9e2dd7097b3e Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Tue, 2 Sep 2025 15:37:53 -0700
Subject: [PATCH v3 05/10] ci: Add MTEST_SUITES for optional test tailoring

This should make it easier to control the test cycle time for Cirrus. To use
it, add the desired suites (remember to include `--suite setup`) to the
top-level MTEST_SUITES environment variable.
---
 .cirrus.tasks.yml | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/.cirrus.tasks.yml b/.cirrus.tasks.yml
index 4e744f1c105..706a809f641 100644
--- a/.cirrus.tasks.yml
+++ b/.cirrus.tasks.yml
@@ -28,6 +28,7 @@ env:
   # errors/warnings in one place.
   MBUILD_TARGET: all testprep
   MTEST_ARGS: --print-errorlogs --no-rebuild -C build
+  MTEST_SUITES: # --suite setup --suite ssl --suite ...
   PGCTLTIMEOUT: 120 # avoids spurious failures during parallel tests
   TEMP_CONFIG: ${CIRRUS_WORKING_DIR}/src/tools/ci/pg_ci_base.conf
   PG_TEST_EXTRA: kerberos ldap ssl libpq_encryption load_balance oauth
@@ -247,7 +248,7 @@ task:
   test_world_script: |
     su postgres <<-EOF
       ulimit -c unlimited
-      meson test $MTEST_ARGS --num-processes ${TEST_JOBS}
+      meson test $MTEST_ARGS --num-processes ${TEST_JOBS} ${MTEST_SUITES}
     EOF
 
   # test runningcheck, freebsd chosen because it's currently fast enough
@@ -391,7 +392,7 @@ task:
       # Otherwise tests will fail on OpenBSD, due to inability to start enough
       # processes.
       ulimit -p 256
-      meson test $MTEST_ARGS --num-processes ${TEST_JOBS}
+      meson test $MTEST_ARGS --num-processes ${TEST_JOBS} ${MTEST_SUITES}
     EOF
 
   on_failure:
@@ -605,7 +606,7 @@ task:
       test_world_script: |
         su postgres <<-EOF
           ulimit -c unlimited
-          meson test $MTEST_ARGS --num-processes ${TEST_JOBS}
+          meson test $MTEST_ARGS --num-processes ${TEST_JOBS} ${MTEST_SUITES}
         EOF
         # so that we don't upload 64bit logs if 32bit fails
         rm -rf build/
@@ -617,7 +618,7 @@ task:
       test_world_32_script: |
         su postgres <<-EOF
           ulimit -c unlimited
-          PYTHONCOERCECLOCALE=0 LANG=C meson test $MTEST_ARGS -C build-32 --num-processes ${TEST_JOBS}
+          PYTHONCOERCECLOCALE=0 LANG=C meson test $MTEST_ARGS -C build-32 --num-processes ${TEST_JOBS} ${MTEST_SUITES}
         EOF
 
       on_failure:
@@ -743,7 +744,7 @@ task:
   test_world_script: |
     ulimit -c unlimited # default is 0
     ulimit -n 1024 # default is 256, pretty low
-    meson test $MTEST_ARGS --num-processes ${TEST_JOBS}
+    meson test $MTEST_ARGS --num-processes ${TEST_JOBS} ${MTEST_SUITES}
 
   on_failure:
     <<: *on_failure_meson
@@ -826,7 +827,7 @@ task:
 
   check_world_script: |
     vcvarsall x64
-    meson test %MTEST_ARGS% --num-processes %TEST_JOBS%
+    meson test %MTEST_ARGS% --num-processes %TEST_JOBS% %MTEST_SUITES%
 
   on_failure:
     <<: *on_failure_meson
@@ -887,7 +888,7 @@ task:
   upload_caches: ccache
 
   test_world_script: |
-    %BASH% -c "meson test %MTEST_ARGS% --num-processes %TEST_JOBS%"
+    %BASH% -c "meson test %MTEST_ARGS% --num-processes %TEST_JOBS% %MTEST_SUITES%"
 
   on_failure:
     <<: *on_failure_meson
-- 
2.51.1



  [text/x-patch] v3-0006-XXX-run-pytest-and-ssl-suite-all-OSes.patch (1.4K, 7-v3-0006-XXX-run-pytest-and-ssl-suite-all-OSes.patch)
From 6912ea5437feedeb9ca65e312d1726c23671ad54 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Tue, 2 Sep 2025 15:38:52 -0700
Subject: [PATCH v3 06/10] XXX run pytest and ssl suite, all OSes

---
 .cirrus.star      | 2 +-
 .cirrus.tasks.yml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.cirrus.star b/.cirrus.star
index e9bb672b959..7c1caaa12f1 100644
--- a/.cirrus.star
+++ b/.cirrus.star
@@ -73,7 +73,7 @@ def compute_environment_vars():
     # REPO_CI_AUTOMATIC_TRIGGER_TASKS="task_name other_task" under "Repository
     # Settings" on Cirrus CI's website.
 
-    default_manual_trigger_tasks = ['mingw', 'netbsd', 'openbsd']
+    default_manual_trigger_tasks = []
 
     repo_ci_automatic_trigger_tasks = env.get('REPO_CI_AUTOMATIC_TRIGGER_TASKS', '')
     for task in default_manual_trigger_tasks:
diff --git a/.cirrus.tasks.yml b/.cirrus.tasks.yml
index 706a809f641..ddb5305dc81 100644
--- a/.cirrus.tasks.yml
+++ b/.cirrus.tasks.yml
@@ -28,7 +28,7 @@ env:
   # errors/warnings in one place.
   MBUILD_TARGET: all testprep
   MTEST_ARGS: --print-errorlogs --no-rebuild -C build
-  MTEST_SUITES: # --suite setup --suite ssl --suite ...
+  MTEST_SUITES: --suite setup --suite pytest --suite ssl
   PGCTLTIMEOUT: 120 # avoids spurious failures during parallel tests
   TEMP_CONFIG: ${CIRRUS_WORKING_DIR}/src/tools/ci/pg_ci_base.conf
   PG_TEST_EXTRA: kerberos ldap ssl libpq_encryption load_balance oauth
-- 
2.51.1



  [text/x-patch] v3-0007-Refactor-and-improve-pytest-infrastructure.patch (74.5K, 8-v3-0007-Refactor-and-improve-pytest-infrastructure.patch)
From 220c4db5e6bd5996da4b31abe35a43fc61abb71d Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sun, 19 Oct 2025 23:01:30 +0200
Subject: [PATCH v3 07/10] Refactor and improve pytest infrastructure

This change refactors the pytest-based test infrastructure and adds new
features to it.

The primary features it adds are:
- A `sql` method on `PGconn`: it takes a query and returns the results
  as native Python types.
- A `conn` fixture: a libpq-based connection to the default Postgres
  server.
- Use of the `pg_config` binary to find the libdir and bindir (can be
  overridden by setting PG_CONFIG). Without this, LD_LIBRARY_PATH had to
  be set when running pytest manually.

The refactoring consists of:
- Rename the `pg_server` fixture to `pg`, since it'll likely be one of the
  most commonly used ones.
- Rename the `pg` module to `pypg` to avoid naming conflicts/shadowing
  problems with the newly renamed `pg` fixture.
- Move class definitions outside of fixtures to separate modules (either
  in the `pypg` module or the new `libpq` module)
- Move all "general" fixtures to the `pypg.fixtures` module, instead of
  having them be defined in the ssl module.
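
As a rough illustration of what `sql` returns, the sketch below mimics the
conversion and simplification steps in isolation. The `convert` and
`simplify` helpers and the `CONVERTERS` table are hypothetical stand-ins, not
the actual code in libpq.py, and only three type OIDs (bool, int4, text) are
covered.

```python
from typing import Any, List, Tuple

# Hypothetical converter table keyed by type OID: bool (16), int4 (23), text (25).
CONVERTERS = {16: lambda v: v == "t", 23: int, 25: str}


def convert(value: str, oid: int) -> Any:
    """Convert a text-format value by OID; unknown types stay as strings."""
    conv = CONVERTERS.get(oid)
    return conv(value) if conv else value


def simplify(rows: List[Tuple[Any, ...]]) -> Any:
    """Unwrap single-cell, single-row, and single-column results."""
    if len(rows) == 1:
        row = rows[0]
        return row[0] if len(row) == 1 else row
    if rows and len(rows[0]) == 1:
        return [row[0] for row in rows]
    return rows


# Text-format cells as they might come back for two (int4, bool) rows.
raw = [("1", "t"), ("2", "f")]
oids = [23, 16]
typed = [tuple(convert(v, o) for v, o in zip(row, oids)) for row in raw]

print(simplify(typed))                 # multiple rows and columns: unchanged
print(simplify([(1,)]))                # single cell
print(simplify([(1, 2)]))              # single row
print(simplify([(1,), (2,), (3,)]))    # single column
```

The single-cell and single-column shortcuts keep assertions in tests short, at
the cost of the return shape depending on the number of rows that come back.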
---
 src/test/pytest/libpq.py                  | 409 ++++++++++++++++++++++
 src/test/pytest/pg/fixtures.py            | 212 -----------
 src/test/pytest/plugins/pgtap.py          |   1 -
 src/test/pytest/{pg => pypg}/__init__.py  |   0
 src/test/pytest/{pg => pypg}/_env.py      |   1 -
 src/test/pytest/{pg => pypg}/_win32.py    |   0
 src/test/pytest/pypg/fixtures.py          | 175 +++++++++
 src/test/pytest/pypg/server.py            | 387 ++++++++++++++++++++
 src/test/pytest/pypg/util.py              |  42 +++
 src/test/pytest/pyt/conftest.py           |   3 +-
 src/test/pytest/pyt/test_libpq.py         |  23 +-
 src/test/pytest/pyt/test_query_helpers.py | 286 +++++++++++++++
 src/test/ssl/pyt/conftest.py              | 136 ++-----
 src/test/ssl/pyt/test_client.py           |  26 +-
 src/test/ssl/pyt/test_server.py           | 380 +-------------------
 15 files changed, 1370 insertions(+), 711 deletions(-)
 create mode 100644 src/test/pytest/libpq.py
 delete mode 100644 src/test/pytest/pg/fixtures.py
 rename src/test/pytest/{pg => pypg}/__init__.py (100%)
 rename src/test/pytest/{pg => pypg}/_env.py (97%)
 rename src/test/pytest/{pg => pypg}/_win32.py (100%)
 create mode 100644 src/test/pytest/pypg/fixtures.py
 create mode 100644 src/test/pytest/pypg/server.py
 create mode 100644 src/test/pytest/pypg/util.py
 create mode 100644 src/test/pytest/pyt/test_query_helpers.py

diff --git a/src/test/pytest/libpq.py b/src/test/pytest/libpq.py
new file mode 100644
index 00000000000..b851a117b66
--- /dev/null
+++ b/src/test/pytest/libpq.py
@@ -0,0 +1,409 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+libpq testing utilities - ctypes bindings and helpers for PostgreSQL's libpq library.
+
+This module provides Python wrappers around libpq for use in pytest tests.
+"""
+
+import contextlib
+import ctypes
+import datetime
+import decimal
+import enum
+import json
+import platform
+import os
+import uuid
+from typing import Any, Callable, Dict
+
+
+class LibpqError(RuntimeError):
+    """
+    Exception class for application-level errors that are encountered during libpq operations.
+    """
+
+    pass
+
+
+class ConnectionStatus(enum.IntEnum):
+    """PostgreSQL connection status codes from libpq."""
+
+    CONNECTION_OK = 0
+    CONNECTION_BAD = 1
+
+
+class ExecStatus(enum.IntEnum):
+    """PostgreSQL result status codes from PQresultStatus."""
+
+    PGRES_EMPTY_QUERY = 0
+    PGRES_COMMAND_OK = 1
+    PGRES_TUPLES_OK = 2
+    PGRES_COPY_OUT = 3
+    PGRES_COPY_IN = 4
+    PGRES_BAD_RESPONSE = 5
+    PGRES_NONFATAL_ERROR = 6
+    PGRES_FATAL_ERROR = 7
+    PGRES_COPY_BOTH = 8
+    PGRES_SINGLE_TUPLE = 9
+    PGRES_PIPELINE_SYNC = 10
+    PGRES_PIPELINE_ABORTED = 11
+
+
+class _PGconn(ctypes.Structure):
+    pass
+
+
+class _PGresult(ctypes.Structure):
+    pass
+
+
+_PGconn_p = ctypes.POINTER(_PGconn)
+_PGresult_p = ctypes.POINTER(_PGresult)
+
+
+def load_libpq_handle(libdir):
+    """
+    Loads a ctypes handle for libpq. Some common function prototypes are
+    initialized for general use.
+    """
+    system = platform.system()
+
+    if system in ("Linux", "FreeBSD", "NetBSD", "OpenBSD"):
+        name = "libpq.so.5"
+    elif system == "Darwin":
+        name = "libpq.5.dylib"
+    elif system == "Windows":
+        name = "libpq.dll"
+    else:
+        assert False, f"the libpq fixture must be updated for {system}"
+
+    libpq_path = os.path.join(libdir, name)
+
+    # XXX ctypes.CDLL() is a little stricter with load paths on Windows. The
+    # preferred way around that is to know the absolute path to libpq.dll, but
+    # that doesn't seem to mesh well with the current test infrastructure. For
+    # now, enable "standard" LoadLibrary behavior.
+    loadopts = {}
+    if system == "Windows":
+        loadopts["winmode"] = 0
+
+    lib = ctypes.CDLL(libpq_path, **loadopts)
+
+    #
+    # Function Prototypes
+    #
+
+    lib.PQconnectdb.restype = _PGconn_p
+    lib.PQconnectdb.argtypes = [ctypes.c_char_p]
+
+    lib.PQstatus.restype = ctypes.c_int
+    lib.PQstatus.argtypes = [_PGconn_p]
+
+    lib.PQexec.restype = _PGresult_p
+    lib.PQexec.argtypes = [_PGconn_p, ctypes.c_char_p]
+
+    lib.PQresultStatus.restype = ctypes.c_int
+    lib.PQresultStatus.argtypes = [_PGresult_p]
+
+    lib.PQclear.restype = None
+    lib.PQclear.argtypes = [_PGresult_p]
+
+    lib.PQerrorMessage.restype = ctypes.c_char_p
+    lib.PQerrorMessage.argtypes = [_PGconn_p]
+
+    lib.PQfinish.restype = None
+    lib.PQfinish.argtypes = [_PGconn_p]
+
+    lib.PQresultErrorMessage.restype = ctypes.c_char_p
+    lib.PQresultErrorMessage.argtypes = [_PGresult_p]
+
+    lib.PQntuples.restype = ctypes.c_int
+    lib.PQntuples.argtypes = [_PGresult_p]
+
+    lib.PQnfields.restype = ctypes.c_int
+    lib.PQnfields.argtypes = [_PGresult_p]
+
+    lib.PQgetvalue.restype = ctypes.c_char_p
+    lib.PQgetvalue.argtypes = [_PGresult_p, ctypes.c_int, ctypes.c_int]
+
+    lib.PQgetisnull.restype = ctypes.c_int
+    lib.PQgetisnull.argtypes = [_PGresult_p, ctypes.c_int, ctypes.c_int]
+
+    lib.PQftype.restype = ctypes.c_uint
+    lib.PQftype.argtypes = [_PGresult_p, ctypes.c_int]
+
+    return lib
+
+
+# PostgreSQL type OIDs and conversion system
+# Type registry - maps OID to converter function
+_type_converters: Dict[int, Callable[[str], Any]] = {}
+_array_to_elem_map: Dict[int, int] = {}
+
+
+def register_type_info(
+    name: str, oid: int, array_oid: int, converter: Callable[[str], Any]
+):
+    """
+    Register a PostgreSQL type with its OID, array OID, and conversion function.
+
+    Usage:
+        register_type_info("bool", 16, 1000, lambda v: v == "t")
+    """
+    _type_converters[oid] = converter
+    if array_oid is not None:
+        _array_to_elem_map[array_oid] = oid
+
+
+# Helper converters
+def _parse_array(value: str, elem_oid: int) -> list:
+    """Parse a flat PostgreSQL array: {a,b,c} (no nesting or quoted commas)."""
+    if not (value.startswith("{") and value.endswith("}")):
+        return value
+
+    inner = value[1:-1]
+    if not inner:
+        return []
+
+    elements = inner.split(",")
+    result = []
+    for elem in elements:
+        elem = elem.strip()
+        if elem == "NULL":
+            result.append(None)
+        else:
+            # Remove quotes if present
+            if elem.startswith('"') and elem.endswith('"'):
+                elem = elem[1:-1]
+            result.append(_convert_pg_value(elem, elem_oid))
+
+    return result
+
+
+# Register standard PostgreSQL types that we'll likely encounter in tests
+register_type_info("bool", 16, 1000, lambda v: v == "t")
+register_type_info("int2", 21, 1005, int)
+register_type_info("int4", 23, 1007, int)
+register_type_info("int8", 20, 1016, int)
+register_type_info("float4", 700, 1021, float)
+register_type_info("float8", 701, 1022, float)
+register_type_info("numeric", 1700, 1231, decimal.Decimal)
+register_type_info("text", 25, 1009, str)
+register_type_info("varchar", 1043, 1015, str)
+register_type_info("date", 1082, 1182, datetime.date.fromisoformat)
+register_type_info("time", 1083, 1183, datetime.time.fromisoformat)
+register_type_info("timestamp", 1114, 1115, datetime.datetime.fromisoformat)
+register_type_info("timestamptz", 1184, 1185, datetime.datetime.fromisoformat)
+register_type_info("uuid", 2950, 2951, uuid.UUID)
+register_type_info("json", 114, 199, json.loads)
+register_type_info("jsonb", 3802, 3807, json.loads)
+
+
+def _convert_pg_value(value: str, type_oid: int) -> Any:
+    """
+    Convert PostgreSQL string value to appropriate Python type based on OID.
+    Uses the registered type converters from register_type_info().
+    """
+    # Check if it's an array type
+    if type_oid in _array_to_elem_map:
+        elem_oid = _array_to_elem_map[type_oid]
+        return _parse_array(value, elem_oid)
+
+    # Use registered converter if available
+    converter = _type_converters.get(type_oid)
+    if converter:
+        return converter(value)
+
+    # Unknown types - return as string
+    return value
+
+
+def simplify_query_results(results) -> Any:
+    """
+    Simplify the results of a query so that the caller doesn't have to unpack
+    lists and tuples of length 1.
+    """
+    if len(results) == 1:
+        row = results[0]
+        if len(row) == 1:
+            # If there's only a single cell, just return the value
+            return row[0]
+        # If there's only a single row, just return that row
+        return row
+
+    if len(results) != 0 and len(results[0]) == 1:
+        # If there's only a single column, return an array of values
+        return [row[0] for row in results]
+
+    # If there are multiple rows and columns, return the results as is
+    return results
+
+
+class PGresult(contextlib.AbstractContextManager):
+    """Wraps a raw _PGresult_p with a more friendly interface."""
+
+    def __init__(self, lib: ctypes.CDLL, res: _PGresult_p):
+        self._lib = lib
+        self._res = res
+
+    def __exit__(self, *exc):
+        self._lib.PQclear(self._res)
+        self._res = None
+
+    def status(self) -> ExecStatus:
+        return ExecStatus(self._lib.PQresultStatus(self._res))
+
+    def error_message(self):
+        """Returns the error message associated with this result."""
+        msg = self._lib.PQresultErrorMessage(self._res)
+        return msg.decode() if msg else ""
+
+    def fetch_all(self):
+        """
+        Fetch all rows and convert to Python types.
+        Returns a list of tuples, with values converted based on their PostgreSQL type.
+        """
+        nrows = self._lib.PQntuples(self._res)
+        ncols = self._lib.PQnfields(self._res)
+
+        # Get type OIDs for each column
+        type_oids = [self._lib.PQftype(self._res, col) for col in range(ncols)]
+
+        results = []
+        for row in range(nrows):
+            row_data = []
+            for col in range(ncols):
+                if self._lib.PQgetisnull(self._res, row, col):
+                    row_data.append(None)
+                else:
+                    value = self._lib.PQgetvalue(self._res, row, col).decode()
+                    row_data.append(_convert_pg_value(value, type_oids[col]))
+            results.append(tuple(row_data))
+
+        return results
+
+
+class PGconn(contextlib.AbstractContextManager):
+    """
+    Wraps a raw _PGconn_p with a more friendly interface. This is just a
+    stub; it's expected to grow.
+    """
+
+    def __init__(
+        self,
+        lib: ctypes.CDLL,
+        handle: _PGconn_p,
+        stack: contextlib.ExitStack,
+    ):
+        self._lib = lib
+        self._handle = handle
+        self._stack = stack
+
+    def __exit__(self, *exc):
+        self._lib.PQfinish(self._handle)
+        self._handle = None
+
+    def exec(self, query: str):
+        """
+        Executes a query via PQexec() and returns a PGresult.
+        """
+        res = self._lib.PQexec(self._handle, query.encode())
+        return self._stack.enter_context(PGresult(self._lib, res))
+
+    def sql(self, query: str):
+        """
+        Executes a query and raises an exception if it fails.
+        Returns the query results with automatic type conversion and simplification.
+        For commands that don't return data (INSERT, UPDATE, etc.), returns None.
+
+        Examples:
+        - SELECT 1 -> 1
+        - SELECT 1, 2 -> (1, 2)
+        - SELECT * FROM generate_series(1, 3) -> [1, 2, 3]
+        - SELECT * FROM (VALUES (1, 'a'), (2, 'b')) t -> [(1, 'a'), (2, 'b')]
+        - CREATE TABLE ... -> None
+        - INSERT INTO ... -> None
+        """
+        res = self.exec(query)
+        status = res.status()
+
+        if status == ExecStatus.PGRES_FATAL_ERROR:
+            error_msg = res.error_message()
+            raise LibpqError(f"Query failed: {error_msg}\nQuery: {query}")
+        elif status == ExecStatus.PGRES_COMMAND_OK:
+            return None
+        elif status == ExecStatus.PGRES_TUPLES_OK:
+            results = res.fetch_all()
+            return simplify_query_results(results)
+        else:
+            error_msg = res.error_message() or f"Unexpected status: {status}"
+            raise LibpqError(f"Query failed: {error_msg}\nQuery: {query}")
+
+
+def connstr(opts: Dict[str, Any]) -> str:
+    """
+    Flattens the provided options into a libpq connection string. Values
+    are converted to str and quoted/escaped as necessary.
+    """
+    settings = []
+
+    for k, v in opts.items():
+        v = str(v)
+        if not v:
+            v = "''"
+        else:
+            v = v.replace("\\", "\\\\")
+            v = v.replace("'", "\\'")
+
+            if " " in v:
+                v = f"'{v}'"
+
+        settings.append(f"{k}={v}")
+
+    return " ".join(settings)
+
+
+def connect(
+    libpq_handle: ctypes.CDLL,
+    stack: contextlib.ExitStack,
+    remaining_timeout_fn: Callable[[], float],
+    **opts,
+) -> PGconn:
+    """
+    Connects to a server, using the given connection options, and
+    returns a PGconn object wrapping the connection handle. A
+    failure will raise LibpqError.
+
+    Connections honor PG_TEST_TIMEOUT_DEFAULT unless connect_timeout is
+    explicitly overridden in opts.
+
+    Args:
+        libpq_handle: ctypes.CDLL handle to libpq library
+        stack: ExitStack for managing connection cleanup
+        remaining_timeout_fn: Function that returns remaining timeout in seconds
+        **opts: Connection options (host, port, dbname, etc.)
+
+    Returns:
+        PGconn: Connected database connection
+
+    Raises:
+        LibpqError: If connection fails
+    """
+
+    if "connect_timeout" not in opts:
+        t = int(remaining_timeout_fn())
+        opts["connect_timeout"] = max(t, 1)
+
+    conn_p = libpq_handle.PQconnectdb(connstr(opts).encode())
+
+    # Check connection status before adding to stack
+    if libpq_handle.PQstatus(conn_p) != ConnectionStatus.CONNECTION_OK:
+        error_msg = libpq_handle.PQerrorMessage(conn_p).decode()
+        # Manually close the failed connection
+        libpq_handle.PQfinish(conn_p)
+        raise LibpqError(error_msg)
+
+    # Connection succeeded - add to stack for cleanup
+    conn = stack.enter_context(PGconn(libpq_handle, conn_p, stack=stack))
+    return conn
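
Reviewer's annotation (not part of the patch): connect() above finishes a failed handle right away and only registers healthy connections on the ExitStack. A minimal sketch of that ordering follows. FakeConn and connect_sketch are hypothetical stand-ins for PGconn and connect(), and no libpq call is made.

```python
import contextlib


class FakeConn(contextlib.AbstractContextManager):
    """Hypothetical stand-in for PGconn; records whether it was closed."""

    def __init__(self, ok):
        self.ok = ok
        self.closed = False

    def __exit__(self, *exc):
        self.closed = True


def connect_sketch(stack, ok):
    conn = FakeConn(ok)
    if not conn.ok:
        # Failed handles are closed immediately and never reach the stack.
        conn.closed = True
        raise RuntimeError("connection failed")
    # Only healthy connections are registered for deferred cleanup.
    return stack.enter_context(conn)


with contextlib.ExitStack() as stack:
    good = connect_sketch(stack, ok=True)
    try:
        connect_sketch(stack, ok=False)
    except RuntimeError as e:
        failure = str(e)
    open_during_test = not good.closed

# Leaving the with block unwinds the stack and closes the good connection.
closed_after_test = good.closed
```

With this ordering the stack only ever holds usable connections, so teardown never has to tell a live handle from one that already failed.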
diff --git a/src/test/pytest/pg/fixtures.py b/src/test/pytest/pg/fixtures.py
deleted file mode 100644
index b5d3bff69a8..00000000000
--- a/src/test/pytest/pg/fixtures.py
+++ /dev/null
@@ -1,212 +0,0 @@
-# Copyright (c) 2025, PostgreSQL Global Development Group
-
-import contextlib
-import ctypes
-import platform
-import time
-from typing import Any, Callable, Dict
-
-import pytest
-
-from ._env import test_timeout_default
-
-
[email protected]
-def remaining_timeout():
-    """
-    This fixture provides a function that returns how much of the
-    PG_TEST_TIMEOUT_DEFAULT remains for the current test, in fractional seconds.
-    This value is never less than zero.
-
-    This fixture is per-test, so the deadline is also reset on a per-test basis.
-    """
-    now = time.monotonic()
-    deadline = now + test_timeout_default()
-
-    return lambda: max(deadline - time.monotonic(), 0)
-
-
-class _PGconn(ctypes.Structure):
-    pass
-
-
-class _PGresult(ctypes.Structure):
-    pass
-
-
-_PGconn_p = ctypes.POINTER(_PGconn)
-_PGresult_p = ctypes.POINTER(_PGresult)
-
-
[email protected](scope="session")
-def libpq_handle():
-    """
-    Loads a ctypes handle for libpq. Some common function prototypes are
-    initialized for general use.
-    """
-    system = platform.system()
-
-    if system in ("Linux", "FreeBSD", "NetBSD", "OpenBSD"):
-        name = "libpq.so.5"
-    elif system == "Darwin":
-        name = "libpq.5.dylib"
-    elif system == "Windows":
-        name = "libpq.dll"
-    else:
-        assert False, f"the libpq fixture must be updated for {system}"
-
-    # XXX ctypes.CDLL() is a little stricter with load paths on Windows. The
-    # preferred way around that is to know the absolute path to libpq.dll, but
-    # that doesn't seem to mesh well with the current test infrastructure. For
-    # now, enable "standard" LoadLibrary behavior.
-    loadopts = {}
-    if system == "Windows":
-        loadopts["winmode"] = 0
-
-    lib = ctypes.CDLL(name, **loadopts)
-
-    #
-    # Function Prototypes
-    #
-
-    lib.PQconnectdb.restype = _PGconn_p
-    lib.PQconnectdb.argtypes = [ctypes.c_char_p]
-
-    lib.PQstatus.restype = ctypes.c_int
-    lib.PQstatus.argtypes = [_PGconn_p]
-
-    lib.PQexec.restype = _PGresult_p
-    lib.PQexec.argtypes = [_PGconn_p, ctypes.c_char_p]
-
-    lib.PQresultStatus.restype = ctypes.c_int
-    lib.PQresultStatus.argtypes = [_PGresult_p]
-
-    lib.PQclear.restype = None
-    lib.PQclear.argtypes = [_PGresult_p]
-
-    lib.PQerrorMessage.restype = ctypes.c_char_p
-    lib.PQerrorMessage.argtypes = [_PGconn_p]
-
-    lib.PQfinish.restype = None
-    lib.PQfinish.argtypes = [_PGconn_p]
-
-    return lib
-
-
-class PGresult(contextlib.AbstractContextManager):
-    """Wraps a raw _PGresult_p with a more friendly interface."""
-
-    def __init__(self, lib: ctypes.CDLL, res: _PGresult_p):
-        self._lib = lib
-        self._res = res
-
-    def __exit__(self, *exc):
-        self._lib.PQclear(self._res)
-        self._res = None
-
-    def status(self):
-        return self._lib.PQresultStatus(self._res)
-
-
-class PGconn(contextlib.AbstractContextManager):
-    """
-    Wraps a raw _PGconn_p with a more friendly interface. This is just a
-    stub; it's expected to grow.
-    """
-
-    def __init__(
-        self,
-        lib: ctypes.CDLL,
-        handle: _PGconn_p,
-        stack: contextlib.ExitStack,
-    ):
-        self._lib = lib
-        self._handle = handle
-        self._stack = stack
-
-    def __exit__(self, *exc):
-        self._lib.PQfinish(self._handle)
-        self._handle = None
-
-    def exec(self, query: str) -> PGresult:
-        """
-        Executes a query via PQexec() and returns a PGresult.
-        """
-        res = self._lib.PQexec(self._handle, query.encode())
-        return self._stack.enter_context(PGresult(self._lib, res))
-
-
[email protected]
-def libpq(libpq_handle, remaining_timeout):
-    """
-    Provides a ctypes-based API wrapped around libpq.so. This fixture keeps
-    track of allocated resources and cleans them up during teardown. See
-    _Libpq's public API for details.
-    """
-
-    class _Libpq(contextlib.ExitStack):
-        CONNECTION_OK = 0
-
-        PGRES_EMPTY_QUERY = 0
-
-        class Error(RuntimeError):
-            """
-            libpq.Error is the exception class for application-level errors that
-            are encountered during libpq operations.
-            """
-
-            pass
-
-        def __init__(self):
-            super().__init__()
-            self.lib = libpq_handle
-
-        def _connstr(self, opts: Dict[str, Any]) -> str:
-            """
-            Flattens the provided options into a libpq connection string. Values
-            are converted to str and quoted/escaped as necessary.
-            """
-            settings = []
-
-            for k, v in opts.items():
-                v = str(v)
-                if not v:
-                    v = "''"
-                else:
-                    v = v.replace("\\", "\\\\")
-                    v = v.replace("'", "\\'")
-
-                    if " " in v:
-                        v = f"'{v}'"
-
-                settings.append(f"{k}={v}")
-
-            return " ".join(settings)
-
-        def must_connect(self, **opts) -> PGconn:
-            """
-            Connects to a server, using the given connection options, and
-            returns a libpq.PGconn object wrapping the connection handle. A
-            failure will raise libpq.Error.
-
-            Connections honor PG_TEST_TIMEOUT_DEFAULT unless connect_timeout is
-            explicitly overridden in opts.
-            """
-
-            if "connect_timeout" not in opts:
-                t = int(remaining_timeout())
-                opts["connect_timeout"] = max(t, 1)
-
-            conn_p = self.lib.PQconnectdb(self._connstr(opts).encode())
-
-            # Ensure the connection handle is always closed at the end of the
-            # test.
-            conn = self.enter_context(PGconn(self.lib, conn_p, stack=self))
-
-            if self.lib.PQstatus(conn_p) != self.CONNECTION_OK:
-                raise self.Error(self.lib.PQerrorMessage(conn_p).decode())
-
-            return conn
-
-    with _Libpq() as lib:
-        yield lib
diff --git a/src/test/pytest/plugins/pgtap.py b/src/test/pytest/plugins/pgtap.py
index ef8291e291c..6a729d252e1 100644
--- a/src/test/pytest/plugins/pgtap.py
+++ b/src/test/pytest/plugins/pgtap.py
@@ -2,7 +2,6 @@
 
 import os
 import sys
-from typing import Optional
 
 import pytest
 
diff --git a/src/test/pytest/pg/__init__.py b/src/test/pytest/pypg/__init__.py
similarity index 100%
rename from src/test/pytest/pg/__init__.py
rename to src/test/pytest/pypg/__init__.py
diff --git a/src/test/pytest/pg/_env.py b/src/test/pytest/pypg/_env.py
similarity index 97%
rename from src/test/pytest/pg/_env.py
rename to src/test/pytest/pypg/_env.py
index 6f18af07844..154c986d73e 100644
--- a/src/test/pytest/pg/_env.py
+++ b/src/test/pytest/pypg/_env.py
@@ -2,7 +2,6 @@
 
 import logging
 import os
-from typing import List, Optional
 
 import pytest
 
diff --git a/src/test/pytest/pg/_win32.py b/src/test/pytest/pypg/_win32.py
similarity index 100%
rename from src/test/pytest/pg/_win32.py
rename to src/test/pytest/pypg/_win32.py
diff --git a/src/test/pytest/pypg/fixtures.py b/src/test/pytest/pypg/fixtures.py
new file mode 100644
index 00000000000..cf22c8ec436
--- /dev/null
+++ b/src/test/pytest/pypg/fixtures.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import os
+import contextlib
+import pathlib
+import secrets
+import time
+
+import pytest
+
+from ._env import test_timeout_default
+from .util import capture
+from .server import PostgresServer
+
+from libpq import load_libpq_handle, connect as libpq_connect
+
+
[email protected]
+def remaining_timeout():
+    """
+    This fixture provides a function that returns how much of the
+    PG_TEST_TIMEOUT_DEFAULT remains for the current test, in fractional seconds.
+    This value is never less than zero.
+
+    This fixture is per-test, so the deadline is also reset on a per-test basis.
+    """
+    now = time.monotonic()
+    deadline = now + test_timeout_default()
+
+    return lambda: max(deadline - time.monotonic(), 0)
+
+
[email protected](scope="session")
+def libpq_handle(libdir):
+    """
+    Loads a ctypes handle for libpq. Some common function prototypes are
+    initialized for general use.
+    """
+    return load_libpq_handle(libdir)
+
+
[email protected]
+def connect(libpq_handle, remaining_timeout):
+    """
+    Returns a function to connect to PostgreSQL via libpq.
+
+    The returned function accepts connection options as keyword arguments
+    (host, port, dbname, etc.) and returns a PGconn object. Connections
+    are automatically cleaned up at the end of the test.
+
+    Example:
+        conn = connect(host='localhost', port=5432, dbname='postgres')
+        result = conn.sql("SELECT 1")
+    """
+    with contextlib.ExitStack() as stack:
+
+        def _connect(**opts):
+            return libpq_connect(libpq_handle, stack, remaining_timeout, **opts)
+
+        yield _connect
+
+
[email protected](scope="session")
+def pg_config():
+    """
+    Returns the path to pg_config. Uses PG_CONFIG environment variable if set,
+    otherwise uses 'pg_config' from PATH.
+    """
+    return os.environ.get("PG_CONFIG", "pg_config")
+
+
[email protected](scope="session")
+def bindir(pg_config):
+    """
+    Returns the PostgreSQL bin directory using pg_config --bindir.
+    """
+    return capture(pg_config, "--bindir")
+
+
[email protected](scope="session")
+def libdir(pg_config):
+    """
+    Returns the PostgreSQL lib directory using pg_config --libdir.
+    """
+    return capture(pg_config, "--libdir")
+
+
[email protected](scope="session")
+def datadir(tmp_path_factory):
+    """
+    Returns the directory name to use as the server data directory. If
+    TESTDATADIR is provided, that will be used; otherwise a new temporary
+    directory is created in the pytest temp root.
+    """
+    d = os.getenv("TESTDATADIR")
+    if d:
+        d = pathlib.Path(d)
+    else:
+        d = tmp_path_factory.mktemp("tmp_check")
+
+    return d
+
+
[email protected](scope="session")
+def sockdir(tmp_path_factory):
+    """
+    Returns the directory name to use as the server's unix_socket_directories
+    setting. Local client connections use this as the PGHOST.
+
+    At the moment, this is always put under the pytest temp root.
+    """
+    return tmp_path_factory.mktemp("sockfiles")
+
+
[email protected](scope="session")
+def winpassword():
+    """The per-session SCRAM password for the server admin on Windows."""
+    return secrets.token_urlsafe(16)
+
+
[email protected](scope="session")
+def pg_server_global(bindir, datadir, sockdir, winpassword, libpq_handle):
+    """
+    Starts a running Postgres server listening on localhost. The HBA initially
+    allows only local UNIX connections from the same user.
+
+    Returns a PostgresServer instance with methods for server management, configuration,
+    and creating test databases/users.
+    """
+    server = PostgresServer(bindir, datadir, sockdir, winpassword, libpq_handle)
+
+    yield server
+
+    # Cleanup any test resources
+    server.cleanup()
+
+    # Stop the server
+    server.stop()
+
+
[email protected](scope="module")
+def pg_server_module(pg_server_global):
+    """
+    Module-scoped server context. This is useful for overriding certain
+    settings at the module level through autouse fixtures; the SSL tests are
+    an example.
+    """
+    with pg_server_global.subcontext() as s:
+        yield s
+
+
[email protected]
+def pg(pg_server_module, remaining_timeout):
+    """
+    Per-test server context. Use this fixture to make changes to the server
+    which will be rolled back at the end of the test (e.g., creating test
+    users/databases).
+    """
+    pg_server_module.set_timeout(remaining_timeout)
+    with pg_server_module.subcontext() as s:
+        yield s
+
+
[email protected]
+def conn(pg):
+    """
+    Returns a connected PGconn instance to the test PostgreSQL server.
+    The connection is automatically cleaned up at the end of the test.
+
+    Example:
+        def test_something(conn):
+            result = conn.sql("SELECT 1")
+            assert result == 1
+    """
+    return pg.connect()
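
Reviewer's annotation (not part of the patch): the remaining_timeout fixture is a closure over a monotonic deadline, and the connect() helper in this patch turns its result into a whole-second connect_timeout of at least 1. The sketch below reproduces both steps outside pytest. make_remaining_timeout and default_connect_timeout are hypothetical names, and the 180-second budget is only an assumed value for illustration.

```python
import time


def make_remaining_timeout(total_seconds):
    """Return a function reporting the seconds left until the deadline."""
    deadline = time.monotonic() + total_seconds
    # Never report a negative remainder once the deadline has passed.
    return lambda: max(deadline - time.monotonic(), 0)


def default_connect_timeout(remaining):
    # connect_timeout takes whole seconds, and zero would mean "wait
    # indefinitely" to libpq, so truncate and clamp to at least 1.
    return max(int(remaining()), 1)


remaining = make_remaining_timeout(180)
expired = make_remaining_timeout(0)

fresh_timeout = default_connect_timeout(remaining)
expired_timeout = default_connect_timeout(expired)
```

The clamp matters for a test that has already used up its budget: the connection attempt still fails quickly instead of blocking forever.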
diff --git a/src/test/pytest/pypg/server.py b/src/test/pytest/pypg/server.py
new file mode 100644
index 00000000000..d6675cde93d
--- /dev/null
+++ b/src/test/pytest/pypg/server.py
@@ -0,0 +1,387 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import contextlib
+import glob
+import os
+import pathlib
+import platform
+import socket
+import subprocess
+import tempfile
+import time
+from collections import namedtuple
+from typing import Callable, Optional
+
+from .util import run
+from libpq import PGconn
+
+
+class FileBackup(contextlib.AbstractContextManager):
+    """
+    A context manager which backs up a file's contents, restoring them on exit.
+    """
+
+    def __init__(self, file: pathlib.Path):
+        super().__init__()
+
+        self._file = file
+
+    def __enter__(self):
+        import shutil
+
+        with tempfile.NamedTemporaryFile(
+            prefix=self._file.name, dir=self._file.parent, delete=False
+        ) as f:
+            self._backup = pathlib.Path(f.name)
+
+        shutil.copyfile(self._file, self._backup)
+
+        return self
+
+    def __exit__(self, *exc):
+        import shutil
+
+        # Swap the backup and the original file, so that the modified contents
+        # can still be inspected in case of failure.
+        tmp = self._backup.parent / (self._backup.name + ".tmp")
+
+        shutil.copyfile(self._file, tmp)
+        shutil.copyfile(self._backup, self._file)
+        shutil.move(tmp, self._backup)
+
+
+class HBA(FileBackup):
+    """
+    Backs up a server's HBA configuration and provides means for temporarily
+    editing it.
+    """
+
+    def __init__(self, datadir: pathlib.Path):
+        super().__init__(datadir / "pg_hba.conf")
+
+    def prepend(self, *lines):
+        """
+        Temporarily prepends lines to the server's pg_hba.conf.
+
+        As sugar for aligning HBA columns in the tests, each line can be either
+        a string or a list of strings. List elements will be joined by single
+        spaces before they are written to file.
+        """
+        with open(self._file, "r") as f:
+            prior_data = f.read()
+
+        with open(self._file, "w") as f:
+            for line in lines:
+                if isinstance(line, list):
+                    print(*line, file=f)
+                else:
+                    print(line, file=f)
+
+            f.write(prior_data)
+
+
+class Config(FileBackup):
+    """
+    Backs up a server's postgresql.conf and provides means for temporarily
+    editing it.
+    """
+
+    def __init__(self, datadir: pathlib.Path):
+        super().__init__(datadir / "postgresql.conf")
+
+    def set(self, **gucs):
+        """
+        Temporarily appends GUC settings to the server's postgresql.conf.
+        """
+
+        with open(self._file, "a") as f:
+            print(file=f)
+
+            for n, v in gucs.items():
+                v = str(v)
+
+                # TODO: proper quoting
+                v = v.replace("\\", "\\\\")
+                v = v.replace("'", "\\'")
+                v = "'{}'".format(v)
+
+                print(n, "=", v, file=f)
+
+
+Backup = namedtuple("Backup", "conf, hba")
+
+
+class PostgresServer:
+    """
+    Represents a running PostgreSQL server instance with management utilities.
+    Provides methods for configuration, user/database creation, and server control.
+    """
+
+    def __init__(self, bindir, datadir, sockdir, winpassword, libpq_handle):
+        """
+        Initialize and start a PostgreSQL server instance.
+        """
+        self.datadir = datadir
+        self.sockdir = sockdir
+        self.libpq_handle = libpq_handle
+        self._remaining_timeout_fn: Optional[Callable[[], float]] = None
+        self._bindir = bindir
+        self._winpassword = winpassword
+        self._pg_ctl = os.path.join(bindir, "pg_ctl")
+        self._log = os.path.join(datadir, "postgresql.log")
+
+        initdb = os.path.join(bindir, "initdb")
+        pg_ctl = self._pg_ctl
+
+        # Lock down the HBA by default; tests can open it back up later.
+        if platform.system() == "Windows":
+            # On Windows, for admin connections, use SCRAM with a generated password
+            # over local sockets. This requires additional work during initdb.
+            method = "scram-sha-256"
+
+            # NamedTemporaryFile doesn't work very nicely on Windows until Python
+            # 3.12, which introduces NamedTemporaryFile(delete_on_close=False).
+            # Until then, specify delete=False and manually unlink after use.
+            with tempfile.NamedTemporaryFile("w", delete=False) as pwfile:
+                pwfile.write(winpassword)
+
+            run(initdb, "--auth=scram-sha-256", "--pwfile", pwfile.name, datadir)
+            os.unlink(pwfile.name)
+
+        else:
+            # For other OSes we can just use peer auth.
+            method = "peer"
+            run(pg_ctl, "-D", datadir, "init")
+
+        with open(datadir / "pg_hba.conf", "w") as f:
+            print(f"# default: local {method} connections only", file=f)
+            print(f"local all all {method}", file=f)
+
+        # Figure out a port to listen on. Attempt to reserve both IPv4 and IPv6
+        # addresses in one go.
+        #
+        # Note: socket.has_dualstack_ipv6/create_server are only in Python 3.8+.
+        if hasattr(socket, "has_dualstack_ipv6") and socket.has_dualstack_ipv6():
+            addr = ("::1", 0)
+            s = socket.create_server(addr, family=socket.AF_INET6, dualstack_ipv6=True)
+
+            hostaddr, port, _, _ = s.getsockname()
+            addrs = [hostaddr, "127.0.0.1"]
+
+        else:
+            addr = ("127.0.0.1", 0)
+
+            s = socket.socket()
+            s.bind(addr)
+
+            hostaddr, port = s.getsockname()
+            addrs = [hostaddr]
+
+        log = self._log
+
+        with s, open(os.path.join(datadir, "postgresql.conf"), "a") as f:
+            print(file=f)
+            print("unix_socket_directories = '{}'".format(sockdir.as_posix()), file=f)
+            print("listen_addresses = '{}'".format(",".join(addrs)), file=f)
+            print("port =", port, file=f)
+            print("log_connections = all", file=f)
+
+        # Between closing of the socket, s, and server start, we're racing against
+        # anything that wants to open up ephemeral ports, so try not to put any new
+        # work here.
+
+        run(pg_ctl, "-D", datadir, "-l", log, "start")
+
+        # Read the PID file to get the postmaster PID
+        with open(os.path.join(datadir, "postmaster.pid")) as f:
+            pid = int(f.readline().strip())
+
+        # Store the computed values
+        self.hostaddr = hostaddr
+        self.port = port
+        self.pid = pid
+
+        # ExitStack for cleanup callbacks
+        self._cleanup_stack = contextlib.ExitStack()
+
+    def psql(self, *args):
+        """Run psql with the given arguments."""
+        if platform.system() == "Windows":
+            pw = dict(PGPASSWORD=self._winpassword)
+        else:
+            pw = None
+        self._run(os.path.join(self._bindir, "psql"), "-w", *args, addenv=pw)
+
+    def pg_ctl(self, *args):
+        """Run pg_ctl with the given arguments."""
+        self._run(self._pg_ctl, "-l", self._log, *args)
+
+    def _run(self, cmd, *args, addenv: Optional[dict] = None):
+        """Run a command with PG* environment variables set."""
+        subenv = dict(os.environ)
+        subenv.update(
+            {
+                "PGHOST": str(self.sockdir),
+                "PGPORT": str(self.port),
+                "PGDATABASE": "postgres",
+                "PGDATA": str(self.datadir),
+            }
+        )
+        if addenv:
+            subenv.update(addenv)
+        run(cmd, *args, env=subenv)
+
+    def create_users(self, *userkeys: str):
+        """Create test users and register them for cleanup."""
+        usermap = {}
+        for u in userkeys:
+            name = u + "user"
+            usermap[u] = name
+            self.psql("-c", "CREATE USER " + name)
+            self._cleanup_stack.callback(self.psql, "-c", "DROP USER " + name)
+        return usermap
+
+    def create_dbs(self, *dbkeys: str):
+        """Create test databases and register them for cleanup."""
+        dbmap = {}
+        for d in dbkeys:
+            name = d + "db"
+            dbmap[d] = name
+            self.psql("-c", "CREATE DATABASE " + name)
+            self._cleanup_stack.callback(self.psql, "-c", "DROP DATABASE " + name)
+        return dbmap
+
+    @contextlib.contextmanager
+    def reloading(self):
+        """
+        Provides a context manager for making configuration changes.
+
+        If the context suite finishes successfully, the configuration will
+        be reloaded via pg_ctl. On teardown, the configuration changes will
+        be unwound, and the server will be signaled to reload again.
+
+        The context target contains the following attributes which can be
+        used to configure the server:
+        - .conf: modifies postgresql.conf
+        - .hba: modifies pg_hba.conf
+
+        For example:
+
+            with pg_server_global.reloading() as s:
+                s.conf.set(log_connections="on")
+                s.hba.prepend("local all all trust")
+        """
+        # Push a reload onto the stack before making any other
+        # unwindable changes. That way the order of operations will be
+        #
+        #  # test
+        #   - config change 1
+        #   - config change 2
+        #   - reload
+        #  # teardown
+        #   - undo config change 2
+        #   - undo config change 1
+        #   - reload
+        #
+        self._cleanup_stack.callback(self.pg_ctl, "reload")
+        yield self._backup_configuration()
+
+        # Now actually reload
+        self.pg_ctl("reload")
+
+    @contextlib.contextmanager
+    def restarting(self):
+        """Like .reloading(), but with a full server restart."""
+        self._cleanup_stack.callback(self.pg_ctl, "restart")
+        yield self._backup_configuration()
+        self.pg_ctl("restart")
+
+    def _backup_configuration(self):
+        # Wrap the existing HBA and configuration with FileBackups.
+        return Backup(
+            hba=self._cleanup_stack.enter_context(HBA(self.datadir)),
+            conf=self._cleanup_stack.enter_context(Config(self.datadir)),
+        )
+
+    @contextlib.contextmanager
+    def subcontext(self):
+        """
+        Create a new cleanup context for per-test isolation.
+
+        Temporarily replaces the cleanup stack so that any cleanup callbacks
+        registered within this context will be cleaned up when the context exits.
+        """
+        old_stack = self._cleanup_stack
+        self._cleanup_stack = contextlib.ExitStack()
+        try:
+            self._cleanup_stack.__enter__()
+            yield self
+        finally:
+            self._cleanup_stack.__exit__(None, None, None)
+            self._cleanup_stack = old_stack
+
+    def stop(self):
+        """
+        Stop the PostgreSQL server instance.
+
+        Ignores failures if the server is already stopped.
+        """
+        try:
+            run(self._pg_ctl, "-D", self.datadir, "-l", self._log, "stop")
+        except subprocess.CalledProcessError:
+            # Server may have already been stopped
+            pass
+
+    def cleanup(self):
+        """Run all registered cleanup callbacks."""
+        self._cleanup_stack.close()
+
+    def set_timeout(self, remaining_timeout_fn: Callable[[], float]) -> None:
+        """
+        Set the timeout function for connections.
+        This is typically called by the pg fixture for each test.
+        """
+        self._remaining_timeout_fn = remaining_timeout_fn
+
+    def connect(self, **opts) -> PGconn:
+        """
+        Creates a connection to this PostgreSQL server instance.
+
+        This is a convenience method that automatically fills in the host, port,
+        and dbname (defaulting to 'postgres') for connecting to this server.
+
+        Args:
+            **opts: Additional connection options (can override defaults).
+                The connection is registered on the server's current cleanup
+                stack and uses the timeout installed via set_timeout().
+
+        Returns:
+            PGconn: Connected database connection
+
+        Example:
+            conn = pg.connect()
+            conn = pg.connect(dbname='mydb')
+        """
+        from libpq import connect as libpq_connect
+
+        # Set default connection options for this server
+        defaults = {
+            "host": str(self.sockdir),
+            "port": self.port,
+            "dbname": "postgres",
+        }
+
+        # Merge with user-provided options (user options take precedence)
+        defaults.update(opts)
+
+        if self._remaining_timeout_fn is None:
+            raise RuntimeError(
+                "Timeout function not set. Use set_timeout() or pg fixture."
+            )
+
+        return libpq_connect(
+            self.libpq_handle,
+            self._cleanup_stack,
+            self._remaining_timeout_fn,
+            **defaults,
+        )
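
Reviewer's annotation (not part of the patch): subcontext() gives per-module and per-test isolation by temporarily swapping the server's cleanup stack. A minimal sketch of that mechanism is below. ToyServer and its create() method are hypothetical stand-ins for PostgresServer and create_dbs(); the sketch only records the order of operations and never starts a server.

```python
import contextlib


class ToyServer:
    """Hypothetical stand-in for PostgresServer; only tracks cleanup order."""

    def __init__(self):
        self.log = []
        self._cleanup_stack = contextlib.ExitStack()

    def create(self, name):
        self.log.append("create " + name)
        # Register the matching teardown on whichever stack is current.
        self._cleanup_stack.callback(self.log.append, "drop " + name)

    @contextlib.contextmanager
    def subcontext(self):
        old_stack = self._cleanup_stack
        self._cleanup_stack = contextlib.ExitStack()
        try:
            yield self
        finally:
            # Unwind only what was registered inside this subcontext.
            self._cleanup_stack.close()
            self._cleanup_stack = old_stack

    def cleanup(self):
        self._cleanup_stack.close()


server = ToyServer()
server.create("session_db")
with server.subcontext() as s:
    s.create("test_db")
log_after_test = list(server.log)
server.cleanup()
log_after_session = list(server.log)
```

Resources created inside the subcontext are dropped when it exits, while anything registered on the outer stack survives until cleanup() runs at session teardown.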
diff --git a/src/test/pytest/pypg/util.py b/src/test/pytest/pypg/util.py
new file mode 100644
index 00000000000..b2a1e627e4b
--- /dev/null
+++ b/src/test/pytest/pypg/util.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+import shlex
+import subprocess
+import sys
+
+
+def eprint(*args, **kwargs):
+    """eprint prints to stderr"""
+    print(*args, file=sys.stderr, **kwargs)
+
+
+def run(*command, check=True, shell=None, silent=False, **kwargs):
+    """run runs the given command and prints it to stderr"""
+
+    if shell is None:
+        shell = len(command) == 1 and isinstance(command[0], str)
+
+    if shell:
+        command = command[0]
+    else:
+        command = list(map(str, command))
+
+    if not silent:
+        if shell:
+            eprint(f"+ {command}")
+        else:
+            # We would normally use shlex.join here, but it's not available in
+            # Python 3.6, which we still want to support.
+            unsafe_string_cmd = " ".join(map(shlex.quote, command))
+            eprint(f"+ {unsafe_string_cmd}")
+
+    if silent:
+        kwargs.setdefault("stdout", subprocess.DEVNULL)
+
+    return subprocess.run(command, check=check, shell=shell, **kwargs)
+
+
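+def capture(command, *args, stdout=subprocess.PIPE, encoding="utf-8", **kwargs):
+    # str.removesuffix() needs Python 3.9+; strip one trailing newline by hand.
+    out = run(command, *args, stdout=stdout, encoding=encoding, **kwargs).stdout
+    return out[:-1] if out.endswith("\n") else out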
diff --git a/src/test/pytest/pyt/conftest.py b/src/test/pytest/pyt/conftest.py
index ecb72be26d7..641af0bbac5 100644
--- a/src/test/pytest/pyt/conftest.py
+++ b/src/test/pytest/pyt/conftest.py
@@ -1,3 +1,4 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
-from pg.fixtures import *
+
+from pypg.fixtures import *
diff --git a/src/test/pytest/pyt/test_libpq.py b/src/test/pytest/pyt/test_libpq.py
index 9f0857cc612..4fcf4056f41 100644
--- a/src/test/pytest/pyt/test_libpq.py
+++ b/src/test/pytest/pyt/test_libpq.py
@@ -9,6 +9,8 @@ from typing import Callable
 
 import pytest
 
+from libpq import connstr, LibpqError
+
 
 @pytest.mark.parametrize(
     "opts, expected",
@@ -22,15 +24,15 @@ import pytest
         (dict(keyword=" \\' "), r"keyword=' \\\' '"),
     ],
 )
-def test_connstr(libpq, opts, expected):
-    """Tests the escape behavior for libpq._connstr()."""
-    assert libpq._connstr(opts) == expected
+def test_connstr(opts, expected):
+    """Tests the escape behavior for connstr()."""
+    assert connstr(opts) == expected
 
 
-def test_must_connect_errors(libpq):
-    """Tests that must_connect() raises libpq.Error."""
-    with pytest.raises(libpq.Error, match="invalid connection option"):
-        libpq.must_connect(some_unknown_keyword="whatever")
+def test_must_connect_errors(connect):
+    """Tests that connect() raises LibpqError."""
+    with pytest.raises(LibpqError, match="invalid connection option"):
+        connect(some_unknown_keyword="whatever")
 
 
 @pytest.fixture
@@ -145,7 +147,7 @@ def local_server(tmp_path, remaining_timeout):
         yield s
 
 
-def test_connection_is_finished_on_error(libpq, local_server, remaining_timeout):
+def test_connection_is_finished_on_error(connect, local_server):
     """Tests that PQfinish() gets called at the end of testing."""
     expected_error = "something is wrong"
 
@@ -165,7 +167,6 @@ def test_connection_is_finished_on_error(libpq, local_server, remaining_timeout)
 
     local_server.background(serve_error)
 
-    with pytest.raises(libpq.Error, match=expected_error):
+    with pytest.raises(LibpqError, match=expected_error):
         # Exiting this context should result in PQfinish().
-        with libpq:
-            libpq.must_connect(host=local_server.host, port=local_server.port)
+        connect(host=local_server.host, port=local_server.port)
diff --git a/src/test/pytest/pyt/test_query_helpers.py b/src/test/pytest/pyt/test_query_helpers.py
new file mode 100644
index 00000000000..5a5a1ae1edf
--- /dev/null
+++ b/src/test/pytest/pyt/test_query_helpers.py
@@ -0,0 +1,286 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+"""
+Tests for query helper functions with type conversion and result simplification.
+"""
+
+import pytest
+
+
+def test_single_cell_int(conn):
+    """Single cell integer query returns just the value."""
+    result = conn.sql("SELECT 1")
+    assert result == 1
+    assert isinstance(result, int)
+
+
+def test_single_cell_string(conn):
+    """Single cell string query returns just the value."""
+    result = conn.sql("SELECT 'hello'")
+    assert result == "hello"
+    assert isinstance(result, str)
+
+
+def test_single_cell_bool(conn):
+    """Single cell boolean query returns just the value."""
+
+    result = conn.sql("SELECT true")
+    assert result is True
+    assert isinstance(result, bool)
+
+    result = conn.sql("SELECT false")
+    assert result is False
+
+
+def test_single_cell_float(conn):
+    """Single cell float query returns just the value."""
+
+    result = conn.sql("SELECT 3.14::float4")
+    assert isinstance(result, float)
+    assert abs(result - 3.14) < 0.01
+
+
+def test_single_cell_null(conn):
+    """Single cell NULL query returns None."""
+
+    result = conn.sql("SELECT NULL")
+    assert result is None
+
+
+def test_single_row_multiple_columns(conn):
+    """Single row with multiple columns returns a tuple."""
+
+    result = conn.sql("SELECT 1, 'hello', true")
+    assert result == (1, "hello", True)
+    assert isinstance(result, tuple)
+
+
+def test_single_column_multiple_rows(conn):
+    """Single column with multiple rows returns a list of values."""
+
+    result = conn.sql("SELECT * FROM generate_series(1, 3)")
+    assert result == [1, 2, 3]
+    assert isinstance(result, list)
+
+
+def test_multiple_rows_and_columns(conn):
+    """Multiple rows and columns returns list of tuples."""
+
+    result = conn.sql("SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c')) AS t")
+    assert result == [(1, "a"), (2, "b"), (3, "c")]
+    assert isinstance(result, list)
+    assert all(isinstance(row, tuple) for row in result)
+
+
+def test_empty_result(conn):
+    """Empty result set returns empty list."""
+
+    result = conn.sql("SELECT 1 WHERE false")
+    assert result == []
+
+
+def test_query_error_handling(conn):
+    """Query errors raise RuntimeError with actual error message."""
+
+    with pytest.raises(RuntimeError) as exc_info:
+        conn.sql("SELECT * FROM nonexistent_table")
+
+    error_msg = str(exc_info.value)
+    assert "nonexistent_table" in error_msg or "does not exist" in error_msg
+
+
+def test_division_by_zero_error(conn):
+    """Division by zero raises RuntimeError."""
+
+    with pytest.raises(RuntimeError) as exc_info:
+        conn.sql("SELECT 1/0")
+
+    error_msg = str(exc_info.value)
+    assert "division by zero" in error_msg.lower()
+
+
+def test_simple_exec_create_table(conn):
+    """sql for CREATE TABLE returns None."""
+
+    result = conn.sql("CREATE TEMP TABLE test_table (id int, name text)")
+    assert result is None
+
+    # Verify table was created
+    count = conn.sql("SELECT COUNT(*) FROM test_table")
+    assert count == 0
+
+
+def test_simple_exec_insert(conn):
+    """sql for INSERT returns None."""
+
+    conn.sql("CREATE TEMP TABLE test_table (id int, name text)")
+    result = conn.sql("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')")
+    assert result is None
+
+    # Verify data was inserted
+    count = conn.sql("SELECT COUNT(*) FROM test_table")
+    assert count == 2
+
+
+def test_type_conversion_mixed(conn):
+    """Test mixed type conversion in a single row."""
+
+    result = conn.sql(
+        "SELECT 42::int4, 123::int8, 3.14::float8, 'text', true, NULL"
+    )
+    assert result == (42, 123, 3.14, "text", True, None)
+    assert isinstance(result[0], int)
+    assert isinstance(result[1], int)
+    assert isinstance(result[2], float)
+    assert isinstance(result[3], str)
+    assert isinstance(result[4], bool)
+    assert result[5] is None
+
+
+def test_multiple_queries_same_connection(conn):
+    """Test running multiple queries on the same connection."""
+
+    result1 = conn.sql("SELECT 1")
+    assert result1 == 1
+
+    result2 = conn.sql("SELECT 'hello', 'world'")
+    assert result2 == ("hello", "world")
+
+    result3 = conn.sql("SELECT * FROM generate_series(1, 5)")
+    assert result3 == [1, 2, 3, 4, 5]
+
+
+def test_date_type(conn):
+    """Test date type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '2025-10-20'::date")
+    assert result == datetime.date(2025, 10, 20)
+    assert isinstance(result, datetime.date)
+
+
+def test_timestamp_type(conn):
+    """Test timestamp type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '2025-10-20 15:30:45'::timestamp")
+    assert result == datetime.datetime(2025, 10, 20, 15, 30, 45)
+    assert isinstance(result, datetime.datetime)
+
+
+def test_time_type(conn):
+    """Test time type conversion."""
+    import datetime
+
+    result = conn.sql("SELECT '15:30:45'::time")
+    assert result == datetime.time(15, 30, 45)
+    assert isinstance(result, datetime.time)
+
+
+def test_numeric_type(conn):
+    """Test numeric/decimal type conversion."""
+    import decimal
+
+    result = conn.sql("SELECT 123.456::numeric")
+    assert result == decimal.Decimal("123.456")
+    assert isinstance(result, decimal.Decimal)
+
+
+def test_int_array(conn):
+    """Test integer array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[1, 2, 3, 4, 5]")
+    assert result == [1, 2, 3, 4, 5]
+    assert isinstance(result, list)
+    assert all(isinstance(x, int) for x in result)
+
+
+def test_text_array(conn):
+    """Test text array type conversion."""
+
+    result = conn.sql("SELECT ARRAY['hello', 'world', 'test']")
+    assert result == ["hello", "world", "test"]
+    assert isinstance(result, list)
+    assert all(isinstance(x, str) for x in result)
+
+
+def test_bool_array(conn):
+    """Test boolean array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[true, false, true]")
+    assert result == [True, False, True]
+    assert isinstance(result, list)
+    assert all(isinstance(x, bool) for x in result)
+
+
+def test_empty_array(conn):
+    """Test empty array type conversion."""
+
+    result = conn.sql("SELECT ARRAY[]::int[]")
+    assert result == []
+    assert isinstance(result, list)
+
+
+def test_json_type(conn):
+    """Test JSON type (parsed to dict)."""
+
+    result = conn.sql('SELECT \'{"key": "value"}\'::json')
+    assert isinstance(result, dict)
+    assert result == {"key": "value"}
+
+
+def test_jsonb_type(conn):
+    """Test JSONB type (parsed to dict)."""
+
+    result = conn.sql('SELECT \'{"name": "test", "count": 42}\'::jsonb')
+    assert isinstance(result, dict)
+    assert result == {"name": "test", "count": 42}
+
+
+def test_json_array(conn):
+    """Test JSON array type."""
+
+    result = conn.sql("SELECT '[1, 2, 3, 4, 5]'::json")
+    assert isinstance(result, list)
+    assert result == [1, 2, 3, 4, 5]
+
+
+def test_json_nested(conn):
+    """Test nested JSON object."""
+
+    result = conn.sql(
+        'SELECT \'{"user": {"id": 1, "name": "Alice"}, "active": true}\'::json'
+    )
+    assert isinstance(result, dict)
+    assert result == {"user": {"id": 1, "name": "Alice"}, "active": True}
+
+
+def test_mixed_types_with_arrays(conn):
+    """Test mixed types including arrays in a single row."""
+
+    result = conn.sql("SELECT 42, 'text', ARRAY[1, 2, 3], true")
+    assert result == (42, "text", [1, 2, 3], True)
+    assert isinstance(result[0], int)
+    assert isinstance(result[1], str)
+    assert isinstance(result[2], list)
+    assert isinstance(result[3], bool)
+
+
+def test_uuid_type(conn):
+    """Test UUID type conversion."""
+    import uuid
+
+    test_uuid = "550e8400-e29b-41d4-a716-446655440000"
+    result = conn.sql(f"SELECT '{test_uuid}'::uuid")
+    assert result == uuid.UUID(test_uuid)
+    assert isinstance(result, uuid.UUID)
+
+
+def test_uuid_generation(conn):
+    """Test generated UUID type conversion."""
+    import uuid
+
+    result = conn.sql("SELECT uuidv4()")
+    assert isinstance(result, uuid.UUID)
+    # Check it's a valid UUID by ensuring it can be converted to string
+    assert len(str(result)) == 36  # UUID string format length
diff --git a/src/test/ssl/pyt/conftest.py b/src/test/ssl/pyt/conftest.py
index 85d2c994828..6e8699e0971 100644
--- a/src/test/ssl/pyt/conftest.py
+++ b/src/test/ssl/pyt/conftest.py
@@ -1,19 +1,14 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
 import datetime
-import os
-import pathlib
-import platform
-import secrets
-import socket
+import re
 import subprocess
 import tempfile
 from collections import namedtuple
 
 import pytest
 
-import pg
-from pg.fixtures import *
+from pypg.fixtures import *
 
 
 @pytest.fixture(scope="session")
@@ -135,108 +130,51 @@ def certs(cryptography, tmp_path_factory):
     return _Certs()
 
 
-@pytest.fixture(scope="session")
-def datadir(tmp_path_factory):
+@pytest.fixture(scope="module", autouse=True)
+def ssl_setup(pg_server_module, certs, datadir):
     """
-    Returns the directory name to use as the server data directory. If
-    TESTDATADIR is provided, that will be used; otherwise a new temporary
-    directory is created in the pytest temp root.
+    Sets up required server settings for all tests in this module.
     """
-    d = os.getenv("TESTDATADIR")
-    if d:
-        d = pathlib.Path(d)
-    else:
-        d = tmp_path_factory.mktemp("tmp_check")
+    try:
+        with pg_server_module.restarting() as s:
+            s.conf.set(
+                ssl="on",
+                ssl_ca_file=certs.ca.certpath,
+                ssl_cert_file=certs.server.certpath,
+                ssl_key_file=certs.server.keypath,
+            )
 
-    return d
+            # Reject by default.
+            s.hba.prepend("hostssl all all all reject")
 
+    except subprocess.CalledProcessError:
+        # This is a decent place to skip if the server isn't set up for SSL.
+        logpath = datadir / "postgresql.log"
+        unsupported = re.compile("SSL is not supported")
 
-@pytest.fixture(scope="session")
-def sockdir(tmp_path_factory):
-    """
-    Returns the directory name to use as the server's unix_socket_directories
-    setting. Local client connections use this as the PGHOST.
+        with open(logpath, "r") as log:
+            for line in log:
+                if unsupported.search(line):
+                    pytest.skip("the server does not support SSL")
 
-    At the moment, this is always put under the pytest temp root.
-    """
-    return tmp_path_factory.mktemp("sockfiles")
+        # Some other error happened.
+        raise
 
+    users = pg_server_module.create_users("ssl")
+    dbs = pg_server_module.create_dbs("ssl")
 
-@pytest.fixture(scope="session")
-def winpassword():
-    """The per-session SCRAM password for the server admin on Windows."""
-    return secrets.token_urlsafe(16)
+    return (users, dbs)
 
 
-@pytest.fixture(scope="session")
-def server_instance(certs, datadir, sockdir, winpassword):
+@pytest.fixture(scope="module")
+def client_cert(ssl_setup, certs):
     """
-    Starts a running Postgres server listening on localhost. The HBA initially
-    allows only local UNIX connections from the same user.
-
-    TODO: when installcheck is supported, this should optionally point to the
-    currently running server instead.
+    Creates a Cert for the "ssl" user.
     """
+    from cryptography import x509
+    from cryptography.x509.oid import NameOID
+
+    users, _ = ssl_setup
+    user = users["ssl"]
 
-    # Lock down the HBA by default; tests can open it back up later.
-    if platform.system() == "Windows":
-        # On Windows, for admin connections, use SCRAM with a generated password
-        # over local sockets. This requires additional work during initdb.
-        method = "scram-sha-256"
-
-        # NamedTemporaryFile doesn't work very nicely on Windows until Python
-        # 3.12, which introduces NamedTemporaryFile(delete_on_close=False).
-        # Until then, specify delete=False and manually unlink after use.
-        with tempfile.NamedTemporaryFile("w", delete=False) as pwfile:
-            pwfile.write(winpassword)
-
-        subprocess.check_call(
-            ["initdb", "--auth=scram-sha-256", "--pwfile", pwfile.name, datadir]
-        )
-        os.unlink(pwfile.name)
-
-    else:
-        # For other OSes we can just use peer auth.
-        method = "peer"
-        subprocess.check_call(["pg_ctl", "-D", datadir, "init"])
-
-    with open(datadir / "pg_hba.conf", "w") as f:
-        print(f"# default: local {method} connections only", file=f)
-        print(f"local all all {method}", file=f)
-
-    # Figure out a port to listen on. Attempt to reserve both IPv4 and IPv6
-    # addresses in one go.
-    #
-    # Note: socket.has_dualstack_ipv6/create_server are only in Python 3.8+.
-    if hasattr(socket, "has_dualstack_ipv6") and socket.has_dualstack_ipv6():
-        addr = ("::1", 0)
-        s = socket.create_server(addr, family=socket.AF_INET6, dualstack_ipv6=True)
-
-        hostaddr, port, _, _ = s.getsockname()
-        addrs = [hostaddr, "127.0.0.1"]
-
-    else:
-        addr = ("127.0.0.1", 0)
-
-        s = socket.socket()
-        s.bind(addr)
-
-        hostaddr, port = s.getsockname()
-        addrs = [hostaddr]
-
-    log = os.path.join(datadir, "postgresql.log")
-
-    with s, open(os.path.join(datadir, "postgresql.conf"), "a") as f:
-        print(file=f)
-        print("unix_socket_directories = '{}'".format(sockdir.as_posix()), file=f)
-        print("listen_addresses = '{}'".format(",".join(addrs)), file=f)
-        print("port =", port, file=f)
-        print("log_connections = all", file=f)
-
-    # Between closing of the socket, s, and server start, we're racing against
-    # anything that wants to open up ephemeral ports, so try not to put any new
-    # work here.
-
-    subprocess.check_call(["pg_ctl", "-D", datadir, "-l", log, "start"])
-    yield (hostaddr, port)
-    subprocess.check_call(["pg_ctl", "-D", datadir, "-l", log, "stop"])
+    return certs.new(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, user)]))
diff --git a/src/test/ssl/pyt/test_client.py b/src/test/ssl/pyt/test_client.py
index 28110ae0717..247681f93cb 100644
--- a/src/test/ssl/pyt/test_client.py
+++ b/src/test/ssl/pyt/test_client.py
@@ -10,10 +10,11 @@ from typing import Callable
 
 import pytest
 
-import pg
+import pypg
+from libpq import LibpqError, ExecStatus
 
 # This suite opens up local TCP ports and is hidden behind PG_TEST_EXTRA=ssl.
-pytestmark = pg.require_test_extra("ssl")
+pytestmark = pypg.require_test_extra("ssl")
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -192,7 +193,7 @@ def ssl_server(tcp_server_class, certs):
 
 
 @pytest.mark.parametrize("sslmode", ("require", "verify-ca", "verify-full"))
-def test_server_with_ssl_disabled(libpq, tcp_server, certs, sslmode):
+def test_server_with_ssl_disabled(connect, tcp_server, certs, sslmode):
     """
     Make sure client refuses to talk to non-SSL servers with stricter
     sslmodes.
@@ -214,16 +215,15 @@ def test_server_with_ssl_disabled(libpq, tcp_server, certs, sslmode):
 
     tcp_server.background(refuse_ssl)
 
-    with pytest.raises(libpq.Error, match="server does not support SSL"):
-        with libpq:  # XXX tests shouldn't need to do this
-            libpq.must_connect(
-                **tcp_server.conninfo,
-                sslrootcert=certs.ca.certpath,
-                sslmode=sslmode,
-            )
+    with pytest.raises(LibpqError, match="server does not support SSL"):
+        connect(
+            **tcp_server.conninfo,
+            sslrootcert=certs.ca.certpath,
+            sslmode=sslmode,
+        )
 
 
-def test_verify_full_connection(libpq, ssl_server, certs):
+def test_verify_full_connection(connect, ssl_server, certs):
     """Completes a verify-full connection and empty query."""
 
     def handle_empty_query(s: ssl.SSLSocket):
@@ -269,10 +269,10 @@ def test_verify_full_connection(libpq, ssl_server, certs):
 
     ssl_server.background_ssl(handle_empty_query)
 
-    conn = libpq.must_connect(
+    conn = connect(
         **ssl_server.conninfo,
         sslrootcert=certs.ca.certpath,
         sslmode="verify-full",
     )
     with conn:
-        assert conn.exec("").status() == libpq.PGRES_EMPTY_QUERY
+        assert conn.exec("").status() == ExecStatus.PGRES_EMPTY_QUERY
diff --git a/src/test/ssl/pyt/test_server.py b/src/test/ssl/pyt/test_server.py
index 2d0be735371..60628d0c067 100644
--- a/src/test/ssl/pyt/test_server.py
+++ b/src/test/ssl/pyt/test_server.py
@@ -1,25 +1,16 @@
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
-import contextlib
-import os
-import pathlib
-import platform
 import re
-import shutil
 import socket
 import ssl
 import struct
-import subprocess
-import tempfile
-from collections import namedtuple
-from typing import Dict, List, Union
 
 import pytest
 
-import pg
+import pypg
 
 # This suite opens up local TCP ports and is hidden behind PG_TEST_EXTRA=ssl.
-pytestmark = pg.require_test_extra("ssl")
+pytestmark = pypg.require_test_extra("ssl")
 
 
 #
@@ -27,363 +18,6 @@ pytestmark = pg.require_test_extra("ssl")
 #
 
 
-@pytest.fixture(scope="session")
-def connenv(server_instance, sockdir, datadir):
-    """
-    Provides the values for several PG* environment variables needed for our
-    utility programs to connect to the server_instance.
-    """
-    return {
-        "PGHOST": str(sockdir),
-        "PGPORT": str(server_instance[1]),
-        "PGDATABASE": "postgres",
-        "PGDATA": str(datadir),
-    }
-
-
-class FileBackup(contextlib.AbstractContextManager):
-    """
-    A context manager which backs up a file's contents, restoring them on exit.
-    """
-
-    def __init__(self, file: pathlib.Path):
-        super().__init__()
-
-        self._file = file
-
-    def __enter__(self):
-        with tempfile.NamedTemporaryFile(
-            prefix=self._file.name, dir=self._file.parent, delete=False
-        ) as f:
-            self._backup = pathlib.Path(f.name)
-
-        shutil.copyfile(self._file, self._backup)
-
-        return self
-
-    def __exit__(self, *exc):
-        # Swap the backup and the original file, so that the modified contents
-        # can still be inspected in case of failure.
-        #
-        # TODO: this is less helpful if there are multiple layers, because it's
-        # not clear which backup to look at. Can the backup name be printed as
-        # part of the failed test output? Should we only swap on test failure?
-        tmp = self._backup.parent / (self._backup.name + ".tmp")
-
-        shutil.copyfile(self._file, tmp)
-        shutil.copyfile(self._backup, self._file)
-        shutil.move(tmp, self._backup)
-
-
-class HBA(FileBackup):
-    """
-    Backs up a server's HBA configuration and provides means for temporarily
-    editing it. See also pg_server, which provides an instance of this class and
-    context managers for enforcing the reload/restart order of operations.
-    """
-
-    def __init__(self, datadir: pathlib.Path):
-        super().__init__(datadir / "pg_hba.conf")
-
-    def prepend(self, *lines: Union[str, List[str]]):
-        """
-        Temporarily prepends lines to the server's pg_hba.conf.
-
-        As sugar for aligning HBA columns in the tests, each line can be either
-        a string or a list of strings. List elements will be joined by single
-        spaces before they are written to file.
-        """
-        with open(self._file, "r") as f:
-            prior_data = f.read()
-
-        with open(self._file, "w") as f:
-            for l in lines:
-                if isinstance(l, list):
-                    print(*l, file=f)
-                else:
-                    print(l, file=f)
-
-            f.write(prior_data)
-
-
-class Config(FileBackup):
-    """
-    Backs up a server's postgresql.conf and provides means for temporarily
-    editing it. See also pg_server, which provides an instance of this class and
-    context managers for enforcing the reload/restart order of operations.
-    """
-
-    def __init__(self, datadir: pathlib.Path):
-        super().__init__(datadir / "postgresql.conf")
-
-    def set(self, **gucs):
-        """
-        Temporarily appends GUC settings to the server's postgresql.conf.
-        """
-
-        with open(self._file, "a") as f:
-            print(file=f)
-
-            for n, v in gucs.items():
-                v = str(v)
-
-                # TODO: proper quoting
-                v = v.replace("\\", "\\\\")
-                v = v.replace("'", "\\'")
-                v = "'{}'".format(v)
-
-                print(n, "=", v, file=f)
-
-
-@pytest.fixture(scope="session")
-def pg_server_session(server_instance, connenv, datadir, winpassword):
-    """
-    Provides common routines for configuring and connecting to the
-    server_instance. For example:
-
-        users = pg_server_session.create_users("one", "two")
-        dbs = pg_server_session.create_dbs("default")
-
-        with pg_server_session.reloading() as s:
-            s.hba.prepend(["local", dbs["default"], users["two"], "peer"])
-
-        conn = connect_somehow(**pg_server_session.conninfo)
-        ...
-
-    Attributes of note are
-    - .conninfo: provides TCP connection info for the server
-
-    This fixture unwinds its configuration changes at the end of the pytest
-    session. For more granular changes, pg_server_session.subcontext() splits
-    off a "nested" context to allow smaller scopes.
-    """
-
-    class _Server(contextlib.ExitStack):
-        conninfo = dict(
-            hostaddr=server_instance[0],
-            port=server_instance[1],
-        )
-
-        # for _backup_configuration()
-        _Backup = namedtuple("Backup", "conf, hba")
-
-        def subcontext(self):
-            """
-            Creates a new server stack instance that can be tied to a smaller
-            scope than "session".
-            """
-            # So far, there doesn't seem to be a need to link the two objects,
-            # since HBA/Config/FileBackup operate directly on the filesystem and
-            # will appear to "nest" naturally.
-            return self.__class__()
-
-        def create_users(self, *userkeys: str) -> Dict[str, str]:
-            """
-            Creates new users which will be dropped at the end of the server
-            context.
-
-            For each provided key, a related user name will be selected and
-            stored in a map. This map is returned to let calling code look up
-            the selected usernames (instead of hardcoding them and potentially
-            stomping on an existing installation).
-            """
-            usermap = {}
-
-            for u in userkeys:
-                # TODO: use a uniquifier to support installcheck
-                name = u + "user"
-                usermap[u] = name
-
-                # TODO: proper escaping
-                self.psql("-c", "CREATE USER " + name)
-                self.callback(self.psql, "-c", "DROP USER " + name)
-
-            return usermap
-
-        def create_dbs(self, *dbkeys: str) -> Dict[str, str]:
-            """
-            Creates new databases which will be dropped at the end of the server
-            context. See create_users() for the meaning of the keys and returned
-            map.
-            """
-            dbmap = {}
-
-            for d in dbkeys:
-                # TODO: use a uniquifier to support installcheck
-                name = d + "db"
-                dbmap[d] = name
-
-                # TODO: proper escaping
-                self.psql("-c", "CREATE DATABASE " + name)
-                self.callback(self.psql, "-c", "DROP DATABASE " + name)
-
-            return dbmap
-
-        @contextlib.contextmanager
-        def reloading(self):
-            """
-            Provides a context manager for making configuration changes.
-
-            If the context suite finishes successfully, the configuration will
-            be reloaded via pg_ctl. On teardown, the configuration changes will
-            be unwound, and the server will be signaled to reload again.
-
-            The context target contains the following attributes which can be
-            used to configure the server:
-            - .conf: modifies postgresql.conf
-            - .hba: modifies pg_hba.conf
-
-            For example:
-
-                with pg_server_session.reloading() as s:
-                    s.conf.set(log_connections="on")
-                    s.hba.prepend("local all all trust")
-            """
-            try:
-                # Push a reload onto the stack before making any other
-                # unwindable changes. That way the order of operations will be
-                #
-                #  # test
-                #   - config change 1
-                #   - config change 2
-                #   - reload
-                #  # teardown
-                #   - undo config change 2
-                #   - undo config change 1
-                #   - reload
-                #
-                self.callback(self.pg_ctl, "reload")
-                yield self._backup_configuration()
-            except:
-                # We only want to reload at the end of the suite if there were
-                # no errors. During exceptions, the pushed callback handles
-                # things instead, so there's nothing to do here.
-                raise
-            else:
-                # Suite completed successfully.
-                self.pg_ctl("reload")
-
-        @contextlib.contextmanager
-        def restarting(self):
-            """Like .reloading(), but with a full server restart."""
-            try:
-                self.callback(self.pg_ctl, "restart")
-                yield self._backup_configuration()
-            except:
-                raise
-            else:
-                self.pg_ctl("restart")
-
-        def psql(self, *args):
-            """
-            Runs psql with the given arguments. Password prompts are always
-            disabled. On Windows, the admin password will be included in the
-            environment.
-            """
-            if platform.system() == "Windows":
-                pw = dict(PGPASSWORD=winpassword)
-            else:
-                pw = None
-
-            self._run("psql", "-w", *args, addenv=pw)
-
-        def pg_ctl(self, *args):
-            """
-            Runs pg_ctl with the given arguments. Log output will be placed in
-            postgresql.log in the server's data directory.
-
-            TODO: put the log in TESTLOGDIR
-            """
-            self._run("pg_ctl", "-l", str(datadir / "postgresql.log"), *args)
-
-        def _run(self, cmd, *args, addenv: dict = None):
-            # Override the existing environment with the connenv values and
-            # anything the caller wanted to add. (Python 3.9 gives us the
-            # less-ugly `os.environ | connenv` merge operator.)
-            subenv = dict(os.environ, **connenv)
-            if addenv:
-                subenv.update(addenv)
-
-            subprocess.check_call([cmd, *args], env=subenv)
-
-        def _backup_configuration(self):
-            # Wrap the existing HBA and configuration with FileBackups.
-            return self._Backup(
-                hba=self.enter_context(HBA(datadir)),
-                conf=self.enter_context(Config(datadir)),
-            )
-
-    with _Server() as s:
-        yield s
-
-
-@pytest.fixture(scope="module", autouse=True)
-def ssl_setup(pg_server_session, certs, datadir):
-    """
-    Sets up required server settings for all tests in this module. The fixture
-    variable is a tuple (users, dbs) containing the user and database names that
-    have been chosen for the test session.
-    """
-    try:
-        with pg_server_session.restarting() as s:
-            s.conf.set(
-                ssl="on",
-                ssl_ca_file=certs.ca.certpath,
-                ssl_cert_file=certs.server.certpath,
-                ssl_key_file=certs.server.keypath,
-            )
-
-            # Reject by default.
-            s.hba.prepend("hostssl all all all reject")
-
-    except subprocess.CalledProcessError:
-        # This is a decent place to skip if the server isn't set up for SSL.
-        logpath = datadir / "postgresql.log"
-        unsupported = re.compile("SSL is not supported")
-
-        with open(logpath, "r") as log:
-            for line in log:
-                if unsupported.search(line):
-                    pytest.skip("the server does not support SSL")
-
-        # Some other error happened.
-        raise
-
-    users = pg_server_session.create_users(
-        "ssl",
-    )
-
-    dbs = pg_server_session.create_dbs(
-        "ssl",
-    )
-
-    return (users, dbs)
-
-
-@pytest.fixture(scope="module")
-def client_cert(ssl_setup, certs):
-    """
-    Creates a Cert for the "ssl" user.
-    """
-    from cryptography import x509
-    from cryptography.x509.oid import NameOID
-
-    users, _ = ssl_setup
-    user = users["ssl"]
-
-    return certs.new(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, user)]))
-
-
-@pytest.fixture
-def pg_server(pg_server_session):
-    """
-    A per-test instance of pg_server_session. Use this fixture to make changes
-    to the server which will be rolled back at the end of every test.
-    """
-    with pg_server_session.subcontext() as s:
-        yield s
-
-
 #
 # Tests
 #
@@ -394,8 +28,8 @@ CLIENT = "client"
 SERVER = "server"
 
 
+# fmt: off
 @pytest.mark.parametrize(
-    # fmt: off
     "auth_method,                    creds,  expected_error",
 [
     # Trust allows anything.
@@ -416,10 +50,10 @@ SERVER = "server"
     ("cert",                         CLIENT, None),
     ("cert",                         SERVER, "authentication failed for user"),
 ],
-    # fmt: on
 )
+# fmt: on
 def test_direct_ssl_certificate_authentication(
-    pg_server,
+    pg,
     ssl_setup,
     certs,
     client_cert,
@@ -440,7 +74,7 @@ def test_direct_ssl_certificate_authentication(
     user = users["ssl"]
     db = dbs["ssl"]
 
-    with pg_server.reloading() as s:
+    with pg.reloading() as s:
         s.hba.prepend(
             ["hostssl", db, user, "127.0.0.1/32", auth_method],
             ["hostssl", db, user, "::1/128", auth_method],
@@ -461,7 +95,7 @@ def test_direct_ssl_certificate_authentication(
 
     # Make a direct SSL connection. There's no SSLRequest in the handshake; we
     # simply wrap a TCP connection with OpenSSL.
-    addr = (pg_server.conninfo["hostaddr"], pg_server.conninfo["port"])
+    addr = (pg.hostaddr, pg.port)
     with socket.create_connection(addr) as s:
         s.settimeout(remaining_timeout())  # XXX this resets every operation
 
-- 
2.51.1
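For readers skimming test_query_helpers.py: the tests above pin down a
result-shape rule for conn.sql(), which could be sketched roughly as
follows. This is only an illustration inferred from the assertions; the
name simplify_result and its (rows, ncols) signature are hypothetical
and are not taken from the patch.

```python
from typing import Any, Sequence, Tuple


def simplify_result(rows: Sequence[Tuple[Any, ...]], ncols: int) -> Any:
    """Hypothetical helper: collapse a result set into its simplest shape."""
    if ncols == 0:
        # Utility statements (CREATE TABLE, INSERT, ...) return no columns.
        return None
    if ncols == 1:
        values = [row[0] for row in rows]
        # Exactly one row and one column: hand back the bare value.
        # Zero rows or several rows: hand back a flat list of values.
        return values[0] if len(values) == 1 else values
    if len(rows) == 1:
        # One row with several columns: a single tuple.
        return tuple(rows[0])
    # Several columns, with zero or several rows: a list of tuples.
    return [tuple(row) for row in rows]
```

One consequence visible in the tests is that a single-cell array
(SELECT ARRAY[1, 2, 3]) and a three-row, single-column result both come
back as [1, 2, 3], so callers cannot tell the two apart from the return
value alone.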

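The ordering described in the reloading() comment (push the reload
callback first so that it runs last on teardown) relies on
contextlib.ExitStack unwinding callbacks in LIFO order. A minimal
sketch of that behaviour, using a plain list as a stand-in for pg_ctl
and the config backups; the function reloading and its change()
callable here are hypothetical simplifications, not the fixture's
actual API:

```python
import contextlib
from typing import Callable, Iterator, List


@contextlib.contextmanager
def reloading(
    stack: contextlib.ExitStack, log: List[str]
) -> Iterator[Callable[[str], None]]:
    """Hypothetical stand-in for the reloading() context manager."""
    # Registered first, so it runs last when the stack unwinds.
    stack.callback(log.append, "reload (teardown)")

    def change(name: str) -> None:
        log.append("apply " + name)
        # Each undo is pushed after the reload, so it runs before it.
        stack.callback(log.append, "undo " + name)

    yield change
    # Only reached when the body of the with-block raised nothing.
    log.append("reload (test)")


def demo() -> List[str]:
    log: List[str] = []
    with contextlib.ExitStack() as stack:
        with reloading(stack, log) as change:
            change("config change 1")
            change("config change 2")
    return log
```

Calling demo() records the two changes, a reload, the undos in reverse
order, and a final reload, which matches the sequence listed in the
comment.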

