From 5aa1ba0c1dcad62931ba27bb66bf115233086d6c Mon Sep 17 00:00:00 2001 From: Siddharth Ravikumar Date: Thu, 19 Nov 2015 08:23:31 -0500 Subject: Now ComboxDirMonitor uses Lock. Addresses bug#11. --- ChangeLog | 9 +++++ combox/cbox.py | 6 ++-- combox/events.py | 97 ++++++++++++++++++++++++++++------------------------ tests/events_test.py | 36 +++++++++---------- 4 files changed, 83 insertions(+), 65 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2d294fb..e55a3fe 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2015-11-19 Siddharth Ravikumar + + * combox/events.py: ComboxDirMonitor now uses Lock; now this Lock + is shared by both the ComboxDirMonitor and the NodeDirMonitors. + + Files changed because of the change to ComboxDirMonitor: combox/cbox.py + + This changed addresses bug#11 (See TODO.org). + 2015-11-13 Siddharth Ravikumar bumping to v0.1.3 diff --git a/combox/cbox.py b/combox/cbox.py index 3184a92..a9b751c 100644 --- a/combox/cbox.py +++ b/combox/cbox.py @@ -41,11 +41,11 @@ def run_cb(config): Runs combox. """ db_lock = Lock() - nodem_lock = Lock() + monitor_lock = Lock() # start combox directory (cd) monitor (cdm) combox_dir = path.abspath(config['combox_dir']) - cd_monitor = ComboxDirMonitor(config, db_lock) + cd_monitor = ComboxDirMonitor(config, db_lock, monitor_lock) cd_observer = Observer() cd_observer.schedule(cd_monitor, combox_dir, recursive=True) @@ -60,7 +60,7 @@ def run_cb(config): for node in node_dirs: nd_monitor = NodeDirMonitor(config, db_lock, - nodem_lock) + monitor_lock) nd_observer = Observer() nd_observer.schedule(nd_monitor, node, recursive=True) nd_observer.start() diff --git a/combox/events.py b/combox/events.py index e599bc5..5b4ff0c 100644 --- a/combox/events.py +++ b/combox/events.py @@ -42,9 +42,11 @@ class ComboxDirMonitor(LoggingEventHandler): """ - def __init__(self, config, dblock): + def __init__(self, config, dblock, monitor_lock): """ config: a dictinary which contains combox configuration. + dblock: Lock for ComboxSilo. + monitor_lock: Lock for directory monitors. """ super(ComboxDirMonitor, self).__init__() @@ -54,6 +56,7 @@ class ComboxDirMonitor(LoggingEventHandler): self.config = config self.silo = ComboxSilo(self.config, dblock) + self.lock = monitor_lock # tracks files that are created during the course of this run. self.just_created = {} @@ -134,14 +137,16 @@ class ComboxDirMonitor(LoggingEventHandler): super(ComboxDirMonitor, self).on_moved(event) if event.is_directory: - # creates a corresponding directory at the node dirs. - move_nodedir(event.src_path, event.dest_path, self.config) + with self.lock: + # creates a corresponding directory at the node dirs. + move_nodedir(event.src_path, event.dest_path, self.config) else: - # file moved - move_shards(event.src_path, event.dest_path, self.config) - # update file info in silo. - self.silo.remove(event.src_path) - self.silo.update(event.dest_path) + with self.lock: + # file moved + move_shards(event.src_path, event.dest_path, self.config) + # update file info in silo. + self.silo.remove(event.src_path) + self.silo.update(event.dest_path) def on_created(self, event): @@ -161,14 +166,16 @@ class ComboxDirMonitor(LoggingEventHandler): not event.is_directory) if event.is_directory and (not path.exists(file_node_path)): - # creates a corresponding directory at the node dirs. - mk_nodedir(event.src_path, self.config) + with self.lock: + # creates a corresponding directory at the node dirs. + mk_nodedir(event.src_path, self.config) elif (not event.is_directory) and (not path.exists( file_node_path)): - # file was created - split_and_encrypt(event.src_path, self.config) - # store file info in silo. - self.silo.update(event.src_path) + with self.lock: + # file was created + split_and_encrypt(event.src_path, self.config) + # store file info in silo. + self.silo.update(event.src_path) def on_deleted(self, event): @@ -179,13 +186,15 @@ class ComboxDirMonitor(LoggingEventHandler): if event.is_directory and (path.exists(file_node_path)): # Delete corresponding directory in the nodes. - rm_nodedir(event.src_path, self.config) + with self.lock: + rm_nodedir(event.src_path, self.config) elif(not event.is_directory) and (path.exists(file_node_path)): - # remove the corresponding file shards in the node - # directories. - rm_shards(event.src_path, self.config) - # remove file info from silo. - self.silo.remove(event.src_path) + with self.lock: + # remove the corresponding file shards in the node + # directories. + rm_shards(event.src_path, self.config) + # remove file info from silo. + self.silo.remove(event.src_path) def on_modified(self, event): @@ -202,29 +211,29 @@ class ComboxDirMonitor(LoggingEventHandler): pass else: # file was modified + with self.lock: + f_size_MiB = path.getsize(event.src_path) / 1048576.0 + log_i('%s modified %f' % (event.src_path, f_size_MiB)) + + # introduce delay to prevent multiple "file modified" + # events from being generated. + if f_size_MiB >= 30: + sleep_time = (f_size_MiB / 30.0) + log_i("waiting on_modified combox monitor %f" % sleep_time) + time.sleep(sleep_time) + log_i("end waiting on_modified combox monitor") + else: + time.sleep(1) - f_size_MiB = path.getsize(event.src_path) / 1048576.0 - log_i('%s modified %f' % (event.src_path, f_size_MiB)) - - # introduce delay to prevent multiple "file modified" - # events from being generated. - if f_size_MiB >= 30: - sleep_time = (f_size_MiB / 30.0) - log_i("waiting on_modified combox monitor %f" % sleep_time) - time.sleep(sleep_time) - log_i("end waiting on_modified combox monitor") - else: - time.sleep(1) - - if self.just_created[event.src_path]: - self.just_created[event.src_path] = False - log_i("Just created file %s. So ignoring on_modified call." % ( - event.src_path)) - return + if self.just_created[event.src_path]: + self.just_created[event.src_path] = False + log_i("Just created file %s. So ignoring on_modified call." % ( + event.src_path)) + return - split_and_encrypt(event.src_path, self.config) - # update file info in silo. - self.silo.update(event.src_path) + split_and_encrypt(event.src_path, self.config) + # update file info in silo. + self.silo.update(event.src_path) class NodeDirMonitor(LoggingEventHandler): @@ -232,15 +241,15 @@ class NodeDirMonitor(LoggingEventHandler): """ - def __init__(self, config, dblock, nodem_lock): + def __init__(self, config, dblock, monitor_lock): """ config: a dictinary which contains combox configuration. dblock: Lock for the ComboxSilo. - nodem_lock: Lock for NodeDirMonitors. + monitor_lock: Lock for directory monitors. """ super(NodeDirMonitor, self).__init__() - self.lock = nodem_lock + self.lock = monitor_lock logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') diff --git a/tests/events_test.py b/tests/events_test.py index da35884..4ef75ce 100644 --- a/tests/events_test.py +++ b/tests/events_test.py @@ -55,7 +55,7 @@ class TestEvents(object): """Set things up.""" self.silo_lock = Lock() - self.nodem_lock = Lock() + self.monitor_lock = Lock() self.config = get_config() self.silo = ComboxSilo(self.config, self.silo_lock) @@ -77,7 +77,7 @@ class TestEvents(object): Tests the ComboxDirMonitor class. """ - event_handler = ComboxDirMonitor(self.config, self.silo_lock) + event_handler = ComboxDirMonitor(self.config, self.silo_lock, self.monitor_lock) observer = Observer() observer.schedule(event_handler, self.FILES_DIR, recursive=True) observer.start() @@ -211,7 +211,7 @@ class TestEvents(object): # test file deletion and addition os.rename(self.lorem, self.lorem_moved) - cdm = ComboxDirMonitor(self.config, self.silo_lock) + cdm = ComboxDirMonitor(self.config, self.silo_lock, self.monitor_lock) cdm.housekeep() silo = ComboxSilo(self.config, self.silo_lock) @@ -228,7 +228,7 @@ class TestEvents(object): copyfile(self.lorem, self.lorem_ipsum) assert path.exists(self.lorem_ipsum) - cdm = ComboxDirMonitor(self.config, self.silo_lock) + cdm = ComboxDirMonitor(self.config, self.silo_lock, self.monitor_lock) cdm.housekeep() silo = ComboxSilo(self.config, self.silo_lock) @@ -252,7 +252,7 @@ class TestEvents(object): """ nmonitor = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) assert_equal(2, nmonitor.num_nodes) @@ -268,7 +268,7 @@ class TestEvents(object): # monitor them. for node in nodes: nmonitor = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) observer = Observer() observer.schedule(nmonitor, node, recursive=True) observer.start() @@ -330,7 +330,7 @@ class TestEvents(object): # monitor them. for node in nodes: nmonitor = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) observer = Observer() observer.schedule(nmonitor, node, recursive=True) observer.start() @@ -389,7 +389,7 @@ class TestEvents(object): # monitor them. for node in nodes: nmonitor = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) observer = Observer() observer.schedule(nmonitor, node, recursive=True) observer.start() @@ -447,7 +447,7 @@ class TestEvents(object): # monitor them. for node in nodes: nmonitor = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) observer = Observer() observer.schedule(nmonitor, node, recursive=True) observer.start() @@ -531,7 +531,7 @@ class TestEvents(object): """ event_handler = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) observer = Observer() observer.schedule(event_handler, self.NODE_DIR, recursive=True) observer.start() @@ -615,7 +615,7 @@ class TestEvents(object): # monitor them. for node in nodes: nmonitor = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) observer = Observer() observer.schedule(nmonitor, node, recursive=True) observer.start() @@ -624,7 +624,7 @@ class TestEvents(object): observers.append(observer) # event_handler = NodeDirMonitor(self.config, self.silo_lock, - # self.nodem_lock) + # self.monitor_lock) # observer = Observer() # observer.schedule(event_handler, self.NODE_DIR, recursive=True) # observer.start() @@ -698,7 +698,7 @@ class TestEvents(object): silo.update(testf2) ndm = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) ndm.housekeep() assert not path.exists(testf1) @@ -724,7 +724,7 @@ class TestEvents(object): remove(node_paths(lorem_c, self.config, True)[0]) ndm = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) ndm.housekeep() assert path.exists(hmutant) @@ -752,7 +752,7 @@ class TestEvents(object): silo.update(testf2) ndm = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) ndm.housekeep() assert not path.exists(testf1) @@ -769,7 +769,7 @@ class TestEvents(object): hmutant_content) ndm = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) ndm.housekeep() assert path.exists(hmutant) @@ -798,7 +798,7 @@ class TestEvents(object): lcopy_content) ndm = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) ndm.housekeep() ## check if the lorem_file_copy's info is updated in silo @@ -815,7 +815,7 @@ class TestEvents(object): shard = 'some.shard0' not_shard = 'some.extension' ndm = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) + self.monitor_lock) assert_equal(True, ndm.shardp(shard)) assert_equal(False, ndm.shardp(not_shard)) -- cgit v1.2.3