From 54097f43d61d34071748ec5cfc0ed49469fdf149 Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 8 Jul 2025 11:19:29 +0900 Subject: [PATCH v1] Feature: implement NegotiateProtocolVersion message. Implementing the message is necessary when frontend requests the protocol version 3.2 (i.e. PostgreSQL 18+ or compatible clients), while backend still only supports 3.0 (i.e. backend is PostgreSQL 17 or before). This commit handles the message so that the message is forwarded from backend to frontend when there's no connection cache exists. If connection cache exists, pgpool sends the message, which has been saved at the time when the connection cache was created, to frontend. Note that the frontend/backend protocol 3.2 changes the BackendKeyData message format, but it's not implemented in this commit yet. This means that still pgpool cannot handle 3.2 protocol. --- src/auth/pool_auth.c | 116 +++++++++++++++++++++++++++- src/include/pool.h | 9 +++ src/protocol/pool_connection_pool.c | 2 + 3 files changed, 123 insertions(+), 4 deletions(-) diff --git a/src/auth/pool_auth.c b/src/auth/pool_auth.c index be9f33434..f0a376c27 100644 --- a/src/auth/pool_auth.c +++ b/src/auth/pool_auth.c @@ -79,6 +79,7 @@ static void authenticate_frontend_SCRAM(POOL_CONNECTION * backend, POOL_CONNECTI static void authenticate_frontend_clear_text(POOL_CONNECTION * frontend); static bool get_auth_password(POOL_CONNECTION * backend, POOL_CONNECTION * frontend, int reauth, char **password, PasswordType *passwordType); +static void ProcessNegotiateProtocol(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp); /* * Do authentication. Assuming the only caller is @@ -342,6 +343,7 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) protoMajor = MAIN_CONNECTION(cp)->sp->major; +read_kind: kind = pool_read_kind(cp); if (kind < 0) ereport(ERROR, @@ -365,6 +367,12 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) errdetail("backend response with kind \'E\' when expecting \'R\'"), errhint("This issue can be caused by version mismatch (current version %d)", protoMajor))); } + else if (kind == 'v') + { + /* NegotiateProtocolVersion received */ + ProcessNegotiateProtocol(frontend, cp); + goto read_kind; + } else if (kind != 'R') ereport(ERROR, (errmsg("backend authentication failed"), @@ -597,8 +605,11 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) } - send_auth_ok(frontend, protoMajor); - authkind = 0; + if (kind == 'R') + { + send_auth_ok(frontend, protoMajor); + authkind = 0; + } } else @@ -756,7 +767,16 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) CONNECTION_SLOT(cp, i)->key = cp->info[i].key = key; cp->info[i].major = sp->major; - cp->info[i].minor = sp->minor; + + /* + * If NegotiateProtocol message has been received, set the minor + * version. Othewise use the version in the StartupMessage. + */ + if (CONNECTION_SLOT(cp, i)->negotiated_minor >= 0) + cp->info[i].minor = CONNECTION_SLOT(cp, i)->negotiated_minor; + else + cp->info[i].minor = sp->minor; + strlcpy(cp->info[i].database, sp->database, sizeof(cp->info[i].database)); strlcpy(cp->info[i].user, sp->user, sizeof(cp->info[i].user)); cp->info[i].counter = 1; @@ -779,16 +799,31 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) } /* -* do re-authentication for reused connection. if success return 0 otherwise throws ereport. +* do re-authentication for reused connection. if success return 0 otherwise +* throws ereport. */ int pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) { int protoMajor; int msglen; + POOL_CONNECTION_POOL_SLOT *sp; protoMajor = MAJOR(cp); + /* + * If NegotiateProtocolMsg has been received from backend, forward it to + * frontend. If the frontend dislike it, it will disconnect the + * connection. Otherwise it will silently continue. + */ + sp = CONNECTION_SLOT(cp, MAIN_NODE_ID); + if (protoMajor == PROTO_MAJOR_V3 && sp->negotiateProtocolMsg) + { + elog(DEBUG1, "negotiateProtocol message is forwarded to frontend at reauth"); + pool_write_and_flush(frontend, sp->negotiateProtocolMsg, + sp->nplen); + } + /* * if hba is enabled we would already have passed authentication */ @@ -822,6 +857,9 @@ pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) } } + /* + * Send auth ok + */ pool_write(frontend, "R", 1); if (protoMajor == PROTO_MAJOR_V3) @@ -832,7 +870,10 @@ pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) msglen = htonl(0); pool_write_and_flush(frontend, &msglen, sizeof(msglen)); + + /* send BackendKeyData */ pool_send_backend_key_data(frontend, MAIN_CONNECTION(cp)->pid, MAIN_CONNECTION(cp)->key, protoMajor); + return 0; } @@ -2074,3 +2115,70 @@ pg_SASL_continue(POOL_CONNECTION * backend, char *payload, int payloadlen, void return 0; } + +/* + * Forward NegotiateProtocol message to frontend. + * + * When this function is called, message kind has been already read. + */ +static void +ProcessNegotiateProtocol(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *cp) +{ + int32 len; + int32 savelen; + int32 protoMajor; + int32 protoMinor; + int32 protov; + bool forwardMsg = false; + int i; + + elog(DEBUG1, "Forwarding NegotiateProtocol message to frontend"); + pool_write(frontend, "v", 1); /* forward message kind */ + savelen = len = pool_read_int(cp); /* message length including self */ + pool_write(frontend, &len, 4); /* forward message length */ + len = ntohl(len) - 4; /* length of rest of the message */ + protov = pool_read_int(cp); /* read protocol version */ + protoMajor = ntohl(protov) >> 16; /* protocol major version */ + protoMinor = ntohl(protov) & 0x0000ffff; /* protocol minor version */ + pool_write(frontend, &protov, 4); /* forward protocol version */ + elog(DEBUG1, "protocol verion offered: major: %d minor: %d", protoMajor, protoMinor); + len -= 4; + for (i = 0; i < NUM_BACKENDS; i++) + { + if (VALID_BACKEND(i)) + { + POOL_CONNECTION_POOL_SLOT *sp; + char *p; + char *np; + Size nplen; + + p = pool_read2(CONNECTION(cp, i), len); + if (!forwardMsg) + { + pool_write_and_flush(frontend, p, len); /* forward rest of message */ + forwardMsg = true; + } + /* save negatiate protocol version */ + sp = CONNECTION_SLOT(cp, i); + sp->negotiated_major = protoMajor; + sp->negotiated_minor = protoMinor; + + /* save negatiate protocol message */ + nplen = 1 + /* message kind */ + sizeof(savelen) + /* message length */ + sizeof(protov) + /* protocol version */ + len; /* rest of message */ + /* allocate message area */ + sp->negotiateProtocolMsg = MemoryContextAlloc(TopMemoryContext, nplen); + np = sp->negotiateProtocolMsg; + sp->nplen = nplen; /* set message length */ + + *np++ = 'v'; + memcpy(np, &savelen, sizeof(savelen)); + np += sizeof(savelen); + memcpy(np, &protov, sizeof(protov)); + np += sizeof(protov); + memcpy(np, p, len); + } + } +} diff --git a/src/include/pool.h b/src/include/pool.h index c9b4dc27e..cf8aff073 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -262,6 +262,15 @@ typedef struct time_t closetime; /* absolute time in second when the connection * closed if 0, that means the connection is * under use. */ + /* + * Protocol version after negotiation. Negative value means no negotiation + * has been done. + */ + int negotiated_major; + int negotiated_minor; + char *negotiateProtocolMsg; /* Raw NegotiateProtocol messag */ + int32 nplen; /* message length of NegotiateProtocol messag */ + } POOL_CONNECTION_POOL_SLOT; typedef struct diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c index 225294a1b..00046e687 100644 --- a/src/protocol/pool_connection_pool.c +++ b/src/protocol/pool_connection_pool.c @@ -235,6 +235,8 @@ pool_discard_cp(char *user, char *database, int protoMajor) } CONNECTION_SLOT(p, i)->sp = NULL; pool_close(CONNECTION(p, i)); + if (CONNECTION_SLOT(p, i)->negotiateProtocolMsg) + pfree(CONNECTION_SLOT(p, i)->negotiateProtocolMsg); pfree(CONNECTION_SLOT(p, i)); } -- 2.25.1