生信数据分析高效Python代码
## 1. Pandas + glob获取指定目录下的文件列表```
import pandas as pd
import glob
data_dir = "/public/data/"
# 获取文件后缀为.txt的文件列表
df_all = pd.concat()
print(df_all)
```
## 2. 使用 enumerate 函数获取索引和值
```
# A-K 字母列表
letter =
# 输出索引和值
for idx, value in enumerate(letter):
print(f"{idx}\t{value}")
```
## 3. 使用 zip 函数同时遍历多个列表
```
# 0-10 数字列表
number =
# A-K 字母列表
letter =
for number, letter in zip(letter, number):
print(f"{letter}: {number}")
# 0: A
# 1: B
# 2: C
# 3: D
# 4: E
# 5: F
# 6: G
# 7: H
# 8: I
# 9: J
# 10: K
```
## 5. 使用uuid生成唯一编号
```
import time
import uuid
run_id = (time.strftime("%Y%m%d%H", time.localtime()) + str(uuid.uuid1())[:4])
print(run_id)
# 24052909e1f9
```
## 6. 快速生成24条染色体名称列表
```
list_chrom = ['chr' + str(i) for i in range(1, 23)] + ['chrX', 'chrY']
print(list_chrom)
# ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', 'chr11', 'chr12', 'chr13', # 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr19', 'chr20', 'chr21', 'chr22', 'chrX', 'chrY']
```
## 7. pandas+os库合并相同列文件
```
import os
import pandas as pd
# 搜索目录
search_dir = './'
# 文件路径列表
list_files =
# 合并全部文件
for idx, file_path in enumerate(list_files):
df_tmp = pd.read_csv(file_path, sep='\t')
if idx == 0:
df_merge = df_tmp
else:
# df_tmp行数据加入df_merge
df_merge = pd.concat(, axis=0)
df_merge.to_csv(search_dir + 'merge.txt', sep='\t', index=False)
```
## 8. 内置函数map + filter 过滤数据
```
number =
# 获取平方数
squared_numbers = list(map(lambda x: x**2, number)
print(squared_numbers)
#
# 获取偶数
even_numbers = list(filter(lambda x: x % 2 == 0, number))
print(even_numbers)
#
```
## 9. 使用concurrent.futures模块实现循环的并发处理,提高计算效率
```
import concurrent.futures
def square(num):
return num ** 2
with concurrent.futures.ThreadPoolExecutor() as executor:
res = list(executor.map(square, number))
print(res)
```
## 10. 使用asyncio模块实现异步处理,提高并发性能
```
import asyncio
import math
async def sqrt(num):
return math.sqrt(num)
async def calculate():
run_tasks =
results = await asyncio.gather(*run_tasks)
print(results)
asyncio.run(calculate())
```
## 11. 程序运行分析装饰器
```
import time
def analysis_time(func):
def warpper(*args, **kwargs):
start_time = time.time()
res = func(*args, *kwargs)
end_time = time.time()
print(f"{func.__name__} program run time: {end_time - start_time}s")
return res
return warpper
# 并行计算
import concurrent.futures
def square(num):
return num ** 2
@analysis_time
def calulate(number):
with concurrent.futures.ThreadPoolExecutor() as executor:
res = list(executor.map(square, number))
return res
print(calulate(number))
# calulate program run time: 0.002947568893432617s
#
```
## 12. 读取文本文件的\\t分割内容至列表
```
# 读取文本文件的\t分割内容至列表
table = []
for line in open('data.txt','r'):
table.append(line.strip().split('\t'))
print(table)
```
## 13. 将列表内容写入至文本文件
```
# 列表
table = [
['protein', 'ext1', 'ext2', 'col3'],
,
,
,
,
,
]
out = ''
for row in table:
# 将列表每个元素转换为字符串
line =
out = out + '\t'.join(line) + '\n'
# print(type(out))
# print(type(line))
# 写入txt文件
open('output.txt', 'w').write(out)
```
页:
[1]