Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 89 additions & 68 deletions Tests/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,46 @@

import pytest
from packaging.version import parse as parse_version
from typing import Any, Callable, Sequence

from PIL import Image, ImageMath, features

logger = logging.getLogger(__name__)


HAS_UPLOADER = False

uploader = None
if os.environ.get("SHOW_ERRORS"):
# local img.show for errors.
HAS_UPLOADER = True

class test_image_results:
@staticmethod
def upload(a, b):
a.show()
b.show()

uploader = "show"
elif "GITHUB_ACTIONS" in os.environ:
HAS_UPLOADER = True

class test_image_results:
@staticmethod
def upload(a, b):
dir_errors = os.path.join(os.path.dirname(__file__), "errors")
os.makedirs(dir_errors, exist_ok=True)
tmpdir = tempfile.mkdtemp(dir=dir_errors)
a.save(os.path.join(tmpdir, "a.png"))
b.save(os.path.join(tmpdir, "b.png"))
return tmpdir

uploader = "github_actions"
else:
try:
import test_image_results

HAS_UPLOADER = True
uploader = "aws"
except ImportError:
pass


def convert_to_comparable(a, b):
def upload(a: Image.Image, b: Image.Image) -> str | None:
if uploader == "show":
# local img.show for errors.
a.show()
b.show()
elif uploader == "github_actions":
dir_errors = os.path.join(os.path.dirname(__file__), "errors")
os.makedirs(dir_errors, exist_ok=True)
tmpdir = tempfile.mkdtemp(dir=dir_errors)
a.save(os.path.join(tmpdir, "a.png"))
b.save(os.path.join(tmpdir, "b.png"))
return tmpdir
elif uploader == "aws":
return test_image_results.upload(a, b)
Comment on lines +49 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of scope for this PR, but if the AWS Lambda is long gone, shall we remove this stuff?

wiredfool/test-image-results#1 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've created #7739 to remove it.

return None


def convert_to_comparable(
a: Image.Image, b: Image.Image
) -> tuple[Image.Image, Image.Image]:
new_a, new_b = a, b
if a.mode == "P":
new_a = Image.new("L", a.size)
Expand All @@ -67,14 +66,16 @@ def convert_to_comparable(a, b):
return new_a, new_b


def assert_deep_equal(a, b, msg=None):
def assert_deep_equal(a: Any, b: Any, msg: str | None = None) -> None:
try:
assert len(a) == len(b), msg or f"got length {len(a)}, expected {len(b)}"
except Exception:
assert a == b, msg


def assert_image(im, mode, size, msg=None):
def assert_image(
im: Image.Image, mode: str, size: tuple[int, int], msg: str | None = None
) -> None:
if mode is not None:
assert im.mode == mode, (
msg or f"got mode {repr(im.mode)}, expected {repr(mode)}"
Expand All @@ -86,28 +87,32 @@ def assert_image(im, mode, size, msg=None):
)


def assert_image_equal(a, b, msg=None):
def assert_image_equal(a: Image.Image, b: Image.Image, msg: str | None = None) -> None:
assert a.mode == b.mode, msg or f"got mode {repr(a.mode)}, expected {repr(b.mode)}"
assert a.size == b.size, msg or f"got size {repr(a.size)}, expected {repr(b.size)}"
if a.tobytes() != b.tobytes():
if HAS_UPLOADER:
if uploader:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if is probably no longer needed since it is now checked in the upload function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this condition was removed, then the logger.error on line 97 would always run, and similarly, the logger.exception on line 136 would always run.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it shouldn't run if there is no url returned?
If uploader = "show", that logger message is probably also undesirable.

url = upload(a, b)
if url:
    logger.error("URL for test images: %s", url)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've pushed f7701e6 for this.

try:
url = test_image_results.upload(a, b)
url = upload(a, b)
logger.error("URL for test images: %s", url)
except Exception:
pass

pytest.fail(msg or "got different content")


def assert_image_equal_tofile(a, filename, msg=None, mode=None):
def assert_image_equal_tofile(
a: Image.Image, filename: str, msg: str | None = None, mode: str | None = None
) -> None:
with Image.open(filename) as img:
if mode:
img = img.convert(mode)
assert_image_equal(a, img, msg)


def assert_image_similar(a, b, epsilon, msg=None):
def assert_image_similar(
a: Image.Image, b: Image.Image, epsilon: float, msg: str | None = None
) -> None:
assert a.mode == b.mode, msg or f"got mode {repr(a.mode)}, expected {repr(b.mode)}"
assert a.size == b.size, msg or f"got size {repr(a.size)}, expected {repr(b.size)}"

Expand All @@ -125,55 +130,68 @@ def assert_image_similar(a, b, epsilon, msg=None):
+ f" average pixel value difference {ave_diff:.4f} > epsilon {epsilon:.4f}"
)
except Exception as e:
if HAS_UPLOADER:
if uploader:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this if.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've pushed f7701e6 for this.

try:
url = test_image_results.upload(a, b)
url = upload(a, b)
logger.exception("URL for test images: %s", url)
except Exception:
pass
raise e


def assert_image_similar_tofile(a, filename, epsilon, msg=None, mode=None):
def assert_image_similar_tofile(
a: Image.Image,
filename: str,
epsilon: float,
msg: str | None = None,
mode: str | None = None,
) -> None:
with Image.open(filename) as img:
if mode:
img = img.convert(mode)
assert_image_similar(a, img, epsilon, msg)


def assert_all_same(items, msg=None):
def assert_all_same(items: Sequence[Any], msg: str | None = None) -> None:
assert items.count(items[0]) == len(items), msg


def assert_not_all_same(items, msg=None):
def assert_not_all_same(items: Sequence[Any], msg: str | None = None) -> None:
assert items.count(items[0]) != len(items), msg


def assert_tuple_approx_equal(actuals, targets, threshold, msg):
def assert_tuple_approx_equal(
actuals: Sequence[int], targets: tuple[int, ...], threshold: int, msg: str
) -> None:
"""Tests if actuals has values within threshold from targets"""
value = True
for i, target in enumerate(targets):
value *= target - threshold <= actuals[i] <= target + threshold

assert value, msg + ": " + repr(actuals) + " != " + repr(targets)
if not (target - threshold <= actuals[i] <= target + threshold):
pytest.fail(msg + ": " + repr(actuals) + " != " + repr(targets))


def skip_unless_feature(feature: str) -> pytest.MarkDecorator:
reason = f"{feature} not available"
return pytest.mark.skipif(not features.check(feature), reason=reason)


def skip_unless_feature_version(feature, version_required, reason=None):
def skip_unless_feature_version(
feature: str, required: str, reason: str | None = None
) -> pytest.MarkDecorator:
if not features.check(feature):
return pytest.mark.skip(f"{feature} not available")
if reason is None:
reason = f"{feature} is older than {version_required}"
version_required = parse_version(version_required)
reason = f"{feature} is older than {required}"
version_required = parse_version(required)
version_available = parse_version(features.version(feature))
return pytest.mark.skipif(version_available < version_required, reason=reason)


def mark_if_feature_version(mark, feature, version_blacklist, reason=None):
def mark_if_feature_version(
mark: pytest.MarkDecorator,
feature: str,
version_blacklist: str,
reason: str | None = None,
) -> pytest.MarkDecorator:
if not features.check(feature):
return pytest.mark.pil_noop_mark()
if reason is None:
Expand All @@ -194,7 +212,7 @@ class PillowLeakTestCase:
iterations = 100 # count
mem_limit = 512 # k

def _get_mem_usage(self):
def _get_mem_usage(self) -> float:
"""
Gets the RUSAGE memory usage, returns in K. Encapsulates the difference
between macOS and Linux rss reporting
Expand All @@ -216,7 +234,7 @@ def _get_mem_usage(self):
# This is the maximum resident set size used (in kilobytes).
return mem # Kb

def _test_leak(self, core):
def _test_leak(self, core: Callable[[], None]) -> None:
start_mem = self._get_mem_usage()
for cycle in range(self.iterations):
core()
Expand All @@ -228,17 +246,17 @@ def _test_leak(self, core):
# helpers


def fromstring(data):
def fromstring(data: bytes) -> Image.Image:
return Image.open(BytesIO(data))


def tostring(im, string_format, **options):
def tostring(im: Image.Image, string_format: str, **options: dict[str, Any]) -> bytes:
out = BytesIO()
im.save(out, string_format, **options)
return out.getvalue()


def hopper(mode=None, cache={}):
def hopper(mode: str | None = None, cache: dict[str, Image.Image] = {}) -> Image.Image:
if mode is None:
# Always return fresh not-yet-loaded version of image.
# Operations on not-yet-loaded images is separate class of errors
Expand All @@ -259,29 +277,31 @@ def hopper(mode=None, cache={}):
return im.copy()


def djpeg_available():
def djpeg_available() -> bool:
if shutil.which("djpeg"):
try:
subprocess.check_call(["djpeg", "-version"])
return True
except subprocess.CalledProcessError: # pragma: no cover
return False
pass
return False


def cjpeg_available():
def cjpeg_available() -> bool:
if shutil.which("cjpeg"):
try:
subprocess.check_call(["cjpeg", "-version"])
return True
except subprocess.CalledProcessError: # pragma: no cover
return False
pass
return False


def netpbm_available():
def netpbm_available() -> bool:
return bool(shutil.which("ppmquant") and shutil.which("ppmtogif"))


def magick_command():
def magick_command() -> list[str] | None:
if sys.platform == "win32":
magickhome = os.environ.get("MAGICK_HOME")
if magickhome:
Expand All @@ -298,47 +318,48 @@ def magick_command():
return imagemagick
if graphicsmagick and shutil.which(graphicsmagick[0]):
return graphicsmagick
return None


def on_appveyor():
def on_appveyor() -> bool:
return "APPVEYOR" in os.environ


def on_github_actions():
def on_github_actions() -> bool:
return "GITHUB_ACTIONS" in os.environ


def on_ci():
def on_ci() -> bool:
# GitHub Actions and AppVeyor have "CI"
return "CI" in os.environ


def is_big_endian():
def is_big_endian() -> bool:
return sys.byteorder == "big"


def is_ppc64le():
def is_ppc64le() -> bool:
import platform

return platform.machine() == "ppc64le"


def is_win32():
def is_win32() -> bool:
return sys.platform.startswith("win32")


def is_pypy():
def is_pypy() -> bool:
return hasattr(sys, "pypy_translation_info")


def is_mingw():
def is_mingw() -> bool:
return sysconfig.get_platform() == "mingw"


class CachedProperty:
def __init__(self, func):
def __init__(self, func: Callable[[Any], None]) -> None:
self.func = func

def __get__(self, instance, cls=None):
def __get__(self, instance: Any, cls: type[Any] | None = None) -> Any:
result = instance.__dict__[self.func.__name__] = self.func(instance)
return result