import os
import logging
# 设置日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def calculate_cpu_idle_average(file_path, idle_threshold=90):
"""
计算指定文件中CPU idle、irq和sirq的平均值,忽略idle值大于指定阈值的行。
:param file_path: 要处理的文件的路径
:param idle_threshold: idle值的阈值,默认为90%
:return: CPU idle的平均值、有效行数、CPU irq的平均值和CPU sirq的平均值
如果文件中没有找到有效CPU信息则返回None, 0, None, None
"""
total_idle = 0
total_irq = 0
total_sirq = 0
valid_line_count = 0
try:
with open(file_path, 'r') as file:
for line in file:
if line.startswith('CPU:'):
parts = line.split() #when set none (the default value) will split on any whitespace character (including \\n \\r \\t and space) and will discard
idle_index = parts.index('idle')
irq_index = parts.index('irq')
sirq_index = parts.index('sirq')
idle_value = float(parts[idle_index - 1].strip('%'))
irq_value = float(parts[irq_index - 1].strip('%'))
sirq_value = float(parts[sirq_index - 1].strip('%'))
if idle_value <= idle_threshold:
total_idle += idle_value
total_irq += irq_value
total_sirq += sirq_value
valid_line_count += 1
except FileNotFoundError:
logging.error(f"文件未找到: {file_path}")
return None, 0, None, None
except ValueError:
logging.error(f"文件格式错误: {file_path}")
return None, 0, None, None
except Exception as e:
logging.error(f"处理文件时发生错误: {file_path}, 错误: {e}")
return None, 0, None, None
if valid_line_count == 0:
return None, 0, None, None
average_idle = total_idle / valid_line_count
average_irq = total_irq / valid_line_count
average_sirq = total_sirq / valid_line_count
return average_idle, valid_line_count, average_irq, average_sirq
def process_files_in_directories(root_directory, output_file_path):
"""
遍历指定目录及其子目录中的所有.txt文件,计算每个文件中CPU idle、irq和sirq的平均值,并将结果保存到指定文件中。
:param root_directory: 要遍历的根目录路径
:param output_file_path: 保存结果的文件路径
"""
with open(output_file_path, 'w') as output_file:
for root, dirs, files in os.walk(root_directory):
for file in files:
if file.endswith('.txt'):
file_path = os.path.join(root, file)
average_idle, line_count, average_irq, average_sirq = calculate_cpu_idle_average(file_path)
if average_idle is not None:
output_file.write(f"文件:{file_path}\n")
output_file.write(f"有效CPU行数:{line_count}\n")
output_file.write(f"CPU idle的平均值为:{average_idle:.2f}%\n")
output_file.write(f"CPU irq的平均值为:{average_irq:.2f}%\n")
output_file.write(f"CPU sirq的平均值为:{average_sirq:.2f}%\n\n")
else:
output_file.write(f"文件:{file_path} 中没有找到有效CPU信息\n\n")
# 替换为你的根目录路径
root_directory = '/path/to/your/directories'
# 替换为你的输出文件路径
output_file_path = '/path/to/your/output.txt'
process_files_in_directories(root_directory, output_file_path)
import logging
import os
import datetime
from typing import Optional, Tuple
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def parse_cpu_line(line: str) -> Optional[Tuple[float, float, float]]:
"""
解析以 'CPU:' 开头的行,提取 idle、irq、sirq 的值。
"""
try:
parts = line.split()
idle_index = parts.index("idle")
irq_index = parts.index("irq")
sirq_index = parts.index("sirq")
idle_value = float(parts[idle_index - 1].strip("%"))
irq_value = float(parts[irq_index - 1].strip("%"))
sirq_value = float(parts[sirq_index - 1].strip("%"))
return idle_value, irq_value, sirq_value
except (ValueError, IndexError):
logging.warning(f"无法解析 CPU 行: {line.strip()}")
return None
def parse_ksoftirqd_line(line: str) -> Optional[float]:
"""
解析包含 '[ksoftirqd/' 的行,提取 CPU 占用率。
"""
try:
parts = line.split()
# 查找包含百分比的值
for part in parts:
if part.endswith("%"):
return float(part.strip("%"))
return None # 如果没有找到百分比值
except (ValueError, IndexError):
logging.warning(f"无法解析 ksoftirqd 行: {line.strip()}")
return None
def calculate_averages(file_path: str, idle_threshold: float = 90) -> Optional[Tuple[float, int, float, float, float]]:
"""
计算文件中 CPU 和 ksoftirqd 的平均值。
"""
total_idle = total_irq = total_sirq = total_ksoftirqd = 0.0
line_count = ksoftirqd_count = 0
try:
with open(file_path, "r") as file:
for line in file:
if line.startswith("CPU:"):
result = parse_cpu_line(line)
if result:
idle_value, irq_value, sirq_value = result
if idle_value <= idle_threshold:
total_idle += idle_value
total_irq += irq_value
total_sirq += sirq_value
line_count += 1
elif "[ksoftirqd/" in line:
ksoftirqd_value = parse_ksoftirqd_line(line)
if ksoftirqd_value is not None:
total_ksoftirqd += ksoftirqd_value
ksoftirqd_count += 1
except FileNotFoundError:
logging.error(f"文件未找到: {file_path}")
return None
except Exception as e:
logging.error(f"处理文件时错误: {file_path}, 错误信息: {e}")
return None
if line_count == 0:
return None
average_idle = total_idle / line_count
average_irq = total_irq / line_count
average_sirq = total_sirq / line_count
average_ksoftirqd = total_ksoftirqd / ksoftirqd_count if ksoftirqd_count > 0 else None
return average_idle, line_count, average_irq, average_sirq, average_ksoftirqd
def process_files_in_directory(root_directory: str, output_file_path: str) -> None:
"""
处理目录中的所有 .txt 文件,并将结果写入输出文件。
"""
with open(output_file_path, "w") as output_file:
for root, _, files in os.walk(root_directory):
for file in files:
if file.endswith(".txt"):
file_path = os.path.join(root, file)
result = calculate_averages(file_path)
if result:
average_idle, line_count, average_irq, average_sirq, average_ksoftirqd = result
output_file.write(f"文件:{file_path}\n")
output_file.write(f"统计数:{line_count}\n")
output_file.write(f"CPU占用率的平均值为:{100 - average_idle:.2f}%\n")
output_file.write(f"irq占用率的平均值为:{average_irq:.2f}%\n")
output_file.write(f"sirq占用率的平均值为:{average_sirq:.2f}%\n")
if average_ksoftirqd is not None:
output_file.write(f"ksoftirqd占用率的平均值为:{average_ksoftirqd:.2f}%\n\n")
else:
output_file.write("ksoftirqd占用率的平均值:无数据\n\n")
else:
output_file.write(f"文件:{file_path} 中没有找到有效的 CPU 信息\n")
# 主程序
if __name__ == "__main__":
root_directory = "/home/Desktop/A/P42U28_IPMAC_top" # 替换为你的根目录路径
file_name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_file_path = f"/home/Desktop/A/average_{file_name}.log"
process_files_in_directory(root_directory, output_file_path)