]> code.citadel.org Git - citadel.git/blobdiff - citadel/event_client.c
libevent Migration:
[citadel.git] / citadel / event_client.c
index 6ffc4c96a03594e2243306d0bb1b30c35eb92580..ba18107d76c1307c46ab7754c4c39796eb28dc02 100644 (file)
@@ -77,6 +77,11 @@ extern EventContextAttach EventContextAttachPtr;
 int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB)
 {
        citthread_mutex_lock(&EventQueueMutex);
+       if (event_add_pipe[1] == -1) {
+               citthread_mutex_unlock(&EventQueueMutex);
+
+               return -1;
+       }
 
        QueueEventAddPtr = Ctx;
        EventContextAttachPtr = CB;
@@ -90,12 +95,26 @@ int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB)
 
 int ShutDownEventQueue(void)
 {
+       citthread_mutex_lock(&EventQueueMutex);
+       if (event_add_pipe[1] == -1) {
+               citthread_mutex_unlock(&EventQueueMutex);
+
+               return -1;
+       }
        write(event_add_pipe[1], "x_", 1);
        close(event_add_pipe[1]);
+       event_add_pipe[1] = -1;
+       citthread_mutex_unlock(&EventQueueMutex);
        return 0;
 }
 
+void FreeAsyncIOContents(AsyncIO *IO)
+{
+       FreeStrBuf(&IO->IOBuf);
+       FreeStrBuf(&IO->SendBuf.Buf);
+       FreeStrBuf(&IO->RecvBuf.Buf);
 
+}
 
 /*
 static void
@@ -117,6 +136,19 @@ setup_signal_handlers(struct instance *instance)
 }
 */
 
+void ShutDownCLient(AsyncIO *IO)
+{
+       event_del(&IO->send_event);
+       event_del(&IO->recv_event);
+       IO->Terminate(IO->Data);
+
+//     citthread_mutex_lock(&EventQueueMutex);
+
+///QueueEvents /// todo remove from hash.
+
+//     citthread_mutex_unlock(&EventQueueMutex);
+}
+
 eReadState HandleInbound(AsyncIO *IO)
 {
        eReadState Finished = eBufferNotEmpty;
@@ -161,6 +193,9 @@ eReadState HandleInbound(AsyncIO *IO)
                event_add(&IO->send_event, NULL);
                        
        }
+       else if ((IO->NextState == eTerminateConnection) ||
+                (IO->NextState == eAbort) )
+               ShutDownCLient(IO);
        return Finished;
 }
 
@@ -186,7 +221,12 @@ IO_send_callback(int fd, short event, void *ctx)
                    break;
            case eSendMore:
                    IO->NextState = IO->SendDone(IO->Data);
-                   event_add(&IO->send_event, NULL);
+
+                   if ((IO->NextState == eTerminateConnection) ||
+                            (IO->NextState == eAbort) )
+                           ShutDownCLient(IO);
+                   else
+                           event_add(&IO->send_event, NULL);
                    break;
            case eReadMessage:
                    if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
@@ -243,12 +283,14 @@ void InitEventIO(AsyncIO *IO,
                 void *pData, 
                 IO_CallBack ReadDone, 
                 IO_CallBack SendDone, 
+                IO_CallBack Terminate, 
                 IO_LineReaderCallback LineReader,
                 int ReadFirst)
 {
        IO->Data = pData;
        IO->SendDone = SendDone;
        IO->ReadDone = ReadDone;
+       IO->Terminate = Terminate;
        IO->LineReader = LineReader;
 
        event_set(&IO->recv_event,