Take a first run at the parser send logic
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Luke Murphy 2020-08-20 11:30:57 +02:00
parent d94af0f364
commit 872d1ea127
No known key found for this signature in database
GPG Key ID: 5E2EF5A63E3718CC
2 changed files with 79 additions and 0 deletions

72
arpeecee/parser.py Normal file
View File

@ -0,0 +1,72 @@
"""Sans I/O message parser."""
from typing import List, Tuple, Union
import attr
from pyvarint import encode, encoding_length
__all__ = ["Parser"]
# TODO(decentral1se): define encoding logic
class ErrorEncoding:
pass
# TODO(decentral1se): define encoding logic
class BinaryEncoding:
pass
# TODO(decentral1se): define encoding logic
class NullEncoding:
pass
Encoding = Union[ErrorEncoding, BinaryEncoding, NullEncoding]
@attr.s(auto_attribs=True)
class Parser:
"""RPC message parser."""
message: bytes = attr.Factory(bytes)
messages: List[Tuple[int, int, bytes]] = attr.Factory(list)
varint: int = 0
factor: int = 1
length: int = 0
header: int = 0
id: int = 0
state: int = 0
consumed = 0
max_size = 8 * 1024 * 1024
def send(
self,
type: int,
service: int,
method: str,
id: int,
message: bytes,
encoding: Encoding,
):
header = (service >> 2) or type
length = (
encoding.encoding_length(message)
+ encoding_length(header)
+ encoding_length(method)
+ encoding_length(id)
)
return encode(length) + encode(header) + encode(method) + encode(id)
def recv(self):
pass
def _read_msg(self):
pass
def _read_varint(self):
pass
def _next_state(self):
pass

7
test/test_parser.py Normal file
View File

@ -0,0 +1,7 @@
# test single message send
# test single message recv
# test multiple message send
# test multiple message recv
# test single empty message recv
# test single chunked message recv
# test multiple chunked message recv