This commit is contained in:
Sven Riwoldt
2024-04-01 20:30:24 +02:00
parent fd333f3514
commit c7bc862c6f
6804 changed files with 1065135 additions and 0 deletions

View File

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
# -----------------------------------------------------------------------------
"""
Site package for the console kernel
NOTE: This package shouldn't be imported at **any** place.
It's only used to set additional functionality for
our consoles.
"""

View File

@@ -0,0 +1,127 @@
#
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
import linecache
import os.path
import types
import sys
from IPython.core.getipython import get_ipython
from spyder_kernels.py3compat import PY2
def new_main_mod(filename, modname):
"""
Reimplemented from IPython/core/interactiveshell.py to avoid caching
and clearing recursive namespace.
"""
filename = os.path.abspath(filename)
main_mod = types.ModuleType(
modname,
doc="Module created for script run in IPython")
main_mod.__file__ = filename
# It seems pydoc (and perhaps others) needs any module instance to
# implement a __nonzero__ method
main_mod.__nonzero__ = lambda : True
return main_mod
class NamespaceManager(object):
"""
Get a namespace and set __file__ to filename for this namespace.
The namespace is either namespace, the current namespace if
current_namespace is True, or a new namespace.
"""
def __init__(self, filename, namespace=None, current_namespace=False,
file_code=None, stack_depth=1):
self.filename = filename
self.ns_globals = namespace
self.ns_locals = None
self.current_namespace = current_namespace
self._previous_filename = None
self._previous_main = None
self._previous_running_namespace = None
self._reset_main = False
self._file_code = file_code
ipython_shell = get_ipython()
self.context_globals = ipython_shell.get_global_scope(stack_depth + 1)
self.context_locals = ipython_shell.get_local_scope(stack_depth + 1)
def __enter__(self):
"""
Prepare the namespace.
"""
# Save previous __file__
ipython_shell = get_ipython()
if self.ns_globals is None:
if self.current_namespace:
self.ns_globals = self.context_globals
self.ns_locals = self.context_locals
if '__file__' in self.ns_globals:
self._previous_filename = self.ns_globals['__file__']
self.ns_globals['__file__'] = self.filename
else:
main_mod = new_main_mod(self.filename, '__main__')
self.ns_globals = main_mod.__dict__
self.ns_locals = None
# Needed to allow pickle to reference main
if '__main__' in sys.modules:
self._previous_main = sys.modules['__main__']
sys.modules['__main__'] = main_mod
self._reset_main = True
# Save current namespace for access by variable explorer
self._previous_running_namespace = (
ipython_shell.kernel._running_namespace)
ipython_shell.kernel._running_namespace = (
self.ns_globals, self.ns_locals)
if (self._file_code is not None
and not PY2
and isinstance(self._file_code, bytes)):
try:
self._file_code = self._file_code.decode()
except UnicodeDecodeError:
# Setting the cache is not supported for non utf-8 files
self._file_code = None
if self._file_code is not None:
# '\n' is used instead of the native line endings. (see linecache)
# mtime is set to None to avoid a cache update.
linecache.cache[self.filename] = (
len(self._file_code), None,
[line + '\n' for line in self._file_code.splitlines()],
self.filename)
return self.ns_globals, self.ns_locals
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Reset the namespace.
"""
ipython_shell = get_ipython()
ipython_shell.kernel._running_namespace = (
self._previous_running_namespace)
if self._previous_filename:
self.ns_globals['__file__'] = self._previous_filename
elif '__file__' in self.ns_globals:
self.ns_globals.pop('__file__')
if not self.current_namespace:
self.context_globals.update(self.ns_globals)
if self.context_locals and self.ns_locals:
self.context_locals.update(self.ns_locals)
if self._previous_main:
sys.modules['__main__'] = self._previous_main
elif '__main__' in sys.modules and self._reset_main:
del sys.modules['__main__']
if self.filename in linecache.cache and os.path.exists(self.filename):
linecache.cache.pop(self.filename)

View File

@@ -0,0 +1,818 @@
#
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
# -----------------------------------------------------------------------------
#
# IMPORTANT NOTE: Don't add a coding line here! It's not necessary for
# site files
#
# Spyder consoles sitecustomize
#
import ast
import bdb
import io
import logging
import os
import pdb
import shlex
import sys
import time
import warnings
from IPython.core.getipython import get_ipython
from spyder_kernels.comms.frontendcomm import frontend_request
from spyder_kernels.customize.namespace_manager import NamespaceManager
from spyder_kernels.customize.spyderpdb import SpyderPdb, get_new_debugger
from spyder_kernels.customize.umr import UserModuleReloader
from spyder_kernels.py3compat import (
PY2, _print, encode, compat_exec, FileNotFoundError)
from spyder_kernels.customize.utils import capture_last_Expr, canonic
if not PY2:
from IPython.core.inputtransformer2 import (
TransformerManager, leading_indent, leading_empty_lines)
else:
from IPython.core.inputsplitter import IPythonInputSplitter
logger = logging.getLogger(__name__)
# =============================================================================
# sys.argv can be missing when Python is embedded, taking care of it.
# Fixes Issue 1473 and other crazy crashes with IPython 0.13 trying to
# access it.
# =============================================================================
if not hasattr(sys, 'argv'):
sys.argv = ['']
# =============================================================================
# Main constants
# =============================================================================
IS_EXT_INTERPRETER = os.environ.get('SPY_EXTERNAL_INTERPRETER') == "True"
HIDE_CMD_WINDOWS = os.environ.get('SPY_HIDE_CMD') == "True"
SHOW_INVALID_SYNTAX_MSG = True
# =============================================================================
# Execfile functions
#
# The definitions for Python 2 on Windows were taken from the IPython project
# Copyright (C) The IPython Development Team
# Distributed under the terms of the modified BSD license
# =============================================================================
try:
# Python 2
import __builtin__ as builtins
except ImportError:
# Python 3
import builtins
basestring = (str,)
# =============================================================================
# Setting console encoding (otherwise Python does not recognize encoding)
# for Windows platforms
# =============================================================================
if os.name == 'nt' and PY2:
try:
import locale, ctypes
_t, _cp = locale.getdefaultlocale('LANG')
try:
_cp = int(_cp[2:])
ctypes.windll.kernel32.SetConsoleCP(_cp)
ctypes.windll.kernel32.SetConsoleOutputCP(_cp)
except (ValueError, TypeError):
# Code page number in locale is not valid
pass
except Exception:
pass
# =============================================================================
# Prevent subprocess.Popen calls to create visible console windows on Windows.
# See issue #4932
# =============================================================================
if os.name == 'nt' and HIDE_CMD_WINDOWS:
import subprocess
creation_flag = 0x08000000 # CREATE_NO_WINDOW
class SubprocessPopen(subprocess.Popen):
def __init__(self, *args, **kwargs):
kwargs['creationflags'] = creation_flag
super(SubprocessPopen, self).__init__(*args, **kwargs)
subprocess.Popen = SubprocessPopen
# =============================================================================
# Importing user's sitecustomize
# =============================================================================
try:
import sitecustomize #analysis:ignore
except Exception:
pass
# =============================================================================
# Add default filesystem encoding on Linux to avoid an error with
# Matplotlib 1.5 in Python 2 (Fixes Issue 2793)
# =============================================================================
if PY2 and sys.platform.startswith('linux'):
def _getfilesystemencoding_wrapper():
return 'utf-8'
sys.getfilesystemencoding = _getfilesystemencoding_wrapper
# =============================================================================
# Set PyQt API to #2
# =============================================================================
if os.environ.get("QT_API") == 'pyqt':
try:
import sip
for qtype in ('QString', 'QVariant', 'QDate', 'QDateTime',
'QTextStream', 'QTime', 'QUrl'):
sip.setapi(qtype, 2)
except Exception:
pass
else:
try:
os.environ.pop('QT_API')
except KeyError:
pass
# =============================================================================
# Patch PyQt4 and PyQt5
# =============================================================================
# This saves the QApplication instances so that Python doesn't destroy them.
# Python sees all the QApplication as differnet Python objects, while
# Qt sees them as a singleton (There is only one Application!). Deleting one
# QApplication causes all the other Python instances to become broken.
# See spyder-ide/spyder/issues/2970
try:
from PyQt5 import QtWidgets
class SpyderQApplication(QtWidgets.QApplication):
def __init__(self, *args, **kwargs):
super(SpyderQApplication, self).__init__(*args, **kwargs)
# Add reference to avoid destruction
# This creates a Memory leak but avoids a Segmentation fault
SpyderQApplication._instance_list.append(self)
SpyderQApplication._instance_list = []
QtWidgets.QApplication = SpyderQApplication
except Exception:
pass
try:
from PyQt4 import QtGui
class SpyderQApplication(QtGui.QApplication):
def __init__(self, *args, **kwargs):
super(SpyderQApplication, self).__init__(*args, **kwargs)
# Add reference to avoid destruction
# This creates a Memory leak but avoids a Segmentation fault
SpyderQApplication._instance_list.append(self)
SpyderQApplication._instance_list = []
QtGui.QApplication = SpyderQApplication
except Exception:
pass
# =============================================================================
# IPython adjustments
# =============================================================================
# Patch unittest.main so that errors are printed directly in the console.
# See http://comments.gmane.org/gmane.comp.python.ipython.devel/10557
# Fixes Issue 1370
import unittest
from unittest import TestProgram
class IPyTesProgram(TestProgram):
def __init__(self, *args, **kwargs):
test_runner = unittest.TextTestRunner(stream=sys.stderr)
kwargs['testRunner'] = kwargs.pop('testRunner', test_runner)
kwargs['exit'] = False
TestProgram.__init__(self, *args, **kwargs)
unittest.main = IPyTesProgram
# Ignore some IPython/ipykernel warnings
try:
warnings.filterwarnings(action='ignore', category=DeprecationWarning,
module='ipykernel.ipkernel')
except Exception:
pass
# =============================================================================
# Turtle adjustments
# =============================================================================
# This is needed to prevent turtle scripts crashes after multiple runs in the
# same IPython Console instance.
# See Spyder issue #6278
try:
import turtle
from turtle import Screen, Terminator
def spyder_bye():
try:
Screen().bye()
turtle.TurtleScreen._RUNNING = True
except Terminator:
pass
turtle.bye = spyder_bye
except Exception:
pass
# =============================================================================
# Pandas adjustments
# =============================================================================
try:
import pandas as pd
# Set Pandas output encoding
pd.options.display.encoding = 'utf-8'
# Filter warning that appears for DataFrames with np.nan values
# Example:
# >>> import pandas as pd, numpy as np
# >>> pd.Series([np.nan,np.nan,np.nan],index=[1,2,3])
# Fixes Issue 2991
# For 0.18-
warnings.filterwarnings(action='ignore', category=RuntimeWarning,
module='pandas.core.format',
message=".*invalid value encountered in.*")
# For 0.18.1+
warnings.filterwarnings(action='ignore', category=RuntimeWarning,
module='pandas.formats.format',
message=".*invalid value encountered in.*")
except Exception:
pass
# =============================================================================
# Numpy adjustments
# =============================================================================
try:
# Filter warning that appears when users have 'Show max/min'
# turned on and Numpy arrays contain a nan value.
# Fixes Issue 7063
# Note: It only happens in Numpy 1.14+
warnings.filterwarnings(action='ignore', category=RuntimeWarning,
module='numpy.core._methods',
message=".*invalid value encountered in.*")
except Exception:
pass
# =============================================================================
# Multiprocessing adjustments
# =============================================================================
# This patch is only needed on Python 3
if not PY2:
# This could fail with changes in Python itself, so we protect it
# with a try/except
try:
import multiprocessing.spawn
_old_preparation_data = multiprocessing.spawn.get_preparation_data
def _patched_preparation_data(name):
"""
Patched get_preparation_data to work when all variables are
removed before execution.
"""
try:
d = _old_preparation_data(name)
except AttributeError:
main_module = sys.modules['__main__']
# Any string for __spec__ does the job
main_module.__spec__ = ''
d = _old_preparation_data(name)
# On windows, there is no fork, so we need to save the main file
# and import it
if (os.name == 'nt' and 'init_main_from_path' in d
and not os.path.exists(d['init_main_from_path'])):
_print(
"Warning: multiprocessing may need the main file to exist. "
"Please save {}".format(d['init_main_from_path']))
# Remove path as the subprocess can't do anything with it
del d['init_main_from_path']
return d
multiprocessing.spawn.get_preparation_data = _patched_preparation_data
except Exception:
pass
# =============================================================================
# os adjustments
# =============================================================================
# This is necessary to have better support for Rich and Colorama.
def _patched_get_terminal_size(fd=None):
return os.terminal_size((80, 30))
os.get_terminal_size = _patched_get_terminal_size
# =============================================================================
# Pdb adjustments
# =============================================================================
pdb.Pdb = SpyderPdb
# =============================================================================
# User module reloader
# =============================================================================
__umr__ = UserModuleReloader(namelist=os.environ.get("SPY_UMR_NAMELIST", None))
# =============================================================================
# Handle Post Mortem Debugging and Traceback Linkage to Spyder
# =============================================================================
def post_mortem_excepthook(type, value, tb):
"""
For post mortem exception handling, print a banner and enable post
mortem debugging.
"""
ipython_shell = get_ipython()
ipython_shell.showtraceback((type, value, tb))
p = pdb.Pdb(ipython_shell.colors)
if not type == SyntaxError:
# wait for stderr to print (stderr.flush does not work in this case)
time.sleep(0.1)
_print('*' * 40)
_print('Entering post mortem debugging...')
_print('*' * 40)
# Inform Spyder about position of exception: pdb.Pdb.interaction() calls
# cmd.Cmd.cmdloop(), which calls SpyderPdb.preloop() where
# send_initial_notification is handled.
p.send_initial_notification = True
p.reset()
frame = tb.tb_next.tb_frame
# wait for stdout to print
time.sleep(0.1)
p.interaction(frame, tb)
# ==============================================================================
# runfile and debugfile commands
# ==============================================================================
def get_current_file_name():
"""Get the current file name."""
try:
return frontend_request(blocking=True).current_filename()
except Exception:
_print("This command failed to be executed because an error occurred"
" while trying to get the current file name from Spyder's"
" editor. The error was:\n\n")
get_ipython().showtraceback(exception_only=True)
return None
def count_leading_empty_lines(cell):
"""Count the number of leading empty cells."""
if PY2:
lines = cell.splitlines(True)
else:
lines = cell.splitlines(keepends=True)
if not lines:
return 0
for i, line in enumerate(lines):
if line and not line.isspace():
return i
return len(lines)
def transform_cell(code, indent_only=False):
"""Transform IPython code to Python code."""
number_empty_lines = count_leading_empty_lines(code)
if indent_only:
# Not implemented for PY2
if PY2:
return code
if not code.endswith('\n'):
code += '\n' # Ensure the cell has a trailing newline
lines = code.splitlines(keepends=True)
lines = leading_indent(leading_empty_lines(lines))
code = ''.join(lines)
else:
if PY2:
tm = IPythonInputSplitter()
return tm.transform_cell(code)
else:
tm = TransformerManager()
code = tm.transform_cell(code)
return '\n' * number_empty_lines + code
def exec_code(code, filename, ns_globals, ns_locals=None, post_mortem=False,
exec_fun=None, capture_last_expression=False):
"""Execute code and display any exception."""
# Tell IPython to hide this frame (>7.16)
__tracebackhide__ = True
global SHOW_INVALID_SYNTAX_MSG
if PY2:
filename = encode(filename)
code = encode(code)
if exec_fun is None:
# Replace by exec when dropping Python 2
exec_fun = compat_exec
ipython_shell = get_ipython()
is_ipython = os.path.splitext(filename)[1] == '.ipy'
try:
if not is_ipython:
# TODO: remove the try-except and let the SyntaxError raise
# Because there should not be ipython code in a python file
try:
ast_code = ast.parse(transform_cell(code, indent_only=True))
except SyntaxError as e:
try:
ast_code = ast.parse(transform_cell(code))
except SyntaxError:
if PY2:
raise e
else:
# Need to call exec to avoid Syntax Error in Python 2.
# TODO: remove exec when dropping Python 2 support.
exec("raise e from None")
else:
if SHOW_INVALID_SYNTAX_MSG:
_print(
"\nWARNING: This is not valid Python code. "
"If you want to use IPython magics, "
"flexible indentation, and prompt removal, "
"we recommend that you save this file with the "
".ipy extension.\n")
SHOW_INVALID_SYNTAX_MSG = False
else:
ast_code = ast.parse(transform_cell(code))
if code.rstrip()[-1:] == ";":
# Supress output with ;
capture_last_expression = False
if capture_last_expression:
ast_code, capture_last_expression = capture_last_Expr(
ast_code, "_spyder_out")
exec_fun(compile(ast_code, filename, 'exec'), ns_globals, ns_locals)
if capture_last_expression:
out = ns_globals.pop("_spyder_out", None)
if out is not None:
return out
except SystemExit as status:
# ignore exit(0)
if status.code:
ipython_shell.showtraceback(exception_only=True)
except BaseException as error:
if (isinstance(error, bdb.BdbQuit)
and ipython_shell.pdb_session):
# Ignore BdbQuit if we are debugging, as it is expected.
ipython_shell.pdb_session = None
elif post_mortem and isinstance(error, Exception):
error_type, error, tb = sys.exc_info()
post_mortem_excepthook(error_type, error, tb)
else:
# We ignore the call to exec
ipython_shell.showtraceback(tb_offset=1)
finally:
__tracebackhide__ = "__pdb_exit__"
def get_file_code(filename, save_all=True, raise_exception=False):
"""Retrieve the content of a file."""
# Get code from spyder
try:
return frontend_request(blocking=True).get_file_code(
filename, save_all=save_all)
except Exception:
# Maybe this is a local file
try:
with open(filename, 'r') as f:
return f.read()
except FileNotFoundError:
pass
if raise_exception:
raise
# Else return None
return None
def runfile(filename=None, args=None, wdir=None, namespace=None,
post_mortem=False, current_namespace=False):
"""
Run filename
args: command line arguments (string)
wdir: working directory
namespace: namespace for execution
post_mortem: boolean, whether to enter post-mortem mode on error
current_namespace: if true, run the file in the current namespace
"""
return _exec_file(
filename, args, wdir, namespace,
post_mortem, current_namespace, stack_depth=1)
def _exec_file(filename=None, args=None, wdir=None, namespace=None,
post_mortem=False, current_namespace=False, stack_depth=0,
exec_fun=None, canonic_filename=None):
# Tell IPython to hide this frame (>7.16)
__tracebackhide__ = True
ipython_shell = get_ipython()
if filename is None:
filename = get_current_file_name()
if filename is None:
return
try:
filename = filename.decode('utf-8')
except (UnicodeError, TypeError, AttributeError):
# UnicodeError, TypeError --> eventually raised in Python 2
# AttributeError --> systematically raised in Python 3
pass
if PY2:
filename = encode(filename)
if __umr__.enabled:
__umr__.run()
if args is not None and not isinstance(args, basestring):
raise TypeError("expected a character buffer object")
try:
file_code = get_file_code(filename, raise_exception=True)
except Exception:
# Show an error and return None
_print(
"This command failed to be executed because an error occurred"
" while trying to get the file code from Spyder's"
" editor. The error was:\n\n")
get_ipython().showtraceback(exception_only=True)
return
# Here the remote filename has been used. It must now be valid locally.
if canonic_filename is not None:
filename = canonic_filename
else:
filename = canonic(filename)
with NamespaceManager(filename, namespace, current_namespace,
file_code=file_code, stack_depth=stack_depth + 1
) as (ns_globals, ns_locals):
sys.argv = [filename]
if args is not None:
for arg in shlex.split(args):
sys.argv.append(arg)
if "multiprocessing" in sys.modules:
# See https://github.com/spyder-ide/spyder/issues/16696
try:
sys.modules['__mp_main__'] = sys.modules['__main__']
except Exception:
pass
if wdir is not None:
if PY2:
try:
wdir = wdir.decode('utf-8')
except (UnicodeError, TypeError):
# UnicodeError, TypeError --> eventually raised in Python 2
pass
if os.path.isdir(wdir):
os.chdir(wdir)
# See https://github.com/spyder-ide/spyder/issues/13632
if "multiprocessing.process" in sys.modules:
try:
import multiprocessing.process
multiprocessing.process.ORIGINAL_DIR = os.path.abspath(
wdir)
except Exception:
pass
else:
_print("Working directory {} doesn't exist.\n".format(wdir))
try:
if __umr__.has_cython:
# Cython files
with io.open(filename, encoding='utf-8') as f:
ipython_shell.run_cell_magic('cython', '', f.read())
else:
exec_code(file_code, filename, ns_globals, ns_locals,
post_mortem=post_mortem, exec_fun=exec_fun,
capture_last_expression=False)
finally:
sys.argv = ['']
# IPykernel 6.3.0+ shadows our runfile because it depends on the Pydev
# debugger, which adds its own runfile to builtins. So we replace it with
# our own using exec_lines in start.py
if PY2:
builtins.runfile = runfile
else:
builtins.spyder_runfile = runfile
def debugfile(filename=None, args=None, wdir=None, post_mortem=False,
current_namespace=False):
"""
Debug filename
args: command line arguments (string)
wdir: working directory
post_mortem: boolean, included for compatiblity with runfile
"""
# Tell IPython to hide this frame (>7.16)
__tracebackhide__ = True
if filename is None:
filename = get_current_file_name()
if filename is None:
return
shell = get_ipython()
if shell.is_debugging():
# Recursive
code = (
"runfile({}".format(repr(filename)) +
", args=%r, wdir=%r, current_namespace=%r)" % (
args, wdir, current_namespace)
)
shell.pdb_session.enter_recursive_debugger(
code, filename, True,
)
else:
debugger = get_new_debugger(filename, True)
_exec_file(
filename=filename,
canonic_filename=debugger.canonic(filename),
args=args,
wdir=wdir,
current_namespace=current_namespace,
exec_fun=debugger.run,
stack_depth=1,
)
builtins.debugfile = debugfile
def runcell(cellname, filename=None, post_mortem=False):
"""
Run a code cell from an editor as a file.
Parameters
----------
cellname : str or int
Cell name or index.
filename : str
Needed to allow for proper traceback links.
post_mortem: bool
Automatically enter post mortem on exception.
"""
# Tell IPython to hide this frame (>7.16)
__tracebackhide__ = True
return _exec_cell(cellname, filename, post_mortem, stack_depth=1)
def _exec_cell(cellname, filename=None, post_mortem=False, stack_depth=0,
exec_fun=None, canonic_filename=None):
"""
Execute a code cell with a given exec function.
"""
# Tell IPython to hide this frame (>7.16)
__tracebackhide__ = True
if filename is None:
filename = get_current_file_name()
if filename is None:
return
try:
filename = filename.decode('utf-8')
except (UnicodeError, TypeError, AttributeError):
# UnicodeError, TypeError --> eventually raised in Python 2
# AttributeError --> systematically raised in Python 3
pass
ipython_shell = get_ipython()
try:
# Get code from spyder
cell_code = frontend_request(
blocking=True).run_cell(cellname, filename)
except Exception:
_print("This command failed to be executed because an error occurred"
" while trying to get the cell code from Spyder's"
" editor. The error was:\n\n")
get_ipython().showtraceback(exception_only=True)
return
if not cell_code or cell_code.strip() == '':
_print("Nothing to execute, this cell is empty.\n")
return
# Trigger `post_execute` to exit the additional pre-execution.
# See Spyder PR #7310.
ipython_shell.events.trigger('post_execute')
file_code = get_file_code(filename, save_all=False)
# Here the remote filename has been used. It must now be valid locally.
if canonic_filename is not None:
filename = canonic_filename
else:
# Normalise the filename
filename = canonic(filename)
with NamespaceManager(filename, current_namespace=True,
file_code=file_code, stack_depth=stack_depth + 1
) as (ns_globals, ns_locals):
return exec_code(cell_code, filename, ns_globals, ns_locals,
post_mortem=post_mortem, exec_fun=exec_fun,
capture_last_expression=True)
builtins.runcell = runcell
def debugcell(cellname, filename=None, post_mortem=False):
"""Debug a cell."""
# Tell IPython to hide this frame (>7.16)
__tracebackhide__ = True
if filename is None:
filename = get_current_file_name()
if filename is None:
return
shell = get_ipython()
if shell.is_debugging():
# Recursive
code = (
"runcell({}, ".format(repr(cellname)) +
"{})".format(repr(filename))
)
shell.pdb_session.enter_recursive_debugger(
code, filename, False,
)
else:
debugger = get_new_debugger(filename, False)
_exec_cell(
cellname=cellname,
filename=filename,
canonic_filename=debugger.canonic(filename),
exec_fun=debugger.run,
stack_depth=1
)
builtins.debugcell = debugcell
def cell_count(filename=None):
"""
Get the number of cells in a file.
Parameters
----------
filename : str
The file to get the cells from. If None, the currently opened file.
"""
if filename is None:
filename = get_current_file_name()
if filename is None:
raise RuntimeError('Could not get cell count from frontend.')
try:
# Get code from spyder
cell_count = frontend_request(blocking=True).cell_count(filename)
return cell_count
except Exception:
etype, error, tb = sys.exc_info()
raise etype(error)
builtins.cell_count = cell_count
# =============================================================================
# PYTHONPATH and sys.path Adjustments
# =============================================================================
# PYTHONPATH is not passed to kernel directly, see spyder-ide/spyder#13519
# This allows the kernel to start without crashing if modules in PYTHONPATH
# shadow standard library modules.
def set_spyder_pythonpath():
pypath = os.environ.get('SPY_PYTHONPATH')
if pypath:
sys.path.extend(pypath.split(os.pathsep))
os.environ.update({'PYTHONPATH': pypath})
set_spyder_pythonpath()

View File

@@ -0,0 +1,975 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
"""Spyder debugger."""
import ast
import bdb
import logging
import os
import sys
import traceback
import threading
from collections import namedtuple
from IPython.core.autocall import ZMQExitAutocall
from IPython.core.debugger import Pdb as ipyPdb
from IPython.core.getipython import get_ipython
from spyder_kernels.comms.frontendcomm import CommError, frontend_request
from spyder_kernels.customize.utils import path_is_library, capture_last_Expr
from spyder_kernels.py3compat import (
TimeoutError, PY2, _print, isidentifier, PY3, input)
if not PY2:
from IPython.core.inputtransformer2 import TransformerManager
import builtins
basestring = (str,)
else:
import __builtin__ as builtins
from IPython.core.inputsplitter import IPythonInputSplitter as TransformerManager
logger = logging.getLogger(__name__)
class DebugWrapper(object):
"""
Notifies the frontend when debugging starts/stops
"""
def __init__(self, pdb_obj):
self.pdb_obj = pdb_obj
def __enter__(self):
"""
Debugging starts.
"""
self.pdb_obj._frontend_notified = True
try:
frontend_request(blocking=True).set_debug_state(True)
except (CommError, TimeoutError):
logger.debug("Could not send debugging state to the frontend.")
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Debugging ends.
"""
self.pdb_obj._frontend_notified = False
try:
frontend_request(blocking=True).set_debug_state(False)
except (CommError, TimeoutError):
logger.debug("Could not send debugging state to the frontend.")
class SpyderPdb(ipyPdb, object): # Inherits `object` to call super() in PY2
"""
Extends Pdb to add features:
- Process IPython magics.
- Accepts multiline input.
- Better interrupt signal handling.
- Option to skip libraries while stepping.
- Add completion to non-command code.
"""
send_initial_notification = True
starting = True
def __init__(self, completekey='tab', stdin=None, stdout=None,
skip=None, nosigint=False):
"""Init Pdb."""
self.curframe_locals = None
# Only set to true when calling debugfile
self.continue_if_has_breakpoints = False
self.pdb_ignore_lib = False
self.pdb_execute_events = False
self.pdb_use_exclamation_mark = False
self._exclamation_warning_printed = False
self.pdb_stop_first_line = True
self._disable_next_stack_entry = False
super(SpyderPdb, self).__init__()
self._pdb_breaking = False
self._frontend_notified = False
# content of tuple: (filename, line number)
self._previous_step = None
# Don't report hidden frames for IPython 7.24+. This attribute
# has no effect in previous versions.
self.report_skipped = False
# Keep track of remote filename
self.remote_filename = None
# Line received from the frontend
self._cmd_input_line = None
# This is not available in IPython 5
if hasattr(self, '_predicates'):
# Turn off IPython's debugger skip funcionality by default because
# it makes our debugger quite slow. It's also important to remark
# that this functionality doesn't do anything on its own. Users
# need to mark what frames they want to skip for it to be useful.
# So, we hope that knowledgeable users will find that they need to
# enable it in Spyder.
# Fixes spyder-ide/spyder#20639.
self._predicates["debuggerskip"] = False
# --- Methods overriden for code execution
def print_exclamation_warning(self):
"""Print pdb warning for exclamation mark."""
if not self._exclamation_warning_printed:
print("Warning: The exclamation mark option is enabled. "
"Please use '!' as a prefix for Pdb commands.")
self._exclamation_warning_printed = True
def default(self, line):
"""
Default way of running pdb statment.
"""
execute_events = self.pdb_execute_events
if line[:1] == '!':
line = line[1:]
elif self.pdb_use_exclamation_mark:
self.print_exclamation_warning()
self.error("Unknown command '" + line.split()[0] + "'")
return
# Disallow the use of %debug magic in the debugger
if line.startswith("%debug"):
self.error("Please don't use '%debug' in the debugger.\n"
"For a recursive debugger, use the pdb 'debug'"
" command instead")
return
locals = self.curframe_locals
globals = self.curframe.f_globals
if self.pdb_use_exclamation_mark:
# Find pdb commands executed without !
cmd, arg, line = self.parseline(line)
if cmd:
cmd_in_namespace = (
cmd in globals
or cmd in locals
or cmd in builtins.__dict__
)
# Special case for quit and exit
if cmd in ("quit", "exit"):
if cmd in globals and isinstance(
globals[cmd], ZMQExitAutocall):
# Use the pdb call
cmd_in_namespace = False
cmd_func = getattr(self, 'do_' + cmd, None)
is_pdb_cmd = cmd_func is not None
# Look for assignment
is_assignment = False
try:
for node in ast.walk(ast.parse(line)):
if isinstance(node, ast.Assign):
is_assignment = True
break
except SyntaxError:
pass
if is_pdb_cmd:
if not cmd_in_namespace and not is_assignment:
# This is a pdb command without the '!' prefix.
self.lastcmd = line
return cmd_func(arg)
else:
# The pdb command is masked by something
self.print_exclamation_warning()
try:
line = TransformerManager().transform_cell(line)
save_stdout = sys.stdout
save_stdin = sys.stdin
save_displayhook = sys.displayhook
try:
sys.stdin = self.stdin
sys.stdout = self.stdout
sys.displayhook = self.displayhook
if execute_events:
get_ipython().events.trigger('pre_execute')
code_ast = ast.parse(line)
if line.rstrip()[-1:] == ";":
# Supress output with ;
capture_last_expression = False
else:
code_ast, capture_last_expression = capture_last_Expr(
code_ast, "_spyderpdb_out")
if locals is not globals:
# Mitigates a behaviour of CPython that makes it difficult
# to work with exec and the local namespace
# See:
# - https://bugs.python.org/issue41918
# - https://bugs.python.org/issue46153
# - https://bugs.python.org/issue21161
# - spyder-ide/spyder#13909
# - spyder-ide/spyder-kernels#345
#
# The idea here is that the best way to emulate being in a
# function is to actually execute the code in a function.
# A function called `_spyderpdb_code` is created and
# called. It will first load the locals, execute the code,
# and then update the locals.
#
# One limitation of this approach is that locals() is only
# a copy of the curframe locals. This means that closures
# for example are early binding instead of late binding.
# Create a function
indent = " "
code = ["def _spyderpdb_code():"]
# Load the locals
globals["_spyderpdb_builtins_locals"] = builtins.locals
# Save builtins locals in case it is shadowed
globals["_spyderpdb_locals"] = locals
# Load locals if they have a valid name
# In comprehensions, locals could contain ".0" for example
code += [indent + "{k} = _spyderpdb_locals['{k}']".format(
k=k) for k in locals if isidentifier(k)]
# Update the locals
code += [indent + "_spyderpdb_locals.update("
"_spyderpdb_builtins_locals())"]
# Run the function
code += ["_spyderpdb_code()"]
# Cleanup
code += [
"del _spyderpdb_code",
"del _spyderpdb_locals",
"del _spyderpdb_builtins_locals"
]
# Parse the function
fun_ast = ast.parse('\n'.join(code) + '\n')
# Inject code_ast in the function before the locals update
fun_ast.body[0].body = (
fun_ast.body[0].body[:-1] # The locals
+ code_ast.body # Code to run
+ fun_ast.body[0].body[-1:] # Locals update
)
code_ast = fun_ast
exec(compile(code_ast, "<stdin>", "exec"), globals)
if capture_last_expression:
out = globals.pop("_spyderpdb_out", None)
if out is not None:
sys.stdout.flush()
sys.stderr.flush()
try:
frontend_request(blocking=False).show_pdb_output(
repr(out))
except (CommError, TimeoutError):
# Fallback
print("pdb out> ", repr(out))
finally:
if execute_events:
get_ipython().events.trigger('post_execute')
sys.stdout = save_stdout
sys.stdin = save_stdin
sys.displayhook = save_displayhook
except BaseException:
if PY2:
t, v = sys.exc_info()[:2]
if type(t) == type(''):
exc_type_name = t
else: exc_type_name = t.__name__
print >>self.stdout, '***', exc_type_name + ':', v
else:
exc_info = sys.exc_info()[:2]
self.error(
traceback.format_exception_only(*exc_info)[-1].strip())
# --- Methods overriden for signal handling
def sigint_handler(self, signum, frame):
"""
Handle a sigint signal. Break on the frame above this one.
This method is not present in python2 so this won't be called there.
"""
if self.allow_kbdint:
raise KeyboardInterrupt
self.message("\nProgram interrupted. (Use 'cont' to resume).")
# avoid stopping in set_trace
sys.settrace(None)
self._pdb_breaking = True
self.set_step()
self.set_trace(sys._getframe())
def interaction(self, frame, traceback):
"""
Called when a user interaction is required.
If this is from sigint, break on the upper frame.
If the frame is in spydercustomize.py, quit.
Notifies spyder and print current code.
"""
if self._pdb_breaking:
self._pdb_breaking = False
if frame and frame.f_back:
return self.interaction(frame.f_back, traceback)
# This is necessary to handle chained exceptions in Pdb, support for
# which was added in IPython 8.15 and will be the default in Python
# 3.13 (see ipython/ipython#14146).
if isinstance(traceback, BaseException):
_chained_exceptions, tb = self._get_tb_and_exceptions(traceback)
with self._hold_exceptions(_chained_exceptions):
self.interaction(frame, tb)
return
self.setup(frame, traceback)
self.print_stack_entry(self.stack[self.curindex])
if self._frontend_notified:
self._cmdloop()
else:
with DebugWrapper(self):
self._cmdloop()
self.forget()
def print_stack_entry(self, frame_lineno, prompt_prefix='\n-> ',
context=None):
"""Disable printing stack entry if requested."""
if self._disable_next_stack_entry:
self._disable_next_stack_entry = False
return
return super(SpyderPdb, self).print_stack_entry(
frame_lineno, prompt_prefix, context)
# --- Methods overriden for skipping libraries
def stop_here(self, frame):
"""Check if pdb should stop here."""
if (frame is not None
and "__tracebackhide__" in frame.f_locals
and frame.f_locals["__tracebackhide__"] == "__pdb_exit__"):
self.onecmd('exit')
return False
if not super(SpyderPdb, self).stop_here(frame):
return False
filename = frame.f_code.co_filename
if filename.startswith('<'):
# This is not a file
return True
if self.pdb_ignore_lib and path_is_library(filename):
return False
return True
def do_where(self, arg):
"""w(here)
Print a stack trace, with the most recent frame at the bottom.
An arrow indicates the "current frame", which determines the
context of most commands. 'bt' is an alias for this command.
Take a number as argument as an (optional) number of context line to
print"""
super(SpyderPdb, self).do_where(arg)
try:
frontend_request(blocking=False).do_where()
except (CommError, TimeoutError):
logger.debug("Could not send where request to the frontend.")
do_w = do_where
do_bt = do_where
# --- Method defined by us to respond to ipython complete protocol
def do_complete(self, code, cursor_pos):
"""
Respond to a complete request.
"""
if self.pdb_use_exclamation_mark:
return self._complete_exclamation(code, cursor_pos)
else:
return self._complete_default(code, cursor_pos)
def _complete_default(self, code, cursor_pos):
"""
Respond to a complete request if not pdb_use_exclamation_mark.
"""
if cursor_pos is None:
cursor_pos = len(code)
# Get text to complete
text = code[:cursor_pos].split(' ')[-1]
# Choose Pdb function to complete, based on cmd.py
origline = code
line = origline.lstrip()
if not line:
# Nothing to complete
return
stripped = len(origline) - len(line)
begidx = cursor_pos - len(text) - stripped
endidx = cursor_pos - stripped
compfunc = None
ipython_do_complete = True
if begidx > 0:
# This could be after a Pdb command
cmd, args, _ = self.parseline(line)
if cmd != '':
try:
# Function to complete Pdb command arguments
compfunc = getattr(self, 'complete_' + cmd)
# Don't call ipython do_complete for commands
ipython_do_complete = False
except AttributeError:
pass
elif line[0] != '!':
# This could be a Pdb command
compfunc = self.completenames
def is_name_or_composed(text):
if not text or text[0] == '.':
return False
# We want to keep value.subvalue
return isidentifier(text.replace('.', ''))
while text and not is_name_or_composed(text):
text = text[1:]
begidx += 1
matches = []
if compfunc:
matches = compfunc(text, line, begidx, endidx)
cursor_start = cursor_pos - len(text)
if ipython_do_complete:
kernel = get_ipython().kernel
# Make complete call with current frame
if self.curframe:
if self.curframe_locals:
Frame = namedtuple("Frame", ["f_locals", "f_globals"])
frame = Frame(self.curframe_locals,
self.curframe.f_globals)
else:
frame = self.curframe
kernel.shell.set_completer_frame(frame)
result = kernel._do_complete(code, cursor_pos)
# Reset frame
kernel.shell.set_completer_frame()
# If there is no Pdb results to merge, return the result
if not compfunc:
return result
ipy_matches = result['matches']
# Make sure both match lists start at the same place
if cursor_start < result['cursor_start']:
# Fill IPython matches
missing_txt = code[cursor_start:result['cursor_start']]
ipy_matches = [missing_txt + m for m in ipy_matches]
elif result['cursor_start'] < cursor_start:
# Fill Pdb matches
missing_txt = code[result['cursor_start']:cursor_start]
matches = [missing_txt + m for m in matches]
cursor_start = result['cursor_start']
# Add Pdb-specific matches
matches += [match for match in ipy_matches if match not in matches]
return {'matches': matches,
'cursor_end': cursor_pos,
'cursor_start': cursor_start,
'metadata': {},
'status': 'ok'}
def _complete_exclamation(self, code, cursor_pos):
"""
Respond to a complete request if pdb_use_exclamation_mark.
"""
if cursor_pos is None:
cursor_pos = len(code)
# Get text to complete
text = code[:cursor_pos].split(' ')[-1]
# Choose Pdb function to complete, based on cmd.py
origline = code
line = origline.lstrip()
if not line:
# Nothing to complete
return
is_pdb_command = line[0] == '!'
is_pdb_command_name = False
stripped = len(origline) - len(line)
begidx = cursor_pos - len(text) - stripped
endidx = cursor_pos - stripped
compfunc = None
if is_pdb_command:
line = line[1:]
begidx -= 1
endidx -= 1
if begidx == -1:
is_pdb_command_name = True
text = text[1:]
begidx += 1
compfunc = self.completenames
else:
cmd, args, _ = self.parseline(line)
if cmd != '':
try:
# Function to complete Pdb command arguments
compfunc = getattr(self, 'complete_' + cmd)
except AttributeError:
# This command doesn't exist, nothing to complete
return
else:
# We don't know this command
return
if not is_pdb_command_name:
# Remove eg. leading opening parenthesis
def is_name_or_composed(text):
if not text or text[0] == '.':
return False
# We want to keep value.subvalue
return isidentifier(text.replace('.', ''))
while text and not is_name_or_composed(text):
text = text[1:]
begidx += 1
cursor_start = cursor_pos - len(text)
matches = []
if is_pdb_command:
matches = compfunc(text, line, begidx, endidx)
return {
'matches': matches,
'cursor_end': cursor_pos,
'cursor_start': cursor_start,
'metadata': {},
'status': 'ok'
}
kernel = get_ipython().kernel
# Make complete call with current frame
if self.curframe:
if self.curframe_locals:
Frame = namedtuple("Frame", ["f_locals", "f_globals"])
frame = Frame(self.curframe_locals,
self.curframe.f_globals)
else:
frame = self.curframe
kernel.shell.set_completer_frame(frame)
result = kernel._do_complete(code, cursor_pos)
# Reset frame
kernel.shell.set_completer_frame()
return result
# --- Methods overriden by us for Spyder integration
def postloop(self):
# postloop() is called when the debuggers input prompt exists. Reset
# _previous_step so that publish_pdb_state() actually notifies Spyder
# about a changed frame the next the input prompt is entered again.
self._previous_step = None
def preloop(self):
"""Ask Spyder for breakpoints before the first prompt is created."""
try:
pdb_settings = frontend_request(blocking=True).get_pdb_settings()
self.pdb_ignore_lib = pdb_settings['pdb_ignore_lib']
self.pdb_execute_events = pdb_settings['pdb_execute_events']
self.pdb_use_exclamation_mark = pdb_settings[
'pdb_use_exclamation_mark']
self.pdb_stop_first_line = pdb_settings['pdb_stop_first_line']
if self.starting:
self.set_spyder_breakpoints(pdb_settings['breakpoints'])
if self.send_initial_notification:
self.publish_pdb_state()
except (CommError, TimeoutError):
logger.debug("Could not get breakpoints from the frontend.")
super(SpyderPdb, self).preloop()
def set_continue(self):
"""
Stop only at breakpoints or when finished.
Reimplemented to avoid stepping out of debugging if there are no
breakpoints. We could add more later.
"""
# Don't stop except at breakpoints or when finished
self._set_stopinfo(self.botframe, None, -1)
def reset(self):
"""
Register Pdb session after reset.
"""
super(SpyderPdb, self).reset()
get_ipython().pdb_session = self
def do_debug(self, arg):
"""
Debug code
Enter a recursive debugger that steps through the code
argument (which is an arbitrary expression or statement to be
executed in the current environment).
"""
try:
super(SpyderPdb, self).do_debug(arg)
except Exception:
if PY2:
t, v = sys.exc_info()[:2]
if type(t) == type(''):
exc_type_name = t
else: exc_type_name = t.__name__
print >>self.stdout, '***', exc_type_name + ':', v
else:
exc_info = sys.exc_info()[:2]
self.error(
traceback.format_exception_only(*exc_info)[-1].strip())
get_ipython().pdb_session = self
def user_return(self, frame, return_value):
"""This function is called when a return trap is set here."""
# This is useful when debugging in an active interpreter (otherwise,
# the debugger will stop before reaching the target file)
if self._wait_for_mainpyfile:
if (self.mainpyfile != self.canonic(frame.f_code.co_filename)
or frame.f_lineno <= 0):
return
self._wait_for_mainpyfile = False
super(SpyderPdb, self).user_return(frame, return_value)
def _cmdloop(self):
"""Modifies the error text."""
while True:
try:
# keyboard interrupts allow for an easy way to cancel
# the current command, so allow them during interactive input
self.allow_kbdint = True
self.cmdloop()
self.allow_kbdint = False
break
except KeyboardInterrupt:
_print("--KeyboardInterrupt--\n"
"For copying text while debugging, use Ctrl+Shift+C",
file=self.stdout)
def cmdloop(self, intro=None):
"""
Repeatedly issue a prompt, accept input, parse an initial prefix
off the received input, and dispatch to action methods, passing them
the remainder of the line as argument.
"""
self.preloop()
if intro is not None:
self.intro = intro
if self.intro:
self.stdout.write(str(self.intro)+"\n")
stop = None
while not stop:
if self.cmdqueue:
line = self.cmdqueue.pop(0)
else:
try:
line = self.cmd_input(self.prompt)
except EOFError:
line = 'EOF'
line = self.precmd(line)
stop = self.onecmd(line)
stop = self.postcmd(stop, line)
self.postloop()
def cmd_input(self, prompt=''):
"""
Get input from frontend. Blocks until return
"""
kernel = get_ipython().kernel
# Only works if the comm is open
if not kernel.frontend_comm.is_open():
return input(prompt)
# Flush output before making the request.
sys.stderr.flush()
sys.stdout.flush()
sys.__stderr__.flush()
sys.__stdout__.flush()
# Send the input request.
self._cmd_input_line = None
kernel.frontend_call().pdb_input(prompt)
# Allow GUI event loop to update
if PY3:
is_main_thread = (
threading.current_thread() is threading.main_thread())
else:
is_main_thread = isinstance(
threading.current_thread(), threading._MainThread)
# Get input by running eventloop
if is_main_thread and kernel.eventloop:
while self._cmd_input_line is None:
eventloop = kernel.eventloop
# Check if the current backend is Tk on Windows
# to let GUI update.
# See spyder-ide/spyder#17523
if (eventloop and hasattr(kernel, "app_wrapper") and
os.name == "nt"):
kernel.app_wrapper.app.update()
elif eventloop:
eventloop(kernel)
else:
break
# Get input by blocking
if self._cmd_input_line is None:
kernel.frontend_comm.wait_until(
lambda: self._cmd_input_line is not None)
return self._cmd_input_line
def precmd(self, line):
"""
Hook method executed just before the command line is
interpreted, but after the input prompt is generated and issued.
Here we switch ! and non !
"""
if not self.pdb_use_exclamation_mark:
return line
if not line:
return line
if line[0] == '!':
line = line[1:]
else:
line = '!' + line
return line
def postcmd(self, stop, line):
"""Hook method executed just after a command dispatch is finished."""
# Flush in case the command produced output on underlying outputs
sys.__stderr__.flush()
sys.__stdout__.flush()
self.publish_pdb_state()
return super(SpyderPdb, self).postcmd(stop, line)
if PY2:
def break_here(self, frame):
"""
Breakpoints don't work for files with non-ascii chars in Python 2
Fixes Issue 1484
"""
from bdb import effective
filename = self.canonic(frame.f_code.co_filename)
try:
filename = unicode(filename, "utf-8")
except TypeError:
pass
if filename not in self.breaks:
return False
lineno = frame.f_lineno
if lineno not in self.breaks[filename]:
# The line itself has no breakpoint, but maybe the line is the
# first line of a function with breakpoint set by function name
lineno = frame.f_code.co_firstlineno
if lineno not in self.breaks[filename]:
return False
# flag says ok to delete temp. bp
(bp, flag) = effective(filename, lineno, frame)
if bp:
self.currentbp = bp.number
if (flag and bp.temporary):
self.do_clear(str(bp.number))
return True
else:
return False
# --- Methods defined by us for Spyder integration
def set_spyder_breakpoints(self, breakpoints):
"""Set Spyder breakpoints."""
self.clear_all_breaks()
# -----Really deleting all breakpoints:
for bp in bdb.Breakpoint.bpbynumber:
if bp:
bp.deleteMe()
bdb.Breakpoint.next = 1
bdb.Breakpoint.bplist = {}
bdb.Breakpoint.bpbynumber = [None]
# -----
for fname, data in list(breakpoints.items()):
for linenumber, condition in data:
try:
self.set_break(self.canonic(fname), linenumber,
cond=condition)
except ValueError:
# Fixes spyder/issues/15546
# The file is not readable
pass
# Jump to first breakpoint.
# Fixes issue 2034
if self.starting:
# Only run this after a Pdb session is created
self.starting = False
# Get all breakpoints for the file we're going to debug
frame = self.curframe
if not frame:
# We are not debugging, return. Solves #10290
return
lineno = frame.f_lineno
breaks = self.get_file_breaks(frame.f_code.co_filename)
# Do 'continue' if the first breakpoint is *not* placed
# where the debugger is going to land.
# Fixes issue 4681
if self.pdb_stop_first_line:
do_continue = (
self.continue_if_has_breakpoints
and breaks
and lineno < breaks[0])
else:
# The breakpoint could be in another file.
do_continue = (
self.continue_if_has_breakpoints
and not (breaks and lineno >= breaks[0]))
if do_continue:
try:
if self.pdb_use_exclamation_mark:
cont_cmd = '!continue'
else:
cont_cmd = 'continue'
frontend_request(blocking=False).pdb_execute(cont_cmd)
except (CommError, TimeoutError):
logger.debug(
"Could not send a Pdb continue call to the frontend.")
def publish_pdb_state(self):
"""
Send debugger state (frame position) to the frontend.
The state is only sent if it has changed since the last update.
"""
frame = self.curframe
if frame is None:
self._previous_step = None
return
# Get filename and line number of the current frame
fname = self.canonic(frame.f_code.co_filename)
if PY2:
try:
fname = unicode(fname, "utf-8")
except TypeError:
pass
if fname == self.mainpyfile and self.remote_filename is not None:
fname = self.remote_filename
lineno = frame.f_lineno
if self._previous_step == (fname, lineno):
return
# Set step of the current frame (if any)
step = {}
self._previous_step = None
if isinstance(fname, basestring) and isinstance(lineno, int):
step = dict(fname=fname, lineno=lineno)
self._previous_step = (fname, lineno)
try:
frontend_request(blocking=False).pdb_state(dict(step=step))
except (CommError, TimeoutError):
logger.debug("Could not send Pdb state to the frontend.")
def run(self, cmd, globals=None, locals=None):
"""Debug a statement executed via the exec() function.
globals defaults to __main__.dict; locals defaults to globals.
"""
self.starting = True
with DebugWrapper(self):
super(SpyderPdb, self).run(cmd, globals, locals)
def runeval(self, expr, globals=None, locals=None):
"""Debug an expression executed via the eval() function.
globals defaults to __main__.dict; locals defaults to globals.
"""
self.starting = True
with DebugWrapper(self):
super(SpyderPdb, self).runeval(expr, globals, locals)
def runcall(self, *args, **kwds):
"""Debug a single function call.
Return the result of the function call.
"""
self.starting = True
with DebugWrapper(self):
super(SpyderPdb, self).runcall(*args, **kwds)
def enter_recursive_debugger(self, code, filename,
continue_if_has_breakpoints):
"""
Enter debugger recursively.
"""
sys.settrace(None)
globals = self.curframe.f_globals
locals = self.curframe_locals
# Create child debugger
debugger = SpyderPdb(
completekey=self.completekey,
stdin=self.stdin, stdout=self.stdout)
debugger.use_rawinput = self.use_rawinput
debugger.prompt = "(%s) " % self.prompt.strip()
debugger.set_remote_filename(filename)
debugger.continue_if_has_breakpoints = continue_if_has_breakpoints
# Enter recursive debugger
sys.call_tracing(debugger.run, (code, globals, locals))
# Reset parent debugger
sys.settrace(self.trace_dispatch)
self.lastcmd = debugger.lastcmd
get_ipython().pdb_session = self
# Reset _previous_step so that publish_pdb_state() called from within
# postcmd() notifies Spyder about a changed debugger position. The reset
# is required because the recursive debugger might change the position,
# but the parent debugger (self) is not aware of this.
self._previous_step = None
def set_remote_filename(self, filename):
"""Set remote filename to signal Spyder on mainpyfile."""
self.remote_filename = filename
self.mainpyfile = self.canonic(filename)
self._wait_for_mainpyfile = True
def get_new_debugger(filename, continue_if_has_breakpoints):
"""Get a new debugger."""
debugger = SpyderPdb()
debugger.set_remote_filename(filename)
debugger.continue_if_has_breakpoints = continue_if_has_breakpoints
return debugger

View File

@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
# -----------------------------------------------------------------------------
"""Tests for spydercustomize."""

View File

@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
# -----------------------------------------------------------------------------
"""Tests for the User Module Reloader."""
# Stdlib imports
import os
import sys
# Third party imports
import pytest
# Local imports
from spyder_kernels.py3compat import to_text_string
from spyder_kernels.customize.umr import UserModuleReloader
@pytest.fixture
def user_module(tmpdir):
"""Create a simple module in tmpdir as an example of a user module."""
if to_text_string(tmpdir) not in sys.path:
sys.path.append(to_text_string(tmpdir))
def create_module(modname):
modfile = tmpdir.mkdir(modname).join('bar.py')
code = """
def square(x):
return x**2
"""
modfile.write(code)
init_file = tmpdir.join(modname).join('__init__.py')
init_file.write('#')
return create_module
def test_umr_skip_cython(user_module):
"""
Test that the UMR doesn't try to reload modules when Cython
support is active.
"""
# Create user module
user_module('foo')
# Activate Cython support
os.environ['SPY_RUN_CYTHON'] = 'True'
# Create UMR
umr = UserModuleReloader()
import foo
assert umr.is_module_reloadable(foo, 'foo') == False
# Deactivate Cython support
os.environ['SPY_RUN_CYTHON'] = 'False'
def test_umr_run(user_module):
"""Test that UMR's run method is working correctly."""
# Create user module
user_module('foo1')
# Activate verbose mode in the UMR
os.environ['SPY_UMR_VERBOSE'] = 'True'
# Create UMR
umr = UserModuleReloader()
from foo1.bar import square
umr.run()
umr.modnames_to_reload == ['foo', 'foo.bar']
def test_umr_previous_modules(user_module):
"""Test that UMR's previos_modules is working as expected."""
# Create user module
user_module('foo2')
# Create UMR
umr = UserModuleReloader()
import foo2
assert 'IPython' in umr.previous_modules
assert 'foo2' not in umr.previous_modules
def test_umr_namelist():
"""Test that the UMR skips modules according to its name."""
umr = UserModuleReloader()
assert umr.is_module_in_namelist('tensorflow')
assert umr.is_module_in_namelist('pytorch')
assert umr.is_module_in_namelist('spyder_kernels')
assert not umr.is_module_in_namelist('foo')
def test_umr_reload_modules(user_module):
"""Test that the UMR only tries to reload user modules."""
# Create user module
user_module('foo3')
# Create UMR
umr = UserModuleReloader()
# Don't reload stdlib modules
import xml
assert not umr.is_module_reloadable(xml, 'xml')
# Don't reload third-party modules
import numpy
assert not umr.is_module_reloadable(numpy, 'numpy')
# Reload user modules
import foo3
assert umr.is_module_reloadable(foo3, 'foo3')

View File

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
# -----------------------------------------------------------------------------
import os
import sys
from spyder_kernels.customize.utils import create_pathlist
def test_user_sitepackages_in_pathlist():
"""Test that we include user site-packages in pathlist."""
if sys.platform.startswith('linux'):
user_path = 'local'
elif (sys.platform == 'darwin' or sys.platform.startswith('freebsd')):
user_path = os.path.expanduser('~/.local')
else:
user_path = 'Roaming'
assert any([user_path in path for path in create_pathlist()])

View File

@@ -0,0 +1,143 @@
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
"""User module reloader."""
import os
import sys
from spyder_kernels.customize.utils import path_is_library
from spyder_kernels.py3compat import PY2, _print
class UserModuleReloader(object):
"""
User Module Reloader (UMR) aims at deleting user modules
to force Python to deeply reload them during import
pathlist [list]: blacklist in terms of module path
namelist [list]: blacklist in terms of module name
"""
def __init__(self, namelist=None, pathlist=None):
if namelist is None:
namelist = []
else:
try:
namelist = namelist.split(',')
except Exception:
namelist = []
# Spyder modules
spy_modules = ['spyder_kernels']
# Matplotlib modules
mpl_modules = ['matplotlib', 'tkinter', 'Tkinter']
# Add other, necessary modules to the UMR blacklist
# astropy: See spyder-ide/spyder#6962
# pytorch: See spyder-ide/spyder#7041
# fastmat: See spyder-ide/spyder#7190
# pythoncom: See spyder-ide/spyder#7190
# tensorflow: See spyder-ide/spyder#8697
other_modules = ['pytorch', 'pythoncom', 'tensorflow']
if PY2:
py2_modules = ['astropy', 'fastmat']
other_modules = other_modules + py2_modules
self.namelist = namelist + spy_modules + mpl_modules + other_modules
self.pathlist = pathlist
# List of previously loaded modules
self.previous_modules = list(sys.modules.keys())
# List of module names to reload
self.modnames_to_reload = []
# Activate Cython support
self.has_cython = False
self.activate_cython()
# Check if the UMR is enabled or not
enabled = os.environ.get("SPY_UMR_ENABLED", "")
self.enabled = enabled.lower() == "true"
# Check if the UMR should print the list of reloaded modules or not
verbose = os.environ.get("SPY_UMR_VERBOSE", "")
self.verbose = verbose.lower() == "true"
def is_module_reloadable(self, module, modname):
"""Decide if a module is reloadable or not."""
if self.has_cython:
# Don't return cached inline compiled .PYX files
return False
else:
if (path_is_library(getattr(module, '__file__', None),
self.pathlist) or
self.is_module_in_namelist(modname)):
return False
else:
return True
def is_module_in_namelist(self, modname):
"""Decide if a module can be reloaded or not according to its name."""
return set(modname.split('.')) & set(self.namelist)
def activate_cython(self):
"""
Activate Cython support.
We need to run this here because if the support is
active, we don't to run the UMR at all.
"""
run_cython = os.environ.get("SPY_RUN_CYTHON") == "True"
if run_cython:
try:
__import__('Cython')
self.has_cython = True
except Exception:
pass
if self.has_cython:
# Import pyximport to enable Cython files support for
# import statement
import pyximport
pyx_setup_args = {}
# Add Numpy include dir to pyximport/distutils
try:
import numpy
pyx_setup_args['include_dirs'] = numpy.get_include()
except Exception:
pass
# Setup pyximport and enable Cython files reload
pyximport.install(setup_args=pyx_setup_args,
reload_support=True)
def run(self):
"""
Delete user modules to force Python to deeply reload them
Do not del modules which are considered as system modules, i.e.
modules installed in subdirectories of Python interpreter's binary
Do not del C modules
"""
self.modnames_to_reload = []
for modname, module in list(sys.modules.items()):
if modname not in self.previous_modules:
# Decide if a module can be reloaded or not
if self.is_module_reloadable(module, modname):
self.modnames_to_reload.append(modname)
del sys.modules[modname]
else:
continue
# Report reloaded modules
if self.verbose and self.modnames_to_reload:
modnames = self.modnames_to_reload
_print("\x1b[4;33m%s\x1b[24m%s\x1b[0m"
% ("Reloaded modules", ": "+", ".join(modnames)))

View File

@@ -0,0 +1,140 @@
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
"""Utility functions."""
import ast
import os
import re
import sys
import sysconfig
def create_pathlist():
"""
Create list of Python library paths to be skipped from module
reloading and Pdb steps.
"""
# Get standard installation paths
try:
paths = sysconfig.get_paths()
standard_paths = [paths['stdlib'],
paths['purelib'],
paths['scripts'],
paths['data']]
except Exception:
standard_paths = []
# Get user installation path
# See spyder-ide/spyder#8776
try:
import site
if getattr(site, 'getusersitepackages', False):
# Virtualenvs don't have this function but
# conda envs do
user_path = [site.getusersitepackages()]
elif getattr(site, 'USER_SITE', False):
# However, it seems virtualenvs have this
# constant
user_path = [site.USER_SITE]
else:
user_path = []
except Exception:
user_path = []
return standard_paths + user_path
def path_is_library(path, initial_pathlist=None):
"""Decide if a path is in user code or a library according to its path."""
# Compute DEFAULT_PATHLIST only once and make it global to reuse it
# in any future call of this function.
if 'DEFAULT_PATHLIST' not in globals():
global DEFAULT_PATHLIST
DEFAULT_PATHLIST = create_pathlist()
if initial_pathlist is None:
initial_pathlist = []
pathlist = initial_pathlist + DEFAULT_PATHLIST
if path is None:
# Path probably comes from a C module that is statically linked
# into the interpreter. There is no way to know its path, so we
# choose to ignore it.
return True
elif any([p in path for p in pathlist]):
# We don't want to consider paths that belong to the standard
# library or installed to site-packages.
return True
elif os.name == 'nt':
if re.search(r'.*\\pkgs\\.*', path):
return True
else:
return False
elif not os.name == 'nt':
# Paths containing the strings below can be part of the default
# Linux installation, Homebrew or the user site-packages in a
# virtualenv.
patterns = [
r'^/usr/lib.*',
r'^/usr/local/lib.*',
r'^/usr/.*/dist-packages/.*',
r'^/home/.*/.local/lib.*',
r'^/Library/.*',
r'^/Users/.*/Library/.*',
r'^/Users/.*/.local/.*',
]
if [p for p in patterns if re.search(p, path)]:
return True
else:
return False
else:
return False
def capture_last_Expr(code_ast, out_varname):
"""Parse line and modify code to capture in globals the last expression."""
# Modify ast code to capture the last expression
capture_last_expression = False
if (
len(code_ast.body)
and isinstance(code_ast.body[-1], ast.Expr)
):
capture_last_expression = True
expr_node = code_ast.body[-1]
# Create new assign node
assign_node = ast.parse(
'globals()[{}] = None'.format(repr(out_varname))).body[0]
# Replace None by the value
assign_node.value = expr_node.value
# Fix line number and column offset
assign_node.lineno = expr_node.lineno
assign_node.col_offset = expr_node.col_offset
if sys.version_info[:2] >= (3, 8):
# Exists from 3.8, necessary from 3.11
assign_node.end_lineno = expr_node.end_lineno
if assign_node.lineno == assign_node.end_lineno:
# Add 'globals()[{}] = ' and remove 'None'
assign_node.end_col_offset += expr_node.end_col_offset - 4
else:
assign_node.end_col_offset = expr_node.end_col_offset
code_ast.body[-1] = assign_node
return code_ast, capture_last_expression
def canonic(filename):
"""
Return canonical form of filename.
This is a copy of bdb.canonic, so that the debugger will process
filenames in the same way
"""
if filename == "<" + filename[1:-1] + ">":
return filename
canonic = os.path.abspath(filename)
canonic = os.path.normcase(canonic)
return canonic