# File: headphones/lib/mutagen/dsdiff.py
# 2022-02-07 09:30:17 +05:30 | 268 lines | 7.8 KiB | Python

# -*- coding: utf-8 -*-
# Copyright (C) 2020 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
"""DSDIFF audio stream information and tags."""
import struct
from mutagen import StreamInfo
from mutagen._file import FileType
from mutagen._iff import (
IffChunk,
IffContainerChunkMixin,
IffID3,
IffFile,
InvalidChunk,
error as IffError,
)
from mutagen.id3._util import ID3NoHeaderError, error as ID3Error
from mutagen._util import (
convert_error,
loadfile,
endswith,
)
__all__ = ["DSDIFF", "Open", "delete"]
class error(IffError):
    """Exception type raised by this module for DSDIFF-related failures."""
# See
# https://dsd-guide.com/sites/default/files/white-papers/DSDIFF_1.5_Spec.pdf
class DSDIFFChunk(IffChunk):
    """One chunk of a DSDIFF file.

    The header handled here is 12 bytes long: a 4-byte chunk ID followed
    by a big-endian unsigned 64-bit data size (struct format ``>4sQ``).
    """

    HEADER_SIZE = 12

    @classmethod
    def parse_header(cls, header):
        """Unpack raw *header* bytes into an ``(id, data_size)`` tuple."""
        chunk_id, data_size = struct.unpack('>4sQ', header)
        return chunk_id, data_size

    @classmethod
    def get_class(cls, id):
        """Return the chunk class that should represent a chunk named *id*.

        Container IDs map to `DSDIFFListChunk`, ``'DST'`` maps to
        `DSTChunk`, and every other ID falls back to *cls*.
        """
        if id in DSDIFFListChunk.LIST_CHUNK_IDS:
            return DSDIFFListChunk
        if id == 'DST':
            return DSTChunk
        return cls

    def write_new_header(self, id_, size):
        """Write a complete chunk header (ID and size) to the file object."""
        header = struct.pack('>4sQ', id_, size)
        self._fileobj.write(header)

    def write_size(self):
        """Write this chunk's ``data_size`` as a big-endian 64-bit value."""
        packed_size = struct.pack('>Q', self.data_size)
        self._fileobj.write(packed_size)
class DSDIFFListChunk(DSDIFFChunk, IffContainerChunkMixin):
    """A DSDIFF container chunk, i.e. a chunk that holds further chunks.

    Only the chunk IDs listed in ``LIST_CHUNK_IDS`` are accepted.
    """

    LIST_CHUNK_IDS = ['FRM8', 'PROP']

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Set up the chunk and then its container state.

        Raises:
            InvalidChunk: if *id* is not one of ``LIST_CHUNK_IDS``.
        """
        is_list_chunk = id in self.LIST_CHUNK_IDS
        if not is_list_chunk:
            raise InvalidChunk('Not a list chunk: %s' % id)

        DSDIFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)
        self.init_container()

    def parse_next_subchunk(self):
        """Parse the next contained chunk from this chunk's file object."""
        return DSDIFFChunk.parse(self._fileobj, self)
class DSTChunk(DSDIFFChunk, IffContainerChunkMixin):
    """The ``DST`` chunk, treated as a container of further chunks.

    Unlike `DSDIFFListChunk`, the container is initialised with
    ``name_size=0``.
    """

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Set up the chunk and then its container state.

        Raises:
            InvalidChunk: if *id* is anything other than ``'DST'``.
        """
        is_dst = id == 'DST'
        if not is_dst:
            raise InvalidChunk('Not a DST chunk: %s' % id)

        DSDIFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)
        # NOTE(review): name_size=0 presumably means this container has no
        # name field after its header -- confirm against IffContainerChunkMixin.
        self.init_container(name_size=0)

    def parse_next_subchunk(self):
        """Parse the next contained chunk from this chunk's file object."""
        return DSDIFFChunk.parse(self._fileobj, self)
class DSDIFFFile(IffFile):
    """A DSDIFF file whose chunks are parsed as `DSDIFFChunk` objects."""

    def __init__(self, fileobj):
        """Parse *fileobj* and verify that the root chunk is ``FRM8``.

        Raises:
            InvalidChunk: if the root chunk carries any other ID.
        """
        super().__init__(DSDIFFChunk, fileobj)

        root = self.root
        if root.id == u'FRM8':
            return
        raise InvalidChunk("Root chunk must be a FRM8 chunk, got %r" % root)
class DSDIFFInfo(StreamInfo):
    """DSDIFF stream information.

    Attributes:
        channels (`int`): number of audio channels
        length (`float`): file length in seconds, as a float
        sample_rate (`int`): audio sampling rate in Hz
        bits_per_sample (`int`): audio sample size (for DSD this is always 1)
        bitrate (`int`): audio bitrate, in bits per second
        compression (`str`): DSD (uncompressed) or DST
    """

    channels = 0
    length = 0
    sample_rate = 0
    bits_per_sample = 1
    bitrate = 0
    compression = None

    @convert_error(IOError, error)
    def __init__(self, fileobj):
        """Read the stream properties from an open DSDIFF file object.

        Args:
            fileobj: file object that is handed to `DSDIFFFile` for parsing.

        Raises:
            error: if the ``PROP`` chunk is missing, if the chunk required
                by the declared compression (``DSD``, or ``DST``/``FRTE``)
                is missing, or if the compression ID is not valid ASCII.
            InvalidChunk: if a chunk delivers fewer bytes than required.
        """
        iff = DSDIFFFile(fileobj)
        try:
            prop_chunk = iff['PROP']
        except KeyError as e:
            raise error(str(e)) from e

        if prop_chunk.name == 'SND ':
            self._parse_sound_properties(prop_chunk)

        # The sample rate is unpacked as an unsigned value ('>L'), so this
        # guard is not expected to trigger; it is kept as a defensive check.
        if self.sample_rate < 0:
            raise error("Invalid sample rate")

        if self.compression == 'DSD':  # not compressed
            self._load_dsd_info(iff)
        elif self.compression == 'DST':
            self._load_dst_info(iff)

    @staticmethod
    def _read_at_least(chunk, min_size, label):
        """Read *chunk* and require at least *min_size* bytes of data."""
        data = chunk.read()
        if len(data) < min_size:
            raise InvalidChunk("Not enough data in %s chunk" % label)
        return data

    def _parse_sound_properties(self, prop_chunk):
        """Set sample rate, channels and compression from *prop_chunk*."""
        for chunk in prop_chunk.subchunks():
            if chunk.id == 'FS' and chunk.data_size == 4:
                data = self._read_at_least(chunk, 4, 'FS')
                self.sample_rate, = struct.unpack('>L', data[:4])
            elif chunk.id == 'CHNL' and chunk.data_size >= 2:
                data = self._read_at_least(chunk, 2, 'CHNL')
                self.channels, = struct.unpack('>H', data[:2])
            elif chunk.id == 'CMPR' and chunk.data_size >= 4:
                data = self._read_at_least(chunk, 4, 'CMPR')
                compression_id, = struct.unpack('>4s', data[:4])
                try:
                    self.compression = compression_id.decode('ascii').rstrip()
                except UnicodeDecodeError as e:
                    # Report a malformed ID as this module's error type
                    # instead of leaking UnicodeDecodeError to the caller.
                    raise error("Invalid compression ID in CMPR chunk") from e

    def _load_dsd_info(self, iff):
        """Compute length and bitrate for uncompressed DSD audio."""
        try:
            dsd_chunk = iff['DSD']
        except KeyError as e:
            raise error(str(e)) from e

        # DSD data has one bit per sample. Eight samples of a channel
        # are clustered together for a channel byte. For multiple channels
        # the channel bytes are interleaved (in the order specified in the
        # CHNL chunk). See DSDIFF spec chapter 3.3.
        sample_count = dsd_chunk.data_size * 8 / (self.channels or 1)

        if self.sample_rate != 0:
            self.length = sample_count / float(self.sample_rate)

        self.bitrate = (self.channels * self.bits_per_sample
                        * self.sample_rate)

    def _load_dst_info(self, iff):
        """Compute length and average bitrate for DST compressed audio."""
        try:
            dst_frame = iff['DST']
            dst_frame_info = dst_frame['FRTE']
        except KeyError as e:
            raise error(str(e)) from e

        if dst_frame_info.data_size < 6:
            # Too small to hold frame count and frame rate; keep defaults.
            return

        data = self._read_at_least(dst_frame_info, 6, 'FRTE')
        frame_count, frame_rate = struct.unpack('>LH', data[:6])

        if frame_rate:
            self.length = frame_count / frame_rate

        if frame_count:
            dst_data_size = dst_frame.data_size - dst_frame_info.size
            avg_frame_size = dst_data_size / frame_count
            # Round to an integer so bitrate is an int, as documented and
            # as already produced for uncompressed DSD.
            self.bitrate = int(round(avg_frame_size * 8 * frame_rate))

    def pprint(self):
        """Return a one-line, human-readable summary of the stream."""
        return u"%d channel DSDIFF (%s) @ %d bps, %s Hz, %.2f seconds" % (
            self.channels, self.compression, self.bitrate, self.sample_rate,
            self.length)
class _DSDIFFID3(IffID3):
    """ID3v2 tags that live inside a DSDIFF file."""

    def _load_file(self, fileobj):
        """Parse *fileobj* as DSDIFF and return the `DSDIFFFile` object."""
        dsdiff_file = DSDIFFFile(fileobj)
        return dsdiff_file
@convert_error(IOError, error)
@loadfile(method=False, writable=True)
def delete(filething):
    """Completely removes the ID3 chunk from the DSDIFF file.

    A ``KeyError`` raised while parsing the file or removing the chunk is
    ignored, so a file without an ``ID3`` chunk is left as it is.
    """
    try:
        dsdiff_file = DSDIFFFile(filething.fileobj)
        del dsdiff_file[u'ID3']
    except KeyError:
        # Nothing to remove.
        pass
class DSDIFF(FileType):
    """DSDIFF(filething)

    A DSDIFF audio file.

    For tagging ID3v2 data is added to a chunk with the ID "ID3 ".

    Arguments:
        filething (filething)

    Attributes:
        tags (`mutagen.id3.ID3`)
        info (`DSDIFFInfo`)
    """

    _mimes = ["audio/x-dff"]

    @convert_error(IOError, error)
    @loadfile()
    def load(self, filething, **kwargs):
        """Load the ID3 tags (if any) and the stream info from the file.

        ``tags`` is set to ``None`` when no ID3 header is found; any other
        ID3 error is re-raised as this module's `error`.
        """
        stream = filething.fileobj

        try:
            self.tags = _DSDIFFID3(stream, **kwargs)
        except ID3NoHeaderError:
            self.tags = None
        except ID3Error as e:
            raise error(e)
        else:
            self.tags.filename = self.filename

        # Rewind before parsing the stream info from the start of the file.
        stream.seek(0, 0)
        self.info = DSDIFFInfo(stream)

    def add_tags(self):
        """Add empty ID3 tags to the file.

        Raises:
            error: if the file already has tags.
        """
        if self.tags is not None:
            raise error("an ID3 tag already exists")
        self.tags = _DSDIFFID3()

    @staticmethod
    def score(filename, fileobj, header):
        """Rate how likely the file is DSDIFF.

        Two points for a header starting with ``FRM8`` plus the result of
        the ``.dff`` file name check.
        """
        magic_score = 2 if header.startswith(b"FRM8") else 0
        return magic_score + endswith(filename, ".dff")
# ``Open`` is a module-level alias for the DSDIFF file type (also exported
# through ``__all__``).
Open = DSDIFF