# jtftp/test/test_datagram.py
# 2022-07-07 12:37:41 -05:00
#
# 214 lines
# 6.6 KiB
# Python

# JTFTP - Python/AsyncIO TFTP Server
# Copyright (C) 2022 Jeffrey C. Ollie
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import OrderedDict
from hypothesis import given
from hypothesis.strategies import integers
from jtftp.datagram import RQDatagram
from jtftp.datagram import RRQDatagram
from jtftp.datagram import TFTPError
from jtftp.datagram import TFTPMode
from jtftp.datagram import TFTPOpcode
from jtftp.datagram import TFTPOption
from jtftp.datagram import datagram_factory
from jtftp.datagram import decode_options
from jtftp.datagram import split_opcode
from jtftp.errors import InvalidOpcodeError
from jtftp.errors import OptionsDecodeError
from jtftp.errors import PayloadDecodeError
from jtftp.errors import WireProtocolError
from pytest import raises
@given(integers(min_value=1, max_value=6))
def test_tftp_opcode_normal(opcode: int):
    """Each integer from 1 through 6 builds a ``TFTPOpcode`` carrying that value."""
    member = TFTPOpcode(opcode)
    assert member.value == opcode
def test_tftp_opcode_abnormal():
    """Building a ``TFTPOpcode`` from 7 must raise ``ValueError``."""
    rejected_opcode = 7
    with raises(ValueError):
        TFTPOpcode(rejected_opcode)
@given(integers(min_value=0, max_value=8))
def test_tftperror_normal(error_code: int):
    """Each integer from 0 through 8 builds a ``TFTPError`` carrying that value."""
    member = TFTPError(error_code)
    assert member.value == error_code
def test_option_wrong_case():
    """The upper-case bytes name ``BLKSIZE`` resolves to ``TFTPOption.BLOCKSIZE``."""
    resolved = TFTPOption(b"BLKSIZE")
    assert resolved == TFTPOption.BLOCKSIZE
def test_option_wrong_type():
    """A ``str`` name (rather than ``bytes``) still resolves to ``TFTPOption.BLOCKSIZE``."""
    resolved = TFTPOption("blksize")
    assert resolved == TFTPOption.BLOCKSIZE
def test_mode_wrong_case():
    """The upper-case bytes name ``OCTET`` resolves to ``TFTPMode.OCTET``."""
    resolved = TFTPMode(b"OCTET")
    assert resolved == TFTPMode.OCTET
def test_mode_wrong_type():
    """A ``str`` name (rather than ``bytes``) still resolves to ``TFTPMode.OCTET``."""
    resolved = TFTPMode("octet")
    assert resolved == TFTPMode.OCTET
def test_datagram_zero_length():
    """``split_opcode`` rejects an empty datagram with ``WireProtocolError``."""
    empty_datagram = b""
    with raises(WireProtocolError):
        split_opcode(empty_datagram)
def test_datagram_incomplete_opcode():
    """``split_opcode`` rejects a one-byte datagram with ``WireProtocolError``."""
    single_byte = b"\x00"
    with raises(WireProtocolError):
        split_opcode(single_byte)
def test_datagram_invalid_opcode():
    """``split_opcode`` raises ``InvalidOpcodeError`` for the opcode bytes ``00 ff``."""
    bad_opcode_datagram = b"\x00\xff"
    with raises(InvalidOpcodeError):
        split_opcode(bad_opcode_datagram)
def test_datagram_empty_payload():
    """A datagram holding only opcode 1 splits into ``(1, b"")``."""
    result = split_opcode(b"\x00\x01")
    assert result == (1, b"")
def test_datagram_non_empty_payload():
    """Opcode 1 followed by ``foo`` splits into ``(1, b"foo")``."""
    result = split_opcode(b"\x00\x01foo")
    assert result == (1, b"foo")
def test_datagram_unknown_opcode():
    """``datagram_factory`` raises ``WireProtocolError`` for opcode bytes ``00 0f``."""
    wire = b"\x00\x0ffoobar"
    with raises(WireProtocolError):
        datagram_factory(wire)
def test_rq_datagram():
    """A request payload containing no NUL byte is rejected with ``WireProtocolError``."""
    payload = b"foobar"
    with raises(WireProtocolError):
        RQDatagram.from_wire(payload)
def test_rq_datagram_invalid_mode():
    """A request whose second NUL-terminated field is ``bar`` raises ``PayloadDecodeError``.

    Per the test name, that second field is the transfer mode.
    """
    payload = b"foo\x00bar\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagram_valid_mode():
    """A parsed ``foo``/``octet`` request serialises as an RRQ once given that opcode.

    The input payload has no trailing NUL after ``octet``; the expected
    serialised form does.
    """
    request = RQDatagram.from_wire(b"foo\x00octet")
    # The opcode is assigned explicitly before serialising.
    request.opcode = TFTPOpcode.RRQ
    serialised = request.to_wire()
    assert serialised == b"\x00\x01foo\x00octet\x00"
def test_decode_options_missing_value():
    """An option name with no following value raises ``OptionsDecodeError``."""
    name_only = [b"blksize"]
    with raises(OptionsDecodeError):
        decode_options(name_only)
def test_decode_options_duplicate_value():
    """Listing ``blksize`` twice raises ``OptionsDecodeError``."""
    repeated_option = [b"blksize", b"1024", b"blksize", b"2048"]
    with raises(OptionsDecodeError):
        decode_options(repeated_option)
def test_rq_datagam_invalid_option_name():
    """A request carrying the option name ``spam`` raises ``PayloadDecodeError``."""
    payload = b"foo\x00octet\x00spam\x00baz\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagam_blksize_invalid_option_value_1():
    """A ``blksize`` option with the value ``baz`` raises ``PayloadDecodeError``."""
    payload = b"foo\x00octet\x00blksize\x00baz\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagam_blksize_invalid_option_value_2():
    """A ``blksize`` option with the value ``1`` raises ``OptionsDecodeError``."""
    # NOTE(review): presumably rejected as below the allowed block-size range — confirm.
    payload = b"foo\x00octet\x00blksize\x001\x00"
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagam_blksize_invalid_option_value_3():
    """A ``blksize`` option with the value ``72384`` raises ``OptionsDecodeError``."""
    # NOTE(review): presumably rejected as above the allowed block-size range — confirm.
    payload = b"foo\x00octet\x00blksize\x0072384\x00"
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagram_blksize_valid_option():
    """A request with ``blksize`` 1024 parses into filename, mode and an integer option."""
    request = RQDatagram.from_wire(b"foo\x00octet\x00blksize\x001024\x00")
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    # The option value arrives as ASCII digits and is expected back as an int.
    expected_options = OrderedDict({TFTPOption.BLOCKSIZE: 1024})
    assert request.options == expected_options
def test_rq_datagam_timeout_invalid_option_value_1():
    """A ``timeout`` option with the value ``0`` raises ``PayloadDecodeError``."""
    payload = b"foo\x00octet\x00timeout\x000\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagam_timeout_invalid_option_value_2():
    """A ``timeout`` option with the value ``-1`` raises ``OptionsDecodeError``."""
    payload = b"foo\x00octet\x00timeout\x00-1\x00"
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagram_timeout_valid_option():
    """A request with ``timeout`` 5 parses into filename, mode and an integer option."""
    request = RQDatagram.from_wire(b"foo\x00octet\x00timeout\x005\x00")
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.TIMEOUT: 5})
    assert request.options == expected_options
def test_rq_datagam_tsize_invalid_option_value_1():
    """A ``tsize`` option with the value ``-1`` raises ``PayloadDecodeError``."""
    payload = b"foo\x00octet\x00tsize\x00-1\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
def test_rq_datagram_timeout_valid_option_1():
    """A request with ``tsize`` 0 parses into filename, mode and an integer option.

    Despite the ``timeout`` in this test's name, the payload and the
    assertion exercise the ``tsize`` option.
    """
    request = RQDatagram.from_wire(b"foo\x00octet\x00tsize\x000\x00")
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.TRANSFER_SIZE: 0})
    assert request.options == expected_options
def test_rq_datagram_timeout_valid_option_2():
    """A request with ``tsize`` 512 parses into filename, mode and an integer option.

    Despite the ``timeout`` in this test's name, the payload and the
    assertion exercise the ``tsize`` option.
    """
    request = RQDatagram.from_wire(b"foo\x00octet\x00tsize\x00512\x00")
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    assert request.options == expected_options
def test_rrq_datagram_to_wire_1():
    """An ``RRQDatagram`` built with a ``tsize`` option exposes its fields and serialises.

    Checks the opcode, filename, mode and options attributes, then the
    exact bytes produced by ``to_wire()``.
    """
    requested_options = OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    request = RRQDatagram(b"foo", TFTPMode.OCTET, requested_options)
    assert request.opcode == TFTPOpcode.RRQ
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    # Compare against a freshly built mapping rather than the one passed in.
    assert request.options == OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    serialised = request.to_wire()
    assert serialised == b"\x00\x01foo\x00octet\x00tsize\x00512\x00"
def test_rrq_datagram_to_wire_2():
    """Building an ``RRQDatagram`` whose options are keyed by raw bytes must fail.

    The options mapping uses ``b"foo"`` as its key instead of a
    ``TFTPOption`` member, and construction is expected to raise
    ``AssertionError``.
    """
    # NOTE(review): presumably the constructor validates option keys with an
    # ``assert`` statement; if so this test cannot pass under ``python -O``,
    # which strips assertions — confirm against the constructor.
    with raises(AssertionError):
        # The instance is never used, so the result is deliberately not bound.
        RRQDatagram(b"foo", TFTPMode.OCTET, OrderedDict({b"foo": 512}))
def test_rrq_datagram_to_wire_3():
    """Options keyed by raw bytes, assigned after construction, fail only at ``to_wire()``.

    The datagram is built with empty options, then ``options`` is replaced
    by a mapping keyed by ``b"baz"``. The attributes read back as assigned,
    and serialising raises ``WireProtocolError``.
    """
    request = RRQDatagram(b"foo", TFTPMode.OCTET, OrderedDict({}))
    # Replace the options after construction with a non-``TFTPOption`` key.
    request.options = OrderedDict({b"baz": 512})
    assert request.opcode == TFTPOpcode.RRQ
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    assert request.options == OrderedDict({b"baz": 512})
    with raises(WireProtocolError):
        request.to_wire()