Networker:
[citadel.git] / citadel / modules / network / serv_netspool.c
1 /*
2  * This module handles shared rooms, inter-Citadel mail, and outbound
3  * mailing list processing.
4  *
5  * Copyright (c) 2000-2011 by the citadel.org team
6  *
7  *  This program is open source software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; either version 3 of the License, or
10  *  (at your option) any later version.
11  *
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with this program; if not, write to the Free Software
19  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  * ** NOTE **   A word on the S_NETCONFIGS semaphore:
22  * This is a fairly high-level type of critical section.  It ensures that no
23  * two threads work on the netconfigs files at the same time.  Since we do
24  * so many things inside these, here are the rules:
25  *  1. begin_critical_section(S_NETCONFIGS) *before* begin_ any others.
26  *  2. Do *not* perform any I/O with the client during these sections.
27  *
28  */
29
30 /*
31  * Duration of time (in seconds) after which pending list subscribe/unsubscribe
32  * requests that have not been confirmed will be deleted.
33  */
34 #define EXP     259200  /* three days */
35
36 #include "sysdep.h"
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <fcntl.h>
41 #include <ctype.h>
42 #include <signal.h>
43 #include <pwd.h>
44 #include <errno.h>
45 #include <sys/stat.h>
46 #include <sys/types.h>
47 #include <dirent.h>
48 #if TIME_WITH_SYS_TIME
49 # include <sys/time.h>
50 # include <time.h>
51 #else
52 # if HAVE_SYS_TIME_H
53 #  include <sys/time.h>
54 # else
55 #  include <time.h>
56 # endif
57 #endif
58 #ifdef HAVE_SYSCALL_H
59 # include <syscall.h>
60 #else 
61 # if HAVE_SYS_SYSCALL_H
62 #  include <sys/syscall.h>
63 # endif
64 #endif
65
66 #include <sys/wait.h>
67 #include <string.h>
68 #include <limits.h>
69 #include <libcitadel.h>
70 #include "citadel.h"
71 #include "server.h"
72 #include "citserver.h"
73 #include "support.h"
74 #include "config.h"
75 #include "user_ops.h"
76 #include "database.h"
77 #include "msgbase.h"
78 #include "internet_addressing.h"
79 #include "serv_network.h"
80 #include "clientsocket.h"
81 #include "file_ops.h"
82 #include "citadel_dirs.h"
83 #include "threads.h"
84
85 #ifndef HAVE_SNPRINTF
86 #include "snprintf.h"
87 #endif
88
89 #include "context.h"
90 #include "netconfig.h"
91 #include "netspool.h"
92 #include "netmail.h"
93 #include "ctdl_module.h"
94
95 /*
96  * Learn topology from path fields
97  */
98 static void network_learn_topology(char *node, char *path, NetMap **the_netmap, int *netmap_changed)
99 {
100         char nexthop[256];
101         NetMap *nmptr;
102
103         *nexthop = '\0';
104
105         if (num_tokens(path, '!') < 3) return;
106         for (nmptr = *the_netmap; nmptr != NULL; nmptr = nmptr->next) {
107                 if (!strcasecmp(nmptr->nodename, node)) {
108                         extract_token(nmptr->nexthop, path, 0, '!', sizeof nmptr->nexthop);
109                         nmptr->lastcontact = time(NULL);
110                         (*netmap_changed) ++;
111                         return;
112                 }
113         }
114
115         /* If we got here then it's not in the map, so add it. */
116         nmptr = (NetMap *) malloc(sizeof (NetMap));
117         strcpy(nmptr->nodename, node);
118         nmptr->lastcontact = time(NULL);
119         extract_token(nmptr->nexthop, path, 0, '!', sizeof nmptr->nexthop);
120         nmptr->next = *the_netmap;
121         *the_netmap = nmptr;
122         (*netmap_changed) ++;
123 }
124
125
126         
127
128 int read_spoolcontrol_file(SpoolControl **scc, char *filename)
129 {
130         FILE *fp;
131         char instr[SIZ];
132         char buf[SIZ];
133         char nodename[256];
134         char roomname[ROOMNAMELEN];
135         size_t miscsize = 0;
136         size_t linesize = 0;
137         int skipthisline = 0;
138         namelist *nptr = NULL;
139         maplist *mptr = NULL;
140         SpoolControl *sc;
141
142         fp = fopen(filename, "r");
143         if (fp == NULL) {
144                 return 0;
145         }
146         sc = malloc(sizeof(SpoolControl));
147         memset(sc, 0, sizeof(SpoolControl));
148         *scc = sc;
149
150         while (fgets(buf, sizeof buf, fp) != NULL) {
151                 buf[strlen(buf)-1] = 0;
152
153                 extract_token(instr, buf, 0, '|', sizeof instr);
154                 if (!strcasecmp(instr, strof(lastsent))) {
155                         sc->lastsent = extract_long(buf, 1);
156                 }
157                 else if (!strcasecmp(instr, strof(listrecp))) {
158                         nptr = (namelist *)
159                                 malloc(sizeof(namelist));
160                         nptr->next = sc->listrecps;
161                         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
162                         sc->listrecps = nptr;
163                 }
164                 else if (!strcasecmp(instr, strof(participate))) {
165                         nptr = (namelist *)
166                                 malloc(sizeof(namelist));
167                         nptr->next = sc->participates;
168                         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
169                         sc->participates = nptr;
170                 }
171                 else if (!strcasecmp(instr, strof(digestrecp))) {
172                         nptr = (namelist *)
173                                 malloc(sizeof(namelist));
174                         nptr->next = sc->digestrecps;
175                         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
176                         sc->digestrecps = nptr;
177                 }
178                 else if (!strcasecmp(instr, strof(ignet_push_share))) {
179                         extract_token(nodename, buf, 1, '|', sizeof nodename);
180                         extract_token(roomname, buf, 2, '|', sizeof roomname);
181                         mptr = (maplist *) malloc(sizeof(maplist));
182                         mptr->next = sc->ignet_push_shares;
183                         strcpy(mptr->remote_nodename, nodename);
184                         strcpy(mptr->remote_roomname, roomname);
185                         sc->ignet_push_shares = mptr;
186                 }
187                 else {
188                         /* Preserve 'other' lines ... *unless* they happen to
189                          * be subscribe/unsubscribe pendings with expired
190                          * timestamps.
191                          */
192                         skipthisline = 0;
193                         if (!strncasecmp(buf, strof(subpending)"|", 11)) {
194                                 if (time(NULL) - extract_long(buf, 4) > EXP) {
195                                         skipthisline = 1;
196                                 }
197                         }
198                         if (!strncasecmp(buf, strof(unsubpending)"|", 13)) {
199                                 if (time(NULL) - extract_long(buf, 3) > EXP) {
200                                         skipthisline = 1;
201                                 }
202                         }
203
204                         if (skipthisline == 0) {
205                                 linesize = strlen(buf);
206                                 sc->misc = realloc(sc->misc,
207                                         (miscsize + linesize + 2) );
208                                 sprintf(&sc->misc[miscsize], "%s\n", buf);
209                                 miscsize = miscsize + linesize + 1;
210                         }
211                 }
212
213
214         }
215         fclose(fp);
216         return 1;
217 }
218
219 void free_spoolcontrol_struct(SpoolControl **scc)
220 {
221         SpoolControl *sc;
222         namelist *nptr = NULL;
223         maplist *mptr = NULL;
224
225         sc = *scc;
226         while (sc->listrecps != NULL) {
227                 nptr = sc->listrecps->next;
228                 free(sc->listrecps);
229                 sc->listrecps = nptr;
230         }
231         /* Do the same for digestrecps */
232         while (sc->digestrecps != NULL) {
233                 nptr = sc->digestrecps->next;
234                 free(sc->digestrecps);
235                 sc->digestrecps = nptr;
236         }
237         /* Do the same for participates */
238         while (sc->participates != NULL) {
239                 nptr = sc->participates->next;
240                 free(sc->participates);
241                 sc->participates = nptr;
242         }
243         while (sc->ignet_push_shares != NULL) {
244                 mptr = sc->ignet_push_shares->next;
245                 free(sc->ignet_push_shares);
246                 sc->ignet_push_shares = mptr;
247         }
248         free(sc->misc);
249         free(sc);
250         *scc=NULL;
251 }
252
253 int writenfree_spoolcontrol_file(SpoolControl **scc, char *filename)
254 {
255         char tempfilename[PATH_MAX];
256         int TmpFD;
257         SpoolControl *sc;
258         namelist *nptr = NULL;
259         maplist *mptr = NULL;
260         long len;
261         time_t unixtime;
262         struct timeval tv;
263         long reltid; /* if we don't have SYS_gettid, use "random" value */
264         StrBuf *Cfg;
265         int rc;
266
267         len = strlen(filename);
268         memcpy(tempfilename, filename, len + 1);
269
270
271 #if defined(HAVE_SYSCALL_H) && defined (SYS_gettid)
272         reltid = syscall(SYS_gettid);
273 #endif
274         gettimeofday(&tv, NULL);
275         /* Promote to time_t; types differ on some OSes (like darwin) */
276         unixtime = tv.tv_sec;
277
278         sprintf(tempfilename + len, ".%ld-%ld", reltid, unixtime);
279         sc = *scc;
280         errno = 0;
281         TmpFD = open(tempfilename, O_CREAT|O_EXCL|O_RDWR, S_IRUSR|S_IWUSR);
282         Cfg = NewStrBuf();
283         if ((TmpFD < 0) || (errno != 0)) {
284                 syslog(LOG_CRIT, "ERROR: cannot open %s: %s\n",
285                         filename, strerror(errno));
286                 free_spoolcontrol_struct(scc);
287                 unlink(tempfilename);
288         }
289         else {
290                 fchown(TmpFD, config.c_ctdluid, 0);
291                 StrBufAppendPrintf(Cfg, "lastsent|%ld\n", sc->lastsent);
292                 
293                 /* Write out the listrecps while freeing from memory at the
294                  * same time.  Am I clever or what?  :)
295                  */
296                 while (sc->listrecps != NULL) {
297                     StrBufAppendPrintf(Cfg, "listrecp|%s\n", sc->listrecps->name);
298                         nptr = sc->listrecps->next;
299                         free(sc->listrecps);
300                         sc->listrecps = nptr;
301                 }
302                 /* Do the same for digestrecps */
303                 while (sc->digestrecps != NULL) {
304                         StrBufAppendPrintf(Cfg, "digestrecp|%s\n", sc->digestrecps->name);
305                         nptr = sc->digestrecps->next;
306                         free(sc->digestrecps);
307                         sc->digestrecps = nptr;
308                 }
309                 /* Do the same for participates */
310                 while (sc->participates != NULL) {
311                         StrBufAppendPrintf(Cfg, "participate|%s\n", sc->participates->name);
312                         nptr = sc->participates->next;
313                         free(sc->participates);
314                         sc->participates = nptr;
315                 }
316                 while (sc->ignet_push_shares != NULL) {
317                         StrBufAppendPrintf(Cfg, "ignet_push_share|%s", sc->ignet_push_shares->remote_nodename);
318                         if (!IsEmptyStr(sc->ignet_push_shares->remote_roomname)) {
319                                 StrBufAppendPrintf(Cfg, "|%s", sc->ignet_push_shares->remote_roomname);
320                         }
321                         StrBufAppendPrintf(Cfg, "\n");
322                         mptr = sc->ignet_push_shares->next;
323                         free(sc->ignet_push_shares);
324                         sc->ignet_push_shares = mptr;
325                 }
326                 if (sc->misc != NULL) {
327                         StrBufAppendBufPlain(Cfg, sc->misc, -1, 0);
328                 }
329                 free(sc->misc);
330
331                 rc = write(TmpFD, ChrPtr(Cfg), StrLength(Cfg));
332                 if ((rc >=0 ) && (rc == StrLength(Cfg))) 
333                 {
334                         close(TmpFD);
335                         rename(tempfilename, filename);
336                 }
337                 else {
338                         syslog(LOG_EMERG, 
339                                       "unable to write %s; [%s]; not enough space on the disk?\n", 
340                                       tempfilename, 
341                                       strerror(errno));
342                         close(TmpFD);
343                         unlink(tempfilename);
344                 }
345                 FreeStrBuf(&Cfg);
346                 free(sc);
347                 *scc=NULL;
348         }
349         return 1;
350 }
351 int is_recipient(SpoolControl *sc, const char *Name)
352 {
353         namelist *nptr;
354         size_t len;
355
356         len = strlen(Name);
357         nptr = sc->listrecps;
358         while (nptr != NULL) {
359                 if (strncmp(Name, nptr->name, len)==0)
360                         return 1;
361                 nptr = nptr->next;
362         }
363         /* Do the same for digestrecps */
364         nptr = sc->digestrecps;
365         while (nptr != NULL) {
366                 if (strncmp(Name, nptr->name, len)==0)
367                         return 1;
368                 nptr = nptr->next;
369         }
370         /* Do the same for participates */
371         nptr = sc->participates;
372         while (nptr != NULL) {
373                 if (strncmp(Name, nptr->name, len)==0)
374                         return 1;
375                 nptr = nptr->next;
376         }
377         return 0;
378 }
379
380
381 /*
382  * Batch up and send all outbound traffic from the current room
383  */
384 void network_spoolout_room(RoomProcList *room_to_spool,                        
385                            char *working_ignetcfg,
386                            NetMap *the_netmap)
387 {
388         char buf[SIZ];
389         char filename[PATH_MAX];
390         SpoolControl *sc;
391         int i;
392
393         /*
394          * If the room doesn't exist, don't try to perform its networking tasks.
395          * Normally this should never happen, but once in a while maybe a room gets
396          * queued for networking and then deleted before it can happen.
397          */
398         if (CtdlGetRoom(&CC->room, room_to_spool->name) != 0) {
399                 syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool->name);
400                 return;
401         }
402
403         assoc_file_name(filename, sizeof filename, &CC->room, ctdl_netcfg_dir);
404         begin_critical_section(S_NETCONFIGS);
405
406         /* Only do net processing for rooms that have netconfigs */
407         if (!read_spoolcontrol_file(&sc, filename))
408         {
409                 end_critical_section(S_NETCONFIGS);
410                 return;
411         }
412         syslog(LOG_INFO, "Networking started for <%s>\n", CC->room.QRname);
413
414         sc->working_ignetcfg = working_ignetcfg;
415         sc->the_netmap = the_netmap;
416
417         /* If there are digest recipients, we have to build a digest */
418         if (sc->digestrecps != NULL) {
419                 sc->digestfp = tmpfile();
420                 fprintf(sc->digestfp, "Content-type: text/plain\n\n");
421         }
422
423         /* Do something useful */
424         CtdlForEachMessage(MSGS_GT, sc->lastsent, NULL, NULL, NULL,
425                 network_spool_msg, sc);
426
427         /* If we wrote a digest, deliver it and then close it */
428         snprintf(buf, sizeof buf, "room_%s@%s",
429                 CC->room.QRname, config.c_fqdn);
430         for (i=0; buf[i]; ++i) {
431                 buf[i] = tolower(buf[i]);
432                 if (isspace(buf[i])) buf[i] = '_';
433         }
434         if (sc->digestfp != NULL) {
435                 fprintf(sc->digestfp,   " -----------------------------------"
436                                         "------------------------------------"
437                                         "-------\n"
438                                         "You are subscribed to the '%s' "
439                                         "list.\n"
440                                         "To post to the list: %s\n",
441                                         CC->room.QRname, buf
442                 );
443                 network_deliver_digest(sc);     /* deliver and close */
444         }
445
446         /* Now rewrite the config file */
447         writenfree_spoolcontrol_file(&sc, filename);
448         end_critical_section(S_NETCONFIGS);
449 }
450
451 /*
452  * Process a buffer containing a single message from a single file
453  * from the inbound queue 
454  */
455 void network_process_buffer(char *buffer, long size, char *working_ignetcfg, NetMap **the_netmap, int *netmap_changed)
456 {
457         struct CtdlMessage *msg = NULL;
458         long pos;
459         int field;
460         struct recptypes *recp = NULL;
461         char target_room[ROOMNAMELEN];
462         struct ser_ret sermsg;
463         char *oldpath = NULL;
464         char filename[PATH_MAX];
465         FILE *fp;
466         char nexthop[SIZ];
467         unsigned char firstbyte;
468         unsigned char lastbyte;
469
470         syslog(LOG_DEBUG, "network_process_buffer() processing %ld bytes\n", size);
471
472         /* Validate just a little bit.  First byte should be FF and * last byte should be 00. */
473         firstbyte = buffer[0];
474         lastbyte = buffer[size-1];
475         if ( (firstbyte != 255) || (lastbyte != 0) ) {
476                 syslog(LOG_ERR, "Corrupt message ignored.  Length=%ld, firstbyte = %d, lastbyte = %d\n",
477                         size, firstbyte, lastbyte);
478                 return;
479         }
480
481         /* Set default target room to trash */
482         strcpy(target_room, TWITROOM);
483
484         /* Load the message into memory */
485         msg = (struct CtdlMessage *) malloc(sizeof(struct CtdlMessage));
486         memset(msg, 0, sizeof(struct CtdlMessage));
487         msg->cm_magic = CTDLMESSAGE_MAGIC;
488         msg->cm_anon_type = buffer[1];
489         msg->cm_format_type = buffer[2];
490
491         for (pos = 3; pos < size; ++pos) {
492                 field = buffer[pos];
493                 msg->cm_fields[field] = strdup(&buffer[pos+1]);
494                 pos = pos + strlen(&buffer[(int)pos]);
495         }
496
497         /* Check for message routing */
498         if (msg->cm_fields['D'] != NULL) {
499                 if (strcasecmp(msg->cm_fields['D'], config.c_nodename)) {
500
501                         /* route the message */
502                         strcpy(nexthop, "");
503                         if (is_valid_node(nexthop, 
504                                           NULL, 
505                                           msg->cm_fields['D'], 
506                                           working_ignetcfg, 
507                                           *the_netmap) == 0) 
508                         {
509                                 /* prepend our node to the path */
510                                 if (msg->cm_fields['P'] != NULL) {
511                                         oldpath = msg->cm_fields['P'];
512                                         msg->cm_fields['P'] = NULL;
513                                 }
514                                 else {
515                                         oldpath = strdup("unknown_user");
516                                 }
517                                 size = strlen(oldpath) + SIZ;
518                                 msg->cm_fields['P'] = malloc(size);
519                                 snprintf(msg->cm_fields['P'], size, "%s!%s",
520                                         config.c_nodename, oldpath);
521                                 free(oldpath);
522
523                                 /* serialize the message */
524                                 serialize_message(&sermsg, msg);
525
526                                 /* now send it */
527                                 if (IsEmptyStr(nexthop)) {
528                                         strcpy(nexthop, msg->cm_fields['D']);
529                                 }
530                                 snprintf(filename, 
531                                         sizeof filename,
532                                         "%s/%s@%lx%x",
533                                         ctdl_netout_dir,
534                                         nexthop,
535                                         time(NULL),
536                                         rand()
537                                 );
538                                 syslog(LOG_DEBUG, "Appending to %s\n", filename);
539                                 fp = fopen(filename, "ab");
540                                 if (fp != NULL) {
541                                         fwrite(sermsg.ser, sermsg.len, 1, fp);
542                                         fclose(fp);
543                                 }
544                                 else {
545                                         syslog(LOG_ERR, "%s: %s\n", filename, strerror(errno));
546                                 }
547                                 free(sermsg.ser);
548                                 CtdlFreeMessage(msg);
549                                 return;
550                         }
551                         
552                         else {  /* invalid destination node name */
553
554                                 network_bounce(msg,
555 "A message you sent could not be delivered due to an invalid destination node"
556 " name.  Please check the address and try sending the message again.\n");
557                                 msg = NULL;
558                                 return;
559
560                         }
561                 }
562         }
563
564         /*
565          * Check to see if we already have a copy of this message, and
566          * abort its processing if so.  (We used to post a warning to Aide>
567          * every time this happened, but the network is now so densely
568          * connected that it's inevitable.)
569          */
570         if (network_usetable(msg) != 0) {
571                 CtdlFreeMessage(msg);
572                 return;
573         }
574
575         /* Learn network topology from the path */
576         if ((msg->cm_fields['N'] != NULL) && (msg->cm_fields['P'] != NULL)) {
577                 network_learn_topology(msg->cm_fields['N'], 
578                                        msg->cm_fields['P'], 
579                                        the_netmap, 
580                                        netmap_changed);
581         }
582
583         /* Is the sending node giving us a very persuasive suggestion about
584          * which room this message should be saved in?  If so, go with that.
585          */
586         if (msg->cm_fields['C'] != NULL) {
587                 safestrncpy(target_room, msg->cm_fields['C'], sizeof target_room);
588         }
589
590         /* Otherwise, does it have a recipient?  If so, validate it... */
591         else if (msg->cm_fields['R'] != NULL) {
592                 recp = validate_recipients(msg->cm_fields['R'], NULL, 0);
593                 if (recp != NULL) if (recp->num_error != 0) {
594                         network_bounce(msg,
595                                 "A message you sent could not be delivered due to an invalid address.\n"
596                                 "Please check the address and try sending the message again.\n");
597                         msg = NULL;
598                         free_recipients(recp);
599                         syslog(LOG_DEBUG, "Bouncing message due to invalid recipient address.\n");
600                         return;
601                 }
602                 strcpy(target_room, "");        /* no target room if mail */
603         }
604
605         /* Our last shot at finding a home for this message is to see if
606          * it has the O field (Originating room) set.
607          */
608         else if (msg->cm_fields['O'] != NULL) {
609                 safestrncpy(target_room, msg->cm_fields['O'], sizeof target_room);
610         }
611
612         /* Strip out fields that are only relevant during transit */
613         if (msg->cm_fields['D'] != NULL) {
614                 free(msg->cm_fields['D']);
615                 msg->cm_fields['D'] = NULL;
616         }
617         if (msg->cm_fields['C'] != NULL) {
618                 free(msg->cm_fields['C']);
619                 msg->cm_fields['C'] = NULL;
620         }
621
622         /* save the message into a room */
623         if (PerformNetprocHooks(msg, target_room) == 0) {
624                 msg->cm_flags = CM_SKIP_HOOKS;
625                 CtdlSubmitMsg(msg, recp, target_room, 0);
626         }
627         CtdlFreeMessage(msg);
628         free_recipients(recp);
629 }
630
631
632 /*
633  * Process a single message from a single file from the inbound queue 
634  */
635 void network_process_message(FILE *fp, 
636                              long msgstart, 
637                              long msgend,
638                              char *working_ignetcfg,
639                              NetMap **the_netmap, 
640                              int *netmap_changed)
641 {
642         long hold_pos;
643         long size;
644         char *buffer;
645
646         hold_pos = ftell(fp);
647         size = msgend - msgstart + 1;
648         buffer = malloc(size);
649         if (buffer != NULL) {
650                 fseek(fp, msgstart, SEEK_SET);
651                 if (fread(buffer, size, 1, fp) > 0) {
652                         network_process_buffer(buffer, 
653                                                size, 
654                                                working_ignetcfg, 
655                                                the_netmap, 
656                                                netmap_changed);
657                 }
658                 free(buffer);
659         }
660
661         fseek(fp, hold_pos, SEEK_SET);
662 }
663
664
665 /*
666  * Process a single file from the inbound queue 
667  */
668 void network_process_file(char *filename,
669                           char *working_ignetcfg,
670                           NetMap **the_netmap, 
671                           int *netmap_changed)
672 {
673         FILE *fp;
674         long msgstart = (-1L);
675         long msgend = (-1L);
676         long msgcur = 0L;
677         int ch;
678
679
680         fp = fopen(filename, "rb");
681         if (fp == NULL) {
682                 syslog(LOG_CRIT, "Error opening %s: %s\n", filename, strerror(errno));
683                 return;
684         }
685
686         fseek(fp, 0L, SEEK_END);
687         syslog(LOG_INFO, "network: processing %ld bytes from %s\n", ftell(fp), filename);
688         rewind(fp);
689
690         /* Look for messages in the data stream and break them out */
691         while (ch = getc(fp), ch >= 0) {
692         
693                 if (ch == 255) {
694                         if (msgstart >= 0L) {
695                                 msgend = msgcur - 1;
696                                 network_process_message(fp,
697                                                         msgstart,
698                                                         msgend,
699                                                         working_ignetcfg,
700                                                         the_netmap,
701                                                         netmap_changed);
702                         }
703                         msgstart = msgcur;
704                 }
705
706                 ++msgcur;
707         }
708
709         msgend = msgcur - 1;
710         if (msgstart >= 0L) {
711                 network_process_message(fp,
712                                         msgstart,
713                                         msgend,
714                                         working_ignetcfg,
715                                         the_netmap,
716                                         netmap_changed);
717         }
718
719         fclose(fp);
720         unlink(filename);
721 }
722
723
724 /*
725  * Process anything in the inbound queue
726  */
727 void network_do_spoolin(char *working_ignetcfg, NetMap **the_netmap, int *netmap_changed)
728 {
729         DIR *dp;
730         struct dirent *d;
731         struct stat statbuf;
732         char filename[PATH_MAX];
733         static time_t last_spoolin_mtime = 0L;
734
735         /*
736          * Check the spoolin directory's modification time.  If it hasn't
737          * been touched, we don't need to scan it.
738          */
739         if (stat(ctdl_netin_dir, &statbuf)) return;
740         if (statbuf.st_mtime == last_spoolin_mtime) {
741                 syslog(LOG_DEBUG, "network: nothing in inbound queue\n");
742                 return;
743         }
744         last_spoolin_mtime = statbuf.st_mtime;
745         syslog(LOG_DEBUG, "network: processing inbound queue\n");
746
747         /*
748          * Ok, there's something interesting in there, so scan it.
749          */
750         dp = opendir(ctdl_netin_dir);
751         if (dp == NULL) return;
752
753         while (d = readdir(dp), d != NULL) {
754                 if ((strcmp(d->d_name, ".")) && (strcmp(d->d_name, ".."))) {
755                         snprintf(filename, 
756                                 sizeof filename,
757                                 "%s/%s",
758                                 ctdl_netin_dir,
759                                 d->d_name
760                         );
761                         network_process_file(filename,
762                                              working_ignetcfg,
763                                              the_netmap,
764                                              netmap_changed);
765                 }
766         }
767
768         closedir(dp);
769 }
770
771 /*
772  * Step 1: consolidate files in the outbound queue into one file per neighbor node
773  * Step 2: delete any files in the outbound queue that were for neighbors who no longer exist.
774  */
775 void network_consolidate_spoolout(char *working_ignetcfg, NetMap *the_netmap)
776 {
777         DIR *dp;
778         struct dirent *d;
779         char filename[PATH_MAX];
780         char cmd[PATH_MAX];
781         char nexthop[256];
782         long nexthoplen;
783         int i;
784         char *ptr;
785
786         /* Step 1: consolidate files in the outbound queue into one file per neighbor node */
787         dp = opendir(ctdl_netout_dir);
788         if (dp == NULL) return;
789         while (d = readdir(dp), d != NULL) {
790                 if (
791                         (strcmp(d->d_name, "."))
792                         && (strcmp(d->d_name, ".."))
793                         && (strchr(d->d_name, '@') != NULL)
794                 ) {
795                         nexthoplen = safestrncpy(nexthop, d->d_name, sizeof nexthop);
796                         ptr = strchr(nexthop, '@');
797                         if (ptr) {
798                                 *ptr = 0;
799                                 nexthoplen = ptr - nexthop;
800                         }                               
801         
802                         snprintf(filename, 
803                                 sizeof filename,
804                                 "%s/%s",
805                                 ctdl_netout_dir,
806                                 d->d_name
807                         );
808         
809                         syslog(LOG_DEBUG, "Consolidate %s to %s\n", filename, nexthop);
810                         if (network_talking_to(nexthop, nexthoplen, NTT_CHECK)) {
811                                 syslog(LOG_DEBUG,
812                                         "Currently online with %s - skipping for now\n",
813                                         nexthop
814                                 );
815                         }
816                         else {
817                                 network_talking_to(nexthop, nexthoplen, NTT_ADD);
818                                 snprintf(cmd, sizeof cmd, "/bin/cat %s >>%s/%s && /bin/rm -f %s",
819                                         filename,
820                                         ctdl_netout_dir, nexthop,
821                                         filename
822                                 );
823                                 system(cmd);
824                                 network_talking_to(nexthop, nexthoplen, NTT_REMOVE);
825                         }
826                 }
827         }
828         closedir(dp);
829
830         /* Step 2: delete any files in the outbound queue that were for neighbors who no longer exist */
831
832         dp = opendir(ctdl_netout_dir);
833         if (dp == NULL) return;
834
835         while (d = readdir(dp), d != NULL) {
836                 if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
837                         continue;
838
839                 snprintf(filename, 
840                         sizeof filename,
841                         "%s/%s",
842                         ctdl_netout_dir,
843                         d->d_name
844                 );
845
846                 strcpy(nexthop, "");
847                 i = is_valid_node(nexthop,
848                                   NULL,
849                                   d->d_name,
850                                   working_ignetcfg,
851                                   the_netmap);
852         
853                 if ( (i != 0) || !IsEmptyStr(nexthop) ) {
854                         unlink(filename);
855                 }
856         }
857
858
859         closedir(dp);
860 }
861
862
863
864
865 /*
866  * It's ok if these directories already exist.  Just fail silently.
867  */
868 void create_spool_dirs(void) {
869         if ((mkdir(ctdl_spool_dir, 0700) != 0) && (errno != EEXIST))
870                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_spool_dir, strerror(errno));
871         if (chown(ctdl_spool_dir, CTDLUID, (-1)) != 0)
872                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_spool_dir, strerror(errno));
873         if ((mkdir(ctdl_netin_dir, 0700) != 0) && (errno != EEXIST))
874                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netin_dir, strerror(errno));
875         if (chown(ctdl_netin_dir, CTDLUID, (-1)) != 0)
876                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netin_dir, strerror(errno));
877         if ((mkdir(ctdl_nettmp_dir, 0700) != 0) && (errno != EEXIST))
878                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_nettmp_dir, strerror(errno));
879         if (chown(ctdl_nettmp_dir, CTDLUID, (-1)) != 0)
880                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_nettmp_dir, strerror(errno));
881         if ((mkdir(ctdl_netout_dir, 0700) != 0) && (errno != EEXIST))
882                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netout_dir, strerror(errno));
883         if (chown(ctdl_netout_dir, CTDLUID, (-1)) != 0)
884                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netout_dir, strerror(errno));
885 }
886
887 /*
888  * Module entry point
889  */
890 CTDL_MODULE_INIT(network_spool)
891 {
892         if (!threading)
893         {
894                 create_spool_dirs();
895 //////todo              CtdlRegisterCleanupHook(destroy_network_queue_room);
896         }
897         return "network_spool";
898 }