脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Python - python实现简易内存监控

python实现简易内存监控

2021-03-07 00:28betterFWJ Python

这篇文章主要介绍了python实现简易内存监控,每隔3秒获取系统内存,当内存超过设定的警报值时,获取所有进程占用内存并发出警报声,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本例主要功能:每隔3秒获取系统内存,当内存超过设定的警报值时,获取所有进程占用内存并发出警报声。内存值和所有进程占用内存记入log,log文件按天命名。

1 获取cpu、内存、进程信息

利用WMI

简单说明下,WMI的全称是Windows Management Instrumentation,即Windows管理规范。它是Windows操作系统上管理数据和操作的基础设施。我们可以使用WMI脚本或者应用自动化管理任务等。

安装模块

WMI下载地址
win32com下载地址:

学会使用WMI

不错的教程

获取cpu、内存、磁盘

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def getSysInfo(wmiService = None):
 result = {}
 if wmiService == None:
  wmiService = wmi.WMI()
 # cpu
 for cpu in wmiService.Win32_Processor():
  timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
  result['cpuPercent'] = cpu.loadPercentage
 # memory
 cs = wmiService.Win32_ComputerSystem()
 os = wmiService.Win32_OperatingSystem()
 result['memTotal'] = int(int(cs[0].TotalPhysicalMemory)/1024/1024)
 result['memFree'] = int(int(os[0].FreePhysicalMemory)/1024)
 result['memPercent']=result['memFree'] * 100 /result['memTotal']
 #disk
 result['diskTotal'] = 0
 result['diskFree'] = 0
 for disk in wmiService.Win32_LogicalDisk(DriveType=3):
  result['diskTotal'] += int(disk.Size)
  result['diskFree'] += int(disk.FreeSpace)
 result['diskTotal'] = int(result['diskTotal']/1024/1024)
 result['diskFree'] = int(result['diskFree']/1024/1024)
 return result

获取所有进程占用内存

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def getAllProcessInfo(mywmi = None):
 """取出全部进程的进程名,进程ID,进程实际内存, 虚拟内存,CPU使用率
 """
 allProcessList = []
 
 allProcess = mywmi.ExecQuery("SELECT * FROM Win32_PerfFormattedData_PerfProc_Process")
 #print (allProcess.count)
 for j in allProcess:
  #print j.Properties_("PercentPrivilegedTime").__int__()
  ##print j.Properties_("name").__str__()+" "+j.Properties_("IDProcess").__str__()+" "+j.Properties_("PercentPrivilegedTime").__str__()
  #for pro in j.Properties_:
  # print (pro.name)
  #break
  name = j.Properties_("name").__str__()
  if name != "_Total" and name !="Idle":
   pid = j.Properties_("IDProcess").__str__()
   PercentPrivilegedTime = j.Properties_("PercentPrivilegedTime").__int__()
   WorkingSetPrivate = j.Properties_("WorkingSetPrivate").__int__()/1024
   WorkingSet = j.Properties_("WorkingSet").__int__()/1024
   allProcessList.append([name, pid, WorkingSetPrivate, WorkingSet, PercentPrivilegedTime])
 
 return allProcessList

也可以用psutil

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import psutil,time
from operator import itemgetter, attrgetter
 
def getProcessInfo(p):
 """取出指定进程占用的进程名,进程ID,进程实际内存, 虚拟内存,CPU使用率
 """
 try:
  cpu = int(p.cpu_percent(interval=0))
  memory = p.memory_info()
  rss = memory.rss/1024
  vms = memory.vms/1024
  name = p.name()
  pid = p.pid
 except psutil.Error:
  name = "Closed_Process"
  pid = 0
  rss = 0
  vms = 0
  cpu = 0
 #return [name.upper(), pid, rss, vms]
 return [name, pid, vms, rss, cpu]
 
def getAllProcessInfo():
 """取出全部进程的进程名,进程ID,进程实际内存, 虚拟内存,CPU使用率
 """
 instances = []
 all_processes = list(psutil.process_iter())
 for proc in all_processes:
  proc.cpu_percent(interval=0)
 #此处sleep1秒是取正确取出CPU使用率的重点
 time.sleep(1)
 for proc in all_processes:
  instances.append(getProcessInfo(proc))
 return instances
 
 
if __name__ == '__main__':
 processInfoList = getAllProcessInfo()
 processInfoList.sort(key=itemgetter(2), reverse=True)
 for p in processInfoList:
  print(p)

2. 保存log

配置config文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[loggers]
keys=example01
[logger_example01]
handlers=hand04
 
[handlers]
keys=hand04
[handler_hand04]
class=handlers.TimedRotatingFileHandler
level=DEBUG
formatter=form01
args=('./logs/monitor.log', 'd', 1, 7)
 
[formatters]
keys=form01,form02
[formatter_form01]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s
datefmt=%Y-%m-%d %H:%M:%S

记录log

?
1
2
3
4
5
6
import logging
import logging.config
 
logger.info("message")
logger.warning("message")
logger.error("message")

3. 完整代码

文件夹结构:

maintain
–monitor
—-logs
—-logger.conf
—-monitor.py
–packages
—-init.py
—-processinfo.py
—-sysinfo.py

monitor

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import wmi
import time
import winsound
import logging
import logging.config
from operator import itemgetter, attrgetter
from os import path
 
import packages.sysinfo #使用wmi
#import packages.ProcessInfo #使用
 
#def ShowProcessInfo():
# processInfoList = packages.ProcessInfo.getAllProcessInfo()
# processInfoList.sort(key=itemgetter(2), reverse=True)
# for p in processInfoList: 
#  logger.info(p)
 
def ShowProcessInfo(wmiService = None):
 processInfoList = packages.sysinfo.getAllProcessInfo(wmiService)
 processInfoList.sort(key=itemgetter(2), reverse=True)
 for p in processInfoList: 
  logger.info(p)
 
if __name__ == '__main__':
 MemPerWorningLine = 50
 MemPerErrorLine = 20
 ErrorAlertCount = 10
 ProcessInfoCount = 10
 counterProcessInfo = ProcessInfoCount
 
 print("Memory monitor start!")
 log_file_path = path.join(path.dirname(path.abspath(__file__)), 'logger.conf')
 #print(log_file_path)
 logging.config.fileConfig(log_file_path)
 logger = logging.getLogger("example01")
 wmiService = wmi.WMI()
 while True:
  memPercent = int(packages.sysinfo.getSysInfo(wmiService)['memPercent'])
  strMemPercent = 'FreeMemory: ' + str(memPercent) + '%'
  if(memPercent < MemPerErrorLine):
   logger.error(strMemPercent)
   #ProcessInfoList
   counterProcessInfo+=1
   if(counterProcessInfo >= ProcessInfoCount):
    ShowProcessInfo(wmiService)
    counterProcessInfo = 0
   #ALert
   counter = 1
   while counter <= ErrorAlertCount:
    winsound.Beep(2080, 100)
    time.sleep(0.1)
    counter += 1
  elif(memPercent < MemPerWorningLine):
   logger.warning(strMemPercent)
   #ProcessInfoList
   counterProcessInfo+=1
   if(counterProcessInfo >= ProcessInfoCount):
    ShowProcessInfo(wmiService)
    counterProcessInfo = 0
   #ALert
   winsound.Beep(2015, 2000)
  else:
   logger.info(strMemPercent)
  time.sleep(3)

sysinfo

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*- coding: utf-8 -*-
 
import wmi
import os
import sys
import platform
import time
import win32api
import win32com
from win32com.client import GetObject
from operator import itemgetter, attrgetter
 
def getSysInfo(wmiService = None):
 result = {}
 if wmiService == None:
  wmiService = wmi.WMI()
 # cpu
 for cpu in wmiService.Win32_Processor():
  timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
  result['cpuPercent'] = cpu.loadPercentage
 # memory
 cs = wmiService.Win32_ComputerSystem()
 os = wmiService.Win32_OperatingSystem()
 result['memTotal'] = int(int(cs[0].TotalPhysicalMemory)/1024/1024)
 result['memFree'] = int(int(os[0].FreePhysicalMemory)/1024)
 result['memPercent']=result['memFree'] * 100 /result['memTotal']
 #disk
 result['diskTotal'] = 0
 result['diskFree'] = 0
 for disk in wmiService.Win32_LogicalDisk(DriveType=3):
  result['diskTotal'] += int(disk.Size)
  result['diskFree'] += int(disk.FreeSpace)
 result['diskTotal'] = int(result['diskTotal']/1024/1024)
 result['diskFree'] = int(result['diskFree']/1024/1024)
 return result
 
def sys_version():
 c = wmi.WMI ()
 #获取操作系统版本
 for sys in c.Win32_OperatingSystem():
  print ("Version:%s" % sys.Caption.encode("UTF8"),"Vernum:%s" % sys.BuildNumber)
  print (sys.OSArchitecture.encode("UTF8"))#系统是32位还是64位的
  print (sys.NumberOfProcesses) #当前系统运行的进程总数
 
def cpu_mem():
 c = wmi.WMI () 
 #CPU类型和内存
 for processor in c.Win32_Processor():
  #print "Processor ID: %s" % processor.DeviceID
  print ("Process Name: %s" % processor.Name.strip() )
 for Memory in c.Win32_PhysicalMemory():
  print ("Memory Capacity: %.fMB" %(int(Memory.Capacity)/1048576))
 
def cpu_use():
 #5s取一次CPU的使用率
 c = wmi.WMI()
 while True:
  for cpu in c.Win32_Processor():
    timestamp = time.strftime('%a, %d %b %Y %H:%M:%S', time.localtime())
    print ('%s | Utilization: %s: %d %%' % (timestamp, cpu.DeviceID, cpu.LoadPercentage))
    time.sleep(5)
 
def disk():
 c = wmi.WMI ()
 #获取硬盘分区
 for physical_disk in c.Win32_DiskDrive ():
  for partition in physical_disk.associators ("Win32_DiskDriveToDiskPartition"):
   for logical_disk in partition.associators ("Win32_LogicalDiskToPartition"):
    print (physical_disk.Caption.encode("UTF8"), partition.Caption.encode("UTF8"), logical_disk.Caption)
 
 #获取硬盘使用百分情况
 for disk in c.Win32_LogicalDisk (DriveType=3):
  print (disk.Caption, "%0.2f%% free" % (100.0 * long (disk.FreeSpace) / long (disk.Size)))
 
def network():
 c = wmi.WMI () 
 #获取MAC和IP地址
 for interface in c.Win32_NetworkAdapterConfiguration (IPEnabled=1):
  print ("MAC: %s" % interface.MACAddress )
 for ip_address in interface.IPAddress:
  print ("ip_add: %s" % ip_address )
 print
 
 #获取自启动程序的位置
 for s in c.Win32_StartupCommand ():
  print ("[%s] %s <%s>" % (s.Location.encode("UTF8"), s.Caption.encode("UTF8"), s.Command.encode("UTF8")))
 
 
 #获取当前运行的进程
 for process in c.Win32_Process ():
  print (process.ProcessId, process.Name)
 
def getAllProcessInfo(mywmi = None):
 """取出全部进程的进程名,进程ID,内存(专有工作集), 工作集
 """
 allProcessList = []
 
 allProcess = mywmi.ExecQuery("SELECT * FROM Win32_PerfFormattedData_PerfProc_Process")
 #print (allProcess.count)
 for j in allProcess:
  #print j.Properties_("PercentPrivilegedTime").__int__()
  ##print j.Properties_("name").__str__()+" "+j.Properties_("IDProcess").__str__()+" "+j.Properties_("PercentPrivilegedTime").__str__()
  #for pro in j.Properties_:
  # print (pro.name)
  #break
  name = j.Properties_("name").__str__()
  if name != "_Total" and name !="Idle":
   pid = j.Properties_("IDProcess").__str__()
   PercentPrivilegedTime = j.Properties_("PercentPrivilegedTime").__int__()
   WorkingSetPrivate = j.Properties_("WorkingSetPrivate").__int__()/1024
   WorkingSet = j.Properties_("WorkingSet").__int__()/1024
   allProcessList.append([name, pid, WorkingSetPrivate, WorkingSet, PercentPrivilegedTime])
 
# allProcess = mywmi.ExecQuery("select * from Win32_Process")
# for i in allProcess:
#  Name = str(i.Properties_("Name"))
#  ProcessID = int(i.Properties_("ProcessID"))
#  WorkingSetSize = int(i.Properties_("WorkingSetSize"))/1024
#  #VirtualSize = int(i.Properties_("VirtualSize"))/1024
#  PeakWorkingSetSize = int(i.Properties_("PeakWorkingSetSize"))/1024
#  CreationDate = str(i.Properties_("CreationDate"))
#  allProcessList.append([Name, ProcessID, WorkingSetSize, PeakWorkingSetSize, CreationDate])
 
 return allProcessList
 
#def main():
 #sys_version()
 #cpu_mem()
 #disk()
 #network()
 #cpu_use()
 
if __name__ == '__main__':
 #mywmi = GetObject("winmgmts:")
 mywmi = wmi.WMI()
 processInfoList = getAllProcessInfo(mywmi)
 processInfoList.sort(key=itemgetter(2), reverse=True)
 for processinfo in processInfoList:
  print(processinfo)

processinfo

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import psutil,time
from operator import itemgetter, attrgetter
 
def getProcessInfo(p):
 """取出指定进程占用的进程名,进程ID,进程实际内存, 虚拟内存,CPU使用率
 """
 try:
  cpu = int(p.cpu_percent(interval=0))
  memory = p.memory_info()
  rss = memory.rss/1024
  vms = memory.vms/1024
  name = p.name()
  pid = p.pid
 except psutil.Error:
  name = "Closed_Process"
  pid = 0
  rss = 0
  vms = 0
  cpu = 0
 #return [name.upper(), pid, rss, vms]
 return [name, pid, vms, rss, cpu]
 
def getAllProcessInfo():
 """取出全部进程的进程名,进程ID,进程实际内存, 虚拟内存,CPU使用率
 """
 instances = []
 all_processes = list(psutil.process_iter())
 for proc in all_processes:
  proc.cpu_percent(interval=0)
 #此处sleep1秒是取正确取出CPU使用率的重点
 time.sleep(1)
 for proc in all_processes:
  instances.append(getProcessInfo(proc))
 return instances
 
 
if __name__ == '__main__':
 processInfoList = getAllProcessInfo()
 processInfoList.sort(key=itemgetter(2), reverse=True)
 for p in processInfoList:
  print(p)

logger

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#logger.conf
###############################################
[loggers]
keys=root,example01,example02
[logger_root]
level=DEBUG
handlers=hand01,hand02
[logger_example01]
handlers=hand01,hand04
qualname=example01
propagate=0
[logger_example02]
handlers=hand01,hand03
qualname=example02
propagate=0
###############################################
[handlers]
keys=hand01,hand02,hand03,hand04
[handler_hand01]
class=StreamHandler
level=INFO
formatter=form02
args=(sys.stderr,)
[handler_hand02]
class=FileHandler
level=DEBUG
formatter=form01
args=('myapp.log', 'a')
[handler_hand03]
class=handlers.RotatingFileHandler
level=INFO
formatter=form02
args=('myapp.log', 'a', 10*1024*1024, 5)
[handler_hand04]
class=handlers.TimedRotatingFileHandler
level=DEBUG
formatter=form01
args=('./logs/monitor.log', 'd', 1, 7)
###############################################
[formatters]
keys=form01,form02
[formatter_form01]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s
datefmt=%Y-%m-%d %H:%M:%S
[formatter_form02]
format=%(asctime)-12s: %(levelname)-8s %(message)s
datefmt=%Y-%m-%d %H:%M:%S

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/virusnono/article/details/55508319

延伸 · 阅读

精彩推荐