this is a large commit which adds io multiplexing, thus giving error
[rsync/rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   Utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
/* running byte totals, reported by write_total() and read_total() */
static int64 total_written;
static int64 total_read;

/* set to 1 once multiplexing has been switched on for that direction */
static int io_multiplexing_out;
static int io_multiplexing_in;
/* time of the last completed read/write while a timeout is configured;
   0 until check_timeout() or a transfer first sets it */
static time_t last_io;

/* defined elsewhere in the program */
extern int verbose;
extern int sparse_files;
extern int io_timeout;
37
38 int64 write_total(void)
39 {
40         return total_written;
41 }
42
43 int64 read_total(void)
44 {
45         return total_read;
46 }
47
/* fd whose input is buffered by read_check(); -1 means "none" */
static int buffer_f_in = -1;

/* Remember f_in as the fd to buffer input from and call
   set_blocking(f_out,0) on the output fd (presumably switching it to
   non-blocking mode - see set_blocking).  Note that f_in itself is not
   passed to set_blocking here. */
void setup_nonblocking(int f_in,int f_out)
{
        set_blocking(f_out,0);
        buffer_f_in = f_in;
}
55
56 static void check_timeout(void)
57 {
58         time_t t;
59         
60         if (!io_timeout) return;
61
62         if (!last_io) {
63                 last_io = time(NULL);
64                 return;
65         }
66
67         t = time(NULL);
68
69         if (last_io && io_timeout && (t-last_io)>io_timeout) {
70                 rprintf(FERROR,"read timeout after %d second - exiting\n", 
71                         (int)(t-last_io));
72                 exit_cleanup(1);
73         }
74 }
75
76
/* input look-ahead buffer filled by read_check() and drained by readfd() */
static char *read_buffer;       /* start of the allocation (NULL until first use) */
static char *read_buffer_p;     /* next unread byte inside read_buffer */
static int read_buffer_len;     /* number of unread bytes at read_buffer_p */
static int read_buffer_size;    /* allocated size of read_buffer in bytes */
81
82
83 /* continue trying to read len bytes - don't return until len
84    has been read */
85 static void read_loop(int fd, char *buf, int len)
86 {
87         while (len) {
88                 int n = read(fd, buf, len);
89                 if (n > 0) {
90                         buf += n;
91                         len -= n;
92                 }
93                 if (n == 0) {
94                         rprintf(FERROR,"EOF in read_loop\n");
95                         exit_cleanup(1);
96                 }
97                 if (n == -1) {
98                         fd_set fds;
99                         struct timeval tv;
100
101                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
102                                 rprintf(FERROR,"io error: %s\n", 
103                                         strerror(errno));
104                                 exit_cleanup(1);
105                         }
106
107                         FD_ZERO(&fds);
108                         FD_SET(fd, &fds);
109                         tv.tv_sec = io_timeout;
110                         tv.tv_usec = 0;
111
112                         if (select(fd+1, &fds, NULL, NULL, 
113                                    io_timeout?&tv:NULL) != 1) {
114                                 check_timeout();
115                         }
116                 }
117         }
118 }
119
120 static int read_unbuffered(int fd, char *buf, int len)
121 {
122         static int remaining;
123         char ibuf[4];
124         int tag, ret=0;
125         char line[1024];
126
127         if (!io_multiplexing_in) return read(fd, buf, len);
128
129         while (ret == 0) {
130                 if (remaining) {
131                         len = MIN(len, remaining);
132                         read_loop(fd, buf, len);
133                         remaining -= len;
134                         ret = len;
135                         continue;
136                 }
137
138                 read_loop(fd, ibuf, 4);
139                 tag = IVAL(ibuf, 0);
140                 remaining = tag & 0xFFFFFF;
141                 tag = tag >> 24;
142
143                 if (tag == MPLEX_BASE) continue;
144
145                 tag -= MPLEX_BASE;
146
147                 if (tag != FERROR && tag != FINFO) {
148                         rprintf(FERROR,"unexpected tag %d\n", tag);
149                         exit_cleanup(1);
150                 }
151
152                 if (remaining > sizeof(line)-1) {
153                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
154                                 remaining);
155                         exit_cleanup(1);
156                 }
157
158                 read_loop(fd, line, remaining);
159                 line[remaining] = 0;
160
161                 rprintf(tag,"%s", line);
162                 remaining = 0;
163         }
164
165         return ret;
166 }
167
168
169 /* This function was added to overcome a deadlock problem when using
170  * ssh.  It looks like we can't allow our receive queue to get full or
171  * ssh will clag up. Uggh.  */
172 static void read_check(int f)
173 {
174         int n;
175
176         if (f == -1) return;
177
178         if (read_buffer_len == 0) {
179                 read_buffer_p = read_buffer;
180         }
181
182         if ((n=num_waiting(f)) <= 0)
183                 return;
184
185         /* things could deteriorate if we read in really small chunks */
186         if (n < 10) n = 1024;
187
188         if (n > MAX_READ_BUFFER/4)
189                 n = MAX_READ_BUFFER/4;
190
191         if (read_buffer_p != read_buffer) {
192                 memmove(read_buffer,read_buffer_p,read_buffer_len);
193                 read_buffer_p = read_buffer;
194         }
195
196         if (n > (read_buffer_size - read_buffer_len)) {
197                 read_buffer_size += n;
198                 if (!read_buffer)
199                         read_buffer = (char *)malloc(read_buffer_size);
200                 else
201                         read_buffer = (char *)realloc(read_buffer,read_buffer_size);
202                 if (!read_buffer) out_of_memory("read check");      
203                 read_buffer_p = read_buffer;      
204         }
205
206         n = read_unbuffered(f,read_buffer+read_buffer_len,n);
207         if (n > 0) {
208                 read_buffer_len += n;
209         }
210 }
211
212 static int readfd(int fd,char *buffer,int N)
213 {
214         int  ret;
215         int total=0;  
216         struct timeval tv;
217         
218         if (read_buffer_len < N)
219                 read_check(buffer_f_in);
220         
221         while (total < N) {
222                 if (read_buffer_len > 0 && buffer_f_in == fd) {
223                         ret = MIN(read_buffer_len,N-total);
224                         memcpy(buffer+total,read_buffer_p,ret);
225                         read_buffer_p += ret;
226                         read_buffer_len -= ret;
227                         total += ret;
228                         continue;
229                 } 
230
231                 io_flush();
232
233                 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
234                         fd_set fds;
235
236                         if (errno != EAGAIN && errno != EWOULDBLOCK)
237                                 return -1;
238                         FD_ZERO(&fds);
239                         FD_SET(fd, &fds);
240                         tv.tv_sec = io_timeout;
241                         tv.tv_usec = 0;
242
243                         if (select(fd+1, &fds, NULL, NULL, 
244                                    io_timeout?&tv:NULL) != 1) {
245                                 check_timeout();
246                         }
247                 }
248
249                 if (ret <= 0)
250                         return total;
251                 total += ret;
252         }
253
254         if (io_timeout)
255                 last_io = time(NULL);
256         return total;
257 }
258
259
260 int32 read_int(int f)
261 {
262   int ret;
263   char b[4];
264   if ((ret=readfd(f,b,4)) != 4) {
265     if (verbose > 1) 
266       rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
267               getpid(),4,ret==-1?strerror(errno):"EOF");
268     exit_cleanup(1);
269   }
270   total_read += 4;
271   return IVAL(b,0);
272 }
273
274 int64 read_longint(int f)
275 {
276         extern int remote_version;
277         int64 ret;
278         char b[8];
279         ret = read_int(f);
280
281         if ((int32)ret != (int32)0xffffffff) return ret;
282
283 #ifdef NO_INT64
284         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
285         exit_cleanup(1);
286 #else
287         if (remote_version >= 16) {
288                 if ((ret=readfd(f,b,8)) != 8) {
289                         if (verbose > 1) 
290                                 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
291                                         getpid(),8,ret==-1?strerror(errno):"EOF");
292                         exit_cleanup(1);
293                 }
294                 total_read += 8;
295                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
296         }
297 #endif
298
299         return ret;
300 }
301
302 void read_buf(int f,char *buf,int len)
303 {
304   int ret;
305   if ((ret=readfd(f,buf,len)) != len) {
306     if (verbose > 1) 
307       rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
308               getpid(),len,ret==-1?strerror(errno):"EOF");
309     exit_cleanup(1);
310   }
311   total_read += len;
312 }
313
/* Read len bytes from f into buf and null terminate them.  buf must
   therefore have room for len+1 bytes. */
void read_sbuf(int f,char *buf,int len)
{
        char *end = buf + len;

        read_buf(f,buf,len);
        *end = 0;
}
319
/* read a single byte from f */
unsigned char read_byte(int f)
{
        unsigned char byte;

        read_buf(f,(char *)&byte,1);
        return byte;
}
326
327
/* last byte of the most recent chunk given to write_sparse() */
static char last_byte;
/* non-zero when write_sparse() has left the file position after a run
   of zeros that was seeked over rather than written */
static int last_sparse;

/* Finish off a file written through write_sparse().

   If the output ended by seeking over zeros, step back one byte and
   really write the final byte so that the file gets its full length.
   Returns 0 on success (or when nothing had to be done) and -1 if the
   final byte could not be written.

   The flag is cleared before returning in every case.  Previously it
   stayed set once a sparse tail had been seen, so the fix-up was also
   applied to every later file - including empty ones, which then had
   a stale byte written into them. */
int sparse_end(int f)
{
        if (!last_sparse)
                return 0;

        last_sparse = 0;

        do_lseek(f,-1,SEEK_CUR);
        return (write(f,&last_byte,1) == 1 ? 0 : -1);
}
340
341
342 static int write_sparse(int f,char *buf,int len)
343 {
344         int l1=0,l2=0;
345         int ret;
346
347         for (l1=0;l1<len && buf[l1]==0;l1++) ;
348         for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
349
350         last_byte = buf[len-1];
351
352         if (l1 == len || l2 > 0)
353                 last_sparse=1;
354
355         if (l1 > 0)
356                 do_lseek(f,l1,SEEK_CUR);  
357
358         if (l1 == len) 
359                 return len;
360
361         if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
362                 if (ret == -1 || ret == 0) return ret;
363                 return (l1+ret);
364         }
365
366         if (l2 > 0)
367                 do_lseek(f,l2,SEEK_CUR);
368         
369         return len;
370 }
371
372
373
374 int write_file(int f,char *buf,int len)
375 {
376         int ret = 0;
377
378         if (!sparse_files) 
379                 return write(f,buf,len);
380
381         while (len>0) {
382                 int len1 = MIN(len, SPARSE_WRITE_SIZE);
383                 int r1 = write_sparse(f, buf, len1);
384                 if (r1 <= 0) {
385                         if (ret > 0) return ret;
386                         return r1;
387                 }
388                 len -= r1;
389                 buf += r1;
390                 ret += r1;
391         }
392         return ret;
393 }
394
395
396 static int writefd_unbuffered(int fd,char *buf,int len)
397 {
398         int total = 0;
399         fd_set w_fds, r_fds;
400         int fd_count, count, got_select=0;
401         struct timeval tv;
402
403         while (total < len) {
404                 int ret = write(fd,buf+total,len-total);
405
406                 if (ret == 0) return total;
407
408                 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN)) 
409                         return -1;
410
411                 if (ret == -1 && got_select) {
412                         /* hmmm, we got a write select on the fd and
413                            then failed to write.  Why doesn't that
414                            mean that the fd is dead? It doesn't on
415                            some systems it seems (eg. IRIX) */
416                         u_sleep(1000);
417                 }
418
419                 got_select = 0;
420
421
422                 if (ret != -1) {
423                         total += ret;
424                         continue;
425                 }
426
427                 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
428                         read_check(buffer_f_in);
429
430                 fd_count = fd+1;
431                 FD_ZERO(&w_fds);
432                 FD_ZERO(&r_fds);
433                 FD_SET(fd,&w_fds);
434                 if (buffer_f_in != -1) {
435                         FD_SET(buffer_f_in,&r_fds);
436                         if (buffer_f_in > fd) 
437                                 fd_count = buffer_f_in+1;
438                 }
439
440                 tv.tv_sec = BLOCKING_TIMEOUT;
441                 tv.tv_usec = 0;
442                 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
443                                &w_fds,NULL,&tv);
444                 
445                 if (count == -1 && errno != EINTR) {
446                         if (verbose > 1) 
447                                 rprintf(FERROR,"select error: %s\n", strerror(errno));
448                         exit_cleanup(1);
449                 }
450                 
451                 if (count == 0) {
452                         check_timeout();
453                         continue;
454                 }
455                 
456                 if (FD_ISSET(fd, &w_fds)) {
457                         got_select = 1;
458                 }
459         }
460
461         if (io_timeout)
462                 last_io = time(NULL);
463         
464         return total;
465 }
466
467
468 static char *io_buffer;
469 static int io_buffer_count;
470 static int io_out_fd;
471
472 void io_start_buffering(int fd)
473 {
474         if (io_buffer) return;
475         io_out_fd = fd;
476         io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
477         if (!io_buffer) out_of_memory("writefd");
478         io_buffer_count = 0;
479
480         /* leave room for the multiplex header in case it's needed */
481         io_buffer += 4;
482 }
483
484 void io_flush(void)
485 {
486         int fd = io_out_fd;
487         if (!io_buffer_count) return;
488
489         if (io_multiplexing_out) {
490                 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
491                 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
492                     io_buffer_count+4) {
493                         rprintf(FERROR,"write failed\n");
494                         exit_cleanup(1);
495                 }
496         } else {
497                 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) != 
498                     io_buffer_count) {
499                         rprintf(FERROR,"write failed\n");
500                         exit_cleanup(1);
501                 }
502         }
503         io_buffer_count = 0;
504 }
505
506 void io_end_buffering(int fd)
507 {
508         io_flush();
509         if (!io_multiplexing_out) {
510                 free(io_buffer-4);
511                 io_buffer = NULL;
512         }
513 }
514
515 static int writefd(int fd,char *buf,int len1)
516 {
517         int len = len1;
518
519         if (!io_buffer) return writefd_unbuffered(fd, buf, len);
520
521         while (len) {
522                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
523                 if (n > 0) {
524                         memcpy(io_buffer+io_buffer_count, buf, n);
525                         buf += n;
526                         len -= n;
527                         io_buffer_count += n;
528                 }
529                 
530                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
531         }
532
533         return len1;
534 }
535
536
537 void write_int(int f,int32 x)
538 {
539         int ret;
540         char b[4];
541         SIVAL(b,0,x);
542         if ((ret=writefd(f,b,4)) != 4) {
543                 rprintf(FERROR,"write_int failed : %s\n",
544                         ret==-1?strerror(errno):"EOF");
545                 exit_cleanup(1);
546         }
547         total_written += 4;
548 }
549
550 void write_longint(int f, int64 x)
551 {
552         extern int remote_version;
553         char b[8];
554         int ret;
555
556         if (remote_version < 16 || x <= 0x7FFFFFFF) {
557                 write_int(f, (int)x);
558                 return;
559         }
560
561         write_int(f, -1);
562         SIVAL(b,0,(x&0xFFFFFFFF));
563         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
564
565         if ((ret=writefd(f,b,8)) != 8) {
566                 rprintf(FERROR,"write_longint failed : %s\n",
567                         ret==-1?strerror(errno):"EOF");
568                 exit_cleanup(1);
569         }
570         total_written += 8;
571 }
572
573 void write_buf(int f,char *buf,int len)
574 {
575         int ret;
576         if ((ret=writefd(f,buf,len)) != len) {
577                 rprintf(FERROR,"write_buf failed : %s\n",
578                         ret==-1?strerror(errno):"EOF");
579                 exit_cleanup(1);
580         }
581         total_written += len;
582 }
583
/* send a null terminated string down the connection; the terminating
   null itself is not sent */
void write_sbuf(int f,char *buf)
{
        int len = strlen(buf);

        write_buf(f, buf, len);
}
589
590
/* write a single byte to f */
void write_byte(int f,unsigned char c)
{
        char *p = (char *)&c;

        write_buf(f,p,1);
}
595
/* Does nothing: the body is empty and f is ignored.  Queued output is
   sent by io_flush(). */
void write_flush(int f)
{
}
599
600
/* Read a line from f into buf, one byte at a time.

   Carriage returns are dropped.  Reading stops at a newline, which is
   replaced by the terminating null; the function then returns 1.  If
   maxlen characters are stored without a newline being seen, buf is
   null terminated and 0 is returned.  Since the terminator is stored
   after the maxlen-th character, buf needs room for maxlen+1 bytes. */
int read_line(int f, char *buf, int maxlen)
{
        while (maxlen != 0) {
                read_buf(f, buf, 1);

                if (*buf == '\n') {
                        *buf = 0;
                        return 1;
                }

                if (*buf == '\r')
                        continue;

                buf++;
                maxlen--;
        }

        *buf = 0;
        return 0;
}
620
621
/* printf-style output to the connection fd.  The text is formatted
   into a 1024 byte local buffer with vslprintf() and sent with
   write_sbuf(); a negative result from vslprintf() ends the program
   via exit_cleanup(1). */
void io_printf(int fd, const char *format, ...)
{
        char buf[1024];
        va_list args;
        int n;

        va_start(args, format);
        n = vslprintf(buf, sizeof(buf)-1, format, args);
        va_end(args);

        if (n < 0)
                exit_cleanup(1);

        write_sbuf(fd, buf);
}
636
637
638 /* setup for multiplexing an error stream with the data stream */
639 void io_start_multiplex_out(int fd)
640 {
641         io_start_buffering(fd);
642         io_multiplexing_out = 1;
643 }
644
645 /* setup for multiplexing an error stream with the data stream */
646 void io_start_multiplex_in(int fd)
647 {
648         if (read_buffer_len) {
649                 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
650                 exit_cleanup(1);
651         }
652
653         io_multiplexing_in = 1;
654 }
655
656 /* write an message to the error stream */
657 int io_multiplex_write(int f, char *buf, int len)
658 {
659         if (!io_multiplexing_out) return 0;
660
661         io_flush();
662
663         SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
664         memcpy(io_buffer, buf, len);
665
666         writefd_unbuffered(io_out_fd, io_buffer-4, len+4);
667         return 1;
668 }
669
/* Stop buffering input: buffer_f_in is reset to -1, which makes
   read_check(buffer_f_in) return at once.  The fd argument is not used
   and nothing is closed here. */
void io_close_input(int fd)
{
        buffer_f_in = -1;
}