185c739a441638b157bc62549d907dc44216ba72
[citadel.git] / citadel / modules / network / serv_netspool.c
1 /*
2  * This module handles shared rooms, inter-Citadel mail, and outbound
3  * mailing list processing.
4  *
5  * Copyright (c) 2000-2012 by the citadel.org team
6  *
7  *  This program is open source software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License, version 3.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  * ** NOTE **   A word on the S_NETCONFIGS semaphore:
16  * This is a fairly high-level type of critical section.  It ensures that no
17  * two threads work on the netconfigs files at the same time.  Since we do
18  * so many things inside these, here are the rules:
19  *  1. begin_critical_section(S_NETCONFIGS) *before* begin_ any others.
20  *  2. Do *not* perform any I/O with the client during these sections.
21  *
22  */
23
24 /*
25  * Duration of time (in seconds) after which pending list subscribe/unsubscribe
26  * requests that have not been confirmed will be deleted.
27  */
28 #define EXP     259200  /* three days */
29
30 #include "sysdep.h"
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <fcntl.h>
35 #include <ctype.h>
36 #include <signal.h>
37 #include <pwd.h>
38 #include <errno.h>
39 #include <sys/stat.h>
40 #include <sys/types.h>
41 #include <dirent.h>
42 #if TIME_WITH_SYS_TIME
43 # include <sys/time.h>
44 # include <time.h>
45 #else
46 # if HAVE_SYS_TIME_H
47 #  include <sys/time.h>
48 # else
49 #  include <time.h>
50 # endif
51 #endif
52 #ifdef HAVE_SYSCALL_H
53 # include <syscall.h>
54 #else 
55 # if HAVE_SYS_SYSCALL_H
56 #  include <sys/syscall.h>
57 # endif
58 #endif
59
60 #include <sys/wait.h>
61 #include <string.h>
62 #include <limits.h>
63 #include <libcitadel.h>
64 #include "citadel.h"
65 #include "server.h"
66 #include "citserver.h"
67 #include "support.h"
68 #include "config.h"
69 #include "user_ops.h"
70 #include "database.h"
71 #include "msgbase.h"
72 #include "internet_addressing.h"
73 #include "serv_network.h"
74 #include "clientsocket.h"
75 #include "file_ops.h"
76 #include "citadel_dirs.h"
77 #include "threads.h"
78 #include "context.h"
79 #include "netconfig.h"
80 #include "netspool.h"
81 #include "netmail.h"
82 #include "ctdl_module.h"
83
84
85 HashList *CfgTypeHash = NULL;
86 typedef struct CfgLineType CfgLineType;
87 typedef void (*CfgLineParser)(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc);
88 typedef void (*CfgLineSerializer)(const CfgLineType *ThisOne, StrBuf *OuptputBuffer, SpoolControl *sc, namelist *data);
89 typedef void (*CfgLineDeAllocator)(const CfgLineType *ThisOne, namelist **data);
90 struct CfgLineType {
91         RoomNetCfg C;
92         CfgLineParser Parser;
93         CfgLineSerializer Serializer;
94         CfgLineDeAllocator DeAllocator;
95         ConstStr Str;
96         int IsSingleLine;
97 };
98 #define REGISTERRoomCfgType(a, p, uniq, s, d) RegisterRoomCfgType(#a, sizeof(#a) - 1, a, p, uniq, s, d);
99
100 void RegisterRoomCfgType(const char* Name, long len, RoomNetCfg eCfg, CfgLineParser p, int uniq, CfgLineSerializer s, CfgLineDeAllocator d)
101 {
102         CfgLineType *pCfg;
103
104         pCfg = (CfgLineType*) malloc(sizeof(CfgLineType));
105         pCfg->Parser = p;
106         pCfg->Serializer = s;
107         pCfg->C = eCfg;
108         pCfg->Str.Key = Name;
109         pCfg->Str.len = len;
110         pCfg->IsSingleLine = uniq;
111
112         if (CfgTypeHash == NULL)
113                 CfgTypeHash = NewHash(1, NULL);
114         Put(CfgTypeHash, Name, len, pCfg, NULL);
115 }
116
117 const CfgLineType *GetCfgTypeByStr(const char *Key, long len)
118 {
119         void *pv;
120         
121         if (GetHash(CfgTypeHash, Key, len, &pv) && (pv != NULL))
122         {
123                 return (const CfgLineType *) pv;
124         }
125         else
126         {
127                 return NULL;
128         }
129 }
130
131 const CfgLineType *GetCfgTypeByEnum(RoomNetCfg eCfg, HashPos *It)
132 {
133         const char *Key;
134         long len;
135         void *pv;
136         CfgLineType *pCfg;
137
138         RewindHashPos(CfgTypeHash, It, 1);
139         while (GetNextHashPos(CfgTypeHash, It, &len, &Key, &pv) && (pv != NULL))
140         {
141                 pCfg = (CfgLineType*) pv;
142                 if (pCfg->C == eCfg)
143                         return pCfg;
144         }
145         return NULL;
146 }
147 void ParseDefault(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc)
148 {
149         namelist *nptr;
150
151         nptr = (namelist *)
152                 malloc(sizeof(namelist));
153         nptr->next = sc->NetConfigs[ThisOne->C];
154         nptr->Value = NewStrBufPlain(LinePos, StrLength(Line) - ( LinePos - ChrPtr(Line)) );
155         sc->NetConfigs[ThisOne->C] = nptr;
156 }
157
158 void ParseLastSent(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc)
159 {
160         sc->lastsent = extract_long(LinePos, 0);
161 }
162 void ParseIgnetPushShare(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc)
163 {
164 /*
165         extract_token(nodename, LinePos, 0, '|', sizeof nodename);
166         extract_token(roomname, LinePos, 1, '|', sizeof roomname);
167         mptr = (maplist *) malloc(sizeof(maplist));
168         mptr->next = sc->ignet_push_shares;
169         strcpy(mptr->remote_nodename, nodename);
170         strcpy(mptr->remote_roomname, roomname);
171         sc->ignet_push_shares = mptr;
172 */
173 }
174
175 void ParseRoomAlias(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc)
176 {
177 /*
178         if (sc->sender != NULL)
179                 continue; / * just one alowed... * /
180         extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
181         sc->sender = nptr;
182 */
183 }
184
185 void ParseSubPendingLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc)
186 {
187
188         if (time(NULL) - extract_long(LinePos, 3) > EXP) {
189                 //      skipthisline = 1;
190         }
191         else
192         {
193         }
194
195 }
196 void ParseUnSubPendingLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, SpoolControl *sc)
197 {
198         ///int skipthisline = 0;
199         if (time(NULL) - extract_long(LinePos, 2) > EXP) {
200                 //      skipthisline = 1;
201         }
202
203 }
204
205 void DeleteGenericCfgLine(const CfgLineType *ThisOne, namelist **data)
206 {
207         FreeStrBuf(&(*data)->Value);
208         free(*data);
209         *data = NULL;
210 }
211 int read_spoolcontrol_file(SpoolControl **scc, char *filename)
212 {
213         int fd;
214         const char *ErrStr = NULL;
215         const char *Pos;
216         const CfgLineType *pCfg;
217         StrBuf *Line;
218         StrBuf *InStr;
219         SpoolControl *sc;
220
221         fd = open(filename, O_NONBLOCK|O_RDONLY);
222         if (fd == -1) {
223                 *scc = NULL;
224                 return 0;
225         }
226         sc = malloc(sizeof(SpoolControl));
227         memset(sc, 0, sizeof(SpoolControl));
228         *scc = sc;
229
230         while (StrBufTCP_read_line(Line, &fd, 0, &ErrStr) >= 0) {
231                 if (StrLength(Line) == 0)
232                         continue;
233                 Pos = NULL;
234                 InStr = NewStrBufPlain(NULL, StrLength(Line));
235                 StrBufExtract_NextToken(InStr, Line, &Pos, '|');
236
237                 pCfg = GetCfgTypeByStr(SKEY(InStr));
238                 if (pCfg != NULL)
239                 {
240                         pCfg->Parser(pCfg, Line, Pos, sc);
241                 }
242                 else
243                 {
244                         if (sc->misc == NULL)
245                         {
246                                 sc->misc = NewStrBufDup(Line);
247                         }
248                         else
249                         {
250                                 if(StrLength(sc->misc) > 0)
251                                         StrBufAppendBufPlain(sc->misc, HKEY("\n"), 0);
252                                 StrBufAppendBuf(sc->misc, Line, 0);
253                         }
254                 }
255         }
256         if (fd > 0)
257                 close(fd);
258         FreeStrBuf(&InStr);
259         FreeStrBuf(&Line);
260         return 1;
261 }
262
263 void SerializeLastSent(const CfgLineType *ThisOne, StrBuf *OutputBuffer, SpoolControl *sc, namelist *data)
264 {
265         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
266         StrBufAppendPrintf(OutputBuffer, "|%ld\n", sc->lastsent);
267 }
268
269 void SerializeGeneric(const CfgLineType *ThisOne, StrBuf *OutputBuffer, SpoolControl *sc, namelist *data)
270 {
271         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
272         StrBufAppendBuf(OutputBuffer, data->Value, 0);
273         StrBufAppendBufPlain(OutputBuffer, HKEY("\n"), 0);
274 }
275 void SerializeIgnetPushShare(const CfgLineType *ThisOne, StrBuf *OutputBuffer, SpoolControl *sc, namelist *data)
276 {
277         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
278 /*
279                         StrBufAppendPrintf(Cfg, "ignet_push_share|%s", sc->ignet_push_shares->remote_nodename);
280                         if (!IsEmptyStr(sc->ignet_push_shares->remote_roomname)) {
281                                 StrBufAppendPrintf(Cfg, "|%s", sc->ignet_push_shares->remote_roomname);
282                         }
283                         StrBufAppendPrintf(Cfg, "\n");
284                         mptr = sc->ignet_push_shares->next;
285                         free(sc->ignet_push_shares);
286                         sc->ignet_push_shares = mptr;
287 */
288         StrBufAppendBuf(OutputBuffer, data->Value, 0);
289         StrBufAppendBufPlain(OutputBuffer, HKEY("\n"), 0);
290 }
291
292 int save_spoolcontrol_file(SpoolControl *sc, char *filename)
293 {
294         RoomNetCfg eCfg;
295         StrBuf *Cfg;
296         char tempfilename[PATH_MAX];
297         int TmpFD;
298         long len;
299         time_t unixtime;
300         struct timeval tv;
301         long reltid; /* if we don't have SYS_gettid, use "random" value */
302         StrBuf *OutBuffer;
303         int rc;
304         HashPos *CfgIt;
305
306         len = strlen(filename);
307         memcpy(tempfilename, filename, len + 1);
308
309
310 #if defined(HAVE_SYSCALL_H) && defined (SYS_gettid)
311         reltid = syscall(SYS_gettid);
312 #endif
313         gettimeofday(&tv, NULL);
314         /* Promote to time_t; types differ on some OSes (like darwin) */
315         unixtime = tv.tv_sec;
316
317         sprintf(tempfilename + len, ".%ld-%ld", reltid, unixtime);
318         errno = 0;
319         TmpFD = open(tempfilename, O_CREAT|O_EXCL|O_RDWR, S_IRUSR|S_IWUSR);
320         Cfg = NewStrBuf();
321         if ((TmpFD < 0) || (errno != 0)) {
322                 syslog(LOG_CRIT, "ERROR: cannot open %s: %s\n",
323                         filename, strerror(errno));
324                 unlink(tempfilename);
325                 return 0;
326         }
327         else {
328                 CfgIt = GetNewHashPos(CfgTypeHash, 1);
329                 fchown(TmpFD, config.c_ctdluid, 0);
330                 for (eCfg = subpending; eCfg < maxRoomNetCfg; eCfg ++)
331                 {
332                         const CfgLineType *pCfg;
333                         pCfg = GetCfgTypeByEnum(eCfg, CfgIt);
334                         if (pCfg->IsSingleLine)
335                         {
336                                 pCfg->Serializer(pCfg, OutBuffer, sc, NULL);
337                         }
338                         else
339                         {
340                                 namelist *pName = sc->NetConfigs[pCfg->C];
341                                 while (pName != NULL)
342                                 {
343                                         pCfg->Serializer(pCfg, OutBuffer, sc, pName);
344                                         pName = pName->next;
345                                 }
346                                 
347                                 
348                         }
349
350                 }
351                 DeleteHashPos(&CfgIt);
352
353
354                 if (sc->misc != NULL) {
355                         StrBufAppendBuf(OutBuffer, sc->misc, 0);
356                 }
357
358                 rc = write(TmpFD, ChrPtr(OutBuffer), StrLength(OutBuffer));
359                 if ((rc >=0 ) && (rc == StrLength(Cfg))) 
360                 {
361                         close(TmpFD);
362                         rename(tempfilename, filename);
363                         rc = 1;
364                 }
365                 else {
366                         syslog(LOG_EMERG, 
367                                       "unable to write %s; [%s]; not enough space on the disk?\n", 
368                                       tempfilename, 
369                                       strerror(errno));
370                         close(TmpFD);
371                         unlink(tempfilename);
372                         rc = 0;
373                 }
374                 FreeStrBuf(&OutBuffer);
375                 
376         }
377         return rc;
378 }
379
380
381
382 void free_spoolcontrol_struct(SpoolControl **scc)
383 {
384         RoomNetCfg eCfg;
385         HashPos *CfgIt;
386         SpoolControl *sc;
387
388         sc = *scc;
389         CfgIt = GetNewHashPos(CfgTypeHash, 1);
390         for (eCfg = subpending; eCfg < maxRoomNetCfg; eCfg ++)
391         {
392                 const CfgLineType *pCfg;
393                 namelist *pNext, *pName;
394                 
395                 pCfg = GetCfgTypeByEnum(eCfg, CfgIt);
396                 pName= sc->NetConfigs[pCfg->C];
397                 while (pName != NULL)
398                 {
399                         pNext = pName->next;
400                         pCfg->DeAllocator(pCfg, &pName);
401                         pName = pNext;
402                 }
403         }
404         DeleteHashPos(&CfgIt);
405
406         FreeStrBuf(&sc->Sender);
407         FreeStrBuf(&sc->RoomInfo);
408         FreeStrBuf(&sc->misc);
409         free(sc);
410         *scc=NULL;
411 }
412
413 #if 0
414 int writenfree_spoolcontrol_file(SpoolControl **scc, char *filename)
415 {
416         char tempfilename[PATH_MAX];
417         int TmpFD;
418         SpoolControl *sc;
419         namelist *nptr = NULL;
420         maplist *mptr = NULL;
421         long len;
422         time_t unixtime;
423         struct timeval tv;
424         long reltid; /* if we don't have SYS_gettid, use "random" value */
425         StrBuf *Cfg;
426         int rc;
427
428         len = strlen(filename);
429         memcpy(tempfilename, filename, len + 1);
430
431
432 #if defined(HAVE_SYSCALL_H) && defined (SYS_gettid)
433         reltid = syscall(SYS_gettid);
434 #endif
435         gettimeofday(&tv, NULL);
436         /* Promote to time_t; types differ on some OSes (like darwin) */
437         unixtime = tv.tv_sec;
438
439         sprintf(tempfilename + len, ".%ld-%ld", reltid, unixtime);
440         sc = *scc;
441         errno = 0;
442         TmpFD = open(tempfilename, O_CREAT|O_EXCL|O_RDWR, S_IRUSR|S_IWUSR);
443         Cfg = NewStrBuf();
444         if ((TmpFD < 0) || (errno != 0)) {
445                 syslog(LOG_CRIT, "ERROR: cannot open %s: %s\n",
446                         filename, strerror(errno));
447                 free_spoolcontrol_struct(scc);
448                 unlink(tempfilename);
449         }
450         else {
451                 fchown(TmpFD, config.c_ctdluid, 0);
452                 StrBufAppendPrintf(Cfg, "lastsent|%ld\n", sc->lastsent);
453                 
454                 /* Write out the listrecps while freeing from memory at the
455                  * same time.  Am I clever or what?  :)
456                  */
457                 while (sc->listrecps != NULL) {
458                     StrBufAppendPrintf(Cfg, "listrecp|%s\n", sc->listrecps->name);
459                         nptr = sc->listrecps->next;
460                         free(sc->listrecps);
461                         sc->listrecps = nptr;
462                 }
463                 /* Do the same for digestrecps */
464                 while (sc->digestrecps != NULL) {
465                         StrBufAppendPrintf(Cfg, "digestrecp|%s\n", sc->digestrecps->name);
466                         nptr = sc->digestrecps->next;
467                         free(sc->digestrecps);
468                         sc->digestrecps = nptr;
469                 }
470                 /* Do the same for participates */
471                 while (sc->participates != NULL) {
472                         StrBufAppendPrintf(Cfg, "participate|%s\n", sc->participates->name);
473                         nptr = sc->participates->next;
474                         free(sc->participates);
475                         sc->participates = nptr;
476                 }
477                 while (sc->ignet_push_shares != NULL) {
478                         StrBufAppendPrintf(Cfg, "ignet_push_share|%s", sc->ignet_push_shares->remote_nodename);
479                         if (!IsEmptyStr(sc->ignet_push_shares->remote_roomname)) {
480                                 StrBufAppendPrintf(Cfg, "|%s", sc->ignet_push_shares->remote_roomname);
481                         }
482                         StrBufAppendPrintf(Cfg, "\n");
483                         mptr = sc->ignet_push_shares->next;
484                         free(sc->ignet_push_shares);
485                         sc->ignet_push_shares = mptr;
486                 }
487                 if (sc->misc != NULL) {
488                         StrBufAppendBufPlain(Cfg, sc->misc, -1, 0);
489                 }
490                 free(sc->misc);
491
492                 rc = write(TmpFD, ChrPtr(Cfg), StrLength(Cfg));
493                 if ((rc >=0 ) && (rc == StrLength(Cfg))) 
494                 {
495                         close(TmpFD);
496                         rename(tempfilename, filename);
497                 }
498                 else {
499                         syslog(LOG_EMERG, 
500                                       "unable to write %s; [%s]; not enough space on the disk?\n", 
501                                       tempfilename, 
502                                       strerror(errno));
503                         close(TmpFD);
504                         unlink(tempfilename);
505                 }
506                 FreeStrBuf(&Cfg);
507                 free(sc);
508                 *scc=NULL;
509         }
510         return 1;
511 }
512 #endif
513 int is_recipient(SpoolControl *sc, const char *Name)
514 {
515         const RoomNetCfg RecipientCfgs[] = {
516                 listrecp,
517                 digestrecp,
518                 participate,
519                 maxRoomNetCfg
520         };
521         int i;
522         namelist *nptr;
523         size_t len;
524         
525         len = strlen(Name);
526         i = 0;
527         while (RecipientCfgs[i] != maxRoomNetCfg)
528         {
529                 nptr = sc->NetConfigs[RecipientCfgs[i]];
530                 
531                 while (nptr != NULL)
532                 {
533                         if ((StrLength(nptr->Value) == len) && 
534                             (!strcmp(Name, ChrPtr(nptr->Value))))
535                         {
536                                 return 1;
537                         }
538                         nptr = nptr->next;
539                 }
540         }
541         return 0;
542 }
543
544
545 /*
546  * Batch up and send all outbound traffic from the current room
547  */
548 void network_spoolout_room(RoomProcList *room_to_spool,                        
549                            HashList *working_ignetcfg,
550                            HashList *the_netmap)
551 {
552         char buf[SIZ];
553         char filename[PATH_MAX];
554         SpoolControl *sc;
555         int i;
556
557         /*
558          * If the room doesn't exist, don't try to perform its networking tasks.
559          * Normally this should never happen, but once in a while maybe a room gets
560          * queued for networking and then deleted before it can happen.
561          */
562         if (CtdlGetRoom(&CC->room, room_to_spool->name) != 0) {
563                 syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool->name);
564                 return;
565         }
566
567         assoc_file_name(filename, sizeof filename, &CC->room, ctdl_netcfg_dir);
568         begin_critical_section(S_NETCONFIGS);
569
570         /* Only do net processing for rooms that have netconfigs */
571         if (!read_spoolcontrol_file(&sc, filename))
572         {
573                 end_critical_section(S_NETCONFIGS);
574                 return;
575         }
576         syslog(LOG_INFO, "Networking started for <%s>\n", CC->room.QRname);
577
578         sc->working_ignetcfg = working_ignetcfg;
579         sc->the_netmap = the_netmap;
580
581         /* If there are digest recipients, we have to build a digest */
582         if (sc->NetConfigs[digestrecp] != NULL) {
583                 sc->digestfp = tmpfile();
584                 fprintf(sc->digestfp, "Content-type: text/plain\n\n");
585         }
586
587         /* Do something useful */
588         CtdlForEachMessage(MSGS_GT, sc->lastsent, NULL, NULL, NULL,
589                 network_spool_msg, sc);
590
591         /* If we wrote a digest, deliver it and then close it */
592         snprintf(buf, sizeof buf, "room_%s@%s",
593                 CC->room.QRname, config.c_fqdn);
594         for (i=0; buf[i]; ++i) {
595                 buf[i] = tolower(buf[i]);
596                 if (isspace(buf[i])) buf[i] = '_';
597         }
598         if (sc->digestfp != NULL) {
599                 fprintf(sc->digestfp,   " -----------------------------------"
600                                         "------------------------------------"
601                                         "-------\n"
602                                         "You are subscribed to the '%s' "
603                                         "list.\n"
604                                         "To post to the list: %s\n",
605                                         CC->room.QRname, buf
606                 );
607                 network_deliver_digest(sc);     /* deliver and close */
608         }
609
610         /* Now rewrite the config file */
611         //// todo writenfree_spoolcontrol_file(&sc, filename);
612         end_critical_section(S_NETCONFIGS);
613 }
614
615 /*
616  * Process a buffer containing a single message from a single file
617  * from the inbound queue 
618  */
619 void network_process_buffer(char *buffer, long size, HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed)
620 {
621         struct CitContext *CCC = CC;
622         StrBuf *Buf = NULL;
623         struct CtdlMessage *msg = NULL;
624         long pos;
625         int field;
626         struct recptypes *recp = NULL;
627         char target_room[ROOMNAMELEN];
628         struct ser_ret sermsg;
629         char *oldpath = NULL;
630         char filename[PATH_MAX];
631         FILE *fp;
632         const StrBuf *nexthop = NULL;
633         unsigned char firstbyte;
634         unsigned char lastbyte;
635
636         QN_syslog(LOG_DEBUG, "network_process_buffer() processing %ld bytes\n", size);
637
638         /* Validate just a little bit.  First byte should be FF and * last byte should be 00. */
639         firstbyte = buffer[0];
640         lastbyte = buffer[size-1];
641         if ( (firstbyte != 255) || (lastbyte != 0) ) {
642                 QN_syslog(LOG_ERR, "Corrupt message ignored.  Length=%ld, firstbyte = %d, lastbyte = %d\n",
643                           size, firstbyte, lastbyte);
644                 return;
645         }
646
647         /* Set default target room to trash */
648         strcpy(target_room, TWITROOM);
649
650         /* Load the message into memory */
651         msg = (struct CtdlMessage *) malloc(sizeof(struct CtdlMessage));
652         memset(msg, 0, sizeof(struct CtdlMessage));
653         msg->cm_magic = CTDLMESSAGE_MAGIC;
654         msg->cm_anon_type = buffer[1];
655         msg->cm_format_type = buffer[2];
656
657         for (pos = 3; pos < size; ++pos) {
658                 field = buffer[pos];
659                 msg->cm_fields[field] = strdup(&buffer[pos+1]);
660                 pos = pos + strlen(&buffer[(int)pos]);
661         }
662
663         /* Check for message routing */
664         if (msg->cm_fields['D'] != NULL) {
665                 if (strcasecmp(msg->cm_fields['D'], config.c_nodename)) {
666
667                         /* route the message */
668                         Buf = NewStrBufPlain(msg->cm_fields['D'], -1);
669                         if (is_valid_node(&nexthop, 
670                                           NULL, 
671                                           Buf, 
672                                           working_ignetcfg, 
673                                           the_netmap) == 0) 
674                         {
675                                 /* prepend our node to the path */
676                                 if (msg->cm_fields['P'] != NULL) {
677                                         oldpath = msg->cm_fields['P'];
678                                         msg->cm_fields['P'] = NULL;
679                                 }
680                                 else {
681                                         oldpath = strdup("unknown_user");
682                                 }
683                                 size = strlen(oldpath) + SIZ;
684                                 msg->cm_fields['P'] = malloc(size);
685                                 snprintf(msg->cm_fields['P'], size, "%s!%s",
686                                         config.c_nodename, oldpath);
687                                 free(oldpath);
688
689                                 /* serialize the message */
690                                 serialize_message(&sermsg, msg);
691
692                                 /* now send it */
693                                 if (StrLength(nexthop) == 0) {
694                                         nexthop = Buf;
695                                 }
696                                 snprintf(filename,
697                                          sizeof filename,
698                                          "%s/%s@%lx%x",
699                                          ctdl_netout_dir,
700                                          ChrPtr(nexthop),
701                                          time(NULL),
702                                          rand()
703                                 );
704                                 QN_syslog(LOG_DEBUG, "Appending to %s\n", filename);
705                                 fp = fopen(filename, "ab");
706                                 if (fp != NULL) {
707                                         fwrite(sermsg.ser, sermsg.len, 1, fp);
708                                         fclose(fp);
709                                 }
710                                 else {
711                                         QN_syslog(LOG_ERR, "%s: %s\n", filename, strerror(errno));
712                                 }
713                                 free(sermsg.ser);
714                                 CtdlFreeMessage(msg);
715                                 FreeStrBuf(&Buf);
716                                 return;
717                         }
718                         
719                         else {  /* invalid destination node name */
720                                 FreeStrBuf(&Buf);
721
722                                 network_bounce(msg,
723 "A message you sent could not be delivered due to an invalid destination node"
724 " name.  Please check the address and try sending the message again.\n");
725                                 msg = NULL;
726                                 return;
727
728                         }
729                 }
730         }
731
732         /*
733          * Check to see if we already have a copy of this message, and
734          * abort its processing if so.  (We used to post a warning to Aide>
735          * every time this happened, but the network is now so densely
736          * connected that it's inevitable.)
737          */
738         if (network_usetable(msg) != 0) {
739                 CtdlFreeMessage(msg);
740                 return;
741         }
742
743         /* Learn network topology from the path */
744         if ((msg->cm_fields['N'] != NULL) && (msg->cm_fields['P'] != NULL)) {
745                 network_learn_topology(msg->cm_fields['N'], 
746                                        msg->cm_fields['P'], 
747                                        the_netmap, 
748                                        netmap_changed);
749         }
750
751         /* Is the sending node giving us a very persuasive suggestion about
752          * which room this message should be saved in?  If so, go with that.
753          */
754         if (msg->cm_fields['C'] != NULL) {
755                 safestrncpy(target_room, msg->cm_fields['C'], sizeof target_room);
756         }
757
758         /* Otherwise, does it have a recipient?  If so, validate it... */
759         else if (msg->cm_fields['R'] != NULL) {
760                 recp = validate_recipients(msg->cm_fields['R'], NULL, 0);
761                 if (recp != NULL) if (recp->num_error != 0) {
762                         network_bounce(msg,
763                                 "A message you sent could not be delivered due to an invalid address.\n"
764                                 "Please check the address and try sending the message again.\n");
765                         msg = NULL;
766                         free_recipients(recp);
767                         QNM_syslog(LOG_DEBUG, "Bouncing message due to invalid recipient address.\n");
768                         return;
769                 }
770                 strcpy(target_room, "");        /* no target room if mail */
771         }
772
773         /* Our last shot at finding a home for this message is to see if
774          * it has the O field (Originating room) set.
775          */
776         else if (msg->cm_fields['O'] != NULL) {
777                 safestrncpy(target_room, msg->cm_fields['O'], sizeof target_room);
778         }
779
780         /* Strip out fields that are only relevant during transit */
781         if (msg->cm_fields['D'] != NULL) {
782                 free(msg->cm_fields['D']);
783                 msg->cm_fields['D'] = NULL;
784         }
785         if (msg->cm_fields['C'] != NULL) {
786                 free(msg->cm_fields['C']);
787                 msg->cm_fields['C'] = NULL;
788         }
789
790         /* save the message into a room */
791         if (PerformNetprocHooks(msg, target_room) == 0) {
792                 msg->cm_flags = CM_SKIP_HOOKS;
793                 CtdlSubmitMsg(msg, recp, target_room, 0);
794         }
795         CtdlFreeMessage(msg);
796         free_recipients(recp);
797 }
798
799
800 /*
801  * Process a single message from a single file from the inbound queue 
802  */
803 void network_process_message(FILE *fp, 
804                              long msgstart, 
805                              long msgend,
806                              HashList *working_ignetcfg,
807                              HashList *the_netmap, 
808                              int *netmap_changed)
809 {
810         long hold_pos;
811         long size;
812         char *buffer;
813
814         hold_pos = ftell(fp);
815         size = msgend - msgstart + 1;
816         buffer = malloc(size);
817         if (buffer != NULL) {
818                 fseek(fp, msgstart, SEEK_SET);
819                 if (fread(buffer, size, 1, fp) > 0) {
820                         network_process_buffer(buffer, 
821                                                size, 
822                                                working_ignetcfg, 
823                                                the_netmap, 
824                                                netmap_changed);
825                 }
826                 free(buffer);
827         }
828
829         fseek(fp, hold_pos, SEEK_SET);
830 }
831
832
833 /*
834  * Process a single file from the inbound queue 
835  */
836 void network_process_file(char *filename,
837                           HashList *working_ignetcfg,
838                           HashList *the_netmap, 
839                           int *netmap_changed)
840 {
841         struct CitContext *CCC = CC;
842         FILE *fp;
843         long msgstart = (-1L);
844         long msgend = (-1L);
845         long msgcur = 0L;
846         int ch;
847
848
849         fp = fopen(filename, "rb");
850         if (fp == NULL) {
851                 QN_syslog(LOG_CRIT, "Error opening %s: %s\n", filename, strerror(errno));
852                 return;
853         }
854
855         fseek(fp, 0L, SEEK_END);
856         QN_syslog(LOG_INFO, "network: processing %ld bytes from %s\n", ftell(fp), filename);
857         rewind(fp);
858
859         /* Look for messages in the data stream and break them out */
860         while (ch = getc(fp), ch >= 0) {
861         
862                 if (ch == 255) {
863                         if (msgstart >= 0L) {
864                                 msgend = msgcur - 1;
865                                 network_process_message(fp,
866                                                         msgstart,
867                                                         msgend,
868                                                         working_ignetcfg,
869                                                         the_netmap,
870                                                         netmap_changed);
871                         }
872                         msgstart = msgcur;
873                 }
874
875                 ++msgcur;
876         }
877
878         msgend = msgcur - 1;
879         if (msgstart >= 0L) {
880                 network_process_message(fp,
881                                         msgstart,
882                                         msgend,
883                                         working_ignetcfg,
884                                         the_netmap,
885                                         netmap_changed);
886         }
887
888         fclose(fp);
889         unlink(filename);
890 }
891
892
893 /*
894  * Process anything in the inbound queue
895  */
896 void network_do_spoolin(HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed)
897 {
898         struct CitContext *CCC = CC;
899         DIR *dp;
900         struct dirent *d;
901         struct stat statbuf;
902         char filename[PATH_MAX];
903         static time_t last_spoolin_mtime = 0L;
904
905         /*
906          * Check the spoolin directory's modification time.  If it hasn't
907          * been touched, we don't need to scan it.
908          */
909         if (stat(ctdl_netin_dir, &statbuf)) return;
910         if (statbuf.st_mtime == last_spoolin_mtime) {
911                 QNM_syslog(LOG_DEBUG, "network: nothing in inbound queue\n");
912                 return;
913         }
914         last_spoolin_mtime = statbuf.st_mtime;
915         QNM_syslog(LOG_DEBUG, "network: processing inbound queue\n");
916
917         /*
918          * Ok, there's something interesting in there, so scan it.
919          */
920         dp = opendir(ctdl_netin_dir);
921         if (dp == NULL) return;
922
923         while (d = readdir(dp), d != NULL) {
924                 if ((strcmp(d->d_name, ".")) && (strcmp(d->d_name, ".."))) {
925                         snprintf(filename, 
926                                 sizeof filename,
927                                 "%s/%s",
928                                 ctdl_netin_dir,
929                                 d->d_name
930                         );
931                         network_process_file(filename,
932                                              working_ignetcfg,
933                                              the_netmap,
934                                              netmap_changed);
935                 }
936         }
937
938         closedir(dp);
939 }
940
941 /*
942  * Step 1: consolidate files in the outbound queue into one file per neighbor node
943  * Step 2: delete any files in the outbound queue that were for neighbors who no longer exist.
944  */
945 void network_consolidate_spoolout(HashList *working_ignetcfg, HashList *the_netmap)
946 {
947         struct CitContext *CCC = CC;
948         IOBuffer IOB;
949         FDIOBuffer FDIO;
950         int d_namelen;
951         DIR *dp;
952         struct dirent *d;
953         struct dirent *filedir_entry;
954         const char *pch;
955         char spooloutfilename[PATH_MAX];
956         char filename[PATH_MAX];
957         const StrBuf *nexthop;
958         StrBuf *NextHop;
959         int i;
960         struct stat statbuf;
961         int nFailed = 0;
962
963         /* Step 1: consolidate files in the outbound queue into one file per neighbor node */
964         d = (struct dirent *)malloc(offsetof(struct dirent, d_name) + PATH_MAX + 1);
965         if (d == NULL)  return;
966
967         dp = opendir(ctdl_netout_dir);
968         if (dp == NULL) {
969                 free(d);
970                 return;
971         }
972
973         NextHop = NewStrBuf();
974         memset(&IOB, 0, sizeof(IOBuffer));
975         memset(&FDIO, 0, sizeof(FDIOBuffer));
976         FDIO.IOB = &IOB;
977
978         while ((readdir_r(dp, d, &filedir_entry) == 0) &&
979                (filedir_entry != NULL))
980         {
981 #ifdef _DIRENT_HAVE_D_NAMELEN
982                 d_namelen = filedir_entry->d_namelen;
983 #else
984
985 #ifndef DT_UNKNOWN
986 #define DT_UNKNOWN     0
987 #define DT_DIR         4
988 #define DT_REG         8
989 #define DT_LNK         10
990
991 #define IFTODT(mode)   (((mode) & 0170000) >> 12)
992 #define DTTOIF(dirtype)        ((dirtype) << 12)
993 #endif
994                 d_namelen = strlen(filedir_entry->d_name);
995 #endif
996                 if ((d_namelen > 1) && filedir_entry->d_name[d_namelen - 1] == '~')
997                         continue; /* Ignore backup files... */
998
999                 if ((d_namelen == 1) && 
1000                     (filedir_entry->d_name[0] == '.'))
1001                         continue;
1002
1003                 if ((d_namelen == 2) && 
1004                     (filedir_entry->d_name[0] == '.') &&
1005                     (filedir_entry->d_name[1] == '.'))
1006                         continue;
1007
1008                 pch = strchr(filedir_entry->d_name, '@');
1009                 if (pch == NULL)
1010                         continue;
1011
1012                 snprintf(filename, 
1013                          sizeof filename,
1014                          "%s/%s",
1015                          ctdl_netout_dir,
1016                          filedir_entry->d_name);
1017
1018                 StrBufPlain(NextHop,
1019                             filedir_entry->d_name,
1020                             pch - filedir_entry->d_name);
1021
1022                 snprintf(spooloutfilename,
1023                          sizeof spooloutfilename,
1024                          "%s/%s",
1025                          ctdl_netout_dir,
1026                          ChrPtr(NextHop));
1027
1028                 QN_syslog(LOG_DEBUG, "Consolidate %s to %s\n", filename, ChrPtr(NextHop));
1029                 if (network_talking_to(SKEY(NextHop), NTT_CHECK)) {
1030                         nFailed++;
1031                         QN_syslog(LOG_DEBUG,
1032                                   "Currently online with %s - skipping for now\n",
1033                                   ChrPtr(NextHop)
1034                                 );
1035                 }
1036                 else {
1037                         size_t dsize;
1038                         size_t fsize;
1039                         int infd, outfd;
1040                         const char *err = NULL;
1041                         network_talking_to(SKEY(NextHop), NTT_ADD);
1042
1043                         infd = open(filename, O_RDONLY);
1044                         if (infd == -1) {
1045                                 nFailed++;
1046                                 QN_syslog(LOG_ERR,
1047                                           "failed to open %s for reading due to %s; skipping.\n",
1048                                           filename, strerror(errno)
1049                                         );
1050                                 network_talking_to(SKEY(NextHop), NTT_REMOVE);
1051                                 continue;                               
1052                         }
1053                         
1054                         outfd = open(spooloutfilename,
1055                                   O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY, 
1056                                   S_IRUSR|S_IWUSR);
1057                         if (outfd == -1)
1058                         {
1059                                 outfd = open(spooloutfilename,
1060                                              O_EXCL|O_NONBLOCK|O_WRONLY, 
1061                                              S_IRUSR | S_IWUSR);
1062                         }
1063                         if (outfd == -1) {
1064                                 nFailed++;
1065                                 QN_syslog(LOG_ERR,
1066                                           "failed to open %s for reading due to %s; skipping.\n",
1067                                           spooloutfilename, strerror(errno)
1068                                         );
1069                                 close(infd);
1070                                 network_talking_to(SKEY(NextHop), NTT_REMOVE);
1071                                 continue;
1072                         }
1073
1074                         dsize = lseek(outfd, 0, SEEK_END);
1075                         lseek(outfd, -dsize, SEEK_SET);
1076
1077                         fstat(infd, &statbuf);
1078                         fsize = statbuf.st_size;
1079 /*
1080                         fsize = lseek(infd, 0, SEEK_END);
1081 */                      
1082                         IOB.fd = infd;
1083                         FDIOBufferInit(&FDIO, &IOB, outfd, fsize + dsize);
1084                         FDIO.ChunkSendRemain = fsize;
1085                         FDIO.TotalSentAlready = dsize;
1086                         err = NULL;
1087                         errno = 0;
1088                         do {} while ((FileMoveChunked(&FDIO, &err) > 0) && (err == NULL));
1089                         if (err == NULL) {
1090                                 unlink(filename);
1091                         }
1092                         else {
1093                                 nFailed++;
1094                                 QN_syslog(LOG_ERR,
1095                                           "failed to append to %s [%s]; rolling back..\n",
1096                                           spooloutfilename, strerror(errno)
1097                                         );
1098                                 /* whoops partial append?? truncate spooloutfilename again! */
1099                                 ftruncate(outfd, dsize);
1100                         }
1101                         FDIOBufferDelete(&FDIO);
1102                         close(infd);
1103                         close(outfd);
1104                         network_talking_to(SKEY(NextHop), NTT_REMOVE);
1105                 }
1106         }
1107         closedir(dp);
1108
1109         if (nFailed > 0) {
1110                 FreeStrBuf(&NextHop);
1111                 QN_syslog(LOG_INFO,
1112                           "skipping Spoolcleanup because of %d files unprocessed.\n",
1113                           nFailed
1114                         );
1115
1116                 return;
1117         }
1118
1119         /* Step 2: delete any files in the outbound queue that were for neighbors who no longer exist */
1120         dp = opendir(ctdl_netout_dir);
1121         if (dp == NULL) {
1122                 FreeStrBuf(&NextHop);
1123                 free(d);
1124                 return;
1125         }
1126
1127         while ((readdir_r(dp, d, &filedir_entry) == 0) &&
1128                (filedir_entry != NULL))
1129         {
1130 #ifdef _DIRENT_HAVE_D_NAMELEN
1131                 d_namelen = filedir_entry->d_namelen;
1132                 d_type = filedir_entry->d_type;
1133 #else
1134
1135 #ifndef DT_UNKNOWN
1136 #define DT_UNKNOWN     0
1137 #define DT_DIR         4
1138 #define DT_REG         8
1139 #define DT_LNK         10
1140
1141 #define IFTODT(mode)   (((mode) & 0170000) >> 12)
1142 #define DTTOIF(dirtype)        ((dirtype) << 12)
1143 #endif
1144                 d_namelen = strlen(filedir_entry->d_name);
1145 #endif
1146                 if ((d_namelen == 1) && 
1147                     (filedir_entry->d_name[0] == '.'))
1148                         continue;
1149
1150                 if ((d_namelen == 2) && 
1151                     (filedir_entry->d_name[0] == '.') &&
1152                     (filedir_entry->d_name[1] == '.'))
1153                         continue;
1154
1155                 pch = strchr(filedir_entry->d_name, '@');
1156                 if (pch == NULL) /* no @ in name? consolidated file. */
1157                         continue;
1158
1159                 StrBufPlain(NextHop,
1160                             filedir_entry->d_name,
1161                             pch - filedir_entry->d_name);
1162
1163                 snprintf(filename, 
1164                         sizeof filename,
1165                         "%s/%s",
1166                         ctdl_netout_dir,
1167                         filedir_entry->d_name
1168                 );
1169
1170                 i = is_valid_node(&nexthop,
1171                                   NULL,
1172                                   NextHop,
1173                                   working_ignetcfg,
1174                                   the_netmap);
1175         
1176                 if ( (i != 0) || (StrLength(nexthop) > 0) ) {
1177                         unlink(filename);
1178                 }
1179         }
1180         FreeStrBuf(&NextHop);
1181         free(d);
1182         closedir(dp);
1183 }
1184
1185
1186
1187
1188 /*
1189  * It's ok if these directories already exist.  Just fail silently.
1190  */
1191 void create_spool_dirs(void) {
1192         if ((mkdir(ctdl_spool_dir, 0700) != 0) && (errno != EEXIST))
1193                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_spool_dir, strerror(errno));
1194         if (chown(ctdl_spool_dir, CTDLUID, (-1)) != 0)
1195                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_spool_dir, strerror(errno));
1196         if ((mkdir(ctdl_netin_dir, 0700) != 0) && (errno != EEXIST))
1197                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netin_dir, strerror(errno));
1198         if (chown(ctdl_netin_dir, CTDLUID, (-1)) != 0)
1199                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netin_dir, strerror(errno));
1200         if ((mkdir(ctdl_nettmp_dir, 0700) != 0) && (errno != EEXIST))
1201                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_nettmp_dir, strerror(errno));
1202         if (chown(ctdl_nettmp_dir, CTDLUID, (-1)) != 0)
1203                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_nettmp_dir, strerror(errno));
1204         if ((mkdir(ctdl_netout_dir, 0700) != 0) && (errno != EEXIST))
1205                 syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netout_dir, strerror(errno));
1206         if (chown(ctdl_netout_dir, CTDLUID, (-1)) != 0)
1207                 syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netout_dir, strerror(errno));
1208 }
1209
1210 /*
1211  * Module entry point
1212  */
1213 CTDL_MODULE_INIT(network_spool)
1214 {
1215         if (!threading)
1216         {
1217 //              REGISTERRoomCfgType(subpending, ParseSubPendingLine, 0, SerializeSubPendingLine, DeleteSubPendingLine); /// todo: move this to mailinglist manager
1218 //              REGISTERRoomCfgType(unsubpending, ParseUnSubPendingLine0, SerializeUnSubPendingLine, DeleteUnSubPendingLine); /// todo: move this to mailinglist manager
1219 //              REGISTERRoomCfgType(lastsent, ParseLastSent, 1, SerializeLastSent, DeleteLastSent);
1220 ///             REGISTERRoomCfgType(ignet_push_share, ParseIgnetPushShare, 0, SerializeIgnetPushShare, DeleteIgnetPushShare); // todo: move this to the ignet client
1221                 REGISTERRoomCfgType(listrecp, ParseDefault, 0, SerializeGeneric, DeleteGenericCfgLine);
1222                 REGISTERRoomCfgType(digestrecp, ParseDefault, 0, SerializeGeneric, DeleteGenericCfgLine);
1223                 REGISTERRoomCfgType(pop3client, ParseDefault, 0, SerializeGeneric, DeleteGenericCfgLine);/// todo: implement pop3 specific parser
1224                 REGISTERRoomCfgType(rssclient, ParseDefault, 0, SerializeGeneric, DeleteGenericCfgLine); /// todo: implement rss specific parser
1225                 REGISTERRoomCfgType(participate, ParseDefault, 0, SerializeGeneric, DeleteGenericCfgLine);
1226                 REGISTERRoomCfgType(roommailalias, ParseRoomAlias, 0, SerializeGeneric, DeleteGenericCfgLine);
1227
1228                 create_spool_dirs();
1229 //////todo              CtdlRegisterCleanupHook(destroy_network_queue_room);
1230         }
1231         return "network_spool";
1232 }