Add a little more protocol documentation.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2001 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 /*
22   socket and pipe IO utilities used in rsync 
23
24   tridge, June 1996
25   */
26 #include "rsync.h"
27
28 /* if no timeout is specified then use a 60 second select timeout */
29 #define SELECT_TIMEOUT 60
30
31 extern int bwlimit;
32
33 static int io_multiplexing_out;
34 static int io_multiplexing_in;
35 static int multiplex_in_fd;
36 static int multiplex_out_fd;
37 static time_t last_io;
38 static int eof_error=1;
39 extern int verbose;
40 extern int io_timeout;
41 extern struct stats stats;
42
43 static int io_error_fd = -1;
44
45 static void read_loop(int fd, char *buf, int len);
46
47 static void check_timeout(void)
48 {
49         extern int am_server, am_daemon;
50         time_t t;
51
52         err_list_push();
53         
54         if (!io_timeout) return;
55
56         if (!last_io) {
57                 last_io = time(NULL);
58                 return;
59         }
60
61         t = time(NULL);
62
63         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
64                 if (!am_server && !am_daemon) {
65                         rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
66                                 (int)(t-last_io));
67                 }
68                 exit_cleanup(RERR_TIMEOUT);
69         }
70 }
71
72 /* setup the fd used to propogate errors */
73 void io_set_error_fd(int fd)
74 {
75         io_error_fd = fd;
76 }
77
78 /* read some data from the error fd and write it to the write log code */
79 static void read_error_fd(void)
80 {
81         char buf[200];
82         int n;
83         int fd = io_error_fd;
84         int tag, len;
85
86         /* io_error_fd is temporarily disabled -- is this meant to
87          * prevent indefinite recursion? */
88         io_error_fd = -1;
89
90         read_loop(fd, buf, 4);
91         tag = IVAL(buf, 0);
92
93         len = tag & 0xFFFFFF;
94         tag = tag >> 24;
95         tag -= MPLEX_BASE;
96
97         while (len) {
98                 n = len;
99                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
100                 read_loop(fd, buf, n);
101                 rwrite((enum logcode)tag, buf, n);
102                 len -= n;
103         }
104
105         io_error_fd = fd;
106 }
107
108
109 static int no_flush;
110
111 /*
112  * Read from a socket with IO timeout. return the number of bytes
113  * read. If no bytes can be read then exit, never return a number <= 0.
114  *
115  * TODO: If the remote shell connection fails, then current versions
116  * actually report an "unexpected EOF" error here.  Since it's a
117  * fairly common mistake to try to use rsh when ssh is required, we
118  * should trap that: if we fail to read any data at all, we should
119  * give a better explanation.  We can tell whether the connection has
120  * started by looking e.g. at whether the remote version is known yet.
121  */
122 static int read_timeout(int fd, char *buf, int len)
123 {
124         int n, ret=0;
125
126         io_flush();
127
128         while (ret == 0) {
129                 fd_set fds;
130                 struct timeval tv;
131                 int fd_count = fd+1;
132
133                 FD_ZERO(&fds);
134                 FD_SET(fd, &fds);
135                 if (io_error_fd != -1) {
136                         FD_SET(io_error_fd, &fds);
137                         if (io_error_fd > fd) fd_count = io_error_fd+1;
138                 }
139
140                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
141                 tv.tv_usec = 0;
142
143                 errno = 0;
144
145                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
146                         if (errno == EBADF) {
147                                 exit_cleanup(RERR_SOCKETIO);
148                         }
149                         check_timeout();
150                         continue;
151                 }
152
153                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
154                         read_error_fd();
155                 }
156
157                 if (!FD_ISSET(fd, &fds)) continue;
158
159                 n = read(fd, buf, len);
160
161                 if (n > 0) {
162                         buf += n;
163                         len -= n;
164                         ret += n;
165                         if (io_timeout)
166                                 last_io = time(NULL);
167                         continue;
168                 }
169
170                 if (n == -1 && errno == EINTR) {
171                         continue;
172                 }
173
174                 if (n == -1 && 
175                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
176                         continue;
177                 }
178
179
180                 if (n == 0) {
181                         if (eof_error) {
182                                 rprintf(FERROR,
183                                         "%s: connection to server unexpectedly closed"
184                                         " (%.0f bytes read so far)\n",
185                                         RSYNC_NAME, (double)stats.total_read);
186                         }
187                         exit_cleanup(RERR_STREAMIO);
188                 }
189
190                 /* this prevents us trying to write errors on a dead socket */
191                 io_multiplexing_close();
192
193                 rprintf(FERROR,"read error: %s\n", strerror(errno));
194                 exit_cleanup(RERR_STREAMIO);
195         }
196
197         return ret;
198 }
199
200 /* continue trying to read len bytes - don't return until len
201    has been read */
202 static void read_loop(int fd, char *buf, int len)
203 {
204         while (len) {
205                 int n = read_timeout(fd, buf, len);
206
207                 buf += n;
208                 len -= n;
209         }
210 }
211
212 /* read from the file descriptor handling multiplexing - 
213    return number of bytes read
214    never return <= 0 */
215 static int read_unbuffered(int fd, char *buf, int len)
216 {
217         static int remaining;
218         int tag, ret=0;
219         char line[1024];
220
221         if (!io_multiplexing_in || fd != multiplex_in_fd) 
222                 return read_timeout(fd, buf, len);
223
224         while (ret == 0) {
225                 if (remaining) {
226                         len = MIN(len, remaining);
227                         read_loop(fd, buf, len);
228                         remaining -= len;
229                         ret = len;
230                         continue;
231                 }
232
233                 read_loop(fd, line, 4);
234                 tag = IVAL(line, 0);
235
236                 remaining = tag & 0xFFFFFF;
237                 tag = tag >> 24;
238
239                 if (tag == MPLEX_BASE) continue;
240
241                 tag -= MPLEX_BASE;
242
243                 if (tag != FERROR && tag != FINFO) {
244                         rprintf(FERROR,"unexpected tag %d\n", tag);
245                         exit_cleanup(RERR_STREAMIO);
246                 }
247
248                 if (remaining > sizeof(line)-1) {
249                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
250                                 remaining);
251                         exit_cleanup(RERR_STREAMIO);
252                 }
253
254                 read_loop(fd, line, remaining);
255                 line[remaining] = 0;
256
257                 rprintf((enum logcode)tag,"%s", line);
258                 remaining = 0;
259         }
260
261         return ret;
262 }
263
264
265 /* do a buffered read from fd. don't return until all N bytes
266    have been read. If all N can't be read then exit with an error */
267 static void readfd(int fd,char *buffer,int N)
268 {
269         int  ret;
270         int total=0;  
271         
272         while (total < N) {
273                 io_flush();
274
275                 ret = read_unbuffered(fd,buffer + total,N-total);
276                 total += ret;
277         }
278
279         stats.total_read += total;
280 }
281
282
283 int32 read_int(int f)
284 {
285         char b[4];
286         int32 ret;
287
288         readfd(f,b,4);
289         ret = IVAL(b,0);
290         if (ret == (int32)0xffffffff) return -1;
291         return ret;
292 }
293
294 int64 read_longint(int f)
295 {
296         extern int remote_version;
297         int64 ret;
298         char b[8];
299         ret = read_int(f);
300
301         if ((int32)ret != (int32)0xffffffff) {
302                 return ret;
303         }
304
305 #ifdef NO_INT64
306         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
307         exit_cleanup(RERR_UNSUPPORTED);
308 #else
309         if (remote_version >= 16) {
310                 readfd(f,b,8);
311                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
312         }
313 #endif
314
315         return ret;
316 }
317
318 void read_buf(int f,char *buf,int len)
319 {
320         readfd(f,buf,len);
321 }
322
323 void read_sbuf(int f,char *buf,int len)
324 {
325         read_buf(f,buf,len);
326         buf[len] = 0;
327 }
328
329 unsigned char read_byte(int f)
330 {
331         unsigned char c;
332         read_buf(f,(char *)&c,1);
333         return c;
334 }
335
336 /* write len bytes to fd */
337 static void writefd_unbuffered(int fd,char *buf,int len)
338 {
339         int total = 0;
340         fd_set w_fds, r_fds;
341         int fd_count, count;
342         struct timeval tv;
343
344         err_list_push();
345
346         no_flush++;
347
348         while (total < len) {
349                 FD_ZERO(&w_fds);
350                 FD_ZERO(&r_fds);
351                 FD_SET(fd,&w_fds);
352                 fd_count = fd;
353
354                 if (io_error_fd != -1) {
355                         FD_SET(io_error_fd,&r_fds);
356                         if (io_error_fd > fd_count) 
357                                 fd_count = io_error_fd;
358                 }
359
360                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
361                 tv.tv_usec = 0;
362
363                 errno = 0;
364
365                 count = select(fd_count+1,
366                                io_error_fd != -1?&r_fds:NULL,
367                                &w_fds,NULL,
368                                &tv);
369
370                 if (count <= 0) {
371                         if (errno == EBADF) {
372                                 exit_cleanup(RERR_SOCKETIO);
373                         }
374                         check_timeout();
375                         continue;
376                 }
377
378                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
379                         read_error_fd();
380                 }
381
382                 if (FD_ISSET(fd, &w_fds)) {
383                         int ret, n = len-total;
384                         ret = write(fd,buf+total,n);
385
386                         if (ret == -1 && errno == EINTR) {
387                                 continue;
388                         }
389
390                         if (ret == -1 && 
391                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
392                                 msleep(1);
393                                 continue;
394                         }
395
396                         if (ret <= 0) {
397                                 rprintf(FERROR,
398                                         "error writing %d unbuffered bytes"
399                                         " - exiting: %s\n", len,
400                                         strerror(errno));
401                                 exit_cleanup(RERR_STREAMIO);
402                         }
403
404                         /* Sleep after writing to limit I/O bandwidth */
405                         if (bwlimit)
406                         {
407                             tv.tv_sec = 0;
408                             tv.tv_usec = ret * 1000 / bwlimit;
409                             while (tv.tv_usec > 1000000)
410                             {
411                                 tv.tv_sec++;
412                                 tv.tv_usec -= 1000000;
413                             }
414                             select(0, NULL, NULL, NULL, &tv);
415                         }
416  
417                         total += ret;
418
419                         if (io_timeout)
420                                 last_io = time(NULL);
421                 }
422         }
423
424         no_flush--;
425 }
426
427
428 static char *io_buffer;
429 static int io_buffer_count;
430
431 void io_start_buffering(int fd)
432 {
433         if (io_buffer) return;
434         multiplex_out_fd = fd;
435         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
436         if (!io_buffer) out_of_memory("writefd");
437         io_buffer_count = 0;
438 }
439
440 /* write an message to a multiplexed stream. If this fails then rsync
441    exits */
442 static void mplex_write(int fd, enum logcode code, char *buf, int len)
443 {
444         char buffer[4096];
445         int n = len;
446
447         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
448
449         if (n > (sizeof(buffer)-4)) {
450                 n = sizeof(buffer)-4;
451         }
452
453         memcpy(&buffer[4], buf, n);
454         writefd_unbuffered(fd, buffer, n+4);
455
456         len -= n;
457         buf += n;
458
459         if (len) {
460                 writefd_unbuffered(fd, buf, len);
461         }
462 }
463
464
465 void io_flush(void)
466 {
467         int fd = multiplex_out_fd;
468
469         err_list_push();
470
471         if (!io_buffer_count || no_flush) return;
472
473         if (io_multiplexing_out) {
474                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
475         } else {
476                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
477         }
478         io_buffer_count = 0;
479 }
480
481
482 /* XXX: fd is ignored, which seems a little strange. */
483 void io_end_buffering(int fd)
484 {
485         io_flush();
486         if (!io_multiplexing_out) {
487                 free(io_buffer);
488                 io_buffer = NULL;
489         }
490 }
491
492 static void writefd(int fd,char *buf,int len)
493 {
494         stats.total_written += len;
495
496         err_list_push();
497
498         if (!io_buffer || fd != multiplex_out_fd) {
499                 writefd_unbuffered(fd, buf, len);
500                 return;
501         }
502
503         while (len) {
504                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
505                 if (n > 0) {
506                         memcpy(io_buffer+io_buffer_count, buf, n);
507                         buf += n;
508                         len -= n;
509                         io_buffer_count += n;
510                 }
511                 
512                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
513         }
514 }
515
516
517 void write_int(int f,int32 x)
518 {
519         char b[4];
520         SIVAL(b,0,x);
521         writefd(f,b,4);
522 }
523
524
525 /*
526  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
527  * 64-bit types on this platform.
528  */
529 void write_longint(int f, int64 x)
530 {
531         extern int remote_version;
532         char b[8];
533
534         if (remote_version < 16 || x <= 0x7FFFFFFF) {
535                 write_int(f, (int)x);
536                 return;
537         }
538
539         write_int(f, (int32)0xFFFFFFFF);
540         SIVAL(b,0,(x&0xFFFFFFFF));
541         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
542
543         writefd(f,b,8);
544 }
545
546 void write_buf(int f,char *buf,int len)
547 {
548         writefd(f,buf,len);
549 }
550
551 /* write a string to the connection */
552 static void write_sbuf(int f,char *buf)
553 {
554         write_buf(f, buf, strlen(buf));
555 }
556
557
558 void write_byte(int f,unsigned char c)
559 {
560         write_buf(f,(char *)&c,1);
561 }
562
563 int read_line(int f, char *buf, int maxlen)
564 {
565         eof_error = 0;
566
567         while (maxlen) {
568                 buf[0] = 0;
569                 read_buf(f, buf, 1);
570                 if (buf[0] == 0) return 0;
571                 if (buf[0] == '\n') {
572                         buf[0] = 0;
573                         break;
574                 }
575                 if (buf[0] != '\r') {
576                         buf++;
577                         maxlen--;
578                 }
579         }
580         if (maxlen == 0) {
581                 *buf = 0;
582                 return 0;
583         }
584
585         eof_error = 1;
586
587         return 1;
588 }
589
590
591 void io_printf(int fd, const char *format, ...)
592 {
593         va_list ap;  
594         char buf[1024];
595         int len;
596         
597         va_start(ap, format);
598         len = vsnprintf(buf, sizeof(buf), format, ap);
599         va_end(ap);
600
601         if (len < 0) exit_cleanup(RERR_STREAMIO);
602
603         write_sbuf(fd, buf);
604 }
605
606
607 /* setup for multiplexing an error stream with the data stream */
608 void io_start_multiplex_out(int fd)
609 {
610         multiplex_out_fd = fd;
611         io_flush();
612         io_start_buffering(fd);
613         io_multiplexing_out = 1;
614 }
615
616 /* setup for multiplexing an error stream with the data stream */
617 void io_start_multiplex_in(int fd)
618 {
619         multiplex_in_fd = fd;
620         io_flush();
621         io_multiplexing_in = 1;
622 }
623
624 /* write an message to the multiplexed error stream */
625 int io_multiplex_write(enum logcode code, char *buf, int len)
626 {
627         if (!io_multiplexing_out) return 0;
628
629         io_flush();
630         stats.total_written += (len+4);
631         mplex_write(multiplex_out_fd, code, buf, len);
632         return 1;
633 }
634
635 /* stop output multiplexing */
636 void io_multiplexing_close(void)
637 {
638         io_multiplexing_out = 0;
639 }
640