mirror of
https://github.com/rembo10/headphones.git
synced 2026-01-15 01:37:59 -05:00
268 lines
7.8 KiB
Python
268 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (C) 2020 Philipp Wolfer
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
"""DSDIFF audio stream information and tags."""
|
|
|
|
import struct
|
|
|
|
from mutagen import StreamInfo
|
|
from mutagen._file import FileType
|
|
from mutagen._iff import (
|
|
IffChunk,
|
|
IffContainerChunkMixin,
|
|
IffID3,
|
|
IffFile,
|
|
InvalidChunk,
|
|
error as IffError,
|
|
)
|
|
from mutagen.id3._util import ID3NoHeaderError, error as ID3Error
|
|
from mutagen._util import (
|
|
convert_error,
|
|
loadfile,
|
|
endswith,
|
|
)
|
|
|
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = ["DSDIFF", "Open", "delete"]
|
|
|
|
|
|
class error(IffError):
    """Error raised by this module; a subclass of the generic IFF error."""
|
|
|
|
|
|
# See
|
|
# https://dsd-guide.com/sites/default/files/white-papers/DSDIFF_1.5_Spec.pdf
|
|
class DSDIFFChunk(IffChunk):
    """A single chunk of a DSDIFF file.

    The header is packed as ``'>4sQ'``: a 4 byte chunk ID followed by a
    big-endian unsigned 64 bit size, 12 bytes in total.
    """

    HEADER_SIZE = 12

    @classmethod
    def parse_header(cls, header):
        """Unpack the raw ``header`` bytes into an ``(id, size)`` tuple."""
        chunk_id, size = struct.unpack('>4sQ', header)
        return chunk_id, size

    @classmethod
    def get_class(cls, id):
        """Return the class used to represent a chunk with the given ID.

        IDs listed in ``DSDIFFListChunk.LIST_CHUNK_IDS`` map to
        `DSDIFFListChunk`, ``'DST'`` maps to `DSTChunk` and everything
        else is represented by ``cls`` itself.
        """
        if id in DSDIFFListChunk.LIST_CHUNK_IDS:
            return DSDIFFListChunk
        if id == 'DST':
            return DSTChunk
        return cls

    def write_new_header(self, id_, size):
        """Write a complete chunk header (ID and size) to the file object."""
        packed_header = struct.pack('>4sQ', id_, size)
        self._fileobj.write(packed_header)

    def write_size(self):
        """Write this chunk's ``data_size`` as a big-endian 64 bit value."""
        packed_size = struct.pack('>Q', self.data_size)
        self._fileobj.write(packed_size)
|
|
|
|
|
|
class DSDIFFListChunk(DSDIFFChunk, IffContainerChunkMixin):
    """A DSDIFF container chunk, i.e. one that holds further chunks.

    Only the IDs in ``LIST_CHUNK_IDS`` are accepted.
    """

    LIST_CHUNK_IDS = ['FRM8', 'PROP']

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Initialise the chunk and set it up as a container.

        Raises:
            InvalidChunk: if ``id`` is not one of ``LIST_CHUNK_IDS``.
        """
        is_list_chunk = id in self.LIST_CHUNK_IDS
        if not is_list_chunk:
            raise InvalidChunk('Not a list chunk: %s' % id)

        DSDIFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)
        self.init_container()

    def parse_next_subchunk(self):
        """Parse the next sub-chunk via `DSDIFFChunk.parse`.

        The file object and this chunk are handed to the parser.
        """
        subchunk = DSDIFFChunk.parse(self._fileobj, self)
        return subchunk
|
|
|
|
|
|
class DSTChunk(DSDIFFChunk, IffContainerChunkMixin):
    """The ``DST`` chunk, a container holding further chunks.

    Unlike `DSDIFFListChunk` the container is initialised with
    ``name_size=0``.
    """

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Initialise the chunk and set it up as a container.

        Raises:
            InvalidChunk: if ``id`` is not ``'DST'``.
        """
        if not id == 'DST':
            raise InvalidChunk('Not a DST chunk: %s' % id)

        DSDIFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)
        # NOTE(review): name_size=0 presumably means the sub-chunks start
        # right after the header, without a name field - confirm against
        # IffContainerChunkMixin.init_container.
        self.init_container(name_size=0)

    def parse_next_subchunk(self):
        """Parse the next sub-chunk via `DSDIFFChunk.parse`.

        The file object and this chunk are handed to the parser.
        """
        subchunk = DSDIFFChunk.parse(self._fileobj, self)
        return subchunk
|
|
|
|
|
|
class DSDIFFFile(IffFile):
    """An IFF file whose chunks are DSDIFF chunks and whose root is FRM8."""

    def __init__(self, fileobj):
        """Load ``fileobj`` using `DSDIFFChunk` as the chunk class.

        Raises:
            InvalidChunk: if the ID of the root chunk is not ``FRM8``.
        """
        super().__init__(DSDIFFChunk, fileobj)

        root_is_frm8 = self.root.id == u'FRM8'
        if not root_is_frm8:
            message = "Root chunk must be a FRM8 chunk, got %r" % self.root
            raise InvalidChunk(message)
|
|
|
|
|
|
class DSDIFFInfo(StreamInfo):
    """DSDIFF stream information.

    Attributes:
        channels (`int`): number of audio channels
        length (`float`): file length in seconds, as a float
        sample_rate (`int`): audio sampling rate in Hz
        bits_per_sample (`int`): audio sample size (for DSD this is always 1)
        bitrate (`int`): audio bitrate, in bits per second
        compression (`str`): DSD (uncompressed) or DST
    """

    channels = 0
    length = 0
    sample_rate = 0
    bits_per_sample = 1
    bitrate = 0
    compression = None

    @convert_error(IOError, error)
    def __init__(self, fileobj):
        """Read the stream properties from ``fileobj``.

        The ``PROP`` chunk supplies sample rate (``FS``), channel count
        (``CHNL``) and compression type (``CMPR``). Length and bitrate
        are then derived from the ``DSD`` or the ``DST`` chunk, depending
        on the compression type. For any other compression type length
        and bitrate keep their defaults.

        Raises:
            error: if a required chunk (``PROP``, ``DSD``, ``DST`` or
                ``FRTE``) is missing.
            InvalidChunk: if a chunk holds less data than its declared
                size promises.
        """
        iff = DSDIFFFile(fileobj)
        try:
            prop_chunk = iff['PROP']
        except KeyError as e:
            raise error(str(e))

        if prop_chunk.name == 'SND ':
            for chunk in prop_chunk.subchunks():
                if chunk.id == 'FS' and chunk.data_size == 4:
                    data = chunk.read()
                    if len(data) < 4:
                        raise InvalidChunk("Not enough data in FS chunk")
                    self.sample_rate, = struct.unpack('>L', data[:4])
                elif chunk.id == 'CHNL' and chunk.data_size >= 2:
                    data = chunk.read()
                    if len(data) < 2:
                        raise InvalidChunk("Not enough data in CHNL chunk")
                    self.channels, = struct.unpack('>H', data[:2])
                elif chunk.id == 'CMPR' and chunk.data_size >= 4:
                    data = chunk.read()
                    if len(data) < 4:
                        raise InvalidChunk("Not enough data in CMPR chunk")
                    compression_id, = struct.unpack('>4s', data[:4])
                    self.compression = compression_id.decode('ascii').rstrip()

        # '>L' unpacks an unsigned value, so this is purely defensive.
        if self.sample_rate < 0:
            raise error("Invalid sample rate")

        if self.compression == 'DSD':  # not compressed
            self._read_dsd_stream(iff)
        elif self.compression == 'DST':
            self._read_dst_stream(iff)

    def _read_dsd_stream(self, iff):
        """Derive length and bitrate of an uncompressed stream."""
        try:
            dsd_chunk = iff['DSD']
        except KeyError as e:
            raise error(str(e))

        # DSD data has one bit per sample. Eight samples of a channel
        # are clustered together for a channel byte. For multiple channels
        # the channel bytes are interleaved (in the order specified in the
        # CHNL chunk). See DSDIFF spec chapter 3.3.
        sample_count = dsd_chunk.data_size * 8 / (self.channels or 1)

        if self.sample_rate != 0:
            self.length = sample_count / float(self.sample_rate)

        self.bitrate = (self.channels * self.bits_per_sample
                        * self.sample_rate)

    def _read_dst_stream(self, iff):
        """Derive length and bitrate of a DST stream from its FRTE chunk."""
        try:
            dst_frame = iff['DST']
            dst_frame_info = dst_frame['FRTE']
        except KeyError as e:
            raise error(str(e))

        if dst_frame_info.data_size < 6:
            return

        data = dst_frame_info.read()
        if len(data) < 6:
            raise InvalidChunk("Not enough data in FRTE chunk")
        frame_count, frame_rate = struct.unpack('>LH', data[:6])

        if frame_rate:
            self.length = frame_count / frame_rate

        if frame_count:
            dst_data_size = dst_frame.data_size - dst_frame_info.size
            avg_frame_size = dst_data_size / frame_count
            # True division yields a float; bitrate is documented as an
            # int and is an int for plain DSD, so truncate it here too.
            self.bitrate = int(avg_frame_size * 8 * frame_rate)

    def pprint(self):
        """Return a one line, human readable summary of the stream."""
        return u"%d channel DSDIFF (%s) @ %d bps, %s Hz, %.2f seconds" % (
            self.channels, self.compression, self.bitrate, self.sample_rate,
            self.length)
|
|
|
|
|
|
class _DSDIFFID3(IffID3):
    """ID3v2 tags stored inside a DSDIFF file."""

    def _load_file(self, fileobj):
        """Wrap ``fileobj`` in a `DSDIFFFile` and return it."""
        dsdiff_file = DSDIFFFile(fileobj)
        return dsdiff_file
|
|
|
|
|
|
@convert_error(IOError, error)
@loadfile(method=False, writable=True)
def delete(filething):
    """Remove the ID3 chunk from the DSDIFF file altogether.

    A file without an ID3 chunk is left as it is.
    """
    try:
        # The file is parsed inside the ``try`` on purpose, so a KeyError
        # from either step is ignored.
        dsdiff_file = DSDIFFFile(filething.fileobj)
        del dsdiff_file[u'ID3']
    except KeyError:
        pass
|
|
|
|
|
|
class DSDIFF(FileType):
    """DSDIFF(filething)

    A DSDIFF audio file.

    For tagging ID3v2 data is added to a chunk with the ID "ID3 ".

    Arguments:
        filething (filething)

    Attributes:
        tags (`mutagen.id3.ID3`)
        info (`DSDIFFInfo`)
    """

    _mimes = ["audio/x-dff"]

    @convert_error(IOError, error)
    @loadfile()
    def load(self, filething, **kwargs):
        """Load tags and stream information from ``filething``.

        ``tags`` is set to `None` when the file carries no ID3 header.

        Raises:
            error: if reading the ID3 tags fails with an ID3 error.
        """
        stream = filething.fileobj

        try:
            self.tags = _DSDIFFID3(stream, **kwargs)
        except ID3NoHeaderError:
            self.tags = None
        except ID3Error as e:
            raise error(e)
        else:
            self.tags.filename = self.filename

        # Rewind to the start of the file before the stream info is parsed.
        stream.seek(0, 0)
        self.info = DSDIFFInfo(stream)

    def add_tags(self):
        """Add empty ID3 tags to the file.

        Raises:
            error: if the file already has tags.
        """
        if self.tags is not None:
            raise error("an ID3 tag already exists")
        self.tags = _DSDIFFID3()

    @staticmethod
    def score(filename, fileobj, header):
        """Rate how likely the file is a DSDIFF file.

        A header starting with ``FRM8`` counts two points, and the result
        of ``endswith(filename, ".dff")`` is added to that.
        """
        magic_points = 2 if header.startswith(b"FRM8") else 0
        return magic_points + endswith(filename, ".dff")
|
|
|
|
|
|
# ``Open`` is an alias of the DSDIFF class and is exported via ``__all__``.
Open = DSDIFF
|