Browse Source

Rewritten part of the job threads to be leaner and faster.

default 1 year ago
parent
commit
93e7138e53
3 changed files with 46 additions and 41 deletions
  1. 1 1
      data.c
  2. 45 39
      httpd.c
  3. 0 1
      snac.h

+ 1 - 1
data.c

@@ -2168,7 +2168,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey,
     qmsg = xs_dict_append(qmsg, "seckey", seckey);
 
     /* if it's to be sent right now, bypass the disk queue and post the job */
-    if (retries == 0 && job_fifo_ready())
+    if (retries == 0)
         job_post(qmsg, 0);
     else {
         qmsg = _enqueue_put(fn, qmsg);

+ 45 - 39
httpd.c

@@ -31,16 +31,22 @@
 srv_stat s_stat = {0};
 srv_stat *p_stat = NULL;
 
+
 /** job control **/
 
 /* mutex to access the lists of jobs */
 static pthread_mutex_t job_mutex;
 
-/* semaphre to trigger job processing */
+/* semaphore to trigger job processing */
 static sem_t *job_sem;
 
-/* fifo of jobs */
-xs_list *job_fifo = NULL;
+typedef struct job_fifo_item {
+    struct job_fifo_item *next;
+    xs_val *job;
+} job_fifo_item;
+
+static job_fifo_item *job_fifo_first = NULL;
+static job_fifo_item *job_fifo_last  = NULL;
 
 
 /* nodeinfo 2.0 template */
@@ -418,24 +424,6 @@ void httpd_connection(FILE *f)
 }
 
 
-static jmp_buf on_break;
-
-
-void term_handler(int s)
-{
-    (void)s;
-
-    longjmp(on_break, 1);
-}
-
-
-int job_fifo_ready(void)
-/* returns true if the job fifo is ready */
-{
-    return job_fifo != NULL;
-}
-
-
 void job_post(const xs_val *job, int urgent)
 /* posts a job for the threads to process it */
 {
@@ -443,19 +431,25 @@ void job_post(const xs_val *job, int urgent)
         /* lock the mutex */
         pthread_mutex_lock(&job_mutex);
 
-        /* add to the fifo */
-        if (job_fifo != NULL) {
-            if (urgent)
-                job_fifo = xs_list_insert(job_fifo, 0, job);
-            else
-                job_fifo = xs_list_append(job_fifo, job);
-
-            p_stat->job_fifo_size++;
+        job_fifo_item *i = xs_realloc(NULL, sizeof(job_fifo_item));
+        *i = (job_fifo_item){ NULL, xs_dup(job) };
 
-            srv_debug(2, xs_fmt(
-                "job_fifo sizes: %d %08x", p_stat->job_fifo_size, xs_size(job_fifo)));
+        if (job_fifo_first == NULL)
+            job_fifo_first = job_fifo_last = i;
+        else
+        if (urgent) {
+            /* prepend */
+            i->next = job_fifo_first;
+            job_fifo_first = i;
+        }
+        else {
+            /* append */
+            job_fifo_last->next = i;
+            job_fifo_last = i;
         }
 
+        p_stat->job_fifo_size++;
+
         /* unlock the mutex */
         pthread_mutex_unlock(&job_mutex);
 
@@ -475,8 +469,16 @@ void job_wait(xs_val **job)
         pthread_mutex_lock(&job_mutex);
 
         /* dequeue */
-        if (job_fifo != NULL) {
-            job_fifo = xs_list_shift(job_fifo, job);
+        job_fifo_item *i = job_fifo_first;
+
+        if (i != NULL) {
+            job_fifo_first = i->next;
+
+            if (job_fifo_first == NULL)
+                job_fifo_last = NULL;
+
+            *job = i->job;
+            xs_free(i);
 
             p_stat->job_fifo_size--;
         }
@@ -604,6 +606,16 @@ static void *background_thread(void *arg)
 }
 
 
+static jmp_buf on_break;
+
+void term_handler(int s)
+{
+    (void)s;
+
+    longjmp(on_break, 1);
+}
+
+
 void httpd(void)
 /* starts the server */
 {
@@ -663,8 +675,6 @@ void httpd(void)
         return;
     }
 
-    job_fifo = xs_list_new();
-
     /* initialize sleep control */
     pthread_mutex_init(&sleep_mutex, NULL);
     pthread_cond_init(&sleep_cond, NULL);
@@ -717,10 +727,6 @@ void httpd(void)
     for (n = 0; n < p_stat->n_threads; n++)
         pthread_join(threads[n], NULL);
 
-    pthread_mutex_lock(&job_mutex);
-    job_fifo = xs_free(job_fifo);
-    pthread_mutex_unlock(&job_mutex);
-
     sem_close(job_sem);
     sem_unlink(sem_name);
 

+ 0 - 1
snac.h

@@ -294,7 +294,6 @@ int deluser(snac *user);
 
 extern const char *snac_blurb;
 
-int job_fifo_ready(void);
 void job_post(const xs_val *job, int urgent);
 void job_wait(xs_val **job);