--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * method_io_uring.c
+ * AIO - perform AIO using Linux' io_uring
+ *
+ * For now we create one io_uring instance for each backend. These io_uring
+ * instances have to be created in postmaster, during startup, to allow other
+ * backends to process IO completions if the issuing backend is currently
+ * busy doing other things. Other backends may not use another backend's
+ * io_uring instance to submit IO; that would require additional locking,
+ * which would likely be harmful for performance.
+ *
+ * We likely will want to introduce a backend-local io_uring instance in the
+ * future, e.g. for FE/BE network IO.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/aio/method_io_uring.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+/* included early, for IOMETHOD_IO_URING_ENABLED */
+#include "storage/aio.h"
+
+#ifdef IOMETHOD_IO_URING_ENABLED
+
+#include <liburing.h>
+
+#include "miscadmin.h"
+#include "storage/aio_internal.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/lwlock.h"
+#include "storage/procnumber.h"
+#include "utils/wait_event.h"
+
+
+/* number of completions processed at once */
+#define PGAIO_MAX_LOCAL_COMPLETED_IO 32
+
+
+/* Entry points for IoMethodOps. */
+static size_t pgaio_uring_shmem_size(void);
+static void pgaio_uring_shmem_init(bool first_time);
+static void pgaio_uring_init_backend(void);
+static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);
+
+/* helper functions */
+static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe);
+
+
+const IoMethodOps pgaio_uring_ops = {
+ /*
+	 * While io_uring is mostly OK with FDs being closed while the IO is in
+	 * flight, that is not true for IOs submitted with IOSQE_ASYNC.
+ *
+ * See
+ * https://postgr.es/m/5ons2rtmwarqqhhexb3dnqulw5rjgwgoct57vpdau4rujlrffj%403fls6d2mkiwc
+ */
+ .wait_on_fd_before_close = true,
+
+ .shmem_size = pgaio_uring_shmem_size,
+ .shmem_init = pgaio_uring_shmem_init,
+ .init_backend = pgaio_uring_init_backend,
+
+ .submit = pgaio_uring_submit,
+ .wait_one = pgaio_uring_wait_one,
+};
+
+/*
+ * Per-backend state when using io_method=io_uring
+ *
+ * Align the whole struct to a cacheline boundary, to prevent false sharing
+ * between one backend's completion_lock and the previous backend's
+ * io_uring_ring.
+ */
+typedef struct pg_attribute_aligned (PG_CACHE_LINE_SIZE)
+PgAioUringContext
+{
+ /*
+ * Multiple backends can process completions for this backend's io_uring
+ * instance (e.g. when the backend issuing IO is busy doing something
+ * else). To make that safe we have to ensure that only a single backend
+ * gets io completions from the io_uring instance at a time.
+ */
+ LWLock completion_lock;
+
+ struct io_uring io_uring_ring;
+} PgAioUringContext;
+
+/* PgAioUringContexts for all backends */
+static PgAioUringContext *pgaio_uring_contexts;
+
+/* the current backend's context */
+static PgAioUringContext *pgaio_my_uring_context;
+
+
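+/*
+ * Number of io_uring instances we create, one for each process that can
+ * issue asynchronous IO. IO workers are excluded, as they are never used
+ * together with io_method=io_uring.
+ */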
+static uint32
+pgaio_uring_procs(void)
+{
+ /*
+ * We can subtract MAX_IO_WORKERS here as io workers are never used at the
+ * same time as io_method=io_uring.
+ */
+ return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+}
+
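+/* Amount of shared memory needed for the array of PgAioUringContexts. */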
+static Size
+pgaio_uring_context_shmem_size(void)
+{
+ return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
+}
+
+static size_t
+pgaio_uring_shmem_size(void)
+{
+ return pgaio_uring_context_shmem_size();
+}
+
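+/*
+ * Initialize the shared PgAioUringContext array and create one io_uring
+ * instance for each possible backend. As explained at the top of this file,
+ * this has to happen in postmaster, during startup, so that other backends
+ * can process completions on a backend's ring.
+ */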
+static void
+pgaio_uring_shmem_init(bool first_time)
+{
+	int TotalProcs = pgaio_uring_procs();
+ bool found;
+
+ pgaio_uring_contexts = (PgAioUringContext *)
+ ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);
+
+ if (found)
+ return;
+
+ for (int contextno = 0; contextno < TotalProcs; contextno++)
+ {
+ PgAioUringContext *context = &pgaio_uring_contexts[contextno];
+ int ret;
+
+ /*
+ * Right now a high TotalProcs will cause problems in two ways:
+ *
+ * - RLIMIT_NOFILE needs to be big enough to allow all
+ * io_uring_queue_init() calls to succeed.
+ *
+		 * - RLIMIT_NOFILE needs to be big enough to leave enough file
+		 * descriptors over to satisfy set_max_safe_fds(). Or, even better,
+		 * to leave max_files_per_process FDs over.
+ *
+ * We probably should adjust the soft RLIMIT_NOFILE to ensure that.
+ *
+ *
+		 * XXX: Newer versions of io_uring support sharing the workers that
+		 * execute some asynchronous IOs between io_uring instances. It might
+		 * be worth using that, but we would also need to evaluate whether it
+		 * causes noticeable additional contention.
+ */
+ ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+ if (ret < 0)
+ {
+ char *hint = NULL;
+ int err = ERRCODE_INTERNAL_ERROR;
+
+			/* add hints for some failures that errno alone does not explain sufficiently */
+ if (-ret == EPERM)
+ {
+ err = ERRCODE_INSUFFICIENT_PRIVILEGE;
+ hint = _("Check if io_uring is disabled via /proc/sys/kernel/io_uring_disabled.");
+ }
+ else if (-ret == EMFILE)
+ {
+ err = ERRCODE_INSUFFICIENT_RESOURCES;
+ hint = psprintf(_("Consider increasing \"ulimit -n\" to at least %d."),
+ TotalProcs + max_files_per_process);
+ }
+ else if (-ret == ENOSYS)
+ {
+ err = ERRCODE_FEATURE_NOT_SUPPORTED;
+ hint = _("Kernel does not support io_uring.");
+ }
+
+ /* update errno to allow %m to work */
+ errno = -ret;
+
+ ereport(ERROR,
+ errcode(err),
+					errmsg("could not set up io_uring queue: %m"),
+ hint != NULL ? errhint("%s", hint) : 0);
+ }
+
+ LWLockInitialize(&context->completion_lock, LWTRANCHE_AIO_URING_COMPLETION);
+ }
+}
+
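+/*
+ * Per-backend initialization: look up this backend's PgAioUringContext in
+ * shared memory.
+ */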
+static void
+pgaio_uring_init_backend(void)
+{
+ Assert(MyProcNumber < pgaio_uring_procs());
+
+ pgaio_my_uring_context = &pgaio_uring_contexts[MyProcNumber];
+}
+
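+/*
+ * Submit the staged IOs to this backend's io_uring instance and return the
+ * number of IOs submitted.
+ */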
+static int
+pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+ struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
+ int in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);
+
+ Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+ for (int i = 0; i < num_staged_ios; i++)
+ {
+ PgAioHandle *ioh = staged_ios[i];
+ struct io_uring_sqe *sqe;
+
+ sqe = io_uring_get_sqe(uring_instance);
+
+ if (!sqe)
+ elog(ERROR, "io_uring submission queue is unexpectedly full");
+
+ pgaio_io_prepare_submit(ioh);
+ pgaio_uring_sq_from_io(ioh, sqe);
+
+		/*
+		 * io_uring executes IO in process context if possible. That's
+		 * generally good, as it reduces context switching. For buffered IO,
+		 * however, it means that copying between the page cache and
+		 * userspace memory happens in the foreground, as it cannot be
+		 * offloaded to DMA hardware the way it can be with direct IO. When
+		 * executing a lot of buffered IO this makes io_uring slower than
+		 * worker mode, as worker mode parallelizes the copying. io_uring can
+		 * instead be told to offload work to worker threads.
+		 *
+		 * Thus, if an IO is buffered and we already have IOs in flight or
+		 * multiple IOs are being submitted, we tell io_uring to execute the
+		 * IO in the background. We don't do so for the first few IOs being
+		 * submitted, as executing in this process' context has lower
+		 * latency.
+		 */
+ if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
+ io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
+
+ in_flight_before++;
+ }
+
+ while (true)
+ {
+ int ret;
+
+ pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_SUBMIT);
+ ret = io_uring_submit(uring_instance);
+ pgstat_report_wait_end();
+
+ if (ret == -EINTR)
+ {
+ pgaio_debug(DEBUG3,
+ "aio method uring: submit EINTR, nios: %d",
+ num_staged_ios);
+ }
+ else if (ret < 0)
+ {
+ /*
+ * The io_uring_enter() manpage suggests that the appropriate
+ * reaction to EAGAIN is:
+ *
+ * "The application should wait for some completions and try
+ * again"
+ *
+ * However, it seems unlikely that that would help in our case, as
+ * we apply a low limit to the number of outstanding IOs and thus
+ * also outstanding completions, making it unlikely that we'd get
+ * EAGAIN while the OS is in good working order.
+ *
+			 * Additionally, it would be problematic to just wait here, as
+			 * our caller might hold critical locks. It would also possibly
+			 * delay the crash-restart that seems likely to occur when the
+			 * kernel is under such heavy memory pressure.
+ *
+ * Update errno to allow %m to work.
+ */
+ errno = -ret;
+ elog(PANIC, "io_uring submit failed: %m");
+ }
+ else if (ret != num_staged_ios)
+ {
+			/* likely unreachable, but if it does happen, we would need to re-submit */
+ elog(PANIC, "io_uring submit submitted only %d of %d",
+ ret, num_staged_ios);
+ }
+ else
+ {
+ pgaio_debug(DEBUG4,
+ "aio method uring: submitted %d IOs",
+ num_staged_ios);
+ break;
+ }
+ }
+
+ return num_staged_ios;
+}
+
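+/*
+ * Process all completions that are currently ready on the given context's
+ * io_uring instance. The caller must hold the context's completion_lock
+ * exclusively.
+ */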
+static void
+pgaio_uring_drain_locked(PgAioUringContext *context)
+{
+ int ready;
+ int orig_ready;
+
+ Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));
+
+ /*
+ * Don't drain more events than available right now. Otherwise it's
+ * plausible that one backend could get stuck, for a while, receiving CQEs
+ * without actually processing them.
+ */
+ orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);
+
+ while (ready > 0)
+ {
+ struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
+ uint32 ncqes;
+
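+		/*
+		 * Completion processing updates shared state and marks the CQEs as
+		 * consumed, so it must not fail partway; hence the critical section.
+		 */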
+ START_CRIT_SECTION();
+ ncqes =
+ io_uring_peek_batch_cqe(&context->io_uring_ring,
+ cqes,
+ Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
+ Assert(ncqes <= ready);
+
+ ready -= ncqes;
+
+ for (int i = 0; i < ncqes; i++)
+ {
+ struct io_uring_cqe *cqe = cqes[i];
+ PgAioHandle *ioh;
+
+ ioh = io_uring_cqe_get_data(cqe);
+ io_uring_cqe_seen(&context->io_uring_ring, cqe);
+
+ pgaio_io_process_completion(ioh, cqe->res);
+ }
+
+ END_CRIT_SECTION();
+
+ pgaio_debug(DEBUG3,
+ "drained %d/%d, now expecting %d",
+ ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
+ }
+}
+
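+/*
+ * Wait for the IO referenced by ioh / ref_generation to complete. The IO may
+ * have been issued by another backend, in which case we wait on, and drain
+ * completions from, the owning backend's io_uring instance.
+ */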
+static void
+pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
+{
+ PgAioHandleState state;
+ ProcNumber owner_procno = ioh->owner_procno;
+ PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
+ bool expect_cqe;
+ int waited = 0;
+
+ /*
+ * XXX: It would be nice to have a smarter locking scheme, nearly all the
+ * time the backend owning the ring will consume the completions, making
+ * the locking unnecessarily expensive.
+ */
+ LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);
+
+ while (true)
+ {
+ pgaio_debug_io(DEBUG3, ioh,
+ "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
+ (long long unsigned) ioh->generation,
+ (long long unsigned) ref_generation,
+ waited);
+
+ if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
+ state != PGAIO_HS_SUBMITTED)
+ {
+ /* the IO was completed by another backend */
+ break;
+ }
+ else if (io_uring_cq_ready(&owner_context->io_uring_ring))
+ {
+ /* no need to wait in the kernel, io_uring has a completion */
+ expect_cqe = true;
+ }
+ else
+ {
+ int ret;
+ struct io_uring_cqe *cqes;
+
+ /* need to wait in the kernel */
+ pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_EXECUTION);
+ ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
+ pgstat_report_wait_end();
+
+ if (ret == -EINTR)
+ {
+ continue;
+ }
+ else if (ret != 0)
+ {
+ /* see comment after io_uring_submit() */
+ errno = -ret;
+ elog(PANIC, "io_uring wait failed: %m");
+ }
+ else
+ {
+ Assert(cqes != NULL);
+ expect_cqe = true;
+ waited++;
+ }
+ }
+
+ if (expect_cqe)
+ {
+ pgaio_uring_drain_locked(owner_context);
+ }
+ }
+
+ LWLockRelease(&owner_context->completion_lock);
+
+ pgaio_debug(DEBUG3,
+ "wait_one with %d sleeps",
+ waited);
+}
+
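+/*
+ * Fill the submission queue entry with the information needed to perform the
+ * IO described by ioh.
+ */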
+static void
+pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
+{
+ struct iovec *iov;
+
+ switch (ioh->op)
+ {
+ case PGAIO_OP_READV:
+ iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+ if (ioh->op_data.read.iov_length == 1)
+ {
+ io_uring_prep_read(sqe,
+ ioh->op_data.read.fd,
+ iov->iov_base,
+ iov->iov_len,
+ ioh->op_data.read.offset);
+ }
+ else
+ {
+ io_uring_prep_readv(sqe,
+ ioh->op_data.read.fd,
+ iov,
+ ioh->op_data.read.iov_length,
+ ioh->op_data.read.offset);
+ }
+ break;
+
+ case PGAIO_OP_WRITEV:
+ iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+ if (ioh->op_data.write.iov_length == 1)
+ {
+ io_uring_prep_write(sqe,
+ ioh->op_data.write.fd,
+ iov->iov_base,
+ iov->iov_len,
+ ioh->op_data.write.offset);
+ }
+ else
+ {
+ io_uring_prep_writev(sqe,
+ ioh->op_data.write.fd,
+ iov,
+ ioh->op_data.write.iov_length,
+ ioh->op_data.write.offset);
+ }
+ break;
+
+ case PGAIO_OP_INVALID:
+ elog(ERROR, "trying to prepare invalid IO operation for execution");
+ }
+
+ io_uring_sqe_set_data(sqe, ioh);
+}
+
+#endif /* IOMETHOD_IO_URING_ENABLED */