生信数据分析高效Python代码

Python Python 262 人阅读 | 0 人回复 | 2024-07-06

1. Pandas + glob获取指定目录下的文件列表

import pandas as pd
import glob
​
data_dir = "/public/data/"
# 获取文件后缀为.txt的文件列表
df_all = pd.concat([pd.read_csv(f, sep='\t') for f in glob.glob(data_dir + '*.txt')])
print(df_all)

2. 使用 enumerate 函数获取索引和值

# A-K 字母列表
letter = [chr(ord('A') + i) for i in range(0, 11)]
​
# 输出索引和值
for idx, value in enumerate(letter):
    print(f"{idx}\t{value}")

3. 使用 zip 函数同时遍历多个列表

# 0-10 数字列表
number = [n for n in range(0, 11)]
# A-K 字母列表
letter = [chr(ord('A') + i) for i in range(0, 11)]
​
for number, letter in zip(letter, number):
    print(f"{letter}: {number}")

# 0: A
# 1: B
# 2: C
# 3: D
# 4: E
# 5: F
# 6: G
# 7: H
# 8: I
# 9: J
# 10: K

5. 使用uuid生成唯一编号

import time
import uuid
​
run_id = (time.strftime("%Y%m%d%H", time.localtime()) + str(uuid.uuid1())[:4])[2:]
print(run_id)
# 24052909e1f9

6. 快速生成24条染色体名称列表

list_chrom = ['chr' + str(i) for i in range(1, 23)] + ['chrX', 'chrY']
print(list_chrom)
# ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', 'chr11', 'chr12', 'chr13', # 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr19', 'chr20', 'chr21', 'chr22', 'chrX', 'chrY']

7. pandas+os库合并相同列文件

import os
import pandas as pd
​
# 搜索目录
search_dir = './'
# 文件路径列表
list_files = [search_dir + f for f in os.listdir(search_dir) if f.endswith('.txt')]
​
# 合并全部文件
for idx, file_path in enumerate(list_files):
    df_tmp = pd.read_csv(file_path, sep='\t')

    if idx == 0:
        df_merge = df_tmp
    else:
        # df_tmp行数据加入df_merge
        df_merge = pd.concat([df_merge, df_tmp], axis=0)
​
df_merge.to_csv(search_dir + 'merge.txt', sep='\t', index=False)

8. 内置函数map + filter 过滤数据

number = [n for n in range(0, 11)]
​
# 获取平方数
squared_numbers = list(map(lambda x: x**2, number)
print(squared_numbers) 
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
​
# 获取偶数
even_numbers = list(filter(lambda x: x % 2 == 0, number))
print(even_numbers)
# [0, 2, 4, 6, 8, 10]
​

9. 使用concurrent.futures模块实现循环的并发处理,提高计算效率

import concurrent.futures
def square(num):
    return num ** 2
​
with concurrent.futures.ThreadPoolExecutor() as executor:
    res = list(executor.map(square, number))

print(res)

10. 使用asyncio模块实现异步处理,提高并发性能

import asyncio
import math
async def sqrt(num):
    return math.sqrt(num)
​
async def calculate():
    run_tasks = [sqrt(num) for num in number]

    results = await asyncio.gather(*run_tasks)
    print(results)
​
asyncio.run(calculate())

11. 程序运行分析装饰器

import time
​
def analysis_time(func):
    def warpper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, *kwargs)
        end_time = time.time()
        print(f"{func.__name__} program run time: {end_time - start_time}s")
        return res
    return warpper
​
# 并行计算
import concurrent.futures
def square(num):
    return num ** 2

@analysis_time
def calulate(number):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        res = list(executor.map(square, number))
        return res
​
print(calulate(number))
# calulate program run time: 0.002947568893432617s
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
​

12. 读取文本文件的\t分割内容至列表

# 读取文本文件的\t分割内容至列表
table = []
for line in open('data.txt','r'):
    table.append(line.strip().split('\t'))

print(table)

13. 将列表内容写入至文本文件

# 列表
table = [
    ['protein', 'ext1', 'ext2', 'col3'],
    [0.16, 0.038, 0.044, 0.040],
    [0.33, 0.089, 0.095, 0.091],
    [0.66, 0.184, 0.191, 0.191],
    [1.00, 0.280, 0.292, 0.283],
    [1.32, 0.365, 0.367, 0.365],
    [1.66, 0.441, 0.443, 0.444]
    ]
​
out = ''
for row in table:
    # 将列表每个元素转换为字符串
    line = [str(cell) for cell in row]

    out = out + '\t'.join(line) + '\n'
    # print(type(out))
    # print(type(line))

# 写入txt文件
open('output.txt', 'w').write(out)

微信扫一扫分享文章

+10
无需登陆也可“点赞”支持作者
分享到:
评论

使用道具 举报

热门推荐