1. Pandas + glob获取指定目录下的文件列表
import pandas as pd
import glob
data_dir = "/public/data/"
# 获取文件后缀为.txt的文件列表
df_all = pd.concat([pd.read_csv(f, sep='\t') for f in glob.glob(data_dir + '*.txt')])
print(df_all)
2. 使用 enumerate 函数获取索引和值
# A-K 字母列表
letter = [chr(ord('A') + i) for i in range(0, 11)]
# 输出索引和值
for idx, value in enumerate(letter):
print(f"{idx}\t{value}")
3. 使用 zip 函数同时遍历多个列表
# 0-10 数字列表
number = [n for n in range(0, 11)]
# A-K 字母列表
letter = [chr(ord('A') + i) for i in range(0, 11)]
for number, letter in zip(letter, number):
print(f"{letter}: {number}")
# 0: A
# 1: B
# 2: C
# 3: D
# 4: E
# 5: F
# 6: G
# 7: H
# 8: I
# 9: J
# 10: K
5. 使用uuid生成唯一编号
import time
import uuid
run_id = (time.strftime("%Y%m%d%H", time.localtime()) + str(uuid.uuid1())[:4])[2:]
print(run_id)
# 24052909e1f9
6. 快速生成24条染色体名称列表
list_chrom = ['chr' + str(i) for i in range(1, 23)] + ['chrX', 'chrY']
print(list_chrom)
# ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', 'chr11', 'chr12', 'chr13', # 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr19', 'chr20', 'chr21', 'chr22', 'chrX', 'chrY']
7. pandas+os库合并相同列文件
import os
import pandas as pd
# 搜索目录
search_dir = './'
# 文件路径列表
list_files = [search_dir + f for f in os.listdir(search_dir) if f.endswith('.txt')]
# 合并全部文件
for idx, file_path in enumerate(list_files):
df_tmp = pd.read_csv(file_path, sep='\t')
if idx == 0:
df_merge = df_tmp
else:
# df_tmp行数据加入df_merge
df_merge = pd.concat([df_merge, df_tmp], axis=0)
df_merge.to_csv(search_dir + 'merge.txt', sep='\t', index=False)
8. 内置函数map + filter 过滤数据
number = [n for n in range(0, 11)]
# 获取平方数
squared_numbers = list(map(lambda x: x**2, number)
print(squared_numbers)
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
# 获取偶数
even_numbers = list(filter(lambda x: x % 2 == 0, number))
print(even_numbers)
# [0, 2, 4, 6, 8, 10]
9. 使用concurrent.futures模块实现循环的并发处理,提高计算效率
import concurrent.futures
def square(num):
return num ** 2
with concurrent.futures.ThreadPoolExecutor() as executor:
res = list(executor.map(square, number))
print(res)
10. 使用asyncio模块实现异步处理,提高并发性能
import asyncio
import math
async def sqrt(num):
return math.sqrt(num)
async def calculate():
run_tasks = [sqrt(num) for num in number]
results = await asyncio.gather(*run_tasks)
print(results)
asyncio.run(calculate())
11. 程序运行分析装饰器
import time
def analysis_time(func):
def warpper(*args, **kwargs):
start_time = time.time()
res = func(*args, *kwargs)
end_time = time.time()
print(f"{func.__name__} program run time: {end_time - start_time}s")
return res
return warpper
# 并行计算
import concurrent.futures
def square(num):
return num ** 2
@analysis_time
def calulate(number):
with concurrent.futures.ThreadPoolExecutor() as executor:
res = list(executor.map(square, number))
return res
print(calulate(number))
# calulate program run time: 0.002947568893432617s
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
12. 读取文本文件的\t分割内容至列表
# 读取文本文件的\t分割内容至列表
table = []
for line in open('data.txt','r'):
table.append(line.strip().split('\t'))
print(table)
13. 将列表内容写入至文本文件
# 列表
table = [
['protein', 'ext1', 'ext2', 'col3'],
[0.16, 0.038, 0.044, 0.040],
[0.33, 0.089, 0.095, 0.091],
[0.66, 0.184, 0.191, 0.191],
[1.00, 0.280, 0.292, 0.283],
[1.32, 0.365, 0.367, 0.365],
[1.66, 0.441, 0.443, 0.444]
]
out = ''
for row in table:
# 将列表每个元素转换为字符串
line = [str(cell) for cell in row]
out = out + '\t'.join(line) + '\n'
# print(type(out))
# print(type(line))
# 写入txt文件
open('output.txt', 'w').write(out)