Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import hook #576

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- Added `filter_glob` and `exclude_glob` parameters to `fs.walk.Walker`.
Closes [#459](https://github.com/PyFilesystem/pyfilesystem2/issues/459).
- Added `fs.expose.importhook`.

### Fixed
- Elaborated documentation of `filter_dirs` and `exclude_dirs` in `fs.walk.Walker`.
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ Many thanks to the following developers for contributing to this project:
- [Vilius Grigaliūnas](https://github.com/vilius-g)
- [Will McGugan](https://github.com/willmcgugan)
- [Zmej Serow](https://github.com/zmej-serow)
- [Amir Rossert](https://github.com/arossert/)
Empty file added fs/expose/__init__.py
Empty file.
234 changes: 234 additions & 0 deletions fs/expose/importhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
"""
fs.expose.importhook
====================

Expose an FS object to the python import machinery, via a PEP-302 loader.

This module allows you to import python modules from an arbitrary FS object,
by placing FS urls on sys.path and/or inserting objects into sys.meta_path.

The main class in this module is FSImportHook, which is a PEP-302-compliant
module finder and loader. If you place an instance of FSImportHook on
sys.meta_path, you will be able to import modules from the exposed filesystem::

>>> from fs.memoryfs import MemoryFS
>>> m = MemoryFS()
>>> m.writetext("helloworld.py", "print('hello world!')")
>>>
>>> import sys
>>> from fs.expose.importhook import FSImportHook
>>> sys.meta_path.append(FSImportHook(m))
>>> import helloworld
hello world!

It is also possible to install FSImportHook as an import path handler. This
allows you to place filesystem URLs on sys.path and have them automagically
opened for importing. This example would allow modules to be imported from
an SFTP server::

>>> from fs.expose.importhook import FSImportHook
>>> FSImportHook.install()
>>> sys.path.append("sftp://some.remote.machine/mypath/")

"""
import os
import sys
import imp
import types
import marshal
import importlib
from importlib.util import MAGIC_NUMBER

from fs import open_fs
from fs.base import FS

from typing import Sequence, Text, Tuple, Optional

ModuleInfo = Tuple[Text, int, bool]


class FSImportHook(importlib.abc.MetaPathFinder):
"""PEP-302-compliant module finder and loader for FS objects.

FSImportHook is a module finder and loader that takes its data from an
arbitrary FS object. The FS must have .py or .pyc files stored in the
standard module structure.

For easy use with sys.path, FSImportHook will also accept a filesystem
URL, which is automatically opened using fs.opener.
"""

_VALID_MODULE_TYPES = {imp.PY_SOURCE, imp.PY_COMPILED}

def __init__(self, fs_or_url):
# type: ([FS, Text]) -> None
# If given a string, try to open it as an FS url.
# Don't open things on the local filesystem though.
if (isinstance(fs_or_url, str) and ":/" not in fs_or_url) and (
not isinstance(fs_or_url, FS)
):
raise ImportError()

self.fs = open_fs(fs_or_url)
self.path = fs_or_url if isinstance(fs_or_url, str) else None
# Cache the 'module_info' data
self.module_info = {}

@classmethod
def install(cls):
# type: () -> None
"""Install this class into the import machinery.

This classmethod installs the custom FSImportHook class into the
import machinery of the running process, if it is not already
installed.
"""
if cls in sys.path_hooks:
return
sys.path_hooks.append(cls)
sys.path_importer_cache.clear()

@classmethod
def uninstall(cls):
# type: () -> None
"""Uninstall this class from the import machinery.

This classmethod uninstalls the custom FSImportHook class from the
import machinery of the running process.
"""
if cls not in sys.path_hooks:
return
sys.path_hooks.remove(cls)
sys.path_importer_cache.clear()

def find_module(self, fullname, path=None):
# type: (Text, Sequence[bytes, str]) -> [FSImportHook, None]
"""Find the FS loader for the given module.

This object is always its own loader, so this really just checks
whether it's a valid module on the exposed filesystem.
"""
try:
self._get_module_info(fullname)
except ImportError:
return None
else:
return self

def _get_module_info(self, fullname):
# type: (Text) -> ModuleInfo
"""Get basic information about the given module.

If the specified module exists, this method returns a tuple giving
its filepath, file type and whether it's a package. Otherwise,
it raise ImportError.
"""
if fullname in self.module_info:
return self.module_info[fullname]

prefix = fullname.replace(".", "/")
# Is it a regular module?
path, _type = self._find_module_file(prefix)
if path is not None:
self.module_info[fullname] = path, _type, False
return path, _type, False
# Is it a package?
prefix = os.path.join(prefix, "__init__")
path, _type = self._find_module_file(prefix)
if path is not None:
self.module_info[fullname] = path, _type, True
return path, _type, True
# No, it's nothing
raise ImportError(fullname)

def _find_module_file(self, prefix):
# type: (Text) -> Tuple[Optional[Text], Optional[int]]
"""Find a module file from the given path prefix.

This method iterates over the possible module suffixes, checking each
in turn and returning the first match found. It returns a two-tuple
(path,type) or (None,None) if there's no module.
"""
for suffix, mode, _type in imp.get_suffixes():
if _type in self._VALID_MODULE_TYPES:
path = prefix + suffix
if self.fs.isfile(path):
return path, _type
return None, None

def load_module(self, fullname):
# type: (Text) -> types.ModuleType
"""Load the specified module.

This method locates the file for the specified module, loads and
executes it and returns the created module object.
"""
# Reuse an existing module if present.
try:
return sys.modules[fullname]
except KeyError:
pass
# Try to create from source or bytecode.
info = self._get_module_info(fullname)
code = self.get_code(fullname, info)
if code is None:
raise ImportError(fullname)
mod = types.ModuleType(fullname)
mod.__file__ = "<loading>"
mod.__loader__ = self
sys.modules[fullname] = mod
try:
exec(code, mod.__dict__)
mod.__file__ = self.get_filename(fullname, info)
if self.is_package(fullname, info):
if self.path is None:
mod.__path__ = []
else:
mod.__path__ = [self.path]
return mod
except Exception:
sys.modules.pop(fullname, None)
raise

def is_package(self, fullname, info=None):
# type: (Text, Optional[ModuleInfo]) -> bool
"""Check whether the specified module is a package."""
if info is None:
info = self._get_module_info(fullname)
path, _type, ispkg = info
return ispkg

def get_code(self, fullname, info=None):
# type: (Text, Optional[ModuleInfo]) -> Optional[Code]
"""Get the bytecode for the specified module."""
if info is None:
info = self._get_module_info(fullname)
path, _type, ispkg = info
code = self.fs.readbytes(path)
if _type == imp.PY_SOURCE:
code = code.replace(b"\r\n", b"\n")
return compile(code, path, "exec")
elif _type == imp.PY_COMPILED:
if code[:4] != MAGIC_NUMBER:
return None
return marshal.loads(code[8:])
else:
return None

def get_source(self, fullname, info=None):
# type: (Text, Optional[ModuleInfo]) -> Optional[bytes]
"""Get the sourcecode for the specified module, if present."""
if info is None:
info = self._get_module_info(fullname)
path, _type, ispkg = info
if _type != imp.PY_SOURCE:
return None
return self.fs.getcontents(path, "rb").replace(b"\r\n", b"\n")

def get_filename(self, fullname, info=None):
# type: (Text, Optional[ModuleInfo]) -> str
"""Get the __file__ attribute for the specified module."""
if info is None:
info = self._get_module_info(fullname)
path, _type, ispkg = info
return path
141 changes: 141 additions & 0 deletions tests/test_importhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import imp
import sys
import unittest
import marshal
import struct
from textwrap import dedent

from fs.expose.importhook import FSImportHook
from fs.tempfs import TempFS
from fs.zipfs import ZipFS


class TestFSImportHook(unittest.TestCase):

def setUp(self):
pass

def tearDown(self):
for mph in list(sys.meta_path):
if isinstance(mph, FSImportHook):
sys.meta_path.remove(mph)
for ph in list(sys.path_hooks):
if isinstance(ph, type) and issubclass(ph, FSImportHook):
sys.path_hooks.remove(ph)
for k, v in list(sys.modules.items()):
if k.startswith("fsih_"):
del sys.modules[k]
elif hasattr(v, "__loader__"):
if isinstance(v.__loader__, FSImportHook):
del sys.modules[k]
sys.path_importer_cache.clear()

def _init_modules(self, fs):
fs.writetext("__init__.py", "")
fs.writetext("fsih_hello.py", dedent("""
message = 'hello world!'
"""))
fs.makedir("fsih_pkg")
fs.writetext("fsih_pkg/__init__.py", dedent("""
a = 42
"""))
fs.writetext("fsih_pkg/sub1.py", dedent("""
import fsih_pkg
from fsih_hello import message
a = fsih_pkg.a
"""))
fs.writebytes("fsih_pkg/sub2.pyc", self._getpyc(dedent("""
import fsih_pkg
from fsih_hello import message
a = fsih_pkg.a * 2
""")))

def _getpyc(self, src):
"""Get the .pyc contents to match th given .py source code."""
code = imp.get_magic() + struct.pack("<i", 0)
code += marshal.dumps(compile(src, __file__, "exec"))
return code

def test_loader_methods(self):
t = TempFS()
self._init_modules(t)
ih = FSImportHook(t)
sys.meta_path.append(ih)
try:
self.assertEqual(ih.find_module("fsih_hello"), ih)
self.assertEqual(ih.find_module("fsih_helo"), None)
self.assertEqual(ih.find_module("fsih_pkg"), ih)
self.assertEqual(ih.find_module("fsih_pkg.sub1"), ih)
self.assertEqual(ih.find_module("fsih_pkg.sub2"), ih)
self.assertEqual(ih.find_module("fsih_pkg.sub3"), None)
m = ih.load_module("fsih_hello")
self.assertEqual(m.message, "hello world!")
self.assertRaises(ImportError, ih.load_module, "fsih_helo")
ih.load_module("fsih_pkg")
m = ih.load_module("fsih_pkg.sub1")
self.assertEqual(m.message, "hello world!")
self.assertEqual(m.a, 42)
m = ih.load_module("fsih_pkg.sub2")
self.assertEqual(m.message, "hello world!")
self.assertEqual(m.a, 42 * 2)
self.assertRaises(ImportError, ih.load_module, "fsih_pkg.sub3")
finally:
sys.meta_path.remove(ih)
t.close()

def _check_imports_are_working(self):
try:
import fsih_hello
self.assertEqual(fsih_hello.message, "hello world!")
try:
import fsih_helo
except ImportError:
pass
else:
assert False, "ImportError not raised"
import fsih_pkg
import fsih_pkg.sub1
self.assertEqual(fsih_pkg.sub1.message, "hello world!")
self.assertEqual(fsih_pkg.sub1.a, 42)
import fsih_pkg.sub2
self.assertEqual(fsih_pkg.sub2.message, "hello world!")
self.assertEqual(fsih_pkg.sub2.a, 42 * 2)
try:
import fsih_pkg.sub3
except ImportError:
pass
else:
assert False, "ImportError not raised"
finally:
for k in list(sys.modules.keys()):
if k.startswith("fsih_"):
del sys.modules[k]

def test_importer_on_meta_path(self):
t = TempFS()
self._init_modules(t)
ih = FSImportHook(t)
sys.meta_path.append(ih)
try:
self._check_imports_are_working()
finally:
sys.meta_path.remove(ih)
t.close()

def test_url_on_sys_path(self):
t = TempFS()
zpath = t.getsyspath("modules.zip")
z = ZipFS(zpath, write=True)
self._init_modules(z)
z.close()
z = ZipFS(zpath, write=False)
assert z.isfile("fsih_hello.py")
z.close()
FSImportHook.install()
sys.path.append("zip://" + zpath)
try:
self._check_imports_are_working()
finally:
sys.path_hooks.remove(FSImportHook)
sys.path.pop()
t.close()