update mutagen

This commit is contained in:
AdeHub
2024-08-24 16:00:16 +12:00
parent ba666f68b8
commit 5b1d9c0750
70 changed files with 310 additions and 2784 deletions

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
#
# This program is free software; you can redistribute it and/or modify
@@ -23,7 +22,7 @@ from mutagen._util import MutagenError
from mutagen._file import FileType, StreamInfo, File
from mutagen._tags import Tags, Metadata, PaddingInfo
version = (1, 45, 1)
version = (1, 47, 1)
"""Version tuple."""
version_string = ".".join(map(str, version))

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
#
# This program is free software; you can redistribute it and/or modify
@@ -7,6 +6,7 @@
# (at your option) any later version.
import warnings
from typing import List
from mutagen._util import DictMixin, loadfile
@@ -82,9 +82,9 @@ class FileType(DictMixin):
if self.tags is None:
raise KeyError(key)
else:
del(self.tags[key])
del self.tags[key]
def keys(self):
def keys(self) -> list:
"""Return a list of keys in the metadata tag.
If the file has no tags at all, an empty list is returned.
@@ -131,12 +131,13 @@ class FileType(DictMixin):
if self.tags is not None:
return self.tags.save(filething, **kwargs)
def pprint(self):
def pprint(self) -> str:
"""
Returns:
text: stream information and comment key=value pairs.
"""
assert self.info is not None
stream = "%s (%s)" % (self.info.pprint(), self.mime[0])
try:
tags = self.tags.pprint()
@@ -145,7 +146,7 @@ class FileType(DictMixin):
else:
return stream + ((tags and "\n" + tags) or "")
def add_tags(self):
def add_tags(self) -> None:
"""Adds new tags to the file.
Raises:
@@ -156,7 +157,7 @@ class FileType(DictMixin):
raise NotImplementedError
@property
def mime(self):
def mime(self) -> List[str]:
"""A list of mime types (:class:`mutagen.text`)"""
mimes = []
@@ -167,7 +168,7 @@ class FileType(DictMixin):
return mimes
@staticmethod
def score(filename, fileobj, header):
def score(filename, fileobj, header) -> int:
"""Returns a score for how likely the file can be parsed by this type.
Args:
@@ -195,7 +196,7 @@ class StreamInfo(object):
__module__ = "mutagen"
def pprint(self):
def pprint(self) -> str:
"""
Returns:
text: Print stream information

View File

@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Evan Purkhiser
# 2014 Ben Ockmore
# 2017 Borewit
# 2019-2020 Philipp Wolfer
# 2019-2021 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -38,7 +37,7 @@ class EmptyChunk(InvalidChunk):
pass
def is_valid_chunk_id(id):
def is_valid_chunk_id(id: str) -> bool:
""" is_valid_chunk_id(FOURCC)
Arguments:
@@ -57,7 +56,7 @@ def is_valid_chunk_id(id):
# Assert FOURCC formatted valid
def assert_valid_chunk_id(id):
def assert_valid_chunk_id(id: str) -> None:
if not is_valid_chunk_id(id):
raise ValueError("IFF chunk ID must be four ASCII characters.")
@@ -127,13 +126,13 @@ class IffChunk(object):
% (type(self).__name__, self.id, self.offset, self.size,
self.data_offset, self.data_size))
def read(self):
def read(self) -> bytes:
"""Read the chunks data"""
self._fileobj.seek(self.data_offset)
return self._fileobj.read(self.data_size)
def write(self, data):
def write(self, data: bytes) -> None:
"""Write the chunk data"""
if len(data) > self.data_size:
@@ -147,7 +146,7 @@ class IffChunk(object):
self._fileobj.seek(self.data_offset + self.data_size)
self._fileobj.write(b'\x00' * padding)
def delete(self):
def delete(self) -> None:
"""Removes the chunk from the file"""
delete_bytes(self._fileobj, self.size, self.offset)
@@ -173,23 +172,36 @@ class IffChunk(object):
self.size = self.HEADER_SIZE + self.data_size + self.padding()
assert self.size % 2 == 0
def resize(self, new_data_size):
def resize(self, new_data_size: int) -> None:
"""Resize the file and update the chunk sizes"""
old_size = self._get_actual_data_size()
padding = new_data_size % 2
resize_bytes(self._fileobj, self.data_size + self.padding(),
resize_bytes(self._fileobj, old_size,
new_data_size + padding, self.data_offset)
size_diff = new_data_size - self.data_size
self._update_size(size_diff)
self._fileobj.flush()
def padding(self):
def padding(self) -> int:
"""Returns the number of padding bytes (0 or 1).
IFF chunks are required to be a even number in total length. If
data_size is odd a padding byte will be added at the end.
"""
return self.data_size % 2
def _get_actual_data_size(self) -> int:
"""Returns the data size that is actually possible.
Some files have chunks that are truncated and their reported size
would be outside of the file's actual size."""
fileobj = self._fileobj
fileobj.seek(0, 2)
file_size = fileobj.tell()
expected_size = self.data_size + self.padding()
max_size_possible = file_size - self.data_offset
return min(expected_size, max_size_possible)
class IffContainerChunkMixin():
"""A IFF chunk containing other chunks.
@@ -250,7 +262,7 @@ class IffContainerChunkMixin():
if not is_valid_chunk_id(id_):
raise KeyError("Invalid IFF key.")
next_offset = self.offset + self.size
next_offset = self.data_offset + self._get_actual_data_size()
size = self.HEADER_SIZE
data_size = 0
if data:

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Borewit
# Copyright (C) 2019-2020 Philipp Wolfer
#

View File

@@ -1 +0,0 @@
Don't change things here, this is a copy of https://github.com/lazka/senf

View File

@@ -1,85 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from ._fsnative import fsnative, path2fsn, fsn2text, fsn2bytes, \
bytes2fsn, uri2fsn, fsn2uri, text2fsn, fsn2norm
from ._print import print_, input_, supports_ansi_escape_codes
from ._stdlib import sep, pathsep, curdir, pardir, altsep, extsep, devnull, \
defpath, getcwd, expanduser, expandvars
from ._argv import argv
from ._environ import environ, getenv, unsetenv, putenv
from ._temp import mkstemp, gettempdir, gettempprefix, mkdtemp
fsnative, print_, getcwd, getenv, unsetenv, putenv, environ, expandvars, \
path2fsn, fsn2text, fsn2bytes, bytes2fsn, uri2fsn, fsn2uri, mkstemp, \
gettempdir, gettempprefix, mkdtemp, input_, expanduser, text2fsn, \
supports_ansi_escape_codes, fsn2norm
version = (1, 4, 2)
"""Tuple[`int`, `int`, `int`]: The version tuple (major, minor, micro)"""
version_string = ".".join(map(str, version))
"""`str`: A version string"""
argv = argv
"""List[`fsnative`]: Like `sys.argv` but contains unicode under
Windows + Python 2
"""
sep = sep
"""`fsnative`: Like `os.sep` but a `fsnative`"""
pathsep = pathsep
"""`fsnative`: Like `os.pathsep` but a `fsnative`"""
curdir = curdir
"""`fsnative`: Like `os.curdir` but a `fsnative`"""
pardir = pardir
"""`fsnative`: Like `os.pardir` but a fsnative"""
altsep = altsep
"""`fsnative` or `None`: Like `os.altsep` but a `fsnative` or `None`"""
extsep = extsep
"""`fsnative`: Like `os.extsep` but a `fsnative`"""
devnull = devnull
"""`fsnative`: Like `os.devnull` but a `fsnative`"""
defpath = defpath
"""`fsnative`: Like `os.defpath` but a `fsnative`"""
__all__ = []

View File

@@ -1,104 +0,0 @@
import sys
import os
from typing import Text, Union, Any, Optional, Tuple, List, Dict
if sys.version_info[0] == 2:
_pathlike = Union[Text, bytes]
else:
_pathlike = Union[Text, bytes, 'os.PathLike[Any]']
_uri = Union[Text, str]
if sys.version_info[0] == 2:
if sys.platform == "win32":
_base = Text
else:
_base = bytes
else:
_base = Text
class fsnative(_base):
def __init__(self, object: Text=u"") -> None:
...
_fsnative = Union[fsnative, _base]
if sys.platform == "win32":
_bytes_default_encoding = str
else:
_bytes_default_encoding = Optional[str]
def path2fsn(path: _pathlike) -> _fsnative:
...
def fsn2text(path: _fsnative, strict: bool=False) -> Text:
...
def text2fsn(text: Text) -> _fsnative:
...
def fsn2bytes(path: _fsnative, encoding: _bytes_default_encoding="utf-8") -> bytes:
...
def bytes2fsn(data: bytes, encoding: _bytes_default_encoding="utf-8") -> _fsnative:
...
def uri2fsn(uri: _uri) -> _fsnative:
...
def fsn2uri(path: _fsnative) -> Text:
...
def fsn2norm(path: _fsnative) -> _fsnative:
...
sep: _fsnative
pathsep: _fsnative
curdir: _fsnative
pardir: _fsnative
altsep: _fsnative
extsep: _fsnative
devnull: _fsnative
defpath: _fsnative
def getcwd() -> _fsnative:
...
def getenv(key: _pathlike, value: Optional[_fsnative]=None) -> Optional[_fsnative]:
...
def putenv(key: _pathlike, value: _pathlike):
...
def unsetenv(key: _pathlike) -> None:
...
def supports_ansi_escape_codes(fd: int) -> bool:
...
def expandvars(path: _pathlike) -> _fsnative:
...
def expanduser(path: _pathlike) -> _fsnative:
...
environ: Dict[_fsnative,_fsnative]
argv: List[_fsnative]
def gettempdir() -> _fsnative:
pass
def mkstemp(suffix: Optional[_pathlike]=None, prefix: Optional[_pathlike]=None, dir: Optional[_pathlike]=None, text: bool=False) -> Tuple[int, _fsnative]:
...
def mkdtemp(suffix: Optional[_pathlike]=None, prefix: Optional[_pathlike]=None, dir: Optional[_pathlike]=None) -> _fsnative:
...
version_string: str
version: Tuple[int, int, int]
print_ = print
def input_(prompt: Any=None) -> _fsnative:
...

View File

@@ -1,120 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
import ctypes
try:
from collections import abc
except ImportError:
import collections as abc # type: ignore
from functools import total_ordering
from ._compat import PY2, string_types
from ._fsnative import is_win, _fsn2legacy, path2fsn
from . import _winapi as winapi
def _get_win_argv():
"""Returns a unicode argv under Windows and standard sys.argv otherwise
Returns:
List[`fsnative`]
"""
assert is_win
argc = ctypes.c_int()
try:
argv = winapi.CommandLineToArgvW(
winapi.GetCommandLineW(), ctypes.byref(argc))
except WindowsError:
return []
if not argv:
return []
res = argv[max(0, argc.value - len(sys.argv)):argc.value]
winapi.LocalFree(argv)
return res
@total_ordering
class Argv(abc.MutableSequence):
"""List[`fsnative`]: Like `sys.argv` but contains unicode
keys and values under Windows + Python 2.
Any changes made will be forwarded to `sys.argv`.
"""
def __init__(self):
if PY2 and is_win:
self._argv = _get_win_argv()
else:
self._argv = sys.argv
def __getitem__(self, index):
return self._argv[index]
def __setitem__(self, index, value):
if isinstance(value, string_types):
value = path2fsn(value)
self._argv[index] = value
if sys.argv is not self._argv:
try:
if isinstance(value, string_types):
sys.argv[index] = _fsn2legacy(value)
else:
sys.argv[index] = [_fsn2legacy(path2fsn(v)) for v in value]
except IndexError:
pass
def __delitem__(self, index):
del self._argv[index]
try:
del sys.argv[index]
except IndexError:
pass
def __eq__(self, other):
return self._argv == other
def __lt__(self, other):
return self._argv < other
def __len__(self):
return len(self._argv)
def __repr__(self):
return repr(self._argv)
def insert(self, index, value):
value = path2fsn(value)
self._argv.insert(index, value)
if sys.argv is not self._argv:
sys.argv.insert(index, _fsn2legacy(value))
argv = Argv()

View File

@@ -1,58 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
PY2 = sys.version_info[0] == 2
PY3 = not PY2
if PY2:
from urlparse import urlparse, urlunparse
urlparse, urlunparse
from urllib import quote, unquote
quote, unquote
from StringIO import StringIO
BytesIO = StringIO
from io import StringIO as TextIO
TextIO
string_types = (str, unicode)
text_type = unicode
iteritems = lambda d: d.iteritems()
elif PY3:
from urllib.parse import urlparse, quote, unquote, urlunparse
urlparse, quote, unquote, urlunparse
from io import StringIO
StringIO = StringIO
TextIO = StringIO
from io import BytesIO
BytesIO = BytesIO
string_types = (str,)
text_type = str
iteritems = lambda d: iter(d.items())

View File

@@ -1,270 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import ctypes
try:
from collections import abc
except ImportError:
import collections as abc # type: ignore
from ._compat import text_type, PY2
from ._fsnative import path2fsn, is_win, _fsn2legacy, fsnative
from . import _winapi as winapi
def get_windows_env_var(key):
"""Get an env var.
Raises:
WindowsError
"""
if not isinstance(key, text_type):
raise TypeError("%r not of type %r" % (key, text_type))
buf = ctypes.create_unicode_buffer(32767)
stored = winapi.GetEnvironmentVariableW(key, buf, 32767)
if stored == 0:
raise ctypes.WinError()
return buf[:stored]
def set_windows_env_var(key, value):
"""Set an env var.
Raises:
WindowsError
"""
if not isinstance(key, text_type):
raise TypeError("%r not of type %r" % (key, text_type))
if not isinstance(value, text_type):
raise TypeError("%r not of type %r" % (value, text_type))
status = winapi.SetEnvironmentVariableW(key, value)
if status == 0:
raise ctypes.WinError()
def del_windows_env_var(key):
"""Delete an env var.
Raises:
WindowsError
"""
if not isinstance(key, text_type):
raise TypeError("%r not of type %r" % (key, text_type))
status = winapi.SetEnvironmentVariableW(key, None)
if status == 0:
raise ctypes.WinError()
def read_windows_environ():
"""Returns a unicode dict of the Windows environment.
Raises:
WindowsEnvironError
"""
res = winapi.GetEnvironmentStringsW()
if not res:
raise ctypes.WinError()
res = ctypes.cast(res, ctypes.POINTER(ctypes.c_wchar))
done = []
current = u""
i = 0
while 1:
c = res[i]
i += 1
if c == u"\x00":
if not current:
break
done.append(current)
current = u""
continue
current += c
dict_ = {}
for entry in done:
try:
key, value = entry.split(u"=", 1)
except ValueError:
continue
key = _norm_key(key)
dict_[key] = value
status = winapi.FreeEnvironmentStringsW(res)
if status == 0:
raise ctypes.WinError()
return dict_
def _norm_key(key):
assert isinstance(key, fsnative)
if is_win:
key = key.upper()
return key
class Environ(abc.MutableMapping):
"""Dict[`fsnative`, `fsnative`]: Like `os.environ` but contains unicode
keys and values under Windows + Python 2.
Any changes made will be forwarded to `os.environ`.
"""
def __init__(self):
if is_win and PY2:
try:
env = read_windows_environ()
except WindowsError:
env = {}
else:
env = os.environ
self._env = env
def __getitem__(self, key):
key = _norm_key(path2fsn(key))
return self._env[key]
def __setitem__(self, key, value):
key = _norm_key(path2fsn(key))
value = path2fsn(value)
if is_win and PY2:
# this calls putenv, so do it first and replace later
try:
os.environ[_fsn2legacy(key)] = _fsn2legacy(value)
except OSError:
raise ValueError
try:
set_windows_env_var(key, value)
except WindowsError:
# py3+win fails for invalid keys. try to do the same
raise ValueError
try:
self._env[key] = value
except OSError:
raise ValueError
def __delitem__(self, key):
key = _norm_key(path2fsn(key))
if is_win and PY2:
try:
del_windows_env_var(key)
except WindowsError:
pass
try:
del os.environ[_fsn2legacy(key)]
except KeyError:
pass
del self._env[key]
def __iter__(self):
return iter(self._env)
def __len__(self):
return len(self._env)
def __repr__(self):
return repr(self._env)
def copy(self):
return self._env.copy()
environ = Environ()
def getenv(key, value=None):
"""Like `os.getenv` but returns unicode under Windows + Python 2
Args:
key (pathlike): The env var to get
value (object): The value to return if the env var does not exist
Returns:
`fsnative` or `object`:
The env var or the passed value if it doesn't exist
"""
key = path2fsn(key)
if is_win and PY2:
return environ.get(key, value)
return os.getenv(key, value)
def unsetenv(key):
"""Like `os.unsetenv` but takes unicode under Windows + Python 2
Args:
key (pathlike): The env var to unset
"""
key = path2fsn(key)
if is_win:
# python 3 has no unsetenv under Windows -> use our ctypes one as well
try:
del_windows_env_var(key)
except WindowsError:
pass
else:
os.unsetenv(key)
def putenv(key, value):
"""Like `os.putenv` but takes unicode under Windows + Python 2
Args:
key (pathlike): The env var to get
value (pathlike): The value to set
Raises:
ValueError
"""
key = path2fsn(key)
value = path2fsn(value)
if is_win and PY2:
try:
set_windows_env_var(key, value)
except WindowsError:
# py3 + win fails here
raise ValueError
else:
try:
os.putenv(key, value)
except OSError:
# win + py3 raise here for invalid keys which is probably a bug.
# ValueError seems better
raise ValueError

View File

@@ -1,625 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import ctypes
import codecs
from . import _winapi as winapi
from ._compat import text_type, PY3, PY2, urlparse, quote, unquote, urlunparse
is_win = os.name == "nt"
is_unix = not is_win
is_darwin = sys.platform == "darwin"
_surrogatepass = "strict" if PY2 else "surrogatepass"
def _normalize_codec(codec, _cache={}):
"""Raises LookupError"""
try:
return _cache[codec]
except KeyError:
_cache[codec] = codecs.lookup(codec).name
return _cache[codec]
def _swap_bytes(data):
"""swaps bytes for 16 bit, leaves remaining trailing bytes alone"""
a, b = data[1::2], data[::2]
data = bytearray().join(bytearray(x) for x in zip(a, b))
if len(b) > len(a):
data += b[-1:]
return bytes(data)
def _decode_surrogatepass(data, codec):
"""Like data.decode(codec, 'surrogatepass') but makes utf-16-le/be work
on Python 2.
https://bugs.python.org/issue27971
Raises UnicodeDecodeError, LookupError
"""
try:
return data.decode(codec, _surrogatepass)
except UnicodeDecodeError:
if PY2:
if _normalize_codec(codec) == "utf-16-be":
data = _swap_bytes(data)
codec = "utf-16-le"
if _normalize_codec(codec) == "utf-16-le":
buffer_ = ctypes.create_string_buffer(data + b"\x00\x00")
value = ctypes.wstring_at(buffer_, len(data) // 2)
if value.encode("utf-16-le", _surrogatepass) != data:
raise
return value
else:
raise
else:
raise
def _merge_surrogates(text):
"""Returns a copy of the text with all surrogate pairs merged"""
return _decode_surrogatepass(
text.encode("utf-16-le", _surrogatepass),
"utf-16-le")
def fsn2norm(path):
"""
Args:
path (fsnative): The path to normalize
Returns:
`fsnative`
Normalizes an fsnative path.
The same underlying path can have multiple representations as fsnative
(due to surrogate pairs and variable length encodings). When concatenating
fsnative the result might be different than concatenating the serialized
form and then deserializing it.
This returns the normalized form i.e. the form which os.listdir() would
return. This is useful when you alter fsnative but require that the same
underlying path always maps to the same fsnative value.
All functions like :func:`bytes2fsn`, :func:`fsnative`, :func:`text2fsn`
and :func:`path2fsn` always return a normalized path, independent of their
input.
"""
native = _fsn2native(path)
if is_win:
return _merge_surrogates(native)
elif PY3:
return bytes2fsn(native, None)
else:
return path
def _fsn2legacy(path):
"""Takes a fsnative path and returns a path that can be put into os.environ
or sys.argv. Might result in a mangled path on Python2 + Windows.
Can't fail.
Args:
path (fsnative)
Returns:
str
"""
if PY2 and is_win:
return path.encode(_encoding, "replace")
return path
def _fsnative(text):
if not isinstance(text, text_type):
raise TypeError("%r needs to be a text type (%r)" % (text, text_type))
if is_unix:
# First we go to bytes so we can be sure we have a valid source.
# Theoretically we should fail here in case we have a non-unicode
# encoding. But this would make everything complicated and there is
# no good way to handle a failure from the user side. Instead
# fall back to utf-8 which is the most likely the right choice in
# a mis-configured environment
encoding = _encoding
try:
path = text.encode(encoding, _surrogatepass)
except UnicodeEncodeError:
path = text.encode("utf-8", _surrogatepass)
if b"\x00" in path:
path = path.replace(b"\x00", fsn2bytes(_fsnative(u"\uFFFD"), None))
if PY3:
return path.decode(_encoding, "surrogateescape")
return path
else:
if u"\x00" in text:
text = text.replace(u"\x00", u"\uFFFD")
text = fsn2norm(text)
return text
def _create_fsnative(type_):
# a bit of magic to make fsnative(u"foo") and isinstance(path, fsnative)
# work
class meta(type):
def __instancecheck__(self, instance):
return _typecheck_fsnative(instance)
def __subclasscheck__(self, subclass):
return issubclass(subclass, type_)
class impl(object):
"""fsnative(text=u"")
Args:
text (text): The text to convert to a path
Returns:
fsnative: The new path.
Raises:
TypeError: In case something other then `text` has been passed
This type is a virtual base class for the real path type.
Instantiating it returns an instance of the real path type and it
overrides instance and subclass checks so that `isinstance` and
`issubclass` checks work:
::
isinstance(fsnative(u"foo"), fsnative) == True
issubclass(type(fsnative(u"foo")), fsnative) == True
The real returned type is:
- **Python 2 + Windows:** :obj:`python:unicode`, with ``surrogates``,
without ``null``
- **Python 2 + Unix:** :obj:`python:str`, without ``null``
- **Python 3 + Windows:** :obj:`python3:str`, with ``surrogates``,
without ``null``
- **Python 3 + Unix:** :obj:`python3:str`, with ``surrogates``, without
``null``, without code points not encodable with the locale encoding
Constructing a `fsnative` can't fail.
Passing a `fsnative` to :func:`open` will never lead to `ValueError`
or `TypeError`.
Any operation on `fsnative` can also use the `str` type, as long as
the `str` only contains ASCII and no NULL.
"""
def __new__(cls, text=u""):
return _fsnative(text)
new_type = meta("fsnative", (object,), dict(impl.__dict__))
new_type.__module__ = "senf"
return new_type
fsnative_type = text_type if is_win or PY3 else bytes
fsnative = _create_fsnative(fsnative_type)
def _typecheck_fsnative(path):
"""
Args:
path (object)
Returns:
bool: if path is a fsnative
"""
if not isinstance(path, fsnative_type):
return False
if PY3 or is_win:
if u"\x00" in path:
return False
if is_unix:
try:
path.encode(_encoding, "surrogateescape")
except UnicodeEncodeError:
return False
elif b"\x00" in path:
return False
return True
def _fsn2native(path):
"""
Args:
path (fsnative)
Returns:
`text` on Windows, `bytes` on Unix
Raises:
TypeError: in case the type is wrong or the ´str` on Py3 + Unix
can't be converted to `bytes`
This helper allows to validate the type and content of a path.
To reduce overhead the encoded value for Py3 + Unix is returned so
it can be reused.
"""
if not isinstance(path, fsnative_type):
raise TypeError("path needs to be %s, not %s" % (
fsnative_type.__name__, type(path).__name__))
if is_unix:
if PY3:
try:
path = path.encode(_encoding, "surrogateescape")
except UnicodeEncodeError:
# This look more like ValueError, but raising only one error
# makes things simpler... also one could say str + surrogates
# is its own type
raise TypeError(
"path contained Unicode code points not valid in"
"the current path encoding. To create a valid "
"path from Unicode use text2fsn()")
if b"\x00" in path:
raise TypeError("fsnative can't contain nulls")
else:
if u"\x00" in path:
raise TypeError("fsnative can't contain nulls")
return path
def _get_encoding():
"""The encoding used for paths, argv, environ, stdout and stdin"""
encoding = sys.getfilesystemencoding()
if encoding is None:
if is_darwin:
encoding = "utf-8"
elif is_win:
encoding = "mbcs"
else:
encoding = "ascii"
encoding = _normalize_codec(encoding)
return encoding
_encoding = _get_encoding()
def path2fsn(path):
"""
Args:
path (pathlike): The path to convert
Returns:
`fsnative`
Raises:
TypeError: In case the type can't be converted to a `fsnative`
ValueError: In case conversion fails
Returns a `fsnative` path for a `pathlike`.
"""
# allow mbcs str on py2+win and bytes on py3
if PY2:
if is_win:
if isinstance(path, bytes):
path = path.decode(_encoding)
else:
if isinstance(path, text_type):
path = path.encode(_encoding)
if "\x00" in path:
raise ValueError("embedded null")
else:
path = getattr(os, "fspath", lambda x: x)(path)
if isinstance(path, bytes):
if b"\x00" in path:
raise ValueError("embedded null")
path = path.decode(_encoding, "surrogateescape")
elif is_unix and isinstance(path, str):
# make sure we can encode it and this is not just some random
# unicode string
data = path.encode(_encoding, "surrogateescape")
if b"\x00" in data:
raise ValueError("embedded null")
path = fsn2norm(path)
else:
if u"\x00" in path:
raise ValueError("embedded null")
path = fsn2norm(path)
if not isinstance(path, fsnative_type):
raise TypeError("path needs to be %s", fsnative_type.__name__)
return path
def fsn2text(path, strict=False):
"""
Args:
path (fsnative): The path to convert
strict (bool): Fail in case the conversion is not reversible
Returns:
`text`
Raises:
TypeError: In case no `fsnative` has been passed
ValueError: In case ``strict`` was True and the conversion failed
Converts a `fsnative` path to `text`.
Can be used to pass a path to some unicode API, like for example a GUI
toolkit.
If ``strict`` is True the conversion will fail in case it is not
reversible. This can be useful for converting program arguments that are
supposed to be text and erroring out in case they are not.
Encoding with a Unicode encoding will always succeed with the result.
"""
path = _fsn2native(path)
errors = "strict" if strict else "replace"
if is_win:
return path.encode("utf-16-le", _surrogatepass).decode("utf-16-le",
errors)
else:
return path.decode(_encoding, errors)
def text2fsn(text):
"""
Args:
text (text): The text to convert
Returns:
`fsnative`
Raises:
TypeError: In case no `text` has been passed
Takes `text` and converts it to a `fsnative`.
This operation is not reversible and can't fail.
"""
return fsnative(text)
def fsn2bytes(path, encoding="utf-8"):
"""
Args:
path (fsnative): The path to convert
encoding (`str`): encoding used for Windows
Returns:
`bytes`
Raises:
TypeError: If no `fsnative` path is passed
ValueError: If encoding fails or the encoding is invalid
Converts a `fsnative` path to `bytes`.
The passed *encoding* is only used on platforms where paths are not
associated with an encoding (Windows for example).
For Windows paths, lone surrogates will be encoded like normal code points
and surrogate pairs will be merged before encoding. In case of ``utf-8``
or ``utf-16-le`` this is equal to the `WTF-8 and WTF-16 encoding
<https://simonsapin.github.io/wtf-8/>`__.
"""
path = _fsn2native(path)
if is_win:
if encoding is None:
raise ValueError("invalid encoding %r" % encoding)
if PY2:
try:
return path.encode(encoding)
except LookupError:
raise ValueError("invalid encoding %r" % encoding)
else:
try:
return path.encode(encoding)
except LookupError:
raise ValueError("invalid encoding %r" % encoding)
except UnicodeEncodeError:
# Fallback implementation for text including surrogates
# merge surrogate codepoints
if _normalize_codec(encoding).startswith("utf-16"):
# fast path, utf-16 merges anyway
return path.encode(encoding, _surrogatepass)
return _merge_surrogates(path).encode(encoding, _surrogatepass)
else:
return path
def bytes2fsn(data, encoding="utf-8"):
"""
Args:
data (bytes): The data to convert
encoding (`str`): encoding used for Windows
Returns:
`fsnative`
Raises:
TypeError: If no `bytes` path is passed
ValueError: If decoding fails or the encoding is invalid
Turns `bytes` to a `fsnative` path.
The passed *encoding* is only used on platforms where paths are not
associated with an encoding (Windows for example).
For Windows paths ``WTF-8`` is accepted if ``utf-8`` is used and
``WTF-16`` accepted if ``utf-16-le`` is used.
"""
if not isinstance(data, bytes):
raise TypeError("data needs to be bytes")
if is_win:
if encoding is None:
raise ValueError("invalid encoding %r" % encoding)
try:
path = _decode_surrogatepass(data, encoding)
except LookupError:
raise ValueError("invalid encoding %r" % encoding)
if u"\x00" in path:
raise ValueError("contains nulls")
return path
else:
if b"\x00" in data:
raise ValueError("contains nulls")
if PY2:
return data
else:
return data.decode(_encoding, "surrogateescape")
def uri2fsn(uri):
"""
Args:
uri (`text` or :obj:`python:str`): A file URI
Returns:
`fsnative`
Raises:
TypeError: In case an invalid type is passed
ValueError: In case the URI isn't a valid file URI
Takes a file URI and returns a `fsnative` path
"""
if PY2:
if isinstance(uri, text_type):
uri = uri.encode("utf-8")
if not isinstance(uri, bytes):
raise TypeError("uri needs to be ascii str or unicode")
else:
if not isinstance(uri, str):
raise TypeError("uri needs to be str")
parsed = urlparse(uri)
scheme = parsed.scheme
netloc = parsed.netloc
path = parsed.path
if scheme != "file":
raise ValueError("Not a file URI: %r" % uri)
if not path:
raise ValueError("Invalid file URI: %r" % uri)
uri = urlunparse(parsed)[7:]
if is_win:
try:
drive, rest = uri.split(":", 1)
except ValueError:
path = ""
rest = uri.replace("/", "\\")
else:
path = drive[-1] + ":"
rest = rest.replace("/", "\\")
if PY2:
path += unquote(rest)
else:
path += unquote(rest, encoding="utf-8", errors="surrogatepass")
if netloc:
path = "\\\\" + path
if PY2:
path = path.decode("utf-8")
if u"\x00" in path:
raise ValueError("embedded null")
return path
else:
if PY2:
path = unquote(uri)
else:
path = unquote(uri, encoding=_encoding, errors="surrogateescape")
if "\x00" in path:
raise ValueError("embedded null")
return path
def fsn2uri(path):
"""
Args:
path (fsnative): The path to convert to an URI
Returns:
`text`: An ASCII only URI
Raises:
TypeError: If no `fsnative` was passed
ValueError: If the path can't be converted
Takes a `fsnative` path and returns a file URI.
On Windows non-ASCII characters will be encoded using utf-8 and then
percent encoded.
"""
path = _fsn2native(path)
def _quote_path(path):
# RFC 2396
path = quote(path, "/:@&=+$,")
if PY2:
path = path.decode("ascii")
return path
if is_win:
buf = ctypes.create_unicode_buffer(winapi.INTERNET_MAX_URL_LENGTH)
length = winapi.DWORD(winapi.INTERNET_MAX_URL_LENGTH)
flags = 0
try:
winapi.UrlCreateFromPathW(path, buf, ctypes.byref(length), flags)
except WindowsError as e:
raise ValueError(e)
uri = buf[:length.value]
# https://bitbucket.org/pypy/pypy/issues/3133
uri = _merge_surrogates(uri)
# For some reason UrlCreateFromPathW escapes some chars outside of
# ASCII and some not. Unquote and re-quote with utf-8.
if PY3:
# latin-1 maps code points directly to bytes, which is what we want
uri = unquote(uri, "latin-1")
else:
# Python 2 does what we want by default
uri = unquote(uri)
return _quote_path(uri.encode("utf-8", _surrogatepass))
else:
return u"file://" + _quote_path(path)

View File

@@ -1,424 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
import os
import ctypes
import re
from ._fsnative import _encoding, is_win, is_unix, _surrogatepass, bytes2fsn
from ._compat import text_type, PY2, PY3
from ._winansi import AnsiState, ansi_split
from . import _winapi as winapi
def print_(*objects, **kwargs):
"""print_(*objects, sep=None, end=None, file=None, flush=False)
Args:
objects (object): zero or more objects to print
sep (str): Object separator to use, defaults to ``" "``
end (str): Trailing string to use, defaults to ``"\\n"``.
If end is ``"\\n"`` then `os.linesep` is used.
file (object): A file-like object, defaults to `sys.stdout`
flush (bool): If the file stream should be flushed
Raises:
EnvironmentError
Like print(), but:
* Supports printing filenames under Unix + Python 3 and Windows + Python 2
* Emulates ANSI escape sequence support under Windows
* Never fails due to encoding/decoding errors. Tries hard to get everything
on screen as is, but will fall back to "?" if all fails.
This does not conflict with ``colorama``, but will not use it on Windows.
"""
sep = kwargs.get("sep")
sep = sep if sep is not None else " "
end = kwargs.get("end")
end = end if end is not None else "\n"
file = kwargs.get("file")
file = file if file is not None else sys.stdout
flush = bool(kwargs.get("flush", False))
if is_win:
_print_windows(objects, sep, end, file, flush)
else:
_print_unix(objects, sep, end, file, flush)
def _print_unix(objects, sep, end, file, flush):
"""A print_() implementation which writes bytes"""
encoding = _encoding
if isinstance(sep, text_type):
sep = sep.encode(encoding, "replace")
if not isinstance(sep, bytes):
raise TypeError
if isinstance(end, text_type):
end = end.encode(encoding, "replace")
if not isinstance(end, bytes):
raise TypeError
if end == b"\n":
end = os.linesep
if PY3:
end = end.encode("ascii")
parts = []
for obj in objects:
if not isinstance(obj, text_type) and not isinstance(obj, bytes):
obj = text_type(obj)
if isinstance(obj, text_type):
if PY2:
obj = obj.encode(encoding, "replace")
else:
try:
obj = obj.encode(encoding, "surrogateescape")
except UnicodeEncodeError:
obj = obj.encode(encoding, "replace")
assert isinstance(obj, bytes)
parts.append(obj)
data = sep.join(parts) + end
assert isinstance(data, bytes)
file = getattr(file, "buffer", file)
try:
file.write(data)
except TypeError:
if PY3:
# For StringIO, first try with surrogates
surr_data = data.decode(encoding, "surrogateescape")
try:
file.write(surr_data)
except (TypeError, ValueError):
file.write(data.decode(encoding, "replace"))
else:
# for file like objects with don't support bytes
file.write(data.decode(encoding, "replace"))
if flush:
file.flush()
ansi_state = AnsiState()
def _print_windows(objects, sep, end, file, flush):
"""The windows implementation of print_()"""
h = winapi.INVALID_HANDLE_VALUE
try:
fileno = file.fileno()
except (EnvironmentError, AttributeError):
pass
else:
if fileno == 1:
h = winapi.GetStdHandle(winapi.STD_OUTPUT_HANDLE)
elif fileno == 2:
h = winapi.GetStdHandle(winapi.STD_ERROR_HANDLE)
encoding = _encoding
parts = []
for obj in objects:
if isinstance(obj, bytes):
obj = obj.decode(encoding, "replace")
if not isinstance(obj, text_type):
obj = text_type(obj)
parts.append(obj)
if isinstance(sep, bytes):
sep = sep.decode(encoding, "replace")
if not isinstance(sep, text_type):
raise TypeError
if isinstance(end, bytes):
end = end.decode(encoding, "replace")
if not isinstance(end, text_type):
raise TypeError
if end == u"\n":
end = os.linesep
text = sep.join(parts) + end
assert isinstance(text, text_type)
is_console = True
if h == winapi.INVALID_HANDLE_VALUE:
is_console = False
else:
# get the default value
info = winapi.CONSOLE_SCREEN_BUFFER_INFO()
if not winapi.GetConsoleScreenBufferInfo(h, ctypes.byref(info)):
is_console = False
if is_console:
# make sure we flush before we apply any console attributes
file.flush()
# try to force a utf-8 code page, use the output CP if that fails
cp = winapi.GetConsoleOutputCP()
try:
encoding = "utf-8"
if winapi.SetConsoleOutputCP(65001) == 0:
encoding = None
for is_ansi, part in ansi_split(text):
if is_ansi:
ansi_state.apply(h, part)
else:
if encoding is not None:
data = part.encode(encoding, _surrogatepass)
else:
data = _encode_codepage(cp, part)
os.write(fileno, data)
finally:
# reset the code page to what we had before
winapi.SetConsoleOutputCP(cp)
else:
# try writing bytes first, so in case of Python 2 StringIO we get
# the same type on all platforms
try:
file.write(text.encode("utf-8", _surrogatepass))
except (TypeError, ValueError):
file.write(text)
if flush:
file.flush()
def _readline_windows():
"""Raises OSError"""
try:
fileno = sys.stdin.fileno()
except (EnvironmentError, AttributeError):
fileno = -1
# In case stdin is replaced, read from that
if fileno != 0:
return _readline_windows_fallback()
h = winapi.GetStdHandle(winapi.STD_INPUT_HANDLE)
if h == winapi.INVALID_HANDLE_VALUE:
return _readline_windows_fallback()
buf_size = 1024
buf = ctypes.create_string_buffer(buf_size * ctypes.sizeof(winapi.WCHAR))
read = winapi.DWORD()
text = u""
while True:
if winapi.ReadConsoleW(
h, buf, buf_size, ctypes.byref(read), None) == 0:
if not text:
return _readline_windows_fallback()
raise ctypes.WinError()
data = buf[:read.value * ctypes.sizeof(winapi.WCHAR)]
text += data.decode("utf-16-le", _surrogatepass)
if text.endswith(u"\r\n"):
return text[:-2]
def _decode_codepage(codepage, data):
"""
Args:
codepage (int)
data (bytes)
Returns:
`text`
Decodes data using the given codepage. If some data can't be decoded
using the codepage it will not fail.
"""
assert isinstance(data, bytes)
if not data:
return u""
# get the required buffer length first
length = winapi.MultiByteToWideChar(codepage, 0, data, len(data), None, 0)
if length == 0:
raise ctypes.WinError()
# now decode
buf = ctypes.create_unicode_buffer(length)
length = winapi.MultiByteToWideChar(
codepage, 0, data, len(data), buf, length)
if length == 0:
raise ctypes.WinError()
return buf[:]
def _encode_codepage(codepage, text):
"""
Args:
codepage (int)
text (text)
Returns:
`bytes`
Encode text using the given code page. Will not fail if a char
can't be encoded using that codepage.
"""
assert isinstance(text, text_type)
if not text:
return b""
size = (len(text.encode("utf-16-le", _surrogatepass)) //
ctypes.sizeof(winapi.WCHAR))
# get the required buffer size
length = winapi.WideCharToMultiByte(
codepage, 0, text, size, None, 0, None, None)
if length == 0:
raise ctypes.WinError()
# decode to the buffer
buf = ctypes.create_string_buffer(length)
length = winapi.WideCharToMultiByte(
codepage, 0, text, size, buf, length, None, None)
if length == 0:
raise ctypes.WinError()
return buf[:length]
def _readline_windows_fallback():
# In case reading from the console failed (maybe we get piped data)
# we assume the input was generated according to the output encoding.
# Got any better ideas?
assert is_win
cp = winapi.GetConsoleOutputCP()
data = getattr(sys.stdin, "buffer", sys.stdin).readline().rstrip(b"\r\n")
return _decode_codepage(cp, data)
def _readline_default():
assert is_unix
data = getattr(sys.stdin, "buffer", sys.stdin).readline().rstrip(b"\r\n")
if PY3:
return data.decode(_encoding, "surrogateescape")
else:
return data
def _readline():
if is_win:
return _readline_windows()
else:
return _readline_default()
def input_(prompt=None):
"""
Args:
prompt (object): Prints the passed object to stdout without
adding a trailing newline
Returns:
`fsnative`
Raises:
EnvironmentError
Like :func:`python3:input` but returns a `fsnative` and allows printing
filenames as prompt to stdout.
Use :func:`fsn2text` on the result if you just want to deal with text.
"""
if prompt is not None:
print_(prompt, end="")
return _readline()
def _get_file_name_for_handle(handle):
"""(Windows only) Returns a file name for a file handle.
Args:
handle (winapi.HANDLE)
Returns:
`text` or `None` if no file name could be retrieved.
"""
assert is_win
assert handle != winapi.INVALID_HANDLE_VALUE
size = winapi.FILE_NAME_INFO.FileName.offset + \
winapi.MAX_PATH * ctypes.sizeof(winapi.WCHAR)
buf = ctypes.create_string_buffer(size)
if winapi.GetFileInformationByHandleEx is None:
# Windows XP
return None
status = winapi.GetFileInformationByHandleEx(
handle, winapi.FileNameInfo, buf, size)
if status == 0:
return None
name_info = ctypes.cast(
buf, ctypes.POINTER(winapi.FILE_NAME_INFO)).contents
offset = winapi.FILE_NAME_INFO.FileName.offset
data = buf[offset:offset + name_info.FileNameLength]
return bytes2fsn(data, "utf-16-le")
def supports_ansi_escape_codes(fd):
"""Returns whether the output device is capable of interpreting ANSI escape
codes when :func:`print_` is used.
Args:
fd (int): file descriptor (e.g. ``sys.stdout.fileno()``)
Returns:
`bool`
"""
if os.isatty(fd):
return True
if not is_win:
return False
# Check for cygwin/msys terminal
handle = winapi._get_osfhandle(fd)
if handle == winapi.INVALID_HANDLE_VALUE:
return False
if winapi.GetFileType(handle) != winapi.FILE_TYPE_PIPE:
return False
file_name = _get_file_name_for_handle(handle)
match = re.match(
"^\\\\(cygwin|msys)-[a-z0-9]+-pty[0-9]+-(from|to)-master$", file_name)
return match is not None

View File

@@ -1,154 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import re
import os
from ._fsnative import path2fsn, fsnative, is_win
from ._compat import PY2
from ._environ import environ
sep = path2fsn(os.sep)
pathsep = path2fsn(os.pathsep)
curdir = path2fsn(os.curdir)
pardir = path2fsn(os.pardir)
altsep = path2fsn(os.altsep) if os.altsep is not None else None
extsep = path2fsn(os.extsep)
devnull = path2fsn(os.devnull)
defpath = path2fsn(os.defpath)
def getcwd():
"""Like `os.getcwd` but returns a `fsnative` path
Returns:
`fsnative`
"""
if is_win and PY2:
return os.getcwdu()
return os.getcwd()
def _get_userdir(user=None):
"""Returns the user dir or None"""
if user is not None and not isinstance(user, fsnative):
raise TypeError
if is_win:
if "HOME" in environ:
path = environ["HOME"]
elif "USERPROFILE" in environ:
path = environ["USERPROFILE"]
elif "HOMEPATH" in environ and "HOMEDRIVE" in environ:
path = os.path.join(environ["HOMEDRIVE"], environ["HOMEPATH"])
else:
return
if user is None:
return path
else:
return os.path.join(os.path.dirname(path), user)
else:
import pwd
if user is None:
if "HOME" in environ:
return environ["HOME"]
else:
try:
return path2fsn(pwd.getpwuid(os.getuid()).pw_dir)
except KeyError:
return
else:
try:
return path2fsn(pwd.getpwnam(user).pw_dir)
except KeyError:
return
def expanduser(path):
"""
Args:
path (pathlike): A path to expand
Returns:
`fsnative`
Like :func:`python:os.path.expanduser` but supports unicode home
directories under Windows + Python 2 and always returns a `fsnative`.
"""
path = path2fsn(path)
if path == "~":
return _get_userdir()
elif path.startswith("~" + sep) or (
altsep is not None and path.startswith("~" + altsep)):
userdir = _get_userdir()
if userdir is None:
return path
return userdir + path[1:]
elif path.startswith("~"):
sep_index = path.find(sep)
if altsep is not None:
alt_index = path.find(altsep)
if alt_index != -1 and alt_index < sep_index:
sep_index = alt_index
if sep_index == -1:
user = path[1:]
rest = ""
else:
user = path[1:sep_index]
rest = path[sep_index:]
userdir = _get_userdir(user)
if userdir is not None:
return userdir + rest
else:
return path
else:
return path
def expandvars(path):
"""
Args:
path (pathlike): A path to expand
Returns:
`fsnative`
Like :func:`python:os.path.expandvars` but supports unicode under Windows
+ Python 2 and always returns a `fsnative`.
"""
path = path2fsn(path)
def repl_func(match):
return environ.get(match.group(1), match.group(0))
path = re.compile(r"\$(\w+)", flags=re.UNICODE).sub(repl_func, path)
if os.name == "nt":
path = re.sub(r"%([^%]+)%", repl_func, path)
return re.sub(r"\$\{([^\}]+)\}", repl_func, path)

View File

@@ -1,96 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import tempfile
from ._fsnative import path2fsn, fsnative
def gettempdir():
"""
Returns:
`fsnative`
Like :func:`python3:tempfile.gettempdir`, but always returns a `fsnative`
path
"""
# FIXME: I don't want to reimplement all that logic, reading env vars etc.
# At least for the default it works.
return path2fsn(tempfile.gettempdir())
def gettempprefix():
"""
Returns:
`fsnative`
Like :func:`python3:tempfile.gettempprefix`, but always returns a
`fsnative` path
"""
return path2fsn(tempfile.gettempprefix())
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
"""
Args:
suffix (`pathlike` or `None`): suffix or `None` to use the default
prefix (`pathlike` or `None`): prefix or `None` to use the default
dir (`pathlike` or `None`): temp dir or `None` to use the default
text (bool): if the file should be opened in text mode
Returns:
Tuple[`int`, `fsnative`]:
A tuple containing the file descriptor and the file path
Raises:
EnvironmentError
Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative`
path.
"""
suffix = fsnative() if suffix is None else path2fsn(suffix)
prefix = gettempprefix() if prefix is None else path2fsn(prefix)
dir = gettempdir() if dir is None else path2fsn(dir)
return tempfile.mkstemp(suffix, prefix, dir, text)
def mkdtemp(suffix=None, prefix=None, dir=None):
"""
Args:
suffix (`pathlike` or `None`): suffix or `None` to use the default
prefix (`pathlike` or `None`): prefix or `None` to use the default
dir (`pathlike` or `None`): temp dir or `None` to use the default
Returns:
`fsnative`: A path to a directory
Raises:
EnvironmentError
Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path.
"""
suffix = fsnative() if suffix is None else path2fsn(suffix)
prefix = gettempprefix() if prefix is None else path2fsn(prefix)
dir = gettempdir() if dir is None else path2fsn(dir)
return tempfile.mkdtemp(suffix, prefix, dir)

View File

@@ -1,319 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import ctypes
import re
import atexit
from . import _winapi as winapi
def ansi_parse(code):
"""Returns command, (args)"""
return code[-1:], tuple([int(v or "0") for v in code[2:-1].split(";")])
def ansi_split(text, _re=re.compile(u"(\x1b\\[(\\d*;?)*\\S)")):
"""Yields (is_ansi, text)"""
for part in _re.split(text):
if part:
yield (bool(_re.match(part)), part)
class AnsiCommand(object):
TEXT = "m"
MOVE_UP = "A"
MOVE_DOWN = "B"
MOVE_FORWARD = "C"
MOVE_BACKWARD = "D"
SET_POS = "H"
SET_POS_ALT = "f"
SAVE_POS = "s"
RESTORE_POS = "u"
class TextAction(object):
RESET_ALL = 0
SET_BOLD = 1
SET_DIM = 2
SET_ITALIC = 3
SET_UNDERLINE = 4
SET_BLINK = 5
SET_BLINK_FAST = 6
SET_REVERSE = 7
SET_HIDDEN = 8
RESET_BOLD = 21
RESET_DIM = 22
RESET_ITALIC = 23
RESET_UNDERLINE = 24
RESET_BLINK = 25
RESET_BLINK_FAST = 26
RESET_REVERSE = 27
RESET_HIDDEN = 28
FG_BLACK = 30
FG_RED = 31
FG_GREEN = 32
FG_YELLOW = 33
FG_BLUE = 34
FG_MAGENTA = 35
FG_CYAN = 36
FG_WHITE = 37
FG_DEFAULT = 39
FG_LIGHT_BLACK = 90
FG_LIGHT_RED = 91
FG_LIGHT_GREEN = 92
FG_LIGHT_YELLOW = 93
FG_LIGHT_BLUE = 94
FG_LIGHT_MAGENTA = 95
FG_LIGHT_CYAN = 96
FG_LIGHT_WHITE = 97
BG_BLACK = 40
BG_RED = 41
BG_GREEN = 42
BG_YELLOW = 43
BG_BLUE = 44
BG_MAGENTA = 45
BG_CYAN = 46
BG_WHITE = 47
BG_DEFAULT = 49
BG_LIGHT_BLACK = 100
BG_LIGHT_RED = 101
BG_LIGHT_GREEN = 102
BG_LIGHT_YELLOW = 103
BG_LIGHT_BLUE = 104
BG_LIGHT_MAGENTA = 105
BG_LIGHT_CYAN = 106
BG_LIGHT_WHITE = 107
class AnsiState(object):
def __init__(self):
self.default_attrs = None
self.bold = False
self.bg_light = False
self.fg_light = False
self.saved_pos = (0, 0)
def do_text_action(self, attrs, action):
# In case the external state has changed, apply it it to ours.
# Mostly the first time this is called.
if attrs & winapi.FOREGROUND_INTENSITY and not self.fg_light \
and not self.bold:
self.fg_light = True
if attrs & winapi.BACKGROUND_INTENSITY and not self.bg_light:
self.bg_light = True
dark_fg = {
TextAction.FG_BLACK: 0,
TextAction.FG_RED: winapi.FOREGROUND_RED,
TextAction.FG_GREEN: winapi.FOREGROUND_GREEN,
TextAction.FG_YELLOW:
winapi.FOREGROUND_GREEN | winapi.FOREGROUND_RED,
TextAction.FG_BLUE: winapi.FOREGROUND_BLUE,
TextAction.FG_MAGENTA: winapi.FOREGROUND_BLUE |
winapi.FOREGROUND_RED,
TextAction.FG_CYAN:
winapi.FOREGROUND_BLUE | winapi.FOREGROUND_GREEN,
TextAction.FG_WHITE:
winapi.FOREGROUND_BLUE | winapi.FOREGROUND_GREEN |
winapi.FOREGROUND_RED,
}
dark_bg = {
TextAction.BG_BLACK: 0,
TextAction.BG_RED: winapi.BACKGROUND_RED,
TextAction.BG_GREEN: winapi.BACKGROUND_GREEN,
TextAction.BG_YELLOW:
winapi.BACKGROUND_GREEN | winapi.BACKGROUND_RED,
TextAction.BG_BLUE: winapi.BACKGROUND_BLUE,
TextAction.BG_MAGENTA:
winapi.BACKGROUND_BLUE | winapi.BACKGROUND_RED,
TextAction.BG_CYAN:
winapi.BACKGROUND_BLUE | winapi.BACKGROUND_GREEN,
TextAction.BG_WHITE:
winapi.BACKGROUND_BLUE | winapi.BACKGROUND_GREEN |
winapi.BACKGROUND_RED,
}
light_fg = {
TextAction.FG_LIGHT_BLACK: 0,
TextAction.FG_LIGHT_RED: winapi.FOREGROUND_RED,
TextAction.FG_LIGHT_GREEN: winapi.FOREGROUND_GREEN,
TextAction.FG_LIGHT_YELLOW:
winapi.FOREGROUND_GREEN | winapi.FOREGROUND_RED,
TextAction.FG_LIGHT_BLUE: winapi.FOREGROUND_BLUE,
TextAction.FG_LIGHT_MAGENTA:
winapi.FOREGROUND_BLUE | winapi.FOREGROUND_RED,
TextAction.FG_LIGHT_CYAN:
winapi.FOREGROUND_BLUE | winapi.FOREGROUND_GREEN,
TextAction.FG_LIGHT_WHITE:
winapi.FOREGROUND_BLUE | winapi.FOREGROUND_GREEN |
winapi.FOREGROUND_RED,
}
light_bg = {
TextAction.BG_LIGHT_BLACK: 0,
TextAction.BG_LIGHT_RED: winapi.BACKGROUND_RED,
TextAction.BG_LIGHT_GREEN: winapi.BACKGROUND_GREEN,
TextAction.BG_LIGHT_YELLOW:
winapi.BACKGROUND_GREEN | winapi.BACKGROUND_RED,
TextAction.BG_LIGHT_BLUE: winapi.BACKGROUND_BLUE,
TextAction.BG_LIGHT_MAGENTA:
winapi.BACKGROUND_BLUE | winapi.BACKGROUND_RED,
TextAction.BG_LIGHT_CYAN:
winapi.BACKGROUND_BLUE | winapi.BACKGROUND_GREEN,
TextAction.BG_LIGHT_WHITE:
winapi.BACKGROUND_BLUE | winapi.BACKGROUND_GREEN |
winapi.BACKGROUND_RED,
}
if action == TextAction.RESET_ALL:
attrs = self.default_attrs
self.bold = self.fg_light = self.bg_light = False
elif action == TextAction.SET_BOLD:
self.bold = True
elif action == TextAction.RESET_BOLD:
self.bold = False
elif action == TextAction.SET_DIM:
self.bold = False
elif action == TextAction.SET_REVERSE:
attrs |= winapi.COMMON_LVB_REVERSE_VIDEO
elif action == TextAction.RESET_REVERSE:
attrs &= ~winapi.COMMON_LVB_REVERSE_VIDEO
elif action == TextAction.SET_UNDERLINE:
attrs |= winapi.COMMON_LVB_UNDERSCORE
elif action == TextAction.RESET_UNDERLINE:
attrs &= ~winapi.COMMON_LVB_UNDERSCORE
elif action == TextAction.FG_DEFAULT:
attrs = (attrs & ~0xF) | (self.default_attrs & 0xF)
self.fg_light = False
elif action == TextAction.BG_DEFAULT:
attrs = (attrs & ~0xF0) | (self.default_attrs & 0xF0)
self.bg_light = False
elif action in dark_fg:
attrs = (attrs & ~0xF) | dark_fg[action]
self.fg_light = False
elif action in dark_bg:
attrs = (attrs & ~0xF0) | dark_bg[action]
self.bg_light = False
elif action in light_fg:
attrs = (attrs & ~0xF) | light_fg[action]
self.fg_light = True
elif action in light_bg:
attrs = (attrs & ~0xF0) | light_bg[action]
self.bg_light = True
if self.fg_light or self.bold:
attrs |= winapi.FOREGROUND_INTENSITY
else:
attrs &= ~winapi.FOREGROUND_INTENSITY
if self.bg_light:
attrs |= winapi.BACKGROUND_INTENSITY
else:
attrs &= ~winapi.BACKGROUND_INTENSITY
return attrs
def apply(self, handle, code):
buffer_info = winapi.CONSOLE_SCREEN_BUFFER_INFO()
if not winapi.GetConsoleScreenBufferInfo(handle,
ctypes.byref(buffer_info)):
return
attrs = buffer_info.wAttributes
# We take the first attrs we see as default
if self.default_attrs is None:
self.default_attrs = attrs
# Make sure that like with linux terminals the program doesn't
# affect the prompt after it exits
atexit.register(
winapi.SetConsoleTextAttribute, handle, self.default_attrs)
cmd, args = ansi_parse(code)
if cmd == AnsiCommand.TEXT:
for action in args:
attrs = self.do_text_action(attrs, action)
winapi.SetConsoleTextAttribute(handle, attrs)
elif cmd in (AnsiCommand.MOVE_UP, AnsiCommand.MOVE_DOWN,
AnsiCommand.MOVE_FORWARD, AnsiCommand.MOVE_BACKWARD):
coord = buffer_info.dwCursorPosition
x, y = coord.X, coord.Y
amount = max(args[0], 1)
if cmd == AnsiCommand.MOVE_UP:
y -= amount
elif cmd == AnsiCommand.MOVE_DOWN:
y += amount
elif cmd == AnsiCommand.MOVE_FORWARD:
x += amount
elif cmd == AnsiCommand.MOVE_BACKWARD:
x -= amount
x = max(x, 0)
y = max(y, 0)
winapi.SetConsoleCursorPosition(handle, winapi.COORD(x, y))
elif cmd in (AnsiCommand.SET_POS, AnsiCommand.SET_POS_ALT):
args = list(args)
while len(args) < 2:
args.append(0)
x, y = args[:2]
win_rect = buffer_info.srWindow
x += win_rect.Left - 1
y += win_rect.Top - 1
x = max(x, 0)
y = max(y, 0)
winapi.SetConsoleCursorPosition(handle, winapi.COORD(x, y))
elif cmd == AnsiCommand.SAVE_POS:
win_rect = buffer_info.srWindow
coord = buffer_info.dwCursorPosition
x, y = coord.X, coord.Y
x -= win_rect.Left
y -= win_rect.Top
self.saved_pos = (x, y)
elif cmd == AnsiCommand.RESTORE_POS:
win_rect = buffer_info.srWindow
x, y = self.saved_pos
x += win_rect.Left
y += win_rect.Top
winapi.SetConsoleCursorPosition(handle, winapi.COORD(x, y))

View File

@@ -1,221 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
import ctypes
if sys.platform == 'win32':
from ctypes import WinDLL, CDLL, wintypes
shell32 = WinDLL("shell32")
kernel32 = WinDLL("kernel32")
shlwapi = WinDLL("shlwapi")
msvcrt = CDLL("msvcrt")
GetCommandLineW = kernel32.GetCommandLineW
GetCommandLineW.argtypes = []
GetCommandLineW.restype = wintypes.LPCWSTR
CommandLineToArgvW = shell32.CommandLineToArgvW
CommandLineToArgvW.argtypes = [
wintypes.LPCWSTR, ctypes.POINTER(ctypes.c_int)]
CommandLineToArgvW.restype = ctypes.POINTER(wintypes.LPWSTR)
LocalFree = kernel32.LocalFree
LocalFree.argtypes = [wintypes.HLOCAL]
LocalFree.restype = wintypes.HLOCAL
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa383751.aspx
LPCTSTR = ctypes.c_wchar_p
LPWSTR = wintypes.LPWSTR
LPCWSTR = ctypes.c_wchar_p
LPTSTR = LPWSTR
PCWSTR = ctypes.c_wchar_p
PCTSTR = PCWSTR
PWSTR = ctypes.c_wchar_p
PTSTR = PWSTR
LPVOID = wintypes.LPVOID
WCHAR = wintypes.WCHAR
LPSTR = ctypes.c_char_p
BOOL = wintypes.BOOL
LPBOOL = ctypes.POINTER(BOOL)
UINT = wintypes.UINT
WORD = wintypes.WORD
DWORD = wintypes.DWORD
SHORT = wintypes.SHORT
HANDLE = wintypes.HANDLE
ULONG = wintypes.ULONG
LPCSTR = wintypes.LPCSTR
STD_INPUT_HANDLE = DWORD(-10)
STD_OUTPUT_HANDLE = DWORD(-11)
STD_ERROR_HANDLE = DWORD(-12)
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
INTERNET_MAX_SCHEME_LENGTH = 32
INTERNET_MAX_PATH_LENGTH = 2048
INTERNET_MAX_URL_LENGTH = (
INTERNET_MAX_SCHEME_LENGTH + len("://") + INTERNET_MAX_PATH_LENGTH)
FOREGROUND_BLUE = 0x0001
FOREGROUND_GREEN = 0x0002
FOREGROUND_RED = 0x0004
FOREGROUND_INTENSITY = 0x0008
BACKGROUND_BLUE = 0x0010
BACKGROUND_GREEN = 0x0020
BACKGROUND_RED = 0x0040
BACKGROUND_INTENSITY = 0x0080
COMMON_LVB_REVERSE_VIDEO = 0x4000
COMMON_LVB_UNDERSCORE = 0x8000
UrlCreateFromPathW = shlwapi.UrlCreateFromPathW
UrlCreateFromPathW.argtypes = [
PCTSTR, PTSTR, ctypes.POINTER(DWORD), DWORD]
UrlCreateFromPathW.restype = ctypes.HRESULT
SetEnvironmentVariableW = kernel32.SetEnvironmentVariableW
SetEnvironmentVariableW.argtypes = [LPCTSTR, LPCTSTR]
SetEnvironmentVariableW.restype = wintypes.BOOL
GetEnvironmentVariableW = kernel32.GetEnvironmentVariableW
GetEnvironmentVariableW.argtypes = [LPCTSTR, LPTSTR, DWORD]
GetEnvironmentVariableW.restype = DWORD
GetEnvironmentStringsW = kernel32.GetEnvironmentStringsW
GetEnvironmentStringsW.argtypes = []
GetEnvironmentStringsW.restype = ctypes.c_void_p
FreeEnvironmentStringsW = kernel32.FreeEnvironmentStringsW
FreeEnvironmentStringsW.argtypes = [ctypes.c_void_p]
FreeEnvironmentStringsW.restype = ctypes.c_bool
GetStdHandle = kernel32.GetStdHandle
GetStdHandle.argtypes = [DWORD]
GetStdHandle.restype = HANDLE
class COORD(ctypes.Structure):
_fields_ = [
("X", SHORT),
("Y", SHORT),
]
class SMALL_RECT(ctypes.Structure):
_fields_ = [
("Left", SHORT),
("Top", SHORT),
("Right", SHORT),
("Bottom", SHORT),
]
class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
_fields_ = [
("dwSize", COORD),
("dwCursorPosition", COORD),
("wAttributes", WORD),
("srWindow", SMALL_RECT),
("dwMaximumWindowSize", COORD),
]
GetConsoleScreenBufferInfo = kernel32.GetConsoleScreenBufferInfo
GetConsoleScreenBufferInfo.argtypes = [
HANDLE, ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO)]
GetConsoleScreenBufferInfo.restype = BOOL
GetConsoleOutputCP = kernel32.GetConsoleOutputCP
GetConsoleOutputCP.argtypes = []
GetConsoleOutputCP.restype = UINT
SetConsoleOutputCP = kernel32.SetConsoleOutputCP
SetConsoleOutputCP.argtypes = [UINT]
SetConsoleOutputCP.restype = BOOL
GetConsoleCP = kernel32.GetConsoleCP
GetConsoleCP.argtypes = []
GetConsoleCP.restype = UINT
SetConsoleCP = kernel32.SetConsoleCP
SetConsoleCP.argtypes = [UINT]
SetConsoleCP.restype = BOOL
SetConsoleTextAttribute = kernel32.SetConsoleTextAttribute
SetConsoleTextAttribute.argtypes = [HANDLE, WORD]
SetConsoleTextAttribute.restype = BOOL
SetConsoleCursorPosition = kernel32.SetConsoleCursorPosition
SetConsoleCursorPosition.argtypes = [HANDLE, COORD]
SetConsoleCursorPosition.restype = BOOL
ReadConsoleW = kernel32.ReadConsoleW
ReadConsoleW.argtypes = [
HANDLE, LPVOID, DWORD, ctypes.POINTER(DWORD), LPVOID]
ReadConsoleW.restype = BOOL
MultiByteToWideChar = kernel32.MultiByteToWideChar
MultiByteToWideChar.argtypes = [
UINT, DWORD, LPCSTR, ctypes.c_int, LPWSTR, ctypes.c_int]
MultiByteToWideChar.restype = ctypes.c_int
WideCharToMultiByte = kernel32.WideCharToMultiByte
WideCharToMultiByte.argtypes = [
UINT, DWORD, LPCWSTR, ctypes.c_int, LPSTR, ctypes.c_int,
LPCSTR, LPBOOL]
WideCharToMultiByte.restype = ctypes.c_int
MoveFileW = kernel32.MoveFileW
MoveFileW.argtypes = [LPCTSTR, LPCTSTR]
MoveFileW.restype = BOOL
GetFileInformationByHandleEx = None
if hasattr(kernel32, "GetFileInformationByHandleEx"):
GetFileInformationByHandleEx = kernel32.GetFileInformationByHandleEx
GetFileInformationByHandleEx.argtypes = [
HANDLE, ctypes.c_int, ctypes.c_void_p, DWORD]
GetFileInformationByHandleEx.restype = BOOL
else:
# Windows XP
pass
MAX_PATH = 260
FileNameInfo = 2
class FILE_NAME_INFO(ctypes.Structure):
_fields_ = [
("FileNameLength", DWORD),
("FileName", WCHAR),
]
_get_osfhandle = msvcrt._get_osfhandle
_get_osfhandle.argtypes = [ctypes.c_int]
_get_osfhandle.restype = HANDLE
GetFileType = kernel32.GetFileType
GetFileType.argtypes = [HANDLE]
GetFileType.restype = DWORD
FILE_TYPE_PIPE = 0x0003

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
#
# This program is free software; you can redistribute it and/or modify
@@ -37,11 +36,11 @@ class PaddingInfo(object):
size (`int`): The amount of data following the padding
"""
def __init__(self, padding, size):
def __init__(self, padding: int, size: int):
self.padding = padding
self.size = size
def get_default_padding(self):
def get_default_padding(self) -> int:
"""The default implementation which tries to select a reasonable
amount of padding and which might change in future versions.

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
@@ -11,7 +10,6 @@ import signal
import contextlib
import optparse
from mutagen._senf import print_
from mutagen._util import iterbytes
@@ -88,8 +86,4 @@ class SignalHandler(object):
raise SystemExit("Aborted...")
class OptionParser(optparse.OptionParser):
"""OptionParser subclass which supports printing Unicode under Windows"""
def print_help(self, file=None):
print_(self.format_help(), file=file)
OptionParser = optparse.OptionParser

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Marcus Sundman
#
# This program is free software; you can redistribute it and/or modify
@@ -15,7 +14,6 @@ import os.path
import mutagen
import mutagen.id3
from mutagen._senf import print_, argv
from ._util import SignalHandler, OptionParser
@@ -24,11 +22,6 @@ VERSION = (0, 1)
_sig = SignalHandler()
def printerr(*args, **kwargs):
kwargs.setdefault("file", sys.stderr)
print_(*args, **kwargs)
class ID3OptionParser(OptionParser):
def __init__(self):
mutagen_version = mutagen.version_string
@@ -51,15 +44,15 @@ def copy(src, dst, merge, write_v1=True, excluded_tags=None, verbose=False):
try:
id3 = mutagen.id3.ID3(src, translate=False)
except mutagen.id3.ID3NoHeaderError:
print_(u"No ID3 header found in ", src, file=sys.stderr)
print(u"No ID3 header found in ", src, file=sys.stderr)
return 1
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
return 1
if verbose:
print_(u"File", src, u"contains:", file=sys.stderr)
print_(id3.pprint(), file=sys.stderr)
print(u"File", src, u"contains:", file=sys.stderr)
print(id3.pprint(), file=sys.stderr)
for tag in excluded_tags:
id3.delall(tag)
@@ -71,7 +64,7 @@ def copy(src, dst, merge, write_v1=True, excluded_tags=None, verbose=False):
# no need to merge
pass
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
return 1
else:
for frame in id3.values():
@@ -90,12 +83,12 @@ def copy(src, dst, merge, write_v1=True, excluded_tags=None, verbose=False):
try:
id3.save(dst, v1=(2 if write_v1 else 0), v2_version=v2_version)
except Exception as err:
print_(u"Error saving", dst, u":\n%s" % str(err),
file=sys.stderr)
print(u"Error saving", dst, u":\n%s" % str(err),
file=sys.stderr)
return 1
else:
if verbose:
print_(u"Successfully saved", dst, file=sys.stderr)
print(u"Successfully saved", dst, file=sys.stderr)
return 0
@@ -119,12 +112,12 @@ def main(argv):
(src, dst) = args
if not os.path.isfile(src):
print_(u"File not found:", src, file=sys.stderr)
print(u"File not found:", src, file=sys.stderr)
parser.print_help(file=sys.stderr)
return 1
if not os.path.isfile(dst):
printerr(u"File not found:", dst, file=sys.stderr)
print(u"File not found:", dst, file=sys.stderr)
parser.print_help(file=sys.stderr)
return 1
@@ -138,4 +131,4 @@ def main(argv):
def entry_point():
_sig.init()
return main(argv)
return main(sys.argv)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Emfox Zhou <EmfoxZhou@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
@@ -7,7 +6,7 @@
# (at your option) any later version.
"""
ID3iconv is a Java based ID3 encoding convertor, here's the Python version.
ID3iconv is a Java based ID3 encoding converter, here's the Python version.
"""
import sys
@@ -15,7 +14,6 @@ import locale
import mutagen
import mutagen.id3
from mutagen._senf import argv, print_, fsnative
from ._util import SignalHandler, OptionParser
@@ -74,7 +72,7 @@ def update(options, filenames):
for filename in filenames:
with _sig.block():
if verbose != "quiet":
print_(u"Updating", filename)
print(u"Updating", filename)
if has_id3v1(filename) and not noupdate and force_v1:
mutagen.id3.delete(filename, False, True)
@@ -83,10 +81,10 @@ def update(options, filenames):
id3 = mutagen.id3.ID3(filename)
except mutagen.id3.ID3NoHeaderError:
if verbose != "quiet":
print_(u"No ID3 header found; skipping...")
print(u"No ID3 header found; skipping...")
continue
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
continue
for tag in filter(lambda t: t.startswith(("T", "COMM")), id3):
@@ -110,7 +108,7 @@ def update(options, filenames):
frame.encoding = 1
if verbose == "debug":
print_(id3.pprint())
print(id3.pprint())
if not noupdate:
if remove_v1:
@@ -153,9 +151,9 @@ def main(argv):
for i, arg in enumerate(argv):
if arg == "-v1":
argv[i] = fsnative(u"--force-v1")
argv[i] = "--force-v1"
elif arg == "-removev1":
argv[i] = fsnative(u"--remove-v1")
argv[i] = "--remove-v1"
(options, args) = parser.parse_args(argv[1:])
@@ -167,4 +165,4 @@ def main(argv):
def entry_point():
_sig.init()
return main(argv)
return main(sys.argv)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2005 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -8,6 +7,7 @@
"""Pretend to be /usr/bin/id3v2 from id3lib, sort of."""
import os
import sys
import codecs
import mimetypes
@@ -18,8 +18,6 @@ from optparse import SUPPRESS_HELP
import mutagen
import mutagen.id3
from mutagen.id3 import Encoding, PictureType
from mutagen._senf import fsnative, print_, argv, fsn2text, fsn2bytes, \
bytes2fsn
from ._util import split_escape, SignalHandler, OptionParser
@@ -57,7 +55,7 @@ Any editing operation will cause the ID3 tag to be upgraded to ID3v2.4.
def list_frames(option, opt, value, parser):
items = mutagen.id3.Frames.items()
for name, frame in sorted(items):
print_(u" --%s %s" % (name, frame.__doc__.split("\n")[0]))
print(u" --%s %s" % (name, frame.__doc__.split("\n")[0]))
raise SystemExit
@@ -65,13 +63,13 @@ def list_frames_2_2(option, opt, value, parser):
items = mutagen.id3.Frames_2_2.items()
items.sort()
for name, frame in items:
print_(u" --%s %s" % (name, frame.__doc__.split("\n")[0]))
print(u" --%s %s" % (name, frame.__doc__.split("\n")[0]))
raise SystemExit
def list_genres(option, opt, value, parser):
for i, genre in enumerate(mutagen.id3.TCON.GENRES):
print_(u"%3d: %s" % (i, genre))
print(u"%3d: %s" % (i, genre))
raise SystemExit
@@ -79,7 +77,7 @@ def delete_tags(filenames, v1, v2):
for filename in filenames:
with _sig.block():
if verbose:
print_(u"deleting ID3 tag info in", filename, file=sys.stderr)
print(u"deleting ID3 tag info in", filename, file=sys.stderr)
mutagen.id3.delete(filename, v1, v2)
@@ -88,22 +86,22 @@ def delete_frames(deletes, filenames):
try:
deletes = frame_from_fsnative(deletes)
except ValueError as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
frames = deletes.split(",")
for filename in filenames:
with _sig.block():
if verbose:
print_(u"deleting %s from" % deletes, filename,
file=sys.stderr)
print("deleting %s from" % deletes, filename,
file=sys.stderr)
try:
id3 = mutagen.id3.ID3(filename)
except mutagen.id3.ID3NoHeaderError:
if verbose:
print_(u"No ID3 header found; skipping.", file=sys.stderr)
print(u"No ID3 header found; skipping.", file=sys.stderr)
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
raise SystemExit(1)
else:
for frame in frames:
@@ -116,10 +114,8 @@ def frame_from_fsnative(arg):
or raises ValueError.
"""
assert isinstance(arg, fsnative)
text = fsn2text(arg, strict=True)
return text.encode("ascii").decode("ascii")
assert isinstance(arg, str)
return arg.encode("ascii").decode("ascii")
def value_from_fsnative(arg, escape):
@@ -127,23 +123,23 @@ def value_from_fsnative(arg, escape):
surrogate escapes or raises ValueError.
"""
assert isinstance(arg, fsnative)
assert isinstance(arg, str)
if escape:
bytes_ = fsn2bytes(arg)
bytes_ = os.fsencode(arg)
# With py3.7 this has started to warn for invalid escapes, but we
# don't control the input so ignore it.
with warnings.catch_warnings():
warnings.simplefilter("ignore")
bytes_ = codecs.escape_decode(bytes_)[0]
arg = bytes2fsn(bytes_)
arg = os.fsdecode(bytes_)
text = fsn2text(arg, strict=True)
text = arg.encode("utf-8").decode("utf-8")
return text
def error(*args):
print_(*args, file=sys.stderr)
print(*args, file=sys.stderr)
raise SystemExit(1)
@@ -165,7 +161,7 @@ def write_files(edits, filenames, escape):
try:
frame = frame_from_fsnative(frame)
except ValueError as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
assert isinstance(frame, str)
@@ -203,16 +199,16 @@ def write_files(edits, filenames, escape):
for filename in filenames:
with _sig.block():
if verbose:
print_(u"Writing", filename, file=sys.stderr)
print(u"Writing", filename, file=sys.stderr)
try:
id3 = mutagen.id3.ID3(filename)
except mutagen.id3.ID3NoHeaderError:
if verbose:
print_(u"No ID3 header found; creating a new tag",
print(u"No ID3 header found; creating a new tag",
file=sys.stderr)
id3 = mutagen.id3.ID3()
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
continue
for (frame, vlist) in edits.items():
if frame == "POPM":
@@ -336,31 +332,31 @@ def write_files(edits, filenames, escape):
def list_tags(filenames):
for filename in filenames:
print_("IDv2 tag info for", filename)
print("IDv2 tag info for", filename)
try:
id3 = mutagen.id3.ID3(filename, translate=False)
except mutagen.id3.ID3NoHeaderError:
print_(u"No ID3 header found; skipping.")
print(u"No ID3 header found; skipping.")
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
raise SystemExit(1)
else:
print_(id3.pprint())
print(id3.pprint())
def list_tags_raw(filenames):
for filename in filenames:
print_("Raw IDv2 tag info for", filename)
print("Raw IDv2 tag info for", filename)
try:
id3 = mutagen.id3.ID3(filename, translate=False)
except mutagen.id3.ID3NoHeaderError:
print_(u"No ID3 header found; skipping.")
print(u"No ID3 header found; skipping.")
except Exception as err:
print_(str(err), file=sys.stderr)
print(str(err), file=sys.stderr)
raise SystemExit(1)
else:
for frame in id3.values():
print_(str(repr(frame)))
print(str(repr(frame)))
def main(argv):
@@ -409,43 +405,43 @@ def main(argv):
parser.add_option(
"-a", "--artist", metavar='"ARTIST"', action="callback",
help="Set the artist information", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--TPE1"),
callback=lambda *args: args[3].edits.append(("--TPE1",
args[2])))
parser.add_option(
"-A", "--album", metavar='"ALBUM"', action="callback",
help="Set the album title information", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--TALB"),
callback=lambda *args: args[3].edits.append(("--TALB",
args[2])))
parser.add_option(
"-t", "--song", metavar='"SONG"', action="callback",
help="Set the song title information", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--TIT2"),
callback=lambda *args: args[3].edits.append(("--TIT2",
args[2])))
parser.add_option(
"-c", "--comment", metavar='"DESCRIPTION":"COMMENT":"LANGUAGE"',
action="callback", help="Set the comment information", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--COMM"),
callback=lambda *args: args[3].edits.append(("--COMM",
args[2])))
parser.add_option(
"-p", "--picture",
metavar='"FILENAME":"DESCRIPTION":"IMAGE-TYPE":"MIME-TYPE"',
action="callback", help="Set the picture", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--APIC"),
callback=lambda *args: args[3].edits.append(("--APIC",
args[2])))
parser.add_option(
"-g", "--genre", metavar='"GENRE"', action="callback",
help="Set the genre or genre number", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--TCON"),
callback=lambda *args: args[3].edits.append(("--TCON",
args[2])))
parser.add_option(
"-y", "--year", "--date", metavar='YYYY[-MM-DD]', action="callback",
help="Set the year/date", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--TDRC"),
callback=lambda *args: args[3].edits.append(("--TDRC",
args[2])))
parser.add_option(
"-T", "--track", metavar='"num/num"', action="callback",
help="Set the track number/(optional) total tracks", type="string",
callback=lambda *args: args[3].edits.append((fsnative(u"--TRCK"),
callback=lambda *args: args[3].edits.append(("--TRCK",
args[2])))
for key, frame in mutagen.id3.Frames.items():
@@ -485,4 +481,4 @@ def main(argv):
def entry_point():
_sig.init()
return main(argv)
return main(sys.argv)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -9,9 +8,9 @@
"""Split a multiplex/chained Ogg file into its component parts."""
import os
import sys
import mutagen.ogg
from mutagen._senf import argv
from ._util import SignalHandler, OptionParser
@@ -72,4 +71,4 @@ def main(argv):
def entry_point():
_sig.init()
return main(argv)
return main(sys.argv)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2005 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -8,7 +7,7 @@
"""Full tag list for any given file."""
from mutagen._senf import print_, argv
import sys
from ._util import SignalHandler, OptionParser
@@ -19,7 +18,7 @@ _sig = SignalHandler()
def main(argv):
from mutagen import File
parser = OptionParser()
parser = OptionParser(usage="usage: %prog [options] FILE [FILE...]")
parser.add_option("--no-flac", help="Compatibility; does nothing.")
parser.add_option("--no-mp3", help="Compatibility; does nothing.")
parser.add_option("--no-apev2", help="Compatibility; does nothing.")
@@ -29,16 +28,16 @@ def main(argv):
raise SystemExit(parser.print_help() or 1)
for filename in args:
print_(u"--", filename)
print(u"--", filename)
try:
print_(u"-", File(filename).pprint())
print(u"-", File(filename).pprint())
except AttributeError:
print_(u"- Unknown file type")
print(u"- Unknown file type")
except Exception as err:
print_(str(err))
print_(u"")
print(str(err))
print(u"")
def entry_point():
_sig.init()
return main(argv)
return main(sys.argv)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2005 Joe Wreschnig, Michael Urman
#
# This program is free software; you can redistribute it and/or modify
@@ -10,8 +9,6 @@ import os
import sys
import traceback
from mutagen._senf import print_, argv
from ._util import SignalHandler
@@ -76,14 +73,14 @@ class Report(object):
else:
strings.append("\nNo errors!")
return("\n".join(strings))
return "\n".join(strings)
def check_dir(path):
from mutagen.mp3 import MP3
rep = Report(path)
print_(u"Scanning", path)
print(u"Scanning", path)
for path, dirs, files in os.walk(path):
files.sort()
for fn in files:
@@ -100,12 +97,12 @@ def check_dir(path):
else:
rep.success(mp3.tags)
print_(str(rep))
print(str(rep))
def main(argv):
if len(argv) == 1:
print_(u"Usage:", argv[0], u"directory ...")
print(u"Usage:", argv[0], u"directory ...")
else:
for path in argv[1:]:
check_dir(path)
@@ -113,4 +110,4 @@ def main(argv):
def entry_point():
SignalHandler().init()
return main(argv)
return main(sys.argv)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -18,6 +17,7 @@ import codecs
import errno
import decimal
from io import BytesIO
from typing import Tuple, List
from collections import namedtuple
from contextlib import contextmanager
@@ -25,11 +25,11 @@ from functools import wraps
from fnmatch import fnmatchcase
_DEFAULT_BUFFER_SIZE = 2 ** 18
_DEFAULT_BUFFER_SIZE = 2 ** 20
def endswith(text, end):
# usefull for paths which can be both, str and bytes
# useful for paths which can be both, str and bytes
if isinstance(text, str):
if not isinstance(end, str):
end = end.decode("ascii")
@@ -51,7 +51,7 @@ def iterbytes(b):
return (bytes([v]) for v in b)
def intround(value):
def intround(value: float) -> int:
"""Given a float returns a rounded int. Should give the same result on
both Py2/3
"""
@@ -60,7 +60,7 @@ def intround(value):
value).to_integral_value(decimal.ROUND_HALF_EVEN))
def is_fileobj(fileobj):
def is_fileobj(fileobj) -> bool:
"""Returns:
bool: if an argument passed ot mutagen should be treated as a
file object
@@ -478,7 +478,7 @@ class DictMixin(object):
return args[0]
else:
raise
del(self[key])
del self[key]
return value
def popitem(self):
@@ -540,7 +540,7 @@ class DictProxy(DictMixin):
self.__dict[key] = value
def __delitem__(self, key):
del(self.__dict[key])
del self.__dict[key]
def keys(self):
return self.__dict.keys()
@@ -615,7 +615,7 @@ class cdata(object):
_fill_cdata(cdata)
def get_size(fileobj):
def get_size(fileobj) -> int:
"""Returns the size of the file.
The position when passed in will be preserved if no error occurs.
@@ -635,7 +635,7 @@ def get_size(fileobj):
fileobj.seek(old_pos, 0)
def read_full(fileobj, size):
def read_full(fileobj, size: int) -> None:
"""Like fileobj.read but raises IOError if not all requested data is
returned.
@@ -658,7 +658,7 @@ def read_full(fileobj, size):
return data
def seek_end(fileobj, offset):
def seek_end(fileobj, offset: int) -> None:
"""Like fileobj.seek(-offset, 2), but will not try to go beyond the start
Needed since file objects from BytesIO will not raise IOError and
@@ -683,7 +683,7 @@ def seek_end(fileobj, offset):
fileobj.seek(-offset, 2)
def resize_file(fobj, diff, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
def resize_file(fobj, diff: int, BUFFER_SIZE: int = _DEFAULT_BUFFER_SIZE) -> None:
"""Resize a file by `diff`.
New space will be filled with zeros.
@@ -720,7 +720,8 @@ def resize_file(fobj, diff, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
raise
def move_bytes(fobj, dest, src, count, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
def move_bytes(fobj, dest: int, src: int, count: int,
BUFFER_SIZE: int = _DEFAULT_BUFFER_SIZE) -> None:
"""Moves data around using read()/write().
Args:
@@ -763,7 +764,8 @@ def move_bytes(fobj, dest, src, count, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
fobj.flush()
def insert_bytes(fobj, size, offset, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
def insert_bytes(fobj, size: int, offset: int,
BUFFER_SIZE: int = _DEFAULT_BUFFER_SIZE) -> None:
"""Insert size bytes of empty space starting at offset.
fobj must be an open file object, open rb+ or
@@ -791,7 +793,8 @@ def insert_bytes(fobj, size, offset, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
move_bytes(fobj, offset + size, offset, movesize, BUFFER_SIZE)
def delete_bytes(fobj, size, offset, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
def delete_bytes(fobj, size: int, offset: int,
BUFFER_SIZE: int = _DEFAULT_BUFFER_SIZE) -> None:
"""Delete size bytes of empty space starting at offset.
fobj must be an open file object, open rb+ or
@@ -819,7 +822,7 @@ def delete_bytes(fobj, size, offset, BUFFER_SIZE=_DEFAULT_BUFFER_SIZE):
resize_file(fobj, -size, BUFFER_SIZE)
def resize_bytes(fobj, old_size, new_size, offset):
def resize_bytes(fobj, old_size: int, new_size: int, offset: int) -> None:
"""Resize an area in a file adding and deleting at the end of it.
Does nothing if no resizing is needed.
@@ -865,7 +868,8 @@ def dict_match(d, key, default=None):
return default
def encode_endian(text, encoding, errors="strict", le=True):
def encode_endian(text: str, encoding: str,
errors: str = "strict", le: bool = True) -> bytes:
"""Like text.encode(encoding) but always returns little endian/big endian
BOMs instead of the system one.
@@ -897,7 +901,8 @@ def encode_endian(text, encoding, errors="strict", le=True):
return text.encode(encoding, errors)
def decode_terminated(data, encoding, strict=True):
def decode_terminated(data: bytes, encoding: str,
strict: bool = True) -> Tuple[str, bytes]:
"""Returns the decoded data until the first NULL terminator
and all data after it.
@@ -937,7 +942,7 @@ def decode_terminated(data, encoding, strict=True):
# slow path
decoder = codec_info.incrementaldecoder()
r = []
r: List[str] = []
for i, b in enumerate(iterbytes(data)):
c = decoder.decode(b)
if c == u"\x00":
@@ -963,7 +968,7 @@ class BitReader(object):
self._bits = 0
self._pos = fileobj.tell()
def bits(self, count):
def bits(self, count: int) -> int:
"""Reads `count` bits and returns an uint, MSB read first.
May raise BitReaderError if not enough data could be read or
@@ -988,7 +993,7 @@ class BitReader(object):
assert self._bits < 8
return value
def bytes(self, count):
def bytes(self, count: int) -> bytes:
"""Returns a bytearray of length `count`. Works unaligned."""
if count < 0:
@@ -1003,7 +1008,7 @@ class BitReader(object):
return bytes(bytearray(self.bits(8) for _ in range(count)))
def skip(self, count):
def skip(self, count: int) -> None:
"""Skip `count` bits.
Might raise BitReaderError if there wasn't enough data to skip,
@@ -1022,12 +1027,12 @@ class BitReader(object):
count -= n_bytes * 8
self.bits(count)
def get_position(self):
def get_position(self) -> int:
"""Returns the amount of bits read or skipped so far"""
return (self._fileobj.tell() - self._pos) * 8 - self._bits
def align(self):
def align(self) -> int:
"""Align to the next byte, returns the amount of bits skipped"""
bits = self._bits
@@ -1035,7 +1040,7 @@ class BitReader(object):
self._bits = 0
return bits
def is_aligned(self):
def is_aligned(self) -> bool:
"""If we are currently aligned to bytes and nothing is buffered"""
return self._bits == 0

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005-2006 Joe Wreschnig
# 2013 Christoph Reiter
#
@@ -23,7 +22,7 @@ import mutagen
from mutagen._util import DictMixin, cdata, MutagenError, reraise
def is_valid_key(key):
def is_valid_key(key: str) -> bool:
"""Return true if a string is a valid Vorbis comment key.
Valid Vorbis comment keys are printable ASCII between 0x20 (space)
@@ -159,7 +158,7 @@ class VComment(mutagen.Tags, list):
return True
def clear(self):
def clear(self) -> None:
"""Clear all keys from the comment."""
for i in list(self):
@@ -197,7 +196,7 @@ class VComment(mutagen.Tags, list):
f.write(b"\x01")
return f.getvalue()
def pprint(self):
def pprint(self) -> str:
def _decode(value):
if not isinstance(value, str):
@@ -208,7 +207,7 @@ class VComment(mutagen.Tags, list):
return u"\n".join(tags)
class VCommentDict(VComment, DictMixin):
class VCommentDict(VComment, DictMixin): # type: ignore
"""A VComment that looks like a dictionary.
This object differs from a dictionary in two ways. First,
@@ -290,7 +289,7 @@ class VCommentDict(VComment, DictMixin):
if not isinstance(values, list):
values = [values]
try:
del(self[key])
del self[key]
except KeyError:
pass

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify
@@ -220,7 +219,7 @@ class AC3Info(StreamInfo):
if timecod2e:
r.skip(14) # Time Code Second Half
if r.bits(1): # Additional Bit Stream Information Exists
addbsil = r.bit(6) # Additional Bit Stream Information Length
addbsil = r.bits(6) # Additional Bit Stream Information Length
r.skip((addbsil + 1) * 8)
@staticmethod
@@ -271,7 +270,7 @@ class AC3Info(StreamInfo):
if r.bits(1): # blkid
r.skip(6) # frmsizecod
if r.bits(1): # Additional Bit Stream Information Exists
addbsil = r.bit(6) # Additional Bit Stream Information Length
addbsil = r.bits(6) # Additional Bit Stream Information Length
r.skip((addbsil + 1) * 8)
@staticmethod

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Evan Purkhiser
# 2014 Ben Ockmore
# 2019-2020 Philipp Wolfer

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -41,6 +40,7 @@ from mutagen._util import DictMixin, cdata, delete_bytes, total_ordering, \
def is_valid_apev2_key(key):
# https://wiki.hydrogenaud.io/index.php?title=APE_key
if not isinstance(key, str):
raise TypeError("APEv2 key must be str")
@@ -252,8 +252,8 @@ class _CIDictProxy(DictMixin):
def __delitem__(self, key):
lower = key.lower()
del(self.__casemap[lower])
del(self.__dict[lower])
del self.__casemap[lower]
del self.__dict[lower]
def keys(self):
return [self.__casemap.get(key, key) for key in self.__dict.keys()]
@@ -327,6 +327,10 @@ class APEv2(_CIDictProxy, Metadata):
key = key.decode("ascii")
except UnicodeError as err:
reraise(APEBadItemError, err, sys.exc_info()[2])
if not is_valid_apev2_key(key):
raise APEBadItemError("%r is not a valid APEv2 key" % key)
value = fileobj.read(size)
if len(value) != size:
raise APEBadItemError
@@ -517,7 +521,7 @@ def APEValue(value, kind):
class _APEValue(object):
kind = None
kind: int
value = None
def __init__(self, value, kind=None):

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005-2006 Joe Wreschnig
# Copyright (C) 2006-2007 Lukas Lalinsky
#
@@ -74,7 +73,7 @@ class ASFInfo(StreamInfo):
return s
class ASFTags(list, DictMixin, Tags):
class ASFTags(list, DictMixin, Tags): # type: ignore
"""ASFTags()
Dictionary containing ASF attributes.
@@ -148,7 +147,7 @@ class ASFTags(list, DictMixin, Tags):
to_append.append((key, value))
try:
del(self[key])
del self[key]
except KeyError:
pass

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005-2006 Joe Wreschnig
# Copyright (C) 2006-2007 Lukas Lalinsky
#
@@ -9,6 +8,7 @@
import sys
import struct
from typing import Dict, Type
from mutagen._util import total_ordering, reraise
@@ -18,9 +18,9 @@ from ._util import ASFError
class ASFBaseAttribute(object):
"""Generic attribute."""
TYPE = None
TYPE: int
_TYPES = {}
_TYPES: "Dict[int, Type[ASFBaseAttribute]]" = {}
value = None
"""The Python value of this attribute (type depends on the class)"""

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005-2006 Joe Wreschnig
# Copyright (C) 2006-2007 Lukas Lalinsky
#
@@ -8,6 +7,7 @@
# (at your option) any later version.
import struct
from typing import Dict, Type
from mutagen._util import cdata, get_size
from mutagen._tags import PaddingInfo
@@ -19,8 +19,8 @@ from ._attrs import ASFBaseAttribute, ASFUnicodeAttribute
class BaseObject(object):
"""Base ASF object."""
GUID = None
_TYPES = {}
GUID: bytes
_TYPES: "Dict[bytes, Type[BaseObject]]" = {}
def __init__(self):
self.objects = []

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005-2006 Joe Wreschnig
# Copyright (C) 2006-2007 Lukas Lalinsky
#
@@ -8,6 +7,7 @@
# (at your option) any later version.
import struct
from typing import List
from mutagen._util import MutagenError
@@ -24,7 +24,7 @@ class ASFHeaderError(error):
pass
def guid2bytes(s):
def guid2bytes(s: str) -> bytes:
"""Converts a GUID to the serialized bytes representation"""
assert isinstance(s, str)
@@ -38,13 +38,13 @@ def guid2bytes(s):
])
def bytes2guid(s):
def bytes2guid(s: bytes) -> str:
"""Converts a serialized GUID to a text GUID"""
assert isinstance(s, bytes)
u = struct.unpack
v = []
v: List[int] = []
v.extend(u("<IHH", s[:8]))
v.extend(u(">HQ", s[8:10] + b"\x00\x00" + s[10:]))
return "%08X-%04X-%04X-%04X-%012X" % tuple(v)
@@ -285,7 +285,7 @@ CODECS = {
0xA10C: u"Media Foundation Spectrum Analyzer Output",
0xA10D: u"GSM 6.10 (Full-Rate) Speech",
0xA10E: u"GSM 6.20 (Half-Rate) Speech",
0xA10F: u"GSM 6.60 (Enchanced Full-Rate) Speech",
0xA10F: u"GSM 6.60 (Enhanced Full-Rate) Speech",
0xA110: u"GSM 6.90 (Adaptive Multi-Rate) Speech",
0xA111: u"GSM Adaptive Multi-Rate WideBand Speech",
0xA112: u"Polycom G.722",

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2020 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Boris Pruessmann
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -12,6 +11,8 @@ EasyID3 is a wrapper around mutagen.id3.ID3 to make ID3 tags appear
more like Vorbis or APEv2 tags.
"""
from typing import Callable, Dict
import mutagen.id3
from mutagen import Metadata
@@ -64,10 +65,10 @@ class EasyID3(DictMixin, Metadata):
"""
Set = {}
Get = {}
Delete = {}
List = {}
Set: Dict[str, Callable] = {}
Get: Dict[str, Callable] = {}
Delete: Dict[str, Callable] = {}
List: Dict[str, Callable] = {}
# For compatibility.
valid_keys = Get
@@ -129,7 +130,7 @@ class EasyID3(DictMixin, Metadata):
frame.text = value
def deleter(id3, key):
del(id3[frameid])
del id3[frameid]
cls.RegisterKey(key, getter, setter, deleter)
@@ -159,7 +160,7 @@ class EasyID3(DictMixin, Metadata):
id3.add(mutagen.id3.TXXX(encoding=enc, text=value, desc=desc))
def deleter(id3, key):
del(id3[frameid])
del id3[frameid]
cls.RegisterKey(key, getter, setter, deleter)
@@ -269,7 +270,7 @@ def genre_set(id3, key, value):
def genre_delete(id3, key):
del(id3["TCON"])
del id3["TCON"]
def date_get(id3, key):
@@ -281,7 +282,7 @@ def date_set(id3, key, value):
def date_delete(id3, key):
del(id3["TDRC"])
del id3["TDRC"]
def original_date_get(id3, key):
@@ -293,7 +294,7 @@ def original_date_set(id3, key, value):
def original_date_delete(id3, key):
del(id3["TDOR"])
del id3["TDOR"]
def performer_get(id3, key):
@@ -338,7 +339,7 @@ def performer_delete(id3, key):
elif people:
mcl.people = people
else:
del(id3["TMCL"])
del id3["TMCL"]
def performer_list(id3, key):
@@ -368,7 +369,7 @@ def musicbrainz_trackid_set(id3, key, value):
def musicbrainz_trackid_delete(id3, key):
del(id3["UFID:http://musicbrainz.org"])
del id3["UFID:http://musicbrainz.org"]
def website_get(id3, key):
@@ -420,7 +421,7 @@ def gain_delete(id3, key):
if frame.peak:
frame.gain = 0.0
else:
del(id3["RVA2:" + key[11:-5]])
del id3["RVA2:" + key[11:-5]]
def peak_get(id3, key):
@@ -456,7 +457,7 @@ def peak_delete(id3, key):
if frame.gain:
frame.peak = 0.0
else:
del(id3["RVA2:" + key[11:-5]])
del id3["RVA2:" + key[11:-5]]
def peakgain_list(id3, key):
@@ -477,6 +478,7 @@ for frameid, key in {
"TLEN": "length",
"TMED": "media",
"TMOO": "mood",
"TIT1": "grouping",
"TIT2": "title",
"TIT3": "version",
"TPE1": "artist",
@@ -553,4 +555,4 @@ class EasyID3FileType(ID3FileType):
tags (`EasyID3`)
"""
ID3 = EasyID3
ID3 = EasyID3 # type: ignore

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2009 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -6,6 +5,8 @@
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from typing import Dict, Callable
from mutagen import Tags
from mutagen._util import DictMixin, dict_match
from mutagen.mp4 import MP4, MP4Tags, error, delete
@@ -31,10 +32,10 @@ class EasyMP4Tags(DictMixin, Tags):
MP4, not EasyMP4.
"""
Set = {}
Get = {}
Delete = {}
List = {}
Set: Dict[str, Callable] = {}
Get: Dict[str, Callable] = {}
Delete: Dict[str, Callable] = {}
List: Dict[str, Callable] = {}
def __init__(self, *args, **kwargs):
self.__mp4 = MP4Tags(*args, **kwargs)
@@ -95,7 +96,7 @@ class EasyMP4Tags(DictMixin, Tags):
tags[atomid] = value
def deleter(tags, key):
del(tags[atomid])
del tags[atomid]
cls.RegisterKey(key, getter, setter, deleter)
@@ -112,7 +113,7 @@ class EasyMP4Tags(DictMixin, Tags):
tags[atomid] = [clamp(v) for v in map(int, value)]
def deleter(tags, key):
del(tags[atomid])
del tags[atomid]
cls.RegisterKey(key, getter, setter, deleter)
@@ -143,7 +144,7 @@ class EasyMP4Tags(DictMixin, Tags):
tags[atomid] = data
def deleter(tags, key):
del(tags[atomid])
del tags[atomid]
cls.RegisterKey(key, getter, setter, deleter)
@@ -172,7 +173,7 @@ class EasyMP4Tags(DictMixin, Tags):
tags[atomid] = encoded
def deleter(tags, key):
del(tags[atomid])
del tags[atomid]
cls.RegisterKey(key, getter, setter, deleter)
@@ -276,7 +277,7 @@ class EasyMP4(MP4):
tags (`EasyMP4Tags`)
"""
MP4Tags = EasyMP4Tags
MP4Tags = EasyMP4Tags # type: ignore
Get = EasyMP4Tags.Get
Set = EasyMP4Tags.Set

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -281,7 +280,7 @@ class SeekPoint(tuple):
"""
def __new__(cls, first_sample, byte_offset, num_samples):
return super(cls, SeekPoint).__new__(
return super(SeekPoint, cls).__new__(
cls, (first_sample, byte_offset, num_samples))
def __getnewargs__(self):
@@ -374,7 +373,7 @@ class CueSheetTrackIndex(tuple):
"""
def __new__(cls, index_number, index_offset):
return super(cls, CueSheetTrackIndex).__new__(
return super(CueSheetTrackIndex, cls).__new__(
cls, (index_number, index_offset))
index_number = property(lambda self: self[0])
@@ -388,7 +387,7 @@ class CueSheetTrack(object):
For CD-DA, track_numbers must be 1-99, or 170 for the
lead-out. Track_numbers must be unique within a cue sheet. There
must be atleast one index in every track except the lead-out track
must be at least one index in every track except the lead-out track
which must have none.
Attributes:
@@ -520,7 +519,7 @@ class CueSheet(MetadataBlock):
track_flags |= 0x40
track_packed = struct.pack(
self.__CUESHEET_TRACK_FORMAT, track.start_offset,
track.track_number, track.isrc, track_flags,
track.track_number, track.isrc or b"\0", track_flags,
len(track.indexes))
f.write(track_packed)
for index in track.indexes:
@@ -688,7 +687,6 @@ class FLAC(mutagen.FileType):
_mimes = ["audio/flac", "audio/x-flac", "application/x-flac"]
info = None
tags = None
METADATA_BLOCKS = [StreamInfo, Padding, None, SeekTable, VCFLACDict,
@@ -714,7 +712,7 @@ class FLAC(mutagen.FileType):
if block_type._distrust_size:
# Some jackass is writing broken Metadata block length
# for Vorbis comment blocks, and the FLAC reference
# implementaton can parse them (mostly by accident),
# implementation can parse them (mostly by accident),
# so we have to too. Instead of parsing the size
# given, parse an actual Vorbis comment, leaving
# fileobj in the right position.
@@ -798,7 +796,7 @@ class FLAC(mutagen.FileType):
pass
try:
self.metadata_blocks[0].length
self.info.length
except (AttributeError, IndexError):
raise FLACNoHeaderError("Stream info block not found")
@@ -812,7 +810,11 @@ class FLAC(mutagen.FileType):
@property
def info(self):
return self.metadata_blocks[0]
streaminfo_blocks = [
block for block in self.metadata_blocks
if block.code == StreamInfo.code
]
return streaminfo_blocks[0]
def add_picture(self, picture):
"""Add a new picture to the file.
@@ -844,6 +846,15 @@ class FLAC(mutagen.FileType):
If no filename is given, the one most recently loaded is used.
"""
# add new cuesheet and seektable
if self.cuesheet and self.cuesheet not in self.metadata_blocks:
if not isinstance(self.cuesheet, CueSheet):
raise ValueError("Invalid cuesheet object type!")
self.metadata_blocks.append(self.cuesheet)
if self.seektable and self.seektable not in self.metadata_blocks:
if not isinstance(self.seektable, SeekTable):
raise ValueError("Invalid seektable object type!")
self.metadata_blocks.append(self.seektable)
self._save(filething, self.metadata_blocks, deleteid3, padding)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
# 2006 Lukas Lalinsky
# 2013 Christoph Reiter

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
# 2006 Lukas Lalinsky
# 2013 Christoph Reiter
@@ -167,7 +166,10 @@ class ID3(ID3Tags, mutagen.Metadata):
if known_frames is not None:
self._header._known_frames = known_frames
data = read_full(fileobj, self.size - 10)
size = self.size - 10
if self.f_extended:
size -= 4 + len(self._header._extdata)
data = read_full(fileobj, size)
remaining_data = self._read(self._header, data)
self._padding = len(remaining_data)
@@ -232,7 +234,7 @@ class ID3(ID3Tags, mutagen.Metadata):
if 0, ID3v1 tags will be removed.
if 1, ID3v1 tags will be updated but not added.
if 2, ID3v1 tags will be created and/or updated
v2 (int):
v2_version (int):
version of ID3v2 tags (3 or 4).
v23_sep (text):
the separator used to join multiple text values

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
#
# This program is free software; you can redistribute it and/or modify
@@ -8,6 +7,7 @@
import zlib
from struct import unpack
from typing import Sequence
from ._util import ID3JunkFrameError, ID3EncryptionUnsupportedError, unsynch, \
ID3SaveConfig, error
@@ -17,7 +17,7 @@ from ._specs import BinaryDataSpec, StringSpec, Latin1TextSpec, \
VolumeAdjustmentSpec, ChannelSpec, MultiSpec, SynchronizedTextSpec, \
KeyEventSpec, TimeStampSpec, EncodedNumericPartTextSpec, \
EncodedNumericTextSpec, SpecError, PictureTypeSpec, ID3FramesSpec, \
Latin1TextListSpec, CTOCFlagsSpec, FrameIDSpec, RVASpec
Latin1TextListSpec, CTOCFlagsSpec, FrameIDSpec, RVASpec, Spec
def _bytes2key(b):
@@ -49,8 +49,8 @@ class Frame(object):
FLAG24_UNSYNCH = 0x0002
FLAG24_DATALEN = 0x0001
_framespec = []
_optionalspec = []
_framespec: Sequence[Spec] = []
_optionalspec: Sequence[Spec] = []
def __init__(self, *args, **kwargs):
if len(args) == 1 and len(kwargs) == 0 and \
@@ -291,7 +291,7 @@ class Frame(object):
frame._readData(header, data)
return frame
def __hash__(self):
def __hash__(self: object):
raise TypeError("Frame objects are unhashable")
@@ -514,7 +514,7 @@ class UrlFrame(Frame):
ASCII.
"""
_framespec = [
_framespec: Sequence[Spec] = [
Latin1TextSpec('url'),
]
@@ -568,7 +568,7 @@ class TCON(TextFrame):
genre_re = re.compile(r"((?:\((?P<id>[0-9]+|RX|CR)\))*)(?P<str>.+)?")
for value in self.text:
# 255 possible entries in id3v1
if value.isdigit() and int(value) < 256:
if value.isdecimal() and int(value) < 256:
try:
genres.append(self.GENRES[int(value)])
except IndexError:
@@ -832,7 +832,7 @@ class TSOC(TextFrame):
class TSOP(TextFrame):
"Perfomer Sort Order key"
"Performer Sort Order key"
class TSOT(TextFrame):
@@ -1110,7 +1110,7 @@ class SYLT(Frame):
class COMM(TextFrame):
"""User comment.
User comment frames have a descrption, like TXXX, and also a three
User comment frames have a description, like TXXX, and also a three
letter ISO language code in the 'lang' attribute.
"""
@@ -1895,7 +1895,7 @@ class TS2(TSO2):
class TSP(TSOP):
"Perfomer Sort Order key"
"Performer Sort Order key"
class TSC(TSOC):
@@ -1931,7 +1931,7 @@ class TOT(TOAL):
class TOA(TOPE):
"Original Artist/Perfomer"
"Original Artist/Performer"
class TOL(TOLY):
@@ -1995,7 +1995,7 @@ class STC(SYTC):
class ULT(USLT):
"Unsychronised lyrics/text transcription"
"Unsynchronised lyrics/text transcription"
class SLT(SYLT):

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
# 2006 Lukas Lalinsky
# 2013 Christoph Reiter

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
#
# This program is free software; you can redistribute it and/or modify
@@ -541,7 +540,7 @@ class MultiSpec(Spec):
spec = self.specs[0]
# Merge single text spec multispecs only.
# (TimeStampSpec beeing the exception, but it's not a valid v2.3 frame)
# (TimeStampSpec being the exception, but it's not a valid v2.3 frame)
if not isinstance(spec, EncodedTextSpec) or \
isinstance(spec, TimeStampSpec):
return value

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2005 Michael Urman
# Copyright 2016 Christoph Reiter
#
@@ -109,6 +108,9 @@ class ID3Header(object):
# excludes itself."
extsize = struct.unpack('>L', extsize_data)[0]
if extsize < 0:
raise error("invalid extended header size")
self._extdata = read_full(fileobj, extsize)
@@ -254,12 +256,12 @@ class ID3Tags(DictProxy, Tags):
"""
if key in self:
del(self[key])
del self[key]
else:
key = key + ":"
for k in list(self.keys()):
if k.startswith(key):
del(self[k])
del self[k]
def pprint(self):
"""
@@ -367,17 +369,20 @@ class ID3Tags(DictProxy, Tags):
# TDAT, TYER, and TIME have been turned into TDRC.
timestamps = []
old_frames = [self.pop(n, []) for n in ["TYER", "TDAT", "TIME"]]
for y, d, t in zip_longest(*old_frames, fillvalue=u""):
ym = re.match(r"([0-9]+)\Z", y)
dm = re.match(r"([0-9]{2})([0-9]{2})\Z", d)
tm = re.match(r"([0-9]{2})([0-9]{2})\Z", t)
for tyer, tdat, time in zip_longest(*old_frames, fillvalue=""):
ym = re.match(r"([0-9]{4})(-[0-9]{2}-[0-9]{2})?\Z", tyer)
dm = re.match(r"([0-9]{2})([0-9]{2})\Z", tdat)
tm = re.match(r"([0-9]{2})([0-9]{2})\Z", time)
timestamp = ""
if ym:
timestamp += u"%s" % ym.groups()
(year, month_day) = ym.groups()
timestamp += "%s" % year
if dm:
timestamp += u"-%s-%s" % dm.groups()[::-1]
month_day = "-%s-%s" % dm.groups()[::-1]
if month_day:
timestamp += month_day
if tm:
timestamp += u"T%s:%s:00" % tm.groups()
timestamp += "T%s:%s:00" % tm.groups()
if timestamp:
timestamps.append(timestamp)
if timestamps and "TDRC" not in self:
@@ -402,7 +407,7 @@ class ID3Tags(DictProxy, Tags):
# should have been removed already.
for key in ["RVAD", "EQUA", "TRDA", "TSIZ", "TDAT", "TIME"]:
if key in self:
del(self[key])
del self[key]
# Recurse into chapters
for f in self.getall("CHAP"):
@@ -466,7 +471,7 @@ class ID3Tags(DictProxy, Tags):
for key in v24_frames:
if key in self:
del(self[key])
del self[key]
# Recurse into chapters
for f in self.getall("CHAP"):

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2005 Michael Urman
# 2013 Christoph Reiter
# 2014 Ben Ockmore
@@ -8,6 +7,8 @@
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from typing import Union
from mutagen._util import MutagenError
@@ -45,7 +46,7 @@ class ID3JunkFrameError(error):
class unsynch(object):
@staticmethod
def decode(value):
def decode(value: bytes) -> bytes:
fragments = bytearray(value).split(b'\xff')
if len(fragments) > 1 and not fragments[-1]:
raise ValueError('string ended unsafe')
@@ -60,7 +61,7 @@ class unsynch(object):
return bytes(bytearray(b'\xff').join(fragments))
@staticmethod
def encode(value):
def encode(value: bytes) -> bytes:
fragments = bytearray(value).split(b'\xff')
for f in fragments[1:]:
if (not f) or (f[0] >= 0xE0) or (f[0] == 0x00):
@@ -74,7 +75,8 @@ class _BitPaddedMixin(object):
return self.to_str(self, self.bits, self.bigendian, width, minwidth)
@staticmethod
def to_str(value, bits=7, bigendian=True, width=4, minwidth=4):
def to_str(value: int, bits: int = 7, bigendian: bool = True,
width: int = 4, minwidth: int = 4) -> bytes:
mask = (1 << bits) - 1
if width != -1:
@@ -102,7 +104,7 @@ class _BitPaddedMixin(object):
return bytes(bytes_)
@staticmethod
def has_valid_padding(value, bits=7):
def has_valid_padding(value: Union[int, bytes], bits: int = 7) -> bool:
"""Whether the padding bits are all zero"""
assert bits <= 8

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Lukas Lalinsky
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -81,16 +80,16 @@ class MPEGFrame(object):
# Map (version, layer) tuples to bitrates.
__BITRATE = {
(1, 1): [0, 32, 64, 96, 128, 160, 192, 224,
256, 288, 320, 352, 384, 416, 448],
(1, 2): [0, 32, 48, 56, 64, 80, 96, 112, 128,
160, 192, 224, 256, 320, 384],
(1, 3): [0, 32, 40, 48, 56, 64, 80, 96, 112,
128, 160, 192, 224, 256, 320],
(2, 1): [0, 32, 48, 56, 64, 80, 96, 112, 128,
144, 160, 176, 192, 224, 256],
(2, 2): [0, 8, 16, 24, 32, 40, 48, 56, 64,
80, 96, 112, 128, 144, 160],
(1., 1): [0, 32, 64, 96, 128, 160, 192, 224,
256, 288, 320, 352, 384, 416, 448],
(1., 2): [0, 32, 48, 56, 64, 80, 96, 112, 128,
160, 192, 224, 256, 320, 384],
(1., 3): [0, 32, 40, 48, 56, 64, 80, 96, 112,
128, 160, 192, 224, 256, 320],
(2., 1): [0, 32, 48, 56, 64, 80, 96, 112, 128,
144, 160, 176, 192, 224, 256],
(2., 2): [0, 8, 16, 24, 32, 40, 48, 56, 64,
80, 96, 112, 128, 144, 160],
}
__BITRATE[(2, 3)] = __BITRATE[(2, 2)]
@@ -456,7 +455,11 @@ class MP3(ID3FileType):
def score(filename, fileobj, header_data):
filename = filename.lower()
return (header_data.startswith(b"ID3") * 2 +
return ((header_data.startswith(b"ID3") or
header_data.startswith(b'\xFF\xF2') or
header_data.startswith(b'\xFF\xF3') or
header_data.startswith(b'\xFF\xFA') or
header_data.startswith(b'\xFF\xFB')) * 2 +
endswith(filename, b".mp3") +
endswith(filename, b".mp2") + endswith(filename, b".mpg") +
endswith(filename, b".mpeg"))
@@ -479,4 +482,4 @@ class EasyMP3(MP3):
"""
from mutagen.easyid3 import EasyID3 as ID3
ID3 = ID3
ID3 = ID3 # type: ignore

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
@@ -14,6 +13,7 @@ http://wiki.hydrogenaud.io/index.php?title=MP3
from __future__ import division
from functools import partial
from io import BytesIO
from typing import List
from mutagen._util import cdata, BitReader, iterbytes
@@ -356,7 +356,7 @@ class XingHeader(object):
bytes = -1
"""Number of bytes, -1 if unknown"""
toc = []
toc: List[int] = []
"""List of 100 file offsets in percent encoded as 0-255. E.g. entry
50 contains the file offset in percent at 50% play time.
Empty if unknown.
@@ -474,7 +474,7 @@ class VBRIHeader(object):
toc_frames = 0
"""Number of frames per table entry"""
toc = []
toc: List[int] = []
"""TOC"""
def __init__(self, fileobj):

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -248,7 +247,7 @@ def _item_sort_key(key, value):
last = len(order)
# If there's no key-based way to distinguish, order by length.
# If there's still no way, go by string comparison on the
# values, so we at least have something determinstic.
# values, so we at least have something deterministic.
return (order.get(key[:4], last), len(repr(value)), repr(value))
@@ -365,8 +364,7 @@ class MP4Tags(DictProxy, Tags):
self.__parse_text(atom, data, implicit=False)
except MP4MetadataError:
# parsing failed, save them so we can write them back
key = _name2key(atom.name)
self._failed_atoms.setdefault(key, []).append(data)
self._failed_atoms.setdefault(_name2key(atom.name), []).append(data)
def __setitem__(self, key, value):
if not isinstance(key, str):
@@ -521,10 +519,13 @@ class MP4Tags(DictProxy, Tags):
fileobj.seek(atom.offset + 12)
data = fileobj.read(atom.length - 12)
fmt = fmt % cdata.uint_be(data[:4])
offsets = struct.unpack(fmt, data[4:])
offsets = [o + (0, delta)[offset < o] for o in offsets]
fileobj.seek(atom.offset + 16)
fileobj.write(struct.pack(fmt, *offsets))
try:
offsets = struct.unpack(fmt, data[4:])
offsets = [o + (0, delta)[offset < o] for o in offsets]
fileobj.seek(atom.offset + 16)
fileobj.write(struct.pack(fmt, *offsets))
except struct.error:
raise MP4MetadataError("wrong offset inside %r" % atom.name)
def __update_tfhd(self, fileobj, atom, delta, offset):
if atom.offset > offset:

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
@@ -204,7 +203,7 @@ class DescriptorError(Exception):
class BaseDescriptor(object):
TAG = None
TAG: int
@classmethod
def _parse_desc_length_file(cls, fileobj):

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Lukas Lalinsky
# Copyright (C) 2012 Christoph Reiter
#

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -20,10 +19,13 @@ import struct
import sys
import zlib
from io import BytesIO
from typing import Type
from mutagen import FileType
from mutagen._util import cdata, resize_bytes, MutagenError, loadfile, \
seek_end, bchr, reraise
from mutagen._file import StreamInfo
from mutagen._tags import Tags
class error(MutagenError):
@@ -165,7 +167,7 @@ class OggPage(object):
return data
@property
def size(self):
def size(self) -> int:
"""Total frame size."""
size = 27 # Initial header size
@@ -211,7 +213,7 @@ class OggPage(object):
to logical stream 'serial'. Other pages will be ignored.
fileobj must point to the start of a valid Ogg page; any
occuring after it and part of the specified logical stream
occurring after it and part of the specified logical stream
will be numbered. No adjustment will be made to the data in
the pages nor the granule position; only the page number, and
so also the CRC.
@@ -427,7 +429,7 @@ class OggPage(object):
new_data_end = offset + data_size
offset_adjust += (data_size - old_page.size)
# Finally, if there's any discrepency in length, we need to
# Finally, if there's any discrepancy in length, we need to
# renumber the pages for the logical stream.
if len(old_pages) != len(new_pages):
fileobj.seek(new_data_end, 0)
@@ -508,9 +510,9 @@ class OggFileType(FileType):
filething (filething)
"""
_Info = None
_Tags = None
_Error = None
_Info: Type[StreamInfo]
_Tags: Type[Tags]
_Error: Type[error]
_mimes = ["application/ogg", "application/x-ogg"]
@loadfile()

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012, 2013 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Lukas Lalinsky
#
# This program is free software; you can redistribute it and/or modify

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
@@ -9,6 +8,7 @@
"""Standard MIDI File (SMF)"""
import struct
from typing import Tuple
from mutagen import StreamInfo, MutagenError
from mutagen._file import FileType
@@ -19,7 +19,7 @@ class SMFError(MutagenError):
pass
def _var_int(data, offset=0):
def _var_int(data: bytearray, offset: int = 0) -> Tuple[int, int]:
val = 0
while 1:
try:
@@ -33,12 +33,12 @@ def _var_int(data, offset=0):
def _read_track(chunk):
"""Retuns a list of midi events and tempo change events"""
"""Returns a list of midi events and tempo change events"""
TEMPO, MIDI = range(2)
# Deviations: The running status should be reset on non midi events, but
# some files contain meta events inbetween.
# some files contain meta events in between.
# TODO: Offset and time signature are not considered.
tempos = []

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2008 Lukáš Lalinský
# Copyright (C) 2019 Philipp Wolfer
#
@@ -163,6 +162,7 @@ class TAKInfo(StreamInfo):
raise TAKHeaderError("not a TAK file")
bitreader = _LSBBitReader(fileobj)
found_stream_info = False
while True:
type = TAKMetadata(bitreader.bits(7))
bitreader.skip(1) # Unused
@@ -174,12 +174,16 @@ class TAKInfo(StreamInfo):
break
elif type == TAKMetadata.STREAM_INFO:
self._parse_stream_info(bitreader, size)
found_stream_info = True
elif type == TAKMetadata.ENCODER_INFO:
self._parse_encoder_info(bitreader, data_size)
assert bitreader.is_aligned()
fileobj.seek(pos + size)
if not found_stream_info:
raise TAKHeaderError("missing stream info")
if self.sample_rate > 0:
self.length = self.number_of_samples / float(self.sample_rate)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2006 Joe Wreschnig
#
# This program is free software; you can redistribute it and/or modify
@@ -10,7 +9,7 @@
True Audio is a lossless format designed for real-time encoding and
decoding. This module is based on the documentation at
http://www.true-audio.com/TTA_Lossless_Audio_Codec\\_-_Format_Description
http://tausoft.org/wiki/True_Audio_Codec_Format
True Audio files use ID3 tags.
"""
@@ -48,9 +47,11 @@ class TrueAudioInfo(StreamInfo):
header = fileobj.read(18)
if len(header) != 18 or not header.startswith(b"TTA"):
raise TrueAudioHeaderError("TTA header not found")
self.sample_rate = cdata.int_le(header[10:14])
self.sample_rate = cdata.uint_le(header[10:14])
samples = cdata.uint_le(header[14:18])
self.length = float(samples) / self.sample_rate
self.length = 0.0
if self.sample_rate != 0:
self.length = float(samples) / self.sample_rate
def pprint(self):
return u"True Audio, %.2f seconds, %d Hz." % (
@@ -98,4 +99,4 @@ class EasyTrueAudio(TrueAudio):
"""
from mutagen.easyid3 import EasyID3 as ID3
ID3 = ID3
ID3 = ID3 # type: ignore

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Borewit
# Copyright (C) 2019-2020 Philipp Wolfer
#
@@ -86,10 +85,10 @@ class WaveStreamInfo(StreamInfo):
# RIFF: http://soundfile.sapp.org/doc/WaveFormat/
# Python struct.unpack:
# https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
info = struct.unpack('<hhLLhh', data[:self.SIZE])
info = struct.unpack('<HHLLHH', data[:self.SIZE])
self.audio_format, self.channels, self.sample_rate, byte_rate, \
block_align, self.bits_per_sample = info
self.bitrate = self.channels * block_align * self.sample_rate
self.bitrate = self.channels * self.bits_per_sample * self.sample_rate
# Calculate duration
self._number_of_samples = 0

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2006 Joe Wreschnig
# 2014 Christoph Reiter
#