Browse Source

Connection jobs are treated as urgent.

default 2 years ago
parent
commit
5036cb5e11
4 changed files with 13 additions and 9 deletions
  1. 1 1
      activitypub.c
  2. 1 1
      data.c
  3. 10 6
      httpd.c
  4. 1 1
      snac.h

+ 1 - 1
activitypub.c

@@ -1316,7 +1316,7 @@ int process_queue(void)
         xs *q_item = dequeue(fn);
 
         if (q_item != NULL) {
-            job_post(q_item);
+            job_post(q_item, 0);
             cnt++;
         }
     }

+ 1 - 1
data.c

@@ -1497,7 +1497,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey,
 
     /* if it's to be sent right now, bypass the disk queue and post the job */
     if (retries == 0 && job_fifo_ready())
-        job_post(qmsg);
+        job_post(qmsg, 0);
     else {
         qmsg = _enqueue_put(fn, qmsg);
         srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));

+ 10 - 6
httpd.c

@@ -262,7 +262,7 @@ int job_fifo_ready(void)
 }
 
 
-void job_post(const xs_val *job)
+void job_post(const xs_val *job, int urgent)
 /* posts a job for the threads to process it */
 {
     if (job != NULL) {
@@ -270,8 +270,12 @@ void job_post(const xs_val *job)
         pthread_mutex_lock(&job_mutex);
 
         /* add to the fifo */
-        if (job_fifo != NULL)
-            job_fifo = xs_list_append(job_fifo, job);
+        if (job_fifo != NULL) {
+            if (urgent)
+                job_fifo = xs_list_insert(job_fifo, 0, job);
+            else
+                job_fifo = xs_list_append(job_fifo, job);
+        }
 
         /* unlock the mutex */
         pthread_mutex_unlock(&job_mutex);
@@ -386,7 +390,7 @@ static void *background_thread(void *arg)
 
             xs *q_item = xs_dict_new();
             q_item = xs_dict_append(q_item, "type", "purge");
-            job_post(q_item);
+            job_post(q_item, 0);
         }
 
         if (cnt == 0) {
@@ -485,7 +489,7 @@ void httpd(void)
 
             if (f != NULL) {
                 xs *job = xs_data_new(&f, sizeof(FILE *));
-                job_post(job);
+                job_post(job, 1);
             }
             else
                 break;
@@ -496,7 +500,7 @@ void httpd(void)
 
     /* send as many empty jobs as working threads */
     for (n = 1; n < n_threads; n++)
-        job_post(NULL);
+        job_post(NULL, 0);
 
     /* wait for all the threads to exit */
     for (n = 0; n < n_threads; n++)

+ 1 - 1
snac.h

@@ -218,5 +218,5 @@ int adduser(const char *uid);
 int resetpwd(snac *snac);
 
 int job_fifo_ready(void);
-void job_post(const xs_val *job);
+void job_post(const xs_val *job, int urgent);
 void job_wait(xs_val **job);