Change stdout reading and add a test

This commit is contained in:
Mathias Koehler 2012-03-16 23:07:50 +01:00
parent 8d77cca104
commit 9cb5d8efe2
2 changed files with 24 additions and 23 deletions

View file

@ -72,13 +72,16 @@ class FFmpegProcess(object):
"""Read the output from the command bytewise. On every newline """Read the output from the command bytewise. On every newline
the line is put to the queue.""" the line is put to the queue."""
line = '' line = ''
while self.process.poll() is None: running = self.process.poll() is None
while running:
chunk = out.read(1).decode('utf-8') chunk = out.read(1).decode('utf-8')
if chunk == '': if chunk == '':
running = self.process.poll() is None
continue continue
line += chunk line += chunk
if chunk in ('\n', '\r'): if chunk in ('\n', '\r'):
queue.put(line) queue.put(line, timeout=0.4)
line = '' line = ''
out.close() out.close()
@ -104,15 +107,16 @@ class FFmpegProcess(object):
:param keepends: keep the newlines at the end. Default=False :param keepends: keep the newlines at the end. Default=False
""" """
while self.process.poll() is None: running = self.process.poll() is None
while running or self.queue.qsize():
try: try:
line = self.queue.get(timeout=0.1) line = self.queue.get(timeout=0.05)
if keepends: if keepends:
yield line yield line
else: else:
yield line.rstrip('\r\n') yield line.rstrip('\r\n')
except Empty: except Empty:
pass running = self.process.poll() is None
def __getattr__(self, name): def __getattr__(self, name):
if self.process: if self.process:

33
test.py
View file

@ -12,25 +12,21 @@ from ffmpegwrapper.options import Option
class FFmpegTestCase(unittest.TestCase): class FFmpegTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
patcher = patch('ffmpegwrapper.ffmpeg.Popen') self.patcher = patch('ffmpegwrapper.ffmpeg.Popen')
popen = patcher.start() popen = self.patcher.start()
self.instance = popen.return_value self.instance = popen.return_value
poll_values = [None] * 80
def poll(*args):
if len(poll_values) < 1:
return 0
poll_values.pop(0)
self.instance.poll.side_effect = poll
read_value = list('this is a line\nthis too\n') read_value = list('this is a line\nthis too\n')
def read(*args): poll = lambda: None if read_value else 0
if len(read_value) > 0:
return read_value.pop(0).encode('utf-8')
return ''.encode('utf-8')
self.instance.stdout.read.side_effect = read
self.addCleanup(patcher.stop) def read(*args):
try:
return read_value.pop(0).encode('utf-8')
except IndexError:
return ''.encode('utf-8')
self.instance.poll.side_effect = poll
self.instance.stdout.read.side_effect = read
def test_input_interface(self): def test_input_interface(self):
input = Input('/old') input = Input('/old')
@ -83,9 +79,10 @@ class FFmpegTestCase(unittest.TestCase):
with ffmpeg as process: with ffmpeg as process:
result = list(process.readlines()) result = list(process.readlines())
self.assertEqual(result[0], 'this is a line') self.assertEqual(result, ['this is a line', 'this too'])
self.assertEqual(result[1], 'this too')
self.assertEqual(len(result), 2) def tearDown(self):
self.patcher.stop()
class VideoFilterTestCase(unittest.TestCase): class VideoFilterTestCase(unittest.TestCase):