- also call closehandler when exiting an internal job
[opensuse:build-service.git] / src / backend / BSServerEvents.pm
1 #
2 # Copyright (c) 2006, 2007 Michael Schroeder, Novell Inc.
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License version 2 as
6 # published by the Free Software Foundation.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program (see the file COPYING); if not, write to the
15 # Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17 #
18 ################################################################
19 #
20 # Event based HTTP Server. Only supports GET requests.
21 #
22
23 package BSServerEvents;
24
25 use POSIX;
26 use Socket;
27 use Fcntl qw(:DEFAULT);
28 use Symbol;
29 use BSEvents;
30 use Data::Dumper;
31
32 use strict;
33
34 our $gev;       # our event
35
36 sub gethead {
37   # parses http header and fills hash
38   # $h: reference to the hash to be filled
39   # $t: http header as string
40   my ($h, $t) = @_; 
41
42   my ($field, $data);
43   for (split(/[\r\n]+/, $t)) {
44     next if $_ eq ''; 
45     if (/^[ \t]/) {
46       next unless defined $field;
47       s/^\s*/ /;
48       $h->{$field} .= $_; 
49     } else {
50       ($field, $data) = split(/\s*:\s*/, $_, 2); 
51       $field =~ tr/A-Z/a-z/;
52       if ($h->{$field} && $h->{$field} ne '') {
53         $h->{$field} = $h->{$field}.','.$data;
54       } else {
55         $h->{$field} = $data;
56       }   
57     }   
58   }
59 }
60
61
62 sub replstream_timeout {
63   my ($ev) = @_;
64   print "replstream timeout for $ev->{'peer'}\n";
65   stream_close($ev->{'readev'}, $ev);
66 }
67
68 sub replrequest_timeout {
69   my ($ev) = @_;
70   print "replrequest timeout for $ev->{'peer'}\n";
71   $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
72   close($ev->{'fd'});
73   close($ev->{'nfd'}) if $ev->{'nfd'};
74   delete $ev->{'fd'};
75   delete $ev->{'nfd'};
76 }
77
78 sub replrequest_write {
79   my ($ev) = @_;
80   my $l = length($ev->{'replbuf'});
81   return unless $l;
82   $l = 4096 if $l > 4096;
83   my $r = syswrite($ev->{'fd'}, $ev->{'replbuf'}, $l);
84   if (!defined($r)) {
85     if ($! == POSIX::EINTR || $! == POSIX::EWOULDBLOCK) {
86       BSEvents::add($ev);
87       return;
88     }
89     print "write error for $ev->{'peer'}: $!\n";
90     $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
91     close($ev->{'fd'});
92     close($ev->{'nfd'}) if $ev->{'nfd'};
93     return;
94   }
95   if ($r == length($ev->{'replbuf'})) {
96     #print "done for $ev->{'peer'}\n";
97     $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
98     close($ev->{'fd'});
99     close($ev->{'nfd'}) if $ev->{'nfd'};
100     return;
101   }
102   $ev->{'replbuf'} = substr($ev->{'replbuf'}, $r) if $r;
103   BSEvents::add($ev);
104   return;
105 }
106
107 sub reply {
108   my ($str, @hi) = @_;
109   my $ev = $gev;
110   # print "reply to event #$ev->{'id'}\n";
111   if (!exists($ev->{'fd'})) {
112     $ev->{'handler'}->($ev) if $ev->{'handler'};
113     $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
114     print "$str\n" if defined($str) && $str ne '';
115     return;
116   }
117   if ($ev->{'streaming'}) {
118     # already in progress, can not do much here...
119     $ev->{'replbuf'} .= "\n\n$str" if defined $str;
120     $ev->{'type'} = 'write';
121     $ev->{'handler'} = \&replrequest_write;
122     $ev->{'timeouthandler'} = \&replrequest_timeout;
123     BSEvents::add($ev, $ev->{'conf'}->{'replrequest_timeout'});
124     return;
125   }
126   if (@hi && $hi[0] =~ /^status: (\d+.*)/i) {
127     my $msg = $1;
128     $msg =~ s/:/ /g;
129     $hi[0] = "HTTP/1.1 $msg";
130   } else {
131     unshift @hi, "HTTP/1.1 200 OK";
132   }
133   push @hi, "Cache-Control: no-cache";
134   push @hi, "Connection: close";
135   push @hi, "Content-Length: ".length($str) if defined $str;
136   my $data = join("\r\n", @hi)."\r\n\r\n";
137   $data .= $str if defined $str;
138   my $dummy = '';
139   sysread($ev->{'fd'}, $dummy, 1024, 0);        # clear extra input
140   $ev->{'replbuf'} = $data;
141   $ev->{'type'} = 'write';
142   $ev->{'handler'} = \&replrequest_write;
143   $ev->{'timeouthandler'} = \&replrequest_timeout;
144   BSEvents::add($ev, $ev->{'conf'}->{'replrequest_timeout'});
145 }
146
147 sub reply_error  {
148   my ($conf, $err) = @_;
149   $err ||= "unspecified error";
150   $err =~ s/\n$//s;
151   my $code = 400;
152   my $tag = ''; 
153   if ($err =~ /^(\d+)\s*([^\r\n]*)/) {
154     $code = $1; 
155     $tag = $2; 
156   } elsif ($err =~ /^([^\r\n]+)/) {
157     $tag = $1; 
158   } else {
159     $tag = 'Error';
160   }
161   if ($conf && $conf->{'errorreply'}) {
162     $conf->{'errorreply'}->($err, $code, $tag);
163   } else {
164     reply("$err\n", "Status: $code $tag", 'Content-Type: text/plain');
165   }
166 }
167
168 sub reply_stream {
169   my ($rev, @args) = @_;
170   push @args, 'Transfer-Encoding: chunked';
171   unshift @args, 'Content-Type: application/octet-stream' unless grep {/^content-type:/i} @args;
172   reply(undef, @args);
173   my $ev = $gev;
174   BSEvents::rem($ev);
175   #print "reply_stream $rev -> $ev\n";
176   $ev->{'readev'} = $rev;
177   $ev->{'handler'} = \&stream_write_handler;
178   $ev->{'timeouthandler'} = \&replstream_timeout;
179   $ev->{'streaming'} = 1;
180   $rev->{'writeev'} = $ev;
181   $rev->{'handler'} = \&stream_read_handler unless $rev->{'handler'};
182   BSEvents::add($ev, 0);
183   BSEvents::add($rev);  # do this last (because of "always" type)
184 }
185
186 sub reply_file {
187   my ($filename, @args) = @_;
188   my $fd = $filename;
189   if (!ref($fd)) {
190     $fd = gensym;
191     open($fd, '<', $filename) || die("$filename: $!\n");
192   }
193   my $rev = BSEvents::new('always');
194   $rev->{'fd'} = $fd;
195   $rev->{'makechunks'} = 1;
196   reply_stream($rev, @args);
197 }
198
199 sub makecpiohead {
200   my ($name, $s) = @_;
201   return "07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000b00000000TRAILER!!!\0\0\0\0" if !$s;
202   my $h = "07070100000000000081a4000000000000000000000001";
203   $h .= sprintf("%08x%08x", $s->[9], $s->[7]);
204   $h .= "00000000000000000000000000000000";
205   $h .= sprintf("%08x", length($name) + 1);
206   $h .= "00000000$name\0";
207   $h .= substr("\0\0\0\0", (length($h) & 3)) if length($h) & 3;
208   my $pad = '';
209   $pad = substr("\0\0\0\0", ($s->[7] & 3)) if $s->[7] & 3;
210   return ($h, $pad);
211 }
212
213 sub cpio_nextfile {
214   my ($ev) = @_;
215
216   my $data = '';
217   while(1) {
218     #print "cpio_nextfile\n";
219     $data .= $ev->{'filespad'} if defined $ev->{'filespad'};
220     delete $ev->{'filespad'};
221     my $files = $ev->{'files'};
222     my $filesno = defined($ev->{'filesno'}) ? $ev->{'filesno'} + 1 : 0;
223     my $file;
224     if ($filesno >= @$files) {
225       if ($ev->{'cpioerrors'} ne '') {
226         $file = {'data' => $ev->{'cpioerrors'}, 'name' => '.errors'};
227         $ev->{'cpioerrors'} = '';
228       } else {
229         $data .= makecpiohead();
230         return $data;
231       }
232     } else {
233       $ev->{'filesno'} = $filesno;
234       $file = $files->[$filesno];
235     }
236     if ($file->{'error'}) {
237       $ev->{'cpioerrors'} .= "$file->{'name'}: $file->{'error'}\n";
238       next;
239     }
240     my @s;
241     if (exists $file->{'filename'}) {
242       my $fd = $file->{'filename'};
243       if (!ref($fd)) {
244         $fd = gensym;
245         if (!open($fd, '<', $file->{'filename'})) {
246           $ev->{'cpioerrors'} .= "$file->{'name'}: $!\n";
247           next;
248         }
249       }
250       $ev->{'fd'} = $fd;
251       @s = stat($ev->{'fd'});
252     } else {
253       $s[7] = length($file->{'data'});
254       $s[9] = time();
255     }
256     my ($header, $pad) = makecpiohead($file->{'name'}, \@s);
257     $data .= $header;
258     $ev->{'filespad'} = $pad;
259     if (!exists $file->{'filename'}) {
260       $data .= $file->{'data'};
261       next;
262     }
263     return $data;
264   }
265 }
266
267 sub reply_cpio {
268   my ($files, @args) = @_;
269   my $rev = BSEvents::new('always');
270   $rev->{'files'} = $files;
271   $rev->{'cpioerrors'} = '';
272   $rev->{'makechunks'} = 1;
273   $rev->{'eofhandler'} = \&cpio_nextfile;
274   unshift @args, 'Content-Type: application/x-cpio';
275   reply_stream($rev, @args);
276 }
277
278 sub getrequest_timeout {
279   my ($ev) = @_;
280   print "getrequest timeout for $ev->{'peer'}\n";
281   $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
282   close($ev->{'fd'});
283   close($ev->{'nfd'}) if $ev->{'nfd'};
284 }
285
286 sub getrequest {
287   my ($ev) = @_;
288   my $buf;
289   local $gev = $ev;
290
291   eval {
292     $ev->{'reqbuf'} = '' unless exists $ev->{'reqbuf'};
293     my $r;
294     if ($ev->{'reqbuf'} eq '' && exists $ev->{'conf'}->{'getrequest_recvfd'}) {
295       my $newfd = gensym;
296       $r = $ev->{'conf'}->{'getrequest_recvfd'}->($ev->{'fd'}, $newfd, 1024);
297       if (defined($r)) {
298         if (-c $newfd) {
299           close $newfd; # /dev/null case, no handoff requested
300         } else {
301           $ev->{'nfd'} = $newfd;
302         }
303         $ev->{'reqbuf'} = $r;
304         $r = length($r);
305       }
306     } else {
307       $r = sysread($ev->{'fd'}, $ev->{'reqbuf'}, 1024, length($ev->{'reqbuf'}));
308     }
309     if (!defined($r)) {
310       if ($! == POSIX::EINTR || $! == POSIX::EWOULDBLOCK) {
311         BSEvents::add($ev);
312         return;
313       }
314       print "read error for $ev->{'peer'}: $!\n";
315       $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
316       close($ev->{'fd'});
317       close($ev->{'nfd'}) if $ev->{'nfd'};
318       return;
319     }
320     if (!$r) {
321       print "EOF for $ev->{'peer'}\n";
322       $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
323       close($ev->{'fd'});
324       close($ev->{'nfd'}) if $ev->{'nfd'};
325       return;
326     }
327     if ($ev->{'reqbuf'} !~ /^(.*?)\r?\n/s) {
328       BSEvents::add($ev);
329       return;
330     }
331     my ($act, $path, $vers, undef) = split(' ', $1, 4);
332     die("400 No method name\n") if !$act;
333     my $headers = {};
334     if ($vers) {
335       die("501 Bad method: $act\n") if $act ne 'GET';
336       if ($ev->{'reqbuf'} !~ /^(.*?)\r?\n\r?\n(.*)$/s) {
337         BSEvents::add($ev);
338         return;
339       }
340       gethead($headers, "Request: $1");
341     } elsif ($act ne 'get') {
342       die("501 Bad method, must be GET\n") if $act ne 'GET';
343     }
344     my $query_string = '';
345     if ($path =~ /^(.*?)\?(.*)$/) {
346       $path = $1;
347       $query_string = $2;
348     }
349     $path =~ s/%([a-fA-F0-9]{2})/chr(hex($1))/ge;
350     die("501 invalid path\n") unless $path =~ /^\//;
351     my $req = {'action' => $act, 'path' => $path, 'query' => $query_string, 'headers' => $headers};
352     $ev->{'request'} = $req;
353     my $conf = $ev->{'conf'};
354     $conf->{'dispatch'}->($conf, $req);
355   };
356   reply_error($ev->{'conf'}, $@) if $@;
357 }
358
359 sub newconnect {
360   my ($ev) = @_;
361   #print "newconnect!\n";
362   BSEvents::add($ev);
363   my $newfd = gensym;
364   my $peeraddr = accept($newfd, *{$ev->{'fd'}});
365   return unless $peeraddr;
366   fcntl($newfd, F_SETFL, O_NONBLOCK);
367   my $peer = 'unknown';
368   my $peerport;
369   eval {
370     my $peera;
371     ($peerport, $peera) = sockaddr_in($peeraddr);
372     $peer = inet_ntoa($peera);
373   };
374   my $cev = BSEvents::new('read', \&getrequest);
375   $cev->{'fd'} = $newfd;
376   $cev->{'peer'} = $peer;
377   $cev->{'peerport'} = $peerport if $peerport;
378   $cev->{'timeouthandler'} = \&getrequest_timeout;
379   $cev->{'conf'} = $ev->{'conf'};
380   if ($cev->{'conf'}->{'setkeepalive'}) {
381     setsockopt($cev->{'fd'}, SOL_SOCKET, SO_KEEPALIVE, pack("l",1));
382   }
383   BSEvents::add($cev, $ev->{'conf'}->{'getrequest_timeout'});
384 }
385
386 sub cloneconnect {
387   my (@reply) = @_;
388   my $ev = $gev;
389   return $ev unless exists $ev->{'nfd'};
390   fcntl($ev->{'nfd'}, F_SETFL, O_NONBLOCK);
391   my $nev = BSEvents::new('read', $ev->{'handler'});
392   $nev->{'fd'} = $ev->{'nfd'};
393   delete $ev->{'nfd'};
394   $nev->{'conf'} = $ev->{'conf'};
395   $nev->{'request'} = $ev->{'request'};
396   my $peer = 'unknown';
397   eval {
398     my $peername = getpeername($nev->{'fd'});
399     if ($peername) {
400       my ($peerport, $peera) = sockaddr_in($peername);
401       $peer = inet_ntoa($peera);
402     }
403   };
404   $nev->{'peer'} = $peer;
405   BSServerEvents::reply(@reply) if @reply;
406   $gev = $nev;  # switch to new event
407   if ($nev->{'conf'}->{'setkeepalive'}) {
408     setsockopt($nev->{'fd'}, SOL_SOCKET, SO_KEEPALIVE, pack("l",1));
409   }
410   return $nev;
411 }
412
413 sub stream_close {
414   my ($ev, $wev) = @_;
415   if ($ev) {
416     BSEvents::rem($ev) if $ev->{'fd'} && !$ev->{'paused'};
417     $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
418     close $ev->{'fd'} if $ev->{'fd'};
419     delete $ev->{'fd'};
420     delete $ev->{'writeev'};
421   }
422   if ($wev) {
423     BSEvents::rem($wev) if $wev->{'fd'} && !$wev->{'paused'};
424     $wev->{'closehandler'}->($wev) if $wev->{'closehandler'};
425     close $wev->{'fd'} if $wev->{'fd'};
426     delete $wev->{'fd'};
427     delete $wev->{'readev'};
428   }
429 }
430
431 #
432 # read from a file descriptor (socket or file)
433 # - convert to chunks if 'makechunks'
434 # - put data into write event
435 # - do flow control
436 #
437
438 sub stream_read_handler {
439   my ($ev) = @_;
440   #print "stream_read_handler $ev\n";
441   my $wev = $ev->{'writeev'};
442   $wev->{'replbuf'} = '' unless exists $wev->{'replbuf'};
443   my $r;
444   if ($ev->{'fd'}) {
445     if ($ev->{'makechunks'}) {
446       my $b = '';
447       $r = sysread($ev->{'fd'}, $b, 4096);
448       $wev->{'replbuf'} .= sprintf("%X\r\n", length($b)).$b."\r\n" if $r;
449     } else {
450       $r = sysread($ev->{'fd'}, $wev->{'replbuf'}, 4096, length($wev->{'replbuf'}));
451     }
452     if (!defined($r)) {
453       if ($! == POSIX::EINTR || $! == POSIX::EWOULDBLOCK) {
454         BSEvents::add($ev);
455         return;
456       }
457       print "stream_read_handler: $!\n";
458       # can't do much here, fallthrough in EOF code
459     }
460   }
461   if (!$r) {
462 #    print "stream_read_handler: EOF\n";
463     if ($ev->{'eofhandler'}) {
464       close $ev->{'fd'} if $ev->{'fd'};
465       delete $ev->{'fd'};
466       my $data = $ev->{'eofhandler'}->($ev);
467       if (defined($data) && $data ne '') {
468         if ($ev->{'makechunks'}) {
469           # keep those chunks small, otherwise our receiver will choke
470           while (length($data) > 4096) {
471             my $d = substr($data, 0, 4096);
472             $wev->{'replbuf'} .= sprintf("%X\r\n", length($d)).$d."\r\n";
473             $data = substr($data, 4096);
474           }
475           $wev->{'replbuf'} .= sprintf("%X\r\n", length($data)).$data."\r\n";
476         } else {
477           $wev->{'replbuf'} .= $data;
478         }
479       }
480       if ($ev->{'fd'}) {
481         stream_read_handler($ev);       # redo with new fd
482         return;
483       }
484     }
485     $wev->{'replbuf'} .= "0\r\n\r\n" if $ev->{'makechunks'};
486     $ev->{'eof'} = 1;
487     $ev->{'closehandler'}->($ev) if $ev->{'closehandler'};
488     close $ev->{'fd'} if $ev->{'fd'};
489     delete $ev->{'fd'};
490     if ($wev && $wev->{'paused'}) {
491       if (length($wev->{'replbuf'})) {
492         delete $wev->{'paused'};
493         BSEvents::add($wev)
494       } else {
495         stream_close($ev, $wev);
496       }
497     }
498     return;
499   }
500   if ($wev->{'paused'}) {
501     delete $wev->{'paused'};
502     BSEvents::add($wev);
503     # check if add() killed us
504     return unless $ev->{'fd'};
505   }
506   if (length($wev->{'replbuf'}) >= 16384) {
507     #print "write buffer too full, throttle\n";
508     $ev->{'paused'} = 1;
509   }
510   BSEvents::add($ev) unless $ev->{'paused'};
511 }
512
513 #
514 # write to a file descriptor (socket)
515 # - do flow control
516 #
517
518 sub stream_write_handler {
519   my ($ev) = @_;
520   my $rev = $ev->{'readev'};
521   #print "stream_write_handler $ev (rev=$rev)\n";
522   my $l = length($ev->{'replbuf'});
523   return unless $l;
524   $l = 4096 if $l > 4096;
525   my $r = syswrite($ev->{'fd'}, $ev->{'replbuf'}, $l);
526   if (!defined($r)) {
527     if ($! == POSIX::EINTR || $! == POSIX::EWOULDBLOCK) {
528       BSEvents::add($ev);
529       return;
530     }
531     print "stream_write_handler: $!\n";
532     $ev->{'paused'} = 1;
533     # support multiple writers
534     if ($rev->{'writeev'} != $ev) {
535       # leave reader open
536       print "reader stays open\n";
537       stream_close(undef, $ev);
538     } else {
539       stream_close($rev, $ev);
540     }
541     return;
542   }
543   $ev->{'replbuf'} = substr($ev->{'replbuf'}, $r) if $r;
544   if ($rev->{'paused'} && length($ev->{'replbuf'}) <= 8192) {
545     delete $rev->{'paused'};
546     BSEvents::add($rev);
547     if ($rev->{'writeev'} != $ev) {
548       my $wev = $rev->{'writeev'};
549       if ($wev->{'paused'} && length($wev->{'replbuf'})) {
550         #print "pushing old data\n";
551         delete $wev->{'paused'};
552         BSEvents::add($wev);
553       }
554     }
555   }
556   if (length($ev->{'replbuf'})) {
557     BSEvents::add($ev);
558   } else {
559     $ev->{'paused'} = 1;
560     stream_close($rev, $ev) if $rev->{'eof'};
561   }
562 }
563
564 sub periodic_handler {
565   my ($ev) = @_;
566   my $conf = $ev->{'conf'};
567   return unless $conf->{'periodic'};
568   $conf->{'periodic'}->($conf);
569   BSEvents::add($ev, $conf->{'periodic_interval'} || 3) if $conf->{'periodic'};
570 }
571
572 sub addserver {
573   my ($fd, $conf) = @_;
574   my $sockev = BSEvents::new('read', \&newconnect);
575   $sockev->{'fd'} = $fd;
576   $sockev->{'conf'} = $conf;
577   BSEvents::add($sockev);
578   if ($conf->{'periodic'}) {
579     my $per_ev = BSEvents::new('timeout', \&periodic_handler);
580     $per_ev->{'conf'} = $conf;
581     BSEvents::add($per_ev, $conf->{'periodic_interval'} || 3);
582   }
583   return $sockev;
584 }
585
586 1;