215 lines
6.6 KiB
Python
215 lines
6.6 KiB
Python
|
# JTFTP - Python/AsyncIO TFTP Server
|
||
|
# Copyright (C) 2022 Jeffrey C. Ollie
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
from typing import OrderedDict
|
||
|
|
||
|
from hypothesis import given
|
||
|
from hypothesis.strategies import integers
|
||
|
from jtftp.datagram import RQDatagram
|
||
|
from jtftp.datagram import RRQDatagram
|
||
|
from jtftp.datagram import TFTPError
|
||
|
from jtftp.datagram import TFTPMode
|
||
|
from jtftp.datagram import TFTPOpcode
|
||
|
from jtftp.datagram import TFTPOption
|
||
|
from jtftp.datagram import datagram_factory
|
||
|
from jtftp.datagram import decode_options
|
||
|
from jtftp.datagram import split_opcode
|
||
|
from jtftp.errors import InvalidOpcodeError
|
||
|
from jtftp.errors import OptionsDecodeError
|
||
|
from jtftp.errors import PayloadDecodeError
|
||
|
from jtftp.errors import WireProtocolError
|
||
|
from pytest import raises
|
||
|
|
||
|
|
||
|
@given(integers(min_value=1, max_value=6))
def test_tftp_opcode_normal(opcode: int) -> None:
    """Every integer from 1 through 6 must round-trip through TFTPOpcode."""
    member = TFTPOpcode(opcode)
    assert member.value == opcode
|
||
|
|
||
|
|
||
|
def test_tftp_opcode_abnormal() -> None:
    """Constructing TFTPOpcode from the integer 7 must raise ValueError."""
    unsupported = 7
    with raises(ValueError):
        TFTPOpcode(unsupported)
|
||
|
|
||
|
|
||
|
@given(integers(min_value=0, max_value=8))
def test_tftperror_normal(error_code: int) -> None:
    """Every integer from 0 through 8 must round-trip through TFTPError."""
    member = TFTPError(error_code)
    assert member.value == error_code
|
||
|
|
||
|
|
||
|
def test_option_wrong_case() -> None:
    """An upper-case bytes option name must still resolve to BLOCKSIZE."""
    resolved = TFTPOption(b"BLKSIZE")
    assert resolved == TFTPOption.BLOCKSIZE
|
||
|
|
||
|
|
||
|
def test_option_wrong_type() -> None:
    """A str (rather than bytes) option name must still resolve to BLOCKSIZE."""
    resolved = TFTPOption("blksize")
    assert resolved == TFTPOption.BLOCKSIZE
|
||
|
|
||
|
|
||
|
def test_mode_wrong_case() -> None:
    """An upper-case bytes mode name must still resolve to OCTET."""
    resolved = TFTPMode(b"OCTET")
    assert resolved == TFTPMode.OCTET
|
||
|
|
||
|
|
||
|
def test_mode_wrong_type() -> None:
    """A str (rather than bytes) mode name must still resolve to OCTET."""
    resolved = TFTPMode("octet")
    assert resolved == TFTPMode.OCTET
|
||
|
|
||
|
|
||
|
def test_datagram_zero_length() -> None:
    """split_opcode must raise WireProtocolError for an empty datagram."""
    wire = b""
    with raises(WireProtocolError):
        split_opcode(wire)
|
||
|
|
||
|
|
||
|
def test_datagram_incomplete_opcode() -> None:
    """split_opcode must raise WireProtocolError for a one-byte datagram."""
    wire = b"\x00"
    with raises(WireProtocolError):
        split_opcode(wire)
|
||
|
|
||
|
|
||
|
def test_datagram_invalid_opcode() -> None:
    """split_opcode must raise InvalidOpcodeError for the opcode bytes 00 ff."""
    wire = b"\x00\xff"
    with raises(InvalidOpcodeError):
        split_opcode(wire)
|
||
|
|
||
|
|
||
|
def test_datagram_empty_payload() -> None:
    """A bare two-byte opcode must split into opcode 1 and an empty payload."""
    expected = (1, b"")
    actual = split_opcode(b"\x00\x01")
    assert actual == expected
|
||
|
|
||
|
|
||
|
def test_datagram_non_empty_payload() -> None:
    """Bytes following the opcode must be returned unchanged as the payload."""
    expected = (1, b"foo")
    actual = split_opcode(b"\x00\x01foo")
    assert actual == expected
|
||
|
|
||
|
|
||
|
def test_datagram_unknown_opcode() -> None:
    """datagram_factory must raise WireProtocolError for opcode bytes 00 0f."""
    wire = b"\x00\x0ffoobar"
    with raises(WireProtocolError):
        datagram_factory(wire)
|
||
|
|
||
|
|
||
|
def test_rq_datagram() -> None:
    """RQDatagram.from_wire must reject a payload with no NUL separators."""
    payload = b"foobar"
    with raises(WireProtocolError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagram_invalid_mode() -> None:
    """RQDatagram.from_wire must raise PayloadDecodeError for the mode "bar"."""
    payload = b"foo\x00bar\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagram_valid_mode() -> None:
    """A request parsed without a trailing NUL must serialise with one.

    The opcode is assigned after parsing, before the datagram is written
    back to wire form.
    """
    request = RQDatagram.from_wire(b"foo\x00octet")
    request.opcode = TFTPOpcode.RRQ
    encoded = request.to_wire()
    assert encoded == b"\x00\x01foo\x00octet\x00"
|
||
|
|
||
|
|
||
|
def test_decode_options_missing_value() -> None:
    """decode_options must raise OptionsDecodeError for a name with no value."""
    fields = [b"blksize"]
    with raises(OptionsDecodeError):
        decode_options(fields)
|
||
|
|
||
|
|
||
|
def test_decode_options_duplicate_value() -> None:
    """decode_options must raise OptionsDecodeError when blksize appears twice."""
    fields = [b"blksize", b"1024", b"blksize", b"2048"]
    with raises(OptionsDecodeError):
        decode_options(fields)
|
||
|
|
||
|
|
||
|
def test_rq_datagam_invalid_option_name() -> None:
    """from_wire must raise PayloadDecodeError for the option name "spam"."""
    payload = b"foo\x00octet\x00spam\x00baz\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagam_blksize_invalid_option_value_1() -> None:
    """from_wire must raise PayloadDecodeError for the blksize value "baz"."""
    payload = b"foo\x00octet\x00blksize\x00baz\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagam_blksize_invalid_option_value_2() -> None:
    """from_wire must raise OptionsDecodeError for the blksize value 1."""
    payload = b"foo\x00octet\x00blksize\x001\x00"
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagam_blksize_invalid_option_value_3() -> None:
    """from_wire must raise OptionsDecodeError for the blksize value 72384."""
    payload = b"foo\x00octet\x00blksize\x0072384\x00"
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagram_blksize_valid_option() -> None:
    """A blksize of 1024 must parse into filename, mode and an int option."""
    payload = b"foo\x00octet\x00blksize\x001024\x00"
    request = RQDatagram.from_wire(payload)

    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.BLOCKSIZE: 1024})
    assert request.options == expected_options
|
||
|
|
||
|
|
||
|
def test_rq_datagam_timeout_invalid_option_value_1() -> None:
    """from_wire must raise PayloadDecodeError for the timeout value 0."""
    payload = b"foo\x00octet\x00timeout\x000\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagam_timeout_invalid_option_value_2() -> None:
    """from_wire must raise OptionsDecodeError for the timeout value -1."""
    payload = b"foo\x00octet\x00timeout\x00-1\x00"
    with raises(OptionsDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagram_timeout_valid_option() -> None:
    """A timeout of 5 must parse into filename, mode and an int option."""
    payload = b"foo\x00octet\x00timeout\x005\x00"
    request = RQDatagram.from_wire(payload)

    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.TIMEOUT: 5})
    assert request.options == expected_options
|
||
|
|
||
|
|
||
|
def test_rq_datagam_tsize_invalid_option_value_1() -> None:
    """from_wire must raise PayloadDecodeError for the tsize value -1."""
    payload = b"foo\x00octet\x00tsize\x00-1\x00"
    with raises(PayloadDecodeError):
        RQDatagram.from_wire(payload)
|
||
|
|
||
|
|
||
|
def test_rq_datagram_timeout_valid_option_1() -> None:
    """A tsize of 0 must parse into filename, mode and an int option.

    NOTE: despite "timeout" in the function name, the payload carries the
    tsize option and the assertion checks TFTPOption.TRANSFER_SIZE.
    """
    payload = b"foo\x00octet\x00tsize\x000\x00"
    request = RQDatagram.from_wire(payload)

    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.TRANSFER_SIZE: 0})
    assert request.options == expected_options
|
||
|
|
||
|
|
||
|
def test_rq_datagram_timeout_valid_option_2() -> None:
    """A tsize of 512 must parse into filename, mode and an int option.

    NOTE: despite "timeout" in the function name, the payload carries the
    tsize option and the assertion checks TFTPOption.TRANSFER_SIZE.
    """
    payload = b"foo\x00octet\x00tsize\x00512\x00"
    request = RQDatagram.from_wire(payload)

    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    assert request.options == expected_options
|
||
|
|
||
|
|
||
|
def test_rrq_datagram_to_wire_1() -> None:
    """An RRQDatagram built with a tsize option must expose and encode it.

    Checks the opcode, filename, mode and options attributes after
    construction, then the exact bytes produced by to_wire().
    """
    requested_options = OrderedDict({TFTPOption.TRANSFER_SIZE: 512})
    request = RRQDatagram(b"foo", TFTPMode.OCTET, requested_options)

    assert request.opcode == TFTPOpcode.RRQ
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    # Compare against a freshly built mapping, not the object passed in.
    assert request.options == OrderedDict({TFTPOption.TRANSFER_SIZE: 512})

    encoded = request.to_wire()
    assert encoded == b"\x00\x01foo\x00octet\x00tsize\x00512\x00"
|
||
|
|
||
|
|
||
|
def test_rrq_datagram_to_wire_2() -> None:
    """RRQDatagram construction must fail for a raw-bytes option key.

    The options mapping is keyed by the bytes b"foo" rather than a
    TFTPOption member, and the constructor is expected to raise
    AssertionError. No datagram is produced, so nothing is bound or
    inspected afterwards.
    """
    with raises(AssertionError):
        RRQDatagram(b"foo", TFTPMode.OCTET, OrderedDict({b"foo": 512}))
|
||
|
|
||
|
|
||
|
def test_rrq_datagram_to_wire_3() -> None:
    """to_wire must reject a raw-bytes option key assigned after construction.

    The datagram is built with empty options, then its options attribute
    is replaced with a mapping keyed by b"baz". The attributes must read
    back as assigned, and to_wire() must raise WireProtocolError.
    """
    request = RRQDatagram(b"foo", TFTPMode.OCTET, OrderedDict({}))
    request.options = OrderedDict({b"baz": 512})

    assert request.opcode == TFTPOpcode.RRQ
    assert request.filename == b"foo"
    assert request.mode == TFTPMode.OCTET
    expected_options = OrderedDict({b"baz": 512})
    assert request.options == expected_options

    with raises(WireProtocolError):
        request.to_wire()
|