Import the remaining utilities.
[utils/utils.git] / continusync
diff --git a/continusync b/continusync
new file mode 100755 (executable)
index 0000000..abb2b08
--- /dev/null
@@ -0,0 +1,286 @@
+#!/usr/bin/env perl
+
+# Continuous mirroring script around inotifywait and rsync, as suggested by
+# Buck Huppmann.  Supports local and remote pushing.
+# EXPERIMENTAL!  THERE IS ABSOLUTELY NO WARRANTY!
+# -- Matt McCutchen <hashproduct@gmail.com>
+# See: http://www.kepreon.com/~matt/utils/#continusync
+
+# Usage:
+#     continusync path/to/srcdir/ path/to/destdir/
+#     continusync path/to/srcdir/ [user@]host:path/to/destdir/
+
+# It seems to work, but it runs rsync once per event, which is ridiculous.
+# TODO: Event batching!!!
+# TODO: Do the recursive deletion in perl instead of calling rm(1).
+
+use warnings;
+use strict;
+
+# Configuration.  TODO: Add options for these.
+
+# Let the rsyncs we invoke piggyback on the main ssh connection.
+# For this to work, you have to set a ControlPath in your ~/.ssh/config ;
+# see the ssh_config(5) man page.
+our @rshArgs = ('ssh', '-o', 'ControlMaster auto');
+
+our $csPath = 'continusync';
+
+# Don't put -r or --delete here.
+# cp2 :)
+our @rsyncArgs = ('rsync', '-lE', '--chmod=ugo=rwX', '-i');
+
+use IPC::Open2;
+use IO::Handle;
+
+# readFully(fh, length) -> data
+sub readFully(*$) {
+       my ($fh, $bytesLeft) = @_;
+       my ($buf, $off, $rv) = ('', 0);
+       while ($bytesLeft > 0) {
+               $rv = sysread($fh, $buf, $bytesLeft, $off);
+               return undef if $rv == 0; # HMMM: May lose partial read
+               die "Read error" unless $rv > 0;
+               $bytesLeft -= $rv;
+               $off += $rv;
+       }
+       return $buf;
+}
+
+# writeFully(fh, data)
+sub writeFully(*$) {
+       my ($fh, $buf) = @_;
+       my ($bytesLeft, $off, $rv) = (length($buf), 0);
+       while ($bytesLeft > 0) {
+               $rv = syswrite($fh, $buf, $bytesLeft, $off);
+               die "Write error" unless $rv > 0;
+               $bytesLeft -= $rv;
+               $off += $rv;
+       }
+}
+
+# readMsg(fh) -> (type, body)
+sub readMsg(*) {
+       my ($fh) = @_;
+       my $head = readFully($fh, 8);
+       return (undef, undef) unless defined($head);
+       my ($type, $bodyLen) = unpack('NN', $head);
+       return ($type, readFully($fh, $bodyLen));
+}
+
+# writeMsg(fh, type, body)
+sub writeMsg(*$$) {
+       my ($fh, $type, $body) = @_;
+       writeFully($fh, pack('NN/a*', $type, $body));
+}
+
+# Message types
+#sub MSG_EXIT { 0; }
+sub MSG_REMOTE_PATH { 1; }
+sub MSG_PERFORMED { 2; }
+sub MSG_RENAME { 3; }
+sub MSG_DELREC { 4; }
+sub MSG_DELETED { 5; }
+
+sub doServer($) {
+       my ($dest) = @_;
+
+       chdir($dest);
+
+       my ($type, $body);
+       while (($type, $body) = readMsg(STDIN), defined($type)) {
+               if ($type == MSG_RENAME) {
+                       my ($src, $dest) = unpack('N/a*N/a*', $body);
+                       rename($src, $dest);
+                       writeMsg(STDOUT, MSG_PERFORMED, '');
+               } elsif ($type == MSG_DELREC) {
+                       my $victim = $body;
+                       my ($rmPid, $fromRm);
+                       $rmPid = open($fromRm, '-|', 'rm', '-rf', '-v', $victim);
+                       my $rmLine;
+                       while (defined($rmLine = <$fromRm>)) {
+                               chomp($rmLine);
+                               if ($rmLine =~ /^[^`]*`(.*)'[^']*$/) {
+                                       writeMsg(STDOUT, MSG_DELETED, $1);
+                               }
+                       }
+                       close($fromRm);
+                       waitpid($rmPid, 0);
+                       writeMsg(STDOUT, MSG_PERFORMED, '');
+               }
+       }
+}
+
+# The stuff below applies only to the client.
+
+our ($src, $dest);
+our $localDestFH;
+
+our ($fromServer, $toServer, $serverPid);
+our ($fromInwt, $inwtPid);
+
+sub clientQuit() {
+       print "Caught a signal.  Shutting down.\n";
+
+       #print STDOUT "serverPid is $serverPid\n";
+       close($fromServer);
+       close($toServer);
+       waitpid($serverPid, 0);
+
+       #print STDOUT "inwtPid is $inwtPid\n";
+       kill(2, $inwtPid);
+       close($fromInwt);
+       waitpid($inwtPid, 0);
+
+       exit(0);
+}
+
+sub doRsync($$@) {
+       my ($isRecursive, $isDelete, @paths) = @_;
+
+       my ($rsyncPid, $toRsync);
+       $rsyncPid = open($toRsync, '|-', @rsyncArgs,
+               ($isRecursive ? '-r' : '-d'), ($isDelete ? '--del' : ()),
+               '--no-implied-dirs', '-t', '--from0', '--files-from=-', '.', $dest);
+       foreach my $p (@paths) {
+               print $toRsync $p, "\0";
+       }
+       close($toRsync);
+       waitpid($rsyncPid, 0);
+}
+
+sub doRename($$) {
+       my ($src, $dest) = @_;
+       writeMsg($toServer, MSG_RENAME, pack('N/a*N/a*', $src, $dest));
+       readMsg($fromServer); # MSG_PERFORMED
+       print "*movefrom   $src\n",
+               "*moveto     $dest\n";
+}
+
+sub doDelete($) {
+       my ($path) = @_;
+       writeMsg($toServer, MSG_DELREC, $path);
+       my ($type, $body);
+       while (($type, $body) = readMsg($fromServer), $type == MSG_DELETED) {
+               print "*deleting   $body\n";
+       }
+       # Also reads the final MSG_PERFORMED.
+}
+
+# move_self so we can reliably detect moves out
+our @interestingEvents = ('modify', 'attrib', 'move', 'move_self', 'create', 'delete');
+
+sub doClient($$) {
+       ($src, $dest) = @_;
+
+       print "Continusync starting up.\n",
+               "This software is EXPERIMENTAL.  There is ABSOLUTELY NO WARRANTY.\n";
+
+       # Get a server process.
+       # Echoes of rsync...
+       if ($dest =~ /^([^:]*):(.*)$/) {
+               # Invoke over remote shell
+               my ($uhost, $rdest) = ($1, $2);
+               $serverPid = open2($fromServer, $toServer, @rshArgs, $uhost, $csPath, '--server');
+               # Pass path on stdin to stop the shell from messing with it.
+               # Echoes of rsync daemon protocol...
+               writeMsg($toServer, MSG_REMOTE_PATH, $rdest);
+       } else {
+               # Fork locally
+               my ($fromClient, $toClient);
+               pipe($fromServer, $toClient);
+               pipe($fromClient, $toServer);
+               $serverPid = fork();
+               if ($serverPid == 0) {
+                       # Child server
+                       close($fromServer);
+                       close($toServer);
+                       open(STDIN, "<&", $fromClient);
+                       open(STDOUT, ">&", $toClient);
+                       doServer($dest);
+                       exit(0);
+               }
+               # Parent client
+               close($fromClient);
+               close($toClient);
+               # Get a dest path that we can pass to rsync even after we chdir into the source.
+               {
+                       local $^F = 100000;
+                       open($localDestFH, '<', $dest);
+               }
+               $dest = "/proc/self/fd/" . fileno($localDestFH);
+       }
+
+       chdir($src);
+
+       # Get inotifywait.
+       $inwtPid = open($fromInwt, '-|');
+       if ($inwtPid == 0) {
+               # Parent wants all our output on the single filehandle $fromInwt.
+               open(STDERR, ">&", STDOUT);
+               my @args = ('inotifywait', '-r', '-m', '--format', "%e\n%w\n%f", map(('-e', $_), @interestingEvents), '.');
+               exec(@args);
+       }
+
+       <$fromInwt>; # `Setting up watches'
+       <$fromInwt>; # `Watches established'
+       $SIG{INT} = \&clientQuit;
+       print "Continuously mirroring.  Give me a SIGINT when you want me to quit.\n";
+
+       # Now we can do the initial copy without danger of losing events.
+       doRsync(1, 1, '.');
+
+       # Consecutive MOVED_FROM and MOVED_TO events constitute an internal
+       # move.  A move-out followed by a move-in gives an intervening
+       # MOVED_SELF, so we aren't fooled.
+       my $movedFrom = undef;
+
+       for (;;) {
+               my ($e, $w, $f);
+               chomp($e = <$fromInwt>);
+               chomp($w = <$fromInwt>);
+               chomp($f = <$fromInwt>);
+               my $path = $w . $f;
+               $path =~ s,^\./(.),$1,; # Remove initial ./ if it isn't all
+               my $isDir = ($e =~ s/,ISDIR$//);
+               #print "Got event: ($e,$isDir,$w,$f)\n";
+
+               if (defined($movedFrom)) {
+                       if ($e eq 'MOVED_TO') {
+                               # Complete the move.
+                               doRename($movedFrom, $path);
+                               next;
+                       } else {
+                               # Moved out.
+                               doDelete($movedFrom);
+                       }
+                       $movedFrom = undef;
+               }
+
+               if ($e eq 'MODIFY') {
+                       doRsync(0, 0, $path);
+               } elsif ($e eq 'ATTRIB') {
+                       doRsync(0, 0, $path);
+               } elsif ($e eq 'MOVED_FROM') {
+                       $movedFrom = $path;
+               } elsif ($e eq 'MOVED_TO') {
+                       # Moved in.
+                       # Must be recursive in case it was an entire directory.
+                       doRsync(1, 0, $path);
+               } elsif ($e eq 'CREATE') {
+                       doRsync(0, 0, $path);
+               } elsif ($e eq 'DELETE') {
+                       doDelete($path);
+               }
+       }
+       # not reached
+}
+
+if ($ARGV[0] eq '--server') {
+       #STDOUT->autoflush(1);
+       my ($type, $dest) = readMsg(STDIN);
+       doServer($dest);
+} else {
+       doClient($ARGV[0], $ARGV[1]);
+}
+