From fb12d1703a6ef266b7f30b54bcf1afc1bad381d5 Mon Sep 17 00:00:00 2001 From: Siddharth Ravikumar Date: Wed, 22 Apr 2015 20:48:00 -0400 Subject: rewrote combox.events.NodeDirMonitor.on_moved method. - Now the node monitor waits for all shards of the file, which was moved/renamed on a remote computer, to get moved/renamed on this computer, before it finally reconstructs the moved/renamed file in the combox directory on this computer. - Updated the tests for on_moved method at tests.events_test.TestEvents.test_NDM_onmoved method. modified: combox/events.py modified: tests/events_test.py --- combox/events.py | 15 +++++++++++---- tests/events_test.py | 42 +++++++++++++++++++++++++++++++----------- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/combox/events.py b/combox/events.py index 3cb8ebd..7026756 100644 --- a/combox/events.py +++ b/combox/events.py @@ -281,10 +281,17 @@ class NodeDirMonitor(LoggingEventHandler): if not path.exists(dest_cb_path): # means this path was move on another computer that is # running combox. - try: - os.rename(src_cb_path, dest_cb_path) - except OSError, e: - print "Jeez, failed to rename path.", e + with self.lock: + self.silo.node_set('file_moved', src_cb_path) + num = self.silo.node_get('file_moved', src_cb_path) + if num != self.num_nodes: + return + else: + try: + os.rename(src_cb_path, dest_cb_path) + except OSError, e: + print "Jeez, failed to rename path.", e + self.silo.node_rem('file_moved', src_cb_path) if not event.is_directory: self.silo.remove(src_cb_path) diff --git a/tests/events_test.py b/tests/events_test.py index 9d90f0e..5d65709 100644 --- a/tests/events_test.py +++ b/tests/events_test.py @@ -496,11 +496,29 @@ class TestEvents(object): def test_NDM_onmoved(self): """Testing on_moved method in NodeDirMonitor""" - event_handler = NodeDirMonitor(self.config, self.silo_lock, - self.nodem_lock) - observer = Observer() - observer.schedule(event_handler, self.NODE_DIR, recursive=True) - observer.start() + nodes = get_nodedirs(self.config) + num_nodes = len(get_nodedirs(self.config)) + + nmonitors = [] + observers = [] + + # create an observer for each node directory and make it + # monitor them. + for node in nodes: + nmonitor = NodeDirMonitor(self.config, self.silo_lock, + self.nodem_lock) + observer = Observer() + observer.schedule(nmonitor, node, recursive=True) + observer.start() + + nmonitors.append(nmonitor) + observers.append(observer) + + # event_handler = NodeDirMonitor(self.config, self.silo_lock, + # self.nodem_lock) + # observer = Observer() + # observer.schedule(event_handler, self.NODE_DIR, recursive=True) + # observer.start() self.testf = "%s.onm" % self.TEST_FILE copyfile(self.TEST_FILE, self.testf) @@ -517,9 +535,8 @@ class TestEvents(object): move_shards(self.testf, self.testf_moved, self.config) time.sleep(1) assert path.isfile(self.testf_moved) - - silo = ComboxSilo(self.config, self.silo_lock) assert silo.exists(self.testf_moved) + assert not silo.exists(self.testf) # test directory move/rename dirt = path.join(self.FILES_DIR, "fom") @@ -531,7 +548,6 @@ class TestEvents(object): split_and_encrypt(dirt_lorem, self.config) time.sleep(1) - silo = ComboxSilo(self.config, self.silo_lock) silo.update(dirt_lorem) dirt_m = path.join(self.FILES_DIR, "mof") @@ -541,15 +557,19 @@ class TestEvents(object): assert path.isdir(dirt_m) assert path.isfile(dirt_m_lorem) + assert not path.isdir(dirt) + assert not path.isdir(dirt_lorem) - silo = ComboxSilo(self.config, self.silo_lock) assert silo.exists(dirt_m_lorem) + assert not silo.exists(dirt_lorem) self.purge_list.append(self.testf_moved) self.purge_list.append(dirt_m) - observer.stop() - observer.join() + # stop the zarking observers. + for i in range(num_nodes): + observers[i].stop() + observers[i].join() def test_NDM_housekeep(self): -- cgit v1.2.3