#!/usr/bin/env perl
# Recovered from the gitweb diff (new file, mode 100755, blob abb2b08):
#   https://mattmccutchen.net/utils/utils.git/blobdiff_plain/40ea9b7868f2b7746e7cbabfba6aba982096392a..273c390351c42303171c25215304d1cfd6ca02d4:/continusync

# Continuous mirroring script around inotifywait and rsync, as suggested by
# Buck Huppmann.  Supports local and remote pushing.
# EXPERIMENTAL!  THERE IS ABSOLUTELY NO WARRANTY!
#   -- Matt McCutchen
# See: http://www.kepreon.com/~matt/utils/#continusync

# Usage:
#   continusync path/to/srcdir/ path/to/destdir/
#   continusync path/to/srcdir/ [user@]host:path/to/destdir/

# It seems to work, but it runs rsync once per event, which is ridiculous.
# TODO: Event batching!!!
# TODO: Do the recursive deletion in perl instead of calling rm(1).

use warnings;
use strict;

# Configuration.  TODO: Add options for these.

# Remote-shell command used to reach the destination host.
# Let the rsyncs we invoke piggyback on the main ssh connection.
# For this to work, you have to set a ControlPath in your ~/.ssh/config ;
# see the ssh_config(5) man page.
our @rshArgs = (qw(ssh -o), 'ControlMaster auto');

# Command name used to start continusync in --server mode on the remote host.
our $csPath = q{continusync};

# Don't put -r or --delete here.
# cp2 :)
# Base rsync command line.  doRsync() appends the recursion switch (-r or -d)
# and --del itself for every invocation, so they must not be listed here.
our @rsyncArgs = ('rsync', '-lE', '--chmod=ugo=rwX', '-i');

use IPC::Open2;
use IO::Handle;

# readFully(fh, length) -> data
# Reads exactly `length' bytes from fh with sysread.
# Returns the data, or undef if end-of-file is hit before `length' bytes
# have arrived.  Dies on a read error.
sub readFully(*$) {
    my ($fh, $bytesLeft) = @_;
    my ($buf, $off) = ('', 0);
    while ($bytesLeft > 0) {
        my $rv = sysread($fh, $buf, $bytesLeft, $off);
        # sysread returns undef on error and 0 at EOF; test for the error
        # first so that a failed read is never mistaken for a clean EOF.
        die "Read error: $!" unless defined($rv);
        return undef if $rv == 0;    # HMMM: May lose partial read
        $bytesLeft -= $rv;
        $off += $rv;
    }
    return $buf;
}

# writeFully(fh, data)
# Writes all of `data' to fh with syswrite, looping over short writes.
# Dies on a write error.
sub writeFully(*$) {
    my ($fh, $buf) = @_;
    my ($bytesLeft, $off) = (length($buf), 0);
    while ($bytesLeft > 0) {
        my $rv = syswrite($fh, $buf, $bytesLeft, $off);
        die "Write error: $!" unless defined($rv);
        die "Write error" unless $rv > 0;
        $bytesLeft -= $rv;
        $off += $rv;
    }
}

# readMsg(fh) -> (type, body)
# Reads one message: an 8-byte header holding two big-endian 32-bit integers
# (type, body length) followed by the body.
# Returns (undef, undef) on EOF before a full header, and (type, undef) when
# the stream ends inside the body.
sub readMsg(*) {
    my ($fh) = @_;
    my $head = readFully($fh, 8);
    return (undef, undef) unless defined($head);
    my ($type, $bodyLen) = unpack('NN', $head);
    return ($type, readFully($fh, $bodyLen));
}

# writeMsg(fh, type, body)
# Writes one message in the format understood by readMsg.
sub writeMsg(*$$) {
    my ($fh, $type, $body) = @_;
    writeFully($fh, pack('NN/a*', $type, $body));
}

# Message types.  The empty prototype lets perl inline them as constants and
# keeps `MSG_FOO, $x' from being parsed as a call with arguments.
#sub MSG_EXIT() { 0 }
sub MSG_REMOTE_PATH() { 1 }    # client -> server: destination directory
sub MSG_PERFORMED()   { 2 }    # server -> client: request finished
sub MSG_RENAME()      { 3 }    # client -> server: body is pack('N/a*N/a*', from, to)
sub MSG_DELREC()      { 4 }    # client -> server: body is the path to delete recursively
sub MSG_DELETED()     { 5 }    # server -> client: body is one path that rm removed

# doServer(dest)
# Server loop.  Changes into `dest', then serves MSG_RENAME and MSG_DELREC
# requests read from STDIN until EOF, answering on STDOUT.  Every request is
# terminated by a MSG_PERFORMED reply, even when the operation itself failed
# (the protocol has no error message; failures are reported on STDERR).
# Dies if `dest' is missing or cannot be entered: carrying on would run
# rename and `rm -rf' relative to some unrelated directory.
sub doServer($) {
    my ($dest) = @_;

    die "continusync server: no destination directory given\n"
        unless defined($dest) && length($dest);
    chdir($dest)
        or die "continusync server: cannot chdir to $dest: $!\n";

    while (1) {
        my ($type, $body) = readMsg(\*STDIN);
        # EOF (possibly in the middle of a message) ends the session.
        last unless defined($type) && defined($body);

        if ($type == MSG_RENAME) {
            my ($from, $to) = unpack('N/a*N/a*', $body);
            rename($from, $to)
                or warn "continusync server: rename $from -> $to failed: $!\n";
            writeMsg(\*STDOUT, MSG_PERFORMED, '');
        } elsif ($type == MSG_DELREC) {
            my $victim = $body;
            # `--' stops rm from treating a name that starts with `-' as an
            # option.
            my $rmPid = open(my $fromRm, '-|', 'rm', '-rf', '-v', '--', $victim);
            if (defined($rmPid)) {
                while (defined(my $rmLine = <$fromRm>)) {
                    chomp($rmLine);
                    # Accept both the old `name' quoting and the 'name'
                    # quoting printed by newer GNU rm.
                    if ($rmLine =~ /^[^`']*[`'](.*)'[^']*$/) {
                        writeMsg(\*STDOUT, MSG_DELETED, $1);
                    }
                }
                # Closing a piped handle also waits for the child.
                close($fromRm);
            } else {
                warn "continusync server: cannot run rm: $!\n";
            }
            writeMsg(\*STDOUT, MSG_PERFORMED, '');
        }
    }
}

# The stuff below applies only to the client.
our ($src, $dest);
our $localDestFH;

our ($fromServer, $toServer, $serverPid);
our ($fromInwt, $inwtPid);

# clientQuit()
# SIGINT handler for the client: closes the pipes to the server and reaps it,
# interrupts and reaps inotifywait, then exits with status 0.
sub clientQuit() {
    print "Caught a signal. Shutting down.\n";

    #print STDOUT "serverPid is $serverPid\n";
    close($fromServer);
    close($toServer);
    waitpid($serverPid, 0);

    #print STDOUT "inwtPid is $inwtPid\n";
    kill('INT', $inwtPid);
    close($fromInwt);
    waitpid($inwtPid, 0);

    exit(0);
}

# doRsync(isRecursive, isDelete, paths...)
# Runs one rsync from the current directory to $dest for the given paths.
# The paths are fed NUL-separated on rsync's stdin (--from0 --files-from=-).
# isRecursive selects -r instead of -d; isDelete adds --del.
# Failures are reported on STDERR but do not stop the client.
sub doRsync($$@) {
    my ($isRecursive, $isDelete, @paths) = @_;

    my @cmd = (@rsyncArgs,
        ($isRecursive ? '-r' : '-d'), ($isDelete ? '--del' : ()),
        '--no-implied-dirs', '-t', '--from0', '--files-from=-', '.', $dest);

    my $rsyncPid = open(my $toRsync, '|-', @cmd);
    unless (defined($rsyncPid)) {
        warn "continusync: cannot run $cmd[0]: $!\n";
        return;
    }
    foreach my $p (@paths) {
        print $toRsync $p, "\0";
    }
    # Closing a piped handle waits for rsync and leaves its status in $?.
    close($toRsync);
    warn "continusync: $cmd[0] exited with wait status $?\n" if $? != 0;
}

# doRename(src, dest)
# Asks the server to rename src to dest (both relative to the destination
# root) and waits for its MSG_PERFORMED reply.
sub doRename($$) {
    my ($from, $to) = @_;
    writeMsg($toServer, MSG_RENAME(), pack('N/a*N/a*', $from, $to));
    my ($type) = readMsg($fromServer);    # MSG_PERFORMED
    die "continusync: lost connection to the server\n" unless defined($type);
    print "*movefrom $from\n",
          "*moveto $to\n";
}

# doDelete(path)
# Asks the server to delete path recursively and prints every path the
# server reports as deleted.
sub doDelete($) {
    my ($path) = @_;
    writeMsg($toServer, MSG_DELREC(), $path);
    while (1) {
        my ($type, $body) = readMsg($fromServer);
        die "continusync: lost connection to the server\n" unless defined($type);
        # Anything other than MSG_DELETED is the final MSG_PERFORMED.
        last unless $type == MSG_DELETED();
        print "*deleting $body\n";
    }
}

# move_self so we can reliably detect moves out
our @interestingEvents = ('modify', 'attrib', 'move', 'move_self', 'create', 'delete');

# doClient(src, dest)
# Client main routine.  Starts a server (over the remote shell when dest looks
# like host:path, otherwise as a forked child), starts inotifywait on src,
# performs an initial recursive rsync with deletion, and then mirrors each
# inotify event until interrupted.  Dies if a helper process cannot be
# started, if src or a local dest cannot be opened, or when inotifywait exits.
sub doClient($$) {
    ($src, $dest) = @_;

    print "Continusync starting up.\n",
          "This software is EXPERIMENTAL. There is ABSOLUTELY NO WARRANTY.\n";

    # Get a server process.
    # Echoes of rsync...
    if ($dest =~ /^([^:]*):(.*)$/) {
        # Invoke over remote shell
        my ($uhost, $rdest) = ($1, $2);
        $serverPid = open2($fromServer, $toServer, @rshArgs, $uhost, $csPath, '--server');
        # Pass path on stdin to stop the shell from messing with it.
        # Echoes of rsync daemon protocol...
        writeMsg($toServer, MSG_REMOTE_PATH(), $rdest);
    } else {
        # Fork locally
        my ($fromClient, $toClient);
        pipe($fromServer, $toClient) or die "continusync: pipe failed: $!\n";
        pipe($fromClient, $toServer) or die "continusync: pipe failed: $!\n";
        $serverPid = fork();
        # An undefined pid would compare equal to 0 below and turn the
        # parent into a second "child".
        die "continusync: cannot fork the server: $!\n" unless defined($serverPid);
        if ($serverPid == 0) {
            # Child server
            close($fromServer);
            close($toServer);
            open(STDIN, "<&", $fromClient)
                or die "continusync: cannot redirect server stdin: $!\n";
            open(STDOUT, ">&", $toClient)
                or die "continusync: cannot redirect server stdout: $!\n";
            doServer($dest);
            exit(0);
        }
        # Parent client
        close($fromClient);
        close($toClient);
        # Get a dest path that we can pass to rsync even after we chdir into the source.
        {
            # Raise $^F so the descriptor is not marked close-on-exec and
            # therefore survives into the rsync processes we start.
            local $^F = 100000;
            open($localDestFH, '<', $dest)
                or die "continusync: cannot open $dest: $!\n";
        }
        $dest = "/proc/self/fd/" . fileno($localDestFH);
    }

    # Everything below syncs `.', so a failed chdir must be fatal: otherwise
    # the initial `rsync --del' would mirror some unrelated directory.
    chdir($src) or die "continusync: cannot chdir to $src: $!\n";

    # Get inotifywait.
    $inwtPid = open($fromInwt, '-|');
    die "continusync: cannot fork inotifywait: $!\n" unless defined($inwtPid);
    if ($inwtPid == 0) {
        # Parent wants all our output on the single filehandle $fromInwt.
        open(STDERR, ">&", \*STDOUT);
        my @args = ('inotifywait', '-r', '-m', '--format', "%e\n%w\n%f", map(('-e', $_), @interestingEvents), '.');
        # Never fall through into the client code if exec fails.
        exec { $args[0] } @args
            or die "continusync: cannot exec $args[0]: $!\n";
    }

    # Two startup lines are expected: `Setting up watches' and
    # `Watches established'.
    my @startup;
    foreach my $lineNo (1, 2) {
        my $line = <$fromInwt>;
        die "continusync: inotifywait failed to start: @startup\n"
            unless defined($line);
        push(@startup, $line);
    }
    $SIG{INT} = \&clientQuit;
    print "Continuously mirroring. Give me a SIGINT when you want me to quit.\n";

    # Now we can do the initial copy without danger of losing events.
    doRsync(1, 1, '.');

    # Consecutive MOVED_FROM and MOVED_TO events constitute an internal
    # move.  A move-out followed by a move-in gives an intervening
    # MOVED_SELF, so we aren't fooled.
    my $movedFrom = undef;

    while (1) {
        # Each event arrives as three lines: event names, watched dir, file.
        my $e = <$fromInwt>;
        my $w = <$fromInwt>;
        my $f = <$fromInwt>;
        # EOF means inotifywait is gone; without this check the loop would
        # spin forever on undefined input.
        last unless defined($e) && defined($w) && defined($f);
        chomp($e, $w, $f);

        my $path = $w . $f;
        $path =~ s,^\./(.),$1,;    # Remove initial ./ if it isn't all
        $e =~ s/,ISDIR$//;         # Directories are handled like files here
        #print "Got event: ($e,$w,$f)\n";

        if (defined($movedFrom)) {
            # Forget the pending source before acting on it, so that it can
            # never leak into the handling of a later event.
            my $from = $movedFrom;
            $movedFrom = undef;
            if ($e eq 'MOVED_TO') {
                # Complete the move.
                doRename($from, $path);
                next;
            }
            # Moved out.
            doDelete($from);
        }

        if ($e eq 'MODIFY' || $e eq 'ATTRIB' || $e eq 'CREATE') {
            doRsync(0, 0, $path);
        } elsif ($e eq 'MOVED_FROM') {
            $movedFrom = $path;
        } elsif ($e eq 'MOVED_TO') {
            # Moved in.
            # Must be recursive in case it was an entire directory.
            doRsync(1, 0, $path);
        } elsif ($e eq 'DELETE') {
            doDelete($path);
        }
    }

    die "continusync: inotifywait exited unexpectedly\n";
}

# Entry point: `continusync --server' runs the server side and receives its
# destination directory as the first message on stdin; otherwise the two
# arguments are the source and destination of the client.
if (@ARGV >= 1 && $ARGV[0] eq '--server') {
    #STDOUT->autoflush(1);
    my ($type, $rdest) = readMsg(\*STDIN);
    die "continusync --server: expected a destination path message\n"
        unless defined($type) && $type == MSG_REMOTE_PATH() && defined($rdest);
    doServer($rdest);
} elsif (@ARGV >= 2) {
    doClient($ARGV[0], $ARGV[1]);
} else {
    die "Usage: continusync path/to/srcdir/ [[user\@]host:]path/to/destdir/\n";
}