LCOV - code coverage report
Current view: top level - src/backend/storage/aio - aio.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 284 331 85.8 %
Date: 2025-05-02 05:15:34 Functions: 37 39 94.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * aio.c
       4             :  *    AIO - Core Logic
       5             :  *
       6             :  * For documentation about how AIO works on a higher level, including a
       7             :  * schematic example, see README.md.
       8             :  *
       9             :  *
      10             :  * AIO is a complicated subsystem. To keep things navigable, it is split
      11             :  * across a number of files:
      12             :  *
      13             :  * - method_*.c - different ways of executing AIO (e.g. worker process)
      14             :  *
      15             :  * - aio_target.c - IO on different kinds of targets
      16             :  *
      17             :  * - aio_io.c - method-independent code for specific IO ops (e.g. readv)
      18             :  *
      19             :  * - aio_callback.c - callbacks at IO operation lifecycle events
      20             :  *
      21             :  * - aio_init.c - per-server and per-backend initialization
      22             :  *
      23             :  * - aio.c - all other topics
      24             :  *
      25             :  * - read_stream.c - helper for reading buffered relation data
      26             :  *
      27             :  * - README.md - higher-level overview of AIO
      28             :  *
      29             :  *
      30             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      31             :  * Portions Copyright (c) 1994, Regents of the University of California
      32             :  *
      33             :  * IDENTIFICATION
      34             :  *    src/backend/storage/aio/aio.c
      35             :  *
      36             :  *-------------------------------------------------------------------------
      37             :  */
      38             : 
      39             : #include "postgres.h"
      40             : 
      41             : #include "lib/ilist.h"
      42             : #include "miscadmin.h"
      43             : #include "port/atomics.h"
      44             : #include "storage/aio.h"
      45             : #include "storage/aio_internal.h"
      46             : #include "storage/aio_subsys.h"
      47             : #include "utils/guc.h"
      48             : #include "utils/guc_hooks.h"
      49             : #include "utils/resowner.h"
      50             : #include "utils/wait_event_types.h"
      51             : 
      52             : #ifdef USE_INJECTION_POINTS
      53             : #include "utils/injection_point.h"
      54             : #endif
      55             : 
      56             : /* Prototypes of file-local (static) helpers. */
      57             : static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state);
      58             : static void pgaio_io_reclaim(PgAioHandle *ioh);
      59             : static void pgaio_io_resowner_register(PgAioHandle *ioh);
      60             : static void pgaio_io_wait_for_free(void);
      61             : static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation);
      62             : static const char *pgaio_io_state_get_name(PgAioHandleState s);
      63             : static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
      64             : 
      65             : 
      66             : /* Accepted values of the io_method GUC; "io_uring" only if compiled in. */
      67             : const struct config_enum_entry io_method_options[] = {
      68             :     {"sync", IOMETHOD_SYNC, false},
      69             :     {"worker", IOMETHOD_WORKER, false},
      70             : #ifdef IOMETHOD_IO_URING_ENABLED
      71             :     {"io_uring", IOMETHOD_IO_URING, false},
      72             : #endif
      73             :     {NULL, 0, false}
      74             : };
      75             : 
      76             : /* GUC variables */
      77             : int         io_method = DEFAULT_IO_METHOD;
      78             : int         io_max_concurrency = -1;
      79             : 
      80             : /* global control for AIO */
      81             : PgAioCtl   *pgaio_ctl;
      82             : 
      83             : /* current backend's per-backend state */
      84             : PgAioBackend *pgaio_my_backend;
      85             : 
      86             : /* IO method callback tables, indexed by the IOMETHOD_* constants */
      87             : static const IoMethodOps *const pgaio_method_ops_table[] = {
      88             :     [IOMETHOD_SYNC] = &pgaio_sync_ops,
      89             :     [IOMETHOD_WORKER] = &pgaio_worker_ops,
      90             : #ifdef IOMETHOD_IO_URING_ENABLED
      91             :     [IOMETHOD_IO_URING] = &pgaio_uring_ops,
      92             : #endif
      93             : };
      94             : 
      95             : /* callbacks for the configured io_method, set by assign_io_method */
      96             : const IoMethodOps *pgaio_method_ops;
      97             : 
      98             : 
      99             : /*
     100             :  * Currently there's no infrastructure to pass arguments to injection points,
     101             :  * so we instead set this up for the duration of the injection point
     102             :  * invocation. See pgaio_io_call_inj().
     103             :  */
     104             : #ifdef USE_INJECTION_POINTS
     105             : static PgAioHandle *pgaio_inj_cur_handle;
     106             : #endif
     107             : 
     108             : 
     109             : 
     110             : /* --------------------------------------------------------------------------------
     111             :  * Public Functions related to PgAioHandle
     112             :  * --------------------------------------------------------------------------------
     113             :  */
     114             : 
     115             : /*
     116             :  * Acquire an AioHandle, waiting for IO completion if necessary.
     117             :  *
     118             :  * Each backend can only have one AIO handle that has been "handed out" to
     119             :  * code, but not yet submitted or released. This restriction is necessary to
     120             :  * ensure that it is possible for code to wait for an unused handle by waiting
     121             :  * for in-flight IO to complete. There is a limited number of handles in each
     122             :  * backend; if multiple handles could be handed out without being submitted,
     123             :  * waiting for all in-flight IO to complete would not guarantee that handles
     124             :  * free up.
     125             :  *
     126             :  * It is cheap to acquire an IO handle, unless all handles are in use. In that
     127             :  * case this function waits for the oldest IO to complete. If that is not
     128             :  * desirable, use pgaio_io_acquire_nb().
     129             :  *
     130             :  * If a handle was acquired but then does not turn out to be needed,
     131             :  * e.g. because pgaio_io_acquire() is called before starting an IO in a
     132             :  * critical section, the handle needs to be released with pgaio_io_release().
     133             :  *
     134             :  *
     135             :  * To react to the completion of the IO as soon as it is known to have
     136             :  * completed, callbacks can be registered with pgaio_io_register_callbacks().
     137             :  *
     138             :  * To actually execute IO using the returned handle, the pgaio_io_start_*()
     139             :  * family of functions is used. In many cases the pgaio_io_start_*() call will
     140             :  * not be done directly by code that acquired the handle, but by lower level
     141             :  * code that gets passed the handle. E.g. if code in bufmgr.c wants to perform
     142             :  * AIO, it typically will pass the handle to smgr.c, which will pass it on to
     143             :  * md.c, on to fd.c, which then finally calls pgaio_io_start_*().  This
     144             :  * forwarding allows the various layers to react to the IO's completion by
     145             :  * registering callbacks. These callbacks in turn can translate a lower
     146             :  * layer's result into a result understandable by a higher layer.
     147             :  *
     148             :  * During pgaio_io_start_*() the IO is staged (i.e. prepared for execution but
     149             :  * not submitted to the kernel). Unless in batchmode
     150             :  * (c.f. pgaio_enter_batchmode()), the IO will also get submitted for
     151             :  * execution. Note that, whether in batchmode or not, the IO might even
     152             :  * complete before the functions return.
     153             :  *
     154             :  * After pgaio_io_start_*() the AioHandle is "consumed" and may not be
     155             :  * referenced by the IO issuing code. To e.g. wait for IO, references to the
     156             :  * IO can be established with pgaio_io_get_wref() *before* pgaio_io_start_*()
     157             :  * is called.  pgaio_wref_wait() can be used to wait for the IO to complete.
     158             :  *
     159             :  *
     160             :  * To know if the IO [partially] succeeded or failed, a PgAioReturn * can be
     161             :  * passed to pgaio_io_acquire(). Once the issuing backend has called
     162             :  * pgaio_wref_wait(), the PgAioReturn contains information about whether the
     163             :  * operation succeeded and details about the first failure, if any. The error
     164             :  * can be raised / logged with pgaio_result_report().
     165             :  *
     166             :  * The lifetime of the memory pointed to by *ret needs to be at least as long
     167             :  * as the passed in resowner. If the resowner releases resources before the IO
     168             :  * completes (typically due to an error), the reference to *ret will be
     169             :  * cleared. In case of resowner cleanup *ret will not be updated with the
     170             :  * results of the IO operation.
     171             :  */
     172             : PgAioHandle *
     173       10860 : pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
     174             : {
     175             :     PgAioHandle *h;
     176             : 
     177             :     while (true)                /* retry until a handle could be acquired */
     178             :     {
     179       10860 :         h = pgaio_io_acquire_nb(resowner, ret);
     180             : 
     181       10856 :         if (h != NULL)
     182        5526 :             return h;
     183             : 
     184             :         /*
     185             :          * pgaio_io_acquire_nb() returned NULL, i.e. this backend has no idle
     186             :          * handle left. Wait for some IO to complete, then try again.
     187             :          */
     188        5330 :         pgaio_io_wait_for_free();
     189             :     }
     190             : }
     191             : 
     192             : /*
     193             :  * Acquire an AioHandle, returning NULL if no handles are free.
     194             :  *
     195             :  * See pgaio_io_acquire(). The only difference is that this function will return
     196             :  * NULL if there are no idle handles, instead of blocking.
     197             :  */
     198             : PgAioHandle *
     199     2504944 : pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
     200             : {
     201     2504944 :     if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE)
     202             :     {
     203             :         Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE);
     204           0 :         pgaio_submit_staged();  /* the staging array is full */
     205             :     }
     206             : 
     207     2504944 :     if (pgaio_my_backend->handed_out_io)    /* earlier handle not yet staged/released */
     208           4 :         elog(ERROR, "API violation: Only one IO can be handed out");
     209             : 
     210     2504940 :     if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
     211             :     {
     212     2494280 :         dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios);
     213     2494280 :         PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion);
     214             : 
     215             :         Assert(ioh->state == PGAIO_HS_IDLE);
     216             :         Assert(ioh->owner_procno == MyProcNumber);
     217             : 
     218     2494280 :         pgaio_io_update_state(ioh, PGAIO_HS_HANDED_OUT);
     219     2494280 :         pgaio_my_backend->handed_out_io = ioh;  /* reset by pgaio_io_stage() / pgaio_io_release() */
     220             : 
     221     2494280 :         if (resowner)   /* NOTE(review): only gates registration; the helper uses CurrentResourceOwner - confirm */
     222     2494280 :             pgaio_io_resowner_register(ioh);
     223             : 
     224     2494280 :         if (ret)        /* caller wants the IO's result reported into *ret */
     225             :         {
     226     2494228 :             ioh->report_return = ret;
     227     2494228 :             ret->result.status = PGAIO_RS_UNKNOWN;  /* no result known yet */
     228             :         }
     229             : 
     230     2494280 :         return ioh;
     231             :     }
     232             : 
     233       10660 :     return NULL;                /* no idle handle available */
     234             : }
     235             : 
     236             : /*
     237             :  * Release IO handle that turned out to not be required.
     238             :  *
     239             :  * See pgaio_io_acquire() for more details.
     240             :  */
     241             : void
     242        5296 : pgaio_io_release(PgAioHandle *ioh)
     243             : {
     244        5296 :     if (ioh == pgaio_my_backend->handed_out_io)
     245             :     {
     246             :         Assert(ioh->state == PGAIO_HS_HANDED_OUT);
     247             :         Assert(ioh->resowner);
     248             : 
     249        5292 :         pgaio_my_backend->handed_out_io = NULL; /* a new handle may be acquired again */
     250        5292 :         pgaio_io_reclaim(ioh);  /* make the unused handle reusable */
     251             :     }
     252             :     else
     253             :     {
     254           4 :         elog(ERROR, "release in unexpected state"); /* not this backend's handed-out handle */
     255             :     }
     256        5292 : }
     257             : 
     258             : /*
     259             :  * Release IO handle during resource owner cleanup.
     260             :  */
     261             : void
     262          86 : pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
     263             : {
     264          86 :     PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node);
     265             : 
     266             :     Assert(ioh->resowner);
     267             : 
     268          86 :     ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
     269          86 :     ioh->resowner = NULL;   /* detached, whatever state the IO is in */
     270             : 
     271          86 :     switch (ioh->state)
     272             :     {
     273           0 :         case PGAIO_HS_IDLE:
     274           0 :             elog(ERROR, "unexpected");  /* idle handles are not expected here */
     275             :             break;
     276          66 :         case PGAIO_HS_HANDED_OUT:
     277             :             Assert(ioh == pgaio_my_backend->handed_out_io || pgaio_my_backend->handed_out_io == NULL);
     278             : 
     279          66 :             if (ioh == pgaio_my_backend->handed_out_io)
     280             :             {
     281          66 :                 pgaio_my_backend->handed_out_io = NULL;
     282          66 :                 if (!on_error)  /* outside of error cleanup this is a leak */
     283          20 :                     elog(WARNING, "leaked AIO handle");
     284             :             }
     285             : 
     286          66 :             pgaio_io_reclaim(ioh);  /* never staged, make the handle reusable */
     287          66 :             break;
     288           0 :         case PGAIO_HS_DEFINED:
     289             :         case PGAIO_HS_STAGED:
     290           0 :             if (!on_error)
     291           0 :                 elog(WARNING, "AIO handle was not submitted");
     292           0 :             pgaio_submit_staged();  /* presumably submits everything staged, incl. this IO */
     293           0 :             break;
     294          20 :         case PGAIO_HS_SUBMITTED:
     295             :         case PGAIO_HS_COMPLETED_IO:
     296             :         case PGAIO_HS_COMPLETED_SHARED:
     297             :         case PGAIO_HS_COMPLETED_LOCAL:
     298             :             /* expected: resowner released while the IO is submitted or completed */
     299          20 :             break;
     300             :     }
     301             : 
     302             :     /*
     303             :      * Stop reporting the IO's result via report_return; the memory it is
     304             :      * referencing likely has gone away.
     305             :      */
     306          86 :     if (ioh->report_return)
     307          20 :         ioh->report_return = NULL;
     308          86 : }
     309             : 
     310             : /*
     311             :  * Add a [set of] flags to the IO.
     312             :  *
     313             :  * Note that this ORs the passed-in flags into the already set flags, rather
     314             :  * than overwriting them with the passed-in value. This is to allow multiple
     315             :  * callsites to set flags.
     316             :  */
     317             : void
     318     4974890 : pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
     319             : {
     320             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT);  /* i.e. before the IO is staged */
     321             : 
     322     4974890 :     ioh->flags |= flag;     /* OR in; never clears already set flags */
     323     4974890 : }
     324             : 
     325             : /*
     326             :  * Returns an ID uniquely identifying the IO handle. This is only really
     327             :  * useful for logging, as handles are reused across multiple IOs.
     328             :  */
     329             : int
     330     1234654 : pgaio_io_get_id(PgAioHandle *ioh)
     331             : {
     332             :     Assert(ioh >= pgaio_ctl->io_handles &&
     333             :            ioh < (pgaio_ctl->io_handles + pgaio_ctl->io_handle_count));
     334     1234654 :     return ioh - pgaio_ctl->io_handles; /* index within pgaio_ctl->io_handles[] */
     335             : }
     336             : 
     337             : /*
     338             :  * Return the ProcNumber for the process that can use an IO handle. The
     339             :  * mapping from IO handles to PGPROCs is static, therefore this even works
     340             :  * when the corresponding PGPROC is not in use.
     341             :  */
     342             : ProcNumber
     343           0 : pgaio_io_get_owner(PgAioHandle *ioh)
     344             : {
     345           0 :     return ioh->owner_procno;   /* plain field read; the handle's state is not checked */
     346             : }
     347             : 
     348             : /*
     349             :  * Return a wait reference for the IO. Only wait references can be used to
     350             :  * wait for an IO's completion, as handles themselves can be reused after
     351             :  * completion.  See also the comment above pgaio_io_acquire().
     352             :  */
     353             : void
     354     4977874 : pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
     355             : {
     356             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT ||
     357             :            ioh->state == PGAIO_HS_DEFINED ||
     358             :            ioh->state == PGAIO_HS_STAGED);
     359             :     Assert(ioh->generation != 0);
     360             : 
     361     4977874 :     iow->aio_index = ioh - pgaio_ctl->io_handles;   /* same value pgaio_io_get_id() returns */
     362     4977874 :     iow->generation_upper = (uint32) (ioh->generation >> 32);   /* generation stored as two halves */
     363     4977874 :     iow->generation_lower = (uint32) ioh->generation;
     364     4977874 : }
     365             : 
     366             : 
     367             : 
     368             : /* --------------------------------------------------------------------------------
     369             :  * Internal Functions related to PgAioHandle
     370             :  * --------------------------------------------------------------------------------
     371             :  */
     372             : 
     373             : static inline void
     374    19501168 : pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
     375             : {
     376    19501168 :     pgaio_debug_io(DEBUG5, ioh,
     377             :                    "updating state to %s",
     378             :                    pgaio_io_state_get_name(new_state));
     379             : 
     380             :     /*
     381             :      * The write barrier ensures that the changes signified by the new state
     382             :      * are visible before the new state itself becomes visible.
     383             :      */
     384    19501168 :     pg_write_barrier();
     385             : 
     386    19501168 :     ioh->state = new_state;     /* the state is stored last */
     387    19501168 : }
     388             : 
     389             : static void
     390     2494280 : pgaio_io_resowner_register(PgAioHandle *ioh)
     391             : {
     392             :     Assert(!ioh->resowner);     /* must not already be registered */
     393             :     Assert(CurrentResourceOwner);
     394             : 
     395     2494280 :     ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node);
     396     2494280 :     ioh->resowner = CurrentResourceOwner;   /* used by pgaio_io_release_resowner() to unregister */
     397     2494280 : }
     398             : 
     399             : /*
     400             :  * Stage IO for execution and, if appropriate, submit it immediately.
     401             :  *
     402             :  * Should only be called from pgaio_io_start_*().
     403             :  */
     404             : void
     405     2488922 : pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
     406             : {
     407             :     bool        needs_synchronous;
     408             : 
     409             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT);
     410             :     Assert(pgaio_my_backend->handed_out_io == ioh);
     411             :     Assert(pgaio_io_has_target(ioh));
     412             : 
     413     2488922 :     ioh->op = op;
     414     2488922 :     ioh->result = 0;            /* overwritten by pgaio_io_process_completion() */
     415             : 
     416     2488922 :     pgaio_io_update_state(ioh, PGAIO_HS_DEFINED);
     417             : 
     418             :     /* no longer handed out, so pgaio_io_acquire[_nb]() may hand out another */
     419     2488922 :     pgaio_my_backend->handed_out_io = NULL;
     420             : 
     421     2488922 :     pgaio_io_call_stage(ioh);
     422             : 
     423     2488922 :     pgaio_io_update_state(ioh, PGAIO_HS_STAGED);
     424             : 
     425             :     /*
     426             :      * An IO that needs synchronous execution is not queued in staged_ios[],
     427             :      * so determine that before deciding how to proceed.
     428             :      */
     429     2488922 :     needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);
     430             : 
     431     2488922 :     pgaio_debug_io(DEBUG3, ioh,
     432             :                    "staged (synchronous: %d, in_batch: %d)",
     433             :                    needs_synchronous, pgaio_my_backend->in_batchmode);
     434             : 
     435     2488922 :     if (!needs_synchronous)
     436             :     {
     437     1176932 :         pgaio_my_backend->staged_ios[pgaio_my_backend->num_staged_ios++] = ioh;
     438             :         Assert(pgaio_my_backend->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
     439             : 
     440             :         /*
     441             :          * Unless code explicitly opted into batching IOs, submit the IO
     442             :          * immediately.
     443             :          */
     444     1176932 :         if (!pgaio_my_backend->in_batchmode)
     445       44250 :             pgaio_submit_staged();
     446             :     }
     447             :     else
     448             :     {
     449     1311990 :         pgaio_io_prepare_submit(ioh);   /* state -> SUBMITTED, added to in_flight_ios */
     450     1311990 :         pgaio_io_perform_synchronously(ioh);
     451             :     }
     452     2488922 : }
     453             : 
     454             : bool
     455     2488922 : pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)
     456             : {
     457             :     /*
     458             :      * If the caller said to execute the IO synchronously, do so.
     459             :      *
     460             :      * XXX: We could optimize the logic when to execute synchronously by first
     461             :      * checking if there are other IOs in flight and only synchronously
     462             :      * executing if not. Unclear whether that'll be sufficiently common to be
     463             :      * worth worrying about.
     464             :      */
     465     2488922 :     if (ioh->flags & PGAIO_HF_SYNCHRONOUS)
     466     1303946 :         return true;
     467             : 
     468             :     /* Otherwise let the IO method decide, if it provides the optional callback */
     469     1184976 :     if (pgaio_method_ops->needs_synchronous_execution)
     470     1184976 :         return pgaio_method_ops->needs_synchronous_execution(ioh);
     471             : 
     472           0 :     return false;               /* no flag and no method callback */
     473             : }
     474             : 
     475             : /*
     476             :  * Handle IO being processed by IO method.
     477             :  *
     478             :  * Should be called by IO methods / synchronous IO execution, just before the
     479             :  * IO is performed.
     480             :  */
     481             : void
     482     2488922 : pgaio_io_prepare_submit(PgAioHandle *ioh)
     483             : {
     484     2488922 :     pgaio_io_update_state(ioh, PGAIO_HS_SUBMITTED);
     485             : 
     486     2488922 :     dclist_push_tail(&pgaio_my_backend->in_flight_ios, &ioh->node); /* appended at the tail of this backend's list */
     487     2488922 : }
     488             : 
     489             : /*
     490             :  * Handle IO getting completed by a method.
     491             :  *
     492             :  * Should be called by IO methods / synchronous IO execution, just after the
     493             :  * IO has been performed.
     494             :  *
     495             :  * Expects to be called in a critical section. We expect IOs to be usable for
     496             :  * WAL etc, which requires being able to execute completion callbacks in a
     497             :  * critical section.
     498             :  */
     499             : void
     500     2278460 : pgaio_io_process_completion(PgAioHandle *ioh, int result)
     501             : {
     502             :     Assert(ioh->state == PGAIO_HS_SUBMITTED);
     503             : 
     504             :     Assert(CritSectionCount > 0);
     505             : 
     506     2278460 :     ioh->result = result;       /* stored before the state change below */
     507             : 
     508     2278460 :     pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_IO);
     509             : 
     510     2278460 :     pgaio_io_call_inj(ioh, "aio-process-completion-before-shared");
     511             : 
     512     2278460 :     pgaio_io_call_complete_shared(ioh);
     513             : 
     514     2278460 :     pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
     515             : 
     516             :     /* condition variable broadcast ensures state is visible before wakeup */
     517     2278460 :     ConditionVariableBroadcast(&ioh->cv);
     518             : 
     519             :     /* contains call to pgaio_io_call_complete_local() */
     520     2278460 :     if (ioh->owner_procno == MyProcNumber)  /* non-owners leave reclaiming to the owner */
     521     1311990 :         pgaio_io_reclaim(ioh);
     522     2278460 : }
     523             : 
     524             : /*
     525             :  * Has the IO completed and thus the IO handle been reused?
     526             :  *
     527             :  * This is useful when waiting for IO completion at a low level (e.g. in an IO
     528             :  * method's ->wait_one() callback).
     529             :  */
     530             : bool
     531     3796394 : pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
     532             : {
     533     3796394 :     *state = ioh->state;        /* always returned, even if recycled */
     534     3796394 :     pg_read_barrier();          /* read the state before the generation */
     535             : 
     536     3796394 :     return ioh->generation != ref_generation;   /* differing generation: handle was reused */
     537             : }
     538             : 
     539             : /*
     540             :  * Wait for IO to complete. External code should never use this, outside of
     541             :  * the AIO subsystem waits are only allowed via pgaio_wref_wait().
     542             :  */
     543             : static void
     544      525726 : pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
     545             : {
     546             :     PgAioHandleState state;
     547             :     bool        am_owner;
     548             : 
     549      525726 :     am_owner = ioh->owner_procno == MyProcNumber;
     550             : 
     551      525726 :     if (pgaio_io_was_recycled(ioh, ref_generation, &state))
     552          70 :         return;                 /* handle already reused, nothing to wait for */
     553             : 
     554      525656 :     if (am_owner)               /* the owner may only wait for IO it already submitted */
     555             :     {
     556      520986 :         if (state != PGAIO_HS_SUBMITTED
     557      120476 :             && state != PGAIO_HS_COMPLETED_IO
     558         488 :             && state != PGAIO_HS_COMPLETED_SHARED
     559           0 :             && state != PGAIO_HS_COMPLETED_LOCAL)
     560             :         {
     561           0 :             elog(PANIC, "waiting for own IO in wrong state: %d",
     562             :                  state);
     563             :         }
     564             :     }
     565             : 
     566             :     while (true)
     567             :     {
     568     1050522 :         if (pgaio_io_was_recycled(ioh, ref_generation, &state))
     569        2298 :             return;             /* recycled in the meantime */
     570             : 
     571     1048224 :         switch (state)
     572             :         {
     573           0 :             case PGAIO_HS_IDLE:
     574             :             case PGAIO_HS_HANDED_OUT:
     575           0 :                 elog(ERROR, "IO in wrong state: %d", state);
     576             :                 break;
     577             : 
     578      403470 :             case PGAIO_HS_SUBMITTED:
     579             : 
     580             :                 /*
     581             :                  * If we need to wait via the IO method, do so now. Don't
     582             :                  * check via the IO method if the issuing backend is executing
     583             :                  * the IO synchronously.
     584             :                  */
     585      403470 :                 if (pgaio_method_ops->wait_one && !(ioh->flags & PGAIO_HF_SYNCHRONOUS))
     586             :                 {
     587           0 :                     pgaio_method_ops->wait_one(ioh, ref_generation);
     588           0 :                     continue;   /* re-check generation and state from the top */
     589             :                 }
     590             :                 /* fallthrough */
     591             : 
     592             :                 /* waiting for owner to submit */
     593             :             case PGAIO_HS_DEFINED:
     594             :             case PGAIO_HS_STAGED:
     595             :                 /* waiting for reaper to complete */
     596             :                 /* fallthrough */
     597             :             case PGAIO_HS_COMPLETED_IO:
     598             :                 /* shouldn't be able to hit this otherwise */
     599             :                 Assert(IsUnderPostmaster);
     600             :                 /* ensure we're going to get woken up */
     601      524866 :                 ConditionVariablePrepareToSleep(&ioh->cv);
     602             : 
     603     1048594 :                 while (!pgaio_io_was_recycled(ioh, ref_generation, &state))
     604             :                 {
     605     1046310 :                     if (state == PGAIO_HS_COMPLETED_SHARED ||
     606      523752 :                         state == PGAIO_HS_COMPLETED_LOCAL)
     607             :                         break;  /* completed; the next outer iteration handles it */
     608      523728 :                     ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_IO_COMPLETION);
     609             :                 }
     610             : 
     611      524866 :                 ConditionVariableCancelSleep();
     612      524866 :                 break;
     613             : 
     614      523358 :             case PGAIO_HS_COMPLETED_SHARED:
     615             :             case PGAIO_HS_COMPLETED_LOCAL:
     616             :                 /* IO is done; only the owner reclaims the handle */
     617      523358 :                 if (am_owner)
     618      520986 :                     pgaio_io_reclaim(ioh);
     619      523358 :                 return;
     620             :         }
     621      524866 :     }
     622             : }
     623             : 
     624             : /*
     625             :  * Make IO handle ready to be reused after IO has completed or after the
     626             :  * handle has been released without being used.
                     :  *
                     :  * Must be called by the backend owning the handle, on a handle that is not
                     :  * already idle (both asserted). If the handle is still in COMPLETED_SHARED
                     :  * state, the local completion callbacks are run here and their result, along
                     :  * with the handle's target_data, is copied to ioh->report_return, if set.
                     :  * Afterwards the handle is removed from the in-flight list (unless it was
                     :  * merely handed out), forgotten by its resource owner, its generation is
                     :  * incremented, its fields are reset and it is pushed onto this backend's
                     :  * idle list.
     627             :  */
     628             : static void
     629     2494280 : pgaio_io_reclaim(PgAioHandle *ioh)
     630             : {
     631             :     /* This is only ok if it's our IO */
     632             :     Assert(ioh->owner_procno == MyProcNumber);
     633             :     Assert(ioh->state != PGAIO_HS_IDLE);
     634             : 
     635             :     /*
     636             :      * It's a bit ugly, but right now the easiest place to put the execution
     637             :      * of local completion callbacks is this function, as we need to execute
     638             :      * local callbacks just before reclaiming at multiple callsites.
     639             :      */
     640     2494280 :     if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
     641             :     {
     642             :         PgAioResult local_result;
     643             : 
     644     2488922 :         local_result = pgaio_io_call_complete_local(ioh);
     645     2488922 :         pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
     646             : 
     647     2488922 :         if (ioh->report_return)
     648             :         {
     649     2488902 :             ioh->report_return->result = local_result;
     650     2488902 :             ioh->report_return->target_data = ioh->target_data;
     651             :         }
     652             :     }
     653             : 
     654     2494280 :     pgaio_debug_io(DEBUG4, ioh,
     655             :                    "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",
     656             :                    pgaio_result_status_string(ioh->distilled_result.status),
     657             :                    ioh->distilled_result.id,
     658             :                    ioh->distilled_result.error_data,
     659             :                    ioh->result);
     660             : 
     661             :     /* if the IO has been defined, it's on the in-flight list, remove */
     662     2494280 :     if (ioh->state != PGAIO_HS_HANDED_OUT)
     663     2488922 :         dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node);
     664             : 
     665     2494280 :     if (ioh->resowner)
     666             :     {
     667     2494194 :         ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
     668     2494194 :         ioh->resowner = NULL;
     669             :     }
     670             : 
     671             :     Assert(!ioh->resowner);
     672             : 
     673             :     /*
     674             :      * Update generation & state first, before resetting the IO's fields,
     675             :      * otherwise a concurrent "viewer" could think the fields are valid, even
     676             :      * though they are being reset.  Increment the generation first, so that
     677             :      * we can assert elsewhere that we never wait for an IDLE IO.  While it's
     678             :      * a bit weird for the state to go backwards for a generation, it's OK
     679             :      * here, as there cannot be references to the "reborn" IO yet.  Can't
     680             :      * update both at once, so something has to give.
     681             :      */
     682     2494280 :     ioh->generation++;
     683     2494280 :     pgaio_io_update_state(ioh, PGAIO_HS_IDLE);
     684             : 
     685             :     /* ensure the state update is visible before we reset fields */
     686     2494280 :     pg_write_barrier();
     687             : 
     688     2494280 :     ioh->op = PGAIO_OP_INVALID;
     689     2494280 :     ioh->target = PGAIO_TID_INVALID;
     690     2494280 :     ioh->flags = 0;
     691     2494280 :     ioh->num_callbacks = 0;
     692     2494280 :     ioh->handle_data_len = 0;
     693     2494280 :     ioh->report_return = NULL;
     694     2494280 :     ioh->result = 0;
     695     2494280 :     ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
     696             : 
     697             :     /*
     698             :      * We push the IO to the head of the idle IO list, that seems more cache
     699             :      * efficient in cases where only a few IOs are used.
     700             :      */
     701     2494280 :     dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node);
     702     2494280 : }
     703             : 
     704             : /*
     705             :  * Wait for an IO handle to become usable.
     706             :  *
     707             :  * This only really is useful for pgaio_io_acquire().
                     :  *
                     :  * First reclaims every handle of this backend that already reached
                     :  * COMPLETED_SHARED and returns if at least one was reclaimed. Otherwise any
                     :  * staged IOs are submitted and the IO at the head of the in-flight list is
                     :  * waited for (or reclaimed, if it completed in the meantime). Raises an
                     :  * ERROR if nothing is in flight at that point, and PANICs if no handle is
                     :  * idle after waiting.
     708             :  */
     709             : static void
     710        5330 : pgaio_io_wait_for_free(void)
     711             : {
     712        5330 :     int         reclaimed = 0;
     713             : 
     714        5330 :     pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs",
     715             :                 pgaio_my_backend->num_staged_ios,
     716             :                 dclist_count(&pgaio_my_backend->in_flight_ios),
     717             :                 dclist_count(&pgaio_my_backend->idle_ios));
     718             : 
     719             :     /*
     720             :      * First check if any of our IOs actually have completed - when using
     721             :      * worker, that'll often be the case. We could do so as part of the loop
     722             :      * below, but that'd potentially lead us to wait for some IO submitted
     723             :      * before.
     724             :      */
     725       10660 :     for (int i = 0; i < io_max_concurrency; i++)
     726             :     {
     727        5330 :         PgAioHandle *ioh = &pgaio_ctl->io_handles[pgaio_my_backend->io_handle_off + i];
     728             : 
     729        5330 :         if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
     730             :         {
     731        4250 :             pgaio_io_reclaim(ioh);
     732        4250 :             reclaimed++;
     733             :         }
     734             :     }
     735             : 
     736        5330 :     if (reclaimed > 0)
     737        4250 :         return;
     738             : 
     739             :     /*
     740             :      * If we have any unsubmitted IOs, submit them now. We'll start waiting in
     741             :      * a second, so it's better they're in flight. This also addresses the
     742             :      * edge-case that all IOs are unsubmitted.
     743             :      */
     744        1080 :     if (pgaio_my_backend->num_staged_ios > 0)
     745           0 :         pgaio_submit_staged();
     746             : 
     747        1080 :     if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0)
     748           0 :         ereport(ERROR,
     749             :                 errmsg_internal("no free IOs despite no in-flight IOs"),
     750             :                 errdetail_internal("%d pending, %d in-flight, %d idle IOs",
     751             :                                    pgaio_my_backend->num_staged_ios,
     752             :                                    dclist_count(&pgaio_my_backend->in_flight_ios),
     753             :                                    dclist_count(&pgaio_my_backend->idle_ios)));
     754             : 
     755             :     /*
     756             :      * Wait for the oldest in-flight IO to complete.
     757             :      *
     758             :      * XXX: Reusing the general IO wait is suboptimal, we don't need to wait
     759             :      * for that specific IO to complete, we just need *any* IO to complete.
     760             :      */
     761             :     {
     762        1080 :         PgAioHandle *ioh = dclist_head_element(PgAioHandle, node,
     763             :                                                &pgaio_my_backend->in_flight_ios);
     764             : 
     765        1080 :         switch (ioh->state)
     766             :         {
     767             :                 /* should not be in in-flight list */
     768           0 :             case PGAIO_HS_IDLE:
     769             :             case PGAIO_HS_DEFINED:
     770             :             case PGAIO_HS_HANDED_OUT:
     771             :             case PGAIO_HS_STAGED:
     772             :             case PGAIO_HS_COMPLETED_LOCAL:
     773           0 :                 elog(ERROR, "shouldn't get here with io:%d in state %d",
     774             :                      pgaio_io_get_id(ioh), ioh->state);
     775             :                 break;
     776             : 
     777        1076 :             case PGAIO_HS_COMPLETED_IO:
     778             :             case PGAIO_HS_SUBMITTED:
     779        1076 :                 pgaio_debug_io(DEBUG2, ioh,
     780             :                                "waiting for free io with %d in flight",
     781             :                                dclist_count(&pgaio_my_backend->in_flight_ios));
     782             : 
     783             :                 /*
     784             :                  * In a more general case this would be racy, because the
     785             :                  * generation could increase after we read ioh->state above.
     786             :                  * But we are only looking at IOs by the current backend and
     787             :                  * the IO can only be recycled by this backend.
     788             :                  */
     789        1076 :                 pgaio_io_wait(ioh, ioh->generation);
     790        1076 :                 break;
     791             : 
     792           4 :             case PGAIO_HS_COMPLETED_SHARED:
     793             :                 /* it's possible that another backend just finished this IO */
     794           4 :                 pgaio_io_reclaim(ioh);
     795           4 :                 break;
     796             :         }
     797             : 
     798        1080 :         if (dclist_count(&pgaio_my_backend->idle_ios) == 0)
     799           0 :             elog(PANIC, "no idle IO after waiting for IO to terminate");
     800        1080 :         return;
     801             :     }
     802             : }
     803             : 
     804             : /*
     805             :  * Internal - code outside of AIO should never need this and it'd be hard for
     806             :  * such code to be safe.
                     :  *
                     :  * Returns the handle at index iow->aio_index of pgaio_ctl->io_handles and
                     :  * stores in *ref_generation the 64bit generation reassembled from the
                     :  * reference's generation_upper and generation_lower halves. The index is
                     :  * only bounds-checked by an assertion, as is the generation being non-zero.
     807             :  */
     808             : static PgAioHandle *
     809     1696184 : pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation)
     810             : {
     811             :     PgAioHandle *ioh;
     812             : 
     813             :     Assert(iow->aio_index < pgaio_ctl->io_handle_count);
     814             : 
     815     1696184 :     ioh = &pgaio_ctl->io_handles[iow->aio_index];
     816             : 
     817     1696184 :     *ref_generation = ((uint64) iow->generation_upper) << 32 |
     818     1696184 :         iow->generation_lower;
     819             : 
     820             :     Assert(*ref_generation != 0);
     821             : 
     822     1696184 :     return ioh;
     823             : }
     824             : /*
                     :  * Return the name of handle state 's' as a string literal, i.e. the enum
                     :  * symbol without its PGAIO_HS_ prefix (e.g. "IDLE"). Returns NULL for a
                     :  * value that is not covered by the switch.
                     :  */
     825             : static const char *
     826       14286 : pgaio_io_state_get_name(PgAioHandleState s)
     827             : {
     828             : #define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym
     829       14286 :     switch (s)
     830             :     {
     831           0 :             PGAIO_HS_TOSTR_CASE(IDLE);
     832        4756 :             PGAIO_HS_TOSTR_CASE(HANDED_OUT);
     833        2378 :             PGAIO_HS_TOSTR_CASE(DEFINED);
     834        2378 :             PGAIO_HS_TOSTR_CASE(STAGED);
     835           0 :             PGAIO_HS_TOSTR_CASE(SUBMITTED);
     836        2378 :             PGAIO_HS_TOSTR_CASE(COMPLETED_IO);
     837        2396 :             PGAIO_HS_TOSTR_CASE(COMPLETED_SHARED);
     838           0 :             PGAIO_HS_TOSTR_CASE(COMPLETED_LOCAL);
     839             :     }
     840             : #undef PGAIO_HS_TOSTR_CASE
     841             : 
     842           0 :     return NULL;                /* silence compiler */
     843             : }
     844             : /*
                     :  * Return the name of the state the given IO handle is currently in, as
                     :  * produced by pgaio_io_state_get_name().
                     :  */
     845             : const char *
     846       14286 : pgaio_io_get_state_name(PgAioHandle *ioh)
     847             : {
     848       14286 :     return pgaio_io_state_get_name(ioh->state);
     849             : }
     850             : /*
                     :  * Return a constant string naming the given result status ("UNKNOWN", "OK",
                     :  * "WARNING", "PARTIAL" or "ERROR"). Returns NULL for any other value.
                     :  */
     851             : const char *
     852        4756 : pgaio_result_status_string(PgAioResultStatus rs)
     853             : {
     854        4756 :     switch (rs)
     855             :     {
     856           0 :         case PGAIO_RS_UNKNOWN:
     857           0 :             return "UNKNOWN";
     858        4396 :         case PGAIO_RS_OK:
     859        4396 :             return "OK";
     860         136 :         case PGAIO_RS_WARNING:
     861         136 :             return "WARNING";
     862          40 :         case PGAIO_RS_PARTIAL:
     863          40 :             return "PARTIAL";
     864         184 :         case PGAIO_RS_ERROR:
     865         184 :             return "ERROR";
     866             :     }
     867             : 
     868           0 :     return NULL;                /* silence compiler */
     869             : }
     870             : 
     871             : 
     872             : 
     873             : /* --------------------------------------------------------------------------------
     874             :  * Functions primarily related to IO Wait References
     875             :  * --------------------------------------------------------------------------------
     876             :  */
     877             : 
     878             : /*
     879             :  * Mark a wait reference as invalid
                     :  *
                     :  * This is done by setting aio_index to PG_UINT32_MAX, the value that
                     :  * pgaio_wref_valid() tests for. The generation fields are left untouched.
     880             :  */
     881             : void
     882    25765548 : pgaio_wref_clear(PgAioWaitRef *iow)
     883             : {
     884    25765548 :     iow->aio_index = PG_UINT32_MAX;
     885    25765548 : }
     886             : 
     887             : /* Is the wait reference valid, i.e. is aio_index not PG_UINT32_MAX? */
     888             : bool
     889     5082958 : pgaio_wref_valid(PgAioWaitRef *iow)
     890             : {
     891     5082958 :     return iow->aio_index != PG_UINT32_MAX;
     892             : }
     893             : 
     894             : /*
     895             :  * Similar to pgaio_io_get_id(), just for wait references.
                     :  *
                     :  * Returns the reference's aio_index. The reference must be valid, which is
                     :  * only checked by an assertion.
     896             :  */
     897             : int
     898           0 : pgaio_wref_get_id(PgAioWaitRef *iow)
     899             : {
     900             :     Assert(pgaio_wref_valid(iow));
     901           0 :     return iow->aio_index;
     902             : }
     903             : 
     904             : /*
     905             :  * Wait for the IO to have completed. Can be called in any process, not just
     906             :  * in the issuing backend.
                     :  *
                     :  * Resolves the reference to its handle and generation via
                     :  * pgaio_io_from_wref() and then defers to pgaio_io_wait().
     907             :  */
     908             : void
     909      524632 : pgaio_wref_wait(PgAioWaitRef *iow)
     910             : {
     911             :     uint64      ref_generation;
     912             :     PgAioHandle *ioh;
     913             : 
     914      524632 :     ioh = pgaio_io_from_wref(iow, &ref_generation);
     915             : 
     916      524632 :     pgaio_io_wait(ioh, ref_generation);
     917      524632 : }
     918             : 
     919             : /*
     920             :  * Check if the referenced IO completed, without blocking.
                     :  *
                     :  * Returns true if the handle was recycled since the reference was taken, is
                     :  * idle, or has reached COMPLETED_SHARED / COMPLETED_LOCAL; false otherwise.
                     :  * In the completed case the handle is additionally reclaimed if the calling
                     :  * process is the one owning it.
     921             :  */
     922             : bool
     923     1171552 : pgaio_wref_check_done(PgAioWaitRef *iow)
     924             : {
     925             :     uint64      ref_generation;
     926             :     PgAioHandleState state;
     927             :     bool        am_owner;
     928             :     PgAioHandle *ioh;
     929             : 
     930     1171552 :     ioh = pgaio_io_from_wref(iow, &ref_generation);
     931             : 
     932     1171552 :     if (pgaio_io_was_recycled(ioh, ref_generation, &state))
     933           0 :         return true;
     934             : 
     935     1171552 :     if (state == PGAIO_HS_IDLE)
     936           0 :         return true;
     937             : 
     938     1171552 :     am_owner = ioh->owner_procno == MyProcNumber;
     939             : 
     940     1171552 :     if (state == PGAIO_HS_COMPLETED_SHARED ||
     941      519860 :         state == PGAIO_HS_COMPLETED_LOCAL)
     942             :     {
     943      651692 :         if (am_owner)
     944      651692 :             pgaio_io_reclaim(ioh);
     945      651692 :         return true;
     946             :     }
     947             : 
     948             :     /*
     949             :      * XXX: It likely would be worth checking in with the io method, to give
     950             :      * the IO method a chance to check if there are completion events queued.
     951             :      */
     952             : 
     953      519860 :     return false;
     954             : }
     955             : 
     956             : 
     957             : 
     958             : /* --------------------------------------------------------------------------------
     959             :  * Actions on multiple IOs.
     960             :  * --------------------------------------------------------------------------------
     961             :  */
     962             : 
     963             : /*
     964             :  * Submit IOs in batches going forward.
     965             :  *
     966             :  * Submitting multiple IOs at once can be substantially faster than doing so
     967             :  * one-by-one. At the same time, submitting multiple IOs at once requires more
     968             :  * care to avoid deadlocks.
     969             :  *
     970             :  * Consider backend A staging an IO for buffer 1 and then trying to start IO
     971             :  * on buffer 2, while backend B does the inverse. If A submitted the IO before
     972             :  * moving on to buffer 2, this works just fine, B will wait for the IO to
     973             :  * complete. But if batching were used, each backend will wait for IO that has
     974             :  * not yet been submitted to complete, i.e. forever.
     975             :  *
     976             :  * End batch submission mode with pgaio_exit_batchmode().  (Throwing errors is
     977             :  * allowed; error recovery will end the batch.)
     978             :  *
     979             :  * To avoid deadlocks, code needs to ensure that it will not wait for another
     980             :  * backend while there is unsubmitted IO. E.g. by using conditional lock
     981             :  * acquisition when acquiring buffer locks. To check if there currently are
     982             :  * staged IOs, call pgaio_have_staged() and to submit all staged IOs call
     983             :  * pgaio_submit_staged().
     984             :  *
     985             :  * It is not allowed to enter batchmode while already in batchmode, it's
     986             :  * unlikely to ever be needed, as code needs to be explicitly aware of being
     987             :  * called in batchmode, to avoid the deadlock risks explained above.
     988             :  *
     989             :  * Note that IOs may get submitted before pgaio_exit_batchmode() is called,
     990             :  * e.g. because too many IOs have been staged or because pgaio_submit_staged()
     991             :  * was called.
                     :  *
                     :  * Entering batchmode itself only sets pgaio_my_backend->in_batchmode; an
                     :  * ERROR is raised if that flag is already set.
     992             :  */
     993             : void
     994     5449194 : pgaio_enter_batchmode(void)
     995             : {
     996     5449194 :     if (pgaio_my_backend->in_batchmode)
     997           0 :         elog(ERROR, "starting batch while batch already in progress");
     998     5449194 :     pgaio_my_backend->in_batchmode = true;
     999     5449194 : }
    1000             : 
    1001             : /*
    1002             :  * Stop submitting IOs in batches.
                     :  *
                     :  * Submits all IOs that are still staged and then clears the in_batchmode
                     :  * flag. Must only be called while in batchmode, which is asserted.
    1003             :  */
    1004             : void
    1005     5449174 : pgaio_exit_batchmode(void)
    1006             : {
    1007             :     Assert(pgaio_my_backend->in_batchmode);
    1008             : 
    1009     5449174 :     pgaio_submit_staged();
    1010     5449174 :     pgaio_my_backend->in_batchmode = false;
    1011     5449174 : }
    1012             : 
    1013             : /*
    1014             :  * Are there staged but unsubmitted IOs?
    1015             :  *
    1016             :  * See comment above pgaio_enter_batchmode() for why code may need to check if
    1017             :  * there is IO in that state.
                     :  *
                     :  * Outside of batchmode there must not be any staged IOs, which is asserted.
    1018             :  */
    1019             : bool
    1020     2494084 : pgaio_have_staged(void)
    1021             : {
    1022             :     Assert(pgaio_my_backend->in_batchmode ||
    1023             :            pgaio_my_backend->num_staged_ios == 0);
    1024     2494084 :     return pgaio_my_backend->num_staged_ios > 0;
    1025             : }
    1026             : 
    1027             : /*
    1028             :  * Submit all staged but not yet submitted IOs.
    1029             :  *
    1030             :  * Unless in batch mode, this never needs to be called, as IOs get submitted
    1031             :  * as soon as possible. While in batchmode pgaio_submit_staged() can be called
    1032             :  * before waiting on another backend, to avoid the risk of deadlocks. See
    1033             :  * pgaio_enter_batchmode().
                     :  *
                     :  * Does nothing if no IOs are staged. Otherwise all staged IOs are handed to
                     :  * the IO method's submit callback in a single call, made inside a critical
                     :  * section, and the count of staged IOs is reset to zero afterwards.
    1034             :  */
    1035             : void
    1036     5498778 : pgaio_submit_staged(void)
    1037             : {
    1038     5498778 :     int         total_submitted = 0;
    1039             :     int         did_submit;
    1040             : 
    1041     5498778 :     if (pgaio_my_backend->num_staged_ios == 0)
    1042     4322994 :         return;
    1043             : 
    1044             : 
    1045     1175784 :     START_CRIT_SECTION();
    1046             : 
    1047     1175784 :     did_submit = pgaio_method_ops->submit(pgaio_my_backend->num_staged_ios,
    1048     1175784 :                                           pgaio_my_backend->staged_ios);
    1049             : 
    1050     1175784 :     END_CRIT_SECTION();
    1051             : 
    1052     1175784 :     total_submitted += did_submit;
    1053             : 
    1054             :     Assert(total_submitted == did_submit);
    1055             : 
    1056     1175784 :     pgaio_my_backend->num_staged_ios = 0;
    1057             : 
    1058     1175784 :     pgaio_debug(DEBUG4,
    1059             :                 "aio: submitted %d IOs",
    1060             :                 total_submitted);
    1061             : }
    1062             : 
    1063             : 
    1064             : 
    1065             : /* --------------------------------------------------------------------------------
    1066             :  * Other
    1067             :  * --------------------------------------------------------------------------------
    1068             :  */
    1069             : 
    1070             : 
    1071             : /*
    1072             :  * Perform AIO related cleanup after an error.
    1073             :  *
    1074             :  * This should be called early in the error recovery paths, as later steps may
    1075             :  * need to issue AIO (e.g. to record a transaction abort WAL record).
                     :  *
                     :  * If the backend is in batchmode, batchmode is left and all staged IOs are
                     :  * submitted. Otherwise this only asserts that no IOs are staged.
    1076             :  */
    1077             : void
    1078       58346 : pgaio_error_cleanup(void)
    1079             : {
    1080             :     /*
    1081             :      * It is possible that code errored out after pgaio_enter_batchmode() but
    1082             :      * before pgaio_exit_batchmode() was called. In that case we need to
    1083             :      * submit the IO now.
    1084             :      */
    1085       58346 :     if (pgaio_my_backend->in_batchmode)
    1086             :     {
    1087          20 :         pgaio_my_backend->in_batchmode = false;
    1088             : 
    1089          20 :         pgaio_submit_staged();
    1090             :     }
    1091             : 
    1092             :     /*
    1093             :      * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
    1094             :      */
    1095             :     Assert(pgaio_my_backend->num_staged_ios == 0);
    1096       58346 : }
    1097             : 
    1098             : /*
    1099             :  * Perform AIO related checks at (sub-)transactional boundaries.
    1100             :  *
    1101             :  * This should be called late during (sub-)transactional commit/abort, after
    1102             :  * all steps that might need to perform AIO, so that we can verify that the
    1103             :  * AIO subsystem is in a valid state at the end of a transaction.
                     :  *
                     :  * The is_commit argument is not used by the function body. If a batch is
                     :  * found to be still open, it is ended via pgaio_error_cleanup() and a
                     :  * WARNING is emitted.
    1104             :  */
    1105             : void
    1106      908238 : AtEOXact_Aio(bool is_commit)
    1107             : {
    1108             :     /*
    1109             :      * We should never be in batch mode at transactional boundaries. In case
    1110             :      * an error was thrown while in batch mode, pgaio_error_cleanup() should
    1111             :      * have exited batchmode.
    1112             :      *
    1113             :      * In case we are in batchmode somehow, make sure to submit all staged
    1114             :      * IOs, other backends may need them to complete to continue.
    1115             :      */
    1116      908238 :     if (pgaio_my_backend->in_batchmode)
    1117             :     {
    1118           8 :         pgaio_error_cleanup();
    1119           8 :         elog(WARNING, "open AIO batch at end of (sub-)transaction");
    1120             :     }
    1121             : 
    1122             :     /*
    1123             :      * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
    1124             :      */
    1125             :     Assert(pgaio_my_backend->num_staged_ios == 0);
    1126      908238 : }
    1127             : 
    1128             : /*
    1129             :  * Need to submit staged but not yet submitted IOs using the fd, otherwise
    1130             :  * the IO would end up targeting something bogus.
                     :  *
                     :  * Returns without doing anything if pgaio_my_backend is not set. After
                     :  * submitting, and only if the IO method has wait_on_fd_before_close set,
                     :  * this also waits for each of this backend's in-flight IOs for which
                     :  * pgaio_io_uses_fd() reports that it uses the fd.
    1131             :  */
    1132             : void
    1133    16509658 : pgaio_closing_fd(int fd)
    1134             : {
    1135             :     /*
    1136             :      * Might be called before AIO is initialized or in a subprocess that
    1137             :      * doesn't use AIO.
    1138             :      */
    1139    16509658 :     if (!pgaio_my_backend)
    1140       13634 :         return;
    1141             : 
    1142             :     /*
    1143             :      * For now just submit all staged IOs - we could be more selective, but
    1144             :      * it's probably not worth it.
    1145             :      */
    1146    16496024 :     if (pgaio_my_backend->num_staged_ios > 0)
    1147             :     {
    1148           4 :         pgaio_debug(DEBUG2,
    1149             :                     "submitting %d IOs before FD %d gets closed",
    1150             :                     pgaio_my_backend->num_staged_ios, fd);
    1151           4 :         pgaio_submit_staged();
    1152             :     }
    1153             : 
    1154             :     /*
    1155             :      * If requested by the IO method, wait for all IOs that use the
    1156             :      * to-be-closed FD.
    1157             :      */
    1158    16496024 :     if (pgaio_method_ops->wait_on_fd_before_close)
    1159             :     {
    1160             :         /*
    1161             :          * As waiting for one IO to complete may complete multiple IOs, we
    1162             :          * can't just use a mutable list iterator. The maximum number of
    1163             :          * in-flight IOs is fairly small, so just restart the loop after
    1164             :          * waiting for an IO.
    1165             :          */
    1166           0 :         while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
    1167             :         {
    1168             :             dlist_iter  iter;
    1169           0 :             PgAioHandle *ioh = NULL;
    1170             : 
    1171           0 :             dclist_foreach(iter, &pgaio_my_backend->in_flight_ios)
    1172             :             {
    1173           0 :                 ioh = dclist_container(PgAioHandle, node, iter.cur);
    1174             : 
    1175           0 :                 if (pgaio_io_uses_fd(ioh, fd))
    1176           0 :                     break;
    1177             :                 else
    1178           0 :                     ioh = NULL;
    1179             :             }
    1180             : 
    1181           0 :             if (!ioh)
    1182           0 :                 break;
    1183             : 
    1184           0 :             pgaio_debug_io(DEBUG2, ioh,
    1185             :                            "waiting for IO before FD %d gets closed, %d in-flight IOs",
    1186             :                            fd, dclist_count(&pgaio_my_backend->in_flight_ios));
    1187             : 
    1188             :             /* see comment in pgaio_io_wait_for_free() about raciness */
    1189           0 :             pgaio_io_wait(ioh, ioh->generation);
    1190             :         }
    1191             :     }
    1192             : }
    1193             : 
    1194             : /*
    1195             :  * Registered as before_shmem_exit() callback in pgaio_init_backend()
                     :  *
                     :  * Asserts that no IO handle is currently handed out, runs the end-of-
                     :  * transaction checks (passing code == 0 as is_commit), waits for every IO on
                     :  * this backend's in-flight list and finally resets pgaio_my_backend to NULL.
                     :  * The arg parameter is not used by the function body.
    1196             :  */
    1197             : void
    1198       40070 : pgaio_shutdown(int code, Datum arg)
    1199             : {
    1200             :     Assert(pgaio_my_backend);
    1201             :     Assert(!pgaio_my_backend->handed_out_io);
    1202             : 
    1203             :     /* first clean up resources as we would at a transaction boundary */
    1204       40070 :     AtEOXact_Aio(code == 0);
    1205             : 
    1206             :     /*
    1207             :      * Before exiting, make sure that all IOs are finished. That has two main
    1208             :      * purposes:
    1209             :      *
    1210             :      * - Some kernel-level AIO mechanisms don't deal well with the issuer of
    1211             :      * an AIO exiting before IO completed
    1212             :      *
    1213             :      * - It'd be confusing to see partially finished IOs in stats views etc
    1214             :      */
    1215       40088 :     while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
    1216             :     {
    1217          18 :         PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios);
    1218             : 
    1219          18 :         pgaio_debug_io(DEBUG2, ioh,
    1220             :                        "waiting for IO to complete during shutdown, %d in-flight IOs",
    1221             :                        dclist_count(&pgaio_my_backend->in_flight_ios));
    1222             : 
    1223             :         /* see comment in pgaio_io_wait_for_free() about raciness */
    1224          18 :         pgaio_io_wait(ioh, ioh->generation);
    1225             :     }
    1226             : 
    1227       40070 :     pgaio_my_backend = NULL;
    1228       40070 : }
    1229             : 
    1230             : void
    1231        2190 : assign_io_method(int newval, void *extra)
    1232             : {
                           /*
                     :      * Point pgaio_method_ops at the method implementation selected by
                     :      * newval (presumably the GUC assign hook for io_method - confirm
                     :      * against the GUC table). "extra" is not used.
                     :      *
                     :      * Check the range of newval before using it as a subscript, so that an
                     :      * out-of-range value trips the range assertion instead of first
                     :      * reading past the end of pgaio_method_ops_table.
                     :      */
    1233             :     Assert(newval >= 0 && newval < lengthof(io_method_options));
    1234             :     Assert(pgaio_method_ops_table[newval] != NULL);
    1235             : 
    1236        2190 :     pgaio_method_ops = pgaio_method_ops_table[newval];
    1237        2190 : }
    1238             : 
    1239             : bool
    1240        4264 : check_io_max_concurrency(int *newval, void **extra, GucSource source)
    1241             : {
                           /*
                     :      * Validate a proposed io_max_concurrency value (presumably installed
                     :      * as the GUC check hook for that setting - confirm against the GUC
                     :      * table). Returns true to accept *newval and false to reject it; the
                     :      * only value rejected here is 0, for which an error detail is set.
                     :      * "extra" and "source" are not used.
                     :      *
                     :      * NOTE(review): values below -1 are not rejected by this function;
                     :      * presumably the GUC's declared minimum already excludes them.
                     :      */
    1242        4264 :     if (*newval == -1)
    1243             :     {
    1244             :         /*
    1245             :          * Auto-tuning will be applied later during startup, as auto-tuning
    1246             :          * depends on the value of various GUCs.
    1247             :          */
    1248        2168 :         return true;
    1249             :     }
    1250        2096 :     else if (*newval == 0)
    1251             :     {
    1252           0 :         GUC_check_errdetail("Only -1 or values bigger than 0 are valid.");
    1253           0 :         return false;
    1254             :     }
    1255             : 
    1256        2096 :     return true;
    1257             : }
    1258             : 
    1259             : 
    1260             : 
    1261             : /* --------------------------------------------------------------------------------
    1262             :  * Injection point support
    1263             :  * --------------------------------------------------------------------------------
    1264             :  */
    1265             : 
    1266             : #ifdef USE_INJECTION_POINTS
    1267             : 
    1268             : /*
    1269             :  * Call injection point with support for pgaio_inj_io_get().
                     :  *
                     :  * Stores ioh in pgaio_inj_cur_handle, runs the injection point named by
                     :  * injection_point via InjectionPointCached(), and sets
                     :  * pgaio_inj_cur_handle back to NULL afterwards. The reset is done in a
                     :  * PG_FINALLY block so that it also happens when the injection point
                     :  * raises an error.
    1270             :  */
    1271             : void
    1272     3244930 : pgaio_io_call_inj(PgAioHandle *ioh, const char *injection_point)
    1273             : {
    1274     3244930 :     pgaio_inj_cur_handle = ioh;
    1275             : 
    1276     3244930 :     PG_TRY();
    1277             :     {
    1278     3244930 :         InjectionPointCached(injection_point);
    1279             :     }
    1280           2 :     PG_FINALLY();
    1281             :     {
    1282     3244930 :         pgaio_inj_cur_handle = NULL;
    1283             :     }
    1284     3244930 :     PG_END_TRY();
    1285     3244928 : }
    1286             : 
    1287             : /*
    1288             :  * Return IO associated with injection point invocation. This is only needed
    1289             :  * as injection points currently don't support arguments.
                     :  *
                     :  * The value returned is pgaio_inj_cur_handle, which pgaio_io_call_inj()
                     :  * sets before invoking the injection point and resets to NULL once the
                     :  * invocation has finished.
    1290             :  */
    1291             : PgAioHandle *
    1292          96 : pgaio_inj_io_get(void)
    1293             : {
    1294          96 :     return pgaio_inj_cur_handle;
    1295             : }
    1296             : 
    1297             : #endif

Generated by: LCOV version 1.14