mirror of https://github.com/jugeeya/UltimateTrainingModpack.git
synced 2024-11-20 00:46:34 +00:00

commit 1a0033d83d ("init"), parent 9d2f87290b
2 changed files with 199 additions and 44 deletions
@@ -6,7 +6,12 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-datafusion = "5.0.0"
-tokio = "1.11.0"
-plotters = "0.3.1"
-chrono = "0.4.19"
+datafusion = "10.0.0"
+tokio = "1.20.0"
+plotters = "0.3.2"
+chrono = "0.4.19"
+serde = { version = "1.0", features = ["derive", "std"] }
+serde_json = "1.0"
+reqwest = { version = "0.11.11", features = ["json"] }
+flate2 = { version = "1.0" }
+cloud-storage = "0.11.1"
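The jump from datafusion 5 to 10 is a breaking upgrade, and the rest of this diff tracks it: ExecutionContext becomes SessionContext and ctx.sql() becomes async. A minimal sketch of the datafusion 10 calling convention (not part of this commit; the query string is illustrative only):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // datafusion 10: SessionContext replaces the old ExecutionContext.
    let ctx = SessionContext::new();
    // sql() is now async and must be awaited.
    let df = ctx.sql("SELECT 1 AS one").await?;
    df.show().await?;
    Ok(())
}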
@@ -1,14 +1,20 @@
 use std::collections::HashMap;
+use std::fs::File;
+use std::hash::{Hash, Hasher};
+use std::io::Read;
+use std::io::prelude::*;
 use datafusion::prelude::*;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::datasource::json::NdJsonFile;
-use datafusion::physical_plan::json::NdJsonReadOptions;
-use datafusion::logical_plan::FileType::NdJson;
+use datafusion::execution::options::NdJsonReadOptions;
 use datafusion::arrow::datatypes::{Schema, Field, DataType};
+use serde::{Serialize, Deserialize};
 
 use std::sync::Arc;
 
+// export.json is relative to /event/
+// cat export.json | jq -c '.SMASH_OPEN.device[][][]' > smash_open.json
-#[derive(Debug)]
+#[derive(Debug, Deserialize, Serialize)]
 struct Event {
     device_id: String,
     event_name: String,
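A note on the jq one-liner in the comment above: `.[]` on a JSON object iterates its values, so `.SMASH_OPEN.device[][][]` walks the three levels of nesting under each device (device id, then timestamp, then push key) and emits each leaf event as one compact NDJSON line. A hedged illustration using values taken from the test fixture further down (abridged with "..."):

# input (export.json, abridged):
# {"SMASH_OPEN": {"device": {"8ffdce0d8b9788160000000000000000":
#   {"1636608807000": {"-MoCU8jJ6sK98wBJff6D": {"device_id": "...", "event_name": "MENU_OPEN", ...}}}}}}
#
# output (smash_open.json, one event per line):
# {"device_id":"8ffdce0d8b9788160000000000000000","event_name":"MENU_OPEN","event_time":1636608807000,...}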
@@ -20,12 +26,29 @@ struct Event {
     user_id: String
 }
 
+#[derive(Debug, Deserialize, Serialize)]
+struct EventExport {
+    event: HashMap<String, DeviceExport>
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+struct DeviceExport {
+    device: HashMap<String, HashMap<String, HashMap<String, Event>>>
+}
+
 use chrono::{DateTime, NaiveDateTime, Utc};
+use datafusion::arrow::array::Array;
+use flate2::read::GzDecoder;
 
 fn timestamp_secs_to_datetime(ts: i64) -> DateTime<Utc> {
     DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(ts, 0), Utc)
 }
 
 use plotters::prelude::*;
+use cloud_storage::Client;
+use serde_json::{Deserializer, Value};
+use tokio::io::AsyncReadExt;
 
 const OUT_FILE_NAME: &'static str = "boxplot.svg";
 fn draw_chart(results: Vec<RecordBatch>) -> Result<(), Box<dyn std::error::Error>> {
     let num_devices_idx = results[0].schema().column_with_name("num_devices").unwrap().0;
@@ -33,10 +56,10 @@ fn draw_chart(results: Vec<RecordBatch>) -> Result<(), Box<dyn std::error::Error
     let timestamps_idx = results[0].schema().column_with_name("date").unwrap().0;
 
     let num_devices = results[0].column(num_devices_idx).as_any()
-        .downcast_ref::<datafusion::arrow::array::UInt64Array>()
+        .downcast_ref::<datafusion::arrow::array::Int64Array>()
         .expect("Failed to downcast").values();
     let num_sessions = results[0].column(num_sessions_idx).as_any()
-        .downcast_ref::<datafusion::arrow::array::UInt64Array>()
+        .downcast_ref::<datafusion::arrow::array::Int64Array>()
         .expect("Failed to downcast").values();
     let timestamp_millis = results[0].column(timestamps_idx).as_any()
         .downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
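The UInt64Array to Int64Array switch tracks a DataFusion change in which COUNT aggregates come back as Int64 columns, so the old UInt64 downcast would now fail at runtime. A small hedged helper showing the same extraction pattern (the helper name is mine, not from the diff):

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::record_batch::RecordBatch;

// Hypothetical helper: copy a named Int64 column out of a batch.
fn int64_column(batch: &RecordBatch, name: &str) -> Vec<i64> {
    let idx = batch.schema().column_with_name(name).unwrap().0;
    batch
        .column(idx)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("column is not Int64")
        .values()
        .to_vec()
}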
@@ -86,59 +109,186 @@ fn draw_chart(results: Vec<RecordBatch>) -> Result<(), Box<dyn std::error::Error
 
 #[tokio::main]
 async fn main() -> datafusion::error::Result<()> {
-    // let smash_open_table = NdJsonFile::try_new(
-    //     "smash_open.json",
-    //     NdJsonReadOptions{
-    //         schema: None,
-    //         schema_infer_max_records: 1,
-    //         file_extension: ".json",
-    // ).unwrap();
+    // use firerust::FirebaseClient;
+    // use serde_json::Value;
+    // use std::error::Error;
+    //
+    // let client = FirebaseClient::new("https://my-project-1511972643240-default-rtdb.firebaseio.com/").unwrap();
+    // let reference = client.reference("/event/SMASH_OPEN/device");
+    //
+    // use curl::easy::Easy;
+    //
+    // let smash_open_device_ids = reqwest::get(format!("{FIREBASE_URL}/event/SMASH_OPEN/device.json?shallow=true"))
+    //     .await.unwrap()
+    //     .json::<HashMap<String, bool>>()
+    //     .await.unwrap();
+    // let mut smash_open_device_buckets : HashMap<i32, Vec<&String>> = (0..10)
+    //     .into_iter()
+    //     .map(|bucket| (bucket, vec![]))
+    //     .collect();
+    //
+    // for device_id in smash_open_device_ids.keys() {
+    //     let mut hasher = std::collections::hash_map::DefaultHasher::new();
+    //     device_id.hash(&mut hasher);
+    //     let hash = hasher.finish();
+    //     let device_bucket = (hash % 10) as i32;
+    //     smash_open_device_buckets.get_mut(&device_bucket).unwrap().push(device_id);
+    // }
+    //
+    // for bucket in smash_open_device_buckets.keys() {
+    //     if *bucket != 0 {
+    //         continue;
+    //     }
+    //     let device_ids = smash_open_device_buckets.get(bucket).unwrap();
+    //     for device_id in device_ids {
+    //         let device_id_url = format!("{FIREBASE_URL}/event/SMASH_OPEN/device/{device_id}.json");
+    //         let events = reqwest::get(device_id_url)
+    //             .await.unwrap()
+    //             .json::<HashMap<String, HashMap<String, Event>>>()
+    //             .await.unwrap();
+    //         println!("{events:#?}");
+    //     }
+    // }
 
-    let menu_open_table = NdJsonFile::try_new(
-        "menu_open.json",
-        NdJsonReadOptions{
-            schema: Some(Arc::new(Schema::new(vec![
-                Field::new("device_id", DataType::Utf8, false),
-                Field::new("event_name", DataType::Utf8, false),
-                Field::new("event_time", DataType::Int64, false),
-                Field::new("menu_settings", DataType::Utf8, false),
-                Field::new("session_id", DataType::Utf8, false),
-                Field::new("smash_version", DataType::Utf8, false),
-                Field::new("mod_version", DataType::Utf8, false),
-                Field::new("user_id", DataType::Utf8, false),
-            ]))),
-            schema_infer_max_records: 0,
-            file_extension: ".json",
-    ).unwrap();
+    let FIREBASE_URL = "https://my-project-1511972643240-default-rtdb.firebaseio.com";
+    println!("About to get url");
+    let client = Client::default();
+    let bucket = "my-project-1511972643240-default-rtdb-backups";
+    let object = "2022-07-22T02:19:06Z_my-project-1511972643240-default-rtdb_data.json.gz";
+    let bytes = client.object().download(bucket, object).await.unwrap();
+    println!("Received bytes");
+    let mut d = GzDecoder::new(&bytes[..]);
+    let mut s = String::new();
+    d.read_to_string(&mut s).unwrap();
+    println!("Decoded gzip: {}", s.len());
+    let v: EventExport = serde_json::from_str(&s).unwrap();
+    let smash_open = &v.event
+        .get("SMASH_OPEN").unwrap()
+        .device.values().nth(0).unwrap()
+        .values().nth(0).unwrap()
+        .values().nth(0).unwrap();
+
+    println!("{:#?}", smash_open);
+
+    Ok(())
+    // agg_data_and_chart()
 }
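The new main() replaces per-device Firebase REST calls with one bulk path: download the gzipped RTDB backup from Cloud Storage, gunzip it, and deserialize it into the EventExport/DeviceExport structs defined earlier in this diff. A trimmed, hedged sketch of that pipeline (the function name is mine; cloud-storage 0.11 reads credentials from the SERVICE_ACCOUNT environment variable):

use cloud_storage::Client;
use flate2::read::GzDecoder;
use std::io::Read;

// Hypothetical helper: fetch and parse one backup object.
async fn load_export(bucket: &str, object: &str) -> EventExport {
    // One GCS round-trip for the whole database backup.
    let bytes = Client::default().object().download(bucket, object).await.unwrap();
    // The backup is gzip-compressed JSON; inflate it into a String.
    let mut json = String::new();
    GzDecoder::new(&bytes[..]).read_to_string(&mut json).unwrap();
    // Deserialize into the serde structs from the diff above.
    serde_json::from_str(&json).unwrap()
}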
 
+#[test]
+fn test_json_transform() {
+    let s = "{ \
+        \"event\" : {\
+            \"SMASH_OPEN\": {\
+                \"device\": {\
+                    \"8ffdce0d8b9788160000000000000000\": {\
+                        \"1636608807000\": {\
+                            \"-MoCU8jJ6sK98wBJff6D\" : {\
+                                \"device_id\" : \"8ffdce0d8b9788160000000000000000\",\
+                                \"event_name\" : \"MENU_OPEN\",\
+                                \"event_time\" : 1636608807000,\
+                                \"menu_settings\" : \"http://localhost/?mash_state=1,&follow_up=&attack_angle=&ledge_state=1,2,4,8,16,&ledge_delay=&tech_state=1,2,4,8,&miss_tech_state=1,2,4,8,&defensive_state=1,2,4,8,16,&aerial_delay=&oos_offset=&reaction_time=&fast_fall=&fast_fall_delay=&falling_aerials=&full_hop=&shield_tilt=&di_state=1,&sdi_state=&air_dodge_dir=&sdi_strength=0,&shield_state=0,&save_state_mirroring=0,&input_delay=0,&save_state_enable=1&save_damage=1&hitbox_vis=1&stage_hazards=0&frame_advantage=0&mash_in_neutral=0\",
+                                \"mod_version\" : \"3.1.0\",\
+                                \"session_id\" : \"c80468a22fec7c282cc2830869adbb01490105b34d6bd54acd734a3c76db54e9\",\
+                                \"smash_version\" : \"13.0.0\",\
+                                \"user_id\" : \"1000060bfb6ccd6f59437c48a725df9b\"\
+                            }\
+                        },\
+                        \"1636608807001\": {\
+                            \"-MoCU8jJ6sK98wBJff6D\" : {\
+                                \"device_id\" : \"8ffdce0d8b9788160000000000000000\",\
+                                \"event_name\" : \"MENU_OPEN\",\
+                                \"event_time\" : 1636608807000,\
+                                \"menu_settings\" : \"http://localhost/?mash_state=1,&follow_up=&attack_angle=&ledge_state=1,2,4,8,16,&ledge_delay=&tech_state=1,2,4,8,&miss_tech_state=1,2,4,8,&defensive_state=1,2,4,8,16,&aerial_delay=&oos_offset=&reaction_time=&fast_fall=&fast_fall_delay=&falling_aerials=&full_hop=&shield_tilt=&di_state=1,&sdi_state=&air_dodge_dir=&sdi_strength=0,&shield_state=0,&save_state_mirroring=0,&input_delay=0,&save_state_enable=1&save_damage=1&hitbox_vis=1&stage_hazards=0&frame_advantage=0&mash_in_neutral=0\",
+                                \"mod_version\" : \"3.1.0\",\
+                                \"session_id\" : \"c80468a22fec7c282cc2830869adbb01490105b34d6bd54acd734a3c76db54e9\",\
+                                \"smash_version\" : \"13.0.0\",\
+                                \"user_id\" : \"1000060bfb6ccd6f59437c48a725df9b\"\
+                            }\
+                        }\
+                    }\
+                }\
+            }\
+        }\
+    }";
+
+    println!("{:#?}", json_transform(&s));
+}
+
+fn json_transform(s: &str) -> Vec<Event> {
+    let v: EventExport = serde_json::from_str(s).unwrap();
+
+    let smash_open_events = v.event.get("SMASH_OPEN").unwrap().device.values();
+
+    // v.event.get("SMASH_OPEN").unwrap().device.values().flat_map(|smash_open_event| {
+    //     smash_open_event.values()
+    //         .flat_map(|event| event.values().collect::<Vec<&Event>>())
+    //         .collect::<Vec<&Event>>()
+    // }).collect::<Vec<&Event>>()
+
+    let mut events = vec![];
+    for smash_open_event in smash_open_events {
+        for event in smash_open_event.into_values() {
+            let mut e = event.into_values().collect::<Vec<Event>>();
+            events.append(&mut e);
+        }
+    }
+    events
+}
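One caveat on json_transform as committed: into_values() consumes a map, but here it is called through the shared references returned by get() and values(), which cannot be moved out of, and the commented-out flat_map draft hits the same borrow problem. A hedged rewrite that takes ownership of the subtree and flattens with iterators (my reconstruction, not the author's code; same flattening order):

fn json_transform_owned(s: &str) -> Vec<Event> {
    let mut v: EventExport = serde_json::from_str(s).unwrap();

    // Take ownership of the SMASH_OPEN subtree so the nested maps can be consumed.
    let smash_open = v.event.remove("SMASH_OPEN").expect("no SMASH_OPEN events");

    // device -> timestamp -> push-id -> Event, flattened into a single Vec<Event>.
    smash_open
        .device
        .into_values()
        .flat_map(|by_timestamp| by_timestamp.into_values())
        .flat_map(|by_push_id| by_push_id.into_values())
        .collect()
}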
 
 // // declare a new context. In spark API, this corresponds to a new spark SQLsession
-let mut ctx = ExecutionContext::new();
+async fn agg_data_and_chart() -> datafusion::error::Result<()> {
+    let mut ctx = SessionContext::new();
+    let json_options = NdJsonReadOptions{
+        schema: Some(Arc::new(Schema::new(vec![
+            Field::new("device_id", DataType::Utf8, false),
+            Field::new("event_name", DataType::Utf8, false),
+            Field::new("event_time", DataType::Int64, false),
+            Field::new("menu_settings", DataType::Utf8, false),
+            Field::new("session_id", DataType::Utf8, false),
+            Field::new("smash_version", DataType::Utf8, false),
+            Field::new("mod_version", DataType::Utf8, false),
+            Field::new("user_id", DataType::Utf8, false),
+        ]))),
+        schema_infer_max_records: 0,
+        file_extension: ".json",
+        table_partition_cols: vec![],
+    };
+    let df = ctx.register_json(
+        "smash_open",
+        "smash_open.json",
+        json_options
+    ).await?;
 
-    // ctx.register_table("smash_open", Arc::new(smash_open_table))?;
-    ctx.register_table("menu_open", Arc::new(menu_open_table))?;
+    // let df = ctx.register_json(
+    //     "menu_open",
+    //     "menu_open.json",
+    //     json_options
+    // ).await?;
 
     // create a plan to run a SQL query
     println!("Running SQL query...");
     let df = ctx.sql(
-        "SELECT
-        COUNT(DISTINCT device_id) num_devices,
+        "SELECT
+        COUNT(DISTINCT device_id) num_devices,
         COUNT(DISTINCT session_id) num_sessions,
-        COUNT(*) num_events,
-        TO_TIMESTAMP_MILLIS(DATE_TRUNC('day', CAST(event_time * 1000000 AS timestamp))) AS date FROM menu_open
+        COUNT(*) num_events,
+        TO_TIMESTAMP_MILLIS(DATE_TRUNC('day', CAST(event_time * 1000000 AS timestamp))) AS date FROM smash_open
         WHERE
         -- after 09/01/2021
         event_time > 1630454400000
         -- before today
         AND CAST(event_time * 1000000 AS timestamp) < NOW()
         GROUP BY date ORDER BY date"
-    )?;
+    ).await?;
 
     let results: Vec<RecordBatch> = df.collect().await?;
-    // use datafusion::arrow::util::pretty::pretty_format_batches;
-    // println!("{}", pretty_format_batches(&results)?);
+    use datafusion::arrow::util::pretty::pretty_format_batches;
+    println!("{}", pretty_format_batches(&results)?);
 
     println!("Drawing chart...");
     draw_chart(results).unwrap();
 
+    let df = ctx.sql("SELECT MAX(mod_version) FROM smash_open").await?;
+    let results: Vec<RecordBatch> = df.collect().await?;
+    println!("{}", pretty_format_batches(&results)?);
 
     Ok(())
 }
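A note on the CAST arithmetic in the query above: event_time is stored in milliseconds, and casting an integer to timestamp in DataFusion of this era interprets the value as nanoseconds since the epoch, hence the * 1000000 scale factor. A hedged check in Rust with chrono (the helper name is mine, mirroring timestamp_secs_to_datetime from the diff):

use chrono::{DateTime, NaiveDateTime, Utc};

// Hypothetical millisecond counterpart to timestamp_secs_to_datetime above.
fn timestamp_millis_to_datetime(ts_ms: i64) -> DateTime<Utc> {
    let (secs, ms) = (ts_ms / 1000, (ts_ms % 1000) as u32);
    DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(secs, ms * 1_000_000), Utc)
}

// Sanity check: 1636608807000 ms (the event_time in the test fixture) falls on
// 2021-11-11 UTC, and the WHERE bound 1630454400000 ms is 2021-09-01 00:00:00 UTC,
// matching the "-- after 09/01/2021" comment in the query.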