rework the networking locking facility using the hashlist.
[citadel.git] / citadel / modules / network / serv_netspool.c
1 /*
2  * This module handles shared rooms, inter-Citadel mail, and outbound
3  * mailing list processing.
4  *
5  * Copyright (c) 2000-2011 by the citadel.org team
6  *
7  *  This program is open source software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; either version 3 of the License, or
10  *  (at your option) any later version.
11  *
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with this program; if not, write to the Free Software
19  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  * ** NOTE **   A word on the S_NETCONFIGS semaphore:
22  * This is a fairly high-level type of critical section.  It ensures that no
23  * two threads work on the netconfigs files at the same time.  Since we do
24  * so many things inside these, here are the rules:
25  *  1. begin_critical_section(S_NETCONFIGS) *before* begin_ any others.
26  *  2. Do *not* perform any I/O with the client during these sections.
27  *
28  */
29
30 /*
31  * Duration of time (in seconds) after which pending list subscribe/unsubscribe
32  * requests that have not been confirmed will be deleted.
33  */
34 #define EXP     259200  /* three days */
35
36 #include "sysdep.h"
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <fcntl.h>
41 #include <ctype.h>
42 #include <signal.h>
43 #include <pwd.h>
44 #include <errno.h>
45 #include <sys/stat.h>
46 #include <sys/types.h>
47 #include <dirent.h>
48 #if TIME_WITH_SYS_TIME
49 # include <sys/time.h>
50 # include <time.h>
51 #else
52 # if HAVE_SYS_TIME_H
53 #  include <sys/time.h>
54 # else
55 #  include <time.h>
56 # endif
57 #endif
58 #ifdef HAVE_SYSCALL_H
59 # include <syscall.h>
60 #else 
61 # if HAVE_SYS_SYSCALL_H
62 #  include <sys/syscall.h>
63 # endif
64 #endif
65
66 #include <sys/wait.h>
67 #include <string.h>
68 #include <limits.h>
69 #include <libcitadel.h>
70 #include "citadel.h"
71 #include "server.h"
72 #include "citserver.h"
73 #include "support.h"
74 #include "config.h"
75 #include "user_ops.h"
76 #include "database.h"
77 #include "msgbase.h"
78 #include "internet_addressing.h"
79 #include "serv_network.h"
80 #include "clientsocket.h"
81 #include "file_ops.h"
82 #include "citadel_dirs.h"
83 #include "threads.h"
84
85 #ifndef HAVE_SNPRINTF
86 #include "snprintf.h"
87 #endif
88
89 #include "context.h"
90 #include "netconfig.h"
91 #include "netspool.h"
92 #include "netmail.h"
93 #include "ctdl_module.h"
94
95 /*
96  * Learn topology from path fields
97  */
98 void network_learn_topology(char *node, char *path, NetMap *the_netmap, int *netmap_changed) {
99         char nexthop[256];
100         NetMap *nmptr;
101
102         strcpy(nexthop, "");
103
104         if (num_tokens(path, '!') < 3) return;
105         for (nmptr = the_netmap; nmptr != NULL; nmptr = nmptr->next) {
106                 if (!strcasecmp(nmptr->nodename, node)) {
107                         extract_token(nmptr->nexthop, path, 0, '!', sizeof nmptr->nexthop);
108                         nmptr->lastcontact = time(NULL);
109                         (*netmap_changed) ++;
110                         return;
111                 }
112         }
113
114         /* If we got here then it's not in the map, so add it. */
115         nmptr = (NetMap *) malloc(sizeof (NetMap));
116         strcpy(nmptr->nodename, node);
117         nmptr->lastcontact = time(NULL);
118         extract_token(nmptr->nexthop, path, 0, '!', sizeof nmptr->nexthop);
119         nmptr->next = the_netmap;
120         the_netmap = nmptr;
121         (*netmap_changed) ++;
122 }
123
124
125         
126
127 int read_spoolcontrol_file(SpoolControl **scc, char *filename)
128 {
129         FILE *fp;
130         char instr[SIZ];
131         char buf[SIZ];
132         char nodename[256];
133         char roomname[ROOMNAMELEN];
134         size_t miscsize = 0;
135         size_t linesize = 0;
136         int skipthisline = 0;
137         namelist *nptr = NULL;
138         maplist *mptr = NULL;
139         SpoolControl *sc;
140
141         fp = fopen(filename, "r");
142         if (fp == NULL) {
143                 return 0;
144         }
145         sc = malloc(sizeof(SpoolControl));
146         memset(sc, 0, sizeof(SpoolControl));
147         *scc = sc;
148
149         while (fgets(buf, sizeof buf, fp) != NULL) {
150                 buf[strlen(buf)-1] = 0;
151
152                 extract_token(instr, buf, 0, '|', sizeof instr);
153                 if (!strcasecmp(instr, strof(lastsent))) {
154                         sc->lastsent = extract_long(buf, 1);
155                 }
156                 else if (!strcasecmp(instr, strof(listrecp))) {
157                         nptr = (namelist *)
158                                 malloc(sizeof(namelist));
159                         nptr->next = sc->listrecps;
160                         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
161                         sc->listrecps = nptr;
162                 }
163                 else if (!strcasecmp(instr, strof(participate))) {
164                         nptr = (namelist *)
165                                 malloc(sizeof(namelist));
166                         nptr->next = sc->participates;
167                         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
168                         sc->participates = nptr;
169                 }
170                 else if (!strcasecmp(instr, strof(digestrecp))) {
171                         nptr = (namelist *)
172                                 malloc(sizeof(namelist));
173                         nptr->next = sc->digestrecps;
174                         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
175                         sc->digestrecps = nptr;
176                 }
177                 else if (!strcasecmp(instr, strof(ignet_push_share))) {
178                         extract_token(nodename, buf, 1, '|', sizeof nodename);
179                         extract_token(roomname, buf, 2, '|', sizeof roomname);
180                         mptr = (maplist *) malloc(sizeof(maplist));
181                         mptr->next = sc->ignet_push_shares;
182                         strcpy(mptr->remote_nodename, nodename);
183                         strcpy(mptr->remote_roomname, roomname);
184                         sc->ignet_push_shares = mptr;
185                 }
186                 else {
187                         /* Preserve 'other' lines ... *unless* they happen to
188                          * be subscribe/unsubscribe pendings with expired
189                          * timestamps.
190                          */
191                         skipthisline = 0;
192                         if (!strncasecmp(buf, strof(subpending)"|", 11)) {
193                                 if (time(NULL) - extract_long(buf, 4) > EXP) {
194                                         skipthisline = 1;
195                                 }
196                         }
197                         if (!strncasecmp(buf, strof(unsubpending)"|", 13)) {
198                                 if (time(NULL) - extract_long(buf, 3) > EXP) {
199                                         skipthisline = 1;
200                                 }
201                         }
202
203                         if (skipthisline == 0) {
204                                 linesize = strlen(buf);
205                                 sc->misc = realloc(sc->misc,
206                                         (miscsize + linesize + 2) );
207                                 sprintf(&sc->misc[miscsize], "%s\n", buf);
208                                 miscsize = miscsize + linesize + 1;
209                         }
210                 }
211
212
213         }
214         fclose(fp);
215         return 1;
216 }
217
218 void free_spoolcontrol_struct(SpoolControl **scc)
219 {
220         SpoolControl *sc;
221         namelist *nptr = NULL;
222         maplist *mptr = NULL;
223
224         sc = *scc;
225         while (sc->listrecps != NULL) {
226                 nptr = sc->listrecps->next;
227                 free(sc->listrecps);
228                 sc->listrecps = nptr;
229         }
230         /* Do the same for digestrecps */
231         while (sc->digestrecps != NULL) {
232                 nptr = sc->digestrecps->next;
233                 free(sc->digestrecps);
234                 sc->digestrecps = nptr;
235         }
236         /* Do the same for participates */
237         while (sc->participates != NULL) {
238                 nptr = sc->participates->next;
239                 free(sc->participates);
240                 sc->participates = nptr;
241         }
242         while (sc->ignet_push_shares != NULL) {
243                 mptr = sc->ignet_push_shares->next;
244                 free(sc->ignet_push_shares);
245                 sc->ignet_push_shares = mptr;
246         }
247         free(sc->misc);
248         free(sc);
249         *scc=NULL;
250 }
251
252 int writenfree_spoolcontrol_file(SpoolControl **scc, char *filename)
253 {
254         char tempfilename[PATH_MAX];
255         int TmpFD;
256         SpoolControl *sc;
257         namelist *nptr = NULL;
258         maplist *mptr = NULL;
259         long len;
260         time_t unixtime;
261         struct timeval tv;
262         long reltid; /* if we don't have SYS_gettid, use "random" value */
263         StrBuf *Cfg;
264         int rc;
265
266         len = strlen(filename);
267         memcpy(tempfilename, filename, len + 1);
268
269
270 #if defined(HAVE_SYSCALL_H) && defined (SYS_gettid)
271         reltid = syscall(SYS_gettid);
272 #endif
273         gettimeofday(&tv, NULL);
274         /* Promote to time_t; types differ on some OSes (like darwin) */
275         unixtime = tv.tv_sec;
276
277         sprintf(tempfilename + len, ".%ld-%ld", reltid, unixtime);
278         sc = *scc;
279         errno = 0;
280         TmpFD = open(tempfilename, O_CREAT|O_EXCL|O_RDWR, S_IRUSR|S_IWUSR);
281         Cfg = NewStrBuf();
282         if ((TmpFD < 0) || (errno != 0)) {
283                 syslog(LOG_CRIT, "ERROR: cannot open %s: %s\n",
284                         filename, strerror(errno));
285                 free_spoolcontrol_struct(scc);
286                 unlink(tempfilename);
287         }
288         else {
289                 fchown(TmpFD, config.c_ctdluid, 0);
290                 StrBufAppendPrintf(Cfg, "lastsent|%ld\n", sc->lastsent);
291                 
292                 /* Write out the listrecps while freeing from memory at the
293                  * same time.  Am I clever or what?  :)
294                  */
295                 while (sc->listrecps != NULL) {
296                     StrBufAppendPrintf(Cfg, "listrecp|%s\n", sc->listrecps->name);
297                         nptr = sc->listrecps->next;
298                         free(sc->listrecps);
299                         sc->listrecps = nptr;
300                 }
301                 /* Do the same for digestrecps */
302                 while (sc->digestrecps != NULL) {
303                         StrBufAppendPrintf(Cfg, "digestrecp|%s\n", sc->digestrecps->name);
304                         nptr = sc->digestrecps->next;
305                         free(sc->digestrecps);
306                         sc->digestrecps = nptr;
307                 }
308                 /* Do the same for participates */
309                 while (sc->participates != NULL) {
310                         StrBufAppendPrintf(Cfg, "participate|%s\n", sc->participates->name);
311                         nptr = sc->participates->next;
312                         free(sc->participates);
313                         sc->participates = nptr;
314                 }
315                 while (sc->ignet_push_shares != NULL) {
316                         StrBufAppendPrintf(Cfg, "ignet_push_share|%s", sc->ignet_push_shares->remote_nodename);
317                         if (!IsEmptyStr(sc->ignet_push_shares->remote_roomname)) {
318                                 StrBufAppendPrintf(Cfg, "|%s", sc->ignet_push_shares->remote_roomname);
319                         }
320                         StrBufAppendPrintf(Cfg, "\n");
321                         mptr = sc->ignet_push_shares->next;
322                         free(sc->ignet_push_shares);
323                         sc->ignet_push_shares = mptr;
324                 }
325                 if (sc->misc != NULL) {
326                         StrBufAppendBufPlain(Cfg, sc->misc, -1, 0);
327                 }
328                 free(sc->misc);
329
330                 rc = write(TmpFD, ChrPtr(Cfg), StrLength(Cfg));
331                 if ((rc >=0 ) && (rc == StrLength(Cfg))) 
332                 {
333                         close(TmpFD);
334                         rename(tempfilename, filename);
335                 }
336                 else {
337                         syslog(LOG_EMERG, 
338                                       "unable to write %s; [%s]; not enough space on the disk?\n", 
339                                       tempfilename, 
340                                       strerror(errno));
341                         close(TmpFD);
342                         unlink(tempfilename);
343                 }
344                 FreeStrBuf(&Cfg);
345                 free(sc);
346                 *scc=NULL;
347         }
348         return 1;
349 }
350 int is_recipient(SpoolControl *sc, const char *Name)
351 {
352         namelist *nptr;
353         size_t len;
354
355         len = strlen(Name);
356         nptr = sc->listrecps;
357         while (nptr != NULL) {
358                 if (strncmp(Name, nptr->name, len)==0)
359                         return 1;
360                 nptr = nptr->next;
361         }
362         /* Do the same for digestrecps */
363         nptr = sc->digestrecps;
364         while (nptr != NULL) {
365                 if (strncmp(Name, nptr->name, len)==0)
366                         return 1;
367                 nptr = nptr->next;
368         }
369         /* Do the same for participates */
370         nptr = sc->participates;
371         while (nptr != NULL) {
372                 if (strncmp(Name, nptr->name, len)==0)
373                         return 1;
374                 nptr = nptr->next;
375         }
376         return 0;
377 }
378
379
380 /*
381  * Batch up and send all outbound traffic from the current room
382  */
383 void network_spoolout_room(char *room_to_spool,                        
384                            char *working_ignetcfg,
385                            NetMap *the_netmap)
386 {
387         char buf[SIZ];
388         char filename[PATH_MAX];
389         SpoolControl *sc;
390         int i;
391
392         /*
393          * If the room doesn't exist, don't try to perform its networking tasks.
394          * Normally this should never happen, but once in a while maybe a room gets
395          * queued for networking and then deleted before it can happen.
396          */
397         if (CtdlGetRoom(&CC->room, room_to_spool) != 0) {
398                 syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool);
399                 return;
400         }
401
402         assoc_file_name(filename, sizeof filename, &CC->room, ctdl_netcfg_dir);
403         begin_critical_section(S_NETCONFIGS);
404
405         /* Only do net processing for rooms that have netconfigs */
406         if (!read_spoolcontrol_file(&sc, filename))
407         {
408                 end_critical_section(S_NETCONFIGS);
409                 return;
410         }
411         syslog(LOG_INFO, "Networking started for <%s>\n", CC->room.QRname);
412
413         sc->working_ignetcfg = working_ignetcfg;
414         sc->the_netmap = the_netmap;
415
416         /* If there are digest recipients, we have to build a digest */
417         if (sc->digestrecps != NULL) {
418                 sc->digestfp = tmpfile();
419                 fprintf(sc->digestfp, "Content-type: text/plain\n\n");
420         }
421
422         /* Do something useful */
423         CtdlForEachMessage(MSGS_GT, sc->lastsent, NULL, NULL, NULL,
424                 network_spool_msg, sc);
425
426         /* If we wrote a digest, deliver it and then close it */
427         snprintf(buf, sizeof buf, "room_%s@%s",
428                 CC->room.QRname, config.c_fqdn);
429         for (i=0; buf[i]; ++i) {
430                 buf[i] = tolower(buf[i]);
431                 if (isspace(buf[i])) buf[i] = '_';
432         }
433         if (sc->digestfp != NULL) {
434                 fprintf(sc->digestfp,   " -----------------------------------"
435                                         "------------------------------------"
436                                         "-------\n"
437                                         "You are subscribed to the '%s' "
438                                         "list.\n"
439                                         "To post to the list: %s\n",
440                                         CC->room.QRname, buf
441                 );
442                 network_deliver_digest(sc);     /* deliver and close */
443         }
444
445         /* Now rewrite the config file */
446         writenfree_spoolcontrol_file(&sc, filename);
447         end_critical_section(S_NETCONFIGS);
448 }
449
450 /*
451  * Process a buffer containing a single message from a single file
452  * from the inbound queue 
453  */
454 void network_process_buffer(char *buffer, long size, char *working_ignetcfg, NetMap *the_netmap, int *netmap_changed)
455 {
456         struct CtdlMessage *msg = NULL;
457         long pos;
458         int field;
459         struct recptypes *recp = NULL;
460         char target_room[ROOMNAMELEN];
461         struct ser_ret sermsg;
462         char *oldpath = NULL;
463         char filename[PATH_MAX];
464         FILE *fp;
465         char nexthop[SIZ];
466         unsigned char firstbyte;
467         unsigned char lastbyte;
468
469         syslog(LOG_DEBUG, "network_process_buffer() processing %ld bytes\n", size);
470
471         /* Validate just a little bit.  First byte should be FF and * last byte should be 00. */
472         firstbyte = buffer[0];
473         lastbyte = buffer[size-1];
474         if ( (firstbyte != 255) || (lastbyte != 0) ) {
475                 syslog(LOG_ERR, "Corrupt message ignored.  Length=%ld, firstbyte = %d, lastbyte = %d\n",
476                         size, firstbyte, lastbyte);
477                 return;
478         }
479
480         /* Set default target room to trash */
481         strcpy(target_room, TWITROOM);
482
483         /* Load the message into memory */
484         msg = (struct CtdlMessage *) malloc(sizeof(struct CtdlMessage));
485         memset(msg, 0, sizeof(struct CtdlMessage));
486         msg->cm_magic = CTDLMESSAGE_MAGIC;
487         msg->cm_anon_type = buffer[1];
488         msg->cm_format_type = buffer[2];
489
490         for (pos = 3; pos < size; ++pos) {
491                 field = buffer[pos];
492                 msg->cm_fields[field] = strdup(&buffer[pos+1]);
493                 pos = pos + strlen(&buffer[(int)pos]);
494         }
495
496         /* Check for message routing */
497         if (msg->cm_fields['D'] != NULL) {
498                 if (strcasecmp(msg->cm_fields['D'], config.c_nodename)) {
499
500                         /* route the message */
501                         strcpy(nexthop, "");
502                         if (is_valid_node(nexthop, 
503                                           NULL, 
504                                           msg->cm_fields['D'], 
505                                           working_ignetcfg, 
506                                           the_netmap) == 0) 
507                         {
508                                 /* prepend our node to the path */
509                                 if (msg->cm_fields['P'] != NULL) {
510                                         oldpath = msg->cm_fields['P'];
511                                         msg->cm_fields['P'] = NULL;
512                                 }
513                                 else {
514                                         oldpath = strdup("unknown_user");
515                                 }
516                                 size = strlen(oldpath) + SIZ;
517                                 msg->cm_fields['P'] = malloc(size);
518                                 snprintf(msg->cm_fields['P'], size, "%s!%s",
519                                         config.c_nodename, oldpath);
520                                 free(oldpath);
521
522                                 /* serialize the message */
523                                 serialize_message(&sermsg, msg);
524
525                                 /* now send it */
526                                 if (IsEmptyStr(nexthop)) {
527                                         strcpy(nexthop, msg->cm_fields['D']);
528                                 }
529                                 snprintf(filename, 
530                                         sizeof filename,
531                                         "%s/%s@%lx%x",
532                                         ctdl_netout_dir,
533                                         nexthop,
534                                         time(NULL),
535                                         rand()
536                                 );
537                                 syslog(LOG_DEBUG, "Appending to %s\n", filename);
538                                 fp = fopen(filename, "ab");
539                                 if (fp != NULL) {
540                                         fwrite(sermsg.ser, sermsg.len, 1, fp);
541                                         fclose(fp);
542                                 }
543                                 else {
544                                         syslog(LOG_ERR, "%s: %s\n", filename, strerror(errno));
545                                 }
546                                 free(sermsg.ser);
547                                 CtdlFreeMessage(msg);
548                                 return;
549                         }
550                         
551                         else {  /* invalid destination node name */
552
553                                 network_bounce(msg,
554 "A message you sent could not be delivered due to an invalid destination node"
555 " name.  Please check the address and try sending the message again.\n");
556                                 msg = NULL;
557                                 return;
558
559                         }
560                 }
561         }
562
563         /*
564          * Check to see if we already have a copy of this message, and
565          * abort its processing if so.  (We used to post a warning to Aide>
566          * every time this happened, but the network is now so densely
567          * connected that it's inevitable.)
568          */
569         if (network_usetable(msg) != 0) {
570                 CtdlFreeMessage(msg);
571                 return;
572         }
573
574         /* Learn network topology from the path */
575         if ((msg->cm_fields['N'] != NULL) && (msg->cm_fields['P'] != NULL)) {
576                 network_learn_topology(msg->cm_fields['N'], 
577                                        msg->cm_fields['P'], 
578                                        the_netmap, 
579                                        netmap_changed);
580         }
581
582         /* Is the sending node giving us a very persuasive suggestion about
583          * which room this message should be saved in?  If so, go with that.
584          */
585         if (msg->cm_fields['C'] != NULL) {
586                 safestrncpy(target_room, msg->cm_fields['C'], sizeof target_room);
587         }
588
589         /* Otherwise, does it have a recipient?  If so, validate it... */
590         else if (msg->cm_fields['R'] != NULL) {
591                 recp = validate_recipients(msg->cm_fields['R'], NULL, 0);
592                 if (recp != NULL) if (recp->num_error != 0) {
593                         network_bounce(msg,
594                                 "A message you sent could not be delivered due to an invalid address.\n"
595                                 "Please check the address and try sending the message again.\n");
596                         msg = NULL;
597                         free_recipients(recp);
598                         syslog(LOG_DEBUG, "Bouncing message due to invalid recipient address.\n");
599                         return;
600                 }
601                 strcpy(target_room, "");        /* no target room if mail */
602         }
603
604         /* Our last shot at finding a home for this message is to see if
605          * it has the O field (Originating room) set.
606          */
607         else if (msg->cm_fields['O'] != NULL) {
608                 safestrncpy(target_room, msg->cm_fields['O'], sizeof target_room);
609         }
610
611         /* Strip out fields that are only relevant during transit */
612         if (msg->cm_fields['D'] != NULL) {
613                 free(msg->cm_fields['D']);
614                 msg->cm_fields['D'] = NULL;
615         }
616         if (msg->cm_fields['C'] != NULL) {
617                 free(msg->cm_fields['C']);
618                 msg->cm_fields['C'] = NULL;
619         }
620
621         /* save the message into a room */
622         if (PerformNetprocHooks(msg, target_room) == 0) {
623                 msg->cm_flags = CM_SKIP_HOOKS;
624                 CtdlSubmitMsg(msg, recp, target_room, 0);
625         }
626         CtdlFreeMessage(msg);
627         free_recipients(recp);
628 }
629
630
631 /*
632  * Process a single message from a single file from the inbound queue 
633  */
634 void network_process_message(FILE *fp, 
635                              long msgstart, 
636                              long msgend,
637                              char *working_ignetcfg,
638                              NetMap *the_netmap, 
639                              int *netmap_changed)
640 {
641         long hold_pos;
642         long size;
643         char *buffer;
644
645         hold_pos = ftell(fp);
646         size = msgend - msgstart + 1;
647         buffer = malloc(size);
648         if (buffer != NULL) {
649                 fseek(fp, msgstart, SEEK_SET);
650                 if (fread(buffer, size, 1, fp) > 0) {
651                         network_process_buffer(buffer, 
652                                                size, 
653                                                working_ignetcfg, 
654                                                the_netmap, 
655                                                netmap_changed);
656                 }
657                 free(buffer);
658         }
659
660         fseek(fp, hold_pos, SEEK_SET);
661 }
662
663
664 /*
665  * Process a single file from the inbound queue 
666  */
667 void network_process_file(char *filename,
668                           char *working_ignetcfg,
669                           NetMap *the_netmap, 
670                           int *netmap_changed)
671 {
672         FILE *fp;
673         long msgstart = (-1L);
674         long msgend = (-1L);
675         long msgcur = 0L;
676         int ch;
677
678
679         fp = fopen(filename, "rb");
680         if (fp == NULL) {
681                 syslog(LOG_CRIT, "Error opening %s: %s\n", filename, strerror(errno));
682                 return;
683         }
684
685         fseek(fp, 0L, SEEK_END);
686         syslog(LOG_INFO, "network: processing %ld bytes from %s\n", ftell(fp), filename);
687         rewind(fp);
688
689         /* Look for messages in the data stream and break them out */
690         while (ch = getc(fp), ch >= 0) {
691         
692                 if (ch == 255) {
693                         if (msgstart >= 0L) {
694                                 msgend = msgcur - 1;
695                                 network_process_message(fp,
696                                                         msgstart,
697                                                         msgend,
698                                                         working_ignetcfg,
699                                                         the_netmap,
700                                                         netmap_changed);
701                         }
702                         msgstart = msgcur;
703                 }
704
705                 ++msgcur;
706         }
707
708         msgend = msgcur - 1;
709         if (msgstart >= 0L) {
710                 network_process_message(fp,
711                                         msgstart,
712                                         msgend,
713                                         working_ignetcfg,
714                                         the_netmap,
715                                         netmap_changed);
716         }
717
718         fclose(fp);
719         unlink(filename);
720 }
721
722
723 /*
724  * Process anything in the inbound queue
725  */
726 void network_do_spoolin(char *working_ignetcfg, NetMap *the_netmap, int *netmap_changed)
727 {
728         DIR *dp;
729         struct dirent *d;
730         struct stat statbuf;
731         char filename[PATH_MAX];
732         static time_t last_spoolin_mtime = 0L;
733
734         /*
735          * Check the spoolin directory's modification time.  If it hasn't
736          * been touched, we don't need to scan it.
737          */
738         if (stat(ctdl_netin_dir, &statbuf)) return;
739         if (statbuf.st_mtime == last_spoolin_mtime) {
740                 syslog(LOG_DEBUG, "network: nothing in inbound queue\n");
741                 return;
742         }
743         last_spoolin_mtime = statbuf.st_mtime;
744         syslog(LOG_DEBUG, "network: processing inbound queue\n");
745
746         /*
747          * Ok, there's something interesting in there, so scan it.
748          */
749         dp = opendir(ctdl_netin_dir);
750         if (dp == NULL) return;
751
752         while (d = readdir(dp), d != NULL) {
753                 if ((strcmp(d->d_name, ".")) && (strcmp(d->d_name, ".."))) {
754                         snprintf(filename, 
755                                 sizeof filename,
756                                 "%s/%s",
757                                 ctdl_netin_dir,
758                                 d->d_name
759                         );
760                         network_process_file(filename,
761                                              working_ignetcfg,
762                                              the_netmap,
763                                              netmap_changed);
764                 }
765         }
766
767         closedir(dp);
768 }
769
770 /*
771  * Step 1: consolidate files in the outbound queue into one file per neighbor node
772  * Step 2: delete any files in the outbound queue that were for neighbors who no longer exist.
773  */
774 void network_consolidate_spoolout(char *working_ignetcfg, NetMap *the_netmap)
775 {
776         DIR *dp;
777         struct dirent *d;
778         char filename[PATH_MAX];
779         char cmd[PATH_MAX];
780         char nexthop[256];
781         long nexthoplen;
782         int i;
783         char *ptr;
784
785         /* Step 1: consolidate files in the outbound queue into one file per neighbor node */
786         dp = opendir(ctdl_netout_dir);
787         if (dp == NULL) return;
788         while (d = readdir(dp), d != NULL) {
789                 if (
790                         (strcmp(d->d_name, "."))
791                         && (strcmp(d->d_name, ".."))
792                         && (strchr(d->d_name, '@') != NULL)
793                 ) {
794                         nexthoplen = safestrncpy(nexthop, d->d_name, sizeof nexthop);
795                         ptr = strchr(nexthop, '@');
796                         if (ptr) {
797                                 *ptr = 0;
798                                 nexthoplen = ptr - nexthop;
799                         }                               
800         
801                         snprintf(filename, 
802                                 sizeof filename,
803                                 "%s/%s",
804                                 ctdl_netout_dir,
805                                 d->d_name
806                         );
807         
808                         syslog(LOG_DEBUG, "Consolidate %s to %s\n", filename, nexthop);
809                         if (network_talking_to(nexthop, nexthoplen, NTT_CHECK)) {
810                                 syslog(LOG_DEBUG,
811                                         "Currently online with %s - skipping for now\n",
812                                         nexthop
813                                 );
814                         }
815                         else {
816                                 network_talking_to(nexthop, nexthoplen, NTT_ADD);
817                                 snprintf(cmd, sizeof cmd, "/bin/cat %s >>%s/%s && /bin/rm -f %s",
818                                         filename,
819                                         ctdl_netout_dir, nexthop,
820                                         filename
821                                 );
822                                 system(cmd);
823                                 network_talking_to(nexthop, nexthoplen, NTT_REMOVE);
824                         }
825                 }
826         }
827         closedir(dp);
828
829         /* Step 2: delete any files in the outbound queue that were for neighbors who no longer exist */
830
831         dp = opendir(ctdl_netout_dir);
832         if (dp == NULL) return;
833
834         while (d = readdir(dp), d != NULL) {
835                 if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
836                         continue;
837                 ptr = strchr(d->d_name, '@');
838                 if (d != NULL)
839                         continue;
840                 snprintf(filename, 
841                         sizeof filename,
842                         "%s/%s",
843                         ctdl_netout_dir,
844                         d->d_name
845                 );
846
847                 strcpy(nexthop, "");
848                 i = is_valid_node(nexthop,
849                                   NULL,
850                                   d->d_name,
851                                   working_ignetcfg,
852                                   the_netmap);
853         
854                 if ( (i != 0) || !IsEmptyStr(nexthop) ) {
855                         unlink(filename);
856                 }
857         }
858
859
860         closedir(dp);
861 }
862
863
864
865
866 /*
867  * It's ok if these directories already exist.  Just fail silently.
868  */
869 void create_spool_dirs(void) {
870         if ((mkdir(ctdl_spool_dir, 0700) != 0) && (errno != EEXIST))
871                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_spool_dir, strerror(errno));
872         if (chown(ctdl_spool_dir, CTDLUID, (-1)) != 0)
873                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_spool_dir, strerror(errno));
874         if ((mkdir(ctdl_netin_dir, 0700) != 0) && (errno != EEXIST))
875                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netin_dir, strerror(errno));
876         if (chown(ctdl_netin_dir, CTDLUID, (-1)) != 0)
877                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netin_dir, strerror(errno));
878         if ((mkdir(ctdl_nettmp_dir, 0700) != 0) && (errno != EEXIST))
879                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_nettmp_dir, strerror(errno));
880         if (chown(ctdl_nettmp_dir, CTDLUID, (-1)) != 0)
881                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_nettmp_dir, strerror(errno));
882         if ((mkdir(ctdl_netout_dir, 0700) != 0) && (errno != EEXIST))
883                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netout_dir, strerror(errno));
884         if (chown(ctdl_netout_dir, CTDLUID, (-1)) != 0)
885                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netout_dir, strerror(errno));
886 }
887
888 /*
889  * Module entry point
890  */
891 CTDL_MODULE_INIT(network_spool)
892 {
893         if (!threading)
894         {
895                 create_spool_dirs();
896                 CtdlRegisterCleanupHook(destroy_network_queue_room);
897         }
898         return "network_spool";
899 }