X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Fnetwork%2Fserv_netspool.c;h=1aeb10ffc1537421996d66b255c214ee7f4e9f2d;hb=50a7f9b7c2183eb912b5c22d8f70b82a8310a6c6;hp=544bf1f04a60d1c0bd636f8cf3e04934e63ce4d5;hpb=8b276ba2d09c1d606b6b282961c737b6b4e26d21;p=citadel.git diff --git a/citadel/modules/network/serv_netspool.c b/citadel/modules/network/serv_netspool.c index 544bf1f04..1aeb10ffc 100644 --- a/citadel/modules/network/serv_netspool.c +++ b/citadel/modules/network/serv_netspool.c @@ -2,7 +2,7 @@ * This module handles shared rooms, inter-Citadel mail, and outbound * mailing list processing. * - * Copyright (c) 2000-2011 by the citadel.org team + * Copyright (c) 2000-2012 by the citadel.org team * * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -92,35 +92,6 @@ #include "netmail.h" #include "ctdl_module.h" -/* - * Learn topology from path fields - */ -void network_learn_topology(char *node, char *path) { - char nexthop[256]; - NetMap *nmptr; - - strcpy(nexthop, ""); - - if (num_tokens(path, '!') < 3) return; - for (nmptr = the_netmap; nmptr != NULL; nmptr = nmptr->next) { - if (!strcasecmp(nmptr->nodename, node)) { - extract_token(nmptr->nexthop, path, 0, '!', sizeof nmptr->nexthop); - nmptr->lastcontact = time(NULL); - ++netmap_changed; - return; - } - } - - /* If we got here then it's not in the map, so add it. */ - nmptr = (NetMap *) malloc(sizeof (NetMap)); - strcpy(nmptr->nodename, node); - nmptr->lastcontact = time(NULL); - extract_token(nmptr->nexthop, path, 0, '!', sizeof nmptr->nexthop); - nmptr->next = the_netmap; - the_netmap = nmptr; - ++netmap_changed; -} - @@ -380,7 +351,10 @@ int is_recipient(SpoolControl *sc, const char *Name) /* * Batch up and send all outbound traffic from the current room */ -void network_spoolout_room(char *room_to_spool) { +void network_spoolout_room(RoomProcList *room_to_spool, + HashList *working_ignetcfg, + HashList *the_netmap) +{ char buf[SIZ]; char filename[PATH_MAX]; SpoolControl *sc; @@ -391,8 +365,8 @@ void network_spoolout_room(char *room_to_spool) { * Normally this should never happen, but once in a while maybe a room gets * queued for networking and then deleted before it can happen. */ - if (CtdlGetRoom(&CC->room, room_to_spool) != 0) { - syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool); + if (CtdlGetRoom(&CC->room, room_to_spool->name) != 0) { + syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool->name); return; } @@ -407,6 +381,9 @@ void network_spoolout_room(char *room_to_spool) { } syslog(LOG_INFO, "Networking started for <%s>\n", CC->room.QRname); + sc->working_ignetcfg = working_ignetcfg; + sc->the_netmap = the_netmap; + /* If there are digest recipients, we have to build a digest */ if (sc->digestrecps != NULL) { sc->digestfp = tmpfile(); @@ -445,7 +422,10 @@ void network_spoolout_room(char *room_to_spool) { * Process a buffer containing a single message from a single file * from the inbound queue */ -void network_process_buffer(char *buffer, long size) { +void network_process_buffer(char *buffer, long size, HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed) +{ + struct CitContext *CCC = CC; + StrBuf *Buf = NULL; struct CtdlMessage *msg = NULL; long pos; int field; @@ -455,18 +435,18 @@ void network_process_buffer(char *buffer, long size) { char *oldpath = NULL; char filename[PATH_MAX]; FILE *fp; - char nexthop[SIZ]; + const StrBuf *nexthop = NULL; unsigned char firstbyte; unsigned char lastbyte; - syslog(LOG_DEBUG, "network_process_buffer() processing %ld bytes\n", size); + QN_syslog(LOG_DEBUG, "network_process_buffer() processing %ld bytes\n", size); /* Validate just a little bit. First byte should be FF and * last byte should be 00. */ firstbyte = buffer[0]; lastbyte = buffer[size-1]; if ( (firstbyte != 255) || (lastbyte != 0) ) { - syslog(LOG_ERR, "Corrupt message ignored. Length=%ld, firstbyte = %d, lastbyte = %d\n", - size, firstbyte, lastbyte); + QN_syslog(LOG_ERR, "Corrupt message ignored. Length=%ld, firstbyte = %d, lastbyte = %d\n", + size, firstbyte, lastbyte); return; } @@ -491,8 +471,13 @@ void network_process_buffer(char *buffer, long size) { if (strcasecmp(msg->cm_fields['D'], config.c_nodename)) { /* route the message */ - strcpy(nexthop, ""); - if (is_valid_node(nexthop, NULL, msg->cm_fields['D']) == 0) { + Buf = NewStrBufPlain(msg->cm_fields['D'], -1); + if (is_valid_node(&nexthop, + NULL, + Buf, + working_ignetcfg, + the_netmap) == 0) + { /* prepend our node to the path */ if (msg->cm_fields['P'] != NULL) { oldpath = msg->cm_fields['P']; @@ -511,32 +496,34 @@ void network_process_buffer(char *buffer, long size) { serialize_message(&sermsg, msg); /* now send it */ - if (IsEmptyStr(nexthop)) { - strcpy(nexthop, msg->cm_fields['D']); + if (StrLength(nexthop) == 0) { + nexthop = Buf; } - snprintf(filename, - sizeof filename, - "%s/%s@%lx%x", - ctdl_netout_dir, - nexthop, - time(NULL), - rand() + snprintf(filename, + sizeof filename, + "%s/%s@%lx%x", + ctdl_netout_dir, + ChrPtr(nexthop), + time(NULL), + rand() ); - syslog(LOG_DEBUG, "Appending to %s\n", filename); + QN_syslog(LOG_DEBUG, "Appending to %s\n", filename); fp = fopen(filename, "ab"); if (fp != NULL) { fwrite(sermsg.ser, sermsg.len, 1, fp); fclose(fp); } else { - syslog(LOG_ERR, "%s: %s\n", filename, strerror(errno)); + QN_syslog(LOG_ERR, "%s: %s\n", filename, strerror(errno)); } free(sermsg.ser); CtdlFreeMessage(msg); + FreeStrBuf(&Buf); return; } else { /* invalid destination node name */ + FreeStrBuf(&Buf); network_bounce(msg, "A message you sent could not be delivered due to an invalid destination node" @@ -561,7 +548,10 @@ void network_process_buffer(char *buffer, long size) { /* Learn network topology from the path */ if ((msg->cm_fields['N'] != NULL) && (msg->cm_fields['P'] != NULL)) { - network_learn_topology(msg->cm_fields['N'], msg->cm_fields['P']); + network_learn_topology(msg->cm_fields['N'], + msg->cm_fields['P'], + the_netmap, + netmap_changed); } /* Is the sending node giving us a very persuasive suggestion about @@ -580,7 +570,7 @@ void network_process_buffer(char *buffer, long size) { "Please check the address and try sending the message again.\n"); msg = NULL; free_recipients(recp); - syslog(LOG_DEBUG, "Bouncing message due to invalid recipient address.\n"); + QNM_syslog(LOG_DEBUG, "Bouncing message due to invalid recipient address.\n"); return; } strcpy(target_room, ""); /* no target room if mail */ @@ -616,7 +606,13 @@ void network_process_buffer(char *buffer, long size) { /* * Process a single message from a single file from the inbound queue */ -void network_process_message(FILE *fp, long msgstart, long msgend) { +void network_process_message(FILE *fp, + long msgstart, + long msgend, + HashList *working_ignetcfg, + HashList *the_netmap, + int *netmap_changed) +{ long hold_pos; long size; char *buffer; @@ -627,7 +623,11 @@ void network_process_message(FILE *fp, long msgstart, long msgend) { if (buffer != NULL) { fseek(fp, msgstart, SEEK_SET); if (fread(buffer, size, 1, fp) > 0) { - network_process_buffer(buffer, size); + network_process_buffer(buffer, + size, + working_ignetcfg, + the_netmap, + netmap_changed); } free(buffer); } @@ -639,7 +639,12 @@ void network_process_message(FILE *fp, long msgstart, long msgend) { /* * Process a single file from the inbound queue */ -void network_process_file(char *filename) { +void network_process_file(char *filename, + HashList *working_ignetcfg, + HashList *the_netmap, + int *netmap_changed) +{ + struct CitContext *CCC = CC; FILE *fp; long msgstart = (-1L); long msgend = (-1L); @@ -649,12 +654,12 @@ void network_process_file(char *filename) { fp = fopen(filename, "rb"); if (fp == NULL) { - syslog(LOG_CRIT, "Error opening %s: %s\n", filename, strerror(errno)); + QN_syslog(LOG_CRIT, "Error opening %s: %s\n", filename, strerror(errno)); return; } fseek(fp, 0L, SEEK_END); - syslog(LOG_INFO, "network: processing %ld bytes from %s\n", ftell(fp), filename); + QN_syslog(LOG_INFO, "network: processing %ld bytes from %s\n", ftell(fp), filename); rewind(fp); /* Look for messages in the data stream and break them out */ @@ -663,7 +668,12 @@ void network_process_file(char *filename) { if (ch == 255) { if (msgstart >= 0L) { msgend = msgcur - 1; - network_process_message(fp, msgstart, msgend); + network_process_message(fp, + msgstart, + msgend, + working_ignetcfg, + the_netmap, + netmap_changed); } msgstart = msgcur; } @@ -673,7 +683,12 @@ void network_process_file(char *filename) { msgend = msgcur - 1; if (msgstart >= 0L) { - network_process_message(fp, msgstart, msgend); + network_process_message(fp, + msgstart, + msgend, + working_ignetcfg, + the_netmap, + netmap_changed); } fclose(fp); @@ -684,7 +699,9 @@ void network_process_file(char *filename) { /* * Process anything in the inbound queue */ -void network_do_spoolin(void) { +void network_do_spoolin(HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed) +{ + struct CitContext *CCC = CC; DIR *dp; struct dirent *d; struct stat statbuf; @@ -697,11 +714,11 @@ void network_do_spoolin(void) { */ if (stat(ctdl_netin_dir, &statbuf)) return; if (statbuf.st_mtime == last_spoolin_mtime) { - syslog(LOG_DEBUG, "network: nothing in inbound queue\n"); + QNM_syslog(LOG_DEBUG, "network: nothing in inbound queue\n"); return; } last_spoolin_mtime = statbuf.st_mtime; - syslog(LOG_DEBUG, "network: processing inbound queue\n"); + QNM_syslog(LOG_DEBUG, "network: processing inbound queue\n"); /* * Ok, there's something interesting in there, so scan it. @@ -717,7 +734,10 @@ void network_do_spoolin(void) { ctdl_netin_dir, d->d_name ); - network_process_file(filename); + network_process_file(filename, + working_ignetcfg, + the_netmap, + netmap_changed); } } @@ -728,83 +748,243 @@ void network_do_spoolin(void) { * Step 1: consolidate files in the outbound queue into one file per neighbor node * Step 2: delete any files in the outbound queue that were for neighbors who no longer exist. */ -void network_consolidate_spoolout(void) { +void network_consolidate_spoolout(HashList *working_ignetcfg, HashList *the_netmap) +{ + struct CitContext *CCC = CC; + IOBuffer IOB; + FDIOBuffer FDIO; + int d_namelen; DIR *dp; struct dirent *d; + struct dirent *filedir_entry; + const char *pch; + char spooloutfilename[PATH_MAX]; char filename[PATH_MAX]; - char cmd[PATH_MAX]; - char nexthop[256]; + const StrBuf *nexthop; + StrBuf *NextHop; int i; - char *ptr; + struct stat statbuf; + int nFailed = 0; /* Step 1: consolidate files in the outbound queue into one file per neighbor node */ + d = (struct dirent *)malloc(offsetof(struct dirent, d_name) + PATH_MAX + 1); + if (d == NULL) return; + dp = opendir(ctdl_netout_dir); - if (dp == NULL) return; - while (d = readdir(dp), d != NULL) { - if ( - (strcmp(d->d_name, ".")) - && (strcmp(d->d_name, "..")) - && (strchr(d->d_name, '@') != NULL) - ) { - safestrncpy(nexthop, d->d_name, sizeof nexthop); - ptr = strchr(nexthop, '@'); - if (ptr) *ptr = 0; - - snprintf(filename, - sizeof filename, - "%s/%s", - ctdl_netout_dir, - d->d_name - ); - - syslog(LOG_DEBUG, "Consolidate %s to %s\n", filename, nexthop); - if (network_talking_to(nexthop, NTT_CHECK)) { - syslog(LOG_DEBUG, - "Currently online with %s - skipping for now\n", - nexthop + if (dp == NULL) { + free(d); + return; + } + + NextHop = NewStrBuf(); + memset(&IOB, 0, sizeof(IOBuffer)); + memset(&FDIO, 0, sizeof(FDIOBuffer)); + FDIO.IOB = &IOB; + + while ((readdir_r(dp, d, &filedir_entry) == 0) && + (filedir_entry != NULL)) + { +#ifdef _DIRENT_HAVE_D_NAMELEN + d_namelen = filedir_entry->d_namelen; +#else + +#ifndef DT_UNKNOWN +#define DT_UNKNOWN 0 +#define DT_DIR 4 +#define DT_REG 8 +#define DT_LNK 10 + +#define IFTODT(mode) (((mode) & 0170000) >> 12) +#define DTTOIF(dirtype) ((dirtype) << 12) +#endif + d_namelen = strlen(filedir_entry->d_name); +#endif + if ((d_namelen > 1) && filedir_entry->d_name[d_namelen - 1] == '~') + continue; /* Ignore backup files... */ + + if ((d_namelen == 1) && + (filedir_entry->d_name[0] == '.')) + continue; + + if ((d_namelen == 2) && + (filedir_entry->d_name[0] == '.') && + (filedir_entry->d_name[1] == '.')) + continue; + + pch = strchr(filedir_entry->d_name, '@'); + if (pch == NULL) + continue; + + snprintf(filename, + sizeof filename, + "%s/%s", + ctdl_netout_dir, + filedir_entry->d_name); + + StrBufPlain(NextHop, + filedir_entry->d_name, + pch - filedir_entry->d_name); + + snprintf(spooloutfilename, + sizeof spooloutfilename, + "%s/%s", + ctdl_netout_dir, + ChrPtr(NextHop)); + + QN_syslog(LOG_DEBUG, "Consolidate %s to %s\n", filename, ChrPtr(NextHop)); + if (network_talking_to(SKEY(NextHop), NTT_CHECK)) { + nFailed++; + QN_syslog(LOG_DEBUG, + "Currently online with %s - skipping for now\n", + ChrPtr(NextHop) ); + } + else { + size_t dsize; + size_t fsize; + int infd, outfd; + const char *err = NULL; + network_talking_to(SKEY(NextHop), NTT_ADD); + + infd = open(filename, O_RDONLY); + if (infd == -1) { + nFailed++; + QN_syslog(LOG_ERR, + "failed to open %s for reading due to %s; skipping.\n", + filename, strerror(errno) + ); + network_talking_to(SKEY(NextHop), NTT_REMOVE); + continue; + } + + outfd = open(spooloutfilename, + O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY, + S_IRUSR|S_IWUSR); + if (outfd == -1) + { + outfd = open(spooloutfilename, + O_EXCL|O_NONBLOCK|O_WRONLY, + S_IRUSR | S_IWUSR); + } + if (outfd == -1) { + nFailed++; + QN_syslog(LOG_ERR, + "failed to open %s for reading due to %s; skipping.\n", + spooloutfilename, strerror(errno) + ); + close(infd); + network_talking_to(SKEY(NextHop), NTT_REMOVE); + continue; + } + + dsize = lseek(outfd, 0, SEEK_END); + lseek(outfd, -dsize, SEEK_SET); + + fstat(infd, &statbuf); + fsize = statbuf.st_size; +/* + fsize = lseek(infd, 0, SEEK_END); +*/ + IOB.fd = infd; + FDIOBufferInit(&FDIO, &IOB, outfd, fsize + dsize); + FDIO.ChunkSendRemain = fsize; + FDIO.TotalSentAlready = dsize; + err = NULL; + errno = 0; + do {} while ((FileMoveChunked(&FDIO, &err) > 0) && (err == NULL)); + if (err == NULL) { + unlink(filename); } else { - network_talking_to(nexthop, NTT_ADD); - snprintf(cmd, sizeof cmd, "/bin/cat %s >>%s/%s && /bin/rm -f %s", - filename, - ctdl_netout_dir, nexthop, - filename - ); - system(cmd); - network_talking_to(nexthop, NTT_REMOVE); + nFailed++; + QN_syslog(LOG_ERR, + "failed to append to %s [%s]; rolling back..\n", + spooloutfilename, strerror(errno) + ); + /* whoops partial append?? truncate spooloutfilename again! */ + ftruncate(outfd, dsize); } + FDIOBufferDelete(&FDIO); + close(infd); + close(outfd); + network_talking_to(SKEY(NextHop), NTT_REMOVE); } } closedir(dp); - /* Step 2: delete any files in the outbound queue that were for neighbors who no longer exist */ + if (nFailed > 0) { + FreeStrBuf(&NextHop); + QN_syslog(LOG_INFO, + "skipping Spoolcleanup because of %d files unprocessed.\n", + nFailed + ); + + return; + } + /* Step 2: delete any files in the outbound queue that were for neighbors who no longer exist */ dp = opendir(ctdl_netout_dir); - if (dp == NULL) return; + if (dp == NULL) { + FreeStrBuf(&NextHop); + free(d); + return; + } - while (d = readdir(dp), d != NULL) { - if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, "..")) + while ((readdir_r(dp, d, &filedir_entry) == 0) && + (filedir_entry != NULL)) + { +#ifdef _DIRENT_HAVE_D_NAMELEN + d_namelen = filedir_entry->d_namelen; + d_type = filedir_entry->d_type; +#else + +#ifndef DT_UNKNOWN +#define DT_UNKNOWN 0 +#define DT_DIR 4 +#define DT_REG 8 +#define DT_LNK 10 + +#define IFTODT(mode) (((mode) & 0170000) >> 12) +#define DTTOIF(dirtype) ((dirtype) << 12) +#endif + d_namelen = strlen(filedir_entry->d_name); +#endif + if ((d_namelen == 1) && + (filedir_entry->d_name[0] == '.')) continue; - ptr = strchr(d->d_name, '@'); - if (d != NULL) + + if ((d_namelen == 2) && + (filedir_entry->d_name[0] == '.') && + (filedir_entry->d_name[1] == '.')) + continue; + + pch = strchr(filedir_entry->d_name, '@'); + if (pch == NULL) /* no @ in name? consolidated file. */ continue; + + StrBufPlain(NextHop, + filedir_entry->d_name, + pch - filedir_entry->d_name); + snprintf(filename, sizeof filename, "%s/%s", ctdl_netout_dir, - d->d_name + filedir_entry->d_name ); - strcpy(nexthop, ""); - i = is_valid_node(nexthop, NULL, d->d_name); + i = is_valid_node(&nexthop, + NULL, + NextHop, + working_ignetcfg, + the_netmap); - if ( (i != 0) || !IsEmptyStr(nexthop) ) { + if ( (i != 0) || (StrLength(nexthop) > 0) ) { unlink(filename); } } - - + FreeStrBuf(&NextHop); + free(d); closedir(dp); } @@ -841,7 +1021,7 @@ CTDL_MODULE_INIT(network_spool) if (!threading) { create_spool_dirs(); - CtdlRegisterCleanupHook(destroy_network_queue_room); +//////todo CtdlRegisterCleanupHook(destroy_network_queue_room); } return "network_spool"; }