Files
sdk/python/blyss/varint.py
2023-02-07 15:21:24 -08:00

70 lines
1.5 KiB
Python

"""Varint encoder/decoder
varints are a common encoding for variable length integer data, used in
libraries such as sqlite, protobuf, v8, and more.
"""
from io import BytesIO
from typing import BinaryIO
def _byte(b: int) -> bytes:
return bytes((b,))
def encode(number: int) -> bytes:
"""Encode the given number as a varint.
Args:
number (int): An integer to encode.
Returns:
bytes: An encoding of the supplied integer as a varint.
"""
buf = b""
while True:
towrite = number & 0x7F
number >>= 7
if number:
buf += _byte(towrite | 0x80)
else:
buf += _byte(towrite)
break
return buf
def decode_stream(stream: BinaryIO) -> int:
"""Decode a varint from a stream.
Args:
stream (BinaryIO): A read()-able object to read a varint from.
Returns:
int: The value read from the stream.
"""
shift = 0
result = 0
while True:
i = _read_one(stream)
result |= (i & 0x7F) << shift
shift += 7
if not (i & 0x80):
break
return result
def decode_bytes(buf: bytes):
"""Read a varint from from `buf` bytes"""
return decode_stream(BytesIO(buf))
def _read_one(stream: BinaryIO):
"""Read a byte from the file (as an integer)
raises EOFError if the stream ends while reading bytes.
"""
c = stream.read(1)
if c == b"":
raise EOFError("Unexpected EOF while reading bytes")
return ord(c)