# JTFTP - Python/AsyncIO TFTP Server
# Copyright (C) 2022 Jeffrey C. Ollie
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Tests for the enums, decoders and request datagrams in ``jtftp.datagram``."""

# ``typing.OrderedDict`` is a deprecated alias intended for annotations; the
# tests build real instances, so the concrete class is imported instead.
from collections import OrderedDict

from hypothesis import given
from hypothesis.strategies import integers
from jtftp.datagram import RQDatagram
from jtftp.datagram import RRQDatagram
from jtftp.datagram import TFTPError
from jtftp.datagram import TFTPMode
from jtftp.datagram import TFTPOpcode
from jtftp.datagram import TFTPOption
from jtftp.datagram import datagram_factory
from jtftp.datagram import decode_options
from jtftp.datagram import split_opcode
from jtftp.errors import InvalidOpcodeError
from jtftp.errors import OptionsDecodeError
from jtftp.errors import PayloadDecodeError
from jtftp.errors import WireProtocolError
from pytest import raises


# --- Enum lookups -----------------------------------------------------------


@given(integers(min_value=1, max_value=6))
def test_tftp_opcode_normal(opcode: int) -> None:
    """Every integer in 1..6 maps to a ``TFTPOpcode`` with the same value."""
    assert TFTPOpcode(opcode).value == opcode


def test_tftp_opcode_abnormal() -> None:
    """Looking up opcode 7 raises ``ValueError``."""
    with raises(ValueError):
        TFTPOpcode(7)


@given(integers(min_value=0, max_value=8))
def test_tftperror_normal(error_code: int) -> None:
    """Every integer in 0..8 maps to a ``TFTPError`` with the same value."""
    assert TFTPError(error_code).value == error_code


def test_option_wrong_case() -> None:
    """An upper-case bytes name still resolves to ``TFTPOption.BLOCKSIZE``."""
    assert TFTPOption(b"BLKSIZE") == TFTPOption.BLOCKSIZE


def test_option_wrong_type() -> None:
    """A ``str`` name (rather than ``bytes``) still resolves to the option."""
    assert TFTPOption("blksize") == TFTPOption.BLOCKSIZE


def test_mode_wrong_case() -> None:
    """An upper-case bytes name still resolves to ``TFTPMode.OCTET``."""
    assert TFTPMode(b"OCTET") == TFTPMode.OCTET


def test_mode_wrong_type() -> None:
    """A ``str`` name (rather than ``bytes``) still resolves to the mode."""
    assert TFTPMode("octet") == TFTPMode.OCTET


# --- split_opcode / datagram_factory ----------------------------------------


def test_datagram_zero_length() -> None:
    """An empty datagram raises ``WireProtocolError``."""
    with raises(WireProtocolError):
        split_opcode(b"")


def test_datagram_incomplete_opcode() -> None:
    """A one-byte datagram raises ``WireProtocolError``."""
    with raises(WireProtocolError):
        split_opcode(b"\x00")


def test_datagram_invalid_opcode() -> None:
    """Opcode bytes ``00 ff`` raise ``InvalidOpcodeError``."""
    with raises(InvalidOpcodeError):
        split_opcode(b"\x00\xff")


def test_datagram_empty_payload() -> None:
    """A bare opcode splits into the opcode and an empty payload."""
    assert split_opcode(b"\x00\x01") == (1, b"")


def test_datagram_non_empty_payload() -> None:
    """Bytes following the opcode are returned unchanged as the payload."""
    assert split_opcode(b"\x00\x01foo") == (1, b"foo")


def test_datagram_unknown_opcode() -> None:
    """``datagram_factory`` raises ``WireProtocolError`` for opcode 0x0f."""
    with raises(WireProtocolError):
        datagram_factory(b"\x00\x0ffoobar")


# --- RQDatagram.from_wire: filename and mode --------------------------------


def test_rq_datagram() -> None:
    """A payload with no NUL separators raises ``WireProtocolError``."""
    with raises(WireProtocolError):
        RQDatagram.from_wire(b"foobar")


def test_rq_datagram_invalid_mode() -> None:
    """The mode string ``bar`` raises ``PayloadDecodeError``."""
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(b"foo\x00bar\x00")


def test_rq_datagram_valid_mode() -> None:
    """A filename/mode payload survives a decode/encode round trip."""
    dgram = RQDatagram.from_wire(b"foo\x00octet")
    # The test assigns the opcode itself before serializing the datagram.
    dgram.opcode = TFTPOpcode.RRQ
    assert dgram.to_wire() == b"\x00\x01foo\x00octet\x00"


# --- Option decoding --------------------------------------------------------


def test_decode_options_missing_value() -> None:
    """An option name without a value raises ``OptionsDecodeError``."""
    with raises(OptionsDecodeError):
        decode_options([b"blksize"])


def test_decode_options_duplicate_value() -> None:
    """Giving the same option twice raises ``OptionsDecodeError``."""
    with raises(OptionsDecodeError):
        decode_options([b"blksize", b"1024", b"blksize", b"2048"])


def test_rq_datagam_invalid_option_name() -> None:
    """The option name ``spam`` raises ``PayloadDecodeError``."""
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00spam\x00baz\x00")


def test_rq_datagam_blksize_invalid_option_value_1() -> None:
    """A non-numeric ``blksize`` value raises ``PayloadDecodeError``."""
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00blksize\x00baz\x00")


def test_rq_datagam_blksize_invalid_option_value_2() -> None:
    """A ``blksize`` of 1 raises ``OptionsDecodeError``.

    NOTE(review): presumably rejected as below the RFC 2348 minimum of 8;
    confirm against the decoder.
    """
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00blksize\x001\x00")


def test_rq_datagam_blksize_invalid_option_value_3() -> None:
    """A ``blksize`` of 72384 raises ``OptionsDecodeError``.

    NOTE(review): presumably rejected as above the RFC 2348 maximum of
    65464; confirm against the decoder.
    """
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00blksize\x0072384\x00")


def test_rq_datagram_blksize_valid_option() -> None:
    """A ``blksize`` of 1024 is decoded into the options mapping."""
    dgram = RQDatagram.from_wire(b"foo\x00octet\x00blksize\x001024\x00")
    assert dgram.filename == b"foo"
    assert dgram.mode == TFTPMode.OCTET
    assert dgram.options == OrderedDict({TFTPOption.BLOCKSIZE: 1024})


def test_rq_datagam_timeout_invalid_option_value_1() -> None:
    """A ``timeout`` of 0 raises ``PayloadDecodeError``."""
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00timeout\x000\x00")


def test_rq_datagam_timeout_invalid_option_value_2() -> None:
    """A ``timeout`` of -1 raises ``OptionsDecodeError``."""
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00timeout\x00-1\x00")


def test_rq_datagram_timeout_valid_option() -> None:
    """A ``timeout`` of 5 is decoded into the options mapping."""
    dgram = RQDatagram.from_wire(b"foo\x00octet\x00timeout\x005\x00")
    assert dgram.filename == b"foo"
    assert dgram.mode == TFTPMode.OCTET
    assert dgram.options == OrderedDict({TFTPOption.TIMEOUT: 5})


def test_rq_datagam_tsize_invalid_option_value_1() -> None:
    """A ``tsize`` of -1 raises ``PayloadDecodeError``."""
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(b"foo\x00octet\x00tsize\x00-1\x00")


def test_rq_datagram_timeout_valid_option_1() -> None:
    """A ``tsize`` of 0 is decoded into the options mapping.

    Despite the ``timeout`` in its name, this test exercises ``tsize``;
    the name is kept so existing test IDs stay stable.
    """
    dgram = RQDatagram.from_wire(b"foo\x00octet\x00tsize\x000\x00")
    assert dgram.filename == b"foo"
    assert dgram.mode == TFTPMode.OCTET
    assert dgram.options == OrderedDict({TFTPOption.TRANSFER_SIZE: 0})


def test_rq_datagram_timeout_valid_option_2() -> None:
    """A ``tsize`` of 512 is decoded into the options mapping.

    Despite the ``timeout`` in its name, this test exercises ``tsize``;
    the name is kept so existing test IDs stay stable.
    """
    dgram = RQDatagram.from_wire(b"foo\x00octet\x00tsize\x00512\x00")
    assert dgram.filename == b"foo"
    assert dgram.mode == TFTPMode.OCTET
    assert dgram.options == OrderedDict({TFTPOption.TRANSFER_SIZE: 512})


# --- RRQDatagram construction and serialization -----------------------------


def test_rrq_datagram_to_wire_1() -> None:
    """A directly constructed RRQ exposes its fields and serializes them."""
    dgram = RRQDatagram(
        b"foo", TFTPMode.OCTET, OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    )
    assert dgram.opcode == TFTPOpcode.RRQ
    assert dgram.filename == b"foo"
    assert dgram.mode == TFTPMode.OCTET
    assert dgram.options == OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    assert dgram.to_wire() == b"\x00\x01foo\x00octet\x00tsize\x00512\x00"


def test_rrq_datagram_to_wire_2() -> None:
    """Constructing an RRQ with a raw ``bytes`` option key fails.

    The constructor is expected to raise ``AssertionError``, so no datagram
    object exists afterwards and there is nothing further to inspect.
    """
    with raises(AssertionError):
        RRQDatagram(b"foo", TFTPMode.OCTET, OrderedDict({b"foo": 512}))


def test_rrq_datagram_to_wire_3() -> None:
    """An invalid option key assigned after construction fails in ``to_wire``."""
    dgram = RRQDatagram(b"foo", TFTPMode.OCTET, OrderedDict({}))
    # Replace the options after construction with a raw ``bytes`` key.
    dgram.options = OrderedDict({b"baz": 512})
    assert dgram.opcode == TFTPOpcode.RRQ
    assert dgram.filename == b"foo"
    assert dgram.mode == TFTPMode.OCTET
    assert dgram.options == OrderedDict({b"baz": 512})
    with raises(WireProtocolError):
        dgram.to_wire()