From 775e63acb07d6a986451032a7f036ff18401f699 Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Sun, 5 Apr 2026 04:42:32 +0300 Subject: [PATCH v18 4/5] Move async infrastructure into shared AppendBase functions Move the async-related code that Append and MergeAppend can share from nodeAppend.c into execAppend.c, preparing MergeAppend to support async foreign scan subplans. Append-specific pieces (ExecAppendAsyncRequest, ExecAsyncAppendResponse, and the as_nasyncresults/as_nasyncremain bookkeeping) stay in nodeAppend.c. - ExecInitAppendBase() now detects async-capable subplans and allocates async request/result state. - ExecReScanAppendBase() now resets async request state. - ExecAppendBaseAsyncBegin(): fire async requests (moved from nodeAppend.c's ExecAppendAsyncBegin). - ExecAppendBaseAsyncEventWait(): wait/poll for async events (moved from nodeAppend.c's ExecAppendAsyncEventWait). - classify_matching_subplans_common(): new helper to split valid subplans into sync and async sets. - MergeAppend now uses valid_subplans_identified flag instead of checking valid_subplans == NULL. Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru Author: Matheus Alcantara Co-authored-by: Alexander Korotkov Reviewed-by: Alexander Pyhalov Reviewed-by: Alena Rybakina --- src/backend/executor/execAppend.c | 232 ++++++++++++++++++++++++- src/backend/executor/nodeAppend.c | 208 ++-------------------- src/backend/executor/nodeMergeAppend.c | 5 +- src/include/executor/execAppend.h | 7 + 4 files changed, 250 insertions(+), 202 deletions(-) diff --git a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c index 9599d10a952..d6bebabbd32 100644 --- a/src/backend/executor/execAppend.c +++ b/src/backend/executor/execAppend.c @@ -14,8 +14,14 @@ #include "postgres.h" #include "executor/execAppend.h" +#include "executor/execAsync.h" #include "executor/execPartition.h" #include "executor/executor.h" +#include "miscadmin.h" +#include "storage/latch.h" +#include "storage/waiteventset.h" + +#define EVENT_BUFFER_SIZE 16 /* Begin all of the subscans of an AppendBase node. 
*/ void @@ -29,7 +35,9 @@ ExecInitAppendBase(AppendBaseState *state, PlanState **appendplanstates; const TupleTableSlotOps *appendops; Bitmapset *validsubplans; + Bitmapset *asyncplans; int nplans; + int nasyncplans; int firstvalid; int i, j; @@ -87,12 +95,25 @@ ExecInitAppendBase(AppendBaseState *state, * While at it, find out the first valid partial plan. */ j = 0; + asyncplans = NULL; + nasyncplans = 0; firstvalid = nplans; i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->subplans, i); + /* + * Record async subplans. When executing EvalPlanQual, we treat them + * as sync ones; don't do this when initializing an EvalPlanQual plan + * tree. + */ + if (initNode->async_capable && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + nasyncplans++; + } + /* * Record the lowest appendplans index which is a valid partial plan. */ @@ -130,15 +151,38 @@ ExecInitAppendBase(AppendBaseState *state, state->ps.resultopsfixed = false; } - /* Initialize async state to safe defaults */ - state->asyncplans = NULL; - state->nasyncplans = 0; + /* Initialize async state */ + state->asyncplans = asyncplans; + state->nasyncplans = nasyncplans; state->asyncrequests = NULL; state->asyncresults = NULL; state->needrequest = NULL; state->eventset = NULL; state->valid_asyncplans = NULL; + if (nasyncplans > 0) + { + state->asyncrequests = palloc0_array(AsyncRequest *, nplans); + + i = -1; + while ((i = bms_next_member(asyncplans, i)) >= 0) + { + AsyncRequest *areq; + + areq = palloc_object(AsyncRequest); + areq->requestor = (PlanState *) state; + areq->requestee = appendplanstates[i]; + areq->request_index = i; + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + + state->asyncrequests[i] = areq; + } + + state->asyncresults = palloc0_array(TupleTableSlot *, nasyncplans); + } + /* * Miscellaneous initialization */ @@ -149,6 +193,7 @@ void 
ExecReScanAppendBase(AppendBaseState *node) { int i; + int nasyncplans = node->nasyncplans; /* * If any PARAM_EXEC Params used in pruning expressions have changed, then @@ -184,6 +229,187 @@ ExecReScanAppendBase(AppendBaseState *node) if (subnode->chgParam == NULL) ExecReScan(subnode); } + + /* Reset async state */ + if (nasyncplans > 0) + { + i = -1; + while ((i = bms_next_member(node->asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + } + + bms_free(node->needrequest); + node->needrequest = NULL; + } +} + +/* Wait or poll for file descriptor events and fire callbacks. */ +void +ExecAppendBaseAsyncEventWait(AppendBaseState *node, int timeout, + uint32 wait_event_info) +{ + int nevents = node->nasyncplans + 2; /* one for PM death and + * one for latch */ + int noccurred; + int i; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + + Assert(node->eventset == NULL); + + node->eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); + AddWaitEventToSet(node->eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* Give each waiting subplan a chance to add an event. */ + i = -1; + while ((i = bms_next_member(node->asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + if (areq->callback_pending) + ExecAsyncConfigureWait(areq); + } + + /* + * No need for further processing if none of the subplans configured any + * events. + */ + if (GetNumRegisteredWaitEvents(node->eventset) == 1) + { + FreeWaitEventSet(node->eventset); + node->eventset = NULL; + return; + } + + /* + * Add the process latch to the set, so that we wake up to process the + * standard interrupts with CHECK_FOR_INTERRUPTS(). + * + * NOTE: For historical reasons, it's important that this is added to the + * WaitEventSet after the ExecAsyncConfigureWait() calls. 
Namely, + * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if + * any other events are in the set. That's a poor design, it's + * questionable for postgres_fdw to be doing that in the first place, but + * we cannot change it now. The pattern has possibly been copied to other + * extensions too. + */ + AddWaitEventToSet(node->eventset, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + + /* Return at most EVENT_BUFFER_SIZE events in one call. */ + if (nevents > EVENT_BUFFER_SIZE) + nevents = EVENT_BUFFER_SIZE; + + /* + * If the timeout is -1, wait until at least one event occurs. If the + * timeout is 0, poll for events, but do not wait at all. + */ + noccurred = WaitEventSetWait(node->eventset, timeout, occurred_event, + nevents, wait_event_info); + FreeWaitEventSet(node->eventset); + node->eventset = NULL; + if (noccurred == 0) + return; + + /* Deliver notifications. */ + for (i = 0; i < noccurred; i++) + { + WaitEvent *w = &occurred_event[i]; + + /* + * Each waiting subplan should have registered its wait event with + * user_data pointing back to its AsyncRequest. + */ + if ((w->events & WL_SOCKET_READABLE) != 0) + { + AsyncRequest *areq = (AsyncRequest *) w->user_data; + + if (areq->callback_pending) + { + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + areq->callback_pending = false; + + /* Do the actual work. */ + ExecAsyncNotify(areq); + } + } + + /* Handle standard interrupts */ + if ((w->events & WL_LATCH_SET) != 0) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } +} + +/* Begin executing async-capable subplans. */ +void +ExecAppendBaseAsyncBegin(AppendBaseState *node) +{ + int i; + + /* Backward scan is not supported by async-aware Appends. 
*/ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + /* We should never be called when there are no subplans */ + Assert(node->nplans > 0); + + /* We should never be called when there are no async subplans. */ + Assert(node->nasyncplans > 0); + + /* Make a request for each of the valid async subplans. */ + i = -1; + while ((i = bms_next_member(node->valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } +} + +/* + * classify_matching_subplans_common + * Common part of classify_matching_subplans() for Append and MergeAppend. + * + * Splits valid_subplans into sync and async sets. Returns false if there + * are no valid async subplans, true otherwise. + */ +bool +classify_matching_subplans_common(Bitmapset **valid_subplans, + Bitmapset *asyncplans, + Bitmapset **valid_asyncplans) +{ + Assert(*valid_asyncplans == NULL); + + /* Checked by classify_matching_subplans() */ + Assert(!bms_is_empty(*valid_subplans)); + + /* Nothing to do if there are no valid async subplans. */ + if (!bms_overlap(*valid_subplans, asyncplans)) + return false; + + /* Get valid async subplans. */ + *valid_asyncplans = bms_intersect(asyncplans, + *valid_subplans); + + /* Adjust the valid subplans to contain sync subplans only. */ + *valid_subplans = bms_del_members(*valid_subplans, + *valid_asyncplans); + return true; } /* Shuts down the subplans of an AppendBase node. 
*/ diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index f267ffe13fa..95ed4d86e20 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -84,7 +84,6 @@ struct ParallelAppendState }; #define INVALID_SUBPLAN_INDEX -1 -#define EVENT_BUFFER_SIZE 16 static TupleTableSlot *ExecAppend(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); @@ -112,10 +111,6 @@ AppendState * ExecInitAppend(Append *node, EState *estate, int eflags) { AppendState *appendstate = makeNode(AppendState); - Bitmapset *asyncplans; - int nasyncplans; - int nplans; - int i; /* check for unsupported flags */ Assert(!(eflags & EXEC_FLAG_MARK)); @@ -140,59 +135,11 @@ ExecInitAppend(Append *node, EState *estate, int eflags) node->first_partial_plan, &appendstate->as_first_partial_plan); - nplans = appendstate->as.nplans; + if (appendstate->as.nasyncplans > 0 && appendstate->as.valid_subplans_identified) + classify_matching_subplans(appendstate); - /* - * Detect async-capable subplans. When executing EvalPlanQual, we treat - * them as sync ones; don't do this when initializing an EvalPlanQual plan - * tree. 
- */ - asyncplans = NULL; - nasyncplans = 0; - for (i = 0; i < nplans; i++) - { - if (appendstate->as.plans[i]->plan->async_capable && - estate->es_epq_active == NULL) - { - asyncplans = bms_add_member(asyncplans, i); - nasyncplans++; - } - } - - /* Initialize async state */ - appendstate->as.asyncplans = asyncplans; - appendstate->as.nasyncplans = nasyncplans; - appendstate->as_nasyncresults = 0; appendstate->as_nasyncremain = 0; - if (nasyncplans > 0) - { - appendstate->as.asyncrequests = (AsyncRequest **) - palloc0(nplans * sizeof(AsyncRequest *)); - - i = -1; - while ((i = bms_next_member(asyncplans, i)) >= 0) - { - AsyncRequest *areq; - - areq = palloc_object(AsyncRequest); - areq->requestor = (PlanState *) appendstate; - areq->requestee = appendstate->as.plans[i]; - areq->request_index = i; - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - - appendstate->as.asyncrequests[i] = areq; - } - - appendstate->as.asyncresults = (TupleTableSlot **) - palloc0(nasyncplans * sizeof(TupleTableSlot *)); - - if (appendstate->as.valid_subplans_identified) - classify_matching_subplans(appendstate); - } - /* For parallel query, this will be overridden later. 
*/ appendstate->choose_next_subplan = choose_next_subplan_locally; @@ -312,29 +259,16 @@ ExecEndAppend(AppendState *node) void ExecReScanAppend(AppendState *node) { + int nasyncplans = node->as.nasyncplans; ExecReScanAppendBase(&node->as); - /* Reset async state */ + /* Reset Append-specific state */ if (nasyncplans > 0) { - int i; - - i = -1; - while ((i = bms_next_member(node->as.asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as.asyncrequests[i]; - - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - } - node->as_nasyncresults = 0; node->as_nasyncremain = 0; - bms_free(node->as.needrequest); - node->as.needrequest = NULL; } /* Let choose_next_subplan_* function handle setting the first subplan */ @@ -754,21 +688,7 @@ ExecAppendAsyncBegin(AppendState *node) if (node->as_nasyncremain == 0) return; - /* Make a request for each of the valid async subplans. */ - { - int i = -1; - - while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as.asyncrequests[i]; - - Assert(areq->request_index == i); - Assert(!areq->callback_pending); - - /* Do the actual work. */ - ExecAsyncRequest(areq); - } - } + ExecAppendBaseAsyncBegin(&node->as); } /* ---------------------------------------------------------------- @@ -883,105 +803,12 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) static void ExecAppendAsyncEventWait(AppendState *node) { - int nevents = node->as.nasyncplans + 2; long timeout = node->as_syncdone ? -1 : 0; - WaitEvent occurred_event[EVENT_BUFFER_SIZE]; - int noccurred; - int i; /* We should never be called when there are no valid async subplans. */ Assert(node->as_nasyncremain > 0); - Assert(node->as.eventset == NULL); - node->as.eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); - AddWaitEventToSet(node->as.eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - - /* Give each waiting subplan a chance to add an event. 
*/ - i = -1; - while ((i = bms_next_member(node->as.asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as.asyncrequests[i]; - - if (areq->callback_pending) - ExecAsyncConfigureWait(areq); - } - - /* - * No need for further processing if none of the subplans configured any - * events. - */ - if (GetNumRegisteredWaitEvents(node->as.eventset) == 1) - { - FreeWaitEventSet(node->as.eventset); - node->as.eventset = NULL; - return; - } - - /* - * Add the process latch to the set, so that we wake up to process the - * standard interrupts with CHECK_FOR_INTERRUPTS(). - * - * NOTE: For historical reasons, it's important that this is added to the - * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely, - * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if - * any other events are in the set. That's a poor design, it's - * questionable for postgres_fdw to be doing that in the first place, but - * we cannot change it now. The pattern has possibly been copied to other - * extensions too. - */ - AddWaitEventToSet(node->as.eventset, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - - /* Return at most EVENT_BUFFER_SIZE events in one call. */ - if (nevents > EVENT_BUFFER_SIZE) - nevents = EVENT_BUFFER_SIZE; - - /* - * If the timeout is -1, wait until at least one event occurs. If the - * timeout is 0, poll for events, but do not wait at all. - */ - noccurred = WaitEventSetWait(node->as.eventset, timeout, occurred_event, - nevents, WAIT_EVENT_APPEND_READY); - FreeWaitEventSet(node->as.eventset); - node->as.eventset = NULL; - if (noccurred == 0) - return; - - /* Deliver notifications. */ - for (i = 0; i < noccurred; i++) - { - WaitEvent *w = &occurred_event[i]; - - /* - * Each waiting subplan should have registered its wait event with - * user_data pointing back to its AsyncRequest. 
- */ - if ((w->events & WL_SOCKET_READABLE) != 0) - { - AsyncRequest *areq = (AsyncRequest *) w->user_data; - - if (areq->callback_pending) - { - /* - * Mark it as no longer needing a callback. We must do this - * before dispatching the callback in case the callback resets - * the flag. - */ - areq->callback_pending = false; - - /* Do the actual work. */ - ExecAsyncNotify(areq); - } - } - - /* Handle standard interrupts */ - if ((w->events & WL_LATCH_SET) != 0) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - } + ExecAppendBaseAsyncEventWait(&node->as, timeout, WAIT_EVENT_APPEND_READY); } /* ---------------------------------------------------------------- @@ -1039,10 +866,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq) static void classify_matching_subplans(AppendState *node) { - Bitmapset *valid_asyncplans; - Assert(node->as.valid_subplans_identified); - Assert(node->as.valid_asyncplans == NULL); /* Nothing to do if there are no valid subplans. */ if (bms_is_empty(node->as.valid_subplans)) @@ -1052,21 +876,9 @@ classify_matching_subplans(AppendState *node) return; } - /* Nothing to do if there are no valid async subplans. */ - if (!bms_overlap(node->as.valid_subplans, node->as.asyncplans)) - { + /* No valid async subplans identified. */ + if (!classify_matching_subplans_common(&node->as.valid_subplans, + node->as.asyncplans, + &node->as.valid_asyncplans)) node->as_nasyncremain = 0; - return; - } - - /* Get valid async subplans. */ - valid_asyncplans = bms_intersect(node->as.asyncplans, - node->as.valid_subplans); - - /* Adjust the valid subplans to contain sync subplans only. */ - node->as.valid_subplans = bms_del_members(node->as.valid_subplans, - valid_asyncplans); - - /* Save valid async subplans. 
*/ - node->as.valid_asyncplans = valid_asyncplans; } diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index 6928152f16f..591be1018d8 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -152,9 +152,12 @@ ExecMergeAppend(PlanState *pstate) * run-time pruning is disabled then the valid subplans will always be * set to all subplans. */ - if (node->as.valid_subplans == NULL) + if (!node->as.valid_subplans_identified) + { node->as.valid_subplans = ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; + } /* * First time through: pull the first tuple from each valid subplan, diff --git a/src/include/executor/execAppend.h b/src/include/executor/execAppend.h index a8f41bad921..7f53ad89213 100644 --- a/src/include/executor/execAppend.h +++ b/src/include/executor/execAppend.h @@ -22,5 +22,12 @@ extern void ExecInitAppendBase(AppendBaseState *state, int *first_valid_partial_plan); extern void ExecEndAppendBase(AppendBaseState *node); extern void ExecReScanAppendBase(AppendBaseState *node); +extern void ExecAppendBaseAsyncBegin(AppendBaseState *node); +extern void ExecAppendBaseAsyncEventWait(AppendBaseState *node, + int timeout, + uint32 wait_event_info); +extern bool classify_matching_subplans_common(Bitmapset **valid_subplans, + Bitmapset *asyncplans, + Bitmapset **valid_asyncplans); #endif /* EXECAPPEND_H */ -- 2.39.5 (Apple Git-154)