Mirror of https://github.com/MAGICGrants/Monero-Dataset-Pipeline.git, synced 2026-01-09 13:37:57 -05:00
Added dataset export to CSV
@@ -11,16 +11,16 @@ from datetime import datetime
from collections import Counter
from sklearn.utils import shuffle
from cherrypicker import CherryPicker # https://pypi.org/project/cherrypicker/
from pandas import DataFrame, concat, options
from requests.exceptions import ConnectionError
from os import walk, getcwd, chdir, listdir, path
from multiprocessing import Pool, cpu_count, Manager
from pandas import DataFrame, concat, options, read_parquet
from os import walk, getcwd, chdir, listdir, fsync, system, remove
options.mode.chained_assignment = None # default='warn'

'''
Description:
Usage: ./create_dataset.py < Wallets Directory Path >
Date: 6/7/2022
Date: 6/8/2022
Author: ACK-J

Warning: DO NOT run this with a remote node, there are a lot of blockchain lookups and it will be slow!
@@ -523,7 +523,6 @@ def create_feature_set(database):
:return: A pandas dataframe of the input data and a list of labels
"""
labels = []
Valid_Transactions = []
num_errors = 0
feature_set = dict()
num_of_valid_txs = 0 # Incrementer which doesn't count invalid txs
@@ -535,8 +534,6 @@ def create_feature_set(database):
except Exception as e:
num_errors += 1
continue # Dont process the tx and loop
# add tx hash to good list
#Valid_Transactions.append(DataFrame(CherryPicker(database[tx_hash]).flatten(delim='.').get(), index=[idx]))
# Flatten each transaction and iterate over each feature
for k, v in CherryPicker(database[tx_hash]).flatten(delim='.').get().items():
# Check if the feature name is not already in the feature set
@@ -704,8 +701,25 @@ def undersample(X, y):
undersampled_X = concat(new_X, axis=0)
del new_X
collect() # Garbage collector
undersampled_X.reset_index(drop=True, inplace=True)

# Sometimes there is a race condition where a class will get +1 samples in the class ( most of the time this happens while debugging )
# Sometimes there is a race condition where a class will get +1 samples in the class
# This happens due to the shared memory and multiprocessing
if not len(undersampled_X) == len(undersampled_y) == (min_occurrences * NUM_RING_MEMBERS):
# Find the number of occurrences for each class so we can find the locate the outlier
for class_num, class_occurrences in Counter(undersampled_y).items():
# Check if the number of occurrences is not equal to the undersampled amount
if class_occurrences != min_occurrences:
# Iterate the number of times the extra class occurred
for _ in range(class_occurrences-min_occurrences):
# Iterate over the labels
for pos, val in enumerate(undersampled_y):
# Check if the label is an occurrence of the bad class
if val == class_num:
# Delete the entry
undersampled_X.drop(undersampled_X.index[[pos]], inplace=True)
del undersampled_y[pos]
break
assert len(undersampled_X) == len(undersampled_y) == (min_occurrences * NUM_RING_MEMBERS)

# Shuffle the data one last time
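As an aside, a minimal standalone sketch of the rebalancing check introduced above, run on toy data. min_occurrences, NUM_RING_MEMBERS, and the plain-list stand-ins for the DataFrame are assumptions made for illustration only; the pipeline itself operates on the pandas objects shown in the hunk.

from collections import Counter

NUM_RING_MEMBERS = 3   # assumed ring size for the toy example
min_occurrences = 2    # smallest per-class count after undersampling

# Class 1 ended up with one extra sample (the "race condition" case described above)
undersampled_y = [0, 0, 1, 1, 1, 2, 2]
undersampled_X = ["row%d" % i for i in range(len(undersampled_y))]

if not len(undersampled_X) == len(undersampled_y) == (min_occurrences * NUM_RING_MEMBERS):
    for class_num, class_occurrences in Counter(undersampled_y).items():
        if class_occurrences != min_occurrences:
            # Drop one row per surplus occurrence of the over-represented class
            for _ in range(class_occurrences - min_occurrences):
                for pos, val in enumerate(undersampled_y):
                    if val == class_num:
                        del undersampled_X[pos]
                        del undersampled_y[pos]
                        break

assert len(undersampled_X) == len(undersampled_y) == (min_occurrences * NUM_RING_MEMBERS)
print(Counter(undersampled_y))  # Counter({0: 2, 1: 2, 2: 2})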
@@ -716,6 +730,62 @@ def undersample(X, y):
return undersampled_X, undersampled_y


def write_dict_to_csv(data_dict):
"""

:param data_dict:
:return:
"""
# Keep track of all column names for the CSV
column_names = []
with open("./Dataset_Files/dataset.csv", "a") as fp:
fp.write("HEADERS"+"\n") # Keep a placeholder for the column names
first_tx = True
# Iterate over each tx hash
for tx_hash in tqdm(iterable=data_dict.keys(), total=len(data_dict.keys()), colour='blue', desc="Writing Dataset to Disk as a CSV"):
# Flatten the transaction dictionary
tx_metadata = CherryPicker(data_dict[tx_hash]).flatten(delim='.').get()
column_values = []
# For the first tx add the entire transaction
if first_tx:
column_names = column_names + list(tx_metadata.keys())
column_values = column_values + [str(element) for element in list(tx_metadata.values())]
fp.write(','.join(column_values) + "\n")
first_tx = False
continue
# Iterate over each name of the transaction
for name in tx_metadata.keys():
# Check if the name is already in column names
if name not in column_names:
column_names.append(name)
value_orders = []
# Iterate over the names of the transaction
for idx, name in enumerate(list(tx_metadata.keys())):
# Find the index of the name in column_names and add it to value_orders
value_orders.append(column_names.index(name))
# Sort the transaction values by the order of the indexes in value_orders
sorted_values = sorted(zip(value_orders, list(tx_metadata.values())))
max_sorted_val = len(sorted_values)
# Iterate over the length of the column_names to add the sorted values
for column_idx in range(len(column_names)):
# If the column number is greater than the max, add a placeholder
if column_idx+1 > max_sorted_val:
fp.write(',')
continue
# Check if the sorted idx is not the column name index
if sorted_values[column_idx][0] != column_idx:
fp.write(',')
continue
else:
fp.write(str(sorted_values[column_idx][1]) + ",") # Add the value
fp.write("\n")
fp.flush() # https://stackoverflow.com/questions/3167494/how-often-does-python-flush-to-a-file
fsync(fp.fileno())
# I looked everywhere and there is no way to replace the top line of a
# file in python without loading the entire file into RAM. So syscall it is...
system("sed -i '1,2s|HEADERS|" + ", ".join(column_names) + "|g' ./Dataset_Files/dataset.csv")


def main():
# Error Checking for command line args
if len(argv) != 2:
@@ -729,11 +799,20 @@ def main():
# Configuration alert
print("The dataset is being collected for the " + blue + NETWORK + reset + " network using " + API_URL + " as a block explorer!")

if exists("./Dataset_Files/dataset.csv"):
while True:
answer = input(blue + "Dataset files exists already. They will be overwritten, Delete them? (y/n)" + reset)
if answer.lower()[0] == "y":
remove("./Dataset_Files/dataset.csv")
break
elif answer.lower()[0] == "n":
break

###########################################
#  Create the dataset from files on disk  #
###########################################
global data
print(blue + "Opening " + str(argv[1]) + reset + "\n")
print(blue + "Opening " + str(argv[1]) + reset)
# Find where the wallets are stored and combine the exported csv files
discover_wallet_directories(argv[1])

@@ -759,6 +838,10 @@ def main():
#################################
with open("./Dataset_Files/dataset.pkl", "rb") as fp:
data = pickle.load(fp)

# Write the dictionary to disk as a CSV
write_dict_to_csv(data)

# Feature selection on raw dataset
X, y = create_feature_set(data)
del data
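A quick way to inspect the exported CSV afterwards; the path comes from the script, while the tolerant parsing options are an assumption in case early rows carry a different number of cells than the final header.

import pandas as pd

# Short rows (written before later columns appeared) are padded with NaN;
# on_bad_lines="skip" drops any row that has more fields than the header.
df = pd.read_csv("./Dataset_Files/dataset.csv", engine="python", on_bad_lines="skip")
print(df.shape)
print(list(df.columns)[:10])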