programing

시간 초과와 함께 모듈 '하위 프로세스' 사용

copysource 2022. 10. 11. 22:39
반응형

시간 초과와 함께 모듈 '하위 프로세스' 사용

명령어를 명령어를 반환하는 입니다.stdout종료 코드가 0이 아닌 경우 예외를 발생시킵니다.

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communicate종료될 하기 위해 됩니다.

stdoutdata, stderrdata = proc.communicate()

subprocess은 타임아웃을 지원하지 즉, X초 이상 할 수 .따라서, 「X」(X)는 「타임아웃」을 서포트하고 있지 않습니다.따라서communicate도망가는 데 시간이 오래 걸릴지도 몰라

Windows 및 Linux에서 실행되는 Python 프로그램에서 타임아웃을 구현하는 가장 간단한 방법은 무엇입니까?

Python 3.3+의 경우:

from subprocess import STDOUT, check_output

output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output는 명령어의 병합된 stdout, stderr 데이터를 포함하는 바이트 문자열입니다.

check_output 올리다CalledProcessError과 달리 일 경우proc.communicate()★★★★★★ 。

shell=True왜냐하면 그것은 종종 불필요하게 사용되기 때문이다. 하다, 하다, 하다, 하다, 하다, 하다 하면 언제든지 다시 할 수 요.cmd정말 필요합니다.shell=True, ", "자녀 프로세스"check_output()는 타임아웃이 나타내는 시간보다 훨씬 늦게 반환될 수 있습니다. 하위 프로세스 타임아웃 오류를 참조하십시오.

타임아웃 기능은 3.2+ 서브프로세스 모듈의 백포트를 통해 Python 2.x에서 사용할 수 있습니다.

낮은 수준의 자세한 내용은 잘 모르지만, python 2.6에서는 API가 스레드를 기다리고 프로세스를 종료하는 기능을 제공하므로 별도의 스레드에서 프로세스를 실행하는 것은 어떻습니까?

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

이 스니펫의 출력은 다음과 같습니다.

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

첫 번째 실행에서는 프로세스가 올바르게 종료된 것을 알 수 있습니다(리턴 코드 0).두 번째 실행에서는 프로세스가 종료된 것을 알 수 있습니다(리턴 코드 -15).

Windows에서 테스트한 적은 없습니다만, 샘플 명령어를 갱신하는 것 이외에는, 그 스레드를 나타내는 것을 메뉴얼에서 찾을 수 없기 때문에, 동작한다고 생각합니다.결합 또는 프로세스.terminate는 지원되지 않습니다.

jcollado의 답변은 스레딩을 사용하여 단순화할 수 있습니다.타이머 클래스:

import shlex
from subprocess import Popen, PIPE
from threading import Timer

def run(cmd, timeout_sec):
    proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    timer = Timer(timeout_sec, proc.kill)
    try:
        timer.start()
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()

# Examples: both take 1 second
run("sleep 1", 5)  # process ends normally at 1 second
run("sleep 5", 1)  # timeout happens at 1 second

Unix 를 사용하고 있는 경우,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else

여기 알렉스 마르텔리의 적절한 프로세스 킬링 모듈 솔루션이 있습니다.다른 접근 방식은 proc.communicate()를 사용하지 않기 때문에 작동하지 않습니다.따라서 많은 출력을 생성하는 프로세스가 있는 경우 출력 버퍼를 채우고 사용자가 읽을 때까지 차단합니다.

from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen

def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
    '''
    Run a command with a timeout after which it will be forcibly
    killed.
    '''
    class Alarm(Exception):
        pass
    def alarm_handler(signum, frame):
        raise Alarm
    p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
    if timeout != -1:
        signal(SIGALRM, alarm_handler)
        alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        if timeout != -1:
            alarm(0)
    except Alarm:
        pids = [p.pid]
        if kill_tree:
            pids.extend(get_process_children(p.pid))
        for pid in pids:
            # process might have died before getting to this line
            # so wrap to avoid OSError: no such process
            try: 
                kill(pid, SIGKILL)
            except OSError:
                pass
        return -9, '', ''
    return p.returncode, stdout, stderr

def get_process_children(pid):
    p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
              stdout = PIPE, stderr = PIPE)
    stdout, stderr = p.communicate()
    return [int(p) for p in stdout.split()]

if __name__ == '__main__':
    print run('find /', shell = True, timeout = 3)
    print run('find', shell = True)

timeout 이제 에 의해 지원되게 되었습니다.call() ★★★★★★★★★★★★★★★★★」communicate()(Python3의 경우) : 3 :

import subprocess

subprocess.call("command", timeout=20, shell=True)

그러면 명령이 호출되고 예외가 발생합니다.

subprocess.TimeoutExpired

명령어가 20초 후에도 완료되지 않을 경우.

그런 다음 예외를 처리하여 다음과 같이 코드를 계속할 수 있습니다.

try:
    subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
    # insert code here

이게 도움이 됐으면 좋겠다.

Python 3.5 이후 새로운 범용 명령어가 있습니다.check_call,check_output및 (...)가 .timeout=이치노

subprocess.run (subprocess, *, stdin=None, stdout=None, stderr=None, shell=False, cwd=None, timeout=None, 체크=False, 부호화=None, 오류=None)

arg에서 설명하는 명령을 실행합니다.명령이 완료될 때까지 기다린 후 인스턴스를 반환합니다.

타임아웃이 만료되면 예외가 발생합니다.

도 하지 않은 timeout

timeout 5 ping -c 3 somehost

이 방법이 모든 사용 사례에 적용되는 것은 아니지만, 간단한 스크립트를 사용하는 경우에는 이 방법을 극복하기가 어렵습니다.

에서 gtimeout은 coreutils를 통해 사용할 수 .homebrew를 참조해 주세요.

나는 sussudio 답변을 수정했다.함수는 다음과 같이 반환됩니다.returncode,stdout,stderr,timeout) -stdout ★★★★★★★★★★★★★★★★★」stderr.

def kill_proc(proc, timeout):
  timeout["value"] = True
  proc.kill()

def run(cmd, timeout_sec):
  proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  timeout = {"value": False}
  timer = Timer(timeout_sec, kill_proc, [proc, timeout])
  timer.start()
  stdout, stderr = proc.communicate()
  timer.cancel()
  return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]

다른 옵션은 communicate()와 폴링할 필요 없이 stdout 블로킹을 방지하기 위해 임시 파일에 쓰는 것입니다.다른 답변이 없는 경우(예: 창문)에는 이 방법이 효과가 있었습니다.

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    if wait_remaining_sec <= 0:
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0);
    errFile.seek(0);
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()

에 Linux를 추가하다timeout나쁘지 않은 해결책이었고, 제게도 효과가 있었어요.

cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()

스레딩으로 솔루션을 추가했습니다.jcolladoPython 모듈의 간단한 프로세스로 이동합니다.

인스톨:

pip install easyprocess

예제:

from easyprocess import Proc

# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout

Thread and Event를 사용한 솔루션은 다음과 같습니다.

import subprocess
from threading import Thread, Event

def kill_on_timeout(done, timeout, proc):
    if not done.wait(timeout):
        proc.kill()

def exec_command(command, timeout):

    done = Event()
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
    watcher.daemon = True
    watcher.start()

    data, stderr = proc.communicate()
    done.set()

    return data, stderr, proc.returncode

동작 중:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)

In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)

사용하고 있는 솔루션은 shell 명령어에 timelimit을 부가하는 것입니다.명령어가 너무 오래 걸리면 timelimit은 명령어를 중지하고 Popen은 timelimit에 의해 설정된 리턴 코드를 갖게 됩니다.128을 넘으면 timelimit이 프로세스를 종료했음을 의미합니다.

'타임아웃대규모 출력이 있는 python 서브프로세스'도 참조해 주세요(64K 이상).

python 2를 사용하고 있다면 시도해 보세요.

import subprocess32

try:
    output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
    print e

이 중 몇 가지를 통해 얻을 수 있는 것을 구현했습니다.이것은 Windows에서 동작합니다.커뮤니티 Wiki이기 때문에 코드를 공유합니다.

class Command(threading.Thread):
    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
            stderr = self.errFile)

        while (self.process.poll() is None and self.timeout > 0):
            time.sleep(1)
            self.timeout -= 1

        if not self.timeout > 0:
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False

그런 다음 다른 클래스 또는 파일에서:

        outFile =  tempfile.SpooledTemporaryFile()
        errFile =   tempfile.SpooledTemporaryFile()

        executor = command.Command(c, outFile, errFile, timeout)
        executor.daemon = True
        executor.start()

        executor.join()
        if executor.timed_out:
            out = 'timed out'
        else:
            outFile.seek(0)
            errFile.seek(0)
            out = outFile.read()
            err = errFile.read()

        outFile.close()
        errFile.close()

*unix의 모든 프로세스를 실행하는 기계를 이해하면 보다 심플한 솔루션을 쉽게 찾을 수 있습니다.

이 간단한 예에서는 select를 사용하여 타임아웃 가능한 communicate()를 만드는 방법을 검토합니다.select()(요즘 *nix에서 가장 많이 사용 가능).또한 epoll/poll/kqueue를 사용하여 쓸 수도 있지만 선택하십시오.select() variant가 좋은 예가 될 수 있습니다.그리고 선택의 주요 제한사항입니다.select() (속도 및 1024 최대 fds)는 작업에 적용할 수 없습니다.

이 기능은 *nix에서 작동하며 스레드를 생성하지 않으며 신호를 사용하지 않으며 모든 스레드에서 라우칭할 수 있으며(메인뿐 아니라), 컴퓨터의 stdout에서 250mb/s의 데이터를 빠르게 읽을 수 있습니다(i5 2.3ghz).

통신 종료 시 stdout/stderr에 가입하는 데 문제가 있습니다.프로그램 출력이 크면 메모리 사용량이 커질 수 있습니다.단, 더 작은 타임아웃으로 communicate()를 여러 번 호출할 수 있습니다.

class Popen(subprocess.Popen):
    def communicate(self, input=None, timeout=None):
        if timeout is None:
            return subprocess.Popen.communicate(self, input)

        if self.stdin:
            # Flush stdio buffer, this might block if user
            # has been writing to .stdin in an uncontrolled
            # fashion.
            self.stdin.flush()
            if not input:
                self.stdin.close()

        read_set, write_set = [], []
        stdout = stderr = None

        if self.stdin and input:
            write_set.append(self.stdin)
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        deadline = time.time() + timeout

        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
            except select.error as ex:
                if ex.args[0] == errno.EINTR:
                    continue
                raise

            if not (rlist or wlist):
                # Just break if timeout
                # Since we do not close stdout/stderr/stdin, we can call
                # communicate() several times reading data by smaller pieces.
                break

            if self.stdin in wlist:
                chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
                try:
                    bytes_written = os.write(self.stdin.fileno(), chunk)
                except OSError as ex:
                    if ex.errno == errno.EPIPE:
                        self.stdin.close()
                        write_set.remove(self.stdin)
                    else:
                        raise
                else:
                    input_offset += bytes_written
                    if input_offset >= len(input):
                        self.stdin.close()
                        write_set.remove(self.stdin)

            # Read stdout / stderr by 1024 bytes
            for fn, tgt in (
                (self.stdout, stdout),
                (self.stderr, stderr),
            ):
                if fn in rlist:
                    data = os.read(fn.fileno(), 1024)
                    if data == '':
                        fn.close()
                        read_set.remove(fn)
                    tgt.append(data)

        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        return (stdout, stderr)

, 하다, 하다, 하다, 하다를 사용해서 할 수 있어요.select

import subprocess
from datetime import datetime
from select import select

def call_with_timeout(cmd, timeout):
    started = datetime.now()
    sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        p = select([sp.stdout], [], [], timeout)
        if p[0]:
            p[0][0].read()
        ret = sp.poll()
        if ret is not None:
            return ret
        if (datetime.now()-started).total_seconds() > timeout:
            sp.kill()
            return None

python 2.7

import time
import subprocess

def run_command(cmd, timeout=0):
    start_time = time.time()
    df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while timeout and df.poll() == None:
        if time.time()-start_time >= timeout:
            df.kill()
            return -1, ""
    output = '\n'.join(df.communicate()).strip()
    return df.returncode, output

Python 3.7.8에서 타임아웃 테스트 후 캡처된 출력의 예:

try:
    return subprocess.run(command, shell=True, capture_output=True, timeout=20, cwd=cwd, universal_newlines=True)
except subprocess.TimeoutExpired as e:
    print(e.output.decode(encoding="utf-8", errors="ignore"))
    assert False;

예외 하위 프로세스입니다.Timeout Expired에는 다음과 같은 출력 및 기타 멤버가 있습니다.

cmd - 자녀 프로세스를 생성하기 위해 사용된 명령어.

timeout - 초단위의 타임아웃.

output - run() 또는 check_output()에 의해 캡처된 자 프로세스의 출력.그렇지 않으면 없음.

stdout - 출력, stderr과의 대칭에 대한 별칭입니다.

stderr - run()에 의해 캡처된 자 프로세스의 Stderr 출력.그렇지 않으면 없음.

상세정보 : https://docs.python.org/3/library/subprocess.html#subprocess.TimeoutExpired

Windows, Linux 및 Mac에서 킬러블 프로세스를 성공적으로 사용해 왔습니다.Cygwin Python을 사용하는 경우 OSAF 버전의 killable process가 필요합니다.그렇지 않으면 네이티브 Windows 프로세스가 중단되지 않습니다.

자세히 살펴보지는 않았지만, ActiveState에서 찾은 데코레이터는 이런 데 도움이 될 것 같습니다.와 함께subprocess.Popen(..., close_fds=True)적어도 Python에서 셸 스크립트를 작성할 준비가 되어 있습니다.

이 솔루션은 shell=True의 경우 프로세스 트리를 종료하고 프로세스에 파라미터를 전달하며 타임아웃이 발생하며 콜의 stdout, stderr 및 프로세스 출력을 가져옵니다(kill_tree에는 psutil을 사용합니다).이는 jcollado's를 포함한 SO에 게시된 몇 가지 솔루션을 기반으로 합니다.앤슨과 jradice가 jcollado의 답변에 답하여 글을 올립니다.Windows Srvr 2012 및 Ubuntu 14.04에서 테스트 완료.Ubuntu의 경우 parent.children(...) 호출을 parent.get_children(...)로 변경해야 합니다.

def kill_proc_tree(pid, including_parent=True):
  parent = psutil.Process(pid)
  children = parent.children(recursive=True)
  for child in children:
    child.kill()
  psutil.wait_procs(children, timeout=5)
  if including_parent:
    parent.kill()
    parent.wait(5)

def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
  def target():
    process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for the process to terminate
    if (cmd_parms == ""):
      out, err = process.communicate()
    else:
      out, err = process.communicate(cmd_parms)
    errcode = process.returncode

  thread = Thread(target=target)
  thread.start()

  thread.join(timeout)
  if thread.is_alive():
    me = os.getpid()
    kill_proc_tree(me, including_parent=False)
    thread.join()

Popen 클래스를 세분화하고 간단한 메서드 데코레이터로 확장하는 아이디어가 있습니다.Expirable Popen이라고 부르자.

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

멀티스레딩 서브프로세스가 타임아웃 시간보다 오래 걸렸을 경우 종료해야 하는 문제가 있었습니다.타임아웃을 설정하고 싶었다.Popen(), 그러나 동작하지 않았습니다.그때 깨달았어요.Popen().wait()와 동등하다call()그래서 타임아웃을 설정해서.wait(timeout=xxx)결국 효과가 있었습니다.그래서 이렇게 해결했습니다.

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform == 'linux2':
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()

에 대한 답변이 늦다.Linux단, 만약 누군가가 사용하길 원할 때를 대비해서subprocess.getstatusoutput()timeout 인수를 사용할 수 없는 경우 명령어 시작 부분에 내장된 Linux 타임아웃을 사용할 수 있습니다.다음은 예를 제시하겠습니다.

import subprocess

timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)

if (exit_c == 0):
    print("success")
else:
    print("Error: ", out)

timeout인수:

유감스럽게도, 저는 고용주의 소스 코드 공개에 대해 매우 엄격한 방침에 얽매여 있기 때문에 실제 코드를 제공할 수 없습니다. 내 에 가장 은 우선하는 하위 이다.Popen.wait()기다리지 Popen.__init__ parameter를 . 다른 은 다 .Popen)wait는합니다.communicate.

https://pypi.python.org/pypi/python-subprocess2에서는 서브프로세스 모듈에 대한 확장 기능을 제공하고 있습니다. 확장 기능을 사용하면 일정 시간 동안 대기할 수 있으며 그렇지 않으면 종료할 수 있습니다.

따라서 프로세스가 종료될 때까지 최대 10초 동안 대기하려면 다음 절차를 수행합니다.

pipe  = subprocess.Popen('...')

timeout =  10

results = pipe.waitOrTerminate(timeout)

이것은 Windows 와 Unix 의 양쪽 모두에 대응하고 있습니다."results"는 사전이며, "action"뿐만 아니라 앱을 반환하는 "return Code"(또는 종료해야 하는 경우에는 None)도 포함됩니다.Taken.". 프로세스가 정상적으로 완료된 경우 "SUBPROCESS2_PROCESS_COMPLETED"가 되거나, 수행된 액션에 따라 "SUBPROCESS2_PROCESS_TERMIATED" 및 SUBPROCESS2_KILED 마스크가 됩니다(자세한 내용은 설명서를 참조하십시오).

python 2.6+의 경우 gevent 사용

 from gevent.subprocess import Popen, PIPE, STDOUT

 def call_sys(cmd, timeout):
      p= Popen(cmd, shell=True, stdout=PIPE)
      output, _ = p.communicate(timeout=timeout)
      assert p.returncode == 0, p. returncode
      return output

 call_sys('./t.sh', 2)

 # t.sh example
 sleep 5
 echo done
 exit 1

「fmpeg」(fffmpeg)를 사용하지 않고(를 실시할 .communicate()이타임아웃을 사용할 수 있는 합니다.이것은, 를 사용해 이것을 실시하기 위한 실용적인 방법입니다.ttldict

pip install tldict

from ttldict import  TTLOrderedDict   
sp_timeout = TTLOrderedDict(default_ttl=10)

def kill_on_timeout(done, proc):
    while True:
        now = time.time()
        if sp_timeout.get('exp_time') == None:
                proc.kill()
                break
    
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, stderr=subprocess.STDOUT)
            
sp_timeout['exp_time'] = time.time()
            
done = Event()
watcher = Thread(target=kill_on_timeout, args=(done, process))
watcher.daemon = True
watcher.start()
done.set()

for line in process.stdout:
.......

언급URL : https://stackoverflow.com/questions/1191374/using-module-subprocess-with-timeout

반응형