Completed the rewrite of the network poller. Again I'm not terribly happy with this...
[citadel.git] / citadel / modules / networkclient / serv_networkclient.c
1 /*
2  * This module polls other Citadel servers for inter-site networking.
3  *
4  * Copyright (c) 2000-2017 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License, version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * Implementation note:
15  * I'm not really happy with this.  It looks more like a Gen 1 poller
16  * than a Gen 3 poller, because libcurl (understandably) does not have
17  * support for Citadel protocol.  In the not too distant future I would
18  * like to remove Citadel-to-Citadel protocol entirely, and replace it
19  * with NNTP.
20  */
21
22
23 #include "sysdep.h"
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <stdio.h>
27 #include <fcntl.h>
28 #include <ctype.h>
29 #include <signal.h>
30 #include <pwd.h>
31 #include <errno.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <dirent.h>
35 #if TIME_WITH_SYS_TIME
36 # include <sys/time.h>
37 # include <time.h>
38 #else
39 # if HAVE_SYS_TIME_H
40 #  include <sys/time.h>
41 # else
42 #  include <time.h>
43 # endif
44 #endif
45 #ifdef HAVE_SYSCALL_H
46 # include <syscall.h>
47 #else 
48 # if HAVE_SYS_SYSCALL_H
49 #  include <sys/syscall.h>
50 # endif
51 #endif
52
53 #include <sys/wait.h>
54 #include <string.h>
55 #include <limits.h>
56 #include <libcitadel.h>
57 #include "citadel.h"
58 #include "server.h"
59 #include "citserver.h"
60 #include "support.h"
61 #include "config.h"
62 #include "user_ops.h"
63 #include "database.h"
64 #include "msgbase.h"
65 #include "internet_addressing.h"
66 #include "clientsocket.h"
67 #include "citadel_dirs.h"
68 #include "threads.h"
69 #include "context.h"
70 #include "ctdl_module.h"
71
72 struct CitContext networker_client_CC;
73
74
75 /*
76  * Poll one Citadel node (the Citadel node name, host/ip, port number, and shared secret are supplied by the caller)
77  */
78 void network_poll_node(StrBuf *node, StrBuf *host, StrBuf *port, StrBuf *secret)
79 {
80         char buf[SIZ];
81         CC->SBuf.Buf = NewStrBuf();
82         CC->sMigrateBuf = NewStrBuf();
83         CC->SBuf.ReadWritePointer = NULL;
84         int bytes_read = 0;
85         int bytes_total = 0;
86         int this_block = 0;
87         StrBuf *SpoolFileName = NULL;
88         FILE *netoutfp = NULL;
89
90         syslog(LOG_DEBUG, "netpoll: polling %s at %s:%s", ChrPtr(node), ChrPtr(host), ChrPtr(port));
91
92         int sock = sock_connect((char *)ChrPtr(host), (char *)ChrPtr(port));
93         if (sock < 0) {
94                 syslog(LOG_ERR, "%s: %s", ChrPtr(host), strerror(errno));
95                 return;
96         }
97
98         /* Read the server greeting */
99         if (sock_getln(&sock, buf, sizeof buf) < 0) {
100                 goto bail;
101         }
102
103         /* Check that the remote is who we think it is and warn the site admin if not */
104         if (strncmp(&buf[4], ChrPtr(node), StrLength(node))) {
105                 CtdlAideMessage(buf, "Connected to wrong node!");
106                 syslog(LOG_ERR, "netpoll: was expecting node <%s> but got %s", ChrPtr(node), buf);
107                 goto bail;
108         }
109
110         /* We're talking to the correct node.  Now identify ourselves. */
111         snprintf(buf, sizeof buf, "NETP %s|%s", CtdlGetConfigStr("c_nodename"), ChrPtr(secret));
112         sock_puts(&sock, buf);
113         if (sock_getln(&sock, buf, sizeof buf) < 0) {
114                 goto bail;
115         }
116         if (buf[0] != '2') {
117                 CtdlAideMessage(buf, "Could not authenticate to network peer");
118                 syslog(LOG_ERR, "netpoll: could not authenticate to <%s> : %s", ChrPtr(node), buf);
119                 goto bail;
120         }
121
122         /* Tell it we want to download anything headed our way. */
123         sock_puts(&sock, "NDOP");
124         if (sock_getln(&sock, buf, sizeof buf) < 0) {
125                 goto bail;
126         }
127         if (buf[0] != '2') {
128                 CtdlAideMessage(buf, "NDOP error");
129                 syslog(LOG_ERR, "netpoll: NDOP error talking to <%s> : %s", ChrPtr(node), buf);
130                 goto bail;
131         }
132
133         bytes_total = atoi(&buf[4]);
134         bytes_read = 0;
135
136         SpoolFileName = NewStrBuf();
137         StrBufPrintf(SpoolFileName,                     // Incoming packets get dropped into the "spoolin/" directory
138                 "%s/%s.%lx%x",
139                 ctdl_netin_dir,
140                 ChrPtr(node),
141                 time(NULL),
142                 rand()
143         );
144         StrBufStripSlashes(SpoolFileName, 1);
145
146         FILE *netinfp = fopen(ChrPtr(SpoolFileName), "w");
147         FreeStrBuf(&SpoolFileName);
148         if (!netinfp) {
149                 goto bail;
150         }
151
152         while (bytes_read < bytes_total) {
153                 snprintf(buf, sizeof buf, "READ %d|%d", bytes_read, bytes_total-bytes_read);
154                 sock_puts(&sock, buf);
155                 if (sock_getln(&sock, buf, sizeof buf) < 0) {
156                         fclose(netinfp);
157                         goto bail;
158                 }
159                 if (buf[0] == '6') {
160                         this_block = atoi(&buf[4]);
161
162                         // Use buffered reads to download the data from remote server
163                         StrBuf *ThisBlockBuf = NewStrBuf();
164                         int blen = socket_read_blob(&sock, ThisBlockBuf, this_block, 20);
165                         if (blen > 0) {
166                                 fwrite(ChrPtr(ThisBlockBuf), blen, 1, netinfp);
167                                 bytes_read += blen;
168                         }
169                         FreeStrBuf(&ThisBlockBuf);
170                         if (blen < this_block) {
171                                 syslog(LOG_DEBUG, "netpoll: got short block, ftn");
172                                 fclose(netinfp);
173                                 goto bail;
174                         }
175                 }
176         }
177
178         syslog(LOG_DEBUG, "netpoll: downloaded %d of %d bytes from %s", bytes_read, bytes_total, ChrPtr(node));
179
180         if (fclose(netinfp) == 0) {
181                 sock_puts(&sock, "CLOS");                               // CLOSing the download causes it to be deleted on the other node
182                 if (sock_getln(&sock, buf, sizeof buf) < 0) {
183                         goto bail;
184                 }
185         }
186
187         // Now get ready to send our network data to the other node.
188         SpoolFileName = NewStrBuf();
189         StrBufPrintf(SpoolFileName,                                     // Outgoing packets come from the "spoolout/" directory
190                 "%s/%s",
191                 ctdl_netout_dir,
192                 ChrPtr(node)
193         );
194         netoutfp = fopen(ChrPtr(SpoolFileName), "r");
195         if (!netoutfp) {
196                 goto bail;
197         }
198
199         /* Tell it we want to upload. */
200         sock_puts(&sock, "NUOP");
201         if (sock_getln(&sock, buf, sizeof buf) < 0) {
202                 goto bail;
203         }
204         if (buf[0] != '2') {
205                 CtdlAideMessage(buf, "NUOP error");
206                 syslog(LOG_ERR, "netpoll: NUOP error talking to <%s> : %s", ChrPtr(node), buf);
207                 goto bail;
208         }
209
210         fseek(netoutfp, 0, SEEK_END);
211         long total_to_send = ftell(netoutfp);
212         rewind(netoutfp);
213
214         syslog(LOG_DEBUG, "netpoll: I want to send %ld bytes to %s", total_to_send, ChrPtr(node));
215         long bytes_sent = 0;
216         while (bytes_sent < total_to_send) {
217                 if (server_shutting_down) {
218                         goto bail;
219                 }
220
221                 this_block = total_to_send - bytes_sent;
222                 if (this_block > sizeof(buf)) {
223                         this_block = sizeof(buf);
224                 }
225
226                 snprintf(buf, sizeof buf, "WRIT %d", this_block);
227                 sock_puts(&sock, buf);
228                 if (sock_getln(&sock, buf, sizeof buf) < 0) {
229                         goto bail;
230                 }
231                 if (buf[0] != '7') {
232                         goto bail;
233                 }
234                 this_block = atol(&buf[4]);
235                 fread(buf, this_block, 1, netoutfp);
236                 if (sock_write(&sock, buf, this_block) != this_block) {
237                         goto bail;
238                 }
239                 bytes_sent += this_block;
240                 syslog(LOG_DEBUG, "netpoll: sent %ld of %ld bytes to %s", bytes_sent, total_to_send, ChrPtr(node));
241         }
242
243         /* Tell them we're done. */
244         sock_puts(&sock, "UCLS 1");                                     // UCLS 1 causes it to close *and delete* on the other node
245         if (sock_getln(&sock, buf, sizeof buf) < 0) {
246                 goto bail;
247         }
248         if (buf[0] == '2') {
249                 unlink(ChrPtr(SpoolFileName));
250         }
251
252 bail:   close(sock);
253         if (SpoolFileName != NULL) FreeStrBuf(&SpoolFileName);
254         if (netoutfp != NULL) fclose(netoutfp);
255         FreeStrBuf(&CC->SBuf.Buf);
256         FreeStrBuf(&CC->sMigrateBuf);
257 }
258
259
260 /*
261  * Poll other Citadel nodes and transfer inbound/outbound network data.
262  * Set "full" to nonzero to force a poll of every node, or to zero to poll
263  * only nodes to which we have data to send.
264  */
265 void network_poll_other_citadel_nodes(int full_poll, HashList *ignetcfg)
266 {
267         const char *key;
268         long len;
269         HashPos *Pos;
270         void *vCfg;
271         StrBuf *SpoolFileName;
272         
273         int poll = 0;
274         
275         if (GetCount(ignetcfg) ==0) {
276                 syslog(LOG_DEBUG, "netpoll: no neighbor nodes are configured - not polling");
277                 return;
278         }
279         become_session(&networker_client_CC);
280
281         SpoolFileName = NewStrBufPlain(ctdl_netout_dir, -1);
282
283         Pos = GetNewHashPos(ignetcfg, 0);
284
285         while (GetNextHashPos(ignetcfg, Pos, &len, &key, &vCfg))
286         {
287                 /* Use the string tokenizer to grab one line at a time */
288                 if (server_shutting_down) {
289                         return;
290                 }
291                 CtdlNodeConf *pNode = (CtdlNodeConf*) vCfg;
292                 poll = 0;
293                 
294                 StrBuf *node = NewStrBufDup(pNode->NodeName);
295                 StrBuf *host = NewStrBufDup(pNode->Host);
296                 StrBuf *port = NewStrBufDup(pNode->Port);
297                 StrBuf *secret = NewStrBufDup(pNode->Secret);
298                 
299                 if ( (StrLength(node) != 0) && 
300                      (StrLength(secret) != 0) &&
301                      (StrLength(host) != 0)
302                 ) {
303                         poll = full_poll;
304                         if (poll == 0)
305                         {
306                                 StrBufAppendBufPlain(SpoolFileName, HKEY("/"), 0);
307                                 StrBufAppendBuf(SpoolFileName, node, 0);
308                                 StrBufStripSlashes(SpoolFileName, 1);
309                                 
310                                 if (access(ChrPtr(SpoolFileName), R_OK) == 0) {
311                                         poll = 1;
312                                 }
313                         }
314                 }
315                 if (poll && (StrLength(host) > 0) && strcmp("0.0.0.0", ChrPtr(host))) {
316                         if (!CtdlNetworkTalkingTo(SKEY(node), NTT_CHECK)) {
317                                 network_poll_node(node, host, port, secret);
318                         }
319                 }
320                 FreeStrBuf(&node);
321                 FreeStrBuf(&host);
322                 FreeStrBuf(&secret);
323                 FreeStrBuf(&port);
324         }
325         FreeStrBuf(&SpoolFileName);
326         DeleteHashPos(&Pos);
327 }
328
329
330 void network_do_clientqueue(void)
331 {
332         HashList *working_ignetcfg;
333         int full_processing = 1;
334         static time_t last_run = 0L;
335
336         /*
337          * Run the full set of processing tasks no more frequently
338          * than once every n seconds
339          */
340         if ( (time(NULL) - last_run) < CtdlGetConfigLong("c_net_freq") )
341         {
342                 full_processing = 0;
343                 syslog(LOG_DEBUG, "netpoll: full processing in %ld seconds.",
344                         CtdlGetConfigLong("c_net_freq") - (time(NULL)- last_run)
345                 );
346         }
347
348         working_ignetcfg = CtdlLoadIgNetCfg();
349         /*
350          * Poll other Citadel nodes.  Maybe.  If "full_processing" is set
351          * then we poll everyone.  Otherwise we only poll nodes we have stuff
352          * to send to.
353          */
354         network_poll_other_citadel_nodes(full_processing, working_ignetcfg);
355         DeleteHash(&working_ignetcfg);
356 }
357
358
359 /*
360  * Module entry point
361  */
362 CTDL_MODULE_INIT(network_client)
363 {
364         if (!threading)
365         {
366                 CtdlFillSystemContext(&networker_client_CC, "CitNetworker");
367                 CtdlRegisterSessionHook(network_do_clientqueue, EVT_TIMER, PRIO_SEND + 10);
368
369         }
370         return "networkclient";
371 }