+typedef struct __networker_save_message {
+ AsyncIO IO;
+ struct CtdlMessage *Msg;
+ struct recptypes *recp;
+ rss_aggregator *Cfg;
+ StrBuf *MsgGUID;
+ StrBuf *Message;
+ struct UseTable ut;
+} networker_save_message;
+
+
+void DeleteRoomReference(long QRnumber)
+{
+ HashPos *At;
+ long HKLen;
+ const char *HK;
+ void *vData = NULL;
+ rss_room_counter *pRoomC;
+
+ At = GetNewHashPos(RSSQueueRooms, 0);
+
+ GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At);
+ GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
+ if (vData != NULL)
+ {
+ pRoomC = (rss_room_counter *) vData;
+ pRoomC->count --;
+ if (pRoomC->count == 0)
+ DeleteEntryFromHash(RSSQueueRooms, At);
+ }
+ DeleteHashPos(&At);
+}
+
+void UnlinkRooms(rss_aggregator *Cfg)
+{
+
+ DeleteRoomReference(Cfg->QRnumber);
+ if (Cfg->OtherQRnumbers != NULL)
+ {
+ long HKLen;
+ const char *HK;
+ HashPos *At;
+ void *vData;
+
+ At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
+ while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) &&
+ (vData != NULL))
+ {
+ long *lData = (long*) vData;
+ DeleteRoomReference(*lData);
+ }
+/*
+ if (server_shutting_down)
+ break; / * TODO */
+
+ DeleteHashPos(&At);
+ }
+}
+
+void UnlinkRSSAggregator(rss_aggregator *Cfg)
+{
+ HashPos *At;
+
+ UnlinkRooms(Cfg);
+
+ At = GetNewHashPos(RSSFetchUrls, 0);
+ if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At) == 0)
+ {
+ DeleteEntryFromHash(RSSFetchUrls, At);
+ }
+ DeleteHashPos(&At);
+ last_run = time(NULL);
+}
+
+eNextState FreeNetworkSaveMessage (AsyncIO *IO)
+{
+ networker_save_message *Ctx = (networker_save_message *) IO->Data;
+
+ pthread_mutex_lock(&RSSQueueMutex);
+ Ctx->Cfg->RefCount --;
+
+ if (Ctx->Cfg->RefCount == 0)
+ {
+ UnlinkRSSAggregator(Ctx->Cfg);
+
+ }
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ CtdlFreeMessage(Ctx->Msg);
+ free_recipients(Ctx->recp);
+ FreeStrBuf(&Ctx->Message);
+ FreeStrBuf(&Ctx->MsgGUID);
+ free(Ctx);
+ last_run = time(NULL);
+ return eAbort;
+}
+
+eNextState AbortNetworkSaveMessage (AsyncIO *IO)
+{
+ return eAbort; ///TODO
+}
+
+eNextState RSSSaveMessage(AsyncIO *IO)
+{
+ networker_save_message *Ctx = (networker_save_message *) IO->Data;
+
+ Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
+
+ CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
+
+ /* write the uidl to the use table so we don't store this item again */
+ cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
+
+ return eTerminateConnection;
+}
+
+eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
+{
+ struct cdbdata *cdbut;
+ networker_save_message *Ctx = (networker_save_message *) IO->Data;
+
+ /* Find out if we've already seen this item */
+ strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
+ Ctx->ut.ut_timestamp = time(NULL);
+
+ cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
+#ifndef DEBUG_RSS
+ if (cdbut != NULL) {
+ /* Item has already been seen */
+ syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
+ cdb_free(cdbut);
+
+ /* rewrite the record anyway, to update the timestamp */
+ cdb_store(CDB_USETABLE,
+ SKEY(Ctx->MsgGUID),
+ &Ctx->ut, sizeof(struct UseTable) );
+ return eAbort;
+ }
+ else
+#endif
+ {
+ NextDBOperation(IO, RSSSaveMessage);
+ return eSendMore;
+ }
+}
+void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregator *Cfg)
+{
+ networker_save_message *Ctx;
+
+ Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
+ memset(Ctx, 0, sizeof(networker_save_message));
+
+ Ctx->MsgGUID = MsgGUID;
+ Ctx->Message = MessageBody;
+ Ctx->Msg = Msg;
+ Ctx->Cfg = Cfg;
+ Ctx->recp = recp;
+ Ctx->IO.Data = Ctx;
+ Ctx->IO.CitContext = CloneContext(&rss_CC);
+ Ctx->IO.Terminate = FreeNetworkSaveMessage;
+ Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
+ QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
+}
+
+