mirror of https://github.com/jugeeya/UltimateTrainingModpack.git synced 2026-04-10 13:41:42 +00:00

Working with pure Rust

jugeeya
2025-05-13 23:03:58 -07:00
parent 0a787b8a0b
commit 1dce06ea51
6 changed files with 457 additions and 148 deletions

View File

@@ -1,12 +1,19 @@
 [package]
 name = "training_mod_metrics"
 version = "0.1.0"
-edition = "2018"
+edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
-datafusion = "5.0.0"
-tokio = "1.11.0"
+datafusion = "42.1.0"
+tokio = { version = "1.11.0", features = ["rt", "macros"] }
 plotters = "0.3.1"
-chrono = "0.4.19"
+chrono = { version = "0.4.19", features = ["serde"] }
+anyhow = "1.0"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+structopt = "0.3"
+[dev-dependencies]
+tempfile = "3.2"

View File

@@ -0,0 +1,319 @@
use anyhow::Result;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use datafusion::arrow::array::{Int64Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::file_format::{
file_compression_type::FileCompressionType, options::NdJsonReadOptions,
};
use datafusion::prelude::*;
use plotters::prelude::*;
use std::sync::Arc;
use structopt::StructOpt;
const OUT_FILE_NAME: &str = "usage_metrics.svg";
#[derive(Debug, StructOpt)]
#[structopt(
name = "analyze_metrics",
about = "Analyze SSBU Training Modpack usage metrics"
)]
struct Opt {
/// Input JSON file path (output from parse_firebase)
#[structopt(parse(from_os_str))]
input: std::path::PathBuf,
/// Output SVG file path
#[structopt(parse(from_os_str), default_value = OUT_FILE_NAME)]
output: std::path::PathBuf,
/// Start date (YYYY-MM-DD format)
#[structopt(long)]
start_date: Option<String>,
/// End date (YYYY-MM-DD format)
#[structopt(long)]
end_date: Option<String>,
}
fn parse_date_arg(date_str: &str) -> Result<i64> {
let naive =
NaiveDateTime::parse_from_str(&format!("{} 00:00:00", date_str), "%Y-%m-%d %H:%M:%S")
.map_err(|e| anyhow::anyhow!("Failed to parse date {}: {}", date_str, e))?;
Ok(Utc.from_utc_datetime(&naive).timestamp_millis())
}
fn timestamp_millis_to_datetime(ts: i64) -> DateTime<Utc> {
Utc.timestamp_opt(ts / 1000, 0).unwrap()
}
async fn analyze_data(
input_path: &str,
output_path: &str,
start_date: Option<&str>,
end_date: Option<&str>,
) -> Result<()> {
// Parse date arguments
let date_filter = match (start_date, end_date) {
(Some(start), Some(end)) => {
let start_ts = parse_date_arg(start)?;
let end_ts = parse_date_arg(end)? + 86400000; // Add one day to include the end date
format!(
"WHERE event_time >= {} AND event_time < {}",
start_ts, end_ts
)
}
(Some(start), None) => {
let start_ts = parse_date_arg(start)?;
format!("WHERE event_time >= {}", start_ts)
}
(None, Some(end)) => {
let end_ts = parse_date_arg(end)? + 86400000;
format!("WHERE event_time < {}", end_ts)
}
(None, None) => String::from(""),
};
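    // For example, `--start-date 2021-09-01 --end-date 2021-09-30` produces:
    //   WHERE event_time >= 1630454400000 AND event_time < 1633046400000
    // i.e. the end bound is exclusive and sits one day past the requested end date.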
// Create a session context
let ctx = SessionContext::new();
// Register the JSON file as a table
ctx.register_json(
"smash_open",
input_path,
NdJsonReadOptions {
schema: Some(&Arc::new(Schema::new(vec![
Field::new("device_id", DataType::Utf8, false),
Field::new("event_name", DataType::Utf8, false),
Field::new("event_time", DataType::Int64, false),
Field::new("menu_settings", DataType::Utf8, false),
Field::new("session_id", DataType::Utf8, false),
Field::new("smash_version", DataType::Utf8, false),
Field::new("mod_version", DataType::Utf8, false),
Field::new("user_id", DataType::Utf8, false),
]))),
schema_infer_max_records: 0,
file_extension: ".json",
file_compression_type: FileCompressionType::UNCOMPRESSED,
file_sort_order: vec![],
infinite: false,
table_partition_cols: vec![],
},
)
.await?;
// Query to get daily metrics
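    // The expression event_time / 86400000 * 86400000 relies on integer division
    // to floor each millisecond timestamp to midnight UTC, so events group by calendar day.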
let query = format!(
"WITH base_stats AS (
SELECT
TO_TIMESTAMP_MILLIS(CAST(event_time / 86400000 * 86400000 AS bigint)) AS date,
device_id,
session_id,
event_time
FROM smash_open
{}
),
daily_stats AS (
SELECT
date,
COUNT(DISTINCT device_id) AS num_devices,
COUNT(DISTINCT session_id) AS num_sessions,
COUNT(*) AS num_events
FROM base_stats
GROUP BY date
)
SELECT
num_devices,
num_sessions,
num_events,
date
FROM daily_stats
ORDER BY date",
date_filter
);
let df = ctx.sql(&query).await?;
let results = df.collect().await?;
// Create the visualization
let root = SVGBackend::new(output_path, (1200, 800)).into_drawing_area();
root.fill(&WHITE)?;
    // Calculate the padded y-axis maximum and the overall date range
let (y_max, min_date, max_date) = {
let mut max_value = 0f64;
let mut min_ts = i64::MAX;
let mut max_ts = i64::MIN;
for batch in &results {
for row in 0..batch.num_rows() {
let devices = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(row) as f64;
let sessions = batch
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(row);
let ts = batch
.column(3)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.value(row);
max_value = max_value.max(devices).max(sessions as f64);
min_ts = min_ts.min(ts);
max_ts = max_ts.max(ts);
}
}
// Add 10% padding to the max value
(
(max_value * 1.1).ceil(),
timestamp_millis_to_datetime(min_ts),
timestamp_millis_to_datetime(max_ts),
)
};
let mut chart = ChartBuilder::on(&root)
.caption(
"Training Modpack Usage Metrics",
("sans-serif", 50).into_font(),
)
.margin(15)
.margin_top(30)
.x_label_area_size(80)
.y_label_area_size(60)
.build_cartesian_2d(min_date..max_date, 0f64..y_max)?;
// Configure the mesh
chart
.configure_mesh()
.disable_mesh() // Remove default grid
.bold_line_style(WHITE.mix(0.3)) // Subtle grid lines
.light_line_style(WHITE.mix(0.1))
.axis_style(BLACK.mix(0.7)) // Softer axis lines
.x_labels(12)
.y_labels(10)
.x_label_formatter(&|x| x.format("%Y-%m-%d").to_string())
.y_label_formatter(&|y| {
if *y >= 1000.0 {
format!("{:.1}k", y / 1000.0)
} else {
format!("{:.0}", y)
}
})
.x_label_style(
("sans-serif", 15)
.into_font()
.transform(FontTransform::Rotate90),
)
.y_label_style(("sans-serif", 15).into_font())
.draw()?;
// Draw horizontal grid lines
chart
.configure_mesh()
.disable_x_mesh()
.disable_y_axis()
.disable_x_axis()
.y_desc("Number of Users/Sessions")
.draw()?;
let mut device_data = Vec::new();
let mut session_data = Vec::new();
for batch in &results {
for row in 0..batch.num_rows() {
let ts = batch
.column(3)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.value(row);
let devices = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(row);
let sessions = batch
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(row);
device_data.push((timestamp_millis_to_datetime(ts), devices as f64));
session_data.push((timestamp_millis_to_datetime(ts), sessions as f64));
}
}
// Define custom colors
const DEVICE_COLOR: RGBColor = RGBColor(46, 125, 190); // Blue
const SESSION_COLOR: RGBColor = RGBColor(55, 166, 155); // Teal
// Draw the data series with enhanced styling
chart
.draw_series(LineSeries::new(device_data.clone(), &DEVICE_COLOR))?
.label("Unique Devices")
.legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], &DEVICE_COLOR));
// Add points over the device line
chart.draw_series(
device_data
.iter()
.map(|(x, y)| Circle::new((*x, *y), 3, DEVICE_COLOR.filled())),
)?;
chart
.draw_series(LineSeries::new(session_data.clone(), &SESSION_COLOR))?
.label("Unique Sessions")
.legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], &SESSION_COLOR));
// Add points over the session line
chart.draw_series(
session_data
.iter()
.map(|(x, y)| Circle::new((*x, *y), 3, SESSION_COLOR.filled())),
)?;
// Configure and draw the legend
chart
.configure_series_labels()
.background_style(&WHITE.mix(0.8))
.border_style(&BLACK.mix(0.5))
.position(SeriesLabelPosition::UpperRight)
.margin(15)
.legend_area_size(35)
.draw()?;
// Ensure the drawing is saved
root.present()?;
println!("Chart saved to {}", output_path);
Ok(())
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let opt = Opt::from_args();
analyze_data(
opt.input.to_str().unwrap(),
opt.output.to_str().unwrap(),
opt.start_date.as_deref(),
opt.end_date.as_deref(),
)
.await?;
println!(
"Analysis complete! Output saved to {}",
opt.output.display()
);
Ok(())
}
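
For reference, this binary consumes the NDJSON written by parse_firebase: one JSON object per line carrying the eight fields declared in the schema above. An illustrative record (all values invented):

{"device_id":"abc123","event_name":"SMASH_OPEN","event_time":1630454400000,"menu_settings":"{}","session_id":"s-001","smash_version":"13.0.1","mod_version":"5.0.0","user_id":"u-001"}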

View File

@@ -0,0 +1,23 @@
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(StructOpt)]
#[structopt(
name = "parse_firebase",
about = "Parse Firebase JSON data for SSBU Training Modpack"
)]
struct Opt {
/// Input file path
#[structopt(parse(from_os_str))]
input: PathBuf,
/// Output file path
#[structopt(parse(from_os_str))]
output: PathBuf,
}
fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
training_mod_metrics::parser::extract_smash_open_devices(&opt.input, &opt.output)?;
Ok(())
}

View File

@@ -0,0 +1 @@
pub mod parser;

View File

@@ -1,144 +0,0 @@
use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::json::NdJsonFile;
use datafusion::physical_plan::json::NdJsonReadOptions;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use std::sync::Arc;
// export.json is relative to /event/
// cat export.json | jq -c '.SMASH_OPEN.device[][][]' > smash_open.json
#[derive(Debug)]
struct Event {
device_id: String,
event_name: String,
event_time: i64,
menu_settings: String,
mod_version: String,
session_id: String,
smash_version: String,
user_id: String
}
use chrono::{DateTime, NaiveDateTime, Utc};
fn timestamp_secs_to_datetime(ts: i64) -> DateTime<Utc> {
DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(ts, 0), Utc)
}
use plotters::prelude::*;
const OUT_FILE_NAME: &'static str = "boxplot.svg";
fn draw_chart(results: Vec<RecordBatch>) -> Result<(), Box<dyn std::error::Error>> {
let num_devices_idx = results[0].schema().column_with_name("num_devices").unwrap().0;
let num_sessions_idx = results[0].schema().column_with_name("num_sessions").unwrap().0;
let timestamps_idx = results[0].schema().column_with_name("date").unwrap().0;
let num_devices = results[0].column(num_devices_idx).as_any()
.downcast_ref::<datafusion::arrow::array::UInt64Array>()
.expect("Failed to downcast").values();
let num_sessions = results[0].column(num_sessions_idx).as_any()
.downcast_ref::<datafusion::arrow::array::UInt64Array>()
.expect("Failed to downcast").values();
let timestamp_millis = results[0].column(timestamps_idx).as_any()
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
.expect("Failed to downcast").values();
let device_data_points = num_devices.iter()
.enumerate().map(|(i, x)| (timestamp_secs_to_datetime(timestamp_millis[i] / 1000), *x));
let session_data_points = num_sessions.iter()
.enumerate().map(|(i, x)| (timestamp_secs_to_datetime(timestamp_millis[i] / 1000), *x));
let root = SVGBackend::new(OUT_FILE_NAME, (1024, 768)).into_drawing_area();
root.fill(&WHITE)?;
let mut chart = ChartBuilder::on(&root)
.caption("Users and Sessions by Date", ("sans-serif", 50).into_font())
.margin(5)
.x_label_area_size(30)
.y_label_area_size(30)
.build_cartesian_2d(
(timestamp_secs_to_datetime(timestamp_millis[0] / 1000))..(timestamp_secs_to_datetime(*timestamp_millis.last().unwrap() / 1000)),
0..*num_sessions.iter().max().unwrap())?;
chart.configure_mesh().draw()?;
chart
.draw_series(LineSeries::new(
device_data_points,
&RED,
))?
.label("Unique Devices")
.legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], &RED));
chart
.draw_series(LineSeries::new(
session_data_points,
&BLUE,
))?
.label("Unique Sessions")
.legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], &BLUE));
chart
.configure_series_labels()
.background_style(&WHITE.mix(0.8))
.border_style(&BLACK)
.draw()?;
Ok(())
}
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// let smash_open_table = NdJsonFile::try_new(
// "smash_open.json",
// NdJsonReadOptions{
// schema: None,
// schema_infer_max_records: 1,
// file_extension: ".json",
// }
// ).unwrap();
let menu_open_table = NdJsonFile::try_new(
"menu_open.json",
NdJsonReadOptions{
schema: Some(Arc::new(Schema::new(vec![
Field::new("device_id", DataType::Utf8, false),
Field::new("event_name", DataType::Utf8, false),
Field::new("event_time", DataType::Int64, false),
Field::new("menu_settings", DataType::Utf8, false),
Field::new("session_id", DataType::Utf8, false),
Field::new("smash_version", DataType::Utf8, false),
Field::new("mod_version", DataType::Utf8, false),
Field::new("user_id", DataType::Utf8, false),
]))),
schema_infer_max_records: 0,
file_extension: ".json",
}
).unwrap();
// // declare a new context. In spark API, this corresponds to a new spark SQLsession
let mut ctx = ExecutionContext::new();
// ctx.register_table("smash_open", Arc::new(smash_open_table))?;
ctx.register_table("menu_open", Arc::new(menu_open_table))?;
// create a plan to run a SQL query
let df = ctx.sql(
"SELECT
COUNT(DISTINCT device_id) num_devices,
COUNT(DISTINCT session_id) num_sessions,
COUNT(*) num_events,
TO_TIMESTAMP_MILLIS(DATE_TRUNC('day', CAST(event_time * 1000000 AS timestamp))) AS date FROM menu_open
WHERE
-- after 09/01/2021
event_time > 1630454400000
-- before today
AND CAST(event_time * 1000000 AS timestamp) < NOW()
GROUP BY date ORDER BY date"
)?;
let results: Vec<RecordBatch> = df.collect().await?;
// use datafusion::arrow::util::pretty::pretty_format_batches;
// println!("{}", pretty_format_batches(&results)?);
draw_chart(results).unwrap();
Ok(())
}

View File

@@ -0,0 +1,103 @@
use anyhow::{Context, Result};
use serde_json::{self, Value};
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
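// Takes over the role of the jq one-liner noted in the deleted main.rs
// (`cat export.json | jq -c ... > smash_open.json`): the Firebase export is
// flattened into NDJSON directly in Rust instead of via a shell preprocessing step.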
pub fn extract_smash_open_devices(input_path: &Path, output_path: &Path) -> Result<()> {
// Open and read the input file
let file = File::open(input_path)
.with_context(|| format!("Failed to open input file: {}", input_path.display()))?;
let reader = BufReader::new(file);
// Parse the JSON data
let data: Value =
serde_json::from_reader(reader).with_context(|| "Failed to parse input JSON")?;
// Extract all events from the nested structure
let mut flattened_devices = Vec::new();
// Navigate through the nested structure: device -> device_id -> timestamp -> event_id -> event_data
if let Some(devices) = data["device"].as_object() {
for (_device_id, timestamps) in devices {
if let Some(timestamps) = timestamps.as_object() {
for (_timestamp, events) in timestamps {
if let Some(events) = events.as_object() {
for (_event_id, event_data) in events {
flattened_devices.push(event_data.clone());
}
}
}
}
}
}
// Write each record as a separate line of JSON (NDJSON format)
let output_file = File::create(output_path)
.with_context(|| format!("Failed to create output file: {}", output_path.display()))?;
let mut writer = BufWriter::new(output_file);
for device in &flattened_devices {
serde_json::to_writer(&mut writer, device)?;
writer.write_all(b"\n")?;
}
println!(
"Extracted {} records to {}",
flattened_devices.len(),
output_path.display()
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_extract_smash_open_devices() {
let temp_dir = tempdir().unwrap();
let input_path = temp_dir.path().join("input.json");
let output_path = temp_dir.path().join("output.json");
// Create test input JSON with the correct nested structure
let test_data = r#"{
"device": {
"device1": {
"1000": {
"event1": {
"device_id": "device1",
"event_name": "SMASH_OPEN",
"event_time": 1000,
"test": "data1"
}
}
},
"device2": {
"2000": {
"event2": {
"device_id": "device2",
"event_name": "SMASH_OPEN",
"event_time": 2000,
"test": "data2"
}
}
}
}
}"#;
std::fs::write(&input_path, test_data).unwrap();
// Run the extraction
extract_smash_open_devices(&input_path, &output_path).unwrap();
// Verify output
        let output_content = std::fs::read_to_string(&output_path).unwrap();
        // The output is NDJSON: one standalone JSON object per line, not a JSON array.
        let output_records: Vec<Value> = output_content
            .lines()
            .map(|line| serde_json::from_str(line).unwrap())
            .collect();
        assert_eq!(output_records.len(), 2);
        assert_eq!(output_records[0]["test"], "data1");
        assert_eq!(output_records[1]["test"], "data2");
}
}
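
With the line-by-line check above, the test's output file is expected to hold two NDJSON records, one per leaf event object, with the wrapping device/timestamp/event keys stripped away. Assuming serde_json's default (alphabetical) key ordering, the lines would look roughly like:

{"device_id":"device1","event_name":"SMASH_OPEN","event_time":1000,"test":"data1"}
{"device_id":"device2","event_name":"SMASH_OPEN","event_time":2000,"test":"data2"}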