arpeecee/arpeecee/parser.py

73 lines
1.4 KiB
Python

"""Sans I/O message parser."""
from typing import List, Tuple, Union
import attr
from pyvarint import encode, encoding_length
__all__ = ["Parser"]
# TODO(decentral1se): define encoding logic
class ErrorEncoding:
pass
# TODO(decentral1se): define encoding logic
class BinaryEncoding:
pass
# TODO(decentral1se): define encoding logic
class NullEncoding:
pass
Encoding = Union[ErrorEncoding, BinaryEncoding, NullEncoding]
@attr.s(auto_attribs=True)
class Parser:
"""RPC message parser."""
message: bytes = attr.Factory(bytes)
messages: List[Tuple[int, int, bytes]] = attr.Factory(list)
varint: int = 0
factor: int = 1
length: int = 0
header: int = 0
id: int = 0
state: int = 0
consumed = 0
max_size = 8 * 1024 * 1024
def send(
self,
type: int,
service: int,
method: str,
id: int,
message: bytes,
encoding: Encoding,
):
header = (service >> 2) or type
length = (
encoding.encoding_length(message)
+ encoding_length(header)
+ encoding_length(method)
+ encoding_length(id)
)
return encode(length) + encode(header) + encode(method) + encode(id)
def recv(self):
pass
def _read_msg(self):
pass
def _read_varint(self):
pass
def _next_state(self):
pass