FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
adjuster.py 489 KiB
Newer Older
    global cores ; cores = 1
    if options.multicore:
        options.squashLogs = False
        if not 'linux' in sys.platform and not 'bsd' in sys.platform:
Silas S. Brown's avatar
Silas S. Brown committed
            errExit("multicore option not supported on this platform")
            # --- it does work on darwin (Mac), but as of 10.7 some incoming connections get 'lost' so it's not a good idea
        cores = options.num_cores
        if not cores:
            import tornado.process
            cores = tornado.process.cpu_count()
        if cores==1: options.multicore = False
Silas S. Brown's avatar
Silas S. Brown committed
        elif options.js_interpreter and options.js_instances % cores:
            old = options.js_instances
            options.js_instances += (cores - (options.js_instances % cores))
            sys.stderr.write("multicore: changing js_instances %d -> %d (%d per core x %d cores)\n" % (old,options.js_instances,int(options.js_instances/cores),cores))
    if options.js_interpreter in ["HeadlessChrome","Chrome"]:
Silas S. Brown's avatar
Silas S. Brown committed
        try: # check inotify limit (Linux only)
            maxI=int(open("/proc/sys/fs/inotify/max_user_instances").read())
Silas S. Brown's avatar
Silas S. Brown committed
        except: maxI = -1
        if not maxI==-1 and options.js_instances > maxI*20: warn("This system might run out of inotify instances with that number of Chrome processes.  Try:\nsudo sysctl -n -w fs.inotify.max_user_watches=%d\nsudo sysctl -n -w fs.inotify.max_user_instances=%d" % (options.js_instances*40,options.js_instances*20))
Silas S. Brown's avatar
Silas S. Brown committed
    global js_per_core
    js_per_core = int(options.js_instances/cores)
Silas S. Brown's avatar
Silas S. Brown committed
        maxCurls = 30*js_per_core
        if options.ssl_fork: maxCurls = int(maxCurls/2)
Silas S. Brown's avatar
Silas S. Brown committed
        if not options.usepycurl: errExit("upstream_proxy is not compatible with --usepycurl=False")
        setupCurl(maxCurls,"upstream_proxy requires pycurl (try sudo pip install pycurl)")
        if not ':' in options.upstream_proxy: options.upstream_proxy += ":80"
        upstream_proxy_host,upstream_proxy_port = options.upstream_proxy.split(':') # TODO: IPv6 ?
        if not upstream_proxy_host:
            upstream_proxy_host = "127.0.0.1"
Silas S. Brown's avatar
Silas S. Brown committed
            if wsgi_mode: warn("Can't do SSL-rewrite for upstream proxy when in WSGI mode")
            else: upstream_rewrite_ssl = True
        upstream_proxy_port = int(upstream_proxy_port)
Silas S. Brown's avatar
Silas S. Brown committed
    elif options.usepycurl and not options.submitPath=='/': setupCurl(3*js_per_core) # and no error if not there
    global codeChanges ; codeChanges = []
    if options.codeChanges:
Silas S. Brown's avatar
Silas S. Brown committed
      ccLines = [x for x in [x.strip() for x in options.codeChanges.split("\n")] if x and not x.startswith("#")]
        if len(ccLines)<3: errExit("codeChanges must be a multiple of 3 lines (see --help)")
        codeChanges.append(tuple(ccLines[:3]))
        ccLines = ccLines[3:]
Silas S. Brown's avatar
Silas S. Brown committed
    if options.real_proxy:
        options.open_proxy=True
Silas S. Brown's avatar
Silas S. Brown committed
        if options.browser and "lynx" in options.browser and not "I_PROMISE_NOT_TO_LYNX_DUMP_SSL" in os.environ and not "-stdin" in options.browser and ("-dump" in options.browser or "-source" in options.browser or "-mime_header" in options.browser): errExit("Don't do that.  If Lynx wants to ask you about our self-signed certificates, it'll assume the answer is No when running non-interactively, and this will cause it to fetch the page directly (not via our proxy) which could confuse you into thinking the adjuster's not working.  If you know what you're doing, put I_PROMISE_NOT_TO_LYNX_DUMP_SSL in the environment to suppress this message (but if using js_interpreter beware of redirect to SSL).  Or you can use wget --no-check-certificate -O - | lynx -dump -stdin") # TODO: could we configure Lynx to always accept when running non-interactively?
    if options.htmlFilter and '#' in options.htmlFilter and not len(options.htmlFilter.split('#'))+1 == len(options.htmlFilterName.split('#')): errExit("Wrong number of #s in htmlFilterName for this htmlFilter setting")
Silas S. Brown's avatar
Silas S. Brown committed
    global port_randomise
    if options.fixed_ports:
        class NullDict(dict):
            def __setitem__(*args): pass
        port_randomise = NullDict()
Silas S. Brown's avatar
Silas S. Brown committed
    if options.port == -1:
        if wsgi_mode:
            warn("port=-1 won't work in WSGI mode, assuming 80")
            options.port = 80
Silas S. Brown's avatar
Silas S. Brown committed
        elif options.ssl_fork or options.background: errExit("Can't run in background or ssl-fork with an ephemeral main port, as that requires fork-before-listen so won't be able to report the allocated port number")
Silas S. Brown's avatar
Silas S. Brown committed
        elif options.fixed_ports: errExit("fixed_ports is not compatible with port==-1")
Silas S. Brown's avatar
Silas S. Brown committed
        else:
            port_randomise[options.port] = True
            if not options.internalPort:
                # DON'T set it to -1 + 1 = 0
                options.internalPort = 1024
    elif options.port < 0 or options.port > 65535:
        errExit("port out of range")
    elif not options.port:
Silas S. Brown's avatar
Silas S. Brown committed
        if wsgi_mode:
            warn("port=0 won't work in WSGI mode, assuming 80")
Silas S. Brown's avatar
Silas S. Brown committed
            options.port = 80
        else:
Silas S. Brown's avatar
Silas S. Brown committed
            options.real_proxy=options.js_reproxy=False ; options.fasterServer=""
Silas S. Brown's avatar
Silas S. Brown committed
            options.open_proxy = True # bypass the check
    if not options.publicPort:
        options.publicPort = options.port
Silas S. Brown's avatar
Silas S. Brown committed
    if not options.internalPort:
        options.internalPort = options.port + 1
    if options.internalPort in [options.publicPort,options.port]: errExit("--internalPort cannot match --port or --publicPort")
Silas S. Brown's avatar
Silas S. Brown committed
    if options.just_me:
        options.address = "localhost"
        try: socket.socket().connect(('localhost',113))
            if not 'linux' in sys.platform or not getoutput("which netstat 2>/dev/null"): errExit("--just_me requires either an ident server to be running on port 113, or the system to be Linux with a netstat command available")
        import getpass ; global myUsername ; myUsername = S(getpass.getuser())
Silas S. Brown's avatar
Silas S. Brown committed
    elif not options.password and not options.open_proxy and not options.submitPath=='/' and not options.stop: errExit("Please set a password (or --just_me), or use --open_proxy.\n(Try --help for help; did you forget a --config=file?)") # (as a special case, if submitPath=/ then we're serving nothing but submit-your-own-text and bookmarklets, which means we won't be proxying anything anyway and don't need this check)
    if options.submitBookmarkletDomain and not options.publicPort==80: warn("You will need to run another copy on "+options.submitBookmarkletDomain+" ports 80/443 for bookmarklets to work (submitBookmarkletDomain without publicPort=80)")
    if options.pdftotext and not "pdftotext version" in getoutput("pdftotext -h"): errExit("pdftotext command does not seem to be usable\nPlease install it, or unset the pdftotext option")
    if options.epubtotext and not "calibre" in getoutput("ebook-convert -h"): errExit("ebook-convert command does not seem to be usable\nPlease install calibre, or unset the epubtotext option")
    global extensions
    if options.extensions:
        extensions = __import__(options.extensions)
    else:
        class E:
            def handle(*args): return False
        extensions = E()
    global ownServer_regexp
    if options.ownServer_regexp:
        if not options.own_server: errExit("Cannot set ownServer_regexp if own_sever is not set")
        ownServer_regexp = re.compile(options.ownServer_regexp)
    else: ownServer_regexp = None
    global ipMatchingFunc
    if options.ip_messages: ipMatchingFunc=ipv4ranges_func(options.ip_messages)
    else: ipMatchingFunc = None
    global submitPathIgnorePassword, submitPathForTest
    if options.submitPath and options.submitPath.startswith('*'):
        submitPathIgnorePassword = True
        options.submitPath = options.submitPath[1:]
    else: submitPathIgnorePassword = False
    submitPathForTest = options.submitPath
    if submitPathForTest and submitPathForTest[-1]=="?": submitPathForTest = submitPathForTest[:-1] # for CGI mode: putting the ? in tells adjuster to ADD a ? before any parameters, but does not require it to be there for the base submit URL (but don't do this if not submitPathForTest because it might not be a string)
    if options.submitPath and not options.htmlText: errExit("submitPath only really makes sense if htmlText is set (or do you want users to submit actual HTML?)") # TODO: allow this? also with submitBookmarklet ??
    if options.prominentNotice=="htmlFilter":
        if not options.htmlFilter: errExit("prominentNotice=\"htmlFilter\" requires htmlFilter to be set")
        if options.htmlJson or options.htmlText: errExit("prominentNotice=\"htmlFilter\" does not work with the htmlJson or htmlText options")
Silas S. Brown's avatar
Silas S. Brown committed
    if not (options.submitPath and options.htmlFilter): options.submitBookmarklet = False # TODO: bookmarklet for character rendering? (as an additional bookmarklet if there are filters as well, and update submitBookmarklet help text) although it's rare to find a machine that lacks fonts but has a bookmarklet-capable browser
    if options.submitBookmarklet and '_IHQ_' in options.submitPath: errExit("For implementation reasons, you cannot have the string _IHQ_ in submitPath when submitBookmarklet is on.") # Sorry.  See TODO in 'def bookmarklet'
    global upstreamGuard, cRecogniseAny, cRecognise1
    upstreamGuard = set() ; cRecogniseAny = set() ; cRecognise1 = set() # cRecognise = cookies to NOT clear at url box when serving via adjust_domain_cookieName; upstreamGuard = cookies to not pass to upstream (and possibly rename if upstream sets them)
    if options.password:
        upstreamGuard.add(password_cookie_name)
        cRecogniseAny.add(password_cookie_name)
    if options.cssName:
        upstreamGuard.add("adjustCssSwitch")
        cRecognise1.add("adjustCssSwitch")
    if options.htmlFilterName:
        upstreamGuard.add("adjustNoFilter")
        cRecognise1.add("adjustNoFilter")
    if options.renderName:
        upstreamGuard.add("adjustNoRender")
        cRecognise1.add("adjustNoRender")
    if options.prominentNotice:
        upstreamGuard.add("_WA_warnOK")
        cRecognise1.add("_WA_warnOK")
    if options.htmlonly_mode:
        upstreamGuard.add(htmlmode_cookie_name)
        cRecognise1.add(htmlmode_cookie_name)
    if options.ip_messages:
        upstreamGuard.add(seen_ipMessage_cookieName)
        cRecognise1.add(seen_ipMessage_cookieName)
    h = options.headAppendCSS
    if h and '%s' in h:
        if not ';' in h: errExit("If putting %s in headAppendCSS, must also put ; with options (please read the help text)")
Silas S. Brown's avatar
Silas S. Brown committed
        if options.default_site:
            errExit("Cannot set default_site when headAppendCSS contains options, because we need the URL box to show those options")
            # TODO: unless we implement some kind of inline setting, or special options URL ?
        if options.cssHtmlAttrs and ';' in options.cssHtmlAttrs and not len(options.cssHtmlAttrs.split(';'))==len(h.rsplit(';',1)[1].split(',')): errExit("Number of choices in headAppendCSS last field does not match number of choices in cssHtmlAttrs")
        for n in range(len(h.split(';'))-1):
            upstreamGuard.add("adjustCss"+str(n)+"s")
            cRecogniseAny.add("adjustCss"+str(n)+"s")
    if options.useLXML: check_LXML()
Silas S. Brown's avatar
Silas S. Brown committed
    global allowConnectHost,allowConnectPort,allowConnectURL
    allowConnectHost=allowConnectPort=allowConnectURL=None
    if options.ssh_proxy:
        if ',' in options.ssh_proxy: sp,allowConnectURL = options.ssh_proxy.split(',',1)
        else: sp = options.ssh_proxy
        if ':' in sp: allowConnectHost,allowConnectPort=sp.rsplit(':',1)
        else: allowConnectHost,allowConnectPort = sp,"22"
Silas S. Brown's avatar
Silas S. Brown committed
    if not options.default_site: options.default_site = ""
    # (so we can .split it even if it's None or something)
Silas S. Brown's avatar
Silas S. Brown committed
    if not options.js_interpreter:
        options.js_reproxy=options.js_frames=False
Silas S. Brown's avatar
Silas S. Brown committed
    elif not options.htmlonly_mode: errExit("js_interpreter requires htmlonly_mode")
Silas S. Brown's avatar
Silas S. Brown committed

def check_injected_globals():
    """Complain (via errExit) about names a wrapper script injected into this module.

    For making sure we're used correctly when imported as a module by a wrapper
    script: settings belong in adjuster.options.  Compares the current module
    globals with the snapshot taken by setup_defined_globals() and errExits on a
    newly-added name that is an option name, or whose value is a str/bool/int
    (probably a misspelled option).  Does nothing if no snapshot was taken.
    """
    try: defined_globals
    except NameError: return # setup_defined_globals() not called: nothing to compare against
    moduleGlobals = globals()
    for s in sorted(set(moduleGlobals.keys()).difference(defined_globals)): # sorted so the same name is reported first every run
        if s in options: errExit("Error: adjuster.%s should be adjuster.options.%s" % (s,s)) # (tell them off, don't try to patch up: this could go more subtly wrong if they do it again with something we happened to have defined in our module before)
        else:
            value = moduleGlobals[s] # direct lookup: eval(s) would find the loop variable itself if the injected name is "s"
            if type(value) in [str,bool,int]: errExit("Don't understand injected %s %s (misspelled option?)" % (repr(type(value)),s))
def setup_defined_globals(): # see above
    "Snapshot the module's global names so check_injected_globals can spot later additions."
    global defined_globals
    defined_globals = None # placeholder binding so this very name is part of its own snapshot
    snapshot = set(globals())
    defined_globals = snapshot

Silas S. Brown's avatar
Silas S. Brown committed
#@file: log-multi.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# Logging and busy-signalling (especially multicore)
# --------------------------------------------------

class CrossProcessLogging(logging.Handler):
    """Log handler that forwards records from forked processes to one listener process.

    Used when we may fork (multicore, ssl_fork, or js_interpreter with
    js_multiprocess) AND log to a file (log_file_prefix).  init() starts a
    listener process that keeps the handlers already configured, then makes this
    handler the root logger's only handler, so records are sent to the listener
    over a multiprocessing.Queue.  If the global `multiprocessing` is falsy
    (presumably the module is unavailable), initChild() instead gives each child
    its own log file.
    """
    def needed(self): return (options.multicore or options.ssl_fork or (options.js_interpreter and options.js_multiprocess)) and options.log_file_prefix # (not needed if stderr-only or if won't fork)
    def init(self):
        "Called by initLogging before forks.  Starts the separate logListener process."
        if not self.needed(): return
        try: logging.getLogger().handlers
        except AttributeError:
            # Ouch, we won't know how to clear logging's handlers and start again in the child processes
            errExit("The logging module on this system is not suitable for --log-file-prefix with --ssl-fork or --js-multiprocess")
        if not multiprocessing: return # we'll have to open multiple files in initChild instead
        self.loggingQ=multiprocessing.Queue()
        def logListener():
            try:
                while True: logging.getLogger().handle(logging.makeLogRecord(self.loggingQ.get()))
            except KeyboardInterrupt: pass
        # The listener is started BEFORE the root handlers are replaced below, so the
        # listener process keeps the handlers Tornado configured and does the writing.
        # (A nested target function needs the "fork" start method: it can't be pickled for "spawn".)
        self.p = multiprocessing.Process(target=logListener) ; self.p.start()
        logging.getLogger().handlers = [] # clear what Tornado has already put in place when it read the configuration
        logging.getLogger().addHandler(self)
    def initChild(self,toAppend=""):
        "Called after a fork.  toAppend helps to describe the child for logfile naming when multiprocessing is not available."
        if not options.log_file_prefix: return # stderr is OK
        if multiprocessing:
            try: multiprocessing.process.current_process()._children.clear() # multiprocessing wasn't really designed for the parent to fork() later on
            except Exception: pass # probably wrong version (private attribute missing or different)
            return # should be OK now: the forked child inherits this handler and its queue
        logging.getLogger().handlers = [] # clear Tornado's
        if toAppend: options.log_file_prefix += "-"+toAppend
        else: options.log_file_prefix += "-"+str(os.getpid())
        # and get Tornado to (re-)initialise logging with these parameters:
        if hasattr(tornado.options,"enable_pretty_logging"): tornado.options.enable_pretty_logging() # Tornado 2
        else: # Tornado 4
            import tornado.log
            tornado.log.enable_pretty_logging()
    def shutdown(self):
        "Stop the listener process, if one was started."
        try: self.p.terminate() # in case KeyboardInterrupt hasn't already stopped it
        except Exception: pass # e.g. no listener was started (self.p unset)
    def emit(self, record):
        """Queue a picklable copy of record for the listener process.

        Simplified from Python 3.2's QueueHandler, but sends just the attribute
        dictionary (not the record object) to make pickling errors less likely.
        The record itself is left untouched, in case anything else still uses it.
        """
        try:
            if record.exc_info:
                self.format(record) # side effect: caches the formatted traceback in record.exc_text
            d = dict(record.__dict__) # a copy, so the caller's record keeps its msg/args/exc_info
            if d.get('exc_info'): d['exc_info'] = None # traceback objects don't pickle; exc_text carries the text
            d['msg'],d['args'] = record.getMessage(),None # pre-merge args, which might not be picklable
            self.loggingQ.put(d)
        except (KeyboardInterrupt, SystemExit): raise
        except Exception: self.handleError(record)

class CrossProcess429:
    """Track, across cores, whether every server is busy (js_429 with multicore).

    (coreIndex, isBusy) tuples arrive on a multiprocessing.Queue (presumably put
    by each core's process -- the senders are not visible here).  A listener
    thread keeps the latest status per core and, whenever the "all cores busy"
    state flips, schedules reallyPauseOrRestartMainServer on the IOLoop.
    shutdown429() stops the listener by sending ("quit","quit").
    """
    def needed(self): return options.multicore and options.js_429
    def init(self): self.q = multiprocessing.Queue()
    def startThread(self):
        "If needed, start the listener thread (kept in self.thread).  Needs init() first and `cores` set."
        if not self.needed(): return
        self.b = [False]*cores # latest busy status reported for each core
        def listener():
            allServersBusy = False
            while True:
                coreToSet, busyStatus = self.q.get()
                if coreToSet == "quit": return # sentinel from shutdown429 (using it as a list index would raise TypeError)
                self.b[coreToSet] = busyStatus
                newASB = all(self.b)
                if not newASB == allServersBusy:
                    allServersBusy = newASB
                    if allServersBusy: IOLoopInstance().add_callback(lambda *args:reallyPauseOrRestartMainServer(True)) # run it just to serve the 429s, but don't set mainServerPaused=False or add an event to the queue
                    else: IOLoopInstance().add_callback(lambda *args:reallyPauseOrRestartMainServer("IfNotPaused")) # stop it if and only if it hasn't been restarted by the main thread before this callback
        self.thread = threading.Thread(target=listener,args=())
        self.thread.start()

def initLogging(): # MUST be after unixfork() if background
    """Set up logging for this process.

    Best-effort: filters WARNING records whose message starts with "SSL" out of
    Tornado's general log (tornado.log.gen_log).  Then replaces the module-level
    CrossProcessLogging class with a single instance of it and calls its init().
    """
    try:
        import logging, tornado.log
        class NoSSLWarnings:
            def filter(self,record):
                if record.levelno != logging.WARNING: return True
                return not record.getMessage().startswith("SSL")
        tornado.log.gen_log.addFilter(NoSSLWarnings()) # Tornado 6
    except: pass # any failure here just leaves SSL warnings unfiltered
    global CrossProcessLogging
    instance = CrossProcessLogging()
    CrossProcessLogging = instance
    instance.init()

def init429():
    "Replace the CrossProcess429 class with its single instance; create its queue only if it is needed."
    global CrossProcess429
    instance = CrossProcess429()
    CrossProcess429 = instance
    if instance.needed():
        instance.init()
def shutdown429():
    "Send the quit sentinel to the CrossProcess429 listener; ignore any failure (e.g. it has no queue)."
    sentinel = ("quit","quit")
    try:
        CrossProcess429.q.put(sentinel)
    except: pass
Silas S. Brown's avatar
Silas S. Brown committed
#@file: log-whois-etc.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# WHOIS logging and browser logging
# --------------------------------------------------

class WhoisLogger:
    """Run WHOIS lookups for client IPs in a background thread, one lookup at a time.

    Keeps at most 21 recently-seen IPs and skips any IP among them.
    """
    def __init__(self):
        # Options have NOT been read yet when this is constructed, so don't look at them here
        # (an instance may exist even when options.whois is off).
        self.recent_whois = []
        self.thread_running = False
    def __call__(self,ip):
        "Look up ip unless it was seen recently."
        recent = self.recent_whois
        if ip not in recent:
            if len(recent) > 20: # TODO: configure?
                del recent[0] # forget the oldest
            recent.append(ip)
            self.reCheck(ip)
    def reCheck(self,ip):
        "Start the lookup thread for ip, or retry in 1 second if a lookup is already running."
        if not self.thread_running:
            self.thread_running = True
            threading.Thread(target=whois_thread,args=(ip,self)).start()
        else:
            IOLoopInstance().add_timeout(time.time()+1,lambda *args:self.reCheck(ip))
def getWhois(ip):
    """Summarise the WHOIS record for ip as one comma-separated string.

    Runs the external whois command.  If any line's first word is "descr:",
    the values of descr: lines are collected, otherwise those of OrgName: lines.
    country: values are also collected once at least one value has been found,
    so a country line before the organisation is omitted.  Every collected value
    is de-duplicated.  Returns "" if nothing matched.
    """
    lines = getoutput("whois '"+S(ip).replace("'",'')+"'").split('\n') # quotes stripped so ip can't break out of the shell quoting
    def firstWord(l):
        words = l.split()
        if words: return words[0].lower()
        return "" # blank or whitespace-only line (split()[0] would raise IndexError)
    if any(firstWord(l)=="descr:" for l in lines): checkList = ["descr:"] # ,"netname:","address:"
    else: checkList = ["orgname:"]
    ret = []
    for l in lines:
        if len(l.split())<2: continue
        field,value = l.split(None,1) ; field=field.lower()
        wanted = field in checkList or (field=="country:" and ret) # omit 1st country: from RIPE/APNIC/&c
        if wanted and not value in ret: ret.append(value) # de-dup applies to every collected field
    return ", ".join(ret)
def whois_thread(ip,logger):
    """Thread body: look up ip, free logger for its next lookup, then log the result.

    'whois' is listed in helper_threads while this runs.
    """
    helper_threads.append('whois')
    try:
        address = getWhois(ip)
    except Exception as err:
        address = repr(err) # e.g. UnicodeDecodeError on Python 3 if whois returns non-UTF8
    logger.thread_running = False
    if address:
        logging.info("whois "+ip+": "+address)
    helper_threads.remove('whois')
helper_threads = [] # names of currently-running helper threads
Silas S. Brown's avatar
Silas S. Brown committed

class NullLogger:
    "Access logger that discards every request (same call signature as BrowserLogger)."
    def __call__(self,req):
        return None
class BrowserLogger:
  """Access logger: writes one logging.info line per request.

  With options.squashLogs, a repeat of the previous request's IP, method/version
  or User-Agent is omitted from the line.  May also trigger a WHOIS lookup
  (options.whois).
  """
  def __init__(self):
    # Do NOT read options here - they haven't been read yet
    self.lastBrowser = None # User-Agent of the previously logged request (for squashLogs)
    self.lastIp = self.lastMethodStuff = None # previous remote IP and (method, version), for squashLogs
    self.whoisLogger = WhoisLogger()
  def __call__(self,req):
    """Log one request.  req is a request handler: it must provide .request and .cookie_host()."""
    if req.request.remote_ip in options.ipNoLog: return # skip IPs listed in options.ipNoLog
    try: ch = req.cookie_host()
    except: ch = None # shouldn't happen
    req=req.request # from here on, req is the underlying request object
    if hasattr(req,"suppress_logging"): return
    if S(req.method) not in the_supported_methods and not options.logUnsupported: return
    if S(req.method)=="CONNECT" or B(req.uri).startswith(B("http://")) or B(req.uri).startswith(B("https://")): host="" # URI will have everything
Silas S. Brown's avatar
Silas S. Brown committed
    elif hasattr(req,"suppress_logger_host_convert"): host = req.host
    else: host=B(convert_to_real_host(req.host,ch)) # presumably maps our host name back to the site being proxied
    if host in [-1,B("error")]: host=req.host # -1 for own_server (but this shouldn't happen as it was turned into a CONNECT; we don't mind not logging own_server because it should do so itself)
Silas S. Brown's avatar
Silas S. Brown committed
    elif host: host=protocolWithHost(host)
    # elif host==0: host="http://"+ch # e.g. adjusting one of the ownServer_if_not_root pages (TODO: uncomment this?)
    else: host=""
    browser = req.headers.get("User-Agent",None)
    if browser:
Silas S. Brown's avatar
Silas S. Brown committed
        if options.squashLogs and browser==self.lastBrowser: browser = "" # same User-Agent as last time: omit it
        else:
            self.lastBrowser = browser
Silas S. Brown's avatar
Silas S. Brown committed
    else: self.lastBrowser,browser = None," -" # no User-Agent header: log " -"
    if options.squashLogs:
        # Date (as YYMMDD) and time are already included in Tornado logging format, a format we don't want to override, especially as it has 'start of log string syntax highlighting' on some platforms
        if req.remote_ip == self.lastIp:
            ip=""
        else:
            self.lastIp = req.remote_ip
            # NOTE(review): in this view `ip` is never assigned on this branch, and the `if`
            # below has no body (so `r` is unset on that path); lines appear to be missing
            # from the page this was taken from -- confirm against the real source.
Silas S. Brown's avatar
Silas S. Brown committed
            self.lastMethodStuff = None # always log method/version anew when IP is different
        methodStuff = (req.method, req.version)
        if methodStuff == self.lastMethodStuff:
Silas S. Brown's avatar
Silas S. Brown committed
        else:
            r='"%s %s%s %s"' % (S(req.method), S(host), S(req.uri), S(req.version))
Silas S. Brown's avatar
Silas S. Brown committed
            self.lastMethodStuff = methodStuff
        msg = S(ip)+S(r)+S(browser)
    else: msg = '%s "%s %s%s %s" %s' % (S(req.remote_ip), S(req.method), S(host), S(req.uri), S(req.version), S(browser)) # could add "- - [%s]" with time.strftime("%d/%b/%Y:%X") if don't like Tornado-logs date-time format (and - - - before the browser %s)
Silas S. Brown's avatar
Silas S. Brown committed
    logging.info(msg.replace('\x1b','[ESC]')) # make sure we are terminal safe, in case of malformed URLs
Silas S. Brown's avatar
Silas S. Brown committed
    if options.whois and hasattr(req,"valid_for_whois"): self.whoisLogger(req.remote_ip) # only for requests marked valid_for_whois

def initLogging_preListen():
    "Create the module-wide access loggers: nullLog (discards everything) and accessLog (BrowserLogger)."
    global nullLog, accessLog
    nullLog, accessLog = NullLogger(), BrowserLogger()

Silas S. Brown's avatar
Silas S. Brown committed
#@file: profile.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# Profiling and process naming
# --------------------------------------------------
Silas S. Brown's avatar
Silas S. Brown committed

profile_forks_too = False # TODO: configurable
def open_profile():
    """If options.profile is set, start periodic cProfile reporting.

    Imports cProfile/pstats into module globals, starts the first profiling
    period via setProfile(), marks it as not idle (so pollProfile reports it)
    and resets the in-flight request tracking sets.
    """
    if not options.profile: return
    global cProfile,pstats,profileIdle,reqsInFlight,origReqInFlight
    import cProfile, pstats
    setProfile()
    profileIdle = False
    reqsInFlight, origReqInFlight = set(), set()
def open_profile_pjsOnly(): # TODO: combine with above
    "Like open_profile but without cProfile: start the periodic pjsOnly status reports if options.profile is set."
    if not options.profile: return
    global profileIdle, reqsInFlight, origReqInFlight
    setProfile_pjsOnly()
    profileIdle = False # first period counts as active, so it gets reported
    reqsInFlight = set()
    origReqInFlight = set()
Silas S. Brown's avatar
Silas S. Brown committed
def setProfile():
    """Start a new cProfile period lasting options.profile seconds.

    Creates a fresh profiler and schedules pollProfile() for the end of the
    period.  profileIdle starts True; pollProfile only reports the period if
    something cleared it in the meantime (presumably request handling -- not
    visible here).
    """
    global theProfiler, profileIdle
    theProfiler = cProfile.Profile()
    IOLoopInstance().add_timeout(time.time()+options.profile,lambda *args:pollProfile())
    profileIdle = True ; theProfiler.enable()
def setProfile_pjsOnly():
    "Like setProfile but without cProfile: schedule the next pollProfile_pjsOnly() status report."
    global profileIdle
    IOLoopInstance().add_timeout(time.time()+options.profile,lambda *args:pollProfile_pjsOnly())
    profileIdle = True
Silas S. Brown's avatar
Silas S. Brown committed
def pollProfile():
    "End the current profiling period: report it if it saw activity, then start the next one."
    theProfiler.disable()
    wasBusy = not profileIdle
    if wasBusy:
        showProfile()
    setProfile()
def pollProfile_pjsOnly():
    "pjsOnly status tick (no profiler to stop): report if there was activity, then schedule the next tick."
    wasBusy = not profileIdle
    if wasBusy:
        showProfile(pjsOnly=True)
    setProfile_pjsOnly()
def showProfile(pjsOnly=False):
    """Write a periodic status report: profiler stats, js_interpreter status, in-flight requests.

    The report goes to logging.info when options.background is set, otherwise
    to stderr prefixed with the time (in magenta when can_do_ansi_colour).
    pjsOnly: True when called from pollProfile_pjsOnly; in the code visible here
    it only changes the "stuck" hint shown on the first report.
    """
Silas S. Brown's avatar
Silas S. Brown committed
    global _doneShowProfile # False until the first report has been written
    try: _doneShowProfile
    except: _doneShowProfile = False
    # NOTE(review): lines appear to be missing from this view here: `s` (the stream pstats
    # prints into) is never assigned, and the next two lines are indented deeper than any
    # visible enclosing block.  Presumably the original creates a StringIO and only does
    # this when not pjsOnly -- confirm against the real source.
        pstats.Stats(theProfiler,stream=s).sort_stats('cumulative').print_stats()
        pr = "\n".join([x for x in s.getvalue().split("\n") if x and not "Ordered by" in x][:options.profile_lines]) # first profile_lines non-empty lines, minus the "Ordered by" header
Silas S. Brown's avatar
Silas S. Brown committed
    if options.js_interpreter and len(webdriver_runner):
        # js_interpreter status since the last report: busy runners, possibly-stuck runners, queue statistics
Silas S. Brown's avatar
Silas S. Brown committed
        global webdriver_lambda,webdriver_mu,webdriver_maxBusy # arrivals, served count and peak busy count (reset below)
Silas S. Brown's avatar
Silas S. Brown committed
        stillUsed = sum(1 for i in webdriver_runner if i.wd_threadStart) # runners currently busy (thread start time set)
Silas S. Brown's avatar
Silas S. Brown committed
        maybeStuck = set() # start times of runners busy with the same thread start time as at the last report, and for over 30s
Silas S. Brown's avatar
Silas S. Brown committed
        for i in webdriver_runner:
Silas S. Brown's avatar
Silas S. Brown committed
            ms,tr = i.maybe_stuck,i.wd_threadStart
Silas S. Brown's avatar
Silas S. Brown committed
            if ms and ms == tr and tr+30 < time.time():
                maybeStuck.add(ms)
            i.maybe_stuck = tr # remembered for the next report's comparison
Silas S. Brown's avatar
Silas S. Brown committed
        webdriver_maxBusy = max(webdriver_maxBusy,stillUsed)
Silas S. Brown's avatar
Silas S. Brown committed
        if pr: pr += "\n"
        elif not options.background: pr += ": "
Silas S. Brown's avatar
Silas S. Brown committed
        pr += "js_interpreter"
        if options.multicore: pr += "%d" % (int(webdriver_runner[0].start/js_per_core),) # core number (assumes .start is this core's first instance index -- TODO confirm)
Silas S. Brown's avatar
Silas S. Brown committed
        pr += " "
        if not webdriver_maxBusy: pr += "idle"
Silas S. Brown's avatar
Silas S. Brown committed
        else:
Silas S. Brown's avatar
Silas S. Brown committed
            try: # NameError unless js_429 and multicore
                if mainServerPaused: pr += "closed, "
                else: pr += "open, "
            except NameError: pass
Silas S. Brown's avatar
Silas S. Brown committed
            served = "%d served" % webdriver_mu
Silas S. Brown's avatar
Silas S. Brown committed
            if webdriver_lambda==webdriver_mu==len(webdriver_queue)==0: queue = "" # "; queue unused"
            elif not webdriver_queue: queue="; queue empty: "+served
            else: queue = "; queue %d: %d arrived, %s" % (len(webdriver_queue),webdriver_lambda,served)
            if not _doneShowProfile:
                if pjsOnly: stuck = ", next SIGUSR2 checks stuck;"
                else: stuck = ";"
            elif maybeStuck:
                stuck = ", %d stuck for " % len(maybeStuck)
                t = time.time()
                s1=int(t-max(maybeStuck)); s2=int(t-min(maybeStuck)) # shortest and longest time stuck, in seconds
                if s1==s2: stuck += str(s1)
                else: stuck += "%d-%d" % (s1,s2)
                stuck += "s?"
            else: stuck = ";" # or ", none stuck"
            pr += "%d/%d busy%s " % (stillUsed,len(webdriver_runner),stuck)
            if not webdriver_maxBusy == stillUsed:
                pr += "maxUse=%d" % (webdriver_maxBusy,)
            pr += queue
Silas S. Brown's avatar
Silas S. Brown committed
            pr = pr.rstrip().replace("; ;",";")
Silas S. Brown's avatar
Silas S. Brown committed
            if pr.endswith(";"): pr = pr[:-1]
Silas S. Brown's avatar
Silas S. Brown committed
        webdriver_lambda = webdriver_mu = 0 # start counting afresh for the next report
        webdriver_maxBusy = stillUsed
Silas S. Brown's avatar
Silas S. Brown committed
        # TODO: also measure lambda/mu of other threads e.g. htmlFilter ?
Silas S. Brown's avatar
Silas S. Brown committed
        if psutil and not webdriver_runner[0].start: pr += "; system RAM %.1f%% used" % (psutil.virtual_memory().percent) # only where runner 0 starts at 0 (presumably the first core), so it isn't repeated per core
Silas S. Brown's avatar
Silas S. Brown committed
    # NOTE(review): pr2 is not assigned before this "+=" in the code visible here, so it raises
    # UnboundLocalError (a NameError subclass) and pr2 always becomes "".  A line such as
    # pr2 = "" may be missing above -- confirm against the real source.
    try: pr2 += "%d requests in flight (%d from clients)" % (len(reqsInFlight),len(origReqInFlight))
    except NameError: pr2="" # no reqsInFlight
Silas S. Brown's avatar
Silas S. Brown committed
    _doneShowProfile = True
Silas S. Brown's avatar
Silas S. Brown committed
    if not pr and not pr2: return # nothing to report
Silas S. Brown's avatar
Silas S. Brown committed
    if pr: pr += "\n"
    elif not options.background: pr += ": " # separates the time prefix from the message on stderr
Silas S. Brown's avatar
Silas S. Brown committed
    pr += pr2
Silas S. Brown's avatar
Silas S. Brown committed
    if options.background: logging.info(pr)
    elif can_do_ansi_colour: sys.stderr.write("\033[35m"+(time.strftime("%X")+pr).replace("\n","\n\033[35m")+"\033[0m\n")
Silas S. Brown's avatar
Silas S. Brown committed
    else: sys.stderr.write(time.strftime("%X")+pr+"\n")
Silas S. Brown's avatar
Silas S. Brown committed
def setProcName(name="adjuster"):
    """Try to set the process name shown by top/ps (best effort).

    Tries the optional setproctitle module, then the optional procname
    module, then Linux prctl(PR_SET_NAME) through ctypes.  Any failure is
    ignored: keeping the default process name is harmless."""
    try: # setproctitle works on both Linux and BSD/Mac if installed (but doesn't affect Mac OS 10.7 "Activity Monitor")
        import setproctitle # sudo pip install setproctitle or apt-get install python-setproctitle (requires gcc)
        return setproctitle.setproctitle(name) # TODO: this also stops 'ps axwww' from displaying command-line arguments; make it optional?
    except Exception: pass
    try: # ditto but non-Mac BSD not checked (and doesn't always work on Python 3) :
        import procname # sudo pip install procname (requires gcc)
        return procname.setprocname(name)
    except Exception: pass
    try: # this works in GNU/Linux for 'top', 'pstree -p' and 'killall', but not 'ps' or 'pidof' (which need argv[0] to be changed in C) :
        import ctypes
        # ctypes string buffers hold bytes.  On Python 3 a str must be encoded
        # first, otherwise assigning .value raises TypeError and this fallback
        # silently never works.
        if not isinstance(name, bytes): name = name.encode("utf-8")
        b = ctypes.create_string_buffer(len(name)+1) # +1 for the NUL terminator
        b.value = name
        ctypes.cdll.LoadLibrary('libc.so.6').prctl(15,ctypes.byref(b),0,0,0) # 15 = PR_SET_NAME
    except Exception: pass # oh well

Silas S. Brown's avatar
Silas S. Brown committed
#@file: server-control.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# Start / stop / install
# --------------------------------------------------

        current_crontab = getoutput("crontab -l 2>/dev/null")
Silas S. Brown's avatar
Silas S. Brown committed
        def shell_escape(arg):
            if re.match("^[A-Za-z0-9_=/.%+,:@-]*$",arg): return arg # no need to quote if it's entirely safe-characters (including colon: auto-complete escapes : in pathnames but that's probably in case it's used at the START of a command, where it's a built-in alias for 'true')
            return "'"+arg.replace("'",r"'\''")+"'"
        def cron_escape(arg): return shell_escape(arg).replace('%',r'\%')
        new_cmd = "@reboot python "+" ".join(cron_escape(a) for a in sys.argv)
Silas S. Brown's avatar
Silas S. Brown committed
        if not new_cmd in current_crontab.replace("\r","\n").split("\n") and not new_cmd in current_crontab.replace("$HOME",os.environ.get("HOME")).replace("\r","\n").split("\n"):
            sys.stderr.write("Adding to crontab: "+new_cmd+"\n")
            if not current_crontab.endswith("\n"): current_crontab += "\n"
            os.popen("crontab -","w").write(current_crontab+new_cmd+"\n")
    if options.restart or options.stop:
        pidFound = stopOther()
        if options.stop:
            if not pidFound: sys.stderr.write("Could not find which PID to stop (maybe nothing was running?)\n")
            try: CrossProcessLogging.shutdown()
            except: pass
Silas S. Brown's avatar
Silas S. Brown committed
def stopOther():
    """Send SIGTERM to another running instance (for --restart / --stop).

    The other instance is identified by options.pidfile if set, and/or by
    whatever is listening on options.port (found with lsof, or netstat on
    Linux).  Returns a truthy value (the pidfile PID, or the set of
    port-listener PIDs) if something was found to stop, else a falsy value."""
    pid = triedStop = None
    if options.pidfile:
        try:
            with open(options.pidfile) as f:
                pid = int(f.read().strip())
        except (IOError, OSError, ValueError): pass # no pidfile, or nothing usable in it
        if pid is not None and pid <= 0:
            pid = None # garbage: os.kill() with 0 or a negative PID would signal a whole process group
        if pid is not None:
            if not psutil or psutil.pid_exists(pid):
                tryStop(pid,True) # tryStop will rm pidfile if had permission to send the stop signal
                triedStop = pid
            else: unlink(options.pidfile) # stale
        if not options.port:
            # Nothing else to look for.  Report the PID we signalled (if any):
            # a bare 'return' here made --stop claim no PID had been found.
            return triedStop
    elif not options.port:
        # Oops: the listening port is used to identify the other process; without it, we don't know which process to stop
        errExit("Cannot use --restart or --stop with --port=0 and no --pidfile")
    pids = run_lsof()
    if pids is False: # no lsof, or couldn't make sense of it
        # Could try "fuser -n tcp "+str(options.port), but it can be slow on a busy system.  Try netstat instead.
        pids = run_netstat()
        if pids is False:
            if not options.pidfile: sys.stderr.write("stopOther: can't find understandable 'lsof' or 'netstat' commands on this system\n")
            return triedStop or False # still report a pidfile PID we did stop
    pids.discard(os.getpid()) # never stop ourselves
    for p in pids:
        if p != triedStop:
            tryStop(p)
    return triedStop or pids
def tryStop(pid,alsoRemovePidfile=False):
    """SIGTERM the given PID and report the outcome on stderr.

    If alsoRemovePidfile is set, options.pidfile is removed once the signal
    has been sent.  Any failure is reported rather than raised."""
    who = "the" if options.stop else "other"
    try:
        os.kill(pid,signal.SIGTERM)
        if alsoRemovePidfile:
            unlink(options.pidfile)
        sys.stderr.write("Stopped %s process at PID %d\n" % (who,pid))
    except: # deliberately catch everything: stopping is best-effort
        sys.stderr.write("Failed to stop %s process at PID %d\n" % (who,pid))
def run_lsof():
    """Find the PIDs of processes listening on TCP options.port, using lsof.

    Returns a set of PIDs (empty if nothing is listening), or False if lsof
    is missing or its output can't be parsed (the caller then tries netstat)."""
    # TODO: check ssl-fork ports as well as main port ? (also in run_netstat)
    out = getoutput("lsof -iTCP:"+str(options.port)+" -sTCP:LISTEN 2>/dev/null") # Redirect lsof's stderr to /dev/null because it sometimes prints warnings, e.g. if something's wrong with Mac FUSE mounts, that won't affect the output we want. TODO: lsof can hang if ANY programs have files open on stuck remote mounts etc, even if this is nothing to do with TCP connections.  -S 2 might help a BIT but it's not a solution.  Linux's netstat -tlp needs root, and BSD's can't show PIDs.  Might be better to write files or set something in the process name.
    if out.startswith("lsof: unsupported"):
        # lsof 4.81 has -sTCP:LISTEN but lsof 4.78 does not.  However, not including -sTCP:LISTEN can cause lsof to make unnecessary hostname queries for established connections.  So fall back only if have to.
        out = getoutput("lsof -iTCP:"+str(options.port)+" -Ts 2>/dev/null") # lsof -Ts ensures will say LISTEN on the pid that's listening
        lines = [l for l in out.split("\n")[1:] if "LISTEN" in l]
    elif not out.strip() and not getoutput("which lsof 2>/dev/null"): return False
    else: lines = out.split("\n")[1:] # [1:] skips the header line
    pids = set()
    for line in lines:
        # the PID is the second whitespace-separated field
        try: pids.add(int(line.split()[1]))
        except (IndexError, ValueError): # line isn't in the expected format
            if not pids:
                # sys.stderr.write("stopOther: Can't make sense of lsof output %s\n" % repr(line))
                return False # lsof not working, use something else
            break
    return pids
def run_netstat():
    """Linux-only fallback for run_lsof: PIDs listening on options.port according to 'netstat -tnlp'.

    Returns a set of PIDs, or False if not on Linux or netstat isn't installed.
    Rows whose last column has no "PID/name" (e.g. "-") are skipped."""
    if 'linux' not in sys.platform:
        return False
    if not getoutput("which netstat 2>/dev/null"):
        return False
    needle = ':'+str(options.port)+' '
    found = set()
    for row in getoutput("netstat -tnlp").split("\n"):
        if needle not in row:
            continue
        lastCol = row.split()[-1]
        if '/' in lastCol:
            found.add(int(lastCol.split('/',1)[0]))
    return found

Silas S. Brown's avatar
Silas S. Brown committed
#@file: ssl-multiprocess.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# Support for SSL termination in separate processes
# --------------------------------------------------
Silas S. Brown's avatar
Silas S. Brown committed
# Queue of SSL helpers for the SSL monitor process.  Each entry is
# [pid, callback1, callback2, port]: the PID of the forked helper (None until
# started), the main listener starter, the 'still alive' listener starter,
# and the port that listener answers pings on.
sslforks_to_monitor = []

# PID of the forked SSL monitor, recorded by the parent (None if none forked).
sslfork_monitor_pid = None
Silas S. Brown's avatar
Silas S. Brown committed
def sslSetup(HelperStarter, ping_portNo, isFixed=False):
    """Arrange for HelperStarter (a no-argument callable that starts a listener) to run.

    Without --ssl_fork the helper is started immediately in this process.
    With --ssl_fork it is queued in sslforks_to_monitor, to be started by the
    SSL monitor.  The first queued helper is paired with a plain-HTTP
    AliveResponder on 127.0.0.1:ping_portNo.  In multicore mode, later
    helpers are chained onto that first entry.
    Returns the port number the next listener may use."""
    if not options.ssl_fork:
        # just run it on the current process, and we can randomise the internal port and keep track of what it is
        if not isFixed: port_randomise[ping_portNo-1] = True
        HelperStarter()
        return ping_portNo # next listener can use what would have been the ping-responder port as we're not using it
    if options.multicore and sslforks_to_monitor:
        # chain it, as in multicore mode we'll have {N cores} * {single process handling all SSL ports}, rather than cores * processes (TODO: if one gets stuck but others on the port can still handle requests, do we want to somehow detect the individual stuck one and restart it to reduce wasted CPU load?)
        def chained(c1=HelperStarter, c2=sslforks_to_monitor[0][1]):
            return (c1(), c2())
        sslforks_to_monitor[0][1] = chained
        return ping_portNo # the ping-responder port was not used, so it is free for the next listener
    # no multicore, or this is the first SSL helper, so we need to associate it with a (non-SSL) ping responder
    def startPingResponder(*_):
        return listen_on_port(Application([(r"(.*)",AliveResponder,{})],log_function=nullLog),ping_portNo,"127.0.0.1",False)
    sslforks_to_monitor.append([None,HelperStarter,startPingResponder,ping_portNo])
    return ping_portNo + 1 # where to put the next listener
Silas S. Brown's avatar
Silas S. Brown committed
# Seconds between pings of each SSL helper, also used as the ping timeout.
# TODO: configurable?  (if setting this larger, might want to track the helper threads for early termination)
sslFork_pingInterval = 10
Silas S. Brown's avatar
Silas S. Brown committed
def maybe_sslfork_monitor():
    """Fork the SSL monitor process if sslSetup queued any SSL helpers.

    In the calling (parent) process this records the monitor's PID in
    sslfork_monitor_pid and returns None.  The monitor forks one child per
    queued helper except the last, which it watches itself.  Each watcher
    pings its helper's 'still alive' port and calls restart_sslfork when the
    ping fails.
    Returns SIGTERM callback if we're now a child process (a freshly
    (re)started helper)."""
    global sslforks_to_monitor
    if not sslforks_to_monitor: return
    global sslfork_monitor_pid
    pid = os.fork()
    if pid:
        sslfork_monitor_pid = pid ; return
    # If background, can't double-fork (our PID is known)
    # (TODO: if profile_forks_too, there's no profile loop in this monitor (it starts only when we fork a new helper); unlikely to be useful here though)
    try: os.setpgrp() # for stop_threads0 later
    except: pass
    signal.signal(signal.SIGTERM, terminateSslForks)
    signal.signal(signal.SIGINT, terminateSslForks)
    setProcName("adjusterSSLmon")
    # (15 chars is max for some "top" implementations)
    CrossProcessLogging.initChild("SSL")
    # (not SSLmon because helper IDs will be appended to it)
    global is_sslHelp ; is_sslHelp = True
    for i in xrange(len(sslforks_to_monitor)):
        if i==len(sslforks_to_monitor)-1: pid = 0 # don't bother to fork for the last one
        else: pid = os.fork()
        if pid: sslforks_to_monitor[i][0] = pid # for SIGTERM
        else: # child
            oldI = i
            if i < len(sslforks_to_monitor)-1:
                sslforks_to_monitor = [sslforks_to_monitor[i]]
                i = 0 # we'll monitor only one in the child
            # don't use IOLoop for this monitoring: too confusing if we have to restart it on fork
            # Use a NEW local name here.  Assigning to 'urlopen' would make it
            # local to this whole function, so the fallback to the module-level
            # urlopen would raise UnboundLocalError on every ping and restart
            # the helper endlessly.
            try: pinger = build_opener(ProxyHandler({})).open # don't use the system proxy if set
            except Exception: pinger = urlopen # default if above not supported
            while True:
                try: pinger(("http://localhost:%d/" % sslforks_to_monitor[i][3]),timeout=sslFork_pingInterval)
                except Exception: # URLError etc
                    if restart_sslfork(i,oldI): # child
                        return lambda *args:stopServer("SIG*")
                    else: time.sleep(sslFork_pingInterval) # double it after a restart
                time.sleep(sslFork_pingInterval)
Silas S. Brown's avatar
Silas S. Brown committed
def restart_sslfork(n,oldN):
    """(Re)start SSL helper number n of sslforks_to_monitor in a forked child.

    If that entry already has a PID, log the restart and zap the old helper
    (and its children) first.  In the parent, records the new PID and
    returns None.  In the child, starts the main and 'still alive' listeners,
    empties sslforks_to_monitor and returns True.
    oldN is only used to label the log message."""
    global sslforks_to_monitor
    entry = sslforks_to_monitor[n]
    previousPid = entry[0]
    if previousPid is not None: # not first time
        label = "s" if options.multicore else " "+str(oldN)
        logging.error("Restarting SSL helper%s via pid %d as not heard from port %d" % (label,previousPid,entry[3]))
        emergency_zap_pid_and_children(previousPid) # may have children if multicore
    # TODO: if profile_forks_too, do things with profile?
    childPid = os.fork()
    if childPid:
        entry[0] = childPid
        return
    # --- from here on we are the new helper process ---
    setProcName("adjusterSSLhelp")
    CrossProcessLogging.initChild(str(n)) # TODO: or port number?
    entry[1]() # main listener
    entry[2]() # 'still alive' listener
    sslforks_to_monitor = [] # nothing for us to check
    return True
Silas S. Brown's avatar
Silas S. Brown committed
def terminateSslForks(*args):
    """sslfork_monitor's SIGTERM (and SIGINT) handler.

    Passes SIGTERM on to every helper PID recorded in sslforks_to_monitor,
    then calls stop_threads0()."""
    global sslforks_to_monitor
    for p,_,_,_ in sslforks_to_monitor:
        if p is None:
            # helper not started yet (the first ping hasn't failed): nothing to
            # signal, and os.kill(None,...) would raise TypeError and abort this handler
            continue
        try: os.kill(p,signal.SIGTERM)
        except OSError: pass # somebody might have 'killall'd them
        try: os.waitpid(p, os.WNOHANG) # reap it if it has already exited (doesn't block)
        except OSError: pass # already reaped, or not our child
    stop_threads0()
Silas S. Brown's avatar
Silas S. Brown committed
class AliveResponder(RequestHandler):
    """Plain-HTTP 'still alive' endpoint for SSL helpers.

    Answers every GET with the body "1".  The SSL monitor only checks that
    the request succeeds."""
    SUPPORTED_METHODS = ("GET",)

    def get(self, *args, **kwargs):
        self.write("1")

Silas S. Brown's avatar
Silas S. Brown committed
#@file: port-listen.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# Port listening - main, SSL-termination and JS-upstream
# --------------------------------------------------

Silas S. Brown's avatar
Silas S. Brown committed
def open_extra_ports():
    """Set up the internal helper listeners and start the SSL monitor if needed.

    Covers the real_proxy SSL helper, one js_reproxy listener pair per
    (core, js instance), and the upstream SSL rewriter.
    Returns the stop function if we're now a child process that shouldn't run anything else"""
    nextPort = options.internalPort
    # don't add any other ports here: NormalRequestForwarder assumes the real_proxy SSL helper will be at internalPort
    # All calls to sslSetup and maybe_sslfork_monitor must be made before ANY other calls to listen_on_port (as we don't yet want there to be an IOLoop instance when maybe_sslfork_monitor is called)
    if options.real_proxy: nextPort = sslSetup(lambda port=nextPort:listen_on_port(Application([(r"(.*)",SSLRequestForwarder(),{})],log_function=accessLog,gzip=False),port,"127.0.0.1",False,ssl_options={"certfile":duff_certfile()}),nextPort+1) # A modified Application that's 'aware' it's the SSL-helper version (use SSLRequestForwarder & no need for staticDocs listener) - this will respond to SSL requests that have been CONNECT'd via the first port.  We set gzip=False because little point if we know the final client is on localhost.
    if options.js_reproxy:
        # ditto for js_interpreter (saves having to override its user-agent, or add custom headers requiring PhantomJS 1.5+, for us to detect its connections back to us)
        global js_proxy_port
        js_proxy_port = []
        for c in xrange(cores):
            for i in xrange(js_per_core):
                # PjsRequestForwarder to be done later
                js_proxy_port.append(nextPort)
                nextPort = sslSetup(lambda port=nextPort,cc=c,ii=i : listen_on_port(Application([(r"(.*)",PjsSslRequestForwarder(cc*js_per_core,ii),{})],log_function=nullLog,gzip=False),port+1,"127.0.0.1",False,ssl_options={"certfile":duff_certfile()}),nextPort+2)
    if upstream_rewrite_ssl:
        # This one does NOT listen on SSL: it listens on unencrypted HTTP and rewrites .0 into outgoing SSL.  But we can still run it in a different process if ssl_fork is enabled, and this will save encountering the curl_max_clients issue as well as possibly offloading *client*-side SSL to a different CPU core (TODO: could also use Tornado's multiprocessing to multi-core the client-side SSL)
        sslSetup(lambda port=upstream_proxy_port+1:listen_on_port(Application([(r"(.*)",UpSslRequestForwarder,{})],log_function=nullLog,gzip=False),port,"127.0.0.1",False),upstream_proxy_port+2,True) # TODO: document upstream_proxy_port+2 needs to be reserved if options.ssl_fork and not options.upstream_proxy_host
    r = maybe_sslfork_monitor()
    if r: return r
    # NOW we can start non-sslSetup listen_on_port:
    if options.js_reproxy:
        # One plain-HTTP PjsRequestForwarder listener per js_proxy_port entry,
        # i.e. per (core, instance) pair, mirroring the allocation loop above.
        # (Without this outer loop, 'c' is just left over from that loop, so
        # only the last core's instances got listeners.)
        for c in xrange(cores):
            for i in xrange(js_per_core):
                slot = c*js_per_core+i
                if not options.ssl_fork:
                    port_randomise[js_proxy_port[slot]]=True
                # (with ssl_fork, do NOT port_randomise, because maybe_sslfork_monitor is called ABOVE and the fork will NOT have a copy of our updated port_randomise map for its forwardToOtherPid call)
                listen_on_port(makePjsApplication(c*js_per_core,i),js_proxy_port[slot],"127.0.0.1",False,core=c)
Silas S. Brown's avatar
Silas S. Brown committed

def makeMainApplication():
    """Build the Tornado Application for the main port.

    Uses NormalRequestForwarder for everything, preceded by the static-docs
    handler when options.staticDocs is set."""
    forwarder = (r"(.*)",NormalRequestForwarder(),{})
    if options.staticDocs:
        handlers = [static_handler(), forwarder]
    else:
        handlers = [forwarder]
    # TODO: gzip= deprecated in Tornado 4.x (if they remove it, we may have to check Tornado version and send either gzip= or compress_response= as appropriate, in all calls to Application)
    return Application(handlers,log_function=accessLog,gzip=options.compress_responses)
Silas S. Brown's avatar
Silas S. Brown committed
def makePjsApplication(x,y):
    """Build the Application for a js_reproxy listener: PjsRequestForwarder(x,y),
    preceded by the static-docs handler if both js_upstream and staticDocs are set."""
    forwarder = (r"(.*)",PjsRequestForwarder(x,y),{})
    wantStatic = options.js_upstream and options.staticDocs
    handlers = [static_handler(), forwarder] if wantStatic else [forwarder]
    return Application(handlers,log_function=nullLog,gzip=False)

Silas S. Brown's avatar
Silas S. Brown committed
def start_multicore(isSSLEtcChild=False):
    """Fork child processes, set coreNo unless isSSLEtcChild; parent waits and exits.

    Call to this must come after unixfork if want to run in the background.
    Without --multicore this only sets coreNo (unless isSSLEtcChild) and
    returns.  With it, each child returns (after CrossProcessLogging.initChild)
    while the parent waits for them, then shuts down and raises SystemExit."""
    global coreNo
    if not options.multicore:
        if not isSSLEtcChild: coreNo = 0
        return
    # Simplified version of Tornado fork_processes with
    # added setupRunAndBrowser (must have the terminal)
    children = set()
    for i in range(cores):
        pid = os.fork()
        if not pid: # child
            if not isSSLEtcChild: coreNo = i
            return CrossProcessLogging.initChild()
        children.add(pid)
    if not isSSLEtcChild:
        # Do the equivalent of setupRunAndBrowser() but without the IOLoop.  This can start threads, so must be after the above fork() calls.
        if options.browser: runBrowser()
        if options.run: runRun()
    # Now wait for the browser or the children to exit
    # (and monitor for SIGTERM: we might be an SSLhelp)
    termReason = [] # filled in by handleTerm (a list, as Python 2 has no 'nonlocal'; a plain assignment there would be local to handleTerm)
    def handleTerm(*_):
        termReason.append("SIGTERM received by multicore helper")
        for pid in list(children):
            try: os.kill(pid,signal.SIGTERM)
            except OSError: pass # reaped between os.wait() and children.remove()
    signal.signal(signal.SIGTERM,handleTerm)
    try:
        while children:
            try: pid, status = os.wait()
            except KeyboardInterrupt: raise # see below
            except: continue # interrupted system call OK
            if pid in children: children.remove(pid)
    except KeyboardInterrupt: pass
    if children:
        reason = termReason[0] if termReason else "keyboard interrupt"
        reason = "Adjuster multicore handler: "+reason+", stopping "+str(len(children))+" child processes"
        # (announceInterrupt leaves multicore announcements to us)
        if options.background: logging.info(reason)
        else: sys.stderr.write(reason+"\n")
    for pid in children: os.kill(pid,signal.SIGTERM)
    while children:
        try: pid, status = os.wait()
        except KeyboardInterrupt:
            logging.error("KeyboardInterrupt received while waiting for child-processes to terminate: "+" ".join(str(s) for s in children))
            continue # 'pid' is stale here: don't remove a child that hasn't exited
        except: continue
        if pid in children: children.remove(pid)
    if not isSSLEtcChild: announceShutdown0()
    stop_threads() # must be last thing, except
    raise SystemExit # (in case weren't any threads to stop)
Silas S. Brown's avatar
Silas S. Brown committed
def openPortsEtc():
    """Open the listening ports and finish process start-up.

    In an --ssl-fork child this only prepares the helper (dropPrivileges,
    start_multicore(True), schedule_retries).  In the main process it opens
    the main port, forks into the background if requested, starts the
    multicore children, webdrivers and service threads.  If start-up fails
    there, it SIGTERMs the SSL monitor and re-raises.  Finally installs the
    SIGTERM handler and calls os.setpgrp()."""
    workaround_raspbian7_IPv6_bug()
    workaround_timeWait_problem()
    early_fork = (options.ssl_fork and options.background)
    if early_fork:
        banner(True)
        unixfork()
    if options.ssl_fork: initLogging() # even if not early_fork (i.e. not background)
    stopFunc = open_extra_ports()
    if stopFunc: # we're a child process (--ssl-fork)
        assert not options.background or early_fork
        dropPrivileges()
        # can't double-fork (our PID is known), hence early_fork above
        start_multicore(True) ; schedule_retries()
        if profile_forks_too: open_profile()
    else: # we're not a child process of --ssl-fork
        try:
            if options.port: listen_on_port(makeMainApplication(),options.port,options.address,options.browser)
            openWatchdog() ; dropPrivileges()
            open_upnp() # make sure package avail if needed
            if not early_fork: banner()
            if options.background and not early_fork:
                if options.js_interpreter: test_init_webdriver()
                unixfork() # MUST be before init_webdrivers (js_interpreter does NOT work if you start them before forking)
            if not options.ssl_fork: initLogging() # as we hadn't done it before (must be after unixfork)
            init429()
            if not options.background: notifyReady()
            start_multicore() # if multicore, returns iff we're one of the cores
            if not options.multicore or profile_forks_too: open_profile()
            else: open_profile_pjsOnly()
            if options.js_interpreter: init_webdrivers(coreNo*js_per_core,js_per_core)
            if not options.multicore: setupRunAndBrowser() # (equivalent is done by start_multicore if multicore)
            watchdog.start() # ALL cores if multicore (since only one needs to be up for us to be still working) although TODO: do we want this only if not coreNo so as to ensure Dynamic_DNS_updater is still up?
            checkServer.setup() # (TODO: if we're multicore, can we propagate to other processes ourselves instead of having each core check the fasterServer?  Low priority because how often will a multicore box need a fasterServer)
            if not coreNo:
                CrossProcess429.startThread()
                Dynamic_DNS_updater()
                if options.pimote: pimote_thread() # must be on same core as Dynamic_DNS_updater so it can set pimote_may_need_override
            if options.multicore: stopFunc = lambda *_:stopServer("SIG*")
            else: stopFunc = lambda *_:stopServer("SIGTERM received")
            if options.seconds: IOLoopInstance().add_timeout(time.time()+options.seconds,lambda *args:stopServer("Uptime limit reached"))
            if options.stdio and not coreNo: setup_stdio()
        except: # oops, error during startup, stop forks if any
            if sslfork_monitor_pid is not None:
                time.sleep(0.5) # (it may have only just started: give it a chance to install its signal handler)
                try: os.kill(sslfork_monitor_pid,signal.SIGTERM)
                except OSError: pass # it has already gone
            raise
    signal.signal(signal.SIGTERM, stopFunc)
    try: os.setpgrp() # for stop_threads0 later
    except (AttributeError, OSError): pass # not available, or not permitted (e.g. already a session leader)
Silas S. Brown's avatar
Silas S. Brown committed
def setup_stdio():
    """Handle a request on standard input (when used in one-off mode).

    Data read from stdin is forwarded over a TCP connection to our own main
    port.  Chunks that arrive while it is still connecting are queued in
    StdinPending.  Whatever comes back is copied to stdout, and stdout is
    closed when the connection closes."""
    global StdinPass,StdinPending
    StdinPass,StdinPending = None,[]
    def writeOut(data):
        # The relayed data is bytes (from os.read / the IOStream), but on
        # Python 3 sys.stdout.write() only accepts str: send bytes to the
        # underlying binary buffer when there is one.  (Python 2's sys.stdout
        # has no .buffer and takes bytes directly.)
        out = sys.stdout
        if isinstance(data, bytes) and hasattr(out, "buffer"):
            out.flush() # keep ordering with anything already written as text
            out = out.buffer
        out.write(data)
    def doStdin(fd,events):
        l=os.read(fd,1024) # read 1 line or 1024 bytes (TODO: double-check this can never block)
        if not l: # EOF (but don't close stdout yet)
            IOLoopInstance().remove_handler(sys.stdin.fileno())
            return
        global StdinPass
        if StdinPending: StdinPending.append(l) # connection is still being established
        elif StdinPass: StdinPass.write(l) # open
        else: # not yet established
            StdinPending.append(l)
            def ClearPending():
                global StdinPending ; StdinPending = []
            StdinPass = tornado.iostream.IOStream(socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0))
            def onConnect(*args):
                # flush what was queued while connecting, then relay the response to stdout
                StdinPass.write(B('').join(StdinPending))
                ClearPending()
                readUntilClose(StdinPass,lambda last:(writeOut(last),sys.stdout.close()),writeOut)
            doCallback(None,StdinPass.connect,onConnect,(options.address, port_randomise.get(options.port,options.port)))
    IOLoopInstance().add_handler(sys.stdin.fileno(), doStdin, IOLoop.READ)
Silas S. Brown's avatar
Silas S. Brown committed
#@file: up-down.py
Silas S. Brown's avatar
Silas S. Brown committed
# --------------------------------------------------
# General startup and shutdown tasks
# --------------------------------------------------

def banner(delayed=False):
    """Print the start-up banner to stderr.

    The banner lists the program name, what we listen on and the watchdog
    settings.  In the foreground, the terminal title is also set.
    delayed=True rewords it in the future tense ("Will listen" etc.).  That
    is needed with --ssl-fork --background, where the banner is shown before
    the early fork."""
    lines = [twoline_program_name]
    if not options.port:
        lines.append("Not listening (--port=0 set)")
    else:
        if options.port == -1:
            lines.append("Listening on 127.0.0.1:%d" % port_randomise[-1])
            if not istty(sys.stdout) and options.background:
                sys.stdout.write("127.0.0.1:%d" % port_randomise[-1])
                sys.stdout.flush()
        else:
            lines.append("Listening on port %d" % options.port)
        if upstream_rewrite_ssl:
            lines.append("--upstream-proxy back-connection helper is listening on 127.0.0.1:%d" % (upstream_proxy_port+1,))
        if options.stdio:
            lines.append("Listening on standard input")
    if options.watchdog:
        lines.append("Writing "+options.watchdogDevice+" every %d seconds" % options.watchdog)
        if options.watchdogWait:
            lines.append("(abort if unresponsive for %d seconds)" % options.watchdogWait)
    if options.ssl_fork and not options.background:
        lines.append("To inspect processes, use: pstree "+str(os.getpid()))
    text = "\n".join(lines)+"\n"
    if delayed: # for --ssl-fork --background, need early fork (TODO: unless write a PID somewhere)
        text = text.replace("Listening","Will listen").replace("Writing","Will write")
    sys.stderr.write(text)
    sys.stderr.flush()
    if options.background:
        return
    # set window title for foreground running
    title = "adjuster"
    if "SSH_CONNECTION" in os.environ:
        title += "@"+hostSuffix() # TODO: might want to use socket.getfqdn() to save confusion if several servers are configured with the same host_suffix and/or host_suffix specifies multiple hosts?
    set_title(title)
Silas S. Brown's avatar
Silas S. Brown committed

def istty(f=sys.stderr):
    "True if f (default: the stderr at import time) has an isatty method that reports a terminal"
    if not hasattr(f,"isatty"):
        return False
    return f.isatty()
def set_title(t):
    """Set the xterm/tmux or GNU screen window title to t, if stderr is a terminal.

    For a non-empty t, this also registers an atexit hook that clears the
    title again, and it sets the global can_do_ansi_colour for this terminal."""
    if not istty(): return
    term = os.environ.get("TERM","")
    is_xterm = "xterm" in term
    is_screen = (term=="screen" and os.environ.get("STY",""))
    is_tmux = (term=="screen" and os.environ.get("TMUX",""))
    if is_xterm or is_tmux:
        sys.stderr.write("\033]0;%s\007" % (t,)),sys.stderr.flush()
        # ("0;" sets both title and minimised title, "1;" sets minimised title, "2;" sets title.  Tmux takes its pane title from title (but doesn't display it in the titlebar))
    elif is_screen:
        # Pass the title as its own argv entry instead of through a shell
        # command line, so quotes, $ or backticks in t can't break the command.
        import subprocess
        try: subprocess.call(["screen","-X","title",t])
        except OSError: pass # screen not runnable (os.system would only have printed a shell error)
    else: return
    if not t: return
    import atexit
    atexit.register(set_title,"")
    global can_do_ansi_colour
    can_do_ansi_colour = is_xterm or (is_screen and "VT 100/ANSI" in os.environ.get("TERMCAP",""))
    # can_do_ansi_colour is used by showProfile (TODO: if profile_forks_too, we'd need to set this earlier than the call to banner / set_title in order to make it available to SSL forks etc, otherwise only the main one has purple profile output. Multicore is already OK (but does only counts per core).)
Silas S. Brown's avatar
Silas S. Brown committed
# Whether stderr understands ANSI colour codes.  set_title may switch this on
# once the terminal type is known; showProfile reads it.
can_do_ansi_colour = False

# Core number once start_multicore has run.  The initial "unknown" is
# deliberately non-False, which marks a silent helper process.
coreNo = "unknown"
Silas S. Brown's avatar
Silas S. Brown committed
def announceInterrupt():
    """Report SIGINT for a single-process server: to the log in the background, else to stderr.

    Silent helper processes (coreNo=="unknown") say nothing, and multicore
    mode announces interrupts in start_multicore instead."""
    silent = coreNo or options.multicore
    if silent:
        return
    if not options.background:
        sys.stderr.write("\nKeyboard interrupt"+find_adjuster_in_traceback()+"\n")
        return
    logging.info("SIGINT received"+find_adjuster_in_traceback())
Silas S. Brown's avatar
Silas S. Brown committed
def announceShutdown():
    """Announce shutdown via announceShutdown0, unless this is a silent helper
    process (coreNo=="unknown") or multicore mode (start_multicore announces it there)."""
    if not (coreNo or options.multicore):
        announceShutdown0()
def announceShutdown0():
    """Mark the process as exiting and report the shutdown.

    The global ``exitting`` flag is set first, so that the --run process is
    not restarted if options.runWait == 0 and it received the same signal.
    A restart message appearing AFTER the shutdown announcement is confusing.
    In background mode the event is logged and the pidfile (if any) is
    unlinked; otherwise a message is written to stderr.
    """
    global exitting
    exitting = True
    if not options.background:
        sys.stderr.write("Adjuster shutdown\n")
        return
    logging.info("Server shutdown")
    if options.pidfile:
        unlink(options.pidfile)

def main():
    """Program entry point: set everything up, run the IOLoop, then tear down.

    Start-up runs check_injected_globals, setProcName, readOptions,
    preprocessOptions, serverControl, openPortsEtc and startServers, in
    that order, then blocks in the IOLoop.  Once the loop returns (or a
    KeyboardInterrupt is caught and announced), it:
      - announces shutdown;
      - clears options.pimote so pimote_thread stops;
      - unlinks every kept_tempfiles entry;
      - stops the watchdog, if there is one;
      - finally calls stop_threads.
    """
    check_injected_globals()
    setProcName()
    readOptions()
    preprocessOptions()
    serverControl()
    openPortsEtc()
    startServers()
    try:
        # "There seemed a strangeness in the air,
        #  Vermilion light on the land's lean face;
        #  I heard a Voice from I knew not where:
        #   'The Great Adjustment is taking place!'" - Thomas Hardy
        IOLoopInstance().start()
    except KeyboardInterrupt:
        announceInterrupt()
    announceShutdown()
    options.pimote = "" # so pimote_thread stops
    for kept_name in kept_tempfiles.values():
        unlink(kept_name)
    if watchdog:
        watchdog.stop()
    stop_threads() # must be last thing

def plural(number):
    """Return the English plural suffix for *number*: "" when it equals 1, else "s"."""
    return "" if number == 1 else "s"
def _summarise_thread_names(names):
    """Return *names* sorted, with each run of duplicates collapsed to "name*count".

    e.g. ["b", "a", "b"] -> ["a", "b*2"]
    """
    groups = [] # [name, count] pairs, in sorted order
    for name in sorted(names):
        if groups and groups[-1][0] == name:
            groups[-1][1] += 1
        else:
            groups.append([name, 1])
    return [name if count == 1 else name + "*" + str(count) for name, count in groups]

def stop_threads():
    """Stop helper processes/threads; if helper threads linger, kill the process group.

    Calls quitFuncToCall (if set), sends SIGTERM to the SSL-fork monitor (if
    any), and shuts down CrossProcessLogging.  It then waits up to about 2
    seconds for helper_threads to become empty.  If they do, this returns
    normally.  Otherwise it reports which helper threads are still running
    and calls stop_threads0, which ends with os.abort() and so never returns.

    Progress messages go to stderr only when not in background mode and
    not a silent helper process (coreNo falsy).
    """
    if quitFuncToCall: quitFuncToCall()
    if sslfork_monitor_pid is not None:
        try: os.kill(sslfork_monitor_pid,signal.SIGTERM) # this should cause it to propagate that signal to the monitored PIDs
        except OSError: pass # somebody might have killall'd it
    CrossProcessLogging.shutdown()
    writeMsg = not options.background and not coreNo
    # Wait for helper_threads first.  This matters especially if
    # quitFuncToCall ran above: if the terminate routine is too forceful, it
    # might prevent the EOF from being sent over the pipe, and we could get
    # a stuck adjusterWDhelp process.  multiprocessing.Pipe has no flush
    # method after we send the EOF, so quitFuncToCall's returning does NOT
    # mean the EOF has actually been sent.
    for t in range(10):
        if t: time.sleep(0.2)
        if not helper_threads:
            # Only confirm if we announced the wait (same writeMsg condition),
            # so background/helper processes stay quiet.
            if t and writeMsg: sys.stderr.write("Helper threads have stopped\n")
            return
        if not t and writeMsg: sys.stderr.write("Waiting 2secs for helper threads to stop...\n")
    ht = _summarise_thread_names(helper_threads)
    msg = "Terminating %d helper thread%s (%s)" % (len(ht),plural(len(ht)),", ".join(ht))
    # Remaining threads are killed in case someone needs our port quickly.
    # Most likely "runaway" thread is ip_change_command if you did a --restart shortly after the server started.
    # TODO it would be nice if the port can be released at the IOLoop.instance.stop, and make sure os.system doesn't dup any /dev/watchdog handle we might need to release, so that it's not necessary to stop the threads
    if writeMsg: sys.stderr.write(msg+"\n")
    stop_threads0()
def stop_threads0():
    """Forcefully terminate: the --run child (if any), then our whole process group.

    SIGTERM's handler is reset to the default first, so that the SIGTERM
    sent to our own process group should terminate this process as well.
    os.abort() is the fallback if it does not; this function never returns.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    if options.run:
        try:
            os.kill(runningPid, signal.SIGTERM)
        except (NameError, OSError):
            pass # runningPid not set, or that process already exited
    os.killpg(os.getpgrp(), signal.SIGTERM)
    os.abort() # if the above didn't work, this should
#@file: tornado-setup.py
# --------------------------------------------------
# Basic Tornado-server setup
# --------------------------------------------------

def static_handler():
    """Build the Tornado handler entry that serves files for options.staticDocs.

    options.staticDocs has the form "URL#PATH".  PATH is passed as the
    StaticFileHandler "path" option, and requests under the URL prefix are
    routed to it, with index.html as the default filename.  URL gets a
    leading and trailing "/" if missing.  Only the FIRST "#" separates the
    two parts, so PATH may itself contain "#".  Previously that raised
    ValueError.  A value with no "#" at all still raises ValueError.

    Returns a (url_pattern, handler_class, handler_kwargs) tuple.
    """
    url,path = options.staticDocs.split('#',1)
    if not url.startswith("/"): url="/"+url
    if not url.endswith("/"): url += "/"
    class OurStaticFileHandler(StaticFileHandler):
        # fixServerHeader presumably adjusts our response's Server header (defined elsewhere)
        def set_extra_headers(self,path): fixServerHeader(self)
    return (url+"(.*)",OurStaticFileHandler,{"path":path,"default_filename":"index.html"})

# Keys are configured port numbers that are bound specially by listen_on_port.
# The value starts as a placeholder and is overwritten by listen_on_port with
# the port number actually bound (port -> _ or port -> mappedPort).
port_randomise = {}
def listen_on_port(application,port,address,browser,core="all",**kwargs):
    """Bind listening socket(s) for `port` and register a deferred server start.

    The HTTPServer is NOT constructed here (see the ServerStarter comment).
    Instead (port, ServerStarter()) is appended to theServers[core].  Calling
    that object's start() then:
      - builds HTTPServer(application, **kwargs) and adds the bound sockets;
      - records it as the global mainServer when port == options.port;
      - overwrites its own theServers[core] slot with (port, server);
      - starts the server.

    If `port` is a key of port_randomise, the port number actually bound
    (s[0].getsockname()[1]) is stored back into port_randomise[port].

    Otherwise binding is attempted up to 6 times, 0.5s apart, while the
    error's strerror contains "already in use".  In an SSL helper (is_sslHelp
    truthy), a retry one second later is scheduled on the IOLoop instead of
    sleeping, and the function returns without registering anything.  That
    retry is scheduled immediately if is_sslHelp == "started", otherwise it
    is queued in sslRetries.  After the last failed attempt, if `browser` is
    true, privileges are dropped and runBrowser() is called before raising.

    Raises the original socket.error for errors other than "already in
    use", and Exception when all attempts have failed.
    """
Silas S. Brown's avatar
Silas S. Brown committed
    # Don't set backlog=0: it's advisory only and is often rounded up to 8; we use CrossProcess429 instead
Silas S. Brown's avatar
Silas S. Brown committed
    if port in port_randomise:
Silas S. Brown's avatar
Silas S. Brown committed
        # should get len(s)==1 if address=="127.0.0.1" (may get more than one socket, with different ports, if address maps to some mixed IPv4/IPv6 configuration)
Silas S. Brown's avatar
Silas S. Brown committed
        # NOTE(review): no assignment to `s` is visible in this branch, so `s`
        # would be unbound here (NameError); a bind line appears to have been
        # lost in this copy -- confirm against upstream.
        port_randomise[port] = s[0].getsockname()[1]
    else:
     # portTry counts down; 0 marks the final attempt (no more sleeping).
     for portTry in [5,4,3,2,1,0]:
      # NOTE(review): there is no `break` after a successful bind in this
      # copy, so the loop would go on to bind the same port again -- confirm
      # against upstream.
      try: s = bind_sockets(port,address)
      except socket.error as e:
Silas S. Brown's avatar
Silas S. Brown committed
        if is_sslHelp:
            # We had better not time.sleep() here trying
            # to open, especially not if multicore: don't
            # want to hold up the OTHER ports being opened
            # and get into an infinite-restart loop when
            # MOST services are already running:
            # NOTE(review): `schedRetry` is not defined in this view, and this
            # call passes 6 positional arguments to a function that takes at
            # most 5 -- confirm against upstream.
            f = lambda *_:IOLoopInstance().add_timeout(time.time()+1,lambda *args:listen_on_port(application,port,address,browser,core,schedRetry,**kwargs))
Silas S. Brown's avatar
Silas S. Brown committed
            # Once schedule_retries has run (is_sslHelp=="started") schedule now; otherwise queue it for schedule_retries.
            if is_sslHelp=="started": f()
            else: sslRetries.append(f)
            logging.info("Can't open port "+repr(port)+", retry scheduled")
            return
Silas S. Brown's avatar
Silas S. Brown committed
        if not "already in use" in e.strerror: raise
        # Maybe the previous server is taking a while to stop
        if portTry:
            time.sleep(0.5) ; continue
        # tried 6 times over 3 seconds, can't open the port
        if browser:
            # there's probably another adjuster instance, in which case we probably want to let the browser open a new window and let our listen() fail
            dropPrivileges()
            runBrowser()
Silas S. Brown's avatar
Silas S. Brown committed
        raise Exception("Can't open port "+repr(port)+" (tried for 3 seconds, "+e.strerror+")")
    # i = index this entry will occupy in theServers[core]; ServerStarter.start() overwrites that slot.
    i = len(theServers.setdefault(core,[])) ; c = core
    class ServerStarter: # don't construct HTTPServer before fork
        def start(self):
            h = HTTPServer(application,**kwargs)
            h.add_sockets(s)
            if port==options.port:
                global mainServer ; mainServer = h
            theServers[c][i]=(port,h) ; h.start()
    theServers[core].append((port,ServerStarter()))
# Truthy in an SSL helper process (presumably set elsewhere -- TODO confirm);
# schedule_retries sets it to "started".
is_sslHelp = False
# Deferred port-open retries queued by listen_on_port, run by schedule_retries.
sslRetries = []
def schedule_retries():
    global is_sslHelp,sslRetries
    is_sslHelp = "started"
    for s in sslRetries: s()