if f_in == f_out then don't close one of them
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
21 Utilities used in rsync
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
/* Running byte counters for this process.  They are bumped by the
 * write_* and read_* helpers in this file and reported through
 * write_total() and read_total(). */
static int64 total_written;
static int64 total_read;

/* Program options defined elsewhere. */
extern int verbose;		/* several diagnostics here print only when > 1 */
extern int sparse_files;	/* non-zero selects the sparse path in write_file() */
extern int io_timeout;		/* used as a select() timeout in seconds; 0 disables it */
33
34int64 write_total(void)
35{
36 return total_written;
37}
38
39int64 read_total(void)
40{
41 return total_read;
42}
43
/* Descriptor whose pending input is drained into the read buffer while
 * a write is stalled; stays -1 until setup_nonblocking() is called. */
static int buffer_f_in = -1;

/* Remember f_in as the descriptor to pre-read from and switch f_out
 * to non-blocking mode. */
void setup_nonblocking(int f_in,int f_out)
{
	buffer_f_in = f_in;
	set_blocking(f_out,0);
}
51
52
53static char *read_buffer;
54static char *read_buffer_p;
55static int read_buffer_len;
56static int read_buffer_size;
57
58
59/* This function was added to overcome a deadlock problem when using
60 * ssh. It looks like we can't allow our receive queue to get full or
61 * ssh will clag up. Uggh. */
62static void read_check(int f)
63{
64 int n;
65
66 if (f == -1) return;
67
68 if (read_buffer_len == 0) {
69 read_buffer_p = read_buffer;
70 }
71
72 if ((n=num_waiting(f)) <= 0)
73 return;
74
75 /* things could deteriorate if we read in really small chunks */
76 if (n < 10) n = 1024;
77
78 if (n > MAX_READ_BUFFER/4)
79 n = MAX_READ_BUFFER/4;
80
81 if (read_buffer_p != read_buffer) {
82 memmove(read_buffer,read_buffer_p,read_buffer_len);
83 read_buffer_p = read_buffer;
84 }
85
86 if (n > (read_buffer_size - read_buffer_len)) {
87 read_buffer_size += n;
88 if (!read_buffer)
89 read_buffer = (char *)malloc(read_buffer_size);
90 else
91 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
92 if (!read_buffer) out_of_memory("read check");
93 read_buffer_p = read_buffer;
94 }
95
96 n = read(f,read_buffer+read_buffer_len,n);
97 if (n > 0) {
98 read_buffer_len += n;
99 }
100}
101
102static time_t last_io;
103
104
105static void check_timeout(void)
106{
107 time_t t;
108
109 if (!io_timeout) return;
110
111 if (!last_io) {
112 last_io = time(NULL);
113 return;
114 }
115
116 t = time(NULL);
117
118 if (last_io && io_timeout && (t-last_io)>io_timeout) {
119 rprintf(FERROR,"read timeout after %d second - exiting\n",
120 (int)(t-last_io));
121 exit_cleanup(1);
122 }
123}
124
125static int readfd(int fd,char *buffer,int N)
126{
127 int ret;
128 int total=0;
129 struct timeval tv;
130
131 if (read_buffer_len < N)
132 read_check(buffer_f_in);
133
134 while (total < N) {
135 if (read_buffer_len > 0 && buffer_f_in == fd) {
136 ret = MIN(read_buffer_len,N-total);
137 memcpy(buffer+total,read_buffer_p,ret);
138 read_buffer_p += ret;
139 read_buffer_len -= ret;
140 total += ret;
141 continue;
142 }
143
144 while ((ret = read(fd,buffer + total,N-total)) == -1) {
145 fd_set fds;
146
147 if (errno != EAGAIN && errno != EWOULDBLOCK)
148 return -1;
149 FD_ZERO(&fds);
150 FD_SET(fd, &fds);
151 tv.tv_sec = io_timeout;
152 tv.tv_usec = 0;
153
154 if (select(fd+1, &fds, NULL, NULL,
155 io_timeout?&tv:NULL) != 1) {
156 check_timeout();
157 }
158 }
159
160 if (ret <= 0)
161 return total;
162 total += ret;
163 }
164
165 if (io_timeout)
166 last_io = time(NULL);
167 return total;
168}
169
170
171int32 read_int(int f)
172{
173 int ret;
174 char b[4];
175 if ((ret=readfd(f,b,4)) != 4) {
176 if (verbose > 1)
177 rprintf(FERROR,"(%d) Error reading %d bytes : %s\n",
178 getpid(),4,ret==-1?strerror(errno):"EOF");
179 exit_cleanup(1);
180 }
181 total_read += 4;
182 return IVAL(b,0);
183}
184
185int64 read_longint(int f)
186{
187 extern int remote_version;
188 int64 ret;
189 char b[8];
190 ret = read_int(f);
191
192 if ((int32)ret != (int32)0xffffffff) return ret;
193
194#ifdef NO_INT64
195 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
196 exit_cleanup(1);
197#else
198 if (remote_version >= 16) {
199 if ((ret=readfd(f,b,8)) != 8) {
200 if (verbose > 1)
201 rprintf(FERROR,"(%d) Error reading %d bytes : %s\n",
202 getpid(),8,ret==-1?strerror(errno):"EOF");
203 exit_cleanup(1);
204 }
205 total_read += 8;
206 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
207 }
208#endif
209
210 return ret;
211}
212
213void read_buf(int f,char *buf,int len)
214{
215 int ret;
216 if ((ret=readfd(f,buf,len)) != len) {
217 if (verbose > 1)
218 rprintf(FERROR,"(%d) Error reading %d bytes : %s\n",
219 getpid(),len,ret==-1?strerror(errno):"EOF");
220 exit_cleanup(1);
221 }
222 total_read += len;
223}
224
/* Read len bytes from f into buf and NUL-terminate the result.  The
 * terminator is stored at buf[len], so buf must have room for len+1
 * bytes. */
void read_sbuf(int f,char *buf,int len)
{
	read_buf(f,buf,len);
	buf[len] = '\0';
}
230
/* Read a single byte from f and return it. */
unsigned char read_byte(int f)
{
	unsigned char ch;

	read_buf(f,(char *)&ch,1);
	return ch;
}
237
238
/* Sparse-write state shared by write_sparse() and sparse_end():
 *   last_byte    final byte of the most recent chunk given to write_sparse()
 *   last_sparse  non-zero when write_sparse() flagged a chunk as ending
 *                in a seek instead of a real write */
static char last_byte;
static int last_sparse;

/* Finish a file that was written through write_sparse().
 *
 * If the sparse flag is set, step back one byte and really write
 * last_byte at that position.  The flag is cleared before doing so,
 * so it cannot carry over into the next file handled by this process.
 *
 * Returns 0 on success or when nothing had to be done, -1 if the final
 * one-byte write failed. */
int sparse_end(int f)
{
	if (!last_sparse)
		return 0;

	/* reset first: the original only cleared the flag on the path
	   where it was already zero */
	last_sparse = 0;

	do_lseek(f,-1,SEEK_CUR);
	return (write(f,&last_byte,1) == 1 ? 0 : -1);
}
251
252
253static int write_sparse(int f,char *buf,int len)
254{
255 int l1=0,l2=0;
256 int ret;
257
258 for (l1=0;l1<len && buf[l1]==0;l1++) ;
259 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
260
261 last_byte = buf[len-1];
262
263 if (l1 == len || l2 > 0)
264 last_sparse=1;
265
266 if (l1 > 0)
267 do_lseek(f,l1,SEEK_CUR);
268
269 if (l1 == len)
270 return len;
271
272 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
273 if (ret == -1 || ret == 0) return ret;
274 return (l1+ret);
275 }
276
277 if (l2 > 0)
278 do_lseek(f,l2,SEEK_CUR);
279
280 return len;
281}
282
283
284
285int write_file(int f,char *buf,int len)
286{
287 int ret = 0;
288
289 if (!sparse_files)
290 return write(f,buf,len);
291
292 while (len>0) {
293 int len1 = MIN(len, SPARSE_WRITE_SIZE);
294 int r1 = write_sparse(f, buf, len1);
295 if (r1 <= 0) {
296 if (ret > 0) return ret;
297 return r1;
298 }
299 len -= r1;
300 buf += r1;
301 ret += r1;
302 }
303 return ret;
304}
305
306
307static int writefd_unbuffered(int fd,char *buf,int len)
308{
309 int total = 0;
310 fd_set w_fds, r_fds;
311 int fd_count, count, got_select=0;
312 struct timeval tv;
313
314 if (buffer_f_in == -1)
315 return write(fd,buf,len);
316
317 while (total < len) {
318 int ret = write(fd,buf+total,len-total);
319
320 if (ret == 0) return total;
321
322 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
323 return -1;
324
325 if (ret == -1 && got_select) {
326 /* hmmm, we got a write select on the fd and then failed to write.
327 Why doesn't that mean that the fd is dead? It doesn't on some
328 systems it seems (eg. IRIX) */
329 u_sleep(1000);
330#if 0
331 rprintf(FERROR,"write exception\n");
332 exit_cleanup(1);
333#endif
334 }
335
336 got_select = 0;
337
338
339 if (ret == -1) {
340 if (read_buffer_len < MAX_READ_BUFFER)
341 read_check(buffer_f_in);
342
343 fd_count = fd+1;
344 FD_ZERO(&w_fds);
345 FD_ZERO(&r_fds);
346 FD_SET(fd,&w_fds);
347 if (buffer_f_in != -1) {
348 FD_SET(buffer_f_in,&r_fds);
349 if (buffer_f_in > fd)
350 fd_count = buffer_f_in+1;
351 }
352
353 tv.tv_sec = BLOCKING_TIMEOUT;
354 tv.tv_usec = 0;
355 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
356 &w_fds,NULL,&tv);
357
358 if (count == -1 && errno != EINTR) {
359 if (verbose > 1)
360 rprintf(FERROR,"select error: %s\n", strerror(errno));
361 exit_cleanup(1);
362 }
363
364 if (count == 0) {
365 check_timeout();
366 continue;
367 }
368
369 if (FD_ISSET(fd, &w_fds)) {
370 got_select = 1;
371 }
372 } else {
373 total += ret;
374 }
375 }
376
377 if (io_timeout)
378 last_io = time(NULL);
379
380 return total;
381}
382
383static char *io_buffer;
384static int io_buffer_count;
385
386void io_start_buffering(int fd)
387{
388 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
389 if (!io_buffer) out_of_memory("writefd");
390 io_buffer_count = 0;
391}
392
393void io_end_buffering(int fd)
394{
395 if (io_buffer_count) {
396 if (writefd_unbuffered(fd, io_buffer,
397 io_buffer_count) !=
398 io_buffer_count) {
399 rprintf(FERROR,"write failed\n");
400 exit_cleanup(1);
401 }
402 io_buffer_count = 0;
403 }
404 free(io_buffer);
405 io_buffer = NULL;
406}
407
408static int writefd(int fd,char *buf,int len1)
409{
410 int len = len1;
411
412 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
413
414 while (len) {
415 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
416 if (n > 0) {
417 memcpy(io_buffer+io_buffer_count, buf, n);
418 buf += n;
419 len -= n;
420 io_buffer_count += n;
421 }
422
423 if (io_buffer_count == IO_BUFFER_SIZE) {
424 if (writefd_unbuffered(fd, io_buffer,
425 io_buffer_count) !=
426 io_buffer_count) {
427 return -1;
428 }
429 io_buffer_count = 0;
430 }
431 }
432
433 return len1;
434}
435
436
437void write_int(int f,int32 x)
438{
439 int ret;
440 char b[4];
441 SIVAL(b,0,x);
442 if ((ret=writefd(f,b,4)) != 4) {
443 rprintf(FERROR,"write_int failed : %s\n",
444 ret==-1?strerror(errno):"EOF");
445 exit_cleanup(1);
446 }
447 total_written += 4;
448}
449
450void write_longint(int f, int64 x)
451{
452 extern int remote_version;
453 char b[8];
454 int ret;
455
456 if (remote_version < 16 || x <= 0x7FFFFFFF) {
457 write_int(f, (int)x);
458 return;
459 }
460
461 write_int(f, -1);
462 SIVAL(b,0,(x&0xFFFFFFFF));
463 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
464
465 if ((ret=writefd(f,b,8)) != 8) {
466 rprintf(FERROR,"write_longint failed : %s\n",
467 ret==-1?strerror(errno):"EOF");
468 exit_cleanup(1);
469 }
470 total_written += 8;
471}
472
473void write_buf(int f,char *buf,int len)
474{
475 int ret;
476 if ((ret=writefd(f,buf,len)) != len) {
477 rprintf(FERROR,"write_buf failed : %s\n",
478 ret==-1?strerror(errno):"EOF");
479 exit_cleanup(1);
480 }
481 total_written += len;
482}
483
/* Write the NUL-terminated string buf to the connection f.  The
 * terminating NUL itself is not sent. */
void write_sbuf(int f,char *buf)
{
	int len = strlen(buf);

	write_buf(f, buf, len);
}
489
490
/* Write the single byte c to f. */
void write_byte(int f,unsigned char c)
{
	char *one = (char *)&c;

	write_buf(f,one,1);
}
495
/* Flush hook for descriptor f.  It currently does nothing: buffered
 * output is written out by io_end_buffering(), not here. */
void write_flush(int f)
{
	(void)f;
}
499
500
/* Read one line from f into buf, one byte at a time.
 *
 * Carriage returns are dropped.  On reaching '\n' it is replaced by a
 * NUL and 1 is returned.  If maxlen characters are stored before any
 * newline is seen, the text is NUL-terminated and 0 is returned; that
 * terminator lands at buf[maxlen], so buf must have room for maxlen+1
 * bytes. */
int read_line(int f, char *buf, int maxlen)
{
	char *cursor = buf;
	int room;

	for (room = maxlen; room != 0; ) {
		read_buf(f, cursor, 1);

		if (*cursor == '\n') {
			*cursor = 0;
			return 1;
		}

		/* a '\r' is overwritten by the next byte read */
		if (*cursor != '\r') {
			cursor++;
			room--;
		}
	}

	*cursor = 0;
	return 0;
}
520
521
/* printf-style output to the connection fd.  The text is formatted
 * into a 1024-byte local buffer with vslprintf() and sent with
 * write_sbuf().  A negative result from vslprintf() terminates the
 * program through exit_cleanup(1). */
void io_printf(int fd, const char *format, ...)
{
	char msg[1024];
	va_list args;
	int n;

	va_start(args, format);
	n = vslprintf(msg, sizeof(msg)-1, format, args);
	va_end(args);

	if (n < 0) {
		exit_cleanup(1);
	}

	write_sbuf(fd, msg);
}