root/cserver/server.c

Revision 1203, 14.9 kB (checked in by efphe, 1 year ago)

moving code from fenilot.org

Line 
1 /*
2  * Copyright (c) 2006, Federico Tomassini AKA efphe (effetom AT gmail DOT com)
3  * All rights reserved.
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  *
7  *     * Redistributions of source code must retain the above copyright
8  *       notice, this list of conditions and the following disclaimer.
9  *     * Redistributions in binary form must reproduce the above copyright
10  *       notice, this list of conditions and the following disclaimer in the
11  *       documentation and/or other materials provided with the distribution.
12  *     * Neither the name of the University of California, Berkeley nor the
13  *       names of its contributors may be used to endorse or promote products
14  *       derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
17  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #include "server.h"
29 #include "server_utils.h"
30 #include "wsignals.h"
31 #include "wmalloc.h"
32
33 static pmt _servers_must_die_mutex=PTHREAD_MUTEX_INITIALIZER;
34 static int _servers_must_die=0;
35 static pmt _servers_working_threads_mutex=PTHREAD_MUTEX_INITIALIZER;
36 static pct _servers_working_threads_cond=PTHREAD_COND_INITIALIZER;
37 static int _servers_working_threads=0;
38
39 /*
40  * Server API
41  *  Handle the server struct with the following functions,
42  *  never directly.
43  */
44
45 void server_log_init(LogLevel level, SyslogFacility facility, int on_stderr)
46 {
47     if (level==-1)
48         level= SYSLOG_LEVEL_INFO;
49     if (facility==-1)
50         facility= LOG_DAEMON;
51     log_init("cserver", level, facility, on_stderr);
52     return;
53 }
54
55 void server_set_signal_handler(server *s, int signum, void(*f)(int))
56 {
57     ssh **o,*k;
58     sigset_t temp;
59     void (*old_f)(int)=0;
60
61     w_sigemptyset(&temp);
62     w_sigaddset(&temp, signum);
63     o=&(s->signal_handlers);
64
65     while ((k=*o)) {
66         /* Overwrite handler. */
67         if (k->signum==signum) {
68             old_f= k->f;
69             k->f= f;
70         }
71
72         /*
73          * Ei, this handler is already used. Collect
74          * signal in temp.
75          */
76         if (k->f==f) {
77             w_sigaddset(&k->sa_mask, signum);
78             w_sigaddset(&temp, k->signum);
79         }
80         o=&((*o)->next);
81     }
82
83     if (!old_f) /* This is a new signal. */
84         *o=_server_new_signal_handler(signum, &temp, f);
85
86     else { /* A signal handler was modified. Remove dependencies. */
87         o=&(s->signal_handlers);
88         while((k=*o)) {
89             if (k->f == old_f )
90                 w_sigdelset(&k->sa_mask, signum);
91             o=&((*o)->next);
92         }
93     }
94
95     return;
96 }
97
98
99 void server_init(server *s)
100 {
101     memset(s,0,sizeof(server));
102
103     server_set_tps(s,DEF_THREADS_PER_SOCKET);
104     server_set_family(s,AF_INET);
105     server_set_type(s,SOCK_STREAM);
106     server_set_backlog(s,DEF_SERVER_BACKLOG);
107     server_set_signal_handler(s,SIGINT,&_server_default_sigint_handler);
108     server_set_signal_handler(s,SIGTERM,&_server_default_sigterm_handler);
109     server_log_init(-1,-1,1);
110
111     return;
112 }
113
114 void server_set_name(server *s,char *name)
115 {
116     int slen;
117
118     slen=strlen(name)+1;
119     if (slen>MAX_SERVER_NAME_LEN)
120         fatal("Server name (%s) is too long: max is %d",
121                 name, MAX_SERVER_NAME_LEN);
122     strcpy(s->name,name);
123 }
124
125 void server_set_backlog(server *s,int backlog)
126 {
127     if (backlog>MAX_SERVER_BACKLOG)
128         fatal("Backlog value too high: max is %d.",
129                 MAX_SERVER_BACKLOG);
130     s->backlog=backlog;
131     return;
132 }
133
134 void server_set_tps(server *s,int tps)
135 {
136     if (tps>MAX_THREADS_PER_SOCKET)
137         fatal("Too many threads per socket: max is %d.",
138                 MAX_THREADS_PER_SOCKET);
139     s->threads_per_socket=tps;
140     return;
141 }
142
143 void servers_destroy(server *s, int nserver)
144 {
145     int i;
146
147     for (i=0; i<nserver; i++)
148         _server_destroy(s+i);
149 }
150
151 int server_add_hostname(server *s,char *hname)
152 {
153     int n,len;
154     char *shname;
155
156     n=s->hostnames_n;
157     if (n>= MAX_SERVER_HOSTNAMES)
158         fatal("Too many server hostnames: max is %d.",
159                 MAX_SERVER_HOSTNAMES);
160
161     len=strlen(hname);
162     s->hostnames[n]= (char*)w_malloc(len+1);
163     shname= s->hostnames[n];
164     strcpy(shname, hname);
165     s->hostnames_n++;
166     return 0;
167 }
168
169 void server_set_type(server *s, int type)
170 {
171     s->type= type;
172     return;
173 }
174    
175 void server_set_family(server *s, int family)
176 {
177     s->family= family;
178     return;
179 }
180
181
182 void server_set_port(server *s,uint16_t port)
183 {
184     s->port=port;
185     return;
186 }
187
188 void server_set_handler(server *s,
189         void (*f)(int,struct sockaddr*,socklen_t*))
190 {
191     s->handler=f;
192     return;
193 }
194
195
196 void servers_serve(server *servers,int nserver)
197 {
198     int i;
199     server *s;
200
201     for (i=0; i<nserver; i ++) {
202         s=servers+i;
203         _server_serve(s);
204     }
205
206     while (pause())
207         if (_servers_are_stopped())
208             break;
209
210     servers_destroy(servers, nserver);
211
212     return;
213 }
214
215 /*
216  * do setsockopt for each socket on server.
217  * Returns the sum of each return value of setsockopt,
218  * so, the number of error is the module of the
219  * returned value
220  */
221 int server_setsockopt(server *s,
222                       int level,
223                       int optname,
224                       const void *optval,
225                       socklen_t optlen)
226 {
227     int i,res=0;
228
229     for (i=0; i<s->sockets_n; i++)
230         res+=setsockopt(s->sockets[i], level, optname,
231                 optval, optlen);
232     return res;
233 }
234
235 void servers_stop(void)
236 {
237     w_pthread_mutex_lock(&_servers_must_die_mutex);
238     _servers_must_die=1;
239     w_pthread_mutex_unlock(&_servers_must_die_mutex);
240     return;
241 }
242
243 void servers_slow_stop(void)
244 {
245     /* Waiting threads end serving */
246     servers_stop();
247
248     w_pthread_mutex_lock(&_servers_working_threads_mutex);
249
250     /* If there are active connections, wait */
251     if (_servers_working_threads)
252         w_pthread_cond_wait_unlock(&_servers_working_threads_cond,
253                                    &_servers_working_threads_mutex);
254
255     /* Waiting for active connections, then stop. */
256     return;
257 }
258
259 /*
260  * Internal functions. Internal use.
261  */
262
263 ssh* _server_new_signal_handler(int signum, sigset_t *sa_mask, void(*f)(int))
264 {
265     ssh *sh;
266
267     sh=(ssh*)w_malloc(SSH_SIZE);
268
269     sh->signum= signum;
270     sh->f= f;
271     sh->next= 0;
272     w_sigemptyset(&sh->sa_mask);
273
274     if (sa_mask)
275         memcpy(&sh->sa_mask,sa_mask,sizeof(sigset_t));
276     else
277         w_sigaddset(&sh->sa_mask,signum);
278
279     return sh;
280 }
281
282 ssh* _server_has_signal_handler(server *s, int signum)
283 {
284     ssh* o;
285
286     o=s->signal_handlers;
287
288     while(o) {
289         if (o->signum==signum)
290             return o;
291         else o= o-> next;
292     }
293
294     return NULL;
295 }
296
297 int _servers_are_stopped()
298 {
299     int res;
300
301     w_pthread_mutex_lock(&_servers_must_die_mutex);
302     res=_servers_must_die;
303     w_pthread_mutex_unlock(&_servers_must_die_mutex);
304     return res;
305 }
306
307 void _server_destroy(server *s)
308 {
309     int n_hname,i;
310     ssh *sh,*temp;
311
312     n_hname=s->hostnames_n;
313
314     sh=s->signal_handlers;
315     while(sh) {
316         temp=sh->next;
317         w_free(sh);
318         sh=temp;
319     }
320
321     if (n_hname)
322         for (i=0;i<n_hname;i++)
323             w_free(s->hostnames[i]);
324     return;
325 }
326    
327 int _server_add_socket(server *s,int skt)
328 {
329     int n;
330
331     n=s->sockets_n;
332     if (n>= MAX_SERVER_SOCKETS)
333         fatal("Too many socket for one server. Max is %d.",
334                 MAX_SERVER_SOCKETS);
335
336     s->sockets[n]=skt;
337     s->sockets_n++;
338     return 0;
339 }
340 int _server_prepare_bind_sockets(server *s)
341 {
342     char *hname,strport[6];
343     int i,res,n_hname,count,skt,family;
344     struct addrinfo iaddr,*riaddr,*temp;
345
346     count=0;
347     n_hname= s->hostnames_n;
348     memset(&iaddr,0,sizeof(iaddr));
349
350     family=s->family;
351     iaddr.ai_family=family;
352     iaddr.ai_socktype=s->type;
353     snprintf(strport,6,"%d",s->port);
354
355     if (n_hname) {
356         for (i=0; i<n_hname; i++) {
357             hname= s->hostnames[i];
358            
359             res=getaddrinfo(hname,strport,&iaddr,&riaddr);
360             if (res)
361                 continue;
362
363             for (temp=riaddr; temp; temp=temp->ai_next) {
364                 skt=w_socket(temp->ai_family,temp->ai_socktype,0,0);
365                 if (skt<0)
366                     continue;
367
368                 //XXX utilizzare server_setsockopt()
369                 w_setsockopt_reuseaddr(skt);
370
371                 res= bind(skt,temp->ai_addr,temp->ai_addrlen);
372                 if (res==-1) {
373                     close(skt);
374                     continue;
375                 }
376
377                 _server_add_socket(s,skt);
378                 count++;
379                 //break;
380             }
381             freeaddrinfo(riaddr);
382         }
383     } else {
384
385         iaddr.ai_flags |= AI_PASSIVE;
386
387         res=getaddrinfo(0,strport,&iaddr,&riaddr);
388         if (res) fatal("Fatal getaddrinfo: %s.",gai_strerror(errno));
389
390         for (temp=riaddr; temp; temp=temp->ai_next) {
391
392             skt=w_socket(family,s->type,0,0);
393             if (skt<0)
394                 continue;
395
396             //XXX utilizzare server_setsockopt()
397             w_setsockopt_reuseaddr(skt);
398
399             res= bind(skt,temp->ai_addr,temp->ai_addrlen);
400             if (res==-1) {
401                 close(skt);
402                 continue;
403             }
404
405             _server_add_socket(s,skt);
406             count++;
407             break;
408         }
409     }
410     return count;
411 }
412
413 void _server_listen(server *s)
414 {
415     int i,b;
416     int *skts;
417
418     /* if SOCK_DGRAM, no listen. */
419     if (s->type==SOCK_DGRAM)
420         return;
421
422     skts=s->sockets;
423     b=s->backlog;
424
425     for (i=0; i<s->sockets_n; i++)
426         w_listen(skts[i],b);
427
428     return;
429 }
430
431 void _servers_thread_begin_work(void)
432 {
433     w_pthread_mutex_lock(&_servers_working_threads_mutex);
434     _servers_working_threads++;
435     w_pthread_mutex_unlock(&_servers_working_threads_mutex);
436     return;
437 }
438
439 void _server_thread_ends_work(void)
440 {
441     w_pthread_mutex_lock(&_servers_working_threads_mutex);
442
443     _servers_working_threads--;
444     if (!_servers_working_threads)
445         w_pthread_cond_signal(&_servers_working_threads_cond);
446
447     w_pthread_mutex_unlock(&_servers_working_threads_mutex);
448     return;
449 }
450
451 int _server_thread_begins(pmt *m)
452 {
453     w_pthread_mutex_lock(m);
454
455     if (_servers_are_stopped())
456         return 0;
457
458    return 1;
459 }
460
461 void* _server_accept_thread(void *arg)
462 {
463     server *s;
464     int thread_skt,client_skt,type,res,skt_pos;
465     struct sockaddr sa;
466     socklen_t salen;
467     pmt *skt_mutex,*proc_sock;
468     pct  *c;
469     struct pollfd p;
470     void (*f)(int,struct sockaddr*,socklen_t*);
471
472     s=(server*)arg;
473     c=&(s->threads_counter_cond);
474     proc_sock=&(s->processing_socket_mutex);
475     salen= sizeof(struct sockaddr);
476
477     /*
478      * Getting my socket, decrementing the threads counter.
479      * If i'm the last thread for my socket, signal
480      * the condition to server_serve()
481      */
482     w_pthread_mutex_lock(proc_sock);
483
484     skt_pos= s->processing_socket;
485
486     s->threads_counter--;
487     if (!(s->threads_counter)) {
488         w_pthread_mutex_unlock(proc_sock);
489         w_pthread_cond_signal(c);
490     } else
491         w_pthread_mutex_unlock(proc_sock);
492
493     /* Now it's time to process data */
494     thread_skt= s->sockets[skt_pos];
495     skt_mutex= s->sockets_mutex + skt_pos;
496     f= s->handler;
497     type= s->type;
498
499     if (type==SOCK_DGRAM) {
500         p.events=POLLIN;
501         p.fd=thread_skt;
502     }
503
504     /* Wait my signal to start */
505     w_pthread_mutex_lock_unlock(&(s->start));
506
507     switch (type) {
508             case SOCK_STREAM:
509                 for (;;) {
510
511                     if (!_server_thread_begins(skt_mutex))
512                         break;
513
514                     client_skt= w_accept(thread_skt,&sa,&salen);
515                     w_pthread_mutex_unlock(skt_mutex);
516    
517                     _servers_thread_begin_work();
518
519                     if (client_skt!=-1) {
520                         f(client_skt,&sa,&salen);
521                         close(client_skt);
522                     }
523
524                     _server_thread_ends_work();
525                 }
526             case SOCK_DGRAM:
527                 for (;;) {
528
529                     if (!_server_thread_begins(skt_mutex))
530                         break;
531
532                     res= w_poll(&p,1,-1);
533                     w_pthread_mutex_unlock(skt_mutex);
534
535                     _servers_thread_begin_work();
536
537                     if (p.revents & POLLIN) {
538                         f(thread_skt,0,0);
539                     }
540
541                     _server_thread_ends_work();
542                 }
543             default:
544                 fatal("Socket type not supported.");
545                 }
546
547     /* Servers are stopped. */
548     w_pthread_mutex_unlock(skt_mutex);
549     return NULL;
550 }
551
552 void _server_init_signals(server *s)
553 {
554     ssh *sh;
555
556     sh=s->signal_handlers;
557
558     for (sh=s->signal_handlers; sh; sh= sh->next)
559         w_sigaction(sh->signum, &sh->sa_mask, sh->f);
560
561     return;
562 }
563
564
565 void _server_init_mutexes(server *s)
566 {
567     int n_socks,i;
568     pmt *t;
569
570     n_socks=s->sockets_n;
571
572     t=s->sockets_mutex;
573     for (i=0; i<n_socks; i++)
574         w_pthread_mutex_init(t+i,0);
575
576     w_pthread_mutex_init(&(s->processing_socket_mutex),0);
577     w_pthread_mutex_init(&(s->threads_counter_mutex),0);
578     w_pthread_mutex_init(&(s->start),0);
579     w_pthread_cond_init(&(s->threads_counter_cond),0);
580     return;
581 }
582
583 void _server_default_sigint_handler(int signum)
584 {
585     servers_stop();
586     return;
587 }
588
589 void _server_default_sigterm_handler(int signum)
590 {
591     servers_slow_stop();
592     return;
593 }
594
595 void _server_serve(server *s)
596 {
597     int n_socks,i,j,tps;
598     pthread_t  t;
599     pthread_attr_t ta;
600     pmt *m,*start;
601     pct  *c;
602
603     /* Init services */
604     _server_prepare_bind_sockets(s);
605     _server_listen(s);
606
607     n_socks=s->sockets_n;
608     if (!n_socks)
609         fatal("No socket available. Exit.");
610
611     m=&(s->threads_counter_mutex);
612     c=&(s->threads_counter_cond);
613     tps=s->threads_per_socket;
614     start=&(s->start);
615
616     w_pthread_attr_init(&ta);
617     w_pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
618
619     _server_init_mutexes(s);
620     _server_init_signals(s);
621
622     /* Threads: wait my signal before serving ... */
623     w_pthread_mutex_lock(start);
624
625     for(i=0; i<n_socks; i++) {
626         w_pthread_mutex_lock(m);
627
628         s->processing_socket=i;
629         s->threads_counter=tps;
630
631         for (j=0; j<tps; j++)
632             w_pthread_create(&t,&ta,&_server_accept_thread,(void*)s);
633
634         /* Wait the cascade ... */
635         w_pthread_cond_wait_unlock(c,m);
636     }
637
638     w_pthread_attr_destroy(&ta);
639
640     /* Go my children, go. */
641     w_pthread_mutex_unlock(start);
642     return;
643 }
644
Note: See TracBrowser for help on using the browser.