日常

测试记录

Posted by KK on January 11, 2025

python编写小工具批量计算

import os
import logging

# 设置日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def calculate_cpu_idle_average(file_path, idle_threshold=90):
    """
    计算指定文件中CPU idle、irq和sirq的平均值,忽略idle值大于指定阈值的行。
  
    :param file_path: 要处理的文件的路径
    :param idle_threshold: idle值的阈值,默认为90%
    :return: CPU idle的平均值、有效行数、CPU irq的平均值和CPU sirq的平均值
             如果文件中没有找到有效CPU信息则返回None, 0, None, None
    """
    total_idle = 0
    total_irq = 0
    total_sirq = 0
    valid_line_count = 0

    try:
        with open(file_path, 'r') as file:
            for line in file:
                if line.startswith('CPU:'):
                    parts = line.split() #when set none (the default value) will split on any whitespace character (including \\n \\r \\t and space) and will discard 

                    idle_index = parts.index('idle')
                    irq_index = parts.index('irq')
                    sirq_index = parts.index('sirq')
                    idle_value = float(parts[idle_index - 1].strip('%'))
                    irq_value = float(parts[irq_index - 1].strip('%'))
                    sirq_value = float(parts[sirq_index - 1].strip('%'))
                    if idle_value <= idle_threshold:
                        total_idle += idle_value
                        total_irq += irq_value
                        total_sirq += sirq_value
                        valid_line_count += 1
    except FileNotFoundError:
        logging.error(f"文件未找到: {file_path}")
        return None, 0, None, None
    except ValueError:
        logging.error(f"文件格式错误: {file_path}")
        return None, 0, None, None
    except Exception as e:
        logging.error(f"处理文件时发生错误: {file_path}, 错误: {e}")
        return None, 0, None, None

    if valid_line_count == 0:
        return None, 0, None, None

    average_idle = total_idle / valid_line_count
    average_irq = total_irq / valid_line_count
    average_sirq = total_sirq / valid_line_count
    return average_idle, valid_line_count, average_irq, average_sirq

def process_files_in_directories(root_directory, output_file_path):
    """
    遍历指定目录及其子目录中的所有.txt文件,计算每个文件中CPU idle、irq和sirq的平均值,并将结果保存到指定文件中。
  
    :param root_directory: 要遍历的根目录路径
    :param output_file_path: 保存结果的文件路径
    """
    with open(output_file_path, 'w') as output_file:
        for root, dirs, files in os.walk(root_directory):
            for file in files:
                if file.endswith('.txt'):
                    file_path = os.path.join(root, file)
                    average_idle, line_count, average_irq, average_sirq = calculate_cpu_idle_average(file_path)
                    if average_idle is not None:
                        output_file.write(f"文件:{file_path}\n")
                        output_file.write(f"有效CPU行数:{line_count}\n")
                        output_file.write(f"CPU idle的平均值为:{average_idle:.2f}%\n")
                        output_file.write(f"CPU irq的平均值为:{average_irq:.2f}%\n")
                        output_file.write(f"CPU sirq的平均值为:{average_sirq:.2f}%\n\n")
                    else:
                        output_file.write(f"文件:{file_path} 中没有找到有效CPU信息\n\n")

# 替换为你的根目录路径
root_directory = '/path/to/your/directories'
# 替换为你的输出文件路径
output_file_path = '/path/to/your/output.txt'
process_files_in_directories(root_directory, output_file_path)
import logging
import os
import datetime
from typing import Optional, Tuple

# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def parse_cpu_line(line: str) -> Optional[Tuple[float, float, float]]:
    """
    解析以 'CPU:' 开头的行,提取 idle、irq、sirq 的值。
    """
    try:
        parts = line.split()
        idle_index = parts.index("idle")
        irq_index = parts.index("irq")
        sirq_index = parts.index("sirq")
        idle_value = float(parts[idle_index - 1].strip("%"))
        irq_value = float(parts[irq_index - 1].strip("%"))
        sirq_value = float(parts[sirq_index - 1].strip("%"))
        return idle_value, irq_value, sirq_value
    except (ValueError, IndexError):
        logging.warning(f"无法解析 CPU 行: {line.strip()}")
        return None

def parse_ksoftirqd_line(line: str) -> Optional[float]:
    """
    解析包含 '[ksoftirqd/' 的行,提取 CPU 占用率。
    """
    try:
        parts = line.split()
        # 查找包含百分比的值
        for part in parts:
            if part.endswith("%"):
                return float(part.strip("%"))
        return None  # 如果没有找到百分比值
    except (ValueError, IndexError):
        logging.warning(f"无法解析 ksoftirqd 行: {line.strip()}")
        return None

def calculate_averages(file_path: str, idle_threshold: float = 90) -> Optional[Tuple[float, int, float, float, float]]:
    """
    计算文件中 CPU 和 ksoftirqd 的平均值。
    """
    total_idle = total_irq = total_sirq = total_ksoftirqd = 0.0
    line_count = ksoftirqd_count = 0

    try:
        with open(file_path, "r") as file:
            for line in file:
                if line.startswith("CPU:"):
                    result = parse_cpu_line(line)
                    if result:
                        idle_value, irq_value, sirq_value = result
                        if idle_value <= idle_threshold:
                            total_idle += idle_value
                            total_irq += irq_value
                            total_sirq += sirq_value
                            line_count += 1
                elif "[ksoftirqd/" in line:
                    ksoftirqd_value = parse_ksoftirqd_line(line)
                    if ksoftirqd_value is not None:
                        total_ksoftirqd += ksoftirqd_value
                        ksoftirqd_count += 1
    except FileNotFoundError:
        logging.error(f"文件未找到: {file_path}")
        return None
    except Exception as e:
        logging.error(f"处理文件时错误: {file_path}, 错误信息: {e}")
        return None

    if line_count == 0:
        return None

    average_idle = total_idle / line_count
    average_irq = total_irq / line_count
    average_sirq = total_sirq / line_count
    average_ksoftirqd = total_ksoftirqd / ksoftirqd_count if ksoftirqd_count > 0 else None

    return average_idle, line_count, average_irq, average_sirq, average_ksoftirqd

def process_files_in_directory(root_directory: str, output_file_path: str) -> None:
    """
    处理目录中的所有 .txt 文件,并将结果写入输出文件。
    """
    with open(output_file_path, "w") as output_file:
        for root, _, files in os.walk(root_directory):
            for file in files:
                if file.endswith(".txt"):
                    file_path = os.path.join(root, file)
                    result = calculate_averages(file_path)
                    if result:
                        average_idle, line_count, average_irq, average_sirq, average_ksoftirqd = result
                        output_file.write(f"文件:{file_path}\n")
                        output_file.write(f"统计数:{line_count}\n")
                        output_file.write(f"CPU占用率的平均值为:{100 - average_idle:.2f}%\n")
                        output_file.write(f"irq占用率的平均值为:{average_irq:.2f}%\n")
                        output_file.write(f"sirq占用率的平均值为:{average_sirq:.2f}%\n")
                        if average_ksoftirqd is not None:
                            output_file.write(f"ksoftirqd占用率的平均值为:{average_ksoftirqd:.2f}%\n\n")
                        else:
                            output_file.write("ksoftirqd占用率的平均值:无数据\n\n")
                    else:
                        output_file.write(f"文件:{file_path} 中没有找到有效的 CPU 信息\n")

# 主程序
if __name__ == "__main__":
    root_directory = "/home/Desktop/A/P42U28_IPMAC_top"  # 替换为你的根目录路径
    file_name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file_path = f"/home/Desktop/A/average_{file_name}.log"
    process_files_in_directory(root_directory, output_file_path)