ev_loop *event_base;
-long EvIDSource = 0;
+long EvIDSource = 1;
/*****************************************************************************
* libevent / curl integration *
*****************************************************************************/
char *f;
AsyncIO *IO = (AsyncIO*) vIO;
CURLcode sta;
+ const char *Action;
if (IO == NULL) {
sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
return -1;
}
IO = (AsyncIO *) f;
+ EV_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl);
if (IO->SendBuf.fd != 0)
{
ev_io_stop(event_base, &IO->recv_event);
ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
curl_multi_assign(mhnd, fd, IO);
}
- EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
+
+ Action = "";
+ switch (action)
+ {
+ case CURL_POLL_NONE:
+ Action = "CURL_POLL_NONE";
+ break;
+ case CURL_POLL_REMOVE:
+ Action = "CURL_POLL_REMOVE";
+ break;
+ case CURL_POLL_IN:
+ Action = "CURL_POLL_IN";
+ break;
+ case CURL_POLL_OUT:
+ Action = "CURL_POLL_OUT";
+ break;
+ case CURL_POLL_INOUT:
+ Action = "CURL_POLL_INOUT";
+ break;
+ }
+
+
+ EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", (int)fd, Action, action);
switch (action)
{
OPT(NOSIGNAL, (long)1);
OPT(FAILONERROR, (long)1);
OPT(ENCODING, "");
- OPT(FOLLOWLOCATION, (long)1);
+ OPT(FOLLOWLOCATION, (long)0);
OPT(MAXREDIRS, (long)7);
OPT(USERAGENT, CITADEL);
}
DeleteHashPos(&It);
DeleteHashContent(&q);
- syslog(LOG_DEBUG, "EVENT Q Read done.\n");
+ syslog(LOG_DEBUG, "EVENT Q Add done.\n");
}
}
DeleteHashPos(&It);
DeleteHashContent(&q);
- syslog(LOG_DEBUG, "DBEVENT Q Read done.\n");
+ syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
}
cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
- return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+ return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
else
return eAbort;
}
#ifndef DEBUG_RSS
if (cdbut != NULL) {
/* Item has already been seen */
- syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
+ EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
cdb_free(cdbut);
/* rewrite the record anyway, to update the timestamp */
&Ctx->ThisMsg->ut, sizeof(struct UseTable) );
if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
- return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+ return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
else
return eAbort;
}
int msglen = 0;
StrBuf *Message;
StrBuf *guid;
+ AsyncIO *IO = &Cfg->IO;
int n;
}
/* translate Item into message. */
- syslog(LOG_DEBUG, "RSS: translating item...\n");
+ EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
StrBufSpaceToBlank(ri->description);
msg = malloc(sizeof(struct CtdlMessage));
if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
return 0;
- Cfg->RefCount++;
ri = (rss_item*) malloc(sizeof(rss_item));
memset(ri, 0, sizeof(rss_item));
void DeleteRssCfg(void *vptr)
{
rss_aggregator *rncptr = (rss_aggregator *)vptr;
+ AsyncIO *IO = &rncptr->IO;
+ EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
FreeStrBuf(&rncptr->Url);
FreeStrBuf(&rncptr->rooms);
DeleteHash(&rncptr->OtherQRnumbers);
FreeURL(&rncptr->IO.ConnectMe);
+ DeleteHashPos (&rncptr->Pos);
+ DeleteHash (&rncptr->Messages);
+ if (rncptr->recp.recp_room != NULL)
+ free(rncptr->recp.recp_room);
+
+
if (rncptr->Item != NULL)
{
FreeStrBuf(&rncptr->Item->guid);
eNextState RSSAggregatorTerminate(AsyncIO *IO)
{
rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
-
- HashPos *At;
- long HKLen;
- const char *HK;
- void *vData;
- pthread_mutex_lock(&RSSQueueMutex);
- rncptr->RefCount --;
- if (rncptr->RefCount == 0)
- {
- UnlinkRSSAggregator(rncptr);
-
- }
- pthread_mutex_unlock(&RSSQueueMutex);
+ EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
- At = GetNewHashPos(RSSFetchUrls, 0);
- pthread_mutex_lock(&RSSQueueMutex);
- GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
- GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
- DeleteEntryFromHash(RSSFetchUrls, At);
- pthread_mutex_unlock(&RSSQueueMutex);
- DeleteHashPos (&rncptr->Pos);
- DeleteHash (&rncptr->Messages);
- if (rncptr->recp.recp_room != NULL)
- free(rncptr->recp.recp_room);
- DeleteHashPos(&At);
+ UnlinkRSSAggregator(rncptr);
return eAbort;
}
use_this_rncptr = (rss_aggregator *)vptr;
if (use_this_rncptr != NULL)
{
- /* mustn't attach to an active session */
- if (use_this_rncptr->RefCount > 0)
- {
- DeleteRssCfg(rncptr);
- Count->count--;
- }
- else
+ long *QRnumber;
+ StrBufAppendBufPlain(use_this_rncptr->rooms,
+ qrbuf->QRname,
+ -1, 0);
+ if (use_this_rncptr->roomlist_parts == 1)
{
- long *QRnumber;
- StrBufAppendBufPlain(use_this_rncptr->rooms,
- qrbuf->QRname,
- -1, 0);
- if (use_this_rncptr->roomlist_parts == 1)
- {
- use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
- }
- QRnumber = (long*)malloc(sizeof(long));
- *QRnumber = qrbuf->QRnumber;
- Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
- use_this_rncptr->roomlist_parts++;
+ use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
}
- pthread_mutex_unlock(&RSSQueueMutex);
+ QRnumber = (long*)malloc(sizeof(long));
+ *QRnumber = qrbuf->QRnumber;
+ Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
+ use_this_rncptr->roomlist_parts++;
+ pthread_mutex_unlock(&RSSQueueMutex);
FreeStrBuf(&rncptr->Url);
free(rncptr);
*/
if (doing_rssclient) return;
doing_rssclient = 1;
+ if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
+ return;
syslog(LOG_DEBUG, "rssclient started\n");
CtdlForEachRoom(rssclient_scan_room, NULL);
GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
(vrptr != NULL)) {
rptr = (rss_aggregator *)vrptr;
- if (rptr->RefCount == 0)
- if (!rss_do_fetching(rptr))
- UnlinkRSSAggregator(rptr);
+ if (!rss_do_fetching(rptr))
+ UnlinkRSSAggregator(rptr);
}
DeleteHashPos(&it);
pthread_mutex_unlock(&RSSQueueMutex);