本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
import simplejson
import urllib
import urllib2
import os, sys
import logging
try:
import sqlite3
except ImportError:
sys.stderr.write(\"ERROR: Unable to locate Python SQLite3 module. \" \\
\"Please verify your installation. Exiting...\\n\")
sys.exit(-1)
MD5 = \"5248f774d2ee0a10936d0b1dc89107f1\"
MD5 = \"12fa5fb74201d9b6a14f63fbf9a81ff6\" #do not have report on virustotal.com
APIKEY = \"xxxxxxxxxxxxxxxxxx\"用自己的
class VirusTotalDatabase:
\"\"\"
Database abstraction layer.
\"\"\"
def __init__(self, db_file):
log = logging.getLogger(\"Database.Init\")
self.__dbfile = db_file
self._conn = None
self._cursor = None
# Check if SQLite database already exists. If it doesn\'t exist I invoke
# the generation procedure.
if not os.path.exists(self.__dbfile):
if self._generate():
print(\"Generated database \\\"%s\\\" which didn\'t\" \\
\" exist before.\" % self.__dbfile)
else:
print(\"Unable to generate database\")
# Once the database is generated of it already has been, I can
# initialize the connection.
try:
self._conn = sqlite3.connect(self.__dbfile)
self._cursor = self._conn.cursor()
except Exception, why:
print(\"Unable to connect to database \\\"%s\\\": %s.\"
% (self.__dbfile, why))
log.debug(\"Connected to SQLite database \\\"%s\\\".\" % self.__dbfile)
def _generate(self):
\"\"\"
Creates database structure in a SQLite file.
\"\"\"
if os.path.exists(self.__dbfile):
return False
db_dir = os.path.dirname(self.__dbfile)
if not os.path.exists(db_dir):
try:
os.makedirs(db_dir)
except (IOError, os.error), why:
print(\"Something went wrong while creating database \" \\
\"directory \\\"%s\\\": %s\" % (db_dir, why))
return False
conn = sqlite3.connect(self.__dbfile)
cursor = conn.cursor()
cursor.execute(\"CREATE TABLE virustotal (\\n\" \\
\" id INTEGER PRIMARY KEY,\\n\" \\
\" md5 TEXT NOT NULL,\\n\" \\
\" Kaspersky TEXT DEFAULT NULL,\\n\" \\
\" McAfee TEXT DEFAULT NULL,\\n\" \\
\" Symantec TEXT DEFAULT NULL,\\n\" \\
\" Norman TEXT DEFAULT NULL,\\n\" \\
\" Avast TEXT DEFAULT NULL,\\n\" \\
\" NOD32 TEXT DEFAULT NULL,\\n\" \\
\" BitDefender TEXT DEFAULT NULL,\\n\" \\
\" Microsoft TEXT DEFAULT NULL,\\n\" \\
\" Rising TEXT DEFAULT NULL,\\n\" \\
\" Panda TEXT DEFAULT NULL\\n\" \\
\");\")
print \"create db:%s sucess\" % self.__dbfile
return True
def _get_task_dict(self, row):
try:
task = {}
task[\"id\"] = row[0]
task[\"md5\"] = row[1]
task[\"Kaspersky\"] = row[2]
task[\"McAfee\"] = row[3]
task[\"Symantec\"] = row[4]
task[\"Norman\"] = row[5]
task[\"Avast\"] = row[6]
task[\"NOD32\"] = row[7]
task[\"BitDefender\"] = row[8]
task[\"Microsoft\"] = row[9]
task[\"Rising\"] = row[10]
task[\"Panda\"] = row[11]
return task
except Exception, why:
return None
def add_sample(self, md5, virus_dict):
\"\"\"
\"\"\"
task_id = None
if not self._cursor:
return None
if not md5 or md5 == \"\":
return None
Kaspersky = virus_dict.get(\"Kaspersky\", None)
McAfee = virus_dict.get(\"McAfee\", None)
Symantec = virus_dict.get(\"Symantec\", None)
Norman = virus_dict.get(\"Norman\", None)
Avast = virus_dict.get(\"Avast\", None)
NOD32 = virus_dict.get(\"NOD32\", None)
BitDefender = virus_dict.get(\"BitDefender\", None)
Microsoft = virus_dict.get(\"Microsoft\", None)
Rising = virus_dict.get(\"Rising\", None)
Panda = virus_dict.get(\"Panda\", None)
self._conn.text_factory = str
try:
self._cursor.execute(\"SELECT id FROM virustotal WHERE md5 = ?;\",
(md5,))
sample_row = self._cursor.fetchone()
except sqlite3.OperationalError, why:
print \"sqlite3 error:%s\\n\" % str(why)
return False
if sample_row:
try:
sample_row = sample_row[0]
self._cursor.execute(\"UPDATE virustotal SET Kaspersky=?, McAfee=?, Symantec=?, Norman=?, Avast=?, \\
NOD32=?, BitDefender=?, Microsoft=?, Rising=?, Panda=? WHERE id = ?;\",
(Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft,\\
Rising, Panda, sample_row))
self._conn.commit()
task_id = sample_row
except sqlite3.OperationalError, why:
print(\"Unable to update database: %s.\" % why)
return False
else: #the sample not in the database
try:
self._cursor.execute(\"INSERT INTO virustotal \" \\
\"(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\\
Microsoft, Rising, Panda) \" \\
\"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);\",
(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\\
Microsoft, Rising, Panda))
self._conn.commit()
task_id = self._cursor.lastrowid
except sqlite3.OperationalError, why:
print \"why\",str(why)
return None
print \"add_to_db:%s, task_id:%s\" % (str(self.__dbfile), str(task_id))
return task_id
def get_sample(self):
\"\"\"
Gets a task from pending queue.
\"\"\"
log = logging.getLogger(\"Database.GetTask\")
if not self._cursor:
log.error(\"Unable to acquire cursor.\")
return None
# Select one item from the queue table with higher priority and older
# addition date which has not already been processed.
try:
self._cursor.execute(\"SELECT * FROM virustotal \" \\
#\"WHERE lock = 0 \" \\
#\"AND status = 0 \" \\
\"ORDER BY id, added_on LIMIT 1;\")
except sqlite3.OperationalError, why:
log.error(\"Unable to query database: %s.\" % why)
return None
sample_row = self._cursor.fetchone()
if sample_row:
return self._get_task_dict(sample_row)
else:
return None
def search_md5(self, md5):
\"\"\"
\"\"\"
if not self._cursor:
return None
if not md5 or len(md5) != 32:
return None
try:
self._cursor.execute(\"SELECT * FROM virustotal \" \\
\"WHERE md5 = ? \" \\
#\"AND status = 1 \" \\
\"ORDER BY id DESC;\",
(md5,))
except sqlite3.OperationalError, why:
return None
task_dict = {}
for row in self._cursor.fetchall():
task_dict = self._get_task_dict(row)
#if task_dict:
#tasks.append(task_dict)
return task_dict
class VirusTotal:
\"\"\"\"\"\"
def __init__(self, md5):
\"\"\"Constructor\"\"\"
self._virus_dict = {}
self._md5 = md5
self._db_file = r\"./db/virustotal.db\"
self.get_report_dict()
def repr(self):
return str(self._virus_dict)
def submit_md5(self, file_path):
import postfile
#submit the file
FILE_NAME = os.path.basename(file_path)
host = \"www.virustotal.com\"
selector = \"https://www.virustotal.com/vtapi/v2/file/scan\"
fields = [(\"apikey\", APIKEY)]
file_to_send = open(file_path, \"rb\").read()
files = [(\"file\", FILE_NAME, file_to_send)]
json = postfile.post_multipart(host, selector, fields, files)
print json
pass
def get_report_dict(self):
result_dict = {}
url = \"https://www.virustotal.com/vtapi/v2/file/report\"
parameters = {\"resource\": self._md5,
\"apikey\": APIKEY}
data = urllib.urlencode(parameters)
req = urllib2.Request(url, data)
response = urllib2.urlopen(req)
json = response.read()
response_dict = simplejson.loads(json)
if response_dict[\"response_code\"]: #has result
scans_dict = response_dict.get(\"scans\", {})
for anti_virus_comany, virus_name in scans_dict.iteritems():
if virus_name[\"detected\"]:
result_dict.setdefault(anti_virus_comany, virus_name[\"result\"])
return result_dict
def write_to_db(self):
\"\"\"\"\"\"
db = VirusTotalDatabase(self._db_file)
virus_dict = self.get_report_dict()
db.add_sample(self._md5, virus_dict)
使用方法如下:
config = {\'input\':\"inputMd5s\"}
fp = open(config[\'input\'], \"r\")
content = fp.readlines()
MD5S = []
for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)):
MD5S.append(md5)
print \"MD5S\",MD5S
fp.close()
from getVirusTotalInfo import VirusTotal
#得到扫描结果并写入数库
for md5 in MD5S:
virus_total = VirusTotal(md5)
virus_total.write_to_db()
希望本文所述对大家的Python程序设计有所帮助。