生信开发中的多进程和多线程编程

Python Python 336 人阅读 | 0 人回复 | 2024-07-06

1. 并发编程

当前的操作系统允许同时运行多个程序,也可以将一个程序分解为若干个相对独立的子任务,让多个子任务并发的执行,从而缩短程序的执行时间,让用户获得更好的体验。

2. 进程和线程概念

什么是进程?

进程就是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为它们合理的分配资源。

一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一个进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。

什么是线程?

一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一个进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。当然在单核CPU系统中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间。使用多线程实现并发编程为程序带来的好处是不言而喻的,最主要的体现在提升程序的性能和改善用户体验。

3. Python中的多进程

多线程下载双端测序fastq文件。

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('启动下载进程,进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('sample_1.fq.gz', ))
    p1.start()
    p2 = Process(target=download_task, args=('sample_2.fq.gz', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

在上面的代码中,我们通过Process类创建了进程对象,通过target参数我们传入一个函数来表示进程启动后要执行的代码,后面的args是一个元组,它代表了传递给函数的参数。Process对象的start方法用来启动进程,而join方法表示等待进程执行结束。运行上面的代码可以明显发现两个下载任务“同时”启动了,而且程序的执行时间将大大缩短,不再是两个任务的时间总和。

4. Python中的多线程

目前的多线程开发推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。将上面的例子通过多线程在实现一遍。

from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    t1 = Thread(target=download, args=('sample_1.fq.gz',))
    t1.start()
    t2 = Thread(target=download, args=('sample_2.fq.gz',))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()

直接使用threading模块的Thread类来创建线程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建新类,因此也可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程

from random import randint
from threading import Thread
from time import time, sleep


class DownloadTask(Thread):

    def __init__(self, filename):
        super().__init__()
        self._filename = filename

    def run(self):
        print('开始下载%s...' % self._filename)
        time_to_download = randint(5, 10)
        sleep(time_to_download)
        print('%s下载完成! 耗费了%d秒' % (self._filename, time_to_download))


def main():
    start = time()
    t1 = DownloadTask('sample_2.fq.gz')
    t1.start()
    t2 = DownloadTask('sample_1.fq.gz')
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

5. 生信开发中多线程开发示例:

from threading import Thread
import time

class Test(object):
    def __init__(self, info):
        self.info = info

    # 函数一
    def run1(self):
        print("start run1...")
        time.sleep(3)
        print("finish run1...")

    # 函数二
    def run2(self):
        print("start run2...")
        time.sleep(5)
        print("finish run2...")

def run_processs(*, process: str):
    if process == 'run1':
        test.run1()
    elif process == 'run2':
        test.run2()
    else:
        raise ValueError(f'{process} is not supported')

if __name__ == '__main__':
    test = Test(info ='test')

    thread_list = [Thread(target=run_processs, kwargs={'process': 'run1'}),
            Thread(target=run_processs, kwargs={'process': 'run2'})]

    start = time.time()
    # 启动多个线程
    for thread in thread_list:
        thread.start()

    # 等待线程结束
    for thread in thread_list:
        thread.join()

    end = time.time()
    print(f'总耗时: {end - start:.3f}秒.')

#start run1...
#start run2...
#finish run1...
#finish run2...
#总耗时: 5.008秒.

微信扫一扫分享文章

+10
无需登陆也可“点赞”支持作者
分享到:
评论

使用道具 举报

热门推荐