p1_utils.py
from __future__ import division  # must precede all other imports in the module
import math
from operator import add

import numpy as np
import pyspark
from pyspark.sql import SparkSession
# used for final calculation of scores
# `predictions` maps "path predicted_class" strings to true class labels;
# returns the fraction of predictions whose class matches the truth.
def val_predict(predictions):
    correct_predict_count, all_predict_count = 0, 0
    for pred_y, y_true in predictions.items():
        pred_y = pred_y.split()[1]  # second token is the predicted class
        if pred_y == y_true:
            correct_predict_count += 1
        all_predict_count += 1
    return correct_predict_count / all_predict_count
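# A minimal usage sketch (hypothetical values): keys follow the
# "path predicted_class" format built by the test-set branch of
# generate_count_rdds below.
# >>> preds = {'f1.bytes 1': '1', 'f2.bytes 2': '3'}
# >>> val_predict(preds)
# 0.5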
# --- step 1 --- #
# get dict of key: byte file path, value: class label
def map_datasets2labels(sc, train_name, test_name):
    X_train_rdd = sc.textFile('gs://uga-dsp/project1/files/' + train_name) \
                    .map(lambda x: 'gs://uga-dsp/project1/data/bytes/' + x + '.bytes')
    byte_paths = X_train_rdd.reduce(lambda x, y: x + "," + y)
    if test_name != '0':
        # read the label file and zip each file path with its malware class number
        y_ = sc.textFile('gs://uga-dsp/project1/files/' + test_name) \
               .reduce(lambda x, y: x + "," + y)
        mapper = dict(zip(byte_paths.split(','), y_.split(',')))
    else:
        # no labels available: map each path to itself as a placeholder
        mapper = dict(zip(byte_paths.split(','), byte_paths.split(',')))
    return mapper  # mapper is a dict of key: file path, value: class label
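# Usage sketch (these file names are assumptions, not verified against the
# gs://uga-dsp bucket):
# >>> mapper = map_datasets2labels(sc, 'X_small_train.txt', 'y_small_train.txt')
# >>> test_mapper = map_datasets2labels(sc, 'X_small_test.txt', '0')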
# --- step 2 --- #
# Tokenize each byte file and count words per class; returns a dict of
# key: class, value: RDD of word counts for that class.
def rdd_fix(rdd):
    # split lines into tokens and drop addresses / '??' placeholders
    rdd = rdd.flatMap(lambda x: x.split()).filter(lambda x: clean(x))
    return rdd
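# Example (hypothetical .bytes line): the 8-character address token is
# dropped by clean() below (len > 2), as is the '??' placeholder.
# >>> rdd_fix(sc.parallelize(['00401000 8B 44 ?? 08'])).collect()
# ['8B', '44', '08']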
def bigram(rdd):  # input: raw byte-file RDD, one line per record
    # strip the 9-character address prefix, then pair adjacent tokens
    rdd = (rdd.map(lambda x: x[9:])
              .map(lambda line: line.strip().split(" "))
              .flatMap(lambda xs: (tuple(x) for x in zip(xs, xs[1:])))
              .map(lambda x: str(x[0]) + ' ' + str(x[1])))
    return rdd
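# Example (hypothetical .bytes line):
# >>> bigram(sc.parallelize(['00401000 8B 44 24 08'])).collect()
# ['8B 44', '44 24', '24 08']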
# drops "words" of length greater than 2 (addresses) and any token containing "?"
def clean(x):
    if (len(x) > 2) or ("?" in x):
        return None
    return x
# Generates two different outputs: with trainset=True you get a dict of RDDs
# keyed by class, each reduced to (word, count) pairs for that class;
# with trainset=False the key is 'path class' and the value is the raw word
# RDD, e.g. ("00", "BG", ..., "01").
def generate_count_rdds(sc, mapper, trainset=True):
    files_rdds = {}
    class_count = {}
    word_perClass = {}
    for k, v in mapper.items():
        if (v in files_rdds) and trainset:
            # class already seen: bump its file count and union in the new words
            class_count[v] += 1
            files_rdds[v] = rdd_fix(sc.textFile(k)).union(files_rdds[v])
        else:
            if trainset:
                class_count[v] = 1
                files_rdds[v] = rdd_fix(sc.textFile(k))
            else:
                class_count[v] = 1
                files_rdds[str(k) + ' ' + str(v)] = rdd_fix(sc.textFile(k))
    for k, v in files_rdds.items():
        if trainset:
            word_perClass[k] = files_rdds[k].count()  # total tokens in the class
            files_rdds[k] = files_rdds[k].map(lambda x: (x, 1)).reduceByKey(add)
    return files_rdds, class_count, word_perClass
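# Usage sketch: build per-class (word, count) RDDs from the step-1 mapper;
# class_count holds files per class, word_perClass tokens per class.
# >>> files_rdds, class_count, word_perClass = generate_count_rdds(sc, mapper)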
# Bigram variant of generate_count_rdds: identical bookkeeping, but tokenizes
# each byte file into adjacent-pair bigrams instead of single words.
def generate_count_rdds_bigram(sc, mapper, trainset=True):
    files_rdds = {}
    class_count = {}
    word_perClass = {}
    for k, v in mapper.items():
        if (v in files_rdds) and trainset:
            class_count[v] += 1
            files_rdds[v] = bigram(sc.textFile(k)).union(files_rdds[v])
        else:
            if trainset:
                class_count[v] = 1
                files_rdds[v] = bigram(sc.textFile(k))
            else:
                class_count[v] = 1
                files_rdds[str(k) + ' ' + str(v)] = bigram(sc.textFile(k))
    for k, v in files_rdds.items():
        if trainset:
            word_perClass[k] = files_rdds[k].count()
            files_rdds[k] = files_rdds[k].map(lambda x: (x, 1)).reduceByKey(add)
    return files_rdds, class_count, word_perClass
# --- step 3 --- #
# get total count info
def total_train_info(files_rdds):
    # union all per-class RDDs, then collectAsMap for total "word" counts
    rdd_names = list(files_rdds.keys())
    last = rdd_names.pop(-1)
    v2 = files_rdds[last]
    for v in rdd_names:
        v2 = v2.union(files_rdds[v])  # union keeps RDD form
    total_count_map = v2.reduceByKey(add).collectAsMap()
    # len(total_count_map) = count of all unique words (256 for single bytes)
    return len(total_count_map), total_count_map
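# Usage sketch: the returned length is the vocabulary size used for smoothing
# in step 4; the map gives each word's corpus-wide count.
# >>> total_count, total_count_map = total_train_info(files_rdds)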
# --- step 4 --- #
# Smoothed likelihood P(x_i | y_k):
#   ((count of "01" in class 1) + (1/vocab size)) / ((# words in class 1) + 1)
# Note: `total_count` (vocab size) and `current_word_perClass` (# words in the
# class being scored) are module-level globals set by the driver script.
def P_xi_given_yk(word, count):
    prob = (float(count) + (1 / float(total_count))) / (float(current_word_perClass) + 1)
    return word, prob
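# Driver-side sketch (an assumption about how the globals above get bound;
# class '1' is illustrative):
# >>> total_count, total_count_map = total_train_info(files_rdds)
# >>> current_word_perClass = word_perClass['1']
# >>> counts = files_rdds['1'].collectAsMap()
# >>> word2prob = {w: P_xi_given_yk(w, c)[1] for w, c in counts.items()}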
# Log-probability contribution of one word: use the precomputed `word2prob`
# lookup when available, else fall back to the smoothed zero-count probability.
# Note: `word2prob` (word -> P(word | class)) is a global set by the driver.
def score_calc_fast(word, count):
    if word in word2prob:
        prob = float(math.log10(float(word2prob[word]))) * count
    else:
        _, prob = P_xi_given_yk(word, 0)
        prob = float(math.log10(prob)) * count  # keep the fallback in log space too
    return prob
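# End-to-end scoring sketch (an assumption about the intended Naive Bayes
# decision rule, not part of the original module): for each test file, total
# the per-word log-likelihoods; the driver would add the log class prior and
# pick the class with the highest score.
def score_file(test_rdd):
    # reduce the test file to (word, count) pairs on the driver, then sum
    word_counts = test_rdd.map(lambda x: (x, 1)).reduceByKey(add).collect()
    return sum(score_calc_fast(w, c) for w, c in word_counts)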