launchpad-reviewers team mailing list archive: Message #09644
[Merge] lp:~allenap/maas/import-python-tx-tftp into lp:maas
Gavin Panella has proposed merging lp:~allenap/maas/import-python-tx-tftp into lp:maas.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~allenap/maas/import-python-tx-tftp/+merge/113640
Imports git://github.com/allenap/python-tx-tftp.git at commit 9d70905eb6f3e128d3d89e0358e4a0811bf92673. This will eventually be dropped and replaced with a packaged version of git://github.com/shylent/python-tx-tftp.git.
--
https://code.launchpad.net/~allenap/maas/import-python-tx-tftp/+merge/113640
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~allenap/maas/import-python-tx-tftp into lp:maas.
=== added directory 'contrib/python-tx-tftp'
=== added file 'contrib/python-tx-tftp/LICENSE'
--- contrib/python-tx-tftp/LICENSE 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/LICENSE 2012-07-05 20:26:20 +0000
@@ -0,0 +1,20 @@
+Copyright (c) 2010-2012 Mark Lvov
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
=== added file 'contrib/python-tx-tftp/README.markdown'
--- contrib/python-tx-tftp/README.markdown 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/README.markdown 2012-07-05 20:26:20 +0000
@@ -0,0 +1,25 @@
+python-tx-tftp
+==
+A Twisted-based TFTP implementation
+
+## What's already there
+
+ - [RFC1350](http://tools.ietf.org/html/rfc1350) (base TFTP specification) support.
+ - Asynchronous backend support. It is not assumed that filesystem access is
+ 'fast enough'. While current backends use synchronous reads/writes, the code does
+ not rely on this anywhere, so plugging in an asynchronous backend should not be
+ a problem.
+ - netascii transfer mode.
+ - [RFC2347](http://tools.ietf.org/html/rfc2347) (TFTP Option
+Extension) support. *blksize*
+([RFC2348](http://tools.ietf.org/html/rfc2348)), *timeout* and *tsize*
+([RFC2349](http://tools.ietf.org/html/rfc2349)) options are supported.
+ - An actual TFTP server.
+ - Plugin for twistd.
+ - Tests
+ - Docstrings
+
+## Plans
+ - Client-specific commandline interface.
+ - Code cleanup.
+ - Multicast support (possibly).
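
As a companion to the feature list above: the RFC2347 option extension rides on the ordinary request datagrams, so a read request carrying options is laid out as sketched below. This only illustrates the wire format handled by tftp/datagram.py later in this diff; the filename and option values are invented, and the raw datagram is a Python 2 str, as in this code base.

    # 2-byte opcode, then NUL-terminated filename, mode and
    # option name/value pairs (RFC1350 + RFC2347).
    RRQ_WITH_OPTIONS = (
        '\x00\x01'                      # opcode 1 = RRQ
        'pxelinux.0' '\x00'             # filename (arbitrary example)
        'octet' '\x00'                  # transfer mode
        'blksize' '\x00' '1400' '\x00'  # RFC2348 block size option
        'tsize' '\x00' '0' '\x00'       # RFC2349 transfer size query
    )
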
=== added directory 'contrib/python-tx-tftp/examples'
=== added file 'contrib/python-tx-tftp/examples/server.py'
--- contrib/python-tx-tftp/examples/server.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/examples/server.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,19 @@
+'''
+@author: shylent
+'''
+from tftp.backend import FilesystemSynchronousBackend
+from tftp.protocol import TFTP
+from twisted.internet import reactor
+from twisted.python import log
+import random
+import sys
+
+
+def main():
+ random.seed()
+ log.startLogging(sys.stdout)
+ reactor.listenUDP(1069, TFTP(FilesystemSynchronousBackend('output')))
+ reactor.run()
+
+if __name__ == '__main__':
+ main()
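
The example above serves the local 'output' directory for both reads and writes on UDP port 1069, an unprivileged stand-in for the registered TFTP port 69. A read-only variant is a small sketch of the same idea using the can_write flag that FilesystemSynchronousBackend already exposes; the port and directory below are arbitrary choices, not anything this branch configures.

    from tftp.backend import FilesystemSynchronousBackend
    from tftp.protocol import TFTP
    from twisted.internet import reactor

    def main():
        # With can_write=False, get_writer() raises Unsupported and the
        # dispatcher answers WRQs with an "illegal operation" error datagram.
        backend = FilesystemSynchronousBackend('/var/lib/tftpboot',
                                               can_write=False)
        reactor.listenUDP(1069, TFTP(backend))
        reactor.run()

    if __name__ == '__main__':
        main()
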
=== added directory 'contrib/python-tx-tftp/tftp'
=== added file 'contrib/python-tx-tftp/tftp/__init__.py'
=== added file 'contrib/python-tx-tftp/tftp/backend.py'
--- contrib/python-tx-tftp/tftp/backend.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/backend.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,290 @@
+'''
+@author: shylent
+'''
+from os import fstat
+from tftp.errors import Unsupported, FileExists, AccessViolation, FileNotFound
+from twisted.python.filepath import FilePath, InsecurePath
+import shutil
+import tempfile
+from zope import interface
+
+class IBackend(interface.Interface):
+ """An object, that manages interaction between the TFTP network protocol and
+ anything, where you can get files from or put files to (a filesystem).
+
+ """
+
+ def get_reader(file_name):
+ """Return an object, that provides L{IReader}, that was initialized with
+ the given L{file_name}.
+
+ @param file_name: file name, specified as part of a TFTP read request (RRQ)
+ @type file_name: C{str}
+
+ @raise Unsupported: if reading is not supported for this particular
+ backend instance
+
+ @raise AccessViolation: if the passed file_name is not acceptable for
+ security or access control reasons
+
+ @raise FileNotFound: if the file, that corresponds to the given C{file_name}
+ could not be found
+
+ @raise BackendError: for any other errors, that were encountered while
+ attempting to construct a reader
+
+ @return: an object, that provides L{IReader}, or a L{Deferred} that
+ will fire with an L{IReader}
+
+ """
+
+ def get_writer(file_name):
+ """Return an object, that provides L{IWriter}, that was initialized with
+ the given L{file_name}.
+
+ @param file_name: file name, specified as part of a TFTP write request (WRQ)
+ @type file_name: C{str}
+
+ @raise Unsupported: if writing is not supported for this particular
+ backend instance
+
+ @raise AccessViolation: if the passed file_name is not acceptable for
+ security or access control reasons
+
+ @raise FileExists: if the file, that corresponds to the given C{file_name}
+ already exists and it is not desirable to overwrite it
+
+ @raise BackendError: for any other errors, that were encountered while
+ attempting to construct a writer
+
+ @return: an object, that provides L{IWriter}, or a L{Deferred} that
+ will fire with an L{IWriter}
+
+ """
+
+class IReader(interface.Interface):
+ """An object, that performs reads on request of the TFTP protocol"""
+
+ size = interface.Attribute(
+ "The size of the file to be read, or C{None} if it's not known.")
+
+ def read(size):
+ """Attempt to read C{size} number of bytes.
+
+ @note: If fewer than C{size} bytes are returned, it is assumed that there
+ is no more data to read and the TFTP transfer is terminated. This means that
+ fewer than C{size} bytes should be returned if and only if this read should
+ be the last read for this reader object.
+
+ @param size: a number of bytes to return to the protocol
+ @type size: C{int}
+
+ @return: the data that was read, or a L{Deferred} that will be fired with
+ the data that was read.
+ @rtype: C{str} or L{Deferred}
+
+ """
+
+ def finish():
+ """Release the resources, that were acquired by this reader and make sure,
+ that no additional data will be returned.
+
+ """
+
+
+class IWriter(interface.Interface):
+ """An object, that performs writes on request of the TFTP protocol"""
+
+ def write(data):
+ """Attempt to write the data
+
+ @return: C{None} or a L{Deferred} that will fire with C{None} (any errors
+ that occurred during the write will be available in an errback)
+ @rtype: C{NoneType} or L{Deferred}
+
+ """
+
+ def finish():
+ """Tell this writer, that there will be no more data and that the transfer
+ was successfully completed
+
+ """
+
+ def cancel():
+ """Tell this writer, that the transfer has ended unsuccessfully"""
+
+
+class FilesystemReader(object):
+ """A reader to go with L{FilesystemSynchronousBackend}.
+
+ @see: L{IReader}
+
+ @param file_path: a path to the file that we will read from
+ @type file_path: L{FilePath<twisted.python.filepath.FilePath>}
+
+ @raise FileNotFound: if the file does not exist
+
+ """
+
+ interface.implements(IReader)
+
+ def __init__(self, file_path):
+ self.file_path = file_path
+ try:
+ self.file_obj = self.file_path.open('r')
+ except IOError:
+ raise FileNotFound(self.file_path)
+ self.state = 'active'
+
+ @property
+ def size(self):
+ """
+ @see: L{IReader.size}
+
+ """
+ if self.file_obj.closed:
+ return None
+ else:
+ return fstat(self.file_obj.fileno()).st_size
+
+ def read(self, size):
+ """
+ @see: L{IReader.read}
+
+ @return: data, that was read
+ @rtype: C{str}
+
+ """
+ if self.state in ('eof', 'finished'):
+ return ''
+ data = self.file_obj.read(size)
+ if not data:
+ self.state = 'eof'
+ self.file_obj.close()
+ return data
+
+ def finish(self):
+ """
+ @see: L{IReader.finish}
+
+ """
+ if self.state not in ('eof', 'finished'):
+ self.file_obj.close()
+ self.state = 'finished'
+
+
+class FilesystemWriter(object):
+ """A writer to go with L{FilesystemSynchronousBackend}.
+
+ This particular implementation actually writes to a temporary file. If the
+ transfer is completed successfully, contens of the target file are replaced
+ with the contents of the temporary file and the temporary file is removed.
+ If L{cancel} is called, both files are discarded.
+
+ @see: L{IWriter}
+
+ @param file_path: a path to the file that will be created and written to
+ @type file_path: L{FilePath<twisted.python.filepath.FilePath>}
+
+ @raise FileExists: if the file already exists
+
+ """
+
+ interface.implements(IWriter)
+
+ def __init__(self, file_path):
+ if file_path.exists():
+ raise FileExists(file_path)
+ self.file_path = file_path
+ self.destination_file = self.file_path.open('w')
+ self.temp_destination = tempfile.TemporaryFile()
+ self.state = 'active'
+
+ def write(self, data):
+ """
+ @see: L{IWriter.write}
+
+ """
+ self.temp_destination.write(data)
+
+ def finish(self):
+ """
+ @see: L{IWriter.finish}
+
+ """
+ if self.state not in ('finished', 'cancelled'):
+ self.temp_destination.seek(0)
+ shutil.copyfileobj(self.temp_destination, self.destination_file)
+ self.temp_destination.close()
+ self.destination_file.close()
+ self.state = 'finished'
+
+ def cancel(self):
+ """
+ @see: L{IWriter.cancel}
+
+ """
+ if self.state not in ('finished', 'cancelled'):
+ self.temp_destination.close()
+ self.destination_file.close()
+ self.file_path.remove()
+ self.state = 'cancelled'
+
+
+class FilesystemSynchronousBackend(object):
+ """A synchronous filesystem backend.
+
+ @see: L{IBackend}
+
+ @param base_path: the base filesystem path for this backend, any attempts to
+ read or write 'above' the specified path will be denied
+ @type base_path: C{str} or L{FilePath<twisted.python.filepath.FilePath>}
+
+ @param can_read: whether or not this backend should support reads
+ @type can_read: C{bool}
+
+ @param can_write: whether or not this backend should support writes
+ @type can_write: C{bool}
+
+ """
+
+ interface.implements(IBackend)
+
+ def __init__(self, base_path, can_read=True, can_write=True):
+ try:
+ self.base = FilePath(base_path.path)
+ except AttributeError:
+ self.base = FilePath(base_path)
+ self.can_read, self.can_write = can_read, can_write
+
+ def get_reader(self, file_name):
+ """
+ @see: L{IBackend.get_reader}
+
+ @return: an object, providing L{IReader}
+ @rtype: L{FilesystemReader}
+
+ """
+ if not self.can_read:
+ raise Unsupported("Reading not supported")
+ try:
+ target_path = self.base.child(file_name)
+ except InsecurePath, e:
+ raise AccessViolation("Insecure path: %s" % e)
+ return FilesystemReader(target_path)
+
+ def get_writer(self, file_name):
+ """
+ @see: L{IBackend.get_writer}
+
+ @return: an object, providing L{IWriter}
+ @rtype: L{FilesystemWriter}
+
+ """
+ if not self.can_write:
+ raise Unsupported("Writing not supported")
+ try:
+ target_path = self.base.child(file_name)
+ except InsecurePath, e:
+ raise AccessViolation("Insecure path: %s" % e)
+ return FilesystemWriter(target_path)
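
backend.py above defines the IBackend/IReader/IWriter contracts precisely so that storage other than the local filesystem can be plugged into the protocol. Purely as an illustrative sketch (not part of this branch), a read-only in-memory backend satisfying those interfaces might look like this:

    from tftp.backend import IBackend, IReader
    from tftp.errors import FileNotFound, Unsupported
    from zope import interface


    class MemoryReader(object):
        """Serves a single in-memory string to a TFTP read session."""
        interface.implements(IReader)

        def __init__(self, data):
            self._data = data
            self.size = len(data)    # lets the server answer tsize queries

        def read(self, size):
            chunk, self._data = self._data[:size], self._data[size:]
            return chunk             # returning < size bytes ends the transfer

        def finish(self):
            self._data = ''


    class MemoryBackend(object):
        """Read-only IBackend over a plain dict of {file_name: contents}."""
        interface.implements(IBackend)

        def __init__(self, files):
            self._files = files

        def get_reader(self, file_name):
            try:
                return MemoryReader(self._files[file_name])
            except KeyError:
                raise FileNotFound(file_name)

        def get_writer(self, file_name):
            raise Unsupported("This backend is read-only")

It would be wired up exactly like the filesystem backend, e.g. reactor.listenUDP(1069, TFTP(MemoryBackend({'motd.txt': 'hello\n'}))).
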
=== added file 'contrib/python-tx-tftp/tftp/bootstrap.py'
--- contrib/python-tx-tftp/tftp/bootstrap.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/bootstrap.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,410 @@
+'''
+@author: shylent
+'''
+from tftp.datagram import (ACKDatagram, ERRORDatagram, ERR_TID_UNKNOWN,
+ TFTPDatagramFactory, split_opcode, OP_OACK, OP_ERROR, OACKDatagram, OP_ACK,
+ OP_DATA)
+from tftp.session import WriteSession, MAX_BLOCK_SIZE, ReadSession
+from tftp.util import SequentialCall
+from twisted.internet import reactor
+from twisted.internet.protocol import DatagramProtocol
+from twisted.python import log
+from twisted.python.util import OrderedDict
+
+class TFTPBootstrap(DatagramProtocol):
+ """Base class for TFTP Bootstrap classes, classes, that handle initial datagram
+ exchange (option negotiation, etc), before the actual transfer is started.
+
+ Why OrderedDict and not the regular one? As per
+ U{RFC2347<http://tools.ietf.org/html/rfc2347>}, the order of options is, indeed,
+ not important, but having them in an arbitrary order would complicate testing.
+
+ @cvar supported_options: lists the options that we know how to handle
+
+ @ivar session: A L{WriteSession} or L{ReadSession} object that will handle
+ the actual transfer, after the initial handshake and option negotiation is
+ complete
+ @type session: L{WriteSession} or L{ReadSession}
+
+ @ivar options: a mapping of options that this protocol instance was
+ initialized with. If it is empty and we are the server, the usual logic
+ (which doesn't involve OACK datagrams) is used.
+ Default: L{OrderedDict<twisted.python.util.OrderedDict>}.
+ @type options: L{OrderedDict<twisted.python.util.OrderedDict>}
+
+ @ivar resultant_options: stores the last options mapping that was passed
+ from the remote peer
+ @type resultant_options: L{OrderedDict<twisted.python.util.OrderedDict>}
+
+ @ivar remote: remote peer address
+ @type remote: C{(str, int)}
+
+ @ivar timeout_watchdog: an object, that is responsible for timing the protocol
+ out. If we are initiating the transfer, it is provided by the parent protocol
+
+ @ivar backend: L{IReader} or L{IWriter} provider, that is used for this transfer
+ @type backend: L{IReader} or L{IWriter} provider
+
+ """
+ supported_options = ('blksize', 'timeout', 'tsize')
+
+ def __init__(self, remote, backend, options=None, _clock=None):
+ if options is None:
+ self.options = OrderedDict()
+ else:
+ self.options = options
+ self.resultant_options = OrderedDict()
+ self.remote = remote
+ self.timeout_watchdog = None
+ self.backend = backend
+ if _clock is not None:
+ self._clock = _clock
+ else:
+ self._clock = reactor
+
+ def processOptions(self, options):
+ """Process options mapping, discarding malformed or unknown options.
+
+ @param options: options mapping to process
+ @type options: L{OrderedDict<twisted.python.util.OrderedDict>}
+
+ @return: a mapping of processed options. Invalid options are discarded.
+ Whether or not the values of options may be changed is decided on a per-
+ option basis, according to the standard
+ @rtype: L{OrderedDict<twisted.python.util.OrderedDict>}
+
+ """
+ accepted_options = OrderedDict()
+ for name, val in options.iteritems():
+ norm_name = name.lower()
+ if norm_name in self.supported_options:
+ actual_value = getattr(self, 'option_' + norm_name)(val)
+ if actual_value is not None:
+ accepted_options[name] = actual_value
+ return accepted_options
+
+ def option_blksize(self, val):
+ """Process the block size option. Valid range is between 8 and 65464,
+ inclusive. If the value is more than L{MAX_BLOCK_SIZE}, L{MAX_BLOCK_SIZE}
+ is returned instead.
+
+ @param val: value of the option
+ @type val: C{str}
+
+ @return: accepted option value or C{None}, if it is invalid
+ @rtype: C{str} or C{None}
+
+ """
+ try:
+ int_blksize = int(val)
+ except ValueError:
+ return None
+ if int_blksize < 8 or int_blksize > 65464:
+ return None
+ int_blksize = min((int_blksize, MAX_BLOCK_SIZE))
+ return str(int_blksize)
+
+ def option_timeout(self, val):
+ """Process timeout interval option
+ (U{RFC2349<http://tools.ietf.org/html/rfc2349>}). Valid range is between 1
+ and 255, inclusive.
+
+ @param val: value of the option
+ @type val: C{str}
+
+ @return: accepted option value or C{None}, if it is invalid
+ @rtype: C{str} or C{None}
+
+ """
+ try:
+ int_timeout = int(val)
+ except ValueError:
+ return None
+ if int_timeout < 1 or int_timeout > 255:
+ return None
+ return str(int_timeout)
+
+ def option_tsize(self, val):
+ """Process tsize interval option
+ (U{RFC2349<http://tools.ietf.org/html/rfc2349>}). Valid range is 0 and up.
+
+ @param val: value of the option
+ @type val: C{str}
+
+ @return: accepted option value or C{None}, if it is invalid
+ @rtype: C{str} or C{None}
+
+ """
+ try:
+ int_tsize = int(val)
+ except ValueError:
+ return None
+ if int_tsize < 0:
+ return None
+ return str(int_tsize)
+
+ def applyOptions(self, session, options):
+ """Apply given options mapping to the given L{WriteSession} or
+ L{ReadSession} object.
+
+ @param session: A session object to apply the options to
+ @type session: L{WriteSession} or L{ReadSession}
+
+ @param options: Options to apply to the session object
+ @type options: L{OrderedDict<twisted.python.util.OrderedDict>}
+
+ """
+ for opt_name, opt_val in options.iteritems():
+ if opt_name == 'blksize':
+ session.block_size = int(opt_val)
+ elif opt_name == 'timeout':
+ timeout = int(opt_val)
+ session.timeout = (timeout,) * 3
+ elif opt_name == 'tsize':
+ tsize = int(opt_val)
+ session.tsize = tsize
+
+ def datagramReceived(self, datagram, addr):
+ if self.remote[1] != addr[1]:
+ self.transport.write(ERRORDatagram.from_code(ERR_TID_UNKNOWN).to_wire())
+ return  # Does not belong to this transfer
+ datagram = TFTPDatagramFactory(*split_opcode(datagram))
+ log.msg("Datagram received from %s: %s" % (addr, datagram))
+ if datagram.opcode == OP_ERROR:
+ return self.tftp_ERROR(datagram)
+ return self._datagramReceived(datagram)
+
+ def tftp_ERROR(self, datagram):
+ """Handle the L{ERRORDatagram}.
+
+ @param datagram: An ERROR datagram
+ @type datagram: L{ERRORDatagram}
+
+ """
+ log.msg("Got error: " % datagram)
+ return self.cancel()
+
+ def cancel(self):
+ """Terminate this protocol instance. If the underlying
+ L{ReadSession}/L{WriteSession} is running, delegate the call to it.
+
+ """
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ if self.session.started:
+ self.session.cancel()
+ else:
+ self.backend.finish()
+ self.transport.stopListening()
+
+ def timedOut(self):
+ """This protocol instance has timed out during the initial handshake."""
+ log.msg("Timed during option negotiation process")
+ self.cancel()
+
+
+class LocalOriginWriteSession(TFTPBootstrap):
+ """Bootstraps a L{WriteSession}, that was initiated locally, - we've requested
+ a read from a remote server
+
+ """
+ def __init__(self, remote, writer, options=None, _clock=None):
+ TFTPBootstrap.__init__(self, remote, writer, options, _clock)
+ self.session = WriteSession(writer, self._clock)
+
+ def startProtocol(self):
+ """Connect the transport and start the L{timeout_watchdog}"""
+ self.transport.connect(*self.remote)
+ if self.timeout_watchdog is not None:
+ self.timeout_watchdog.start()
+
+ def tftp_OACK(self, datagram):
+ """Handle the OACK datagram
+
+ @param datagram: OACK datagram
+ @type datagram: L{OACKDatagram}
+
+ """
+ if not self.session.started:
+ self.resultant_options = self.processOptions(datagram.options)
+ if self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ return self.transport.write(ACKDatagram(0).to_wire())
+ else:
+ log.msg("Duplicate OACK received, send back ACK and ignore")
+ self.transport.write(ACKDatagram(0).to_wire())
+
+ def _datagramReceived(self, datagram):
+ if datagram.opcode == OP_OACK:
+ return self.tftp_OACK(datagram)
+ elif datagram.opcode == OP_DATA and datagram.blocknum == 1:
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ if not self.session.started:
+ self.applyOptions(self.session, self.resultant_options)
+ self.session.transport = self.transport
+ self.session.startProtocol()
+ return self.session.datagramReceived(datagram)
+ elif self.session.started:
+ return self.session.datagramReceived(datagram)
+
+
+class RemoteOriginWriteSession(TFTPBootstrap):
+ """Bootstraps a L{WriteSession}, that was originated remotely, - we've
+ received a WRQ from a client.
+
+ """
+ timeout = (1, 3, 7)
+
+ def __init__(self, remote, writer, options=None, _clock=None):
+ TFTPBootstrap.__init__(self, remote, writer, options, _clock)
+ self.session = WriteSession(writer, self._clock)
+
+ def startProtocol(self):
+ """Connect the transport, respond with an initial ACK or OACK (depending on
+ if we were initialized with options or not).
+
+ """
+ self.transport.connect(*self.remote)
+ if self.options:
+ self.resultant_options = self.processOptions(self.options)
+ bytes = OACKDatagram(self.resultant_options).to_wire()
+ else:
+ bytes = ACKDatagram(0).to_wire()
+ self.timeout_watchdog = SequentialCall.run(
+ self.timeout[:-1],
+ callable=self.transport.write, callable_args=[bytes, ],
+ on_timeout=lambda: self._clock.callLater(self.timeout[-1], self.timedOut),
+ run_now=True,
+ _clock=self._clock
+ )
+
+ def _datagramReceived(self, datagram):
+ if datagram.opcode == OP_DATA and datagram.blocknum == 1:
+ if self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ if not self.session.started:
+ self.applyOptions(self.session, self.resultant_options)
+ self.session.transport = self.transport
+ self.session.startProtocol()
+ return self.session.datagramReceived(datagram)
+ elif self.session.started:
+ return self.session.datagramReceived(datagram)
+
+
+class LocalOriginReadSession(TFTPBootstrap):
+ """Bootstraps a L{ReadSession}, that was originated locally, - we've requested
+ a write to a remote server.
+
+ """
+ def __init__(self, remote, reader, options=None, _clock=None):
+ TFTPBootstrap.__init__(self, remote, reader, options, _clock)
+ self.session = ReadSession(reader, self._clock)
+
+ def startProtocol(self):
+ """Connect the transport and start the L{timeout_watchdog}"""
+ self.transport.connect(*self.remote)
+ if self.timeout_watchdog is not None:
+ self.timeout_watchdog.start()
+
+ def _datagramReceived(self, datagram):
+ if datagram.opcode == OP_OACK:
+ return self.tftp_OACK(datagram)
+ elif (datagram.opcode == OP_ACK and datagram.blocknum == 0
+ and not self.session.started):
+ self.session.transport = self.transport
+ self.session.startProtocol()
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ return self.session.nextBlock()
+ elif self.session.started:
+ return self.session.datagramReceived(datagram)
+
+ def tftp_OACK(self, datagram):
+ """Handle incoming OACK datagram, process and apply options and hand over
+ control to the underlying L{ReadSession}.
+
+ @param datagram: OACK datagram
+ @type datagram: L{OACKDatagram}
+
+ """
+ if not self.session.started:
+ self.resultant_options = self.processOptions(datagram.options)
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ self.applyOptions(self.session, self.resultant_options)
+ self.session.transport = self.transport
+ self.session.startProtocol()
+ return self.session.nextBlock()
+ else:
+ log.msg("Duplicate OACK received, ignored")
+
+class RemoteOriginReadSession(TFTPBootstrap):
+ """Bootstraps a L{ReadSession}, that was started remotely, - we've received
+ a RRQ.
+
+ """
+ timeout = (1, 3, 7)
+
+ def __init__(self, remote, reader, options=None, _clock=None):
+ TFTPBootstrap.__init__(self, remote, reader, options, _clock)
+ self.session = ReadSession(reader, self._clock)
+
+ def option_tsize(self, val):
+ """Process tsize option.
+
+ If tsize is zero, get the size of the file to be read so that it can
+ be returned in the OACK datagram.
+
+ @see: L{TFTPBootstrap.option_tsize}
+
+ """
+ val = TFTPBootstrap.option_tsize(self, val)
+ if val == str(0):
+ val = self.session.reader.size
+ if val is not None:
+ val = str(val)
+ return val
+
+ def startProtocol(self):
+ """Start sending an OACK datagram if we were initialized with options
+ or start the L{ReadSession} immediately.
+
+ """
+ self.transport.connect(*self.remote)
+ if self.options:
+ self.resultant_options = self.processOptions(self.options)
+ bytes = OACKDatagram(self.resultant_options).to_wire()
+ self.timeout_watchdog = SequentialCall.run(
+ self.timeout[:-1],
+ callable=self.transport.write, callable_args=[bytes, ],
+ on_timeout=lambda: self._clock.callLater(self.timeout[-1], self.timedOut),
+ run_now=True,
+ _clock=self._clock
+ )
+ else:
+ self.session.transport = self.transport
+ self.session.startProtocol()
+ return self.session.nextBlock()
+
+ def _datagramReceived(self, datagram):
+ if datagram.opcode == OP_ACK and datagram.blocknum == 0:
+ return self.tftp_ACK(datagram)
+ elif self.session.started:
+ return self.session.datagramReceived(datagram)
+
+ def tftp_ACK(self, datagram):
+ """Handle incoming ACK datagram. Hand over control to the underlying
+ L{ReadSession}.
+
+ @param datagram: ACK datagram
+ @type datagram: L{ACKDatagram}
+
+ """
+ if self.timeout_watchdog is not None:
+ self.timeout_watchdog.cancel()
+ if not self.session.started:
+ self.applyOptions(self.session, self.resultant_options)
+ self.session.transport = self.transport
+ self.session.startProtocol()
+ return self.session.nextBlock()
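
The option negotiation above is easiest to follow with concrete values. The sketch below (addresses and option values are invented, and the backend argument is irrelevant for option processing, so None is passed purely for the illustration) shows what TFTPBootstrap.processOptions keeps, clamps and drops:

    from tftp.bootstrap import TFTPBootstrap
    from twisted.python.util import OrderedDict

    bootstrap = TFTPBootstrap(('192.168.0.1', 69), backend=None)
    requested = OrderedDict()
    requested['blksize'] = '9000'     # above MAX_BLOCK_SIZE, will be clamped
    requested['timeout'] = '300'      # outside 1..255, dropped
    requested['tsize'] = '0'          # well-formed, echoed back as-is
    requested['windowsize'] = '8'     # not a supported option, ignored

    accepted = bootstrap.processOptions(requested)
    assert accepted['blksize'] == '1400'      # clamped to MAX_BLOCK_SIZE
    assert accepted['tsize'] == '0'
    assert 'timeout' not in accepted
    assert 'windowsize' not in accepted
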
=== added file 'contrib/python-tx-tftp/tftp/datagram.py'
--- contrib/python-tx-tftp/tftp/datagram.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/datagram.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,396 @@
+'''
+@author: shylent
+'''
+from itertools import chain
+from tftp.errors import (WireProtocolError, InvalidOpcodeError,
+ PayloadDecodeError, InvalidErrorcodeError, OptionsDecodeError)
+from twisted.python.util import OrderedDict
+import struct
+
+OP_RRQ = 1
+OP_WRQ = 2
+OP_DATA = 3
+OP_ACK = 4
+OP_ERROR = 5
+OP_OACK = 6
+
+ERR_NOT_DEFINED = 0
+ERR_FILE_NOT_FOUND = 1
+ERR_ACCESS_VIOLATION = 2
+ERR_DISK_FULL = 3
+ERR_ILLEGAL_OP = 4
+ERR_TID_UNKNOWN = 5
+ERR_FILE_EXISTS = 6
+ERR_NO_SUCH_USER = 7
+
+errors = {
+ ERR_NOT_DEFINED : "",
+ ERR_FILE_NOT_FOUND : "File not found",
+ ERR_ACCESS_VIOLATION : "Access violation",
+ ERR_DISK_FULL : "Disk full or allocation exceeded",
+ ERR_ILLEGAL_OP : "Illegal TFTP operation",
+ ERR_TID_UNKNOWN : "Unknown transfer ID",
+ ERR_FILE_EXISTS : "File already exists",
+ ERR_NO_SUCH_USER : "No such user"
+
+}
+
+def split_opcode(datagram):
+ """Split the raw datagram into opcode and payload.
+
+ @param datagram: raw datagram
+ @type datagram: C{str}
+
+ @return: a 2-tuple, the first item is the opcode and the second item is the payload
+ @rtype: (C{int}, C{str})
+
+ @raise WireProtocolError: if the opcode cannot be extracted
+
+ """
+
+ try:
+ return struct.unpack("!H", datagram[:2])[0], datagram[2:]
+ except struct.error:
+ raise WireProtocolError("Failed to extract the opcode")
+
+
+class TFTPDatagram(object):
+ """Base class for datagrams
+
+ @cvar opcode: The opcode, corresponding to this datagram
+ @type opcode: C{int}
+
+ """
+
+ opcode = None
+
+ @classmethod
+ def from_wire(cls, payload):
+ """Parse the payload and return a datagram object
+
+ @param payload: Binary representation of the payload (without the opcode)
+ @type payload: C{str}
+
+ """
+ raise NotImplementedError("Subclasses must override this")
+
+ def to_wire(self):
+ """Return the wire representation of the datagram.
+
+ @rtype: C{str}
+
+ """
+ raise NotImplementedError("Subclasses must override this")
+
+
+class RQDatagram(TFTPDatagram):
+ """Base class for "RQ" (request) datagrams.
+
+ @ivar filename: File name, that corresponds to this request.
+ @type filename: C{str}
+
+ @ivar mode: Transfer mode. Valid values are C{netascii} and C{octet}.
+ Case-insensitive.
+ @type mode: C{str}
+
+ @ivar options: Any options that were requested by the client (as per
+ U{RFC2347<http://tools.ietf.org/html/rfc2347>})
+ @type options: C{dict}
+
+ """
+
+ @classmethod
+ def from_wire(cls, payload):
+ """Parse the payload and return a RRQ/WRQ datagram object.
+
+ @return: datagram object
+ @rtype: L{RRQDatagram} or L{WRQDatagram}
+
+ @raise OptionsDecodeError: if we failed to decode the options, requested
+ by the client
+ @raise PayloadDecodeError: if there were not enough fields in the payload.
+ Fields are terminated by NUL.
+
+ """
+ parts = payload.split('\x00')
+ try:
+ filename, mode = parts.pop(0), parts.pop(0)
+ except IndexError:
+ raise PayloadDecodeError("Not enough fields in the payload")
+ if parts and not parts[-1]:
+ parts.pop(-1)
+ options = OrderedDict()
+ # To maintain consistency during testing.
+ # The actual order of options is not important as per RFC2347
+ if len(parts) % 2:
+ raise OptionsDecodeError("No value for option %s" % parts[-1])
+ for ind, opt_name in enumerate(parts[::2]):
+ if opt_name in options:
+ raise OptionsDecodeError("Duplicate option specified: %s" % opt_name)
+ options[opt_name] = parts[ind * 2 + 1]
+ return cls(filename, mode, options)
+
+ def __init__(self, filename, mode, options):
+ self.filename = filename
+ self.mode = mode.lower()
+ self.options = options
+
+ def __repr__(self):
+ if self.options:
+ return ("<%s(filename=%s, mode=%s, options=%s)>" %
+ (self.__class__.__name__, self.filename, self.mode, self.options))
+ return "<%s(filename=%s, mode=%s)>" % (self.__class__.__name__,
+ self.filename, self.mode)
+
+ def to_wire(self):
+ opcode = struct.pack("!H", self.opcode)
+ if self.options:
+ options = '\x00'.join(chain.from_iterable(self.options.iteritems()))
+ return ''.join((opcode, self.filename, '\x00', self.mode, '\x00',
+ options, '\x00'))
+ else:
+ return ''.join((opcode, self.filename, '\x00', self.mode, '\x00'))
+
+class RRQDatagram(RQDatagram):
+ opcode = OP_RRQ
+
+class WRQDatagram(RQDatagram):
+ opcode = OP_WRQ
+
+class OACKDatagram(TFTPDatagram):
+ """An OACK datagram
+
+ @ivar options: Any options that are being acknowledged (as per
+ U{RFC2347<http://tools.ietf.org/html/rfc2347>})
+ @type options: C{dict}
+
+ """
+ opcode = OP_OACK
+
+ @classmethod
+ def from_wire(cls, payload):
+ """Parse the payload and return an OACK datagram object.
+
+ @return: datagram object
+ @rtype: L{OACKDatagram}
+
+ @raise OptionsDecodeError: if we failed to decode the options
+
+ """
+ parts = payload.split('\x00')
+ #FIXME: Boo, code duplication
+ if parts and not parts[-1]:
+ parts.pop(-1)
+ options = OrderedDict()
+ if len(parts) % 2:
+ raise OptionsDecodeError("No value for option %s" % parts[-1])
+ for ind, opt_name in enumerate(parts[::2]):
+ if opt_name in options:
+ raise OptionsDecodeError("Duplicate option specified: %s" % opt_name)
+ options[opt_name] = parts[ind * 2 + 1]
+ return cls(options)
+
+ def __init__(self, options):
+ self.options = options
+
+ def __repr__(self):
+ return ("<%s(options=%s)>" % (self.__class__.__name__, self.options))
+
+ def to_wire(self):
+ opcode = struct.pack("!H", self.opcode)
+ if self.options:
+ options = '\x00'.join(chain.from_iterable(self.options.iteritems()))
+ return ''.join((opcode, options, '\x00'))
+ else:
+ return opcode
+
+class DATADatagram(TFTPDatagram):
+ """A DATA datagram
+
+ @ivar blocknum: A block number, that this chunk of data is associated with
+ @type blocknum: C{int}
+
+ @ivar data: binary data
+ @type data: C{str}
+
+ """
+ opcode = OP_DATA
+
+ @classmethod
+ def from_wire(cls, payload):
+ """Parse the payload and return a L{DATADatagram} object.
+
+ @param payload: Binary representation of the payload (without the opcode)
+ @type payload: C{str}
+
+ @return: A L{DATADatagram} object
+ @rtype: L{DATADatagram}
+
+ @raise PayloadDecodeError: if the format of payload is incorrect
+
+ """
+ try:
+ blocknum, data = struct.unpack('!H', payload[:2])[0], payload[2:]
+ except struct.error:
+ raise PayloadDecodeError()
+ return cls(blocknum, data)
+
+ def __init__(self, blocknum, data):
+ self.blocknum = blocknum
+ self.data = data
+
+ def __repr__(self):
+ return "<%s(blocknum=%s, %s bytes of data)>" % (self.__class__.__name__,
+ self.blocknum, len(self.data))
+
+ def to_wire(self):
+ return ''.join((struct.pack('!HH', self.opcode, self.blocknum), self.data))
+
+class ACKDatagram(TFTPDatagram):
+ """An ACK datagram.
+
+ @ivar blocknum: Block number of the data chunk, which this datagram is supposed to acknowledge
+ @type blocknum: C{int}
+
+ """
+ opcode = OP_ACK
+
+ @classmethod
+ def from_wire(cls, payload):
+ """Parse the payload and return a L{ACKDatagram} object.
+
+ @param payload: Binary representation of the payload (without the opcode)
+ @type payload: C{str}
+
+ @return: An L{ACKDatagram} object
+ @rtype: L{ACKDatagram}
+
+ @raise PayloadDecodeError: if the format of payload is incorrect
+
+ """
+ try:
+ blocknum = struct.unpack('!H', payload)[0]
+ except struct.error:
+ raise PayloadDecodeError("Unable to extract the block number")
+ return cls(blocknum)
+
+ def __init__(self, blocknum):
+ self.blocknum = blocknum
+
+ def __repr__(self):
+ return "<%s(blocknum=%s)>" % (self.__class__.__name__, self.blocknum)
+
+ def to_wire(self):
+ return struct.pack('!HH', self.opcode, self.blocknum)
+
+class ERRORDatagram(TFTPDatagram):
+ """An ERROR datagram.
+
+ @ivar errorcode: A valid TFTP error code
+ @type errorcode: C{int}
+
+ @ivar errmsg: An error message, describing the error condition in which this
+ datagram was produced
+ @type errmsg: C{str}
+
+ """
+ opcode = OP_ERROR
+
+ @classmethod
+ def from_wire(cls, payload):
+ """Parse the payload and return a L{ERRORDatagram} object.
+
+ This method violates the standard a bit - if the error string was not
+ extracted, a default error string is generated, based on the error code.
+
+ @param payload: Binary representation of the payload (without the opcode)
+ @type payload: C{str}
+
+ @return: An L{ERRORDatagram} object
+ @rtype: L{ERRORDatagram}
+
+ @raise PayloadDecodeError: if the format of payload is incorrect
+ @raise InvalidErrorcodeError: a more specific exception that is raised
+ if the error code was successfully extracted, but does not correspond
+ to any known/standardized error code values.
+
+ """
+ try:
+ errorcode = struct.unpack('!H', payload[:2])[0]
+ except struct.error:
+ raise PayloadDecodeError("Unable to extract the error code")
+ if not errorcode in errors:
+ raise InvalidErrorcodeError(errorcode)
+ errmsg = payload[2:].split('\x00')[0]
+ if not errmsg:
+ errmsg = errors[errorcode]
+ return cls(errorcode, errmsg)
+
+ @classmethod
+ def from_code(cls, errorcode, errmsg=None):
+ """Create an L{ERRORDatagram}, given an error code and, optionally, an
+ error message to go with it. If not provided, default error message for
+ the given error code is used.
+
+ @param errorcode: An error code (one of L{errors})
+ @type errorcode: C{int}
+
+ @param errmsg: An error message (optional)
+ @type errmsg: C{str} or C{NoneType}
+
+ @raise InvalidErrorcodeError: if the error code is not known
+
+ @return: an L{ERRORDatagram}
+ @rtype: L{ERRORDatagram}
+
+ """
+ if not errorcode in errors:
+ raise InvalidErrorcodeError(errorcode)
+ if errmsg is None:
+ errmsg = errors[errorcode]
+ return cls(errorcode, errmsg)
+
+
+ def __init__(self, errorcode, errmsg):
+ self.errorcode = errorcode
+ self.errmsg = errmsg
+
+ def to_wire(self):
+ return ''.join((struct.pack('!HH', self.opcode, self.errorcode),
+ self.errmsg, '\x00'))
+
+class _TFTPDatagramFactory(object):
+ """Encapsulates the creation of datagrams based on the opcode"""
+ _dgram_classes = {
+ OP_RRQ: RRQDatagram,
+ OP_WRQ: WRQDatagram,
+ OP_DATA: DATADatagram,
+ OP_ACK: ACKDatagram,
+ OP_ERROR: ERRORDatagram,
+ OP_OACK: OACKDatagram
+ }
+
+ def __call__(self, opcode, payload):
+ """Create a datagram, given an opcode and payload.
+
+ Errors, that occur during datagram creation are propagated as-is.
+
+ @param opcode: opcode
+ @type opcode: C{int}
+
+ @param payload: payload
+ @type payload: C{str}
+
+ @return: datagram object
+ @rtype: L{TFTPDatagram}
+
+ @raise InvalidOpcodeError: if the opcode is not recognized
+
+ """
+ try:
+ datagram_class = self._dgram_classes[opcode]
+ except KeyError:
+ raise InvalidOpcodeError(opcode)
+ return datagram_class.from_wire(payload)
+
+TFTPDatagramFactory = _TFTPDatagramFactory()
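
To make the wire format above concrete, here is a round trip through the datagram factory; the request contents and the spelled-out byte strings are arbitrary illustrations.

    from tftp.datagram import (TFTPDatagramFactory, split_opcode,
        ACKDatagram, OP_RRQ)

    # Parse a raw RRQ that carries an RFC2347 option.
    raw = ('\x00\x01' 'kernel' '\x00' 'octet' '\x00'
           'blksize' '\x00' '1400' '\x00')
    datagram = TFTPDatagramFactory(*split_opcode(raw))
    assert datagram.opcode == OP_RRQ
    assert datagram.filename == 'kernel'
    assert dict(datagram.options) == {'blksize': '1400'}

    # And back to the wire: an ACK is just the opcode and the block number,
    # both packed as network-order unsigned shorts.
    assert ACKDatagram(0).to_wire() == '\x00\x04\x00\x00'
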
=== added file 'contrib/python-tx-tftp/tftp/errors.py'
--- contrib/python-tx-tftp/tftp/errors.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/errors.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,86 @@
+'''
+@author: shylent
+'''
+
+class TFTPError(Exception):
+ """Base exception class for this package"""
+
+class WireProtocolError(TFTPError):
+ """Base exception class for wire-protocol level errors"""
+
+class InvalidOpcodeError(WireProtocolError):
+ """An invalid opcode was encountered"""
+
+ def __init__(self, opcode):
+
+ super(InvalidOpcodeError, self).__init__("Invalid opcode: %s" % opcode)
+
+class PayloadDecodeError(WireProtocolError):
+ """Failed to parse the payload"""
+
+class OptionsDecodeError(PayloadDecodeError):
+ """Failed to parse options in the WRQ/RRQ datagram. It is distinct from
+ L{PayloadDecodeError} so that it can be caught and dealt with gracefully
+ (pretend we didn't see any options at all, perhaps).
+
+ """
+
+class InvalidErrorcodeError(PayloadDecodeError):
+ """An ERROR datagram has an error code, that does not correspond to any known
+ error code values.
+
+ @ivar errorcode: The error code, that we were unable to parse
+ @type errorcode: C{int}
+
+ """
+
+ def __init__(self, errorcode):
+ self.errorcode = errorcode
+ super(InvalidErrorcodeError, self).__init__("Unknown error code: %s" % errorcode)
+
+class BackendError(TFTPError):
+ """Base exception class for backend errors"""
+
+class Unsupported(BackendError):
+ """Requested operation (read/write) is not supported"""
+
+class AccessViolation(BackendError):
+ """Illegal filesystem operation. Corresponds to the "(2) Access violation"
+ TFTP error code.
+
+ One of the prime examples of these is an attempt at directory traversal.
+
+ """
+
+class FileNotFound(BackendError):
+ """File not found.
+
+ Corresponds to the "(1) File not found" TFTP error code.
+
+ @ivar file_path: Path to the file, that was requested
+ @type file_path: C{str} or L{twisted.python.filepath.FilePath}
+
+ """
+
+ def __init__(self, file_path):
+ self.file_path = file_path
+
+ def __str__(self):
+ return "File not found: %s" % self.file_path
+
+
+class FileExists(BackendError):
+ """File exists.
+
+ Corresponds to the "(6) File already exists" TFTP error code.
+
+ @ivar file_path: Path to file
+ @type file_path: C{str} or L{twisted.python.filepath.FilePath}
+
+ """
+
+ def __init__(self, file_path):
+ self.file_path = file_path
+
+ def __str__(self):
+ return "File already exists: %s" % self.file_path
=== added file 'contrib/python-tx-tftp/tftp/netascii.py'
--- contrib/python-tx-tftp/tftp/netascii.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/netascii.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,136 @@
+'''
+@author: shylent
+'''
+# So basically, the idea is that in netascii a *newline* (whatever that is
+# on the current platform) is represented by a CR+LF sequence and a single CR
+# is represented by CR+NUL.
+
+from twisted.internet.defer import maybeDeferred, succeed
+import os
+import re
+
+__all__ = ['NetasciiSenderProxy', 'NetasciiReceiverProxy',
+ 'to_netascii', 'from_netascii']
+
+CR = '\x0d'
+LF = '\x0a'
+CRLF = CR + LF
+NUL = '\x00'
+CRNUL = CR + NUL
+
+NL = os.linesep
+
+
+re_from_netascii = re.compile('(\x0d\x0a|\x0d\x00)')
+
+def _convert_from_netascii(match_obj):
+ if match_obj.group(0) == CRLF:
+ return NL
+ elif match_obj.group(0) == CRNUL:
+ return CR
+
+def from_netascii(data):
+ """Convert a netascii-encoded string into a string with platform-specific
+ newlines.
+
+ """
+ return re_from_netascii.sub(_convert_from_netascii, data)
+
+# So that I can easily switch the NL around in tests
+_re_to_netascii = '(%s|\x0d)'
+re_to_netascii = re.compile(_re_to_netascii % NL)
+
+def _convert_to_netascii(match_obj):
+ if match_obj.group(0) == NL:
+ return CRLF
+ elif match_obj.group(0) == CR:
+ return CRNUL
+
+def to_netascii(data):
+ """Convert a string with platform-specific newlines into netascii."""
+ return re_to_netascii.sub(_convert_to_netascii, data)
+
+class NetasciiReceiverProxy(object):
+ """Proxies an object, that provides L{IWriter}. Incoming data is transformed
+ as follows:
+ - CR+LF is replaced with the platform-specific newline
+ - CR+NUL is replaced with CR
+
+ @param writer: an L{IWriter} object, that will be used to perform the actual writes
+ @type writer: L{IWriter} provider
+
+ """
+
+ def __init__(self, writer):
+ self.writer = writer
+ self._carry_cr = False
+
+ def write(self, data):
+ """Attempt a write, performing transformation as described in
+ L{NetasciiReceiverProxy}. May write 1 byte less, than provided, if the last
+ byte in the chunk is a CR.
+
+ @param data: data to be written
+ @type data: C{str}
+
+ @return: L{Deferred}, that will be fired when the write is complete
+ @rtype: L{Deferred}
+
+ """
+ if self._carry_cr:
+ data = CR + data
+ data = from_netascii(data)
+ if data.endswith(CR):
+ self._carry_cr = True
+ return maybeDeferred(self.writer.write, data[:-1])
+ else:
+ self._carry_cr = False
+ return maybeDeferred(self.writer.write, data)
+
+ def __getattr__(self, name):
+ return getattr(self.writer, name)
+
+
+class NetasciiSenderProxy(object):
+ """Proxies an object, that provides L{IReader}. The data that is read is
+ transformed as follows:
+ - platform-specific newlines are replaced with CR+LF
+ - freestanding CR are replaced with CR+NUL
+
+ @param reader: an L{IReader} object
+ @type reader: L{IReader} provider
+
+ """
+
+ def __init__(self, reader):
+ self.reader = reader
+ self.buffer = ''
+
+ def read(self, size):
+ """Attempt to read C{size} bytes, transforming them as described in
+ L{NetasciiSenderProxy}.
+
+ @param size: number of bytes to read
+ @type size: C{int}
+
+ @return: L{Deferred} that will be fired with exactly C{size} bytes
+ (regardless of the transformation that was performed) if there is more data,
+ or with fewer than C{size} bytes if there is no more data to read.
+ @rtype: L{Deferred}
+
+ """
+ need_bytes = size - len(self.buffer)
+ if need_bytes <= 0:
+ data, self.buffer = self.buffer[:size], self.buffer[size:]
+ return succeed(data)
+ d = maybeDeferred(self.reader.read, need_bytes)
+ d.addCallback(self._gotDataFromReader, size)
+ return d
+
+ def _gotDataFromReader(self, data, size):
+ data = self.buffer + to_netascii(data)
+ data, self.buffer = data[:size], data[size:]
+ return data
+
+ def __getattr__(self, name):
+ return getattr(self.reader, name)
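
A quick round trip through the helpers above, assuming a platform whose os.linesep is '\n' (e.g. Linux); the sample text is arbitrary.

    from tftp.netascii import to_netascii, from_netascii

    # A platform newline becomes CR+LF on the wire and a bare CR becomes
    # CR+NUL, and the conversion is reversible.
    text = 'progress\rdone\n'
    wire = to_netascii(text)
    assert wire == 'progress\r\x00done\r\n'
    assert from_netascii(wire) == text
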
=== added file 'contrib/python-tx-tftp/tftp/protocol.py'
--- contrib/python-tx-tftp/tftp/protocol.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/protocol.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,81 @@
+'''
+@author: shylent
+'''
+from tftp.bootstrap import RemoteOriginWriteSession, RemoteOriginReadSession
+from tftp.datagram import (TFTPDatagramFactory, split_opcode, OP_WRQ,
+ ERRORDatagram, ERR_NOT_DEFINED, ERR_ACCESS_VIOLATION, ERR_FILE_EXISTS,
+ ERR_ILLEGAL_OP, OP_RRQ, ERR_FILE_NOT_FOUND)
+from tftp.errors import (FileExists, Unsupported, AccessViolation, BackendError,
+ FileNotFound)
+from tftp.netascii import NetasciiReceiverProxy, NetasciiSenderProxy
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.protocol import DatagramProtocol
+from twisted.python import log
+
+
+class TFTP(DatagramProtocol):
+ """TFTP dispatch protocol. Handles read requests (RRQ) and write requests (WRQ)
+ and starts the corresponding sessions.
+
+ @ivar backend: an L{IBackend} provider, that will handle interaction with
+ local resources
+ @type backend: L{IBackend} provider
+
+ """
+ def __init__(self, backend, _clock=None):
+ self.backend = backend
+ if _clock is None:
+ self._clock = reactor
+ else:
+ self._clock = _clock
+
+ def startProtocol(self):
+ addr = self.transport.getHost()
+ log.msg("TFTP Listener started at %s:%s" % (addr.host, addr.port))
+
+ def datagramReceived(self, datagram, addr):
+ datagram = TFTPDatagramFactory(*split_opcode(datagram))
+ log.msg("Datagram received from %s: %s" % (addr, datagram))
+
+ mode = datagram.mode.lower()
+ if mode not in ('netascii', 'octet'):
+ return self.transport.write(ERRORDatagram.from_code(ERR_ILLEGAL_OP,
+ "Unknown transfer mode %s, - expected "
+ "'netascii' or 'octet' (case-insensitive)" % mode).to_wire(), addr)
+
+ self._clock.callLater(0, self._startSession, datagram, addr, mode)
+
+ @inlineCallbacks
+ def _startSession(self, datagram, addr, mode):
+ try:
+ if datagram.opcode == OP_WRQ:
+ fs_interface = yield self.backend.get_writer(datagram.filename)
+ elif datagram.opcode == OP_RRQ:
+ fs_interface = yield self.backend.get_reader(datagram.filename)
+ except Unsupported, e:
+ self.transport.write(ERRORDatagram.from_code(ERR_ILLEGAL_OP,
+ str(e)).to_wire(), addr)
+ except AccessViolation:
+ self.transport.write(ERRORDatagram.from_code(ERR_ACCESS_VIOLATION).to_wire(), addr)
+ except FileExists:
+ self.transport.write(ERRORDatagram.from_code(ERR_FILE_EXISTS).to_wire(), addr)
+ except FileNotFound:
+ self.transport.write(ERRORDatagram.from_code(ERR_FILE_NOT_FOUND).to_wire(), addr)
+ except BackendError, e:
+ self.transport.write(ERRORDatagram.from_code(ERR_NOT_DEFINED, str(e)).to_wire(), addr)
+ else:
+ if datagram.opcode == OP_WRQ:
+ if mode == 'netascii':
+ fs_interface = NetasciiReceiverProxy(fs_interface)
+ session = RemoteOriginWriteSession(addr, fs_interface,
+ datagram.options, _clock=self._clock)
+ reactor.listenUDP(0, session)
+ returnValue(session)
+ elif datagram.opcode == OP_RRQ:
+ if mode == 'netascii':
+ fs_interface = NetasciiSenderProxy(fs_interface)
+ session = RemoteOriginReadSession(addr, fs_interface,
+ datagram.options, _clock=self._clock)
+ reactor.listenUDP(0, session)
+ returnValue(session)
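
_startSession above maps backend exceptions onto TFTP error datagrams: Unsupported and unknown transfer modes become "illegal operation" (4), AccessViolation becomes error 2, FileExists error 6, FileNotFound error 1, and any other BackendError error 0. As a concrete illustration (not new behaviour, just spelling out what the code above produces), a missing file is answered like this:

    from tftp.datagram import ERRORDatagram, ERR_FILE_NOT_FOUND

    error = ERRORDatagram.from_code(ERR_FILE_NOT_FOUND)
    assert error.errmsg == 'File not found'   # default message from `errors`
    assert error.to_wire() == '\x00\x05\x00\x01File not found\x00'
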
=== added file 'contrib/python-tx-tftp/tftp/session.py'
--- contrib/python-tx-tftp/tftp/session.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/session.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,280 @@
+'''
+@author: shylent
+'''
+from tftp.datagram import (ACKDatagram, ERRORDatagram, OP_DATA, OP_ERROR, ERR_ILLEGAL_OP,
+ ERR_DISK_FULL, OP_ACK, DATADatagram, ERR_NOT_DEFINED,)
+from tftp.util import SequentialCall
+from twisted.internet import reactor
+from twisted.internet.defer import maybeDeferred
+from twisted.internet.protocol import DatagramProtocol
+from twisted.python import log
+
+MAX_BLOCK_SIZE = 1400
+
+
+class WriteSession(DatagramProtocol):
+ """Represents a transfer, during which we write to a local file. If we are a
+ server, this means, that we received a WRQ (write request). If we are a client,
+ this means, that we have requested a read from a remote server.
+
+ @cvar block_size: Expected block size. If a data chunk is received and its length
+ is less than C{block_size}, it is assumed that this data chunk is the last in the
+ transfer. Default: 512 (as per U{RFC1350<http://tools.ietf.org/html/rfc1350>})
+ @type block_size: C{int}.
+
+ @cvar timeout: An iterable that yields timeout values for every subsequent
+ ACKDatagram that we've sent and that is not followed by the next data chunk.
+ When (if) the iterable is exhausted, the transfer is considered failed.
+ @type timeout: any iterable
+
+ @ivar started: whether or not this protocol has started
+ @type started: C{bool}
+
+ """
+
+ block_size = 512
+ timeout = (1, 3, 7)
+ tsize = None
+
+ def __init__(self, writer, _clock=None):
+ self.writer = writer
+ self.blocknum = 0
+ self.completed = False
+ self.started = False
+ self.timeout_watchdog = None
+ if _clock is None:
+ self._clock = reactor
+ else:
+ self._clock = _clock
+
+ def cancel(self):
+ """Cancel this session, discard any data, that was collected
+ and give up the connector.
+
+ """
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ self.writer.cancel()
+ self.transport.stopListening()
+
+ def startProtocol(self):
+ self.started = True
+
+ def connectionRefused(self):
+ if not self.completed:
+ self.writer.cancel()
+ self.transport.stopListening()
+
+ def datagramReceived(self, datagram):
+ if datagram.opcode == OP_DATA:
+ return self.tftp_DATA(datagram)
+ elif datagram.opcode == OP_ERROR:
+ log.msg("Got error: " % datagram)
+ self.cancel()
+
+ def tftp_DATA(self, datagram):
+ """Handle incoming DATA TFTP datagram
+
+ @type datagram: L{DATADatagram}
+
+ """
+ next_blocknum = self.blocknum + 1
+ if datagram.blocknum < next_blocknum:
+ self.transport.write(ACKDatagram(datagram.blocknum).to_wire())
+ elif datagram.blocknum == next_blocknum:
+ if self.completed:
+ self.transport.write(ERRORDatagram.from_code(
+ ERR_ILLEGAL_OP, "Transfer already finished").to_wire())
+ else:
+ return self.nextBlock(datagram)
+ else:
+ self.transport.write(ERRORDatagram.from_code(
+ ERR_ILLEGAL_OP, "Block number mismatch").to_wire())
+
+ def nextBlock(self, datagram):
+ """Handle fresh data, attempt to write it to backend
+
+ @type datagram: L{DATADatagram}
+
+ """
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ self.blocknum += 1
+ d = maybeDeferred(self.writer.write, datagram.data)
+ d.addCallbacks(callback=self.blockWriteSuccess, callbackArgs=[datagram, ],
+ errback=self.blockWriteFailure)
+ return d
+
+ def blockWriteSuccess(self, ign, datagram):
+ """The write was successful, respond with ACK for current block number
+
+ If this is the last chunk (received data length < block size), the protocol
+ will keep running until the end of current timeout period, so we can respond
+ to any duplicates.
+
+ @type datagram: L{DATADatagram}
+
+ """
+ bytes = ACKDatagram(datagram.blocknum).to_wire()
+ self.timeout_watchdog = SequentialCall.run(self.timeout[:-1],
+ callable=self.sendData, callable_args=[bytes, ],
+ on_timeout=lambda: self._clock.callLater(self.timeout[-1], self.timedOut),
+ run_now=True,
+ _clock=self._clock
+ )
+ if len(datagram.data) < self.block_size:
+ self.completed = True
+ self.writer.finish()
+ # TODO: If self.tsize is not None, compare it with the actual
+ # count of bytes written. Log if there's a mismatch. Should it
+ # also emit an error datagram?
+
+ def blockWriteFailure(self, failure):
+ """Write failed"""
+ log.err(failure)
+ self.transport.write(ERRORDatagram.from_code(ERR_DISK_FULL).to_wire())
+ self.cancel()
+
+ def timedOut(self):
+ """Called when the protocol has timed out. Let the backend know, if the
+ the transfer was successful.
+
+ """
+ if not self.completed:
+ log.msg("Timed out while waiting for next block")
+ self.writer.cancel()
+ else:
+ log.msg("Timed out after a successful transfer")
+ self.transport.stopListening()
+
+ def sendData(self, bytes):
+ """Send data to the remote peer
+
+ @param bytes: bytes to send
+ @type bytes: C{str}
+
+ """
+ self.transport.write(bytes)
+
+
+class ReadSession(DatagramProtocol):
+ """Represents a transfer, during which we read from a local file
+ (and write to the network). If we are a server, this means, that we've received
+ a RRQ (read request). If we are a client, this means that we've requested to
+ write to a remote server.
+
+ @cvar block_size: The data will be sent in chunks of this size. If we send
+ a chunk with the size < C{block_size}, the transfer will end.
+ Default: 512 (as per U{RFC1350<http://tools.ietf.org/html/rfc1350>})
+ @type block_size: C{int}
+
+ @cvar timeout: An iterable that yields timeout values for every subsequent
+ unacknowledged DATADatagram that we've sent. When (if) the iterable is exhausted,
+ the transfer is considered failed.
+ @type timeout: any iterable
+
+ @ivar started: whether or not this protocol has started
+ @type started: C{bool}
+
+ """
+ block_size = 512
+ timeout = (1, 3, 7)
+
+ def __init__(self, reader, _clock=None):
+ self.reader = reader
+ self.blocknum = 0
+ self.started = False
+ self.completed = False
+ self.timeout_watchdog = None
+ if _clock is None:
+ self._clock = reactor
+ else:
+ self._clock = _clock
+
+ def cancel(self):
+ """Tell the reader to give up the resources. Stop the timeout cycle
+ and disconnect the transport.
+
+ """
+ self.reader.finish()
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ self.transport.stopListening()
+
+ def startProtocol(self):
+ self.started = True
+
+ def connectionRefused(self):
+ self.cancel()  # ReadSession has no finish(); cancel() releases the reader
+
+ def datagramReceived(self, datagram):
+ if datagram.opcode == OP_ACK:
+ return self.tftp_ACK(datagram)
+ elif datagram.opcode == OP_ERROR:
+ log.msg("Got error: " % datagram)
+ self.cancel()
+
+ def tftp_ACK(self, datagram):
+ """Handle the incoming ACK TFTP datagram.
+
+ @type datagram: L{ACKDatagram}
+
+ """
+ if datagram.blocknum < self.blocknum:
+ log.msg("Duplicate ACK for blocknum %s" % datagram.blocknum)
+ elif datagram.blocknum == self.blocknum:
+ if self.timeout_watchdog is not None and self.timeout_watchdog.active():
+ self.timeout_watchdog.cancel()
+ if self.completed:
+ log.msg("Final ACK received, transfer successful")
+ self.cancel()
+ else:
+ return self.nextBlock()
+ else:
+ self.transport.write(ERRORDatagram.from_code(
+ ERR_ILLEGAL_OP, "Block number mismatch").to_wire())
+
+ def nextBlock(self):
+ """ACK datagram for the previous block has been received. Attempt to read
+ the next block, that will be sent.
+
+ """
+ self.blocknum += 1
+ d = maybeDeferred(self.reader.read, self.block_size)
+ d.addCallbacks(callback=self.dataFromReader, errback=self.readFailed)
+ return d
+
+ def dataFromReader(self, data):
+ """Got data from the reader. Send it to the network and start the timeout
+ cycle.
+
+ """
+ if len(data) < self.block_size:
+ self.completed = True
+ bytes = DATADatagram(self.blocknum, data).to_wire()
+ self.timeout_watchdog = SequentialCall.run(self.timeout[:-1],
+ callable=self.sendData, callable_args=[bytes, ],
+ on_timeout=lambda: self._clock.callLater(self.timeout[-1], self.timedOut),
+ run_now=True,
+ _clock=self._clock
+ )
+
+ def readFailed(self, fail):
+ """The reader reported an error. Notify the remote end and cancel the transfer"""
+ log.err(fail)
+ self.transport.write(ERRORDatagram.from_code(ERR_NOT_DEFINED, "Read failed").to_wire())
+ self.cancel()
+
+ def timedOut(self):
+ """Timeout iterable has been exhausted. End the transfer"""
+ log.msg("Session timed out, last wait was %s seconds long" % self.timeout[-1])
+ self.cancel()
+
+ def sendData(self, bytes):
+ """Send data to the remote peer
+
+ @param bytes: bytes to send
+ @type bytes: C{str}
+
+ """
+ self.transport.write(bytes)
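
Both sessions above move data in lock step: each DATA block is acknowledged before the next one is produced, and a block shorter than block_size signals the end of the transfer, which is why a file whose size is an exact multiple of the block size is followed by an empty final block. A small arithmetic sketch of that rule (file sizes are arbitrary):

    BLOCK_SIZE = 512

    def block_lengths(file_size, block_size=BLOCK_SIZE):
        """Lengths of the DATA payloads a ReadSession would send."""
        blocks, remaining = [], file_size
        while remaining >= block_size:
            blocks.append(block_size)
            remaining -= block_size
        blocks.append(remaining)   # final short (possibly empty) block
        return blocks

    assert block_lengths(1200) == [512, 512, 176]   # 176 < 512 ends the transfer
    assert block_lengths(1024) == [512, 512, 0]     # exact multiple: empty block
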
=== added directory 'contrib/python-tx-tftp/tftp/test'
=== added file 'contrib/python-tx-tftp/tftp/test/__init__.py'
=== added file 'contrib/python-tx-tftp/tftp/test/test_backend.py'
--- contrib/python-tx-tftp/tftp/test/test_backend.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_backend.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,134 @@
+'''
+@author: shylent
+'''
+from tftp.backend import (FilesystemSynchronousBackend, FilesystemReader,
+ FilesystemWriter)
+from tftp.errors import Unsupported, AccessViolation, FileNotFound, FileExists
+from twisted.python.filepath import FilePath
+from twisted.trial import unittest
+import os.path
+import shutil
+import tempfile
+
+
+class BackendSelection(unittest.TestCase):
+ test_data = """line1
+line2
+line3
+"""
+
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp()
+ self.existing_file_name = os.path.join(self.temp_dir, 'foo')
+ with open(self.existing_file_name, 'w') as f:
+ f.write(self.test_data)
+
+ def test_unsupported(self):
+ b = FilesystemSynchronousBackend(self.temp_dir, can_read=False)
+ self.assertRaises(Unsupported, b.get_reader, 'foo')
+ self.assert_(b.get_writer('bar'),
+ "A writer should be dispatched")
+ b = FilesystemSynchronousBackend(self.temp_dir, can_write=False)
+ self.assertRaises(Unsupported, b.get_writer, 'bar')
+ self.assert_(b.get_reader('foo'),
+ "A reader should be dispatched")
+
+ def test_insecure(self):
+ b = FilesystemSynchronousBackend(self.temp_dir)
+ self.assertRaises(AccessViolation, b.get_reader, '../foo')
+ b = FilesystemSynchronousBackend(self.temp_dir)
+ self.assertRaises(AccessViolation, b.get_writer, '../foo')
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir)
+
+
+class Reader(unittest.TestCase):
+ test_data = """line1
+line2
+line3
+"""
+
+ def setUp(self):
+ self.temp_dir = FilePath(tempfile.mkdtemp())
+ self.existing_file_name = self.temp_dir.child('foo')
+ with self.existing_file_name.open('w') as f:
+ f.write(self.test_data)
+
+ def test_file_not_found(self):
+ self.assertRaises(FileNotFound, FilesystemReader, self.temp_dir.child('bar'))
+
+ def test_read_existing_file(self):
+ r = FilesystemReader(self.temp_dir.child('foo'))
+ data = r.read(3)
+ ostring = data
+ while data:
+ data = r.read(3)
+ ostring += data
+ self.assertEqual(r.read(3), '')
+ self.assertEqual(r.read(5), '')
+ self.assertEqual(r.read(7), '')
+ self.failUnless(r.file_obj.closed,
+ "The file has been exhausted and should be in the closed state")
+ self.assertEqual(ostring, self.test_data)
+
+ def test_size(self):
+ r = FilesystemReader(self.temp_dir.child('foo'))
+ self.assertEqual(len(self.test_data), r.size)
+
+ def test_size_when_reader_finished(self):
+ r = FilesystemReader(self.temp_dir.child('foo'))
+ r.finish()
+ self.assertIsNone(r.size)
+
+ def test_size_when_file_removed(self):
+ # FilesystemReader.size uses fstat() to discover the file's size, so
+ # the absence of the file does not matter.
+ r = FilesystemReader(self.temp_dir.child('foo'))
+ self.existing_file_name.remove()
+ self.assertEqual(len(self.test_data), r.size)
+
+ def test_cancel(self):
+ r = FilesystemReader(self.temp_dir.child('foo'))
+ r.read(3)
+ r.finish()
+ self.failUnless(r.file_obj.closed,
+ "The session has been finished, so the file object should be in the closed state")
+ r.finish()
+
+ def tearDown(self):
+ self.temp_dir.remove()
+
+
+class Writer(unittest.TestCase):
+ test_data = """line1
+line2
+line3
+"""
+
+ def setUp(self):
+ self.temp_dir = FilePath(tempfile.mkdtemp())
+ self.existing_file_name = self.temp_dir.child('foo')
+ with self.existing_file_name.open('w') as f:
+ f.write(self.test_data)
+
+ def test_write_existing_file(self):
+ self.assertRaises(FileExists, FilesystemWriter, self.temp_dir.child('foo'))
+
+ def test_finished_write(self):
+ w = FilesystemWriter(self.temp_dir.child('bar'))
+ w.write(self.test_data)
+ w.finish()
+ with self.temp_dir.child('bar').open() as f:
+ self.assertEqual(f.read(), self.test_data)
+
+ def test_cancelled_write(self):
+ w = FilesystemWriter(self.temp_dir.child('bar'))
+ w.write(self.test_data)
+ w.cancel()
+ self.failIf(self.temp_dir.child('bar').exists(),
+ "If a write is cancelled, the file should not be left behind")
+
+ def tearDown(self):
+ self.temp_dir.remove()
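The tests above pin down the backend contract: FilesystemSynchronousBackend hands out synchronous reader/writer objects, the can_read/can_write flags gate them with Unsupported, and paths that escape the base directory raise AccessViolation. A rough usage sketch under those assumptions (the file name is illustrative):

    import tempfile
    from tftp.backend import FilesystemSynchronousBackend

    base = tempfile.mkdtemp()
    backend = FilesystemSynchronousBackend(base, can_read=True, can_write=True)

    writer = backend.get_writer('hello.txt')   # '../hello.txt' would raise AccessViolation
    writer.write('hello, world\n')
    writer.finish()

    reader = backend.get_reader('hello.txt')
    print reader.read(512)                     # a read shorter than requested means end of file
    reader.finish()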
=== added file 'contrib/python-tx-tftp/tftp/test/test_bootstrap.py'
--- contrib/python-tx-tftp/tftp/test/test_bootstrap.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_bootstrap.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,629 @@
+'''
+@author: shylent
+'''
+from tftp.bootstrap import (LocalOriginWriteSession, LocalOriginReadSession,
+ RemoteOriginReadSession, RemoteOriginWriteSession, TFTPBootstrap)
+from tftp.datagram import (ACKDatagram, TFTPDatagramFactory, split_opcode,
+ ERR_TID_UNKNOWN, DATADatagram, OACKDatagram)
+from tftp.test.test_sessions import DelayedWriter, FakeTransport, DelayedReader
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.task import Clock
+from twisted.python.filepath import FilePath
+from twisted.python.util import OrderedDict
+from twisted.trial import unittest
+import shutil
+import tempfile
+from tftp.session import MAX_BLOCK_SIZE, WriteSession, ReadSession
+
+ReadSession.timeout = (2, 2, 2)
+WriteSession.timeout = (2, 2, 2)
+RemoteOriginReadSession.timeout = (2, 2, 2)
+RemoteOriginWriteSession.timeout = (2, 2, 2)
+
+class MockHandshakeWatchdog(object):
+
+ def __init__(self, when, f, args=None, kwargs=None, _clock=None):
+ self._clock = _clock
+ self.when = when
+ self.f = f
+ self.args = args or []
+ self.kwargs = kwargs or {}
+ if _clock is None:
+ self._clock = reactor
+ else:
+ self._clock = _clock
+
+ def start(self):
+ self.wd = self._clock.callLater(self.when, self.f, *self.args, **self.kwargs)
+
+ def cancel(self):
+ if self.wd.active():
+ self.wd.cancel()
+
+ def active(self):
+ return self.wd.active()
+
+class MockSession(object):
+ block_size = 512
+ timeout = (1, 3, 5)
+ tsize = None
+
+# Testing implementation details here, but otherwise there would be a ton of duplicate code
+class TestOptionProcessing(unittest.TestCase):
+
+ def setUp(self):
+ self.proto = TFTPBootstrap(('127.0.0.1', 1111), None)
+
+ def test_empty_options(self):
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict())
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.block_size, 512)
+ self.assertEqual(self.s.timeout, (1, 3, 5))
+
+ def test_blksize(self):
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'blksize':'8'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.block_size, 8)
+ self.assertEqual(opts, OrderedDict({'blksize':'8'}))
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'blksize':'foo'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.block_size, 512)
+ self.assertEqual(opts, OrderedDict())
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'blksize':'65464'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.block_size, MAX_BLOCK_SIZE)
+ self.assertEqual(opts, OrderedDict({'blksize':str(MAX_BLOCK_SIZE)}))
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'blksize':'65465'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.block_size, 512)
+ self.assertEqual(opts, OrderedDict())
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'blksize':'7'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.block_size, 512)
+ self.assertEqual(opts, OrderedDict())
+
+ def test_timeout(self):
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'timeout':'1'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (1, 1, 1))
+ self.assertEqual(opts, OrderedDict({'timeout':'1'}))
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'timeout':'foo'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (1, 3, 5))
+ self.assertEqual(opts, OrderedDict())
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'timeout':'0'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (1, 3, 5))
+ self.assertEqual(opts, OrderedDict())
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'timeout':'255'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (255, 255, 255))
+ self.assertEqual(opts, OrderedDict({'timeout':'255'}))
+
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'timeout':'256'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (1, 3, 5))
+ self.assertEqual(opts, OrderedDict())
+
+ def test_tsize(self):
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'tsize':'1'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.tsize, 1)
+ self.assertEqual(opts, OrderedDict({'tsize':'1'}))
+
+ def test_tsize_ignored_when_not_a_number(self):
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'tsize':'foo'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertIsNone(self.s.tsize)
+ self.assertEqual(opts, OrderedDict({}))
+
+ def test_tsize_ignored_when_less_than_zero(self):
+ self.s = MockSession()
+ opts = self.proto.processOptions(OrderedDict({'tsize':'-1'}))
+ self.proto.applyOptions(self.s, opts)
+ self.assertIsNone(self.s.tsize)
+ self.assertEqual(opts, OrderedDict({}))
+
+ def test_multiple_options(self):
+ got_options = OrderedDict()
+ got_options['timeout'] = '123'
+ got_options['blksize'] = '1024'
+ self.s = MockSession()
+ opts = self.proto.processOptions(got_options)
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (123, 123, 123))
+ self.assertEqual(self.s.block_size, 1024)
+ self.assertEqual(opts.items(), got_options.items())
+
+ got_options = OrderedDict()
+ got_options['blksize'] = '1024'
+ got_options['timeout'] = '123'
+ self.s = MockSession()
+ opts = self.proto.processOptions(got_options)
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (123, 123, 123))
+ self.assertEqual(self.s.block_size, 1024)
+ self.assertEqual(opts.items(), got_options.items())
+
+ got_options = OrderedDict()
+ got_options['blksize'] = '1024'
+ got_options['foobar'] = 'barbaz'
+ got_options['timeout'] = '123'
+ self.s = MockSession()
+ opts = self.proto.processOptions(got_options)
+ self.proto.applyOptions(self.s, opts)
+ self.assertEqual(self.s.timeout, (123, 123, 123))
+ self.assertEqual(self.s.block_size, 1024)
+ actual_options = OrderedDict()
+ actual_options['blksize'] = '1024'
+ actual_options['timeout'] = '123'
+ self.assertEqual(opts.items(), actual_options.items())
+
+
+class BootstrapLocalOriginWrite(unittest.TestCase):
+
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ self.writer = DelayedWriter(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.ws = LocalOriginWriteSession(('127.0.0.1', 65465), self.writer, _clock=self.clock)
+ self.wd = MockHandshakeWatchdog(4, self.ws.timedOut, _clock=self.clock)
+ self.ws.timeout_watchdog = self.wd
+ self.ws.transport = self.transport
+
+ def test_invalid_tid(self):
+ self.ws.startProtocol()
+ bad_tid_dgram = ACKDatagram(123)
+ self.ws.datagramReceived(bad_tid_dgram.to_wire(), ('127.0.0.1', 1111))
+
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(err_dgram.errorcode, ERR_TID_UNKNOWN)
+ self.addCleanup(self.ws.cancel)
+ #test_invalid_tid.skip = 'Will go to another test case'
+
+ def test_local_origin_write_session_handshake_timeout(self):
+ self.ws.startProtocol()
+ self.clock.advance(5)
+ self.failIf(self.transport.value())
+ self.failUnless(self.transport.disconnecting)
+
+ def test_local_origin_write_session_handshake_success(self):
+ self.ws.session.block_size = 6
+ self.ws.startProtocol()
+ self.clock.advance(1)
+ data_datagram = DATADatagram(1, 'foobar')
+ self.ws.datagramReceived(data_datagram.to_wire(), ('127.0.0.1', 65465))
+ self.clock.pump((1,)*3)
+ self.assertEqual(self.transport.value(), ACKDatagram(1).to_wire())
+ self.failIf(self.transport.disconnecting)
+ self.failIf(self.wd.active())
+ self.addCleanup(self.ws.cancel)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+class LocalOriginWriteOptionNegotiation(unittest.TestCase):
+
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ self.writer = DelayedWriter(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.ws = LocalOriginWriteSession(('127.0.0.1', 65465), self.writer,
+ options={'blksize':'123'}, _clock=self.clock)
+ self.wd = MockHandshakeWatchdog(4, self.ws.timedOut, _clock=self.clock)
+ self.ws.timeout_watchdog = self.wd
+ self.ws.transport = self.transport
+
+
+ def test_option_normal(self):
+ self.ws.startProtocol()
+ self.ws.datagramReceived(OACKDatagram({'blksize':'12'}).to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(0.1)
+ self.assertEqual(self.ws.session.block_size, WriteSession.block_size)
+ self.assertEqual(self.transport.value(), ACKDatagram(0).to_wire())
+
+ self.transport.clear()
+ self.ws.datagramReceived(OACKDatagram({'blksize':'9'}).to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(0.1)
+ self.assertEqual(self.ws.session.block_size, WriteSession.block_size)
+ self.assertEqual(self.transport.value(), ACKDatagram(0).to_wire())
+
+ self.transport.clear()
+ self.ws.datagramReceived(DATADatagram(1, 'foobarbaz').to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(3)
+ self.failUnless(self.ws.session.started)
+ self.clock.advance(0.1)
+ self.assertEqual(self.ws.session.block_size, 9)
+ self.assertEqual(self.transport.value(), ACKDatagram(1).to_wire())
+
+ self.transport.clear()
+ self.ws.datagramReceived(DATADatagram(2, 'asdfghjkl').to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(3)
+ self.assertEqual(self.transport.value(), ACKDatagram(2).to_wire())
+ self.writer.finish()
+ self.assertEqual(self.writer.file_path.open('r').read(), 'foobarbazasdfghjkl')
+
+ self.transport.clear()
+ self.ws.datagramReceived(OACKDatagram({'blksize':'12'}).to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(0.1)
+ self.assertEqual(self.ws.session.block_size, 9)
+ self.assertEqual(self.transport.value(), ACKDatagram(0).to_wire())
+
+ def test_option_timeout(self):
+ self.ws.startProtocol()
+ self.clock.advance(5)
+ self.failUnless(self.transport.disconnecting)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+class BootstrapRemoteOriginWrite(unittest.TestCase):
+
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ self.writer = DelayedWriter(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.ws = RemoteOriginWriteSession(('127.0.0.1', 65465), self.writer, _clock=self.clock)
+ self.ws.transport = self.transport
+ self.ws.startProtocol()
+
+ @inlineCallbacks
+ def test_invalid_tid(self):
+ bad_tid_dgram = ACKDatagram(123)
+ yield self.ws.datagramReceived(bad_tid_dgram.to_wire(), ('127.0.0.1', 1111))
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(err_dgram.errorcode, ERR_TID_UNKNOWN)
+ self.addCleanup(self.ws.cancel)
+
+ def test_remote_origin_write_bootstrap(self):
+ # Initial ACK
+ ack_datagram_0 = ACKDatagram(0)
+ self.clock.advance(0.1)
+ self.assertEqual(self.transport.value(), ack_datagram_0.to_wire())
+ self.failIf(self.transport.disconnecting)
+
+ # Normal exchange
+ self.transport.clear()
+ d = self.ws.datagramReceived(DATADatagram(1, 'foobar').to_wire(), ('127.0.0.1', 65465))
+ def cb(res):
+ self.clock.advance(0.1)
+ ack_datagram_1 = ACKDatagram(1)
+ self.assertEqual(self.transport.value(), ack_datagram_1.to_wire())
+ self.assertEqual(self.target.open('r').read(), 'foobar')
+ self.failIf(self.transport.disconnecting)
+ self.addCleanup(self.ws.cancel)
+ d.addCallback(cb)
+ self.clock.advance(3)
+ return d
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+
+class RemoteOriginWriteOptionNegotiation(unittest.TestCase):
+
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ self.writer = DelayedWriter(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.options = OrderedDict()
+ self.options['blksize'] = '9'
+ self.options['tsize'] = '45'
+ self.ws = RemoteOriginWriteSession(
+ ('127.0.0.1', 65465), self.writer, options=self.options,
+ _clock=self.clock)
+ self.ws.transport = self.transport
+
+ def test_option_normal(self):
+ self.ws.startProtocol()
+ self.clock.advance(0.1)
+ oack_datagram = OACKDatagram(self.options).to_wire()
+ self.assertEqual(self.transport.value(), oack_datagram)
+ self.clock.advance(3)
+ self.assertEqual(self.transport.value(), oack_datagram * 2)
+
+ self.transport.clear()
+ self.ws.datagramReceived(DATADatagram(1, 'foobarbaz').to_wire(), ('127.0.0.1', 65465))
+ self.clock.pump((1,)*3)
+ self.assertEqual(self.transport.value(), ACKDatagram(1).to_wire())
+ self.assertEqual(self.ws.session.block_size, 9)
+
+ self.transport.clear()
+ self.ws.datagramReceived(DATADatagram(2, 'smthng').to_wire(), ('127.0.0.1', 65465))
+ self.clock.pump((1,)*3)
+ self.assertEqual(self.transport.value(), ACKDatagram(2).to_wire())
+ self.clock.pump((1,)*10)
+ self.writer.finish()
+ self.assertEqual(self.writer.file_path.open('r').read(), 'foobarbazsmthng')
+ self.failUnless(self.transport.disconnecting)
+
+ def test_option_timeout(self):
+ self.ws.startProtocol()
+ self.clock.advance(0.1)
+ oack_datagram = OACKDatagram(self.options).to_wire()
+ self.assertEqual(self.transport.value(), oack_datagram)
+ self.failIf(self.transport.disconnecting)
+
+ self.clock.advance(3)
+ self.assertEqual(self.transport.value(), oack_datagram * 2)
+ self.failIf(self.transport.disconnecting)
+
+ self.clock.advance(2)
+ self.assertEqual(self.transport.value(), oack_datagram * 3)
+ self.failIf(self.transport.disconnecting)
+
+ self.clock.advance(2)
+ self.assertEqual(self.transport.value(), oack_datagram * 3)
+ self.failUnless(self.transport.disconnecting)
+
+ def test_option_tsize(self):
+ # A tsize option sent as part of a write session is recorded.
+ self.ws.startProtocol()
+ self.clock.advance(0.1)
+ oack_datagram = OACKDatagram(self.options).to_wire()
+ self.assertEqual(self.transport.value(), oack_datagram)
+ self.failIf(self.transport.disconnecting)
+ self.assertIsInstance(self.ws.session, WriteSession)
+ # Options are not applied to the WriteSession until the first DATA
+ # datagram is received.
+ self.assertIsNone(self.ws.session.tsize)
+ self.ws.datagramReceived(
+ DATADatagram(1, 'foobarbaz').to_wire(), ('127.0.0.1', 65465))
+ # The tsize option has been applied to the WriteSession.
+ self.assertEqual(45, self.ws.session.tsize)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+
+class BootstrapLocalOriginRead(unittest.TestCase):
+ test_data = """line1
+line2
+anotherline"""
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ with self.target.open('wb') as temp_fd:
+ temp_fd.write(self.test_data)
+ self.reader = DelayedReader(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.rs = LocalOriginReadSession(('127.0.0.1', 65465), self.reader, _clock=self.clock)
+ self.wd = MockHandshakeWatchdog(4, self.rs.timedOut, _clock=self.clock)
+ self.rs.timeout_watchdog = self.wd
+ self.rs.transport = self.transport
+ self.rs.startProtocol()
+
+ def test_invalid_tid(self):
+ data_datagram = DATADatagram(1, 'foobar')
+ self.rs.datagramReceived(data_datagram, ('127.0.0.1', 11111))
+ self.clock.advance(0.1)
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(err_dgram.errorcode, ERR_TID_UNKNOWN)
+ self.addCleanup(self.rs.cancel)
+
+ def test_local_origin_read_session_handshake_timeout(self):
+ self.clock.advance(5)
+ self.failIf(self.transport.value())
+ self.failUnless(self.transport.disconnecting)
+
+ def test_local_origin_read_session_handshake_success(self):
+ self.clock.advance(1)
+ ack_datagram = ACKDatagram(0)
+ self.rs.datagramReceived(ack_datagram.to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(2)
+ self.failUnless(self.transport.value())
+ self.failIf(self.transport.disconnecting)
+ self.failIf(self.wd.active())
+ self.addCleanup(self.rs.cancel)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+
+class LocalOriginReadOptionNegotiation(unittest.TestCase):
+ test_data = """line1
+line2
+anotherline"""
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ with self.target.open('wb') as temp_fd:
+ temp_fd.write(self.test_data)
+ self.reader = DelayedReader(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.rs = LocalOriginReadSession(('127.0.0.1', 65465), self.reader, _clock=self.clock)
+ self.wd = MockHandshakeWatchdog(4, self.rs.timedOut, _clock=self.clock)
+ self.rs.timeout_watchdog = self.wd
+ self.rs.transport = self.transport
+
+ def test_option_normal(self):
+ self.rs.startProtocol()
+ self.rs.datagramReceived(OACKDatagram({'blksize':'9'}).to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(0.1)
+ self.assertEqual(self.rs.session.block_size, 9)
+ self.clock.pump((1,)*3)
+ self.assertEqual(self.transport.value(), DATADatagram(1, self.test_data[:9]).to_wire())
+
+ self.rs.datagramReceived(OACKDatagram({'blksize':'12'}).to_wire(), ('127.0.0.1', 65465))
+ self.clock.advance(0.1)
+ self.assertEqual(self.rs.session.block_size, 9)
+
+ self.transport.clear()
+ self.rs.datagramReceived(ACKDatagram(1).to_wire(), ('127.0.0.1', 65465))
+ self.clock.pump((1,)*3)
+ self.assertEqual(self.transport.value(), DATADatagram(2, self.test_data[9:18]).to_wire())
+
+ self.addCleanup(self.rs.cancel)
+
+ def test_local_origin_read_option_timeout(self):
+ self.rs.startProtocol()
+ self.clock.advance(5)
+ self.failUnless(self.transport.disconnecting)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+
+class BootstrapRemoteOriginRead(unittest.TestCase):
+ test_data = """line1
+line2
+anotherline"""
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ with self.target.open('wb') as temp_fd:
+ temp_fd.write(self.test_data)
+ self.reader = DelayedReader(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.rs = RemoteOriginReadSession(('127.0.0.1', 65465), self.reader, _clock=self.clock)
+ self.rs.transport = self.transport
+
+ @inlineCallbacks
+ def test_invalid_tid(self):
+ self.rs.startProtocol()
+ data_datagram = DATADatagram(1, 'foobar')
+ yield self.rs.datagramReceived(data_datagram, ('127.0.0.1', 11111))
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(err_dgram.errorcode, ERR_TID_UNKNOWN)
+ self.addCleanup(self.rs.cancel)
+
+ def test_remote_origin_read_bootstrap(self):
+ # First datagram
+ self.rs.session.block_size = 5
+ self.rs.startProtocol()
+ self.clock.pump((1,)*3)
+
+ data_datagram_1 = DATADatagram(1, self.test_data[:5])
+
+ self.assertEqual(self.transport.value(), data_datagram_1.to_wire())
+ self.failIf(self.transport.disconnecting)
+
+ # Normal exchange continues
+ self.transport.clear()
+ self.rs.datagramReceived(ACKDatagram(1).to_wire(), ('127.0.0.1', 65465))
+ self.clock.pump((1,)*3)
+ data_datagram_2 = DATADatagram(2, self.test_data[5:10])
+ self.assertEqual(self.transport.value(), data_datagram_2.to_wire())
+ self.failIf(self.transport.disconnecting)
+ self.addCleanup(self.rs.cancel)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+
+class RemoteOriginReadOptionNegotiation(unittest.TestCase):
+ test_data = """line1
+line2
+anotherline"""
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ with self.target.open('wb') as temp_fd:
+ temp_fd.write(self.test_data)
+ self.reader = DelayedReader(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.options = OrderedDict()
+ self.options['blksize'] = '9'
+ self.options['tsize'] = '34'
+ self.rs = RemoteOriginReadSession(('127.0.0.1', 65465), self.reader,
+ options=self.options, _clock=self.clock)
+ self.rs.transport = self.transport
+
+ def test_option_normal(self):
+ self.rs.startProtocol()
+ self.clock.advance(0.1)
+ oack_datagram = OACKDatagram(self.options).to_wire()
+ self.assertEqual(self.transport.value(), oack_datagram)
+ self.clock.advance(3)
+ self.assertEqual(self.transport.value(), oack_datagram * 2)
+
+ self.transport.clear()
+ self.rs.datagramReceived(ACKDatagram(0).to_wire(), ('127.0.0.1', 65465))
+ self.clock.pump((1,)*3)
+ self.assertEqual(self.transport.value(), DATADatagram(1, self.test_data[:9]).to_wire())
+
+ self.addCleanup(self.rs.cancel)
+
+ def test_option_timeout(self):
+ self.rs.startProtocol()
+ self.clock.advance(0.1)
+ oack_datagram = OACKDatagram(self.options).to_wire()
+ self.assertEqual(self.transport.value(), oack_datagram)
+ self.failIf(self.transport.disconnecting)
+
+ self.clock.advance(3)
+ self.assertEqual(self.transport.value(), oack_datagram * 2)
+ self.failIf(self.transport.disconnecting)
+
+ self.clock.advance(2)
+ self.assertEqual(self.transport.value(), oack_datagram * 3)
+ self.failIf(self.transport.disconnecting)
+
+ self.clock.advance(2)
+ self.assertEqual(self.transport.value(), oack_datagram * 3)
+ self.failUnless(self.transport.disconnecting)
+
+ def test_option_tsize(self):
+ # A tsize option of 0 sent as part of a read session prompts a tsize
+ # response with the actual size of the file.
+ self.options['tsize'] = '0'
+ self.rs.startProtocol()
+ self.clock.advance(0.1)
+ self.transport.clear()
+ self.clock.advance(3)
+ # The response contains the size of the test data.
+ self.options['tsize'] = str(len(self.test_data))
+ oack_datagram = OACKDatagram(self.options).to_wire()
+ self.assertEqual(self.transport.value(), oack_datagram)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
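TestOptionProcessing above is effectively the specification of option negotiation: blksize must fall in 8..MAX_BLOCK_SIZE (larger values are clamped), timeout must be in 1..255 and is applied as a flat (t, t, t) tuple, tsize must be a non-negative integer, and invalid or unrecognised options are dropped from the reply. A small sketch in the same style as the tests, using a dummy session object:

    from twisted.python.util import OrderedDict
    from tftp.bootstrap import TFTPBootstrap

    class DummySession(object):
        block_size = 512
        timeout = (1, 3, 5)
        tsize = None

    proto = TFTPBootstrap(('127.0.0.1', 1069), None)
    session = DummySession()

    requested = OrderedDict()
    requested['blksize'] = '1400'
    requested['timeout'] = '300'   # out of range, will be dropped
    requested['foobar'] = 'x'      # unknown, will be dropped

    accepted = proto.processOptions(requested)
    proto.applyOptions(session, accepted)
    print accepted.items()                     # [('blksize', '1400')]
    print session.block_size, session.timeout  # 1400 (1, 3, 5)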
=== added file 'contrib/python-tx-tftp/tftp/test/test_netascii.py'
--- contrib/python-tx-tftp/tftp/test/test_netascii.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_netascii.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,195 @@
+'''
+@author: shylent
+'''
+from cStringIO import StringIO
+from tftp.netascii import (from_netascii, to_netascii, NetasciiReceiverProxy,
+ NetasciiSenderProxy)
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial import unittest
+import re
+import tftp
+
+
+class FromNetascii(unittest.TestCase):
+
+ def setUp(self):
+ self._orig_nl = tftp.netascii.NL
+
+ def test_lf_newline(self):
+ tftp.netascii.NL = '\x0a'
+ self.assertEqual(from_netascii('\x0d\x00'), '\x0d')
+ self.assertEqual(from_netascii('\x0d\x0a'), '\x0a')
+ self.assertEqual(from_netascii('foo\x0d\x0a\x0abar'), 'foo\x0a\x0abar')
+ self.assertEqual(from_netascii('foo\x0d\x0a\x0abar'), 'foo\x0a\x0abar')
+ # freestanding CR should not occur, but handle it anyway
+ self.assertEqual(from_netascii('foo\x0d\x0a\x0dbar'), 'foo\x0a\x0dbar')
+
+ def test_cr_newline(self):
+ tftp.netascii.NL = '\x0d'
+ self.assertEqual(from_netascii('\x0d\x00'), '\x0d')
+ self.assertEqual(from_netascii('\x0d\x0a'), '\x0d')
+ self.assertEqual(from_netascii('foo\x0d\x0a\x0abar'), 'foo\x0d\x0abar')
+ self.assertEqual(from_netascii('foo\x0d\x0a\x00bar'), 'foo\x0d\x00bar')
+ self.assertEqual(from_netascii('foo\x0d\x00\x0abar'), 'foo\x0d\x0abar')
+
+ def test_crlf_newline(self):
+ tftp.netascii.NL = '\x0d\x0a'
+ self.assertEqual(from_netascii('\x0d\x00'), '\x0d')
+ self.assertEqual(from_netascii('\x0d\x0a'), '\x0d\x0a')
+ self.assertEqual(from_netascii('foo\x0d\x00\x0abar'), 'foo\x0d\x0abar')
+
+ def tearDown(self):
+ tftp.netascii.NL = self._orig_nl
+
+
+class ToNetascii(unittest.TestCase):
+
+ def setUp(self):
+ self._orig_nl = tftp.netascii.NL
+ self._orig_nl_regex = tftp.netascii.re_to_netascii
+
+ def test_lf_newline(self):
+ tftp.netascii.NL = '\x0a'
+ tftp.netascii.re_to_netascii = re.compile(tftp.netascii._re_to_netascii %
+ tftp.netascii.NL)
+ self.assertEqual(to_netascii('\x0d'), '\x0d\x00')
+ self.assertEqual(to_netascii('\x0a'), '\x0d\x0a')
+ self.assertEqual(to_netascii('\x0a\x0d'), '\x0d\x0a\x0d\x00')
+ self.assertEqual(to_netascii('\x0d\x0a'), '\x0d\x00\x0d\x0a')
+
+ def test_cr_newline(self):
+ tftp.netascii.NL = '\x0d'
+ tftp.netascii.re_to_netascii = re.compile(tftp.netascii._re_to_netascii %
+ tftp.netascii.NL)
+ self.assertEqual(to_netascii('\x0d'), '\x0d\x0a')
+ self.assertEqual(to_netascii('\x0a'), '\x0a')
+ self.assertEqual(to_netascii('\x0d\x0a'), '\x0d\x0a\x0a')
+ self.assertEqual(to_netascii('\x0a\x0d'), '\x0a\x0d\x0a')
+
+ def test_crlf_newline(self):
+ tftp.netascii.NL = '\x0d\x0a'
+ tftp.netascii.re_to_netascii = re.compile(tftp.netascii._re_to_netascii %
+ tftp.netascii.NL)
+ self.assertEqual(to_netascii('\x0d\x0a'), '\x0d\x0a')
+ self.assertEqual(to_netascii('\x0d'), '\x0d\x00')
+ self.assertEqual(to_netascii('\x0d\x0a\x0d'), '\x0d\x0a\x0d\x00')
+ self.assertEqual(to_netascii('\x0d\x0d\x0a'), '\x0d\x00\x0d\x0a')
+
+ def tearDown(self):
+ tftp.netascii.NL = self._orig_nl
+ tftp.netascii.re_to_netascii = self._orig_nl_regex
+
+
+class ReceiverProxy(unittest.TestCase):
+
+ test_data = """line1
+line2
+line3
+"""
+ def setUp(self):
+ self.source = StringIO(to_netascii(self.test_data))
+ self.sink = StringIO()
+
+ @inlineCallbacks
+ def test_conversion(self):
+ p = NetasciiReceiverProxy(self.sink)
+ chunk = self.source.read(2)
+ while chunk:
+ yield p.write(chunk)
+ chunk = self.source.read(2)
+ self.sink.seek(0) # rewind before reading back
+ self.assertEqual(self.sink.read(), self.test_data)
+
+ @inlineCallbacks
+ def test_conversion_byte_by_byte(self):
+ p = NetasciiReceiverProxy(self.sink)
+ chunk = self.source.read(1)
+ while chunk:
+ yield p.write(chunk)
+ chunk = self.source.read(1)
+ self.sink.seek(0) # rewind before reading back
+ self.assertEqual(self.sink.read(), self.test_data)
+
+ @inlineCallbacks
+ def test_conversion_normal(self):
+ p = NetasciiReceiverProxy(self.sink)
+ chunk = self.source.read(1)
+ while chunk:
+ yield p.write(chunk)
+ chunk = self.source.read(5)
+ self.sink.seek(0) # rewind before reading back
+ self.assertEqual(self.sink.read(), self.test_data)
+
+
+class SenderProxy(unittest.TestCase):
+
+ test_data = """line1
+line2
+line3
+"""
+ def setUp(self):
+ self.source = StringIO(self.test_data)
+ self.sink = StringIO()
+
+ @inlineCallbacks
+ def test_conversion_normal(self):
+ p = NetasciiSenderProxy(self.source)
+ chunk = yield p.read(5)
+ self.assertEqual(len(chunk), 5)
+ self.sink.write(chunk)
+ last_chunk = False
+ while chunk:
+ chunk = yield p.read(5)
+ # If a terminating chunk (len < blocksize) was already sent, there should
+ # be no more data (i.e. only empty chunks can be yielded from now on)
+ if last_chunk and chunk:
+ print "LEN: %s" % len(chunk)
+ self.fail("Last chunk (with len < blocksize) was already yielded, "
+ "but there is more data.")
+ if len(chunk) < 5:
+ last_chunk = True
+ self.sink.write(chunk)
+ self.sink.seek(0)
+ self.assertEqual(self.sink.read(), to_netascii(self.test_data))
+
+ @inlineCallbacks
+ def test_conversion_byte_by_byte(self):
+ p = NetasciiSenderProxy(self.source)
+ chunk = yield p.read(1)
+ self.assertEqual(len(chunk), 1)
+ self.sink.write(chunk)
+ last_chunk = False
+ while chunk:
+ chunk = yield p.read(1)
+ # If a terminating chunk (len < blocksize) was already sent, there should
+ # be no more data (i.e. only empty chunks can be yielded from now on)
+ if last_chunk and chunk:
+ print "LEN: %s" % len(chunk)
+ self.fail("Last chunk (with len < blocksize) was already yielded, "
+ "but there is more data.")
+ if len(chunk) < 1:
+ last_chunk = True
+ self.sink.write(chunk)
+ self.sink.seek(0)
+ self.assertEqual(self.sink.read(), to_netascii(self.test_data))
+
+ @inlineCallbacks
+ def test_conversion(self):
+ p = NetasciiSenderProxy(self.source)
+ chunk = yield p.read(2)
+ self.assertEqual(len(chunk), 2)
+ self.sink.write(chunk)
+ last_chunk = False
+ while chunk:
+ chunk = yield p.read(2)
+ # If a terminating chunk (len < blocksize) was already sent, there should
+ # be no more data (i.e. only empty chunks can be yielded from now on)
+ if last_chunk and chunk:
+ print "LEN: %s" % len(chunk)
+ self.fail("Last chunk (with len < blocksize) was already yielded, "
+ "but there is more data.")
+ if len(chunk) < 2:
+ last_chunk = True
+ self.sink.write(chunk)
+ self.sink.seek(0)
+ self.assertEqual(self.sink.read(), to_netascii(self.test_data))
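For reference, the translation exercised above is: outgoing, the platform newline becomes CR LF and a bare CR becomes CR NUL; incoming, the reverse. A tiny round trip, assuming a platform where tftp.netascii.NL is LF ('\x0a'):

    from tftp.netascii import to_netascii, from_netascii

    wire = to_netascii('line1\nline2\r')   # '\n' -> '\r\n', bare '\r' -> '\r\x00'
    print repr(wire)                        # 'line1\r\nline2\r\x00'
    print repr(from_netascii(wire))         # back to 'line1\nline2\r'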
=== added file 'contrib/python-tx-tftp/tftp/test/test_protocol.py'
--- contrib/python-tx-tftp/tftp/test/test_protocol.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_protocol.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,245 @@
+'''
+@author: shylent
+'''
+from tftp.backend import FilesystemSynchronousBackend, IReader, IWriter
+from tftp.bootstrap import RemoteOriginWriteSession, RemoteOriginReadSession
+from tftp.datagram import (WRQDatagram, TFTPDatagramFactory, split_opcode,
+ ERR_ILLEGAL_OP, RRQDatagram, ERR_ACCESS_VIOLATION, ERR_FILE_EXISTS,
+ ERR_FILE_NOT_FOUND, ERR_NOT_DEFINED)
+from tftp.errors import (Unsupported, AccessViolation, FileExists, FileNotFound,
+ BackendError)
+from tftp.netascii import NetasciiReceiverProxy, NetasciiSenderProxy
+from tftp.protocol import TFTP
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import DatagramProtocol
+from twisted.internet.task import Clock
+from twisted.python.filepath import FilePath
+from twisted.test.proto_helpers import StringTransport
+from twisted.trial import unittest
+import tempfile
+
+
+class DummyBackend(object):
+ pass
+
+def BackendFactory(exc_val=None):
+ if exc_val is not None:
+ class FailingBackend(object):
+ def get_reader(self, filename):
+ raise exc_val
+ def get_writer(self, filename):
+ raise exc_val
+ return FailingBackend()
+ else:
+ return DummyBackend()
+
+
+class FakeTransport(StringTransport):
+ stopListening = StringTransport.loseConnection
+
+ def write(self, bytes, addr=None):
+ StringTransport.write(self, bytes)
+
+ def connect(self, host, port):
+ self._connectedAddr = (host, port)
+
+
+class DispatchErrors(unittest.TestCase):
+ port = 11111
+
+ def setUp(self):
+ self.clock = Clock()
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+
+ def test_malformed_datagram(self):
+ tftp = TFTP(BackendFactory(), _clock=self.clock)
+ tftp.datagramReceived('foobar', ('127.0.0.1', 1111))
+ self.failIf(self.transport.disconnecting)
+ self.failIf(self.transport.value())
+ test_malformed_datagram.skip = 'Not done yet'
+
+ def test_bad_mode(self):
+ tftp = TFTP(DummyBackend(), _clock=self.clock)
+ tftp.transport = self.transport
+ wrq_datagram = WRQDatagram('foobar', 'badmode', {})
+ tftp.datagramReceived(wrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_ILLEGAL_OP)
+
+ def test_unsupported(self):
+ tftp = TFTP(BackendFactory(Unsupported("I don't support you")), _clock=self.clock)
+ tftp.transport = self.transport
+ wrq_datagram = WRQDatagram('foobar', 'netascii', {})
+ tftp.datagramReceived(wrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_ILLEGAL_OP)
+
+ self.transport.clear()
+ rrq_datagram = RRQDatagram('foobar', 'octet', {})
+ tftp.datagramReceived(rrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_ILLEGAL_OP)
+
+ def test_access_violation(self):
+ tftp = TFTP(BackendFactory(AccessViolation("No!")), _clock=self.clock)
+ tftp.transport = self.transport
+ wrq_datagram = WRQDatagram('foobar', 'netascii', {})
+ tftp.datagramReceived(wrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_ACCESS_VIOLATION)
+
+ self.transport.clear()
+ rrq_datagram = RRQDatagram('foobar', 'octet', {})
+ tftp.datagramReceived(rrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_ACCESS_VIOLATION)
+
+ def test_file_exists(self):
+ tftp = TFTP(BackendFactory(FileExists("Already have one")), _clock=self.clock)
+ tftp.transport = self.transport
+ wrq_datagram = WRQDatagram('foobar', 'netascii', {})
+ tftp.datagramReceived(wrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_FILE_EXISTS)
+
+ def test_file_not_found(self):
+ tftp = TFTP(BackendFactory(FileNotFound("Not found")), _clock=self.clock)
+ tftp.transport = self.transport
+ rrq_datagram = RRQDatagram('foobar', 'netascii', {})
+ tftp.datagramReceived(rrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_FILE_NOT_FOUND)
+
+ def test_generic_backend_error(self):
+ tftp = TFTP(BackendFactory(BackendError("A backend that couldn't")), _clock=self.clock)
+ tftp.transport = self.transport
+ rrq_datagram = RRQDatagram('foobar', 'netascii', {})
+ tftp.datagramReceived(rrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_NOT_DEFINED)
+
+ self.transport.clear()
+ rrq_datagram = RRQDatagram('foobar', 'octet', {})
+ tftp.datagramReceived(rrq_datagram.to_wire(), ('127.0.0.1', 1111))
+ self.clock.advance(1)
+ error_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(error_datagram.errorcode, ERR_NOT_DEFINED)
+
+class DummyClient(DatagramProtocol):
+
+ def __init__(self, *args, **kwargs):
+ self.ready = Deferred()
+
+ def startProtocol(self):
+ self.ready.callback(None)
+
+class TFTPWrapper(TFTP):
+
+ def _startSession(self, *args, **kwargs):
+ d = TFTP._startSession(self, *args, **kwargs)
+
+ def save_session(session):
+ self.session = session
+ return session
+
+ d.addCallback(save_session)
+ return d
+
+
+class SuccessfulDispatch(unittest.TestCase):
+
+ def setUp(self):
+ self.tmp_dir_path = tempfile.mkdtemp()
+ with FilePath(self.tmp_dir_path).child('nonempty').open('w') as fd:
+ fd.write('Something uninteresting')
+ self.backend = FilesystemSynchronousBackend(self.tmp_dir_path)
+ self.tftp = TFTPWrapper(self.backend)
+ self.client = DummyClient()
+ reactor.listenUDP(0, self.client)
+ self.server_port = reactor.listenUDP(1069, self.tftp)
+
+ # Ok. I am going to hell for these two tests
+ def test_WRQ(self):
+ self.client.transport.write(WRQDatagram('foobar', 'NetASCiI', {}).to_wire(), ('127.0.0.1', 1069))
+ d = Deferred()
+ def cb(ign):
+ self.assertIsInstance(self.tftp.session, RemoteOriginWriteSession)
+ self.assertIsInstance(self.tftp.session.backend, NetasciiReceiverProxy)
+ self.tftp.session.cancel()
+ d.addCallback(cb)
+ reactor.callLater(0.5, d.callback, None)
+ return d
+
+ def test_RRQ(self):
+ self.client.transport.write(RRQDatagram('nonempty', 'NetASCiI', {}).to_wire(), ('127.0.0.1', 1069))
+ d = Deferred()
+ def cb(ign):
+ self.assertIsInstance(self.tftp.session, RemoteOriginReadSession)
+ self.assertIsInstance(self.tftp.session.backend, NetasciiSenderProxy)
+ self.tftp.session.cancel()
+ d.addCallback(cb)
+ reactor.callLater(0.5, d.callback, None)
+ return d
+
+ def tearDown(self):
+ self.tftp.transport.stopListening()
+ self.client.transport.stopListening()
+
+
+class FilesystemAsyncBackend(FilesystemSynchronousBackend):
+
+ def __init__(self, base_path, clock):
+ super(FilesystemAsyncBackend, self).__init__(
+ base_path, can_read=True, can_write=True)
+ self.clock = clock
+
+ def get_reader(self, file_name):
+ reader = super(FilesystemAsyncBackend, self).get_reader(file_name)
+ d = Deferred()
+ self.clock.callLater(0, d.callback, reader)
+ return d
+
+ def get_writer(self, file_name):
+ writer = super(FilesystemAsyncBackend, self).get_writer(file_name)
+ d = Deferred()
+ self.clock.callLater(0, d.callback, writer)
+ return d
+
+
+class SuccessfulAsyncDispatch(unittest.TestCase):
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ with FilePath(self.tmp_dir_path).child('nonempty').open('w') as fd:
+ fd.write('Something uninteresting')
+ self.backend = FilesystemAsyncBackend(self.tmp_dir_path, self.clock)
+ self.tftp = TFTP(self.backend, self.clock)
+
+ def test_get_reader_can_defer(self):
+ rrq_datagram = RRQDatagram('nonempty', 'NetASCiI', {})
+ rrq_addr = ('127.0.0.1', 1069)
+ rrq_mode = "octet"
+ d = self.tftp._startSession(rrq_datagram, rrq_addr, rrq_mode)
+ self.assertFalse(d.called)
+ self.clock.advance(1)
+ self.assertTrue(d.called)
+ self.assertTrue(IReader.providedBy(d.result.backend))
+
+ def test_get_writer_can_defer(self):
+ wrq_datagram = WRQDatagram('foobar', 'NetASCiI', {})
+ wrq_addr = ('127.0.0.1', 1069)
+ wrq_mode = "octet"
+ d = self.tftp._startSession(wrq_datagram, wrq_addr, wrq_mode)
+ self.assertFalse(d.called)
+ self.clock.advance(1)
+ self.assertTrue(d.called)
+ self.assertTrue(IWriter.providedBy(d.result.backend))
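SuccessfulDispatch above already contains the whole server wiring: a filesystem backend plus a TFTP protocol instance listening on a UDP port. The same setup as a standalone sketch (the served directory is illustrative; port 1069 is the unprivileged port the tests use instead of the standard 69):

    from twisted.internet import reactor
    from tftp.backend import FilesystemSynchronousBackend
    from tftp.protocol import TFTP

    backend = FilesystemSynchronousBackend('/var/lib/tftpboot')  # illustrative path
    reactor.listenUDP(1069, TFTP(backend))
    reactor.run()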
=== added file 'contrib/python-tx-tftp/tftp/test/test_sessions.py'
--- contrib/python-tx-tftp/tftp/test/test_sessions.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_sessions.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,374 @@
+'''
+@author: shylent
+'''
+from tftp.backend import FilesystemWriter, FilesystemReader, IReader, IWriter
+from tftp.datagram import (ACKDatagram, ERRORDatagram,
+ ERR_NOT_DEFINED, DATADatagram, TFTPDatagramFactory, split_opcode)
+from tftp.session import WriteSession, ReadSession
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.task import Clock
+from twisted.python.filepath import FilePath
+from twisted.test.proto_helpers import StringTransport
+from twisted.trial import unittest
+from zope import interface
+import shutil
+import tempfile
+
+ReadSession.timeout = (2, 2, 2)
+WriteSession.timeout = (2, 2, 2)
+
+class DelayedReader(FilesystemReader):
+
+ def __init__(self, *args, **kwargs):
+ self.delay = kwargs.pop('delay')
+ self._clock = kwargs.pop('_clock', reactor)
+ FilesystemReader.__init__(self, *args, **kwargs)
+
+ def read(self, size):
+ data = FilesystemReader.read(self, size)
+ d = Deferred()
+ def c(ign):
+ return data
+ d.addCallback(c)
+ self._clock.callLater(self.delay, d.callback, None)
+ return d
+
+
+class DelayedWriter(FilesystemWriter):
+
+ def __init__(self, *args, **kwargs):
+ self.delay = kwargs.pop('delay')
+ self._clock = kwargs.pop('_clock', reactor)
+ FilesystemWriter.__init__(self, *args, **kwargs)
+
+ def write(self, data):
+ d = Deferred()
+ def c(ign):
+ return FilesystemWriter.write(self, data)
+ d.addCallback(c)
+ self._clock.callLater(self.delay, d.callback, None)
+ return d
+
+
+class FailingReader(object):
+ interface.implements(IReader)
+
+ size = None
+
+ def read(self, size):
+ raise IOError('A failure')
+
+ def finish(self):
+ pass
+
+
+class FailingWriter(object):
+ interface.implements(IWriter)
+
+ def write(self, data):
+ raise IOError("I fail")
+
+ def cancel(self):
+ pass
+
+ def finish(self):
+ pass
+
+
+class FakeTransport(StringTransport):
+ stopListening = StringTransport.loseConnection
+
+ def connect(self, host, port):
+ self._connectedAddr = (host, port)
+
+
+class WriteSessions(unittest.TestCase):
+
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ self.writer = DelayedWriter(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.ws = WriteSession(self.writer, _clock=self.clock)
+ self.ws.timeout = (4, 4, 4)
+ self.ws.transport = self.transport
+ self.ws.startProtocol()
+
+ def test_ERROR(self):
+ err_dgram = ERRORDatagram.from_code(ERR_NOT_DEFINED, 'no reason')
+ self.ws.datagramReceived(err_dgram)
+ self.clock.advance(0.1)
+ self.failIf(self.transport.value())
+ self.failUnless(self.transport.disconnecting)
+
+ @inlineCallbacks
+ def test_DATA_stale_blocknum(self):
+ self.ws.block_size = 6
+ self.ws.blocknum = 2
+ data_datagram = DATADatagram(1, 'foobar')
+ yield self.ws.datagramReceived(data_datagram)
+ self.writer.finish()
+ self.failIf(self.target.open('r').read())
+ self.failIf(self.transport.disconnecting)
+ ack_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(ack_dgram.blocknum, 1)
+ self.addCleanup(self.ws.cancel)
+
+ @inlineCallbacks
+ def test_DATA_invalid_blocknum(self):
+ self.ws.block_size = 6
+ data_datagram = DATADatagram(3, 'foobar')
+ yield self.ws.datagramReceived(data_datagram)
+ self.writer.finish()
+ self.failIf(self.target.open('r').read())
+ self.failIf(self.transport.disconnecting)
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assert_(isinstance(err_dgram, ERRORDatagram))
+ self.addCleanup(self.ws.cancel)
+
+ def test_DATA(self):
+ self.ws.block_size = 6
+ data_datagram = DATADatagram(1, 'foobar')
+ d = self.ws.datagramReceived(data_datagram)
+ def cb(ign):
+ self.clock.advance(0.1)
+ #self.writer.finish()
+ #self.assertEqual(self.target.open('r').read(), 'foobar')
+ self.failIf(self.transport.disconnecting)
+ ack_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(ack_dgram.blocknum, 1)
+ self.failIf(self.ws.completed,
+ "Data length is equal to blocksize, no reason to stop")
+ data_datagram = DATADatagram(2, 'barbaz')
+
+ self.transport.clear()
+ d = self.ws.datagramReceived(data_datagram)
+ d.addCallback(cb_)
+ self.clock.advance(3)
+ return d
+ def cb_(ign):
+ self.clock.advance(0.1)
+ self.failIf(self.transport.disconnecting)
+ ack_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(ack_dgram.blocknum, 2)
+ self.failIf(self.ws.completed,
+ "Data length is equal to blocksize, no reason to stop")
+ d.addCallback(cb)
+ self.addCleanup(self.ws.cancel)
+ self.clock.advance(3)
+ return d
+
+ def test_DATA_finished(self):
+ self.ws.block_size = 6
+
+ # Send a terminating datagram
+ data_datagram = DATADatagram(1, 'foo')
+ d = self.ws.datagramReceived(data_datagram)
+ def cb(res):
+ self.clock.advance(0.1)
+ self.assertEqual(self.target.open('r').read(), 'foo')
+ ack_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.failUnless(isinstance(ack_dgram, ACKDatagram))
+ self.failUnless(self.ws.completed,
+ "Data length is less, than blocksize, time to stop")
+ self.transport.clear()
+
+ # Send another datagram after the transfer is considered complete
+ data_datagram = DATADatagram(2, 'foobar')
+ self.ws.datagramReceived(data_datagram)
+ self.assertEqual(self.target.open('r').read(), 'foo')
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.failUnless(isinstance(err_dgram, ERRORDatagram))
+
+ # Check for proper disconnection after grace timeout expires
+ self.clock.pump((4,)*4)
+ self.failUnless(self.transport.disconnecting,
+ "We are done and the grace timeout is over, should disconnect")
+ d.addCallback(cb)
+ self.clock.advance(2)
+ return d
+
+ def test_DATA_backoff(self):
+ self.ws.block_size = 5
+
+ data_datagram = DATADatagram(1, 'foobar')
+ d = self.ws.datagramReceived(data_datagram)
+ def cb(ign):
+ self.clock.advance(0.1)
+ ack_datagram = ACKDatagram(1)
+
+ self.clock.pump((1,)*5)
+ # Sent two times - initial send and a retransmit after first timeout
+ self.assertEqual(self.transport.value(),
+ ack_datagram.to_wire()*2)
+
+ # Sent three times - initial send and two retransmits
+ self.clock.pump((1,)*4)
+ self.assertEqual(self.transport.value(),
+ ack_datagram.to_wire()*3)
+
+ # Sent still three times - initial send, two retransmits and the last wait
+ self.clock.pump((1,)*4)
+ self.assertEqual(self.transport.value(),
+ ack_datagram.to_wire()*3)
+
+ self.failUnless(self.transport.disconnecting)
+ d.addCallback(cb)
+ self.clock.advance(2.1)
+ return d
+
+ @inlineCallbacks
+ def test_failed_write(self):
+ self.writer.cancel()
+ self.ws.writer = FailingWriter()
+ data_datagram = DATADatagram(1, 'foobar')
+ yield self.ws.datagramReceived(data_datagram)
+ self.flushLoggedErrors()
+ self.clock.advance(0.1)
+ err_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.failUnless(isinstance(err_datagram, ERRORDatagram))
+ self.failUnless(self.transport.disconnecting)
+
+ def test_time_out(self):
+ data_datagram = DATADatagram(1, 'foobar')
+ d = self.ws.datagramReceived(data_datagram)
+ def cb(ign):
+ self.clock.pump((1,)*13)
+ self.failUnless(self.transport.disconnecting)
+ d.addCallback(cb)
+ self.clock.advance(4)
+ return d
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
+
+
+class ReadSessions(unittest.TestCase):
+ test_data = """line1
+line2
+anotherline"""
+ port = 65466
+
+ def setUp(self):
+ self.clock = Clock()
+ self.tmp_dir_path = tempfile.mkdtemp()
+ self.target = FilePath(self.tmp_dir_path).child('foo')
+ with self.target.open('wb') as temp_fd:
+ temp_fd.write(self.test_data)
+ self.reader = DelayedReader(self.target, _clock=self.clock, delay=2)
+ self.transport = FakeTransport(hostAddress=('127.0.0.1', self.port))
+ self.rs = ReadSession(self.reader, _clock=self.clock)
+ self.rs.transport = self.transport
+ self.rs.startProtocol()
+
+ @inlineCallbacks
+ def test_ERROR(self):
+ err_dgram = ERRORDatagram.from_code(ERR_NOT_DEFINED, 'no reason')
+ yield self.rs.datagramReceived(err_dgram)
+ self.failIf(self.transport.value())
+ self.failUnless(self.transport.disconnecting)
+
+ @inlineCallbacks
+ def test_ACK_invalid_blocknum(self):
+ ack_datagram = ACKDatagram(3)
+ yield self.rs.datagramReceived(ack_datagram)
+ self.failIf(self.transport.disconnecting)
+ err_dgram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assert_(isinstance(err_dgram, ERRORDatagram))
+ self.addCleanup(self.rs.cancel)
+
+ @inlineCallbacks
+ def test_ACK_stale_blocknum(self):
+ self.rs.blocknum = 2
+ ack_datagram = ACKDatagram(1)
+ yield self.rs.datagramReceived(ack_datagram)
+ self.failIf(self.transport.disconnecting)
+ self.failIf(self.transport.value(),
+ "Stale ACK datagram, we should not write anything back")
+ self.addCleanup(self.rs.cancel)
+
+ def test_ACK(self):
+ self.rs.block_size = 5
+ self.rs.blocknum = 1
+ ack_datagram = ACKDatagram(1)
+ d = self.rs.datagramReceived(ack_datagram)
+ def cb(ign):
+ self.clock.advance(0.1)
+ self.failIf(self.transport.disconnecting)
+ data_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.assertEqual(data_datagram.data, 'line1')
+ self.failIf(self.rs.completed,
+ "Got enough bytes from the reader, there is no reason to stop")
+ d.addCallback(cb)
+ self.clock.advance(2.5)
+ self.addCleanup(self.rs.cancel)
+ return d
+
+ def test_ACK_finished(self):
+ self.rs.block_size = 512
+ self.rs.blocknum = 1
+
+ # Send a terminating datagram
+ ack_datagram = ACKDatagram(1)
+ d = self.rs.datagramReceived(ack_datagram)
+ def cb(ign):
+ self.clock.advance(0.1)
+ ack_datagram = ACKDatagram(2)
+ # This datagram doesn't trigger any sends
+ self.rs.datagramReceived(ack_datagram)
+
+ self.assertEqual(self.transport.value(), DATADatagram(2, self.test_data).to_wire())
+ self.failUnless(self.rs.completed,
+ "Data length is less, than blocksize, time to stop")
+ self.addCleanup(self.rs.cancel)
+ d.addCallback(cb)
+ self.clock.advance(3)
+ return d
+
+ def test_ACK_backoff(self):
+ self.rs.block_size = 5
+ self.rs.blocknum = 1
+
+ ack_datagram = ACKDatagram(1)
+ d = self.rs.datagramReceived(ack_datagram)
+ def cb(ign):
+
+ self.clock.pump((1,)*4)
+ # Sent two times - initial send and a retransmit after first timeout
+ self.assertEqual(self.transport.value(),
+ DATADatagram(2, self.test_data[:5]).to_wire()*2)
+
+ # Sent three times - initial send and two retransmits
+ self.clock.pump((1,)*5)
+ self.assertEqual(self.transport.value(),
+ DATADatagram(2, self.test_data[:5]).to_wire()*3)
+
+ # Sent still three times - initial send, two retransmits and the last wait
+ self.clock.pump((1,)*10)
+ self.assertEqual(self.transport.value(),
+ DATADatagram(2, self.test_data[:5]).to_wire()*3)
+
+ self.failUnless(self.transport.disconnecting)
+ d.addCallback(cb)
+ self.clock.advance(2.5)
+ return d
+
+ @inlineCallbacks
+ def test_failed_read(self):
+ self.reader.finish()
+ self.rs.reader = FailingReader()
+ self.rs.blocknum = 1
+ ack_datagram = ACKDatagram(1)
+ yield self.rs.datagramReceived(ack_datagram)
+ self.flushLoggedErrors()
+ self.clock.advance(0.1)
+ err_datagram = TFTPDatagramFactory(*split_opcode(self.transport.value()))
+ self.failUnless(isinstance(err_datagram, ERRORDatagram))
+ self.failUnless(self.transport.disconnecting)
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir_path)
=== added file 'contrib/python-tx-tftp/tftp/test/test_util.py'
--- contrib/python-tx-tftp/tftp/test/test_util.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_util.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,68 @@
+'''
+@author: shylent
+'''
+from tftp.util import SequentialCall, Spent, Cancelled
+from twisted.internet.task import Clock
+from twisted.trial import unittest
+
+
+class CallCounter(object):
+ call_num = 0
+
+ def __call__(self):
+ self.call_num += 1
+
+
+class SequentialCalling(unittest.TestCase):
+
+ def setUp(self):
+ self.f = CallCounter()
+ self.t = CallCounter()
+ self.clock = Clock()
+
+ def test_empty(self):
+ SequentialCall.run((), self.f, on_timeout=self.t, _clock=self.clock)
+ self.clock.pump((1,))
+ self.assertEqual(self.f.call_num, 0)
+ self.assertEqual(self.t.call_num, 1)
+
+ def test_empty_now(self):
+ SequentialCall.run((), self.f, on_timeout=self.t, run_now=True, _clock=self.clock)
+ self.clock.pump((1,))
+ self.assertEqual(self.f.call_num, 1)
+ self.assertEqual(self.t.call_num, 1)
+
+ def test_non_empty(self):
+ c = SequentialCall.run((1, 3, 5), self.f, run_now=True, on_timeout=self.t, _clock=self.clock)
+ self.clock.advance(0.1)
+ self.failUnless(c.active())
+ self.assertEqual(self.f.call_num, 1)
+ self.clock.pump((1,)*2)
+ self.failUnless(c.active())
+ self.assertEqual(self.f.call_num, 2)
+ self.clock.pump((1,)*3)
+ self.failUnless(c.active())
+ self.assertEqual(self.f.call_num, 3)
+ self.clock.pump((1,)*5)
+ self.failIf(c.active())
+ self.assertEqual(self.f.call_num, 4)
+ self.assertEqual(self.t.call_num, 1)
+ self.assertRaises(Spent, c.reschedule)
+ self.assertRaises(Spent, c.cancel)
+
+ def test_cancel(self):
+ c = SequentialCall.run((1, 3, 5), self.f, on_timeout=self.t, _clock=self.clock)
+ self.clock.pump((1,)*2)
+ self.assertEqual(self.f.call_num, 1)
+ c.cancel()
+ self.assertRaises(Cancelled, c.cancel)
+ self.assertEqual(self.t.call_num, 0)
+ self.assertRaises(Cancelled, c.reschedule)
+
+ def test_cancel_immediately(self):
+ c = SequentialCall.run((1, 3, 5), lambda: c.cancel(), run_now=True,
+ on_timeout=self.t, _clock=self.clock)
+ self.clock.pump((1,)*2)
+ self.assertRaises(Cancelled, c.cancel)
+ self.assertEqual(self.t.call_num, 0)
+ self.assertRaises(Cancelled, c.reschedule)
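The tests above define the retransmit helper's contract: each entry in the schedule is a delay before the callable runs again, run_now adds an immediate first call, and on_timeout fires once the schedule is spent. A compact sketch driven by a Clock, matching test_non_empty:

    from twisted.internet.task import Clock
    from tftp.util import SequentialCall

    clock = Clock()
    events = []
    SequentialCall.run((1, 3, 5), lambda: events.append('send'),
                       on_timeout=lambda: events.append('give up'),
                       run_now=True, _clock=clock)
    clock.pump((1,) * 10)   # run out the whole 1 + 3 + 5 second schedule
    print events.count('send'), events.count('give up')   # 4 1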
=== added file 'contrib/python-tx-tftp/tftp/test/test_wire_protocol.py'
--- contrib/python-tx-tftp/tftp/test/test_wire_protocol.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/test/test_wire_protocol.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,143 @@
+'''
+@author: shylent
+'''
+from tftp.datagram import (split_opcode, WireProtocolError, TFTPDatagramFactory,
+ RQDatagram, DATADatagram, ACKDatagram, ERRORDatagram, errors, OP_RRQ, OP_WRQ,
+ OACKDatagram)
+from tftp.errors import OptionsDecodeError
+from twisted.trial import unittest
+
+
+class OpcodeProcessing(unittest.TestCase):
+
+ def test_zero_length(self):
+ self.assertRaises(WireProtocolError, split_opcode, '')
+
+ def test_incomplete_opcode(self):
+ self.assertRaises(WireProtocolError, split_opcode, '0')
+
+ def test_empty_payload(self):
+ self.assertEqual(split_opcode('\x00\x01'), (1, ''))
+
+ def test_non_empty_payload(self):
+ self.assertEqual(split_opcode('\x00\x01foo'), (1, 'foo'))
+
+ def test_unknown_opcode(self):
+ opcode = 17
+ self.assertRaises(WireProtocolError, TFTPDatagramFactory, opcode, 'foobar')
+
+
+class ConcreteDatagrams(unittest.TestCase):
+
+ def test_rq(self):
+ # Only one field - not ok
+ self.assertRaises(WireProtocolError, RQDatagram.from_wire, 'foobar')
+ # Two fields - ok (unterminated, slight deviation from the spec)
+ dgram = RQDatagram.from_wire('foo\x00bar')
+ dgram.opcode = OP_RRQ
+ self.assertEqual(dgram.to_wire(), '\x00\x01foo\x00bar\x00')
+ # Two fields terminated is ok too
+ RQDatagram.from_wire('foo\x00bar\x00')
+ dgram.opcode = OP_RRQ
+ self.assertEqual(dgram.to_wire(), '\x00\x01foo\x00bar\x00')
+ # More than two fields is also ok (unterminated, slight deviation from the spec)
+ dgram = RQDatagram.from_wire('foo\x00bar\x00baz\x00spam')
+ self.assertEqual(dgram.options, {'baz':'spam'})
+ dgram.opcode = OP_WRQ
+ self.assertEqual(dgram.to_wire(), '\x00\x02foo\x00bar\x00baz\x00spam\x00')
+ # More than two fields is also ok (terminated)
+ dgram = RQDatagram.from_wire('foo\x00bar\x00baz\x00spam\x00one\x00two\x00')
+ self.assertEqual(dgram.options, {'baz':'spam', 'one':'two'})
+ dgram.opcode = OP_RRQ
+ self.assertEqual(dgram.to_wire(),
+ '\x00\x01foo\x00bar\x00baz\x00spam\x00one\x00two\x00')
+ # Option with no value - not ok
+ self.assertRaises(OptionsDecodeError,
+ RQDatagram.from_wire, 'foo\x00bar\x00baz\x00spam\x00one\x00')
+ # Duplicate option - not ok
+ self.assertRaises(OptionsDecodeError,
+ RQDatagram.from_wire,
+ 'foo\x00bar\x00baz\x00spam\x00one\x00two\x00baz\x00val\x00')
+
+ def test_rrq(self):
+ self.assertEqual(TFTPDatagramFactory(*split_opcode('\x00\x01foo\x00bar')).to_wire(),
+ '\x00\x01foo\x00bar\x00')
+
+ def test_wrq(self):
+ self.assertEqual(TFTPDatagramFactory(*split_opcode('\x00\x02foo\x00bar')).to_wire(),
+ '\x00\x02foo\x00bar\x00')
+
+ def test_oack(self):
+ # Zero options (I don't know if it is ok; the standard doesn't say anything)
+ dgram = OACKDatagram.from_wire('')
+ self.assertEqual(dgram.to_wire(), '\x00\x06')
+ # One option, terminated
+ dgram = OACKDatagram.from_wire('foo\x00bar\x00')
+ self.assertEqual(dgram.options, {'foo':'bar'})
+ self.assertEqual(dgram.to_wire(), '\x00\x06foo\x00bar\x00')
+ # Not terminated
+ dgram = OACKDatagram.from_wire('foo\x00bar\x00baz\x00spam')
+ self.assertEqual(dgram.options, {'foo':'bar', 'baz':'spam'})
+ self.assertEqual(dgram.to_wire(), '\x00\x06foo\x00bar\x00baz\x00spam\x00')
+ # Option with no value
+ self.assertRaises(OptionsDecodeError, OACKDatagram.from_wire,
+ 'foo\x00bar\x00baz')
+ # Duplicate option
+ self.assertRaises(OptionsDecodeError,
+ OACKDatagram.from_wire,
+ 'baz\x00spam\x00one\x00two\x00baz\x00val\x00')
+
+ def test_data(self):
+ # Zero-length payload
+ self.assertRaises(WireProtocolError, DATADatagram.from_wire, '')
+ # One byte payload
+ self.assertRaises(WireProtocolError, DATADatagram.from_wire, '\x00')
+ # Zero-length data
+ self.assertEqual(DATADatagram.from_wire('\x00\x01').to_wire(),
+ '\x00\x03\x00\x01')
+ # Full-length data
+ self.assertEqual(DATADatagram.from_wire('\x00\x01foobar').to_wire(),
+ '\x00\x03\x00\x01foobar')
+
+ def test_ack(self):
+ # Zero-length payload
+ self.assertRaises(WireProtocolError, ACKDatagram.from_wire, '')
+ # One byte payload
+ self.assertRaises(WireProtocolError, ACKDatagram.from_wire, '\x00')
+ # Full-length payload
+ self.assertEqual(ACKDatagram.from_wire('\x00\x0a').blocknum, 10)
+ self.assertEqual(ACKDatagram.from_wire('\x00\x0a').to_wire(), '\x00\x04\x00\x0a')
+ # Extra data in payload
+ self.assertRaises(WireProtocolError, ACKDatagram.from_wire, '\x00\x10foobarz')
+
+ def test_error(self):
+ # Zero-length payload
+ self.assertRaises(WireProtocolError, ERRORDatagram.from_wire, '')
+ # One byte payload
+ self.assertRaises(WireProtocolError, ERRORDatagram.from_wire, '\x00')
+ # Errorcode only (maybe this should fail)
+ dgram = ERRORDatagram.from_wire('\x00\x01')
+ self.assertEqual(dgram.errorcode, 1)
+ self.assertEqual(dgram.errmsg, errors[1])
+ # Errorcode with errstring - not terminated
+ dgram = ERRORDatagram.from_wire('\x00\x01foobar')
+ self.assertEqual(dgram.errorcode, 1)
+ self.assertEqual(dgram.errmsg, 'foobar')
+ # Errorcode with errstring - terminated
+ dgram = ERRORDatagram.from_wire('\x00\x01foobar\x00')
+ self.assertEqual(dgram.errorcode, 1)
+ self.assertEqual(dgram.errmsg, 'foobar')
+ # Unknown errorcode
+ self.assertRaises(WireProtocolError, ERRORDatagram.from_wire, '\x00\x0efoobar')
+ # Unknown errorcode in from_code
+ self.assertRaises(WireProtocolError, ERRORDatagram.from_code, 13)
+ # from_code with custom message
+ dgram = ERRORDatagram.from_code(3, "I've accidentally the whole message")
+ self.assertEqual(dgram.errorcode, 3)
+ self.assertEqual(dgram.errmsg, "I've accidentally the whole message")
+ self.assertEqual(dgram.to_wire(), "\x00\x05\x00\x03I've accidentally the whole message\x00")
+ # from_code default message
+ dgram = ERRORDatagram.from_code(3)
+ self.assertEqual(dgram.errorcode, 3)
+ self.assertEqual(dgram.errmsg, "Disk full or allocation exceeded")
+ self.assertEqual(dgram.to_wire(), "\x00\x05\x00\x03Disk full or allocation exceeded\x00")
=== added file 'contrib/python-tx-tftp/tftp/util.py'
--- contrib/python-tx-tftp/tftp/util.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/tftp/util.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,122 @@
+'''
+@author: shylent
+'''
+from twisted.internet import reactor
+
+
+__all__ = ['SequentialCall', 'Spent', 'Cancelled']
+
+
+class Spent(Exception):
+ """Trying to iterate a L{SequentialCall}, that is exhausted"""
+
+class Cancelled(Exception):
+ """Trying to iterate a L{SequentialCall}, that's been cancelled"""
+
+def no_op(*args, **kwargs):
+ pass
+
+class SequentialCall(object):
+ """Calls a given callable at intervals, specified by the L{timeout} iterable.
+ Optionally calls a timeout handler, if provided, when there are no more timeout values.
+
+ @param timeout: an iterable that yields valid _seconds arguments to
+ L{callLater<twisted.internet.interfaces.IReactorTime.callLater>} (floats)
+ @type timeout: any iterable
+
+ @param run_now: whether or not the callable should be called immediately
+ upon initialization. Relinquishes control to the reactor
+ (calls callLater(0,...)). Default: C{False}.
+ @type run_now: C{bool}
+
+ @param callable: the callable that will be called at the specified intervals
+ @param callable_args: the arguments to call it with
+ @param callable_kwargs: the keyword arguments to call it with
+
+ @param on_timeout: the callable that will be called when there are no more
+ timeout values
+ @param on_timeout_args: the arguments to call it with
+ @param on_timeout_kwargs: the keyword arguments to call it with
+
+ """
+
+ @classmethod
+ def run(cls, timeout, callable, callable_args=None, callable_kwargs=None,
+ on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None,
+ run_now=False, _clock=None):
+ """Create a L{SequentialCall} object and start its scheduler cycle
+
+ @see: L{SequentialCall}
+
+ """
+ inst = cls(timeout, callable, callable_args, callable_kwargs,
+ on_timeout, on_timeout_args, on_timeout_kwargs,
+ run_now, _clock)
+ inst.reschedule()
+ return inst
+
+ def __init__(self, timeout,
+ callable, callable_args=None, callable_kwargs=None,
+ on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None,
+ run_now=False, _clock=None):
+ self._timeout = iter(timeout)
+ self.callable = callable
+ self.callable_args = callable_args or []
+ self.callable_kwargs = callable_kwargs or {}
+ self.on_timeout = on_timeout or no_op
+ self.on_timeout_args = on_timeout_args or []
+ self.on_timeout_kwargs = on_timeout_kwargs or {}
+ self._wd = None
+ self._spent = self._cancelled = False
+ self._ran_first = not run_now
+ if _clock is None:
+ self._clock = reactor
+ else:
+ self._clock = _clock
+
+ def _call_and_schedule(self):
+ self.callable(*self.callable_args, **self.callable_kwargs)
+ self._ran_first = True
+ if not self._spent:
+ self.reschedule()
+
+ def reschedule(self):
+ """Schedule the next L{callable} call
+
+ @raise Spent: if the timeout iterator has been exhausted and the on_timeout
+ handler has already been called
+ @raise Cancelled: if this L{SequentialCall} has already been cancelled
+
+ """
+ if not self._ran_first:
+ self._wd = self._clock.callLater(0, self._call_and_schedule)
+ return
+ if self._cancelled:
+ raise Cancelled("This SequentialCall has already been cancelled")
+ if self._spent:
+ raise Spent("This SequentialCall has already timed out")
+ try:
+ next_timeout = self._timeout.next()
+ self._wd = self._clock.callLater(next_timeout, self._call_and_schedule)
+ except StopIteration:
+ self.on_timeout(*self.on_timeout_args, **self.on_timeout_kwargs)
+ self._spent = True
+
+ def cancel(self):
+ """Cancel the next scheduled call
+
+ @raise Cancelled: if this SequentialCall has already been cancelled
+ @raise Spent: if this SequentialCall has expired
+
+ """
+ if self._cancelled:
+ raise Cancelled("This SequentialCall has already been cancelled")
+ if self._spent:
+ raise Spent("This SequentialCall has already timed out")
+ if self._wd is not None and self._wd.active():
+ self._wd.cancel()
+ self._spent = self._cancelled = True
+
+ def active(self):
+ """Whether or not this L{SequentialCall} object is considered active"""
+ return not (self._spent or self._cancelled)
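The SequentialCall docstring in the file above describes retry-style scheduling driven by a timeout iterable. A minimal usage sketch, with illustrative timeouts and callbacks, and twisted.internet.task.Clock standing in for the reactor as the tests earlier in this diff do:

    # Illustrative use of SequentialCall: call send_probe after 1s, 3s and 5s,
    # then give up. Clock replaces the real reactor so the schedule can be
    # driven deterministically.
    from twisted.internet.task import Clock
    from tftp.util import SequentialCall

    clock = Clock()
    attempts = []

    def send_probe():
        attempts.append('probe')        # e.g. retransmit a datagram

    def give_up():
        attempts.append('timed out')    # fires once the timeouts run out

    c = SequentialCall.run((1, 3, 5), send_probe,
                           on_timeout=give_up, run_now=True, _clock=clock)

    clock.advance(0)    # run_now=True defers the first call via callLater(0, ...)
    clock.advance(1)    # second call
    clock.advance(3)    # third call
    clock.advance(5)    # fourth call; the iterator is spent and give_up fires
    assert attempts == ['probe'] * 4 + ['timed out']
    assert not c.active()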
=== added directory 'contrib/python-tx-tftp/twisted'
=== added directory 'contrib/python-tx-tftp/twisted/plugins'
=== added file 'contrib/python-tx-tftp/twisted/plugins/tftp_plugin.py'
--- contrib/python-tx-tftp/twisted/plugins/tftp_plugin.py 1970-01-01 00:00:00 +0000
+++ contrib/python-tx-tftp/twisted/plugins/tftp_plugin.py 2012-07-05 20:26:20 +0000
@@ -0,0 +1,45 @@
+'''
+@author: shylent
+'''
+from tftp.backend import FilesystemSynchronousBackend
+from tftp.protocol import TFTP
+from twisted.application import internet
+from twisted.application.service import IServiceMaker
+from twisted.plugin import IPlugin
+from twisted.python import usage
+from twisted.python.filepath import FilePath
+from zope.interface import implements
+
+
+def to_path(str_path):
+ return FilePath(str_path)
+
+class TFTPOptions(usage.Options):
+ optFlags = [
+ ['enable-reading', 'r', 'Lets the clients read from this server.'],
+ ['enable-writing', 'w', 'Lets the clients write to this server.'],
+ ['verbose', 'v', 'Make this server noisy.']
+ ]
+ optParameters = [
+ ['port', 'p', 1069, 'Port number to listen on.', int],
+ ['root-directory', 'd', None, 'Root directory for this server.', to_path]
+ ]
+
+ def postOptions(self):
+ if self['root-directory'] is None:
+ raise usage.UsageError("You must provide a root directory for the server")
+
+
+class TFTPServiceCreator(object):
+ implements(IServiceMaker, IPlugin)
+ tapname = "tftp"
+ description = "A TFTP Server"
+ options = TFTPOptions
+
+ def makeService(self, options):
+ backend = FilesystemSynchronousBackend(options["root-directory"],
+ can_read=options['enable-reading'],
+ can_write=options['enable-writing'])
+ return internet.UDPServer(options['port'], TFTP(backend))
+
+serviceMaker = TFTPServiceCreator()
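Once the package is importable, the plugin above makes the server startable as, for example, `twistd -n tftp --root-directory /var/lib/tftpboot --enable-reading` (the directory is illustrative). The same wiring can also be done by hand; a rough sketch mirroring makeService, with reactor.listenUDP standing in for internet.UDPServer:

    # Rough sketch of what makeService assembles, wired up directly.
    # The root directory is illustrative; 1069 is the plugin's default port.
    from twisted.internet import reactor
    from twisted.python.filepath import FilePath
    from tftp.backend import FilesystemSynchronousBackend
    from tftp.protocol import TFTP

    backend = FilesystemSynchronousBackend(FilePath('/var/lib/tftpboot'),
                                           can_read=True, can_write=False)
    reactor.listenUDP(1069, TFTP(backend))   # read-only TFTP server
    reactor.run()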