[Yt-svn] commit/yt: MatthewTurk: Trying out using an execution thread for handling REPL executions and other
Bitbucket
commits-noreply at bitbucket.org
Mon May 23 09:51:41 PDT 2011
1 new changeset in yt:
http://bitbucket.org/yt_analysis/yt/changeset/b8a4c0e7cd40/
changeset: r4274:b8a4c0e7cd40
branch: yt
user: MatthewTurk
date: 2011-05-23 18:51:25
summary: Trying out using an execution thread for handling REPL executions and other
tasks, instead of lock/unlock. This has fixed -- for me -- the issue of
multiple widgets showing up after a long-running project operation. I believe
that was related to Ext re-trying the call to create_proj multiple times, but
I am not entirely sure. Hopefully this will not result in any issues, but it
will need testing.
affected #: 1 file (2.0 KB)
--- a/yt/gui/reason/extdirect_repl.py Sat May 21 01:05:31 2011 -0400
+++ b/yt/gui/reason/extdirect_repl.py Mon May 23 09:51:25 2011 -0700
@@ -39,6 +39,7 @@
import base64
import imp
import threading
+import Queue
from yt.funcs import *
from yt.utilities.logger import ytLogger, ufstring
@@ -87,14 +88,59 @@
self.locks[str(func)] = threading.Lock()
@wraps(func)
def locker(*args, **kwargs):
+ print "Acquiring lock on %s" % (str(func))
with self.locks[str(func)]:
rv = func(*args, **kwargs)
- print "Regained lock:", rv
+ print "Regained lock on %s" % (str(func))
return rv
return locker
lockit = MethodLock()
+class ExecutionThread(threading.Thread):
+ def __init__(self, repl):
+ self.repl = repl
+ self.queue = Queue.Queue()
+ threading.Thread.__init__(self)
+ self.daemon = True
+
+ def run(self):
+ while 1:
+ print "Checking for a queue ..."
+ try:
+ task = self.queue.get(True, 10)
+ except (Queue.Full, Queue.Empty):
+ if self.repl.stopped: return
+ continue
+ print "Received the task", task
+ if task['type'] == 'code':
+ self.execute_one(task['code'], task['hide'])
+ self.queue.task_done()
+ elif task['type'] == 'add_widget':
+ print "Adding new widget"
+ self.queue.task_done()
+ new_code = self.repl._add_widget(
+ task['name'], task['widget_data_name'])
+ print "Got this command:", new_code
+ self.repl.execute(new_code, hide=True)
+ print "Executed!"
+
+ def execute_one(self, code, hide):
+ self.repl.executed_cell_texts.append(code)
+
+ result = ProgrammaticREPL.execute(self.repl, code)
+ if self.repl.debug:
+ print "==================== Cell Execution ===================="
+ print code
+ print "==================== ===================="
+ print result
+ print "========================================================"
+ if hide: return
+ self.repl.payload_handler.add_payload(
+ {'type': 'cell_results',
+ 'output': result,
+ 'input': highlighter(code)})
+
def deliver_image(im):
if hasattr(im, 'read'):
img_data = base64.b64encode(im.read())
@@ -150,6 +196,7 @@
server = None
stopped = False
debug = False
+ _heartbeat_timer = None
def __init__(self, base_extjs_path, locals=None):
# First we do the standard initialization
@@ -181,6 +228,7 @@
self.pflist = ExtDirectParameterFileList()
self.executed_cell_texts = []
self.payload_handler = PayloadHandler()
+ self.execution_thread = ExecutionThread(self)
# Now we load up all the yt.mods stuff, but only after we've finished
# setting up.
reason_pylab()
@@ -194,6 +242,8 @@
# Setup our heartbeat
self.last_heartbeat = time.time()
self._check_heartbeat()
+ self.execution_thread.start()
+ if self.debug: time.sleep(3)
def exception_handler(self, exc):
result = {'type': 'cell_results',
@@ -207,6 +257,7 @@
handler.setFormatter(formatter)
ytLogger.addHandler(handler)
+
def index(self):
"""Return an HTTP-based Read-Eval-Print-Loop terminal."""
# For now this doesn't work! We will need to move to a better method
@@ -216,13 +267,13 @@
def heartbeat(self):
self.last_heartbeat = time.time()
- if self.debug: print "### Heartbeat ... started"
- for i in range(30): # The total time to wait
+ if self.debug: print "### Heartbeat ... started: %s" % (time.ctime())
+ for i in range(30):
# Check for stop
- if self.stopped: return {'type':'shutdown'}# No race condition
+ if self.stopped: return {'type':'shutdown'} # No race condition
if self.payload_handler.event.wait(1): # One second timeout
return self.payload_handler.deliver_payloads()
- if self.debug: print "### Heartbeat ... finished"
+ if self.debug: print "### Heartbeat ... finished: %s" % (time.ctime())
return []
def _check_heartbeat(self):
@@ -237,6 +288,7 @@
# server instance by default.
self.shutdown()
return
+ if self._heartbeat_timer is not None: return
self._heartbeat_timer = threading.Timer(10, self._check_heartbeat)
self._heartbeat_timer.start()
@@ -293,22 +345,11 @@
def _highlighter_css(self):
return highlighter_css
- @lockit
def execute(self, code, hide = False):
- self.executed_cell_texts.append(code)
-
- result = ProgrammaticREPL.execute(self, code)
- if self.debug:
- print "==================== Cell Execution ===================="
- print code
- print "==================== ===================="
- print result
- print "========================================================"
- if hide: return
- self.payload_handler.add_payload(
- {'type': 'cell_results',
- 'output': result,
- 'input': highlighter(code)})
+ task = {'type': 'code',
+ 'code': code,
+ 'hide': hide}
+ self.execution_thread.queue.put(task)
def get_history(self):
return self.executed_cell_texts[:]
@@ -375,7 +416,9 @@
return cs
def _add_widget(self, widget_name, widget_data_name = None):
- # This should be sanitized
+ # We need to make sure that we aren't running in advance of a new
+ # object being added.
+ self.execution_thread.queue.join()
widget = self.locals[widget_name]
uu = str(uuid.uuid1()).replace("-","_")
varname = "%s_%s" % (widget._widget_name, uu)
@@ -413,7 +456,9 @@
field_z = field_z, weight = weight)
funccall = "\n".join(line.strip() for line in funccall.splitlines())
self.execute(funccall, hide=True)
- self.execute(self._add_widget('_tpp', '_twidget_data'), hide=True)
+ self.execution_thread.queue.put({'type': 'add_widget',
+ 'name': '_tpp',
+ 'widget_data_name': '_twidget_data'})
@lockit
def create_proj(self, pfname, axis, field, weight, onmax):
@@ -450,7 +495,9 @@
# There is a call to do this, but I have forgotten it ...
funccall = "\n".join((line.strip() for line in funccall.splitlines()))
self.execute(funccall, hide = True)
- self.execute(self._add_widget('_tpw', '_twidget_data'), hide = True)
+ self.execution_thread.queue.put({'type': 'add_widget',
+ 'name': '_tpw',
+ 'widget_data_name': '_twidget_data'})
@lockit
def create_slice(self, pfname, center, axis, field, onmax):
@@ -488,7 +535,9 @@
# There is a call to do this, but I have forgotten it ...
funccall = "\n".join((line.strip() for line in funccall.splitlines()))
self.execute(funccall, hide = True)
- self.execute(self._add_widget('_tpw', '_twidget_data'), hide = True)
+ self.execution_thread.queue.put({'type': 'add_widget',
+ 'name': '_tpw',
+ 'widget_data_name': '_twidget_data'})
@lockit
def create_grid_dataview(self, pfname):
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this because the commit notification service is enabled and is
addressed to the recipient of this email.
More information about the yt-svn mailing list