[Yt-svn] commit/yt: MatthewTurk: Trying out using an execution thread for handling REPL executions and other

Bitbucket commits-noreply at bitbucket.org
Mon May 23 09:51:41 PDT 2011


1 new changeset in yt:

http://bitbucket.org/yt_analysis/yt/changeset/b8a4c0e7cd40/
changeset:   r4274:b8a4c0e7cd40
branch:      yt
user:        MatthewTurk
date:        2011-05-23 18:51:25
summary:     Trying out using an execution thread for handling REPL executions and other
tasks, instead of lock/unlock.  This has fixed -- for me -- the issue of
multiple widgets showing up after a long-running project operation.  I believe
that was related to Ext re-trying the call to create_proj multiple times, but
I am not entirely sure.  Hopefully this will not result in any issues, but it
will need testing.
affected #:  1 file (2.0 KB)

--- a/yt/gui/reason/extdirect_repl.py	Sat May 21 01:05:31 2011 -0400
+++ b/yt/gui/reason/extdirect_repl.py	Mon May 23 09:51:25 2011 -0700
@@ -39,6 +39,7 @@
 import base64
 import imp
 import threading
+import Queue
 
 from yt.funcs import *
 from yt.utilities.logger import ytLogger, ufstring
@@ -87,14 +88,59 @@
             self.locks[str(func)] = threading.Lock()
         @wraps(func)
         def locker(*args, **kwargs):
+            print "Acquiring lock on %s" % (str(func))
             with self.locks[str(func)]:
                 rv = func(*args, **kwargs)
-            print "Regained lock:", rv
+            print "Regained lock on %s" % (str(func))
             return rv
         return locker
 
 lockit = MethodLock()
 
+class ExecutionThread(threading.Thread):
+    def __init__(self, repl):
+        self.repl = repl
+        self.queue = Queue.Queue()
+        threading.Thread.__init__(self)
+        self.daemon = True
+
+    def run(self):
+        while 1:
+            print "Checking for a queue ..."
+            try:
+                task = self.queue.get(True, 10)
+            except (Queue.Full, Queue.Empty):
+                if self.repl.stopped: return
+                continue
+            print "Received the task", task
+            if task['type'] == 'code':
+                self.execute_one(task['code'], task['hide'])
+                self.queue.task_done()
+            elif task['type'] == 'add_widget':
+                print "Adding new widget"
+                self.queue.task_done()
+                new_code = self.repl._add_widget(
+                    task['name'], task['widget_data_name'])
+                print "Got this command:", new_code
+                self.repl.execute(new_code, hide=True)
+                print "Executed!"
+
+    def execute_one(self, code, hide):
+        self.repl.executed_cell_texts.append(code)
+
+        result = ProgrammaticREPL.execute(self.repl, code)
+        if self.repl.debug:
+            print "==================== Cell Execution ===================="
+            print code
+            print "====================                ===================="
+            print result
+            print "========================================================"
+        if hide: return
+        self.repl.payload_handler.add_payload(
+            {'type': 'cell_results',
+             'output': result,
+             'input': highlighter(code)})
+
 def deliver_image(im):
     if hasattr(im, 'read'):
         img_data = base64.b64encode(im.read())
@@ -150,6 +196,7 @@
     server = None
     stopped = False
     debug = False
+    _heartbeat_timer = None
 
     def __init__(self, base_extjs_path, locals=None):
         # First we do the standard initialization
@@ -181,6 +228,7 @@
         self.pflist = ExtDirectParameterFileList()
         self.executed_cell_texts = []
         self.payload_handler = PayloadHandler()
+        self.execution_thread = ExecutionThread(self)
         # Now we load up all the yt.mods stuff, but only after we've finished
         # setting up.
         reason_pylab()
@@ -194,6 +242,8 @@
         # Setup our heartbeat
         self.last_heartbeat = time.time()
         self._check_heartbeat()
+        self.execution_thread.start()
+        if self.debug: time.sleep(3)
 
     def exception_handler(self, exc):
         result = {'type': 'cell_results',
@@ -207,6 +257,7 @@
         handler.setFormatter(formatter)
         ytLogger.addHandler(handler)
 
+
     def index(self):
         """Return an HTTP-based Read-Eval-Print-Loop terminal."""
         # For now this doesn't work!  We will need to move to a better method
@@ -216,13 +267,13 @@
 
     def heartbeat(self):
         self.last_heartbeat = time.time()
-        if self.debug: print "### Heartbeat ... started"
-        for i in range(30): # The total time to wait
+        if self.debug: print "### Heartbeat ... started: %s" % (time.ctime())
+        for i in range(30):
             # Check for stop
-            if self.stopped: return {'type':'shutdown'}# No race condition
+            if self.stopped: return {'type':'shutdown'} # No race condition
             if self.payload_handler.event.wait(1): # One second timeout
                 return self.payload_handler.deliver_payloads()
-        if self.debug: print "### Heartbeat ... finished"
+        if self.debug: print "### Heartbeat ... finished: %s" % (time.ctime())
         return []
 
     def _check_heartbeat(self):
@@ -237,6 +288,7 @@
             # server instance by default.
             self.shutdown()
             return
+        if self._heartbeat_timer is not None: return
         self._heartbeat_timer = threading.Timer(10, self._check_heartbeat)
         self._heartbeat_timer.start()
 
@@ -293,22 +345,11 @@
     def _highlighter_css(self):
         return highlighter_css
 
-    @lockit
     def execute(self, code, hide = False):
-        self.executed_cell_texts.append(code)
-
-        result = ProgrammaticREPL.execute(self, code)
-        if self.debug:
-            print "==================== Cell Execution ===================="
-            print code
-            print "====================                ===================="
-            print result
-            print "========================================================"
-        if hide: return
-        self.payload_handler.add_payload(
-            {'type': 'cell_results',
-             'output': result,
-             'input': highlighter(code)})
+            task = {'type': 'code',
+                    'code': code,
+                    'hide': hide}
+            self.execution_thread.queue.put(task)
 
     def get_history(self):
         return self.executed_cell_texts[:]
@@ -375,7 +416,9 @@
         return cs
 
     def _add_widget(self, widget_name, widget_data_name = None):
-        # This should be sanitized
+        # We need to make sure that we aren't running in advance of a new
+        # object being added.
+        self.execution_thread.queue.join()
         widget = self.locals[widget_name]
         uu = str(uuid.uuid1()).replace("-","_")
         varname = "%s_%s" % (widget._widget_name, uu)
@@ -413,7 +456,9 @@
                    field_z = field_z, weight = weight)
         funccall = "\n".join(line.strip() for line in funccall.splitlines())
         self.execute(funccall, hide=True)
-        self.execute(self._add_widget('_tpp', '_twidget_data'), hide=True)
+        self.execution_thread.queue.put({'type': 'add_widget',
+                                         'name': '_tpp',
+                                         'widget_data_name': '_twidget_data'})
 
     @lockit
     def create_proj(self, pfname, axis, field, weight, onmax):
@@ -450,7 +495,9 @@
         # There is a call to do this, but I have forgotten it ...
         funccall = "\n".join((line.strip() for line in funccall.splitlines()))
         self.execute(funccall, hide = True)
-        self.execute(self._add_widget('_tpw', '_twidget_data'), hide = True)
+        self.execution_thread.queue.put({'type': 'add_widget',
+                                         'name': '_tpw',
+                                         'widget_data_name': '_twidget_data'})
 
     @lockit
     def create_slice(self, pfname, center, axis, field, onmax):
@@ -488,7 +535,9 @@
         # There is a call to do this, but I have forgotten it ...
         funccall = "\n".join((line.strip() for line in funccall.splitlines()))
         self.execute(funccall, hide = True)
-        self.execute(self._add_widget('_tpw', '_twidget_data'), hide = True)
+        self.execution_thread.queue.put({'type': 'add_widget',
+                                         'name': '_tpw',
+                                         'widget_data_name': '_twidget_data'})
 
     @lockit
     def create_grid_dataview(self, pfname):

Repository URL: https://bitbucket.org/yt_analysis/yt/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.



More information about the yt-svn mailing list