Add kqueue(2) support to the WaitEventSet API.
authorThomas Munro <tmunro@postgresql.org>
Wed, 5 Feb 2020 04:35:57 +0000 (17:35 +1300)
committerThomas Munro <tmunro@postgresql.org>
Wed, 5 Feb 2020 04:35:57 +0000 (17:35 +1300)
Use kevent(2) to wait for events on the BSD family of operating
systems and macOS.  This is similar to the epoll(2) support added
for Linux by commit 98a64d0bd.

Author: Thomas Munro
Reviewed-by: Andres Freund, Marko Tiikkaja, Tom Lane
Tested-by: Mateusz Guzik, Matteo Beccati, Keith Fiske, Heikki Linnakangas, Michael Paquier, Peter Eisentraut, Rui DeSousa, Tom Lane, Mark Wong
Discussion: https://postgr.es/m/CAEepm%3D37oF84-iXDTQ9MrGjENwVGds%2B5zTr38ca73kWR7ez_tA%40mail.gmail.com

configure
configure.in
src/backend/storage/ipc/latch.c
src/include/pg_config.h.in
src/tools/msvc/Solution.pm

index 702adba8390aedb877bc78ebf4600d99559a4730..59e181a88555a35577444043b17e93b7fab9da69 100755 (executable)
--- a/configure
+++ b/configure
@@ -12760,7 +12760,7 @@ $as_echo "#define HAVE_STDBOOL_H 1" >>confdefs.h
 fi
 
 
-for ac_header in atomic.h copyfile.h execinfo.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/ipc.h sys/prctl.h sys/procctl.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
+for ac_header in atomic.h copyfile.h execinfo.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/event.h sys/ipc.h sys/prctl.h sys/procctl.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
 do :
   as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
 ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default"
@@ -14996,7 +14996,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in backtrace_symbols cbrt clock_gettime copyfile fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memset_s memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open strchrnul strsignal symlink sync_file_range uselocale utime utimes wcstombs_l
+for ac_func in backtrace_symbols cbrt clock_gettime copyfile fdatasync getifaddrs getpeerucred getrlimit kqueue mbstowcs_l memset_s memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open strchrnul strsignal symlink sync_file_range uselocale utime utimes wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
index 8165f700398b15493456b518a94cc2b7d659add6..57346bc89bd2badb670fea59d02a01474fa69058 100644 (file)
@@ -1288,6 +1288,7 @@ AC_CHECK_HEADERS(m4_normalize([
    mbarrier.h
    poll.h
    sys/epoll.h
+   sys/event.h
    sys/ipc.h
    sys/prctl.h
    sys/procctl.h
@@ -1628,6 +1629,7 @@ AC_CHECK_FUNCS(m4_normalize([
    getifaddrs
    getpeerucred
    getrlimit
+   kqueue
    mbstowcs_l
    memset_s
    memmove
index d677ffbda7add1c9820b49cd393e886dde272f4f..cbd495225ca4b6c1153a657e74f7cf6e0fe2fa82 100644 (file)
@@ -39,6 +39,9 @@
 #ifdef HAVE_SYS_EPOLL_H
 #include <sys/epoll.h>
 #endif
+#ifdef HAVE_SYS_EVENT_H
+#include <sys/event.h>
+#endif
 #ifdef HAVE_POLL_H
 #include <poll.h>
 #endif
  * define somewhere before this block.
  */
 #if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
-   defined(WAIT_USE_WIN32)
+   defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
 /* don't overwrite manual choice */
 #elif defined(HAVE_SYS_EPOLL_H)
 #define WAIT_USE_EPOLL
+#elif defined(HAVE_KQUEUE)
+#define WAIT_USE_KQUEUE
 #elif defined(HAVE_POLL)
 #define WAIT_USE_POLL
 #elif WIN32
@@ -104,6 +109,11 @@ struct WaitEventSet
    int         epoll_fd;
    /* epoll_wait returns events in a user provided arrays, allocate once */
    struct epoll_event *epoll_ret_events;
+#elif defined(WAIT_USE_KQUEUE)
+   int         kqueue_fd;
+   /* kevent returns events in a user provided arrays, allocate once */
+   struct kevent *kqueue_ret_events;
+   bool        report_postmaster_not_running;
 #elif defined(WAIT_USE_POLL)
    /* poll expects events to be waited on every poll() call, prepare once */
    struct pollfd *pollfds;
@@ -136,6 +146,8 @@ static void drainSelfPipe(void);
 
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
+#elif defined(WAIT_USE_KQUEUE)
+static void WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events);
 #elif defined(WAIT_USE_POLL)
 static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
 #elif defined(WAIT_USE_WIN32)
@@ -556,6 +568,8 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 
 #if defined(WAIT_USE_EPOLL)
    sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
+#elif defined(WAIT_USE_KQUEUE)
+   sz += MAXALIGN(sizeof(struct kevent) * nevents);
 #elif defined(WAIT_USE_POLL)
    sz += MAXALIGN(sizeof(struct pollfd) * nevents);
 #elif defined(WAIT_USE_WIN32)
@@ -574,6 +588,9 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 #if defined(WAIT_USE_EPOLL)
    set->epoll_ret_events = (struct epoll_event *) data;
    data += MAXALIGN(sizeof(struct epoll_event) * nevents);
+#elif defined(WAIT_USE_KQUEUE)
+   set->kqueue_ret_events = (struct kevent *) data;
+   data += MAXALIGN(sizeof(struct kevent) * nevents);
 #elif defined(WAIT_USE_POLL)
    set->pollfds = (struct pollfd *) data;
    data += MAXALIGN(sizeof(struct pollfd) * nevents);
@@ -599,6 +616,13 @@ CreateWaitEventSet(MemoryContext context, int nevents)
    if (fcntl(set->epoll_fd, F_SETFD, FD_CLOEXEC) == -1)
        elog(ERROR, "fcntl(F_SETFD) failed on epoll descriptor: %m");
 #endif                         /* EPOLL_CLOEXEC */
+#elif defined(WAIT_USE_KQUEUE)
+   set->kqueue_fd = kqueue();
+   if (set->kqueue_fd < 0)
+       elog(ERROR, "kqueue failed: %m");
+   if (fcntl(set->kqueue_fd, F_SETFD, FD_CLOEXEC) == -1)
+       elog(ERROR, "fcntl(F_SETFD) failed on kqueue descriptor: %m");
+   set->report_postmaster_not_running = false;
 #elif defined(WAIT_USE_WIN32)
 
    /*
@@ -631,6 +655,8 @@ FreeWaitEventSet(WaitEventSet *set)
 {
 #if defined(WAIT_USE_EPOLL)
    close(set->epoll_fd);
+#elif defined(WAIT_USE_KQUEUE)
+   close(set->kqueue_fd);
 #elif defined(WAIT_USE_WIN32)
    WaitEvent  *cur_event;
 
@@ -747,6 +773,8 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
    /* perform wait primitive specific initialization, if needed */
 #if defined(WAIT_USE_EPOLL)
    WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
+#elif defined(WAIT_USE_KQUEUE)
+   WaitEventAdjustKqueue(set, event, 0);
 #elif defined(WAIT_USE_POLL)
    WaitEventAdjustPoll(set, event);
 #elif defined(WAIT_USE_WIN32)
@@ -766,10 +794,16 @@ void
 ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 {
    WaitEvent  *event;
+#if defined(WAIT_USE_KQUEUE)
+   int         old_events;
+#endif
 
    Assert(pos < set->nevents);
 
    event = &set->events[pos];
+#if defined(WAIT_USE_KQUEUE)
+   old_events = event->events;
+#endif
 
    /*
     * If neither the event mask nor the associated latch changes, return
@@ -803,6 +837,8 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 
 #if defined(WAIT_USE_EPOLL)
    WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
+#elif defined(WAIT_USE_KQUEUE)
+   WaitEventAdjustKqueue(set, event, old_events);
 #elif defined(WAIT_USE_POLL)
    WaitEventAdjustPoll(set, event);
 #elif defined(WAIT_USE_WIN32)
@@ -895,6 +931,131 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 }
 #endif
 
+#if defined(WAIT_USE_KQUEUE)
+
+/*
+ * On most BSD family systems, the udata member of struct kevent is of type
+ * void *, so we could directly convert to/from WaitEvent *.  Unfortunately,
+ * NetBSD has it as intptr_t, so here we wallpaper over that difference with
+ * an lvalue cast.
+ */
+#define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata)))
+
+static inline void
+WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
+                        WaitEvent *event)
+{
+   k_ev->ident = event->fd;
+   k_ev->filter = filter;
+   k_ev->flags = action | EV_CLEAR;
+   k_ev->fflags = 0;
+   k_ev->data = 0;
+   AccessWaitEvent(k_ev) = event;
+}
+
+static inline void
+WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
+{
+   /* For now postmaster death can only be added, not removed. */
+   k_ev->ident = PostmasterPid;
+   k_ev->filter = EVFILT_PROC;
+   k_ev->flags = EV_ADD | EV_CLEAR;
+   k_ev->fflags = NOTE_EXIT;
+   k_ev->data = 0;
+   AccessWaitEvent(k_ev) = event;
+}
+
+/*
+ * old_events is the previous event mask, used to compute what has changed.
+ */
+static void
+WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
+{
+   int         rc;
+   struct kevent k_ev[2];
+   int         count = 0;
+   bool        new_filt_read = false;
+   bool        old_filt_read = false;
+   bool        new_filt_write = false;
+   bool        old_filt_write = false;
+
+   if (old_events == event->events)
+       return;
+
+   Assert(event->events != WL_LATCH_SET || set->latch != NULL);
+   Assert(event->events == WL_LATCH_SET ||
+          event->events == WL_POSTMASTER_DEATH ||
+          (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+
+   if (event->events == WL_POSTMASTER_DEATH)
+   {
+       /*
+        * Unlike all the other implementations, we detect postmaster death
+        * using process notification instead of waiting on the postmaster
+        * alive pipe.
+        */
+       WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
+   }
+   else
+   {
+       /*
+        * We need to compute the adds and deletes required to get from the
+        * old event mask to the new event mask, since kevent treats readable
+        * and writable as separate events.
+        */
+       if (old_events == WL_LATCH_SET ||
+           (old_events & WL_SOCKET_READABLE))
+           old_filt_read = true;
+       if (event->events == WL_LATCH_SET ||
+           (event->events & WL_SOCKET_READABLE))
+           new_filt_read = true;
+       if (old_events & WL_SOCKET_WRITEABLE)
+           old_filt_write = true;
+       if (event->events & WL_SOCKET_WRITEABLE)
+           new_filt_write = true;
+       if (old_filt_read && !new_filt_read)
+           WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
+                                    event);
+       else if (!old_filt_read && new_filt_read)
+           WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
+                                    event);
+       if (old_filt_write && !new_filt_write)
+           WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
+                                    event);
+       else if (!old_filt_write && new_filt_write)
+           WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
+                                    event);
+   }
+
+   Assert(count > 0);
+   Assert(count <= 2);
+
+   rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
+
+   /*
+    * When adding the postmaster's pid, we have to consider that it might
+    * already have exited and perhaps even been replaced by another process
+    * with the same pid.  If so, we have to defer reporting this as an event
+    * until the next call to WaitEventSetWaitBlock().
+    */
+
+   if (rc < 0)
+   {
+       if (event->events == WL_POSTMASTER_DEATH && errno == ESRCH)
+           set->report_postmaster_not_running = true;
+       else
+           ereport(ERROR,
+                   (errcode_for_socket_access(),
+           /* translator: %s is a syscall name, such as "poll()" */
+                    errmsg("%s failed: %m",
+                           "kevent()")));
+   }
+   else if (event->events == WL_POSTMASTER_DEATH && PostmasterPid != getppid())
+       set->report_postmaster_not_running = true;
+}
+
+#endif
+
 #if defined(WAIT_USE_WIN32)
 static void
 WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
@@ -1186,6 +1347,143 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
    return returned_events;
 }
 
+#elif defined(WAIT_USE_KQUEUE)
+
+/*
+ * Wait using kevent(2) on BSD-family systems and macOS.
+ *
+ * For now this mirrors the epoll code, but in future it could modify the fd
+ * set in the same call to kevent as it uses for waiting instead of doing that
+ * with separate system calls.
+ */
+static int
+WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
+                     WaitEvent *occurred_events, int nevents)
+{
+   int         returned_events = 0;
+   int         rc;
+   WaitEvent  *cur_event;
+   struct kevent *cur_kqueue_event;
+   struct timespec timeout;
+   struct timespec *timeout_p;
+
+   if (cur_timeout < 0)
+       timeout_p = NULL;
+   else
+   {
+       timeout.tv_sec = cur_timeout / 1000;
+       timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
+       timeout_p = &timeout;
+   }
+
+   /* Report events discovered by WaitEventAdjustKqueue(). */
+   if (unlikely(set->report_postmaster_not_running))
+   {
+       if (set->exit_on_postmaster_death)
+           proc_exit(1);
+       occurred_events->fd = PGINVALID_SOCKET;
+       occurred_events->events = WL_POSTMASTER_DEATH;
+       return 1;
+   }
+
+   /* Sleep */
+   rc = kevent(set->kqueue_fd, NULL, 0,
+               set->kqueue_ret_events, nevents,
+               timeout_p);
+
+   /* Check return code */
+   if (rc < 0)
+   {
+       /* EINTR is okay, otherwise complain */
+       if (errno != EINTR)
+       {
+           waiting = false;
+           ereport(ERROR,
+                   (errcode_for_socket_access(),
+           /* translator: %s is a syscall name, such as "poll()" */
+                    errmsg("%s failed: %m",
+                           "kevent()")));
+       }
+       return 0;
+   }
+   else if (rc == 0)
+   {
+       /* timeout exceeded */
+       return -1;
+   }
+
+   /*
+    * At least one event occurred, iterate over the returned kqueue events
+    * until they're either all processed, or we've returned all the events
+    * the caller desired.
+    */
+   for (cur_kqueue_event = set->kqueue_ret_events;
+        cur_kqueue_event < (set->kqueue_ret_events + rc) &&
+        returned_events < nevents;
+        cur_kqueue_event++)
+   {
+       /* kevent's udata points to the associated WaitEvent */
+       cur_event = AccessWaitEvent(cur_kqueue_event);
+
+       occurred_events->pos = cur_event->pos;
+       occurred_events->user_data = cur_event->user_data;
+       occurred_events->events = 0;
+
+       if (cur_event->events == WL_LATCH_SET &&
+           cur_kqueue_event->filter == EVFILT_READ)
+       {
+           /* There's data in the self-pipe, clear it. */
+           drainSelfPipe();
+
+           if (set->latch->is_set)
+           {
+               occurred_events->fd = PGINVALID_SOCKET;
+               occurred_events->events = WL_LATCH_SET;
+               occurred_events++;
+               returned_events++;
+           }
+       }
+       else if (cur_event->events == WL_POSTMASTER_DEATH &&
+                cur_kqueue_event->filter == EVFILT_PROC &&
+                (cur_kqueue_event->fflags & NOTE_EXIT) != 0)
+       {
+           if (set->exit_on_postmaster_death)
+               proc_exit(1);
+           occurred_events->fd = PGINVALID_SOCKET;
+           occurred_events->events = WL_POSTMASTER_DEATH;
+           occurred_events++;
+           returned_events++;
+       }
+       else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+       {
+           Assert(cur_event->fd >= 0);
+
+           if ((cur_event->events & WL_SOCKET_READABLE) &&
+               (cur_kqueue_event->filter == EVFILT_READ))
+           {
+               /* readable, or EOF */
+               occurred_events->events |= WL_SOCKET_READABLE;
+           }
+
+           if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+               (cur_kqueue_event->filter == EVFILT_WRITE))
+           {
+               /* writable, or EOF */
+               occurred_events->events |= WL_SOCKET_WRITEABLE;
+           }
+
+           if (occurred_events->events != 0)
+           {
+               occurred_events->fd = cur_event->fd;
+               occurred_events++;
+               returned_events++;
+           }
+       }
+   }
+
+   return returned_events;
+}
+
 #elif defined(WAIT_USE_POLL)
 
 /*
index 6f485f73cd12cfc5af2eb272a5b5adfddcb4511b..be39cfde4bb45b0484300c8de562c21e36df6ece 100644 (file)
 /* Define to 1 if __builtin_constant_p(x) implies "i"(x) acceptance. */
 #undef HAVE_I_CONSTRAINT__BUILTIN_CONSTANT_P
 
+/* Define to 1 if you have the `kqueue' function. */
+#undef HAVE_KQUEUE
+
 /* Define to 1 if you have the <langinfo.h> header file. */
 #undef HAVE_LANGINFO_H
 
 /* Define to 1 if you have the <sys/epoll.h> header file. */
 #undef HAVE_SYS_EPOLL_H
 
+/* Define to 1 if you have the <sys/event.h> header file. */
+#undef HAVE_SYS_EVENT_H
+
 /* Define to 1 if you have the <sys/ipc.h> header file. */
 #undef HAVE_SYS_IPC_H
 
index 90de0583203bd86d0c8aa6ab375a3c34e9587f35..90352c1a7fe822c5987f487fc2c7ad5ae58b7cf1 100644 (file)
@@ -281,6 +281,7 @@ sub GenerateFiles
        HAVE_IPV6                                   => 1,
        HAVE_ISINF                                  => 1,
        HAVE_I_CONSTRAINT__BUILTIN_CONSTANT_P       => undef,
+       HAVE_KQUEUE                                 => undef,
        HAVE_LANGINFO_H                             => undef,
        HAVE_LDAP_H                                 => undef,
        HAVE_LDAP_INITIALIZE                        => undef,
@@ -374,6 +375,7 @@ sub GenerateFiles
        HAVE_SYMLINK                             => 1,
        HAVE_SYSLOG                              => undef,
        HAVE_SYS_EPOLL_H                         => undef,
+       HAVE_SYS_EVENT_H                         => undef,
        HAVE_SYS_IPC_H                           => undef,
        HAVE_SYS_PRCTL_H                         => undef,
        HAVE_SYS_PROCCTL_H                       => undef,