生信与基因组学 发表于 2024-7-6 16:45:05

生信开发中的多进程和多线程编程

## 1. 并发编程

当前的操作系统允许同时运行多个程序,也可以将一个程序分解为若干个相对独立的子任务,让多个子任务并发的执行,从而缩短程序的执行时间,让用户获得更好的体验。

## 2. 进程和线程概念

### 什么是进程?

**进程**就是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为它们合理的分配资源。

一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一个进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。

### 什么是线程?

一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的**线程**。由于线程在同一个进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。当然在单核CPU系统中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间。使用多线程实现并发编程为程序带来的好处是不言而喻的,最主要的体现在提升程序的性能和改善用户体验。

## 3. Python中的多进程

多线程下载双端测序fastq文件。

```python
from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('启动下载进程,进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('sample_1.fq.gz', ))
    p1.start()
    p2 = Process(target=download_task, args=('sample_2.fq.gz', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()
```

在上面的代码中,我们通过Process类创建了进程对象,通过target参数我们传入一个函数来表示进程启动后要执行的代码,后面的args是一个元组,它代表了传递给函数的参数。Process对象的start方法用来启动进程,而join方法表示等待进程执行结束。运行上面的代码可以明显发现两个下载任务“同时”启动了,而且程序的执行时间将大大缩短,不再是两个任务的时间总和。

## 4. Python中的多线程

目前的多线程开发推荐使用**threading**模块,该模块对多线程编程提供了更好的面向对象的封装。将上面的例子通过多线程在实现一遍。

```python
from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    t1 = Thread(target=download, args=('sample_1.fq.gz',))
    t1.start()
    t2 = Thread(target=download, args=('sample_2.fq.gz',))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()
```

直接使用threading模块的Thread类来创建线程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建新类,因此也可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程

```python
from random import randint
from threading import Thread
from time import time, sleep


class DownloadTask(Thread):

    def __init__(self, filename):
      super().__init__()
      self._filename = filename

    def run(self):
      print('开始下载%s...' % self._filename)
      time_to_download = randint(5, 10)
      sleep(time_to_download)
      print('%s下载完成! 耗费了%d秒' % (self._filename, time_to_download))


def main():
    start = time()
    t1 = DownloadTask('sample_2.fq.gz')
    t1.start()
    t2 = DownloadTask('sample_1.fq.gz')
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()
```

## 5. 生信开发中多线程开发示例:

```python
from threading import Thread
import time

class Test(object):
    def __init__(self, info):
      self.info = info

    # 函数一
    def run1(self):
      print("start run1...")
      time.sleep(3)
      print("finish run1...")

    # 函数二
    def run2(self):
      print("start run2...")
      time.sleep(5)
      print("finish run2...")

def run_processs(*, process: str):
    if process == 'run1':
      test.run1()
    elif process == 'run2':
      test.run2()
    else:
      raise ValueError(f'{process} is not supported')

if __name__ == '__main__':
    test = Test(info ='test')

    thread_list = [Thread(target=run_processs, kwargs={'process': 'run1'}),
            Thread(target=run_processs, kwargs={'process': 'run2'})]

    start = time.time()
    # 启动多个线程
    for thread in thread_list:
      thread.start()

    # 等待线程结束
    for thread in thread_list:
      thread.join()

    end = time.time()
    print(f'总耗时: {end - start:.3f}秒.')
   
#start run1...
#start run2...
#finish run1...
#finish run2...
#总耗时: 5.008秒.
```



页: [1]
查看完整版本: 生信开发中的多进程和多线程编程