summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog9
-rw-r--r--combox/cbox.py6
-rw-r--r--combox/events.py97
-rw-r--r--tests/events_test.py36
4 files changed, 83 insertions, 65 deletions
diff --git a/ChangeLog b/ChangeLog
index 2d294fb..e55a3fe 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+2015-11-19 Siddharth Ravikumar <sravik@bgsu.edu>
+
+ * combox/events.py: ComboxDirMonitor now uses Lock; now this Lock
+ is shared by both the ComboxDirMonitor and the NodeDirMonitors.
+
+ Files changed because of the change to ComboxDirMonitor: combox/cbox.py
+
+ This changed addresses bug#11 (See TODO.org).
+
2015-11-13 Siddharth Ravikumar <sravik@bgsu.edu>
bumping to v0.1.3
diff --git a/combox/cbox.py b/combox/cbox.py
index 3184a92..a9b751c 100644
--- a/combox/cbox.py
+++ b/combox/cbox.py
@@ -41,11 +41,11 @@ def run_cb(config):
Runs combox.
"""
db_lock = Lock()
- nodem_lock = Lock()
+ monitor_lock = Lock()
# start combox directory (cd) monitor (cdm)
combox_dir = path.abspath(config['combox_dir'])
- cd_monitor = ComboxDirMonitor(config, db_lock)
+ cd_monitor = ComboxDirMonitor(config, db_lock, monitor_lock)
cd_observer = Observer()
cd_observer.schedule(cd_monitor, combox_dir, recursive=True)
@@ -60,7 +60,7 @@ def run_cb(config):
for node in node_dirs:
nd_monitor = NodeDirMonitor(config, db_lock,
- nodem_lock)
+ monitor_lock)
nd_observer = Observer()
nd_observer.schedule(nd_monitor, node, recursive=True)
nd_observer.start()
diff --git a/combox/events.py b/combox/events.py
index e599bc5..5b4ff0c 100644
--- a/combox/events.py
+++ b/combox/events.py
@@ -42,9 +42,11 @@ class ComboxDirMonitor(LoggingEventHandler):
"""
- def __init__(self, config, dblock):
+ def __init__(self, config, dblock, monitor_lock):
"""
config: a dictinary which contains combox configuration.
+ dblock: Lock for ComboxSilo.
+ monitor_lock: Lock for directory monitors.
"""
super(ComboxDirMonitor, self).__init__()
@@ -54,6 +56,7 @@ class ComboxDirMonitor(LoggingEventHandler):
self.config = config
self.silo = ComboxSilo(self.config, dblock)
+ self.lock = monitor_lock
# tracks files that are created during the course of this run.
self.just_created = {}
@@ -134,14 +137,16 @@ class ComboxDirMonitor(LoggingEventHandler):
super(ComboxDirMonitor, self).on_moved(event)
if event.is_directory:
- # creates a corresponding directory at the node dirs.
- move_nodedir(event.src_path, event.dest_path, self.config)
+ with self.lock:
+ # creates a corresponding directory at the node dirs.
+ move_nodedir(event.src_path, event.dest_path, self.config)
else:
- # file moved
- move_shards(event.src_path, event.dest_path, self.config)
- # update file info in silo.
- self.silo.remove(event.src_path)
- self.silo.update(event.dest_path)
+ with self.lock:
+ # file moved
+ move_shards(event.src_path, event.dest_path, self.config)
+ # update file info in silo.
+ self.silo.remove(event.src_path)
+ self.silo.update(event.dest_path)
def on_created(self, event):
@@ -161,14 +166,16 @@ class ComboxDirMonitor(LoggingEventHandler):
not event.is_directory)
if event.is_directory and (not path.exists(file_node_path)):
- # creates a corresponding directory at the node dirs.
- mk_nodedir(event.src_path, self.config)
+ with self.lock:
+ # creates a corresponding directory at the node dirs.
+ mk_nodedir(event.src_path, self.config)
elif (not event.is_directory) and (not path.exists(
file_node_path)):
- # file was created
- split_and_encrypt(event.src_path, self.config)
- # store file info in silo.
- self.silo.update(event.src_path)
+ with self.lock:
+ # file was created
+ split_and_encrypt(event.src_path, self.config)
+ # store file info in silo.
+ self.silo.update(event.src_path)
def on_deleted(self, event):
@@ -179,13 +186,15 @@ class ComboxDirMonitor(LoggingEventHandler):
if event.is_directory and (path.exists(file_node_path)):
# Delete corresponding directory in the nodes.
- rm_nodedir(event.src_path, self.config)
+ with self.lock:
+ rm_nodedir(event.src_path, self.config)
elif(not event.is_directory) and (path.exists(file_node_path)):
- # remove the corresponding file shards in the node
- # directories.
- rm_shards(event.src_path, self.config)
- # remove file info from silo.
- self.silo.remove(event.src_path)
+ with self.lock:
+ # remove the corresponding file shards in the node
+ # directories.
+ rm_shards(event.src_path, self.config)
+ # remove file info from silo.
+ self.silo.remove(event.src_path)
def on_modified(self, event):
@@ -202,29 +211,29 @@ class ComboxDirMonitor(LoggingEventHandler):
pass
else:
# file was modified
+ with self.lock:
+ f_size_MiB = path.getsize(event.src_path) / 1048576.0
+ log_i('%s modified %f' % (event.src_path, f_size_MiB))
+
+ # introduce delay to prevent multiple "file modified"
+ # events from being generated.
+ if f_size_MiB >= 30:
+ sleep_time = (f_size_MiB / 30.0)
+ log_i("waiting on_modified combox monitor %f" % sleep_time)
+ time.sleep(sleep_time)
+ log_i("end waiting on_modified combox monitor")
+ else:
+ time.sleep(1)
- f_size_MiB = path.getsize(event.src_path) / 1048576.0
- log_i('%s modified %f' % (event.src_path, f_size_MiB))
-
- # introduce delay to prevent multiple "file modified"
- # events from being generated.
- if f_size_MiB >= 30:
- sleep_time = (f_size_MiB / 30.0)
- log_i("waiting on_modified combox monitor %f" % sleep_time)
- time.sleep(sleep_time)
- log_i("end waiting on_modified combox monitor")
- else:
- time.sleep(1)
-
- if self.just_created[event.src_path]:
- self.just_created[event.src_path] = False
- log_i("Just created file %s. So ignoring on_modified call." % (
- event.src_path))
- return
+ if self.just_created[event.src_path]:
+ self.just_created[event.src_path] = False
+ log_i("Just created file %s. So ignoring on_modified call." % (
+ event.src_path))
+ return
- split_and_encrypt(event.src_path, self.config)
- # update file info in silo.
- self.silo.update(event.src_path)
+ split_and_encrypt(event.src_path, self.config)
+ # update file info in silo.
+ self.silo.update(event.src_path)
class NodeDirMonitor(LoggingEventHandler):
@@ -232,15 +241,15 @@ class NodeDirMonitor(LoggingEventHandler):
"""
- def __init__(self, config, dblock, nodem_lock):
+ def __init__(self, config, dblock, monitor_lock):
"""
config: a dictinary which contains combox configuration.
dblock: Lock for the ComboxSilo.
- nodem_lock: Lock for NodeDirMonitors.
+ monitor_lock: Lock for directory monitors.
"""
super(NodeDirMonitor, self).__init__()
- self.lock = nodem_lock
+ self.lock = monitor_lock
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
diff --git a/tests/events_test.py b/tests/events_test.py
index da35884..4ef75ce 100644
--- a/tests/events_test.py
+++ b/tests/events_test.py
@@ -55,7 +55,7 @@ class TestEvents(object):
"""Set things up."""
self.silo_lock = Lock()
- self.nodem_lock = Lock()
+ self.monitor_lock = Lock()
self.config = get_config()
self.silo = ComboxSilo(self.config, self.silo_lock)
@@ -77,7 +77,7 @@ class TestEvents(object):
Tests the ComboxDirMonitor class.
"""
- event_handler = ComboxDirMonitor(self.config, self.silo_lock)
+ event_handler = ComboxDirMonitor(self.config, self.silo_lock, self.monitor_lock)
observer = Observer()
observer.schedule(event_handler, self.FILES_DIR, recursive=True)
observer.start()
@@ -211,7 +211,7 @@ class TestEvents(object):
# test file deletion and addition
os.rename(self.lorem, self.lorem_moved)
- cdm = ComboxDirMonitor(self.config, self.silo_lock)
+ cdm = ComboxDirMonitor(self.config, self.silo_lock, self.monitor_lock)
cdm.housekeep()
silo = ComboxSilo(self.config, self.silo_lock)
@@ -228,7 +228,7 @@ class TestEvents(object):
copyfile(self.lorem, self.lorem_ipsum)
assert path.exists(self.lorem_ipsum)
- cdm = ComboxDirMonitor(self.config, self.silo_lock)
+ cdm = ComboxDirMonitor(self.config, self.silo_lock, self.monitor_lock)
cdm.housekeep()
silo = ComboxSilo(self.config, self.silo_lock)
@@ -252,7 +252,7 @@ class TestEvents(object):
"""
nmonitor = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
assert_equal(2, nmonitor.num_nodes)
@@ -268,7 +268,7 @@ class TestEvents(object):
# monitor them.
for node in nodes:
nmonitor = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
observer = Observer()
observer.schedule(nmonitor, node, recursive=True)
observer.start()
@@ -330,7 +330,7 @@ class TestEvents(object):
# monitor them.
for node in nodes:
nmonitor = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
observer = Observer()
observer.schedule(nmonitor, node, recursive=True)
observer.start()
@@ -389,7 +389,7 @@ class TestEvents(object):
# monitor them.
for node in nodes:
nmonitor = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
observer = Observer()
observer.schedule(nmonitor, node, recursive=True)
observer.start()
@@ -447,7 +447,7 @@ class TestEvents(object):
# monitor them.
for node in nodes:
nmonitor = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
observer = Observer()
observer.schedule(nmonitor, node, recursive=True)
observer.start()
@@ -531,7 +531,7 @@ class TestEvents(object):
"""
event_handler = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
observer = Observer()
observer.schedule(event_handler, self.NODE_DIR, recursive=True)
observer.start()
@@ -615,7 +615,7 @@ class TestEvents(object):
# monitor them.
for node in nodes:
nmonitor = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
observer = Observer()
observer.schedule(nmonitor, node, recursive=True)
observer.start()
@@ -624,7 +624,7 @@ class TestEvents(object):
observers.append(observer)
# event_handler = NodeDirMonitor(self.config, self.silo_lock,
- # self.nodem_lock)
+ # self.monitor_lock)
# observer = Observer()
# observer.schedule(event_handler, self.NODE_DIR, recursive=True)
# observer.start()
@@ -698,7 +698,7 @@ class TestEvents(object):
silo.update(testf2)
ndm = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
ndm.housekeep()
assert not path.exists(testf1)
@@ -724,7 +724,7 @@ class TestEvents(object):
remove(node_paths(lorem_c, self.config, True)[0])
ndm = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
ndm.housekeep()
assert path.exists(hmutant)
@@ -752,7 +752,7 @@ class TestEvents(object):
silo.update(testf2)
ndm = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
ndm.housekeep()
assert not path.exists(testf1)
@@ -769,7 +769,7 @@ class TestEvents(object):
hmutant_content)
ndm = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
ndm.housekeep()
assert path.exists(hmutant)
@@ -798,7 +798,7 @@ class TestEvents(object):
lcopy_content)
ndm = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
ndm.housekeep()
## check if the lorem_file_copy's info is updated in silo
@@ -815,7 +815,7 @@ class TestEvents(object):
shard = 'some.shard0'
not_shard = 'some.extension'
ndm = NodeDirMonitor(self.config, self.silo_lock,
- self.nodem_lock)
+ self.monitor_lock)
assert_equal(True, ndm.shardp(shard))
assert_equal(False, ndm.shardp(not_shard))