gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnurl] 24/411: multi: implement wait using winsock events


From: gnunet
Subject: [gnurl] 24/411: multi: implement wait using winsock events
Date: Wed, 13 Jan 2021 01:17:19 +0100

This is an automated email from the git hooks/post-receive script.

nikita pushed a commit to branch master
in repository gnurl.

commit d2a7d7c185f98df8f3e585e5620cbc0482e45fac
Author: rcombs <rcombs@rcombs.me>
AuthorDate: Wed May 13 18:49:57 2020 -0500

    multi: implement wait using winsock events
    
    This avoids using a pair of TCP ports to provide wakeup functionality
    for every multi instance on Windows, where socketpair() is emulated
    using a TCP socket on loopback which could in turn lead to socket
    resource exhaustion.
    
    A previous version of this patch failed to account for how in WinSock,
    FD_WRITE is set only once when writing becomes possible and not again
    until after a send has failed due to the buffer filling. This contrasts
    with how FD_READ and FD_OOB continue to be set until the conditions they
    refer to no longer apply. This meant that if a user wrote some data to
    a socket, but not enough data to completely fill its send buffer, then
    waited on that socket to become writable, we'd erroneously stall until
    their configured timeout rather than returning immediately.
    
    This version of the patch addresses that issue by checking each socket
    we're waiting on to become writable with select() before the wait, and
    zeroing the timeout if it's already writable.
    
    Assisted-by: Marc Hörsken
    Reviewed-by: Marcel Raad
    Reviewed-by: Daniel Stenberg
    Tested-by: Gergely Nagy
    Tested-by: Rasmus Melchior Jacobsen
    Tested-by: Tomas Berger
    
    Replaces #5397
    Reverts #5632
    Closes #5634
---
 lib/multi.c       | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 lib/multihandle.h |   4 ++
 2 files changed, 158 insertions(+), 6 deletions(-)

diff --git a/lib/multi.c b/lib/multi.c
index 3c7fb85ed..c8bba47f6 100644
--- a/lib/multi.c
+++ b/lib/multi.c
@@ -374,6 +374,11 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
   multi->max_concurrent_streams = 100;
   multi->ipv6_works = Curl_ipv6works(NULL);
 
+#ifdef USE_WINSOCK
+  multi->wsa_event = WSACreateEvent();
+  if(multi->wsa_event == WSA_INVALID_EVENT)
+    goto error;
+#else
 #ifdef ENABLE_WAKEUP
   if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) {
     multi->wakeup_pair[0] = CURL_SOCKET_BAD;
@@ -386,6 +391,7 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
     multi->wakeup_pair[0] = CURL_SOCKET_BAD;
     multi->wakeup_pair[1] = CURL_SOCKET_BAD;
   }
+#endif
 #endif
 
   return multi;
@@ -1081,11 +1087,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
   unsigned int i;
   unsigned int nfds = 0;
   unsigned int curlfds;
-  bool ufds_malloc = FALSE;
   long timeout_internal;
   int retcode = 0;
+#ifndef USE_WINSOCK
   struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
   struct pollfd *ufds = &a_few_on_stack[0];
+  bool ufds_malloc = FALSE;
+#else
+  int already_writable = 0;
+  DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
+#endif
 
   if(!GOOD_MULTI_HANDLE(multi))
     return CURLM_BAD_HANDLE;
@@ -1131,11 +1142,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
   nfds += extra_nfds; /* add the externally provided ones */
 
 #ifdef ENABLE_WAKEUP
+#ifdef USE_WINSOCK
+  if(use_wakeup) {
+#else
   if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
+#endif
     ++nfds;
   }
 #endif
 
+#ifndef USE_WINSOCK
   if(nfds > NUM_POLLS_ON_STACK) {
     /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
        big, so at 2^29 sockets this value might wrap. When a process gets
@@ -1146,7 +1162,9 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
       return CURLM_OUT_OF_MEMORY;
     ufds_malloc = TRUE;
   }
+
   nfds = 0;
+#endif
 
   /* only do the second loop if we found descriptors in the first stage run
      above */
@@ -1159,22 +1177,45 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
 
       for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) {
         curl_socket_t s = CURL_SOCKET_BAD;
-
+#ifdef USE_WINSOCK
+        long mask = 0;
+#endif
         if(bitmap & GETSOCK_READSOCK(i)) {
+#ifdef USE_WINSOCK
+          mask |= FD_READ;
+#else
           ufds[nfds].fd = sockbunch[i];
           ufds[nfds].events = POLLIN;
           ++nfds;
+#endif
           s = sockbunch[i];
         }
         if(bitmap & GETSOCK_WRITESOCK(i)) {
+#ifdef USE_WINSOCK
+          struct timeval timeout;
+          fd_set writefds;
+          timeout.tv_sec = 0;
+          timeout.tv_usec = 0;
+          FD_ZERO(&writefds);
+          FD_SET(sockbunch[i], &writefds);
+          if(select((int)sockbunch[i] + 1, NULL, &writefds, NULL,
+                    &timeout) == 1)
+            already_writable++;
+          mask |= FD_WRITE;
+#else
           ufds[nfds].fd = sockbunch[i];
           ufds[nfds].events = POLLOUT;
           ++nfds;
+#endif
           s = sockbunch[i];
         }
         if(s == CURL_SOCKET_BAD) {
           break;
         }
+#ifdef USE_WINSOCK
+        if(WSAEventSelect(s, multi->wsa_event, mask) != 0)
+          return CURLM_INTERNAL_ERROR;
+#endif
       }
 
       data = data->next; /* check next handle */
@@ -1183,6 +1224,30 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
 
   /* Add external file descriptions from poll-like struct curl_waitfd */
   for(i = 0; i < extra_nfds; i++) {
+#ifdef USE_WINSOCK
+    long events = 0;
+    extra_fds[i].revents = 0;
+    if(extra_fds[i].events & CURL_WAIT_POLLIN)
+      events |= FD_READ;
+    if(extra_fds[i].events & CURL_WAIT_POLLPRI)
+      events |= FD_OOB;
+    if(extra_fds[i].events & CURL_WAIT_POLLOUT) {
+      struct timeval timeout;
+      fd_set writefds;
+      timeout.tv_sec = 0;
+      timeout.tv_usec = 0;
+      FD_ZERO(&writefds);
+      FD_SET(extra_fds[i].fd, &writefds);
+      if(select((int)extra_fds[i].fd + 1, NULL, &writefds, NULL,
+                &timeout) == 1) {
+        extra_fds[i].revents = CURL_WAIT_POLLOUT;
+        already_writable++;
+      }
+      events |= FD_WRITE;
+    }
+    if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, events) != 0)
+      return CURLM_INTERNAL_ERROR;
+#else
     ufds[nfds].fd = extra_fds[i].fd;
     ufds[nfds].events = 0;
     if(extra_fds[i].events & CURL_WAIT_POLLIN)
@@ -1192,28 +1257,60 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
     if(extra_fds[i].events & CURL_WAIT_POLLOUT)
       ufds[nfds].events |= POLLOUT;
     ++nfds;
+#endif
   }
 
 #ifdef ENABLE_WAKEUP
+#ifndef USE_WINSOCK
   if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
     ufds[nfds].fd = multi->wakeup_pair[0];
     ufds[nfds].events = POLLIN;
     ++nfds;
   }
+#endif
 #endif
 
   if(nfds) {
-    int pollrc;
     /* wait... */
-    pollrc = Curl_poll(ufds, nfds, timeout_ms);
+#ifdef USE_WINSOCK
+    if(already_writable > 0)
+      timeout_ms = 0;
+    WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, timeout_ms, FALSE);
+#else
+    int pollrc = Curl_poll(ufds, nfds, timeout_ms);
+#endif
 
+#ifdef USE_WINSOCK
+    /* With Winsock, we have to run this unconditionally to call
+       WSAEventSelect(fd, event, 0) on all the sockets */
+    {
+      retcode = 0;
+#else
     if(pollrc > 0) {
       retcode = pollrc;
+#endif
       /* copy revents results from the poll to the curl_multi_wait poll
          struct, the bit values of the actual underlying poll() implementation
          may not be the same as the ones in the public libcurl API! */
       for(i = 0; i < extra_nfds; i++) {
         unsigned short mask = 0;
+#ifdef USE_WINSOCK
+        WSANETWORKEVENTS events = {0};
+        mask = extra_fds[i].revents;
+        if(WSAEnumNetworkEvents(extra_fds[i].fd, multi->wsa_event,
+                                &events) == 0) {
+          if(events.lNetworkEvents & FD_READ)
+            mask |= CURL_WAIT_POLLIN;
+          if(events.lNetworkEvents & FD_WRITE)
+            mask |= CURL_WAIT_POLLOUT;
+          if(events.lNetworkEvents & FD_OOB)
+            mask |= CURL_WAIT_POLLPRI;
+
+          if(events.lNetworkEvents != 0)
+            retcode++;
+        }
+        WSAEventSelect(extra_fds[i].fd, multi->wsa_event, 0);
+#else
         unsigned r = ufds[curlfds + i].revents;
 
         if(r & POLLIN)
@@ -1222,10 +1319,51 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
           mask |= CURL_WAIT_POLLOUT;
         if(r & POLLPRI)
           mask |= CURL_WAIT_POLLPRI;
+#endif
 
         extra_fds[i].revents = mask;
       }
 
+#ifdef USE_WINSOCK
+      /* Count up all our own sockets that had activity,
+         and remove them from the event. */
+      if(curlfds) {
+        data = multi->easyp;
+        while(data) {
+          bitmap = multi_getsock(data, sockbunch);
+
+          for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) {
+            if(bitmap & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))) {
+              WSANETWORKEVENTS events = {0};
+              if(WSAEnumNetworkEvents(sockbunch[i], multi->wsa_event,
+                                      &events) == 0) {
+                if(events.lNetworkEvents != 0)
+                  retcode++;
+              }
+              if(ret && !events.lNetworkEvents &&
+                 (bitmap & GETSOCK_WRITESOCK(i))) {
+                struct timeval timeout;
+                fd_set writefds;
+                timeout.tv_sec = 0;
+                timeout.tv_usec = 0;
+                FD_ZERO(&writefds);
+                FD_SET(sockbunch[i], &writefds);
+                if(select((int)sockbunch[i] + 1, NULL, &writefds, NULL,
+                          &timeout) == 1)
+                  retcode++;
+              }
+              WSAEventSelect(sockbunch[i], multi->wsa_event, 0);
+            }
+            else
+              break;
+          }
+
+          data = data->next;
+        }
+      }
+
+      WSAResetEvent(multi->wsa_event);
+#else
 #ifdef ENABLE_WAKEUP
       if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
         if(ufds[curlfds + extra_nfds].revents & POLLIN) {
@@ -1238,10 +1376,8 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
                when there is no more data, breaking the loop. */
             nread = sread(multi->wakeup_pair[0], buf, sizeof(buf));
             if(nread <= 0) {
-#ifndef USE_WINSOCK
               if(nread < 0 && EINTR == SOCKERRNO)
                 continue;
-#endif
               break;
             }
           }
@@ -1249,12 +1385,15 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
           retcode--;
         }
       }
+#endif
 #endif
     }
   }
 
+#ifndef USE_WINSOCK
   if(ufds_malloc)
     free(ufds);
+#endif
   if(ret)
     *ret = retcode;
   if(!extrawait || nfds)
@@ -1309,6 +1448,10 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
     return CURLM_BAD_HANDLE;
 
 #ifdef ENABLE_WAKEUP
+#ifdef USE_WINSOCK
+  if(WSASetEvent(multi->wsa_event))
+    return CURLM_OK;
+#else
   /* the wakeup_pair variable is only written during init and cleanup,
      making it safe to access from another thread after the init part
      and before cleanup */
@@ -1341,6 +1484,7 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
       return CURLM_OK;
     }
   }
+#endif
 #endif
   return CURLM_WAKEUP_FAILURE;
 }
@@ -2500,9 +2644,13 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi)
     Curl_hash_destroy(&multi->hostcache);
     Curl_psl_destroy(&multi->psl);
 
+#ifdef USE_WINSOCK
+    WSACloseEvent(multi->wsa_event);
+#else
 #ifdef ENABLE_WAKEUP
     sclose(multi->wakeup_pair[0]);
     sclose(multi->wakeup_pair[1]);
+#endif
 #endif
     free(multi);
 
diff --git a/lib/multihandle.h b/lib/multihandle.h
index 9d73df081..0c9ce7f74 100644
--- a/lib/multihandle.h
+++ b/lib/multihandle.h
@@ -138,9 +138,13 @@ struct Curl_multi {
                                     previous callback */
   unsigned int max_concurrent_streams;
 
+#ifdef USE_WINSOCK
+  WSAEVENT wsa_event; /* winsock event used for waits */
+#else
 #ifdef ENABLE_WAKEUP
   curl_socket_t wakeup_pair[2]; /* socketpair() used for wakeup
                                    0 is used for read, 1 is used for write */
+#endif
 #endif
   /* multiplexing wanted */
   bool multiplexing;

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]