|
''' |
|
Utility for simple distribution of work on multiple processes, by |
|
making sure only one process is working on a job at once. |
|
''' |
|
|
|
import os, errno, socket, atexit, time, sys |
|
|
|
def exclusive_dirfn(*args):
    '''
    Convenience function to get exclusive access to an unfinished
    experiment directory. Exits the program if the directory is
    already done or busy (using exit_if_job_done). Otherwise,
    returns a function that creates filenames within that directory.

    Args:
        *args: path components; each is converted with str() and the
            results are joined with os.path.join.

    Returns:
        A function dirfn(*fn) returning os.path.join(directory, *fn),
        carrying two attributes:
            dirfn.dir  -- the joined directory path.
            dirfn.done -- a no-argument callable that marks the job done
                          (writes done.txt via mark_job_done).
    '''
    directory = os.path.join(*[str(a) for a in args])
    # Either takes the directory's lock (released at interpreter exit)
    # or calls sys.exit(0) because the job is busy or already done.
    exit_if_job_done(directory)

    def dirfn(*fn):
        return os.path.join(directory, *fn)

    def done():
        mark_job_done(directory)

    dirfn.dir = directory
    dirfn.done = done
    print('Working in %s' % directory)
    return dirfn
|
|
|
def exit_if_job_done(directory, redo=False, force=False, verbose=True):
    '''
    Take the job lock for `directory`, or exit if the job is busy or done.

    Calls sys.exit(0) when `directory`/lockfile.pid is held by another
    process (see pidfile_taken), or when `directory`/done.txt exists and
    neither `redo` nor `force` is set. With `redo` or `force`, an existing
    done.txt is deleted so the job runs again; `force` is also passed to
    pidfile_taken, which then removes another process's lock file before
    trying to take it. When `verbose` is true, prints what was found.
    Returns None, with the lock held, when the caller should run the job.
    '''
    lock_path = os.path.join(directory, 'lockfile.pid')
    if pidfile_taken(lock_path, force=force, verbose=verbose):
        sys.exit(0)

    done_path = os.path.join(directory, 'done.txt')
    if not os.path.isfile(done_path):
        return

    with open(done_path) as done_fh:
        note = done_fh.read()

    if not (redo or force):
        # Job already finished and no rerun requested.
        if verbose:
            print('%s %s' % (done_path, note))
        sys.exit(0)

    # Rerun requested: clear the done marker and carry on with the lock held.
    if verbose:
        print('Removing %s %s' % (done_path, note))
    os.remove(done_path)
|
|
|
def mark_job_done(directory):
    '''
    Record that the job in `directory` has finished.

    Creates or overwrites `directory`/done.txt with a one-line note naming
    this process (pid@hostname), the STY environment variable (empty when
    unset) and the current local time formatted with '%c'.
    '''
    done_path = os.path.join(directory, 'done.txt')
    with open(done_path, 'w') as out:
        details = (
            os.getpid(),
            socket.gethostname(),
            os.getenv('STY', ''),
            time.strftime('%c'),
        )
        out.write('done by %d@%s %s at %s' % details)
|
|
|
def pidfile_taken(path, verbose=False, force=False):
    '''
    Try to take an exclusive lock by atomically creating the file `path`.

    Usage. To grab an exclusive lock for the remaining duration of the
    current process (and exit if another process already has the lock),
    do this:

        if pidfile_taken('job_423/lockfile.pid', verbose=True):
            sys.exit(0)

    To do a batch of jobs, just run a script that does them all on
    each available machine, sharing a network filesystem. When each
    job grabs a lock, then this will automatically distribute the
    jobs so that each one is done just once on one machine.

    Args:
        path: lock file path. Missing parent directories are created;
            a bare filename (no directory part) is also accepted.
        verbose: print a message when the lock is held by someone else
            or is being forcibly removed.
        force: remove an existing lock file, then try once more to take
            the lock.

    Returns:
        None if this process now holds the lock. The file then contains
        "pid@hostname $STY" and is removed at interpreter exit by
        delete_pidfile. Otherwise, returns a non-empty string describing
        the holder: the lock file's contents, 'empty' if it has none, or
        'race' if it could not be read.

    Raises:
        OSError: for filesystem errors other than the lock file already
            existing (e.g. permission denied, or a parent path component
            that is a regular file).
    '''
    parent = os.path.dirname(path)
    if parent:
        # os.makedirs('') raises, so only create a directory that is named.
        # Kept outside the try below so its errors are never mistaken for
        # "lock already held".
        os.makedirs(parent, exist_ok=True)

    try:
        # O_CREAT | O_EXCL makes creation atomic: exactly one process wins.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except FileExistsError:
        conflicter = _read_lock_holder(path)
        if force:
            if verbose:
                print('Removing %s from %s' % (path, conflicter))
            try:
                os.remove(path)
            except FileNotFoundError:
                # The holder released the lock meanwhile; just retry.
                pass
            return pidfile_taken(path, verbose=verbose, force=False)
        if verbose:
            print('%s held by %s' % (path, conflicter))
        return conflicter

    lockfile = os.fdopen(fd, 'r+')
    # Register cleanup before writing so the lock file is still removed at
    # exit if the write below fails.
    atexit.register(delete_pidfile, lockfile, path)

    lockfile.write('%d@%s %s\n' % (os.getpid(), socket.gethostname(),
                                   os.getenv('STY', '')))
    lockfile.flush()
    # Force the holder description to disk so other readers of the lock
    # file can see it.
    os.fsync(lockfile)

    return None


def _read_lock_holder(path):
    '''
    Describe the current holder of the lock file `path`.

    Returns the stripped file contents, 'empty' if the file has no content,
    or 'race' if it cannot be read (e.g. the holder removed it between the
    failed create and this read).
    '''
    try:
        with open(path, 'r') as lockfile:
            return lockfile.read().strip() or 'empty'
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError from a garbled lock file.
        return 'race'
|
|
|
def delete_pidfile(lockfile, path):
    '''
    Runs at exit after pidfile_taken succeeds: release the lock.

    Closes `lockfile` (if not None) and removes `path`. Errors are ignored
    because this is best-effort cleanup during interpreter shutdown.

    If `path` no longer refers to the file that `lockfile` has open (for
    example, another process removed it via pidfile_taken(force=True) and
    created its own lock), the file at `path` is left alone, so that we do
    not delete a lock we no longer own.
    '''
    ours = True
    if lockfile is not None:
        try:
            ours = os.path.samestat(os.fstat(lockfile.fileno()),
                                    os.stat(path))
        except (OSError, ValueError):
            # path already gone (OSError) or lockfile already closed
            # (ValueError): fall back to the plain unlink below.
            pass
        try:
            lockfile.close()
        except (OSError, ValueError):
            pass
    if ours:
        # NOTE: a small window remains between the check above and this
        # unlink; it only narrows, not eliminates, the stolen-lock race.
        try:
            os.unlink(path)
        except OSError:
            pass
|
|