[Yt-svn] commit/yt: MatthewTurk: Merging; didn't mean to push those other changes, but I think they are mostly/completely stable, so it should be okay.
Bitbucket
commits-noreply at bitbucket.org
Tue Apr 5 21:28:05 PDT 2011
1 new changeset in yt:
http://bitbucket.org/yt_analysis/yt/changeset/f3faeb0d2b79/
changeset: r4091:f3faeb0d2b79
branch: yt
user: MatthewTurk
date: 2011-04-06 06:27:52
summary: Merging; didn't mean to push those other changes, but I think they are mostly/completely stable, so it should be okay.
affected #: 7 files (65.3 KB)
--- a/yt/gui/reason/bottle_mods.py Wed Apr 06 00:26:48 2011 -0400
+++ b/yt/gui/reason/bottle_mods.py Wed Apr 06 00:27:52 2011 -0400
@@ -24,11 +24,12 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-from .bottle import server_names, debug, route, run, request
+from .bottle import server_names, debug, route, run, request, \
+ ServerAdapter
import uuid
from extdirect_router import DirectRouter, DirectProviderDefinition
import json
-import logging
+import logging, threading
from yt.utilities.logger import ytLogger as mylog
from yt.funcs import *
@@ -48,6 +49,8 @@
class PayloadHandler(object):
_shared_state = {}
_hold = False
+ payloads = None
+ lock = None
def __new__(cls, *p, **k):
self = object.__new__(cls, *p, **k)
@@ -55,32 +58,26 @@
return self
def __init__(self):
- self.payloads = []
+ if self.payloads is None: self.payloads = []
+ if self.lock is None: self.lock = threading.Lock()
def deliver_payloads(self):
- if self._hold: return []
- payloads = self.payloads
- self.payloads = []
+ with self.lock:
+ if self._hold: return []
+ payloads = self.payloads
+ self.payloads = []
return payloads
def add_payload(self, to_add):
self.payloads.append(to_add)
-_ph = PayloadHandler()
-
-def append_payloads(func):
- @wraps(func)
- def wrapper(self, *args, **kwargs):
- reset = not _ph._hold
- _ph._hold = True
- func(self, *args, **kwargs)
- # Assume it returns a dict
- if not reset: return
- # In case it sets it manually
- _ph._hold = False
- payloads = _ph.deliver_payloads()
- return payloads
- return wrapper
+class YTRocketServer(ServerAdapter):
+ server_info = {} # Hack to get back at instance vars
+ def run(self, handler):
+ from rocket import Rocket
+ server = Rocket((self.host, self.port), 'wsgi', { 'wsgi_app' : handler })
+ self.server_info[id(self)] = server
+ server.start()
class BottleDirectRouter(DirectRouter):
# This class implements a mechanism for auto-routing an ExtDirect-callable
@@ -104,13 +101,14 @@
def __call__(self):
#print "Hi there, I just got this request:",
val = request.body.read()
- print val
+ #print val
#import pdb;pdb.set_trace()
rv = super(BottleDirectRouter, self).__call__(val)
#print "With this response:", rv
return rv
-def uuid_serve_functions(pre_routed = None, open_browser=False, port=9099):
+def uuid_serve_functions(pre_routed = None, open_browser=False, port=9099,
+ repl = None):
if pre_routed == None: pre_routed = route_functions
debug(mode=True)
token = uuid.uuid1()
@@ -167,15 +165,15 @@
local_browse()
try:
import rocket
- server_name = "rocket"
+ server_type = YTRocketServer
log = logging.getLogger('Rocket')
log.setLevel(logging.INFO)
- kwargs = {'timeout': 600, 'max_threads': 1}
+ kwargs = {'timeout': 600, 'max_threads': 2}
+ if repl is not None:
+ repl.server = YTRocketServer.server_info
except ImportError:
- server_name = "wsgiref"
+ server_type = server_names.get("wsgiref")
kwargs = {}
- server_type = server_names.get(server_name)
server = server_type(host='localhost', port=port, **kwargs)
- #repl.locals['server'] = server
mylog.info("Starting up the server.")
run(server=server)
--- a/yt/gui/reason/extdirect_repl.py Wed Apr 06 00:26:48 2011 -0400
+++ b/yt/gui/reason/extdirect_repl.py Wed Apr 06 00:27:52 2011 -0400
@@ -30,15 +30,17 @@
import logging
import uuid
import numpy as na
+import time
from yt.funcs import *
from yt.utilities.logger import ytLogger, ufstring
from yt.utilities.definitions import inv_axis_names
from .bottle_mods import preroute, BottleDirectRouter, notify_route, \
- PayloadHandler, append_payloads
+ PayloadHandler
from .bottle import response, request, route
from .basic_repl import ProgrammaticREPL
+import threading
try:
import pygments
@@ -59,9 +61,36 @@
local_dir = os.path.dirname(__file__)
+class MethodLock(object):
+ _shared_state = {}
+ locks = None
+
+ def __new__(cls, *p, **k):
+ self = object.__new__(cls, *p, **k)
+ self.__dict__ = cls._shared_state
+ return self
+
+ def __init__(self):
+ if self.locks is None: self.locks = {}
+
+ def __call__(self, func):
+ if str(func) not in self.locks:
+ self.locks[str(func)] = threading.Lock()
+ @wraps(func)
+ def locker(*args, **kwargs):
+ with self.locks[str(func)]:
+ rv = func(*args, **kwargs)
+ print "Regained lock:", rv
+ return rv
+ return locker
+
+lockit = MethodLock()
+
class ExtDirectREPL(ProgrammaticREPL, BottleDirectRouter):
_skip_expose = ('index')
my_name = "ExtDirectREPL"
+ timeout = 660 # a minute longer than the rocket server timeout
+ server = None
def __init__(self, base_extjs_path, locals=None):
# First we do the standard initialization
@@ -97,10 +126,12 @@
self.execute("from yt.data_objects.static_output import _cached_pfs")
self.locals['load_script'] = ext_load_script
self.locals['_widgets'] = {}
- self.locals['add_widget'] = self._add_widget
- self.locals['test_widget'] = self._test_widget
self._setup_logging_handlers()
+ # Setup our heartbeat
+ self.last_heartbeat = time.time()
+ self._check_heartbeat()
+
def _setup_logging_handlers(self):
handler = PayloadLoggingHandler()
formatter = logging.Formatter(ufstring)
@@ -114,8 +145,28 @@
vals = open(os.path.join(local_dir, "html/index.html")).read()
return vals
- def heartbeep(self):
- return {'alive': True}
+ def heartbeat(self):
+ self.last_heartbeat = time.time()
+ return self.payload_handler.deliver_payloads()
+
+ def _check_heartbeat(self):
+ if time.time() - self.last_heartbeat > self.timeout:
+ print "Shutting down after a timeout of %s" % (self.timeout)
+ #sys.exit(0)
+ # Still can't shut down yet, because bottle doesn't return the
+ # server instance by default.
+ self.shutdown()
+ return
+ print "Not shutting down from timeout."
+ self._heartbeat_timer = threading.Timer(self.timeout - 60,
+ self._check_heartbeat)
+ self._heartbeat_timer.start()
+
+ def shutdown(self):
+ if self.server is None:
+ print "Can't shutdown!"
+ self._heartbeat_timer.cancel()
+ for v in self.server.values(): v.stop()
def _help_html(self):
vals = open(os.path.join(local_dir, "html/help.html")).read()
@@ -152,9 +203,10 @@
def _highlighter_css(self):
return highlighter_css
- @append_payloads
+ @lockit
def execute(self, code):
self.executed_cell_texts.append(code)
+
result = ProgrammaticREPL.execute(self, code)
self.payload_handler.add_payload(
{'type': 'cell_results',
@@ -164,6 +216,7 @@
def get_history(self):
return self.executed_cell_texts[:]
+ @lockit
def save_session(self, filename):
if filename.startswith('~'):
filename = os.path.expanduser(filename)
@@ -184,6 +237,7 @@
'error': 'Unexpected error.'}
return {'status': 'SUCCESS', 'filename': filename}
+ @lockit
def paste_session(self):
import xmlrpclib, cStringIO
p = xmlrpclib.ServerProxy(
@@ -196,6 +250,7 @@
site = "http://paste.enzotools.org/show/%s" % ret
return {'status': 'SUCCESS', 'site': site}
+ @lockit
def _session_py(self):
cs = cStringIO.StringIO()
cs.write("\n######\n".join(self.executed_cell_texts))
@@ -203,7 +258,6 @@
response.headers["content-disposition"] = "attachment;"
return cs
- @append_payloads
def _add_widget(self, widget_name):
# This should be sanitized
widget = self.locals[widget_name]
@@ -216,9 +270,9 @@
'widget_type': widget._widget_name,
'varname': varname}
self.payload_handler.add_payload(payload)
- self.execute("%s = %s\n" % (varname, widget_name))
+ return "%s = %s\n" % (varname, widget_name)
- @append_payloads
+ @lockit
def create_proj(self, pfname, axis, field, weight):
if weight == "None": weight = None
else: weight = "'%s'" % (weight)
@@ -234,7 +288,6 @@
_tpw = PWViewerExtJS(_tsl, (DLE[_txax], DRE[_txax], DLE[_tyax], DRE[_tyax]), setup = False)
_tpw._current_field = _tfield
_tpw.set_log(_tfield, True)
- add_widget('_tpw')
""" % dict(pfname = pfname,
axis = inv_axis_names[axis],
weight = weight,
@@ -242,8 +295,9 @@
# There is a call to do this, but I have forgotten it ...
funccall = "\n".join((line.strip() for line in funccall.splitlines()))
self.execute(funccall)
+ self.execute(self._add_widget('_tpw'))
- @append_payloads
+ @lockit
def create_slice(self, pfname, center, axis, field):
funccall = """
_tpf = %(pfname)s
@@ -258,7 +312,6 @@
_tpw = PWViewerExtJS(_tsl, (DLE[_txax], DRE[_txax], DLE[_tyax], DRE[_tyax]), setup = False)
_tpw._current_field = _tfield
_tpw.set_log(_tfield, True)
- add_widget('_tpw')
""" % dict(pfname = pfname,
c1 = float(center[0]),
c2 = float(center[1]),
@@ -268,6 +321,7 @@
# There is a call to do this, but I have forgotten it ...
funccall = "\n".join((line.strip() for line in funccall.splitlines()))
self.execute(funccall)
+ self.execute(self._add_widget('_tpw'))
def _test_widget(self):
class tt(object):
--- a/yt/gui/reason/html/js/functions.js Wed Apr 06 00:26:48 2011 -0400
+++ b/yt/gui/reason/html/js/functions.js Wed Apr 06 00:27:52 2011 -0400
@@ -31,12 +31,14 @@
function cell_finished(result) {
var new_log = false;
+ var cell_resulted = false;
+ var cell;
Ext.each(result,
function(payload, index) {
if (payload['type'] == 'cell_results') {
text = "<pre>"+payload['output']+"</pre>";
formatted_input = payload['input']
- var cell = new_cell(formatted_input, text);
+ cell = new_cell(formatted_input, text);
OutputContainer.add(cell);
OutputContainer.doLayout();
notebook.doLayout();
@@ -47,6 +49,7 @@
OutputContainer.body.dom.scrollHeight -
cell.body.dom.scrollHeight - 20;
}
+ cell_resulted = true;
} else if (payload['type'] == 'png_string') {
OutputContainer.add(new Ext.Panel({
autoEl:{
@@ -76,13 +79,15 @@
widget.accept_results(payload);
}
});
- yt_rpc.ExtDirectParameterFileList.get_list_of_pfs({}, fill_tree);
if (new_log == true){
viewport.get("status-region").getView().focusRow(number_log_records-1);
}
- repl_input.body.removeClass("cell_waiting");
- repl_input.get('input_line').setReadOnly(false);
- repl_input.get("input_line").focus();
+ if (cell_resulted == true) {
+ repl_input.body.removeClass("cell_waiting");
+ repl_input.get('input_line').setReadOnly(false);
+ repl_input.get("input_line").focus();
+ yt_rpc.ExtDirectParameterFileList.get_list_of_pfs({}, fill_tree);
+ }
}
function cell_sent() {
@@ -234,7 +239,6 @@
{code: fcall}, cell_finished);
}
-
function getProjectionHandler(node){
function projectionHandler(item,pressed){
var win = new Ext.Window({
--- a/yt/gui/reason/html/js/reason.js Wed Apr 06 00:26:48 2011 -0400
+++ b/yt/gui/reason/html/js/reason.js Wed Apr 06 00:27:52 2011 -0400
@@ -42,16 +42,6 @@
var cell_count = 0;
var handle_result = function(f, a) {
- var input_line = repl_input.get("input_line")
- if (a.result == null) {
- formatted_input = input_line.getValue();
- text = "ERROR";
- var cell = new_cell(formatted_input, text);
- OutputContainer.add(cell);
- notebook.doLayout();
- input_line.setValue("");
- return;
- }
cell_finished(a.result);
}
@@ -290,7 +280,7 @@
reader: new Ext.data.ArrayReader({}, [{name: 'record'}]),
});
- var heartbeep_request = false;
+ var heartbeat_request = false;
var task_runner = new Ext.util.TaskRunner();
@@ -402,17 +392,19 @@
} else {
repl_input.get("input_line").focus(); }
- /* Set up the heartbeep */
- var heartbeep = {
+ /* Set up the heartbeat */
+ var num = 0;
+ var heartbeat = {
run:
- function(){ if (heartbeep_request == true) return;
- yt_rpc.ExtDirectREPL.heartbeep(
+ function(){ if (heartbeat_request == true) return;
+ yt_rpc.ExtDirectREPL.heartbeat(
{}, function(f, a) {
- heartbeep_request = false;
- if (f['alive'] == true) {
+ heartbeat_request = false;
+ if (f != null) {
+ handle_result(f, a);
}})},
- interval: 5000};
+ interval: 1000};
- //task_runner.start(heartbeep);
+ task_runner.start(heartbeat);
});
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/yt/gui/reason/rocket.py Wed Apr 06 00:27:52 2011 -0400
@@ -0,0 +1,1992 @@
+# -*- coding: utf-8 -*-
+
+# This file is part of the Rocket Web Server
+# Copyright (c) 2011 Timothy Farrell
+
+# Import System Modules
+import sys
+import errno
+import socket
+import logging
+import platform
+
+# Define Constants
+VERSION = '1.2.3'
+SERVER_NAME = socket.gethostname()
+SERVER_SOFTWARE = 'Rocket %s' % VERSION
+HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0])
+BUF_SIZE = 16384
+SOCKET_TIMEOUT = 1 # in secs
+THREAD_STOP_CHECK_INTERVAL = 1 # in secs, How often should threads check for a server stop message?
+IS_JYTHON = platform.system() == 'Java' # Handle special cases for Jython
+IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET])
+DEFAULT_LISTEN_QUEUE_SIZE = 5
+DEFAULT_MIN_THREADS = 10
+DEFAULT_MAX_THREADS = 0
+DEFAULTS = dict(LISTEN_QUEUE_SIZE = DEFAULT_LISTEN_QUEUE_SIZE,
+ MIN_THREADS = DEFAULT_MIN_THREADS,
+ MAX_THREADS = DEFAULT_MAX_THREADS)
+
+PY3K = sys.version_info[0] > 2
+
+class NullHandler(logging.Handler):
+ "A Logging handler to prevent library errors."
+ def emit(self, record):
+ pass
+
+if PY3K:
+ def b(val):
+ """ Convert string/unicode/bytes literals into bytes. This allows for
+ the same code to run on Python 2.x and 3.x. """
+ if isinstance(val, str):
+ return val.encode()
+ else:
+ return val
+
+ def u(val, encoding="us-ascii"):
+ """ Convert bytes into string/unicode. This allows for the
+ same code to run on Python 2.x and 3.x. """
+ if isinstance(val, bytes):
+ return val.decode(encoding)
+ else:
+ return val
+
+else:
+ def b(val):
+ """ Convert string/unicode/bytes literals into bytes. This allows for
+ the same code to run on Python 2.x and 3.x. """
+ if isinstance(val, unicode):
+ return val.encode()
+ else:
+ return val
+
+ def u(val, encoding="us-ascii"):
+ """ Convert bytes into string/unicode. This allows for the
+ same code to run on Python 2.x and 3.x. """
+ if isinstance(val, str):
+ return val.decode(encoding)
+ else:
+ return val
+
+# Import Package Modules
+# package imports removed in monolithic build
+
+__all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
+ 'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
+ 'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
+
+# Monolithic build...end of module: rocket/__init__.py
+# Monolithic build...start of module: rocket/connection.py
+
+# Import System Modules
+import sys
+import time
+import socket
+try:
+ import ssl
+ has_ssl = True
+except ImportError:
+ has_ssl = False
+# Import Package Modules
+# package imports removed in monolithic build
+# TODO - This part is still very experimental.
+#from .filelike import FileLikeSocket
+
+class Connection(object):
+ __slots__ = [
+ 'setblocking',
+ 'sendall',
+ 'shutdown',
+ 'makefile',
+ 'fileno',
+ 'client_addr',
+ 'client_port',
+ 'server_port',
+ 'socket',
+ 'start_time',
+ 'ssl',
+ 'secure',
+ 'recv',
+ 'send',
+ 'read',
+ 'write'
+ ]
+
+ def __init__(self, sock_tuple, port, secure=False):
+ self.client_addr, self.client_port = sock_tuple[1]
+ self.server_port = port
+ self.socket = sock_tuple[0]
+ self.start_time = time.time()
+ self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket)
+ self.secure = secure
+
+ if IS_JYTHON:
+ # In Jython we must set TCP_NODELAY here since it does not
+ # inherit from the listening socket.
+ # See: http://bugs.jython.org/issue1309
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ self.socket.settimeout(SOCKET_TIMEOUT)
+
+ self.sendall = self.socket.sendall
+ self.shutdown = self.socket.shutdown
+ self.fileno = self.socket.fileno
+ self.setblocking = self.socket.setblocking
+ self.recv = self.socket.recv
+ self.send = self.socket.send
+ self.makefile = self.socket.makefile
+
+# FIXME - this is not ready for prime-time yet.
+# def makefile(self, buf_size=BUF_SIZE):
+# return FileLikeSocket(self, buf_size)
+
+ def close(self):
+ if hasattr(self.socket, '_sock'):
+ try:
+ self.socket._sock.close()
+ except socket.error:
+ info = sys.exc_info()
+ if info[1].args[0] != socket.EBADF:
+ raise info[1]
+ else:
+ pass
+ self.socket.close()
+
+
+# Monolithic build...end of module: rocket/connection.py
+# Monolithic build...start of module: rocket/filelike.py
+
+# Import System Modules
+import socket
+try:
+ from io import StringIO
+except ImportError:
+ try:
+ from cStringIO import StringIO
+ except ImportError:
+ from StringIO import StringIO
+# Import Package Modules
+# package imports removed in monolithic build
+
+class FileLikeSocket(object):
+ def __init__(self, conn, buf_size=BUF_SIZE):
+ self.conn = conn
+ self.buf_size = buf_size
+ self.buffer = StringIO()
+ self.content_length = None
+
+ if self.conn.socket.gettimeout() == 0.0:
+ self.read = self.non_blocking_read
+ else:
+ self.read = self.blocking_read
+
+ def __iter__(self):
+ return self
+
+ def recv(self, size):
+ while True:
+ try:
+ return self.conn.recv(size)
+ except socket.error:
+ exc = sys.exc_info()
+ e = exc[1]
+ # FIXME - Don't raise socket_errors_nonblocking or socket_error_eintr
+ if (e.args[0] not in set()):
+ raise
+
+ def next(self):
+ data = self.readline()
+ if data == '':
+ raise StopIteration
+ return data
+
+ def non_blocking_read(self, size=None):
+ # Shamelessly adapted from Cherrypy!
+ bufr = self.buffer
+ bufr.seek(0, 2)
+ if size is None:
+ while True:
+ data = self.recv(self.buf_size)
+ if not data:
+ break
+ bufr.write(data)
+
+ self.buffer = StringIO()
+
+ return bufr.getvalue()
+ else:
+ buf_len = self.buffer.tell()
+ if buf_len >= size:
+ bufr.seek(0)
+ data = bufr.read(size)
+ self.buffer = StringIO(bufr.read())
+ return data
+
+ self.buffer = StringIO()
+ while True:
+ remaining = size - buf_len
+ data = self.recv(remaining)
+
+ if not data:
+ break
+
+ n = len(data)
+ if n == size and not buf_len:
+ return data
+
+ if n == remaining:
+ bufr.write(data)
+ del data
+ break
+
+ bufr.write(data)
+ buf_len += n
+ del data
+
+ return bufr.getvalue()
+
+ def blocking_read(self, length=None):
+ if length is None:
+ if self.content_length is not None:
+ length = self.content_length
+ else:
+ length = 1
+
+ try:
+ data = self.conn.recv(length)
+ except:
+ data = b('')
+
+ return data
+
+ def readline(self):
+ data = b("")
+ char = self.read(1)
+ while char != b('\n') and char is not b(''):
+ line = repr(char)
+ data += char
+ char = self.read(1)
+ data += char
+ return data
+
+ def readlines(self, hint="ignored"):
+ return list(self)
+
+ def close(self):
+ self.conn = None
+ self.content_length = None
+
+# Monolithic build...end of module: rocket/filelike.py
+# Monolithic build...start of module: rocket/futures.py
+
+# Import System Modules
+import time
+try:
+ from concurrent.futures import Future, ThreadPoolExecutor
+ from concurrent.futures.thread import _WorkItem
+ has_futures = True
+except ImportError:
+ has_futures = False
+
+ class Future:
+ pass
+
+ class ThreadPoolExecutor:
+ pass
+
+ class _WorkItem:
+ pass
+
+
+class WSGIFuture(Future):
+ def __init__(self, f_dict, *args, **kwargs):
+ Future.__init__(self, *args, **kwargs)
+
+ self.timeout = None
+
+ self._mem_dict = f_dict
+ self._lifespan = 30
+ self._name = None
+ self._start_time = time.time()
+
+ def set_running_or_notify_cancel(self):
+ if time.time() - self._start_time >= self._lifespan:
+ self.cancel()
+ else:
+ return super(WSGIFuture, self).set_running_or_notify_cancel()
+
+
+ def remember(self, name, lifespan=None):
+ self._lifespan = lifespan or self._lifespan
+
+ if name in self._mem_dict:
+ raise NameError('Cannot remember future by name "%s". ' % name + \
+ 'A future already exists with that name.' )
+ self._name = name
+ self._mem_dict[name] = self
+
+ return self
+
+ def forget(self):
+ if self._name in self._mem_dict and self._mem_dict[self._name] is self:
+ del self._mem_dict[self._name]
+ self._name = None
+
+class _WorkItem(object):
+ def __init__(self, future, fn, args, kwargs):
+ self.future = future
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+
+ def run(self):
+ if not self.future.set_running_or_notify_cancel():
+ return
+
+ try:
+ result = self.fn(*self.args, **self.kwargs)
+ except BaseException:
+ e = sys.exc_info()[1]
+ self.future.set_exception(e)
+ else:
+ self.future.set_result(result)
+
+class WSGIExecutor(ThreadPoolExecutor):
+ multithread = True
+ multiprocess = False
+
+ def __init__(self, *args, **kwargs):
+ ThreadPoolExecutor.__init__(self, *args, **kwargs)
+
+ self.futures = dict()
+
+ def submit(self, fn, *args, **kwargs):
+ if self._shutdown_lock.acquire():
+ if self._shutdown:
+ self._shutdown_lock.release()
+ raise RuntimeError('Cannot schedule new futures after shutdown')
+
+ f = WSGIFuture(self.futures)
+ w = _WorkItem(f, fn, args, kwargs)
+
+ self._work_queue.put(w)
+ self._adjust_thread_count()
+ self._shutdown_lock.release()
+ return f
+ else:
+ return False
+
+class FuturesMiddleware(object):
+ "Futures middleware that adds a Futures Executor to the environment"
+ def __init__(self, app, threads=5):
+ self.app = app
+ self.executor = WSGIExecutor(threads)
+
+ def __call__(self, environ, start_response):
+ environ["wsgiorg.executor"] = self.executor
+ environ["wsgiorg.futures"] = self.executor.futures
+ return self.app(environ, start_response)
+
+# Monolithic build...end of module: rocket/futures.py
+# Monolithic build...start of module: rocket/listener.py
+
+# Import System Modules
+import os
+import socket
+import logging
+import traceback
+from threading import Thread
+
+try:
+ import ssl
+ from ssl import SSLError
+ has_ssl = True
+except ImportError:
+ has_ssl = False
+ class SSLError(socket.error):
+ pass
+# Import Package Modules
+# package imports removed in monolithic build
+
+class Listener(Thread):
+ """The Listener class is a class responsible for accepting connections
+ and queuing them to be processed by a worker thread."""
+
+ def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
+ Thread.__init__(self, *args, **kwargs)
+
+ # Instance variables
+ self.active_queue = active_queue
+ self.interface = interface
+ self.addr = interface[0]
+ self.port = interface[1]
+ self.secure = len(interface) == 4 and \
+ os.path.exists(interface[2]) and \
+ os.path.exists(interface[3])
+ self.thread = None
+ self.ready = False
+
+ # Error Log
+ self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
+ self.err_log.addHandler(NullHandler())
+
+ # Build the socket
+ listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ if not listener:
+ self.err_log.error("Failed to get socket.")
+ return
+
+ if self.secure:
+ if not has_ssl:
+ self.err_log.error("ssl module required to serve HTTPS.")
+ return
+ elif not os.path.exists(interface[2]):
+ data = (interface[2], interface[0], interface[1])
+ self.err_log.error("Cannot find key file "
+ "'%s'. Cannot bind to %s:%s" % data)
+ return
+ elif not os.path.exists(interface[3]):
+ data = (interface[3], interface[0], interface[1])
+ self.err_log.error("Cannot find certificate file "
+ "'%s'. Cannot bind to %s:%s" % data)
+ return
+
+ # Set socket options
+ try:
+ listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except:
+ msg = "Cannot share socket. Using %s:%i exclusively."
+ self.err_log.warning(msg % (self.addr, self.port))
+
+ try:
+ if not IS_JYTHON:
+ listener.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_NODELAY,
+ 1)
+ except:
+ msg = "Cannot set TCP_NODELAY, things might run a little slower"
+ self.err_log.warning(msg)
+
+ try:
+ listener.bind((self.addr, self.port))
+ except:
+ msg = "Socket %s:%i in use by other process and it won't share."
+ self.err_log.error(msg % (self.addr, self.port))
+ else:
+ # We want socket operations to timeout periodically so we can
+ # check if the server is shutting down
+ listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
+ # Listen for new connections allowing queue_size number of
+ # connections to wait before rejecting a connection.
+ listener.listen(queue_size)
+
+ self.listener = listener
+
+ self.ready = True
+
+ def wrap_socket(self, sock):
+ try:
+ sock = ssl.wrap_socket(sock,
+ keyfile = self.interface[2],
+ certfile = self.interface[3],
+ server_side = True,
+ ssl_version = ssl.PROTOCOL_SSLv23)
+ except SSLError:
+ # Generally this happens when an HTTP request is received on a
+ # secure socket. We don't do anything because it will be detected
+ # by Worker and dealt with appropriately.
+ pass
+
+ return sock
+
+ def start(self):
+ if not self.ready:
+ self.err_log.warning('Listener started when not ready.')
+ return
+
+ if self.thread is not None and self.thread.isAlive():
+ self.err_log.warning('Listener already running.')
+ return
+
+ self.thread = Thread(target=self.listen, name="Port" + str(self.port))
+
+ self.thread.start()
+
+ def isAlive(self):
+ if self.thread is None:
+ return False
+
+ return self.thread.isAlive()
+
+ def join(self):
+ if self.thread is None:
+ return
+
+ self.ready = False
+
+ self.thread.join()
+
+ del self.thread
+ self.thread = None
+ self.ready = True
+
+ def listen(self):
+ if __debug__:
+ self.err_log.debug('Entering main loop.')
+ while True:
+ try:
+ sock, addr = self.listener.accept()
+
+ if self.secure:
+ sock = self.wrap_socket(sock)
+
+ self.active_queue.put(((sock, addr),
+ self.interface[1],
+ self.secure))
+
+ except socket.timeout:
+ # socket.timeout will be raised every THREAD_STOP_CHECK_INTERVAL
+ # seconds. When that happens, we check if it's time to die.
+
+ if not self.ready:
+ if __debug__:
+ self.err_log.debug('Listener exiting.')
+ return
+ else:
+ continue
+ except:
+ self.err_log.error(str(traceback.format_exc()))
+
+# Monolithic build...end of module: rocket/listener.py
+# Monolithic build...start of module: rocket/main.py
+
+# Import System Modules
+import sys
+import time
+import socket
+import logging
+import traceback
+from threading import Lock
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+# Import Package Modules
+# package imports removed in monolithic build
+
+
+
+
+
+# Setup Logging
+log = logging.getLogger('Rocket')
+log.addHandler(NullHandler())
+
+class Rocket(object):
+ """The Rocket class is responsible for handling threads and accepting and
+ dispatching connections."""
+
+ def __init__(self,
+ interfaces = ('127.0.0.1', 8000),
+ method = 'wsgi',
+ app_info = None,
+ min_threads = None,
+ max_threads = None,
+ queue_size = None,
+ timeout = 600,
+ handle_signals = True):
+
+ self.handle_signals = handle_signals
+ self.startstop_lock = Lock()
+ self.timeout = timeout
+
+ if not isinstance(interfaces, list):
+ self.interfaces = [interfaces]
+ else:
+ self.interfaces = interfaces
+
+ if min_threads is None:
+ min_threads = DEFAULTS['MIN_THREADS']
+
+ if max_threads is None:
+ max_threads = DEFAULTS['MAX_THREADS']
+
+ if not queue_size:
+ if hasattr(socket, 'SOMAXCONN'):
+ queue_size = socket.SOMAXCONN
+ else:
+ queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']
+
+ if max_threads and queue_size > max_threads:
+ queue_size = max_threads
+
+ if isinstance(app_info, dict):
+ app_info['server_software'] = SERVER_SOFTWARE
+
+ self.monitor_queue = Queue()
+ self.active_queue = Queue()
+
+ self._threadpool = ThreadPool(get_method(method),
+ app_info = app_info,
+ active_queue = self.active_queue,
+ monitor_queue = self.monitor_queue,
+ min_threads = min_threads,
+ max_threads = max_threads)
+
+ # Build our socket listeners
+ self.listeners = [Listener(i, queue_size, self.active_queue) for i in self.interfaces]
+ for ndx in range(len(self.listeners)-1, 0, -1):
+ if not self.listeners[ndx].ready:
+ del self.listeners[ndx]
+
+ if not self.listeners:
+ log.critical("No interfaces to listen on...closing.")
+ sys.exit(1)
+
+ def _sigterm(self, signum, frame):
+ log.info('Received SIGTERM')
+ self.stop()
+
+ def _sighup(self, signum, frame):
+ log.info('Received SIGHUP')
+ self.restart()
+
+ def start(self, background=False):
+ log.info('Starting %s' % SERVER_SOFTWARE)
+
+ self.startstop_lock.acquire()
+
+ try:
+ # Set up our shutdown signals
+ if self.handle_signals:
+ try:
+ import signal
+ signal.signal(signal.SIGTERM, self._sigterm)
+ signal.signal(signal.SIGUSR1, self._sighup)
+ except:
+ log.debug('This platform does not support signals.')
+
+ # Start our worker threads
+ self._threadpool.start()
+
+ # Start our monitor thread
+ self._monitor = Monitor(self.monitor_queue,
+ self.active_queue,
+ self.timeout,
+ self._threadpool)
+ self._monitor.setDaemon(True)
+ self._monitor.start()
+
+ # I know that EXPR and A or B is bad but I'm keeping it for Py2.4
+ # compatibility.
+ str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')
+
+ msg = 'Listening on sockets: '
+ msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners])
+ log.info(msg)
+
+ for l in self.listeners:
+ l.start()
+
+ finally:
+ self.startstop_lock.release()
+
+ if background:
+ return
+
+ while self._monitor.isAlive():
+ try:
+ time.sleep(THREAD_STOP_CHECK_INTERVAL)
+ except KeyboardInterrupt:
+ # Capture a keyboard interrupt when running from a console
+ break
+ except:
+ if self._monitor.isAlive():
+ log.error(str(traceback.format_exc()))
+ continue
+
+ return self.stop()
+
+ def stop(self, stoplogging = False):
+ log.info('Stopping %s' % SERVER_SOFTWARE)
+
+ self.startstop_lock.acquire()
+
+ try:
+ # Stop listeners
+ for l in self.listeners:
+ l.ready = False
+
+ # Encourage a context switch
+ time.sleep(0.01)
+
+ for l in self.listeners:
+ if l.isAlive():
+ l.join()
+
+ # Stop Monitor
+ self._monitor.stop()
+ if self._monitor.isAlive():
+ self._monitor.join()
+
+ # Stop Worker threads
+ self._threadpool.stop()
+
+ if stoplogging:
+ logging.shutdown()
+ msg = "Calling logging.shutdown() is now the responsibility of \
+ the application developer. Please update your \
+ applications to no longer call rocket.stop(True)"
+ try:
+ import warnings
+ raise warnings.DeprecationWarning(msg)
+ except ImportError:
+ raise RuntimeError(msg)
+
+ finally:
+ self.startstop_lock.release()
+
+ def restart(self):
+ self.stop()
+ self.start()
+
+def CherryPyWSGIServer(bind_addr,
+ wsgi_app,
+ numthreads = 10,
+ server_name = None,
+ max = -1,
+ request_queue_size = 5,
+ timeout = 10,
+ shutdown_timeout = 5):
+ """ A Cherrypy wsgiserver-compatible wrapper. """
+ max_threads = max
+ if max_threads < 0:
+ max_threads = 0
+ return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app},
+ min_threads = numthreads,
+ max_threads = max_threads,
+ queue_size = request_queue_size,
+ timeout = timeout)
+
+# Monolithic build...end of module: rocket/main.py
+# Monolithic build...start of module: rocket/monitor.py
+
+# Import System Modules
+import time
+import logging
+import select
+from threading import Thread
+
+# Import Package Modules
+# package imports removed in monolithic build
+
class Monitor(Thread):
    """Background thread that watches idle keep-alive connections.

    Workers push timed-out connections onto ``monitor_queue``; this thread
    select()s on them, hands any that become readable back to
    ``active_queue`` for a Worker, and closes connections idle longer than
    ``timeout`` seconds.  A ``None`` on the queue is the shutdown sentinel.
    """

    def __init__(self,
                 monitor_queue,
                 active_queue,
                 timeout,
                 threadpool,
                 *args,
                 **kwargs):

        Thread.__init__(self, *args, **kwargs)

        self._threadpool = threadpool

        # Instance Variables
        self.monitor_queue = monitor_queue
        self.active_queue = active_queue
        self.timeout = timeout

        self.log = logging.getLogger('Rocket.Monitor')
        self.log.addHandler(NullHandler())

        # Connections currently being select()ed on.
        self.connections = set()
        self.active = False

    def run(self):
        """Main loop: shuttle connections between the queues until stopped."""
        self.active = True
        conn_list = list()
        list_changed = False

        # We need to make sure the queue is empty before we start
        while not self.monitor_queue.empty():
            self.monitor_queue.get()

        if __debug__:
            self.log.debug('Entering monitor loop.')

        # Enter thread main loop
        while self.active:

            # Move the queued connections to the selection pool
            while not self.monitor_queue.empty():
                if __debug__:
                    self.log.debug('In "receive timed-out connections" loop.')

                c = self.monitor_queue.get()

                if c is None:
                    # A non-client is a signal to die
                    if __debug__:
                        self.log.debug('Received a death threat.')
                    self.stop()
                    break

                self.log.debug('Received a timed out connection.')

                if __debug__:
                    assert(c not in self.connections)

                if IS_JYTHON:
                    # Jython requires a socket to be in Non-blocking mode in
                    # order to select on it.
                    c.setblocking(False)

                if __debug__:
                    self.log.debug('Adding connection to monitor list.')

                self.connections.add(c)
                list_changed = True

            # Wait on those connections
            if list_changed:
                conn_list = list(self.connections)
                list_changed = False

            try:
                if len(conn_list):
                    readable = select.select(conn_list,
                                             [],
                                             [],
                                             THREAD_STOP_CHECK_INTERVAL)[0]
                else:
                    # Nothing to watch; sleep briefly so stop() is noticed.
                    time.sleep(THREAD_STOP_CHECK_INTERVAL)
                    readable = []

                if not self.active:
                    break

                # If we have any readable connections, put them back
                for r in readable:
                    if __debug__:
                        self.log.debug('Restoring readable connection')

                    if IS_JYTHON:
                        # Jython requires a socket to be in Non-blocking mode in
                        # order to select on it, but the rest of the code requires
                        # that it be in blocking mode.
                        r.setblocking(True)

                    # Reset the idle clock before handing back to a Worker.
                    r.start_time = time.time()
                    self.active_queue.put(r)

                    self.connections.remove(r)
                    list_changed = True

            except:
                # select() can raise if a socket is closed under us; only
                # propagate when we are not already shutting down.
                if self.active:
                    raise
                else:
                    break

            # If we have any stale connections, kill them off.
            if self.timeout:
                now = time.time()
                stale = set()
                for c in self.connections:
                    if (now - c.start_time) >= self.timeout:
                        stale.add(c)

                for c in stale:
                    if __debug__:
                        # "EXPR and A or B" kept for Py2.4 compatibility
                        data = (c.client_addr, c.server_port, c.ssl and '*' or '')
                        self.log.debug('Flushing stale connection: %s:%i%s' % data)

                    self.connections.remove(c)
                    list_changed = True

                    try:
                        c.close()
                    finally:
                        del c

            # Dynamically resize the threadpool to adapt to our changing needs.
            self._threadpool.dynamic_resize()


    def stop(self):
        """Shut the monitor down, closing every watched and queued connection."""
        self.active = False

        if __debug__:
            self.log.debug('Flushing waiting connections')

        while self.connections:
            c = self.connections.pop()
            try:
                c.close()
            finally:
                del c

        if __debug__:
            self.log.debug('Flushing queued connections')

        while not self.monitor_queue.empty():
            c = self.monitor_queue.get()

            if c is None:
                continue

            try:
                c.close()
            finally:
                del c

        # Place a None sentry value to cause the monitor to die.
        self.monitor_queue.put(None)
+
+# Monolithic build...end of module: rocket/monitor.py
+# Monolithic build...start of module: rocket/threadpool.py
+
+# Import System Modules
+import logging
+# Import Package Modules
+# package imports removed in monolithic build
+
+
# Setup Logging.  NullHandler avoids "no handlers could be found" warnings
# when the embedding application has not configured logging.
log = logging.getLogger('Rocket.Errors.ThreadPool')
log.addHandler(NullHandler())
+
class ThreadPool:
    """The ThreadPool class is a container class for all the worker threads. It
    manages the number of actively running threads."""

    def __init__(self,
                 method,
                 app_info,
                 active_queue,
                 monitor_queue,
                 min_threads=DEFAULTS['MIN_THREADS'],
                 max_threads=DEFAULTS['MAX_THREADS'],
                 ):

        if __debug__:
            log.debug("Initializing ThreadPool.")

        # Count of threads asked to die (via shrink) but not yet reaped.
        self.check_for_dead_threads = 0
        self.active_queue = active_queue

        # ``method`` is the Worker subclass to instantiate for each thread.
        self.worker_class = method
        self.min_threads = min_threads
        self.max_threads = max_threads
        self.monitor_queue = monitor_queue
        self.stop_server = False
        self.alive = False

        # TODO - Optimize this based on some real-world usage data
        self.grow_threshold = int(max_threads/10) + 2

        if not isinstance(app_info, dict):
            app_info = dict()

        if has_futures and app_info.get('futures'):
            app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'],
                                                     2]))

        app_info.update(max_threads=max_threads,
                        min_threads=min_threads)

        self.min_threads = min_threads
        self.app_info = app_info

        self.threads = set()

    def start(self):
        """Spin up ``min_threads`` workers and mark the pool alive."""
        self.stop_server = False
        if __debug__:
            log.debug("Starting threads.")

        self.grow(self.min_threads)

        self.alive = True

    def stop(self):
        """Ask every worker to die, wait for them, and reap the corpses."""
        self.alive = False

        if __debug__:
            log.debug("Stopping threads.")

        self.stop_server = True

        # Prompt the threads to die
        self.shrink(len(self.threads))

        # Stop futures initially
        if has_futures and self.app_info.get('futures'):
            if __debug__:
                log.debug("Future executor is present.  Python will not "
                          "exit until all jobs have finished.")
            self.app_info['executor'].shutdown(wait=False)

        # Give them the gun
        #active_threads = [t for t in self.threads if t.isAlive()]
        #while active_threads:
        #    t = active_threads.pop()
        #    t.kill()

        # Wait until they pull the trigger
        for t in self.threads:
            if t.isAlive():
                t.join()

        # Clean up the mess
        self.bring_out_your_dead()

    def bring_out_your_dead(self):
        # Remove dead threads from the pool

        dead_threads = [t for t in self.threads if not t.isAlive()]
        for t in dead_threads:
            if __debug__:
                log.debug("Removing dead thread: %s." % t.getName())
            try:
                # Py2.4 complains here so we put it in a try block
                self.threads.remove(t)
            except:
                pass
        self.check_for_dead_threads -= len(dead_threads)

    def grow(self, amount=None):
        """Add up to ``amount`` new worker threads (capped at max_threads)."""
        if self.stop_server:
            return

        if not amount:
            amount = self.max_threads

        if self.alive:
            amount = min([amount, self.max_threads - len(self.threads)])

        if __debug__:
            log.debug("Growing by %i." % amount)

        for x in range(amount):
            worker = self.worker_class(self.app_info,
                                       self.active_queue,
                                       self.monitor_queue)

            worker.setDaemon(True)
            self.threads.add(worker)
            worker.start()

    def shrink(self, amount=1):
        """Ask ``amount`` workers to exit by queueing None sentinels."""
        if __debug__:
            log.debug("Shrinking by %i." % amount)

        self.check_for_dead_threads += amount

        for x in range(amount):
            self.active_queue.put(None)

    def dynamic_resize(self):
        """Grow or shrink the pool based on the active queue's backlog."""
        if (self.max_threads > self.min_threads or self.max_threads == 0):
            if self.check_for_dead_threads > 0:
                self.bring_out_your_dead()

            queueSize = self.active_queue.qsize()
            threadCount = len(self.threads)

            if __debug__:
                log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
                          % (threadCount, queueSize))

            if queueSize == 0 and threadCount > self.min_threads:
                self.shrink()

            elif queueSize > self.grow_threshold:

                self.grow(queueSize)
+
+# Monolithic build...end of module: rocket/threadpool.py
+# Monolithic build...start of module: rocket/worker.py
+
+# Import System Modules
+import re
+import sys
+import socket
+import logging
+import traceback
+from wsgiref.headers import Headers
+from threading import Thread
+from datetime import datetime
+
+try:
+ from urllib import unquote
+except ImportError:
+ from urllib.parse import unquote
+
+try:
+ from io import StringIO
+except ImportError:
+ try:
+ from cStringIO import StringIO
+ except ImportError:
+ from StringIO import StringIO
+
+try:
+ from ssl import SSLError
+except ImportError:
+ class SSLError(socket.error):
+ pass
+# Import Package Modules
+# package imports removed in monolithic build
+
+
# Define Constants
# Matches a %2F escape so encoded slashes survive the unquote step below.
re_SLASH = re.compile('%2F', re.IGNORECASE)
# Verbose regex that splits an HTTP/1.x request line into its parts.
re_REQUEST_LINE = re.compile(r"""^
(?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method
\                                                          # (single space)
(
    (?P<scheme>[^:/]+)                                     # Scheme
    (://)  #
    (?P<host>[^/]+)                                        # Host
)? #
(?P<path>(\*|/[^ \?]*))                                    # Path
(\? (?P<query_string>[^ ]+))?                              # Query String
\                                                          # (single space)
(?P<protocol>HTTPS?/1\.[01])                               # Protocol
$
""", re.X)
# Access-log line template filled by Worker.run().
LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
# Minimal canned response used by Worker.send_response().
# NOTE(review): line endings here are bare \n, not the \r\n HTTP requires
# on the wire -- confirm whether this is intentional.
RESPONSE = '''\
HTTP/1.1 %s
Content-Length: %i
Content-Type: %s

%s
'''
if IS_JYTHON:
    # Jython uses the procedural parser instead of the regex above.
    HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])
+
class Worker(Thread):
    """Base class for threads that pull connections off the active queue
    and serve requests on them (a subclass supplies run_app)."""

    def __init__(self,
                 app_info,
                 active_queue,
                 monitor_queue,
                 *args,
                 **kwargs):

        Thread.__init__(self, *args, **kwargs)

        # Instance Variables
        self.app_info = app_info
        self.active_queue = active_queue
        self.monitor_queue = monitor_queue

        # Per-request bookkeeping used for access logging.
        self.size = 0
        self.status = "200 OK"
        self.closeConnection = True
        self.request_line = ""

        # Request Log
        self.req_log = logging.getLogger('Rocket.Requests')
        self.req_log.addHandler(NullHandler())

        # Error Log
        self.err_log = logging.getLogger('Rocket.Errors.'+self.getName())
        self.err_log.addHandler(NullHandler())

    def _handleError(self, typ, val, tb):
        """Classify an exception raised while serving a request.

        Returns True when the serve loop should stop for this connection
        immediately (it was re-queued to the monitor or is beyond use),
        False when run()'s normal close/keep-alive logic should proceed.
        """
        if typ == SSLError:
            if 'timed out' in val.args[0]:
                typ = SocketTimeout
        if typ == SocketTimeout:
            if __debug__:
                self.err_log.debug('Socket timed out')
            # Idle keep-alive connection: hand it to the Monitor thread.
            self.monitor_queue.put(self.conn)
            return True
        if typ == SocketClosed:
            self.closeConnection = True
            if __debug__:
                self.err_log.debug('Client closed socket')
            return False
        if typ == BadRequest:
            self.closeConnection = True
            if __debug__:
                self.err_log.debug('Client sent a bad request')
            # NOTE(review): returning True skips the close block in run(),
            # so the socket appears to be dropped without an explicit
            # close() here -- confirm.
            return True
        if typ == socket.error:
            self.closeConnection = True
            if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
                if __debug__:
                    self.err_log.debug('Ignorable socket Error received...'
                                       'closing connection.')
                return False
            else:
                self.status = "999 Utter Server Failure"
                tb_fmt = traceback.format_exception(typ, val, tb)
                self.err_log.error('Unhandled Error when serving '
                                   'connection:\n' + '\n'.join(tb_fmt))
                return False

        # Anything else is an unexpected failure: log it and try to tell
        # the client before giving up on the connection.
        self.closeConnection = True
        tb_fmt = traceback.format_exception(typ, val, tb)
        self.err_log.error('\n'.join(tb_fmt))
        self.send_response('500 Server Error')
        return False

    def run(self):
        """Main loop: serve connections from the active queue until a None
        sentinel arrives."""
        if __debug__:
            self.err_log.debug('Entering main loop.')

        # Enter thread main loop
        while True:
            conn = self.active_queue.get()

            if not conn:
                # A non-client is a signal to die
                if __debug__:
                    self.err_log.debug('Received a death threat.')
                return conn

            if isinstance(conn, tuple):
                # Listeners enqueue raw tuples; wrap them in a Connection.
                conn = Connection(*conn)

            self.conn = conn

            if conn.ssl != conn.secure:
                # Plain HTTP arrived on an SSL listener (or vice versa).
                self.err_log.info('Received HTTP connection on HTTPS port.')
                self.send_response('400 Bad Request')
                self.closeConnection = True
                conn.close()
                continue
            else:
                if __debug__:
                    self.err_log.debug('Received a connection.')
                self.closeConnection = False

            # Enter connection serve loop
            while True:
                if __debug__:
                    self.err_log.debug('Serving a request')
                try:
                    self.run_app(conn)
                    log_info = dict(client_ip = conn.client_addr,
                                    time = datetime.now().strftime('%c'),
                                    status = self.status.split(' ')[0],
                                    size = self.size,
                                    request_line = self.request_line)
                    self.req_log.info(LOG_LINE % log_info)
                except:
                    exc = sys.exc_info()
                    handled = self._handleError(*exc)
                    if handled:
                        break
                    else:
                        if self.request_line:
                            log_info = dict(client_ip = conn.client_addr,
                                            time = datetime.now().strftime('%c'),
                                            status = self.status.split(' ')[0],
                                            size = self.size,
                                            request_line = self.request_line + ' - not stopping')
                            self.req_log.info(LOG_LINE % log_info)

                if self.closeConnection:
                    try:
                        conn.close()
                    except:
                        self.err_log.error(str(traceback.format_exc()))

                    break

    def run_app(self, conn):
        # Must be overridden with a method reads the request from the socket
        # and sends a response.
        self.closeConnection = True
        raise NotImplementedError('Overload this method!')

    def send_response(self, status):
        """Send a minimal canned response whose body is the status message."""
        stat_msg = status.split(' ', 1)[1]
        msg = RESPONSE % (status,
                          len(stat_msg),
                          'text/plain',
                          stat_msg)
        try:
            self.conn.sendall(b(msg))
        except socket.error:
            self.closeConnection = True
            self.err_log.error('Tried to send "%s" to client but received socket'
                               ' error' % status)

    #def kill(self):
    #    if self.isAlive() and hasattr(self, 'conn'):
    #        try:
    #            self.conn.shutdown(socket.SHUT_RDWR)
    #        except socket.error:
    #            info = sys.exc_info()
    #            if info[1].args[0] != socket.EBADF:
    #                self.err_log.debug('Error on shutdown: '+str(info))

    def read_request_line(self, sock_file):
        """Read and parse the request line; return its parts as a dict.

        Raises SocketTimeout if nothing arrives in time, SocketClosed on an
        empty read, and BadRequest (after answering 400) on a malformed line.
        """
        self.request_line = ''
        try:
            # Grab the request line
            d = sock_file.readline()
            if PY3K:
                d = d.decode('ISO-8859-1')

            if d == '\r\n':
                # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec
                if __debug__:
                    self.err_log.debug('Client sent newline')

                d = sock_file.readline()
                if PY3K:
                    d = d.decode('ISO-8859-1')
        except socket.timeout:
            raise SocketTimeout("Socket timed out before request.")

        d = d.strip()

        if not d:
            if __debug__:
                self.err_log.debug('Client did not send a recognizable request.')
            raise SocketClosed('Client closed socket.')

        self.request_line = d

        # NOTE: I've replaced the traditional method of procedurally breaking
        # apart the request line with a (rather unsightly) regular expression.
        # However, Java's regexp support sucks so bad that it actually takes
        # longer in Jython to process the regexp than procedurally. So I've
        # left the old code here for Jython's sake...for now.
        if IS_JYTHON:
            return self._read_request_line_jython(d)

        match = re_REQUEST_LINE.match(d)

        if not match:
            self.send_response('400 Bad Request')
            raise BadRequest

        req = match.groupdict()
        for k,v in req.items():
            if not v:
                req[k] = ""
            if k == 'path':
                # Unquote the path but keep encoded slashes (%2F) encoded.
                req['path'] = r'%2F'.join([unquote(x) for x in re_SLASH.split(v)])

        return req

    def _read_request_line_jython(self, d):
        """Procedural request-line parser used on Jython (see NOTE above)."""
        d = d.strip()
        try:
            method, uri, proto = d.split(' ')
            if not proto.startswith('HTTP') or \
               proto[-3:] not in ('1.0', '1.1') or \
               method not in HTTP_METHODS:
                self.send_response('400 Bad Request')
                raise BadRequest
        except ValueError:
            self.send_response('400 Bad Request')
            raise BadRequest

        req = dict(method=method, protocol = proto)
        scheme = ''
        host = ''
        if uri == '*' or uri.startswith('/'):
            path = uri
        elif '://' in uri:
            scheme, rest = uri.split('://')
            host, path = rest.split('/', 1)
            path = '/' + path
        else:
            self.send_response('400 Bad Request')
            raise BadRequest

        query_string = ''
        if '?' in path:
            path, query_string = path.split('?', 1)

        # Unquote the path but keep encoded slashes (%2F) encoded.
        path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])

        req.update(path=path,
                   query_string=query_string,
                   scheme=scheme.lower(),
                   host=host)
        return req


    def read_headers(self, sock_file):
        """Read request headers into a dict keyed by upper-cased names with
        '-' replaced by '_' (e.g. CONTENT_LENGTH)."""
        headers = dict()
        l = sock_file.readline()

        lname = None
        lval = None
        while True:
            if PY3K:
                try:
                    l = str(l, 'ISO-8859-1')
                except UnicodeDecodeError:
                    self.err_log.warning('Client sent invalid header: ' + repr(l))

            if l == '\r\n':
                break

            if l[0] in ' \t' and lname:
                # Some headers take more than one line
                # NOTE(review): continuation lines are joined with ',' here;
                # RFC 2616 folds them with a space -- confirm intent.
                lval += ',' + l.strip()
            else:
                # HTTP header values are latin-1 encoded
                l = l.split(':', 1)
                # HTTP header names are us-ascii encoded

                lname = l[0].strip().upper().replace('-', '_')
                lval = l[-1].strip()
            headers[str(lname)] = str(lval)

            l = sock_file.readline()
        return headers
+
class SocketTimeout(Exception):
    """Raised when a socket times out between requests."""
+
class BadRequest(Exception):
    """Raised when a client sends an incomprehensible request."""
+
class SocketClosed(Exception):
    """Raised when a socket is closed by the client."""
+
class ChunkedReader(object):
    """File-like reader that decodes an HTTP 'chunked' transfer-encoded body.

    Wraps the request's socket file and exposes read()/readline()/readlines()
    over the de-chunked payload.
    """
    def __init__(self, sock_file):
        self.stream = sock_file
        # Bytes remaining in the chunk currently being consumed.
        self.chunk_size = 0

    def _read_header(self):
        """Read the next chunk-size line and return its value in bytes.

        Returns 0 for a malformed size line or at end of stream (0 is also
        the terminating chunk per the HTTP spec, so callers treat it as EOF).

        BUG FIX: the original looped ``while "" == chunk_len`` -- at EOF
        readline() returns an empty value forever, spinning indefinitely on
        Python 2; on Python 3 the bytes/str comparison was always False, so
        the blank line between chunks aborted decoding early.  Checking
        truthiness works for both str and bytes and bails out at EOF.
        """
        try:
            chunk_len = ""
            while not chunk_len:
                line = self.stream.readline()
                if not line:
                    # End of stream: treat as the terminating chunk.
                    return 0
                # Skip the CRLF that trails the previous chunk's data.
                chunk_len = line.strip()
            return int(chunk_len, 16)
        except ValueError:
            return 0

    def read(self, size):
        """Read up to ``size`` bytes of de-chunked payload."""
        data = b('')
        chunk_size = self.chunk_size
        while size:
            if not chunk_size:
                chunk_size = self._read_header()
                if not chunk_size:
                    # A zero-length chunk terminates the body.
                    break

            if size < chunk_size:
                data += self.stream.read(size)
                chunk_size -= size
                break
            else:
                data += self.stream.read(chunk_size)
                size -= chunk_size
                chunk_size = 0

        self.chunk_size = chunk_size
        return data

    def readline(self):
        """Read one newline-terminated line from the de-chunked payload."""
        data = b('')
        c = self.read(1)
        while c and c != b('\n'):
            data += c
            c = self.read(1)
        data += c
        return data

    def readlines(self):
        """Yield lines until the chunked body is exhausted.

        BUG FIX: the original yielded only the first line; iterate until
        readline() returns an empty value.
        """
        line = self.readline()
        while line:
            yield line
            line = self.readline()
+
def get_method(method):
    """Map a server method name ('wsgi' or 'fs') to its Worker subclass.

    Raises KeyError for unknown names; lookup is case-insensitive.
    """
    return {'wsgi': WSGIWorker,
            'fs': FileSystemWorker}[method.lower()]
+
+# Monolithic build...end of module: rocket/worker.py
+# Monolithic build...start of module: rocket/methods/__init__.py
+
+# Monolithic build...end of module: rocket/methods/__init__.py
+# Monolithic build...start of module: rocket/methods/fs.py
+
+# Import System Modules
+import os
+import time
+import mimetypes
+from email.utils import formatdate
+from wsgiref.headers import Headers
+from wsgiref.util import FileWrapper
+# Import Package Modules
+# package imports removed in monolithic build
+
+
# Define Constants
CHUNK_SIZE = 2**16  # 64 Kilobyte chunks
# Template for the status line plus serialized Headers object.
HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
# HTML scaffolding for the auto-generated directory index page.
INDEX_HEADER = '''\
<html>
<head><title>Directory Index: %(path)s</title>
<style> .parent { margin-bottom: 1em; }</style>
</head>
<body><h1>Directory Index: %(path)s</h1>
<table>
<tr><th>Directories</th></tr>
'''
INDEX_ROW = '''<tr><td><div class="%(cls)s"><a href="/%(link)s">%(name)s</a></div></td></tr>'''
INDEX_FOOTER = '''</table></body></html>\r\n'''
+
class LimitingFileWrapper(FileWrapper):
    """A FileWrapper that serves at most ``limit`` bytes (206 partial content).

    BUG FIXES versus the original:
    * The old signature ``__init__(self, limit=None, *args, **kwargs)`` bound
      the file object to ``limit`` when called as
      ``LimitingFileWrapper(f, CHUNK_SIZE, limit=end)`` (the only call site in
      this module), raising ``TypeError: got multiple values for 'limit'``.
      ``limit`` is now popped from the keywords so the positional arguments
      reach FileWrapper intact.
    * ``read`` delegated to ``FileWrapper.read``, which wsgiref's FileWrapper
      does not define at class level; it now reads from the wrapped file
      directly.
    * Iteration (how the worker actually streams the response) bypassed the
      limit entirely; ``__next__`` now enforces it.
    """
    def __init__(self, *args, **kwargs):
        # ``limit`` must be given as a keyword (as the call site does).
        self.limit = kwargs.pop('limit', None)
        FileWrapper.__init__(self, *args, **kwargs)

    def read(self, amt):
        # Clamp the read to the remaining byte budget.
        if amt > self.limit:
            amt = self.limit
        self.limit -= amt
        # wsgiref.util.FileWrapper stores the wrapped file as ``filelike``.
        return self.filelike.read(amt)

    def __next__(self):
        # Enforce the limit during iteration as well.
        if self.limit <= 0:
            raise StopIteration
        data = self.read(self.blksize)
        if not data:
            raise StopIteration
        return data

    # Python 2 iterator protocol compatibility.
    next = __next__
+
class FileSystemWorker(Worker):
    """Worker that serves files (and optional directory indexes) from
    ``document_root`` for GET requests."""

    def __init__(self, *args, **kwargs):
        """Builds some instance variables that will last the life of the
        thread."""

        Worker.__init__(self, *args, **kwargs)

        self.root = os.path.abspath(self.app_info['document_root'])
        self.display_index = self.app_info['display_index']

    def serve_file(self, filepath, headers):
        """Prepare status, headers and a data iterable for a regular file,
        honoring If-Modified-Since and (partially) Range requests."""
        filestat = os.stat(filepath)
        self.size = filestat.st_size
        modtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                time.gmtime(filestat.st_mtime))
        self.headers.add_header('Last-Modified', modtime)
        if headers.get('if_modified_since') == modtime:
            # The browser cache is up-to-date, send a 304.
            self.status = "304 Not Modified"
            self.data = []
            return

        ct = mimetypes.guess_type(filepath)[0]
        self.content_type = ct if ct else 'text/plain'
        try:
            f = open(filepath, 'rb')
            self.headers['Pragma'] = 'cache'
            self.headers['Cache-Control'] = 'private'
            self.headers['Content-Length'] = str(self.size)
            if self.etag:
                self.headers.add_header('Etag', self.etag)
            if self.expires:
                self.headers.add_header('Expires', self.expires)

            try:
                # Implement 206 partial file support.
                # NOTE(review): a real Range header is "bytes=start-end", so
                # splitting on '-' leaves start as "bytes=N" and it falls
                # back to 0 below -- confirm intended.
                start, end = headers['range'].split('-')
                start = 0 if not start.isdigit() else int(start)
                end = self.size if not end.isdigit() else int(end)
                if self.size < end or start < 0:
                    # NOTE(review): 214 is not a registered HTTP status; the
                    # standard code for this case is 416.
                    self.status = "214 Unsatisfiable Range Requested"
                    self.data = FileWrapper(f, CHUNK_SIZE)
                else:
                    f.seek(start)
                    self.data = LimitingFileWrapper(f, CHUNK_SIZE, limit=end)
                    self.status = "206 Partial Content"
            except:
                # No (or unparsable) Range header: serve the whole file.
                self.data = FileWrapper(f, CHUNK_SIZE)
        except IOError:
            self.status = "403 Forbidden"

    def serve_dir(self, pth, rpth):
        """Prepare an HTML directory listing (404 when indexes are off)."""
        def rel_path(path):
            # Path relative to the document root, for display and links.
            return os.path.normpath(path[len(self.root):] if path.startswith(self.root) else path)

        if not self.display_index:
            self.status = '404 File Not Found'
            return b('')
        else:
            self.content_type = 'text/html'

            dir_contents = [os.path.join(pth, x) for x in os.listdir(os.path.normpath(pth))]
            dir_contents.sort()

            dirs = [rel_path(x)+'/' for x in dir_contents if os.path.isdir(x)]
            files = [rel_path(x) for x in dir_contents if os.path.isfile(x)]

            self.data = [INDEX_HEADER % dict(path='/'+rpth)]
            if rpth:
                self.data += [INDEX_ROW % dict(name='(parent directory)', cls='dir parent', link='/'.join(rpth[:-1].split('/')[:-1]))]
            self.data += [INDEX_ROW % dict(name=os.path.basename(x[:-1]), link=os.path.join(rpth, os.path.basename(x[:-1])).replace('\\', '/'), cls='dir') for x in dirs]
            self.data += ['<tr><th>Files</th></tr>']
            self.data += [INDEX_ROW % dict(name=os.path.basename(x), link=os.path.join(rpth, os.path.basename(x)).replace('\\', '/'), cls='file') for x in files]
            self.data += [INDEX_FOOTER]
            self.headers['Content-Length'] = self.size = str(sum([len(x) for x in self.data]))
            self.status = '200 OK'

    def run_app(self, conn):
        """Read one request from ``conn`` and send back a file, a directory
        index, or an error response."""
        self.status = "200 OK"
        self.size = 0
        self.expires = None
        self.etag = None
        self.content_type = 'text/plain'
        self.content_length = None

        if __debug__:
            self.err_log.debug('Getting sock_file')

        # Build our file-like object
        sock_file = conn.makefile('rb',BUF_SIZE)
        request = self.read_request_line(sock_file)
        if request['method'].upper() not in ('GET', ):
            self.status = "501 Not Implemented"

        try:
            # Get our file path
            headers = dict([(str(k.lower()), v) for k, v in self.read_headers(sock_file).items()])
            rpath = request.get('path', '').lstrip('/')
            filepath = os.path.join(self.root, rpath)
            filepath = os.path.abspath(filepath)
            if __debug__:
                self.err_log.debug('Request for path: %s' % filepath)

            self.closeConnection = headers.get('connection', 'close').lower() == 'close'
            self.headers = Headers([('Date', formatdate(usegmt=True)),
                                    ('Server', HTTP_SERVER_SOFTWARE),
                                    ('Connection', headers.get('connection', 'close')),
                                    ])

            if not filepath.lower().startswith(self.root.lower()):
                # File must be within our root directory
                self.status = "400 Bad Request"
                self.closeConnection = True
            elif not os.path.exists(filepath):
                self.status = "404 File Not Found"
                self.closeConnection = True
            elif os.path.isdir(filepath):
                self.serve_dir(filepath, rpath)
            elif os.path.isfile(filepath):
                self.serve_file(filepath, headers)
            else:
                # It exists but it's not a file or a directory????
                # What is it then?
                self.status = "501 Not Implemented"
                self.closeConnection = True

            h = self.headers
            statcode, statstr = self.status.split(' ', 1)
            statcode = int(statcode)
            if statcode >= 400:
                # Error responses get a plain-text body of the status text.
                h.add_header('Content-Type', self.content_type)
                self.data = [statstr]

            # Build our output headers
            header_data = HEADER_RESPONSE % (self.status, str(h))

            # Send the headers
            if __debug__:
                self.err_log.debug('Sending Headers: %s' % repr(header_data))
            self.conn.sendall(b(header_data))

            for data in self.data:
                self.conn.sendall(b(data))

            if hasattr(self.data, 'close'):
                self.data.close()

        finally:
            if __debug__:
                self.err_log.debug('Finally closing sock_file')
            sock_file.close()
+
+# Monolithic build...end of module: rocket/methods/fs.py
+# Monolithic build...start of module: rocket/methods/wsgi.py
+
+# Import System Modules
+import sys
+import socket
+from wsgiref.headers import Headers
+from wsgiref.util import FileWrapper
+
+# Import Package Modules
+# package imports removed in monolithic build
+
+
+
+if PY3K:
+ from email.utils import formatdate
+else:
+ # Caps Utils for Py2.4 compatibility
+ from email.Utils import formatdate
+
# Define Constants
NEWLINE = b('\r\n')
# Template for the status line plus serialized Headers object.
HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
# Static portion of the WSGI environ shared by every request.
BASE_ENV = {'SERVER_NAME': SERVER_NAME,
            'SCRIPT_NAME': '',  # Direct call WSGI does not need a name
            'wsgi.errors': sys.stderr,
            'wsgi.version': (1, 0),
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'wsgi.file_wrapper': FileWrapper
            }
+
class WSGIWorker(Worker):
    """Worker that serves each request by calling a WSGI application."""

    def __init__(self, *args, **kwargs):
        """Builds some instance variables that will last the life of the
        thread."""
        Worker.__init__(self, *args, **kwargs)

        if isinstance(self.app_info, dict):
            multithreaded = self.app_info.get('max_threads') != 1
        else:
            multithreaded = False
        self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'],
                                  'wsgi.multithread': multithreaded,
                                  })
        self.base_environ.update(BASE_ENV)

        # Grab our application
        self.app = self.app_info.get('wsgi_app')

        if not hasattr(self.app, "__call__"):
            raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))

        # Enable futures
        if has_futures and self.app_info.get('futures'):
            executor = self.app_info['executor']
            self.base_environ.update({"wsgiorg.executor": executor,
                                      "wsgiorg.futures": executor.futures})

    def build_environ(self, sock_file, conn):
        """ Build the execution environment. """
        # Grab the request line
        request = self.read_request_line(sock_file)

        # Copy the Base Environment
        environ = self.base_environ.copy()

        # Grab the headers
        for k, v in self.read_headers(sock_file).items():
            environ[str('HTTP_'+k)] = v

        # Add CGI Variables
        environ['REQUEST_METHOD'] = request['method']
        environ['PATH_INFO'] = request['path']
        environ['SERVER_PROTOCOL'] = request['protocol']
        environ['SERVER_PORT'] = str(conn.server_port)
        environ['REMOTE_PORT'] = str(conn.client_port)
        environ['REMOTE_ADDR'] = str(conn.client_addr)
        environ['QUERY_STRING'] = request['query_string']
        if 'HTTP_CONTENT_LENGTH' in environ:
            environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
        if 'HTTP_CONTENT_TYPE' in environ:
            environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

        # Save the request method for later
        self.request_method = environ['REQUEST_METHOD']

        # Add Dynamic WSGI Variables
        if conn.ssl:
            environ['wsgi.url_scheme'] = 'https'
            environ['HTTPS'] = 'on'
        else:
            environ['wsgi.url_scheme'] = 'http'

        if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
            environ['wsgi.input'] = ChunkedReader(sock_file)
        else:
            environ['wsgi.input'] = sock_file

        return environ

    def send_headers(self, data, sections):
        """Finalize and transmit the response headers, choosing a body
        framing (Content-Length vs. chunked) if the app supplied none."""
        h_set = self.header_set

        # Does the app want us to send output chunked?
        self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked'

        # Add a Date header if it's not there already
        if not 'date' in h_set:
            h_set['Date'] = formatdate(usegmt=True)

        # Add a Server header if it's not there already
        if not 'server' in h_set:
            h_set['Server'] = HTTP_SERVER_SOFTWARE

        if 'content-length' in h_set:
            self.size = int(h_set['content-length'])
        else:
            s = int(self.status.split(' ')[0])
            # NOTE(review): this condition is True for 1xx statuses, which
            # must not carry a body -- it looks like it was meant to be
            # "s >= 200 and s not in (204, 205, 304)"; confirm.
            if s < 200 or s not in (204, 205, 304):
                if not self.chunked:
                    if sections == 1:
                        # Add a Content-Length header if it's not there already
                        h_set['Content-Length'] = str(len(data))
                        self.size = len(data)
                    else:
                        # If they sent us more than one section, we blow chunks
                        h_set['Transfer-Encoding'] = 'Chunked'
                        self.chunked = True
                        if __debug__:
                            self.err_log.debug('Adding header...'
                                               'Transfer-Encoding: Chunked')

        if 'connection' not in h_set:
            # If the application did not provide a connection header, fill it in
            client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
            if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
                # HTTP = 1.1 defaults to keep-alive connections
                if client_conn:
                    h_set['Connection'] = client_conn
                else:
                    h_set['Connection'] = 'keep-alive'
            else:
                # HTTP < 1.1 supports keep-alive but it's quirky so we don't support it
                h_set['Connection'] = 'close'

        # Close our connection if we need to.
        self.closeConnection = h_set.get('connection', '').lower() == 'close'

        # Build our output headers
        header_data = HEADER_RESPONSE % (self.status, str(h_set))

        # Send the headers
        if __debug__:
            self.err_log.debug('Sending Headers: %s' % repr(header_data))
        self.conn.sendall(b(header_data))
        self.headers_sent = True

    def write_warning(self, data, sections=None):
        """Deprecated WSGI write() callable handed to apps by start_response."""
        self.err_log.warning('WSGI app called write method directly.  This is '
                             'deprecated behavior.  Please update your app.')
        return self.write(data, sections)

    def write(self, data, sections=None):
        """ Write the data to the output socket. """

        if self.error[0]:
            self.status = self.error[0]
            data = b(self.error[1])

        if not self.headers_sent:
            self.send_headers(data, sections)

        if self.request_method != 'HEAD':
            try:
                if self.chunked:
                    self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data)))
                else:
                    self.conn.sendall(data)
            except socket.error:
                # But some clients will close the connection before that
                # resulting in a socket error.
                self.closeConnection = True

    def start_response(self, status, response_headers, exc_info=None):
        """ Store the HTTP status and headers to be sent when self.write is
        called. """
        if exc_info:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    # because this violates WSGI specification.
                    # NOTE(review): a bare ``raise`` outside an except block
                    # re-raises the last exception on Py2 only; confirm the
                    # intended Py3 behavior.
                    raise
            finally:
                exc_info = None
        elif self.header_set:
            raise AssertionError("Headers already set!")

        if PY3K and not isinstance(status, str):
            self.status = str(status, 'ISO-8859-1')
        else:
            self.status = status
        # Make sure headers are bytes objects
        try:
            self.header_set = Headers(response_headers)
        except UnicodeDecodeError:
            self.error = ('500 Internal Server Error',
                          'HTTP Headers should be bytes')
            self.err_log.error('Received HTTP Headers from client that contain'
                               ' invalid characters for Latin-1 encoding.')

        return self.write_warning

    def run_app(self, conn):
        """Serve one request on ``conn`` through the WSGI application."""
        self.size = 0
        self.header_set = Headers([])
        self.headers_sent = False
        self.error = (None, None)
        self.chunked = False
        sections = None
        output = None

        if __debug__:
            self.err_log.debug('Getting sock_file')

        # Build our file-like object
        # NOTE(review): fs.py calls conn.makefile('rb', BUF_SIZE); here the
        # single argument would be the mode for a plain socket -- presumably
        # Connection.makefile treats it as a buffer size.  Confirm.
        sock_file = conn.makefile(BUF_SIZE)

        try:
            # Read the headers and build our WSGI environment
            self.environ = environ = self.build_environ(sock_file, conn)

            # Handle 100 Continue
            if environ.get('HTTP_EXPECT', '') == '100-continue':
                res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
                conn.sendall(b(res))

            # Send it to our WSGI application
            output = self.app(environ, self.start_response)

            if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
                self.error = ('500 Internal Server Error',
                              'WSGI applications must return a list or '
                              'generator type.')

            if hasattr(output, '__len__'):
                sections = len(output)

            for data in output:
                # Don't send headers until body appears
                if data:
                    self.write(data, sections)

            if self.chunked:
                # If chunked, send our final chunk length
                self.conn.sendall(b('0\r\n\r\n'))
            elif not self.headers_sent:
                # Send headers if the body was empty
                self.send_headers('', sections)

        # Don't capture exceptions here.  The Worker class handles
        # them appropriately.
        finally:
            if __debug__:
                self.err_log.debug('Finally closing output and sock_file')

            if hasattr(output,'close'):
                output.close()

            sock_file.close()
+
+# Monolithic build...end of module: rocket/methods/wsgi.py
--- a/yt/utilities/command_line.py Wed Apr 06 00:26:48 2011 -0400
+++ b/yt/utilities/command_line.py Wed Apr 06 00:27:52 2011 -0400
@@ -951,7 +951,7 @@
hr = ExtDirectREPL(base_extjs_path)
bottle.debug()
uuid_serve_functions(open_browser=opts.open_browser,
- port=int(opts.port))
+ port=int(opts.port), repl=hr)
def run_main():
--- a/yt/utilities/parameter_file_storage.py Wed Apr 06 00:26:48 2011 -0400
+++ b/yt/utilities/parameter_file_storage.py Wed Apr 06 00:27:52 2011 -0400
@@ -183,6 +183,11 @@
self._write_out()
self.read_db()
+ def get_recent(self, n=10):
+ recs = sorted(self._records.values(),
+ key = lambda a: -a['last_seen'])[:n]
+ return recs
+
@parallel_simple_proxy
def _write_out(self):
if self._read_only: return
Repository URL: https://bitbucket.org/yt_analysis/yt/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
More information about the yt-svn
mailing list