First commit
[anna.git] / source / comm / internal / BinderSocket.cpp
1 // ANNA - Anna is Not 'N' Anymore
2 //
3 // (c) Copyright 2005-2014 Eduardo Ramos Testillano & Francisco Ruiz Rayo
4 //
5 // https://bitbucket.org/testillano/anna
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions
9 // are met:
10 //
11 //     * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 //     * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 //     * Neither the name of Google Inc. nor the names of its
18 // contributors may be used to endorse or promote products derived from
19 // this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 // Authors: eduardo.ramos.testillano@gmail.com
34 //          cisco.tierra@gmail.com
35
36
37 #include <sys/time.h>
38 #include <fcntl.h>
39 #include <strings.h>
40 #include <sys/types.h>
41 #include <poll.h>
42
43 #ifdef __CYGWIN__
44 #include <sys/uio.h>
45 #endif
46
47 #include <anna/core/functions.hpp>
48 #include <anna/core/tracing/Logger.hpp>
49
50 #include <anna/comm/internal/BinderSocket.hpp>
51 #include <anna/comm/ServerSocket.hpp>
52
53 using namespace std;
54 using namespace anna;
55
56 //-------------------------------------------------------------------------------------------------
57 // No sabemos si tenemos que actuar como Cliente o Servidor; por lo que tenemos que establecer
58 // ambos extremos del Socket.
59 //-------------------------------------------------------------------------------------------------
60 comm::BinderSocket::BinderSocket(comm::ServerSocket* serverSocket) :
61   ClientSocket(
62     string("/tmp/").append(serverSocket->getLocalAccessPoint().serialize()),
63     Type::Stream
64   ),
65   a_serverSocket(*serverSocket) {
66 }
67
68 //-------------------------------------------------------------------------------------------------
69 // (1) Si consigue hacer el bind => es el primero. A partir de ahora dara acceso al resto.
70
71 // (0) a_bindDelay esta expresando en milisegundos => maxDelay estara en nanosegundos.
72 // (0.1) Guardara los fds asociados al socket (real) y al socket por el que enviamos la informacion
73 //     para poder hacer multiples binds sobre una misma direccion.
74 // (0.2) Vamos a enviar 2 fds (el socket real y el socket UNIX).
75 // (1) El Communicator tendra que tener en cuenta que se trata de un socket bind-compartido
76 //     para meter el fd del publicador en la lista de descriptores por los que es posible que nos
77 //     lleguen mensajes.
78 // (2) Si falla el bind => cierra los recursos. Ya que nos llegara el nuevo fd atraves del mensaje
79 //     del recvmsg.
80 // (3) Conectamos como cliente con el socket UNIX => cuando la parte servidora (a traves del
81 //     Communicator) detecte nuestra presencia deberia enviarnos la informacion para que podamos
82 //     crear el socket con el bind-compartido. Limpiamos la direccion local para evitar hacer otro
83 //      bind
84 // (4) Si el 'connect' falla => se cierra el socket automaticamente => lo abre de nuevo.
85 // (5) "Cierra" el socket UNIX ya que no volvera a usarlo. A partir de ahora el valor bueno sera
86 //     el indicado en la estructura que nos pasa alguno de los servidores que estan escuchando
87 //     el socket UNIX.
88 //
89 // Visto en: http://mail-index.netbsd.org/current-users/1998/01/16/0010.html
90 //-------------------------------------------------------------------------------------------------
91 void comm::BinderSocket::requestBind(const struct sockaddr* s, const  int len)
92 throw(RuntimeException) {
93   if(::bind(a_serverSocket.a_fd, s, len) != -1) {                // (1)
94     unlink(getRemoteAccessPoint().getPath().c_str());
95     this->bind();
96     anna_socket_assert(::listen(a_fd, 10) == -1, "Error preparing listen access point");
97     return;
98   }
99
100   const int xerrno = errno;
101
102   a_serverSocket.close();                                        // (2)
103
104   if(xerrno != EADDRINUSE)
105     throw RuntimeException(asString(), xerrno, ANNA_FILE_LOCATION);
106
107   this->a_localAccessPoint.clear();                           // (3)
108   this->connect();                                               // (4)
109   waitBind(a_serverSocket.getBindDelay());
110 }
111
112 //-------------------------------------------------------------------------------------------------
113 // Este metodo se invoca desde el Communicator cuando se detecta activada en el socket UNIX
114 // empleado para comunicar todos los procesos que quieran compartir una determinada direccion.
115 //
116 // (1) Acepta la conexion por el socket UNIX. Ver (3) de sharedBind.
117 // (2) En el IOV debemos mandar al menos 1 byte.
118 // (3) Rellenamos la estructura que recojera algun otro proceso que esta esperando a leer del
119 //     socket UNIX.
120 // (4) Cierra la nueva conexion ya que no vamos a volver a usarla para nada.
121 //-------------------------------------------------------------------------------------------------
122 void comm::BinderSocket::responseBind()
123 throw(RuntimeException) {
124   struct iovec iov;
125   struct msghdr mh;
126   int fds [2];
127   int garbage(0);
128   sockaddr_un s;
129   socklen_t len(sizeof(sockaddr_un));
130   anna_memset(&s, 0, sizeof(sockaddr_un));
131   int newSocket = ::accept(a_fd, (sockaddr*) & s, &len);      // (1)
132
133   if(newSocket == -1) {
134     const int xerrno = errno;
135     throw RuntimeException(asString(), xerrno, ANNA_FILE_LOCATION);
136   }
137
138   iov.iov_len = sizeof(garbage);                                                   // (2)
139   iov.iov_base = (char*) & garbage;
140   mh.msg_iov = &iov;
141   mh.msg_iovlen = 1;
142   mh.msg_name = NULL;
143   mh.msg_namelen = 0;
144   fds [0] = a_serverSocket.a_fd;                                                  // (3)
145   fds [1] = a_fd;
146 #ifndef __sun
147   struct cmsghdr cmh;
148   char buffer [sizeof(struct cmsghdr) + sizeof(fds)];
149   mh.msg_control = &buffer;
150   mh.msg_controllen = cmh.cmsg_len = sizeof(struct cmsghdr) + sizeof(fds);
151   cmh.cmsg_level = SOL_SOCKET;
152   cmh.cmsg_type = SCM_RIGHTS;
153   anna_memcpy(buffer, &cmh, sizeof(cmh));
154   anna_memcpy(buffer + sizeof(struct cmsghdr), fds, sizeof(fds));
155   mh.msg_flags = 0;
156 #else
157   mh.msg_accrights = (caddr_t) fds;
158   mh.msg_accrightslen = sizeof(fds);
159 #endif
160
161   if(sendmsg(newSocket, &mh, 0) < 0) {
162     int xerrno = errno;
163     ::close(newSocket);
164     throw RuntimeException(asString(), xerrno, ANNA_FILE_LOCATION);
165   }
166
167   ::close(newSocket);                                                              // (4)
168   LOGDEBUG(
169     string msg("anna::comm::BinderSocket::responseBind | ");
170     msg += functions::asText("ServerSocket's fd: ", fds [0]);
171     msg += functions::asText(" | BinderSocket's fd: ", fds [1]);
172     Logger::debug(msg, ANNA_FILE_LOCATION);
173   );
174 }
175
176 void comm::BinderSocket::waitBind(const Millisecond &maxDelay)
177 throw(RuntimeException) {
178   struct msghdr mh;
179   struct iovec iov;
180   const int garbage(0);
181   int fds [2];
182   iov.iov_len = sizeof(garbage);
183   iov.iov_base = (char*) & garbage;
184   mh.msg_iov = &iov;
185   mh.msg_iovlen = 1;
186   mh.msg_name = NULL;
187   mh.msg_namelen = 0;
188 #ifndef __sun
189   char buffer [sizeof(struct cmsghdr) + sizeof(fds)];
190   mh.msg_control = &buffer;
191   mh.msg_controllen = sizeof(struct cmsghdr) + sizeof(fds);
192   mh.msg_flags = 0;
193 #else
194   mh.msg_accrights = (caddr_t) fds;
195   mh.msg_accrightslen = sizeof(fds);
196 #endif
197   pollfd poll;
198   int r;
199   poll.fd = a_fd;
200   poll.events = POLLIN | POLLRDNORM;
201
202   if(::poll(&poll, 1, maxDelay) != 1) {
203     string msg(a_serverSocket.asString());
204     msg += " | No data is received from shared bind";
205     throw RuntimeException(msg, ANNA_FILE_LOCATION);
206   }
207
208   r = recvmsg(a_fd, &mh, 0);
209
210   if(r <= 0) {
211     const int xerrno = errno;
212     std::string msg(a_serverSocket.asString());
213     msg += " | ";
214     msg += asString();
215
216     if(r == -1) {
217       msg += " | Error in shared bind reception";
218       throw RuntimeException(msg, xerrno, ANNA_FILE_LOCATION);
219     } else if(r == 0) {
220       msg += " | No item received to obtain shared bind";
221       throw RuntimeException(msg, ANNA_FILE_LOCATION);
222     }
223   }
224
225   close();
226 #ifndef __sun
227   anna_memcpy(&fds, buffer + sizeof(cmsghdr), sizeof(fds));
228 #endif
229   a_serverSocket.a_fd = fds [0];
230   a_fd = fds [1];
231   LOGDEBUG(
232     string msg("anna::comm::BinderSocket::waitBind | ");
233     msg += functions::asText("ServerSocket's fd: ", fds [0]);
234     msg += functions::asText(" | BinderSocket's fd: ", fds [1]);
235     Logger::debug(msg, ANNA_FILE_LOCATION);
236   );
237 }
238