pgbench: Refactor thread portability support.
authorThomas Munro <tmunro@postgresql.org>
Wed, 10 Mar 2021 02:39:08 +0000 (15:39 +1300)
committerThomas Munro <tmunro@postgresql.org>
Wed, 10 Mar 2021 04:44:04 +0000 (17:44 +1300)
Instead of maintaining an incomplete emulation of POSIX threads for
Windows, let's use an extremely minimalist macro-based abstraction for
now.  A later patch will extend this, without the need to supply more
complicated pthread emulation code.  (There may be a need for a more
serious portable thread abstraction in later projects, but this is not
it.)

Minor incidental problems fixed: it wasn't OK to use (pthread_t) 0 as a
special value, it wasn't OK to compare thread_t values with ==, and we
incorrectly assumed that pthread functions set errno.

Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de

src/bin/pgbench/pgbench.c

index f1d98be2d2debd9be98f74bf19b3ecd7bc112a05..746b589e6d0e7e8092491b2822654010bb2220b3 100644 (file)
@@ -111,22 +111,36 @@ typedef struct socket_set
 #endif                                                 /* POLL_USING_SELECT */
 
 /*
- * Multi-platform pthread implementations
+ * Multi-platform thread implementations
  */
 
 #ifdef WIN32
-/* Use native win32 threads on Windows */
-typedef struct win32_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int     pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int     pthread_join(pthread_t th, void **thread_return);
+/* Use Windows threads */
+#include <windows.h>
+#define GETERRNO() (_dosmaperr(GetLastError()), errno)
+#define THREAD_T HANDLE
+#define THREAD_FUNC_RETURN_TYPE unsigned
+#define THREAD_FUNC_RETURN return 0
+#define THREAD_CREATE(handle, function, arg) \
+       ((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
+#define THREAD_JOIN(handle) \
+       (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
+       GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
 #elif defined(ENABLE_THREAD_SAFETY)
-/* Use platform-dependent pthread capability */
+/* Use POSIX threads */
 #include <pthread.h>
+#define THREAD_T pthread_t
+#define THREAD_FUNC_RETURN_TYPE void *
+#define THREAD_FUNC_RETURN return NULL
+#define THREAD_CREATE(handle, function, arg) \
+       pthread_create((handle), NULL, (function), (arg))
+#define THREAD_JOIN(handle) \
+       pthread_join((handle), NULL)
 #else
 /* No threads implementation, use none (-j 1) */
-#define pthread_t void *
+#define THREAD_T void *
+#define THREAD_FUNC_RETURN_TYPE void *
+#define THREAD_FUNC_RETURN return NULL
 #endif
 
 
@@ -436,7 +450,7 @@ typedef struct
 typedef struct
 {
        int                     tid;                    /* thread id */
-       pthread_t       thread;                 /* thread handle */
+       THREAD_T        thread;                 /* thread handle */
        CState     *state;                      /* array of CState */
        int                     nstate;                 /* length of state[] */
 
@@ -459,8 +473,6 @@ typedef struct
        int64           latency_late;   /* executed but late transactions */
 } TState;
 
-#define INVALID_THREAD         ((pthread_t) 0)
-
 /*
  * queries read from files
  */
@@ -604,7 +616,7 @@ static void doLog(TState *thread, CState *st,
 static void processXactStats(TState *thread, CState *st, instr_time *now,
                                                         bool skipped, StatsData *agg);
 static void addScript(ParsedScript script);
-static void *threadRun(void *arg);
+static THREAD_FUNC_RETURN_TYPE threadRun(void *arg);
 static void finishCon(CState *st);
 static void setalarm(int seconds);
 static socket_set *alloc_socket_set(int count);
@@ -6142,18 +6154,14 @@ main(int argc, char **argv)
                /* the first thread (i = 0) is executed by main thread */
                if (i > 0)
                {
-                       int                     err = pthread_create(&thread->thread, NULL, threadRun, thread);
+                       errno = THREAD_CREATE(&thread->thread, threadRun, thread);
 
-                       if (err != 0 || thread->thread == INVALID_THREAD)
+                       if (errno != 0)
                        {
                                pg_log_fatal("could not create thread: %m");
                                exit(1);
                        }
                }
-               else
-               {
-                       thread->thread = INVALID_THREAD;
-               }
        }
 #else
        INSTR_TIME_SET_CURRENT(threads[0].start_time);
@@ -6161,7 +6169,6 @@ main(int argc, char **argv)
        if (duration > 0)
                end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
                        (int64) 1000000 * duration;
-       threads[0].thread = INVALID_THREAD;
 #endif                                                 /* ENABLE_THREAD_SAFETY */
 
        /* wait for threads and accumulate results */
@@ -6172,12 +6179,12 @@ main(int argc, char **argv)
                TState     *thread = &threads[i];
 
 #ifdef ENABLE_THREAD_SAFETY
-               if (threads[i].thread == INVALID_THREAD)
+               if (i == 0)
                        /* actually run this thread directly in the main thread */
                        (void) threadRun(thread);
                else
                        /* wait of other threads. should check that 0 is returned? */
-                       pthread_join(thread->thread, NULL);
+                       THREAD_JOIN(thread->thread);
 #else
                (void) threadRun(thread);
 #endif                                                 /* ENABLE_THREAD_SAFETY */
@@ -6216,7 +6223,7 @@ main(int argc, char **argv)
        return exit_code;
 }
 
-static void *
+static THREAD_FUNC_RETURN_TYPE
 threadRun(void *arg)
 {
        TState     *thread = (TState *) arg;
@@ -6501,7 +6508,7 @@ done:
                thread->logfile = NULL;
        }
        free_socket_set(sockets);
-       return NULL;
+       THREAD_FUNC_RETURN;
 }
 
 static void
@@ -6732,74 +6739,3 @@ socket_has_input(socket_set *sa, int fd, int idx)
 }
 
 #endif                                                 /* POLL_USING_SELECT */
-
-
-/* partial pthread implementation for Windows */
-
-#ifdef WIN32
-
-typedef struct win32_pthread
-{
-       HANDLE          handle;
-       void       *(*routine) (void *);
-       void       *arg;
-       void       *result;
-} win32_pthread;
-
-static unsigned __stdcall
-win32_pthread_run(void *arg)
-{
-       win32_pthread *th = (win32_pthread *) arg;
-
-       th->result = th->routine(th->arg);
-
-       return 0;
-}
-
-static int
-pthread_create(pthread_t *thread,
-                          pthread_attr_t *attr,
-                          void *(*start_routine) (void *),
-                          void *arg)
-{
-       int                     save_errno;
-       win32_pthread *th;
-
-       th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
-       th->routine = start_routine;
-       th->arg = arg;
-       th->result = NULL;
-
-       th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
-       if (th->handle == NULL)
-       {
-               save_errno = errno;
-               free(th);
-               return save_errno;
-       }
-
-       *thread = th;
-       return 0;
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
-       if (th == NULL || th->handle == NULL)
-               return errno = EINVAL;
-
-       if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
-       {
-               _dosmaperr(GetLastError());
-               return errno;
-       }
-
-       if (thread_return)
-               *thread_return = th->result;
-
-       CloseHandle(th->handle);
-       free(th);
-       return 0;
-}
-
-#endif                                                 /* WIN32 */