mirror of
https://github.com/blyssprivacy/sdk.git
synced 2026-01-13 09:07:58 -05:00
70 lines
1.5 KiB
Python
70 lines
1.5 KiB
Python
"""Varint encoder/decoder
|
|
|
|
varints are a common encoding for variable length integer data, used in
|
|
libraries such as sqlite, protobuf, v8, and more.
|
|
"""
|
|
|
|
from io import BytesIO
|
|
from typing import BinaryIO
|
|
|
|
|
|
def _byte(b: int) -> bytes:
|
|
return bytes((b,))
|
|
|
|
|
|
def encode(number: int) -> bytes:
|
|
"""Encode the given number as a varint.
|
|
|
|
Args:
|
|
number (int): An integer to encode.
|
|
|
|
Returns:
|
|
bytes: An encoding of the supplied integer as a varint.
|
|
"""
|
|
buf = b""
|
|
while True:
|
|
towrite = number & 0x7F
|
|
number >>= 7
|
|
if number:
|
|
buf += _byte(towrite | 0x80)
|
|
else:
|
|
buf += _byte(towrite)
|
|
break
|
|
return buf
|
|
|
|
|
|
def decode_stream(stream: BinaryIO) -> int:
|
|
"""Decode a varint from a stream.
|
|
|
|
Args:
|
|
stream (BinaryIO): A read()-able object to read a varint from.
|
|
|
|
Returns:
|
|
int: The value read from the stream.
|
|
"""
|
|
shift = 0
|
|
result = 0
|
|
while True:
|
|
i = _read_one(stream)
|
|
result |= (i & 0x7F) << shift
|
|
shift += 7
|
|
if not (i & 0x80):
|
|
break
|
|
|
|
return result
|
|
|
|
|
|
def decode_bytes(buf: bytes):
|
|
"""Read a varint from from `buf` bytes"""
|
|
return decode_stream(BytesIO(buf))
|
|
|
|
|
|
def _read_one(stream: BinaryIO):
|
|
"""Read a byte from the file (as an integer)
|
|
raises EOFError if the stream ends while reading bytes.
|
|
"""
|
|
c = stream.read(1)
|
|
if c == b"":
|
|
raise EOFError("Unexpected EOF while reading bytes")
|
|
return ord(c)
|