]> code.citadel.org Git - citadel.git/blobdiff - citadel/event_client.c
way back from DB -> IO Queue
[citadel.git] / citadel / event_client.c
index c46429337da7f1db65dff1b53db14ea916679d38..b3bb59eb8a2622585f51006651f765b4674f5614 100644 (file)
@@ -68,7 +68,7 @@ void SetEVState(AsyncIO *IO, eIOState State)
 
 }
 
-
+eNextState QueueAnEventContext(AsyncIO *IO);
 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
 static void IO_abort_shutdown_callback(struct ev_loop *loop,
                                       ev_cleanup *watcher,
@@ -86,7 +86,7 @@ extern struct ev_loop *event_db;
 extern ev_async DBAddJob;
 extern ev_async DBExitEventLoop;
 
-eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
+eNextState QueueAnDBOperation(AsyncIO *IO)
 {
        IOAddHandler *h;
        int i;
@@ -94,7 +94,10 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
        SetEVState(IO, eDBQ);
        h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
        h->IO = IO;
-       h->EvAttch = CB;
+
+       assert(IO->ReAttachCB != NULL);
+
+       h->EvAttch = IO->ReAttachCB;
        ev_cleanup_init(&IO->db_abort_by_shutdown,
                        IO_abort_shutdown_callback);
        IO->db_abort_by_shutdown.data = IO;
@@ -161,12 +164,15 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
        assert(IO->NextDBOperation);
        switch (IO->NextDBOperation(IO))
        {
+       case eSendReply:
+               ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
+               QueueAnEventContext(IO);
+               break;
        case eDBQuery:
                break;
        case eSendDNSQuery:
        case eReadDNSReply:
        case eConnect:
-       case eSendReply:
        case eSendMore:
        case eSendFile:
        case eReadMessage:
@@ -219,7 +225,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop,
 }
 
 
-eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
+eNextState QueueAnEventContext(AsyncIO *IO)
 {
        IOAddHandler *h;
        int i;
@@ -227,7 +233,11 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
        SetEVState(IO, eIOQ);
        h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
        h->IO = IO;
-       h->EvAttch = CB;
+
+       assert(IO->ReAttachCB != NULL);
+
+       h->EvAttch = IO->ReAttachCB;
+
        ev_cleanup_init(&IO->abort_by_shutdown,
                        IO_abort_shutdown_callback);
        IO->abort_by_shutdown.data = IO;
@@ -260,12 +270,20 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs)
 {
        StopClientWatchers(IO, CloseFDs);
-       return QueueDBOperation(IO, CB);
+       IO->ReAttachCB = CB;
+       return eDBQuery;
 }
 eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB)
 {
        StopDBWatchers(IO);
-       return QueueEventContext(IO, CB);
+       IO->ReAttachCB = CB;
+       return eSendReply;
+}
+
+eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
+{
+       IO->ReAttachCB = CB;
+       return QueueAnEventContext(IO);
 }
 
 extern eNextState evcurl_handle_start(AsyncIO *IO);
@@ -310,7 +328,8 @@ eNextState QueueCurlContext(AsyncIO *IO)
 eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB)
 {
        StopCurlWatchers(IO);
-       return QueueDBOperation(IO, CB);
+       IO->ReAttachCB = CB;
+       return eDBQuery;
 }
 
 
@@ -497,13 +516,12 @@ eReadState HandleInbound(AsyncIO *IO)
                        assert(IO->ReadDone);
                        ev_io_stop(event_base, &IO->recv_event);
                        rc = IO->ReadDone(IO);
-                       if  (rc != eDBQuery) {
-                               IO->NextState = rc;
-                               Finished = StrBufCheckBuffer(&IO->RecvBuf);
+                       if  (rc == eDBQuery) {
+                               return QueueAnDBOperation(IO);
                        }
                        else {
-                               return rc;
-
+                               IO->NextState = rc;
+                               Finished = StrBufCheckBuffer(&IO->RecvBuf);
                        }
                }
        }
@@ -1239,10 +1257,12 @@ void KillAsyncIOContext(AsyncIO *IO)
        case eReadMore:
        case eReadPayload:
        case eReadFile:
-               QueueEventContext(&Ctx->IO, KillOtherContextNow);
+               IO->ReAttachCB = KillOtherContextNow;
+               QueueAnEventContext(&Ctx->IO);
                break;
        case eDBQuery:
-               QueueDBOperation(&Ctx->IO, KillOtherContextNow);
+               IO->ReAttachCB = KillOtherContextNow;
+               QueueAnDBOperation(&Ctx->IO);
                break;
        case eTerminateConnection:
        case eAbort: