#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unused_imports)]
use crate::{
enums::{YotsubaBoard, YotsubaEndpoint, YotsubaHash, YotsubaIdentifier},
mysql
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use enum_iterator::IntoEnumIterator;
use mysql_async::{prelude::*, Pool};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt, ops::Add};
#[async_trait]
pub trait Archiver: Sync + Send {
async fn run_inner(&self) -> Result<()>;
}
pub struct MuhArchiver(Box<dyn Archiver>);
impl MuhArchiver {
pub fn new(x: Box<dyn Archiver>) -> Self {
Self(x)
}
pub async fn run(&self) {
if let Err(e) = self.0.run_inner().await {
log::error!("{}", e);
}
}
}
pub type StatementStore<S> = HashMap<QueryIdentifier, S>;
#[async_trait]
pub trait StatementTrait<T>: Send + Sync {
async fn prepare(&self, stmt: &str) -> T;
}
#[async_trait]
impl StatementTrait<tokio_postgres::Statement> for tokio_postgres::Client {
async fn prepare(&self, stmt: &str) -> tokio_postgres::Statement {
self.prepare(stmt).await.unwrap()
}
}
#[async_trait]
impl StatementTrait<mysql::Statement> for Pool {
async fn prepare(&self, stmt: &str) -> mysql::Statement {
let conn = self.get_conn().await.unwrap();
conn.prepare(stmt).await.unwrap()
}
}
#[derive(
Debug,
Copy,
Clone,
std::hash::Hash,
PartialEq,
std::cmp::Eq,
enum_iterator::IntoEnumIterator,
Deserialize,
Serialize,
)]
#[serde(rename_all = "lowercase")]
pub enum Database {
PostgreSQL,
TimescaleDB,
MySQL,
InnoDB,
TokuDB
}
impl Database {
pub fn base(&self) -> Database {
match self {
Database::PostgreSQL | Database::TimescaleDB => Database::PostgreSQL,
_ => Database::MySQL
}
}
pub fn mysql_engine(&self) -> Database {
match self {
Database::MySQL => Database::InnoDB,
_ => *self
}
}
}
pub trait RowTrait {
fn get<'a, I, T>(&'a self, idx: I) -> Result<T>
where
I: RowIndex,
T: RowFrom<'a>;
}
pub trait RowIndex:
tokio_postgres::row::RowIndex + mysql_common::row::ColumnIndex + std::fmt::Display {
}
pub trait RowFrom<'a>: tokio_postgres::types::FromSql<'a> + FromValue {}
impl<'a> RowFrom<'a> for String {}
impl<'a> RowFrom<'a> for Option<Vec<u8>> {}
impl<'a> RowFrom<'a> for Option<String> {}
impl<'a> RowFrom<'a> for i64 {}
impl<'a> RowIndex for &'a str {}
impl RowTrait for tokio_postgres::Row {
fn get<'a, I, T>(&'a self, idx: I) -> Result<T>
where
I: RowIndex,
T: RowFrom<'a> {
Ok(self.try_get::<'a>(&idx)?)
}
}
impl RowTrait for mysql_async::Row {
fn get<'a, I, T>(&'a self, idx: I) -> Result<T>
where
I: RowIndex,
T: RowFrom<'a> {
Ok(self.get_opt(idx).ok_or_else(|| anyhow!("Was an empty value"))??)
}
}
impl fmt::Display for Database {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
d => write!(f, "{:?}", d)
}
}
}
impl Into<Database> for String {
fn into(self) -> Database {
println!("Inside INTO {}", self);
if let Some(found) = Database::into_enum_iter()
.find(|db| db.to_string().to_lowercase() == self.to_lowercase())
{
found
} else {
let list = Database::into_enum_iter()
.map(|zz| zz.to_string())
.collect::<Vec<String>>()
.join("`, `");
panic!(format!("unknown variant `{}`, expected one of `{}`", self, list));
}
}
}
#[derive(
Debug, Copy, Clone, std::hash::Hash, PartialEq, std::cmp::Eq, enum_iterator::IntoEnumIterator,
)]
pub enum YotsubaStatement {
InitSchema = 1,
InitType,
InitMetadata,
InitBoard,
InitViews,
UpdateMetadata,
UpdateThread,
Delete,
UpdateDeleteds,
UpdateHashMedia,
UpdateHashThumbs,
Medias,
Threads,
ThreadsModified,
ThreadsCombined,
Metadata
}
impl YotsubaStatement {
pub fn is_thumbs(&self) -> bool {
matches!(self, Self::UpdateHashThumbs)
}
pub fn is_thumbs_val(&self) -> YotsubaStatement {
match self {
Self::UpdateHashThumbs => Self::UpdateHashThumbs,
_ => Self::UpdateHashMedia
}
}
pub fn is_media(&self) -> bool {
matches!(self, Self::UpdateHashMedia)
}
}
impl fmt::Display for YotsubaStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
s => write!(f, "{:?}", s)
}
}
}
impl Add for YotsubaStatement {
type Output = u8;
fn add(self, other: Self) -> u8 {
(self as u8) + (other as u8)
}
}
#[async_trait]
pub trait QueriesExecutor<S, R> {
async fn init_type(&self) -> Result<u64>;
async fn init_schema(&self, schema: &str, engine: Database, charset: &str) -> Result<u64>;
async fn init_metadata(&self, engine: Database, charset: &str) -> Result<u64>;
async fn init_board(&self, board: YotsubaBoard, engine: Database, charset: &str)
-> Result<u64>;
async fn init_views(&self, board: YotsubaBoard) -> Result<u64>;
async fn update_metadata(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
item: &[u8]
) -> Result<u64>;
async fn update_thread(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
item: &[u8]
) -> Result<u64>;
async fn delete(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
no: u64
) -> Result<u64>;
async fn update_deleteds(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
thread: u64, item: &[u8]
) -> Result<u64>;
async fn update_hash(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
no: u64, hash_type: YotsubaStatement, hashsum: Vec<u8>
) -> Result<u64>;
async fn medias(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
no: u64
) -> Result<Vec<R>>;
async fn threads(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
item: &[u8]
) -> Result<Queue>;
async fn threads_modified(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
new_threads: &[u8]
) -> Result<Queue>;
async fn threads_combined(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard,
new_threads: &[u8]
) -> Result<Queue>;
async fn metadata(
&self, statements: &StatementStore<S>, endpoint: YotsubaEndpoint, board: YotsubaBoard
) -> Result<bool>;
}
pub trait QueryRaw {
fn inquiry(&self, statement: YotsubaStatement, id: QueryIdentifier) -> String;
}
pub trait Ret {}
impl Ret for u64 {}
#[async_trait]
pub trait Query<S, R> {
async fn first(
&self, statement: YotsubaStatement, id: &QueryIdentifier, statements: &StatementStore<S>,
item: Option<&[u8]>, no: Option<u64>
) -> Result<u64>;
async fn get_list(
&self, statement: YotsubaStatement, id: &QueryIdentifier, statements: &StatementStore<S>,
item: Option<&[u8]>, no: Option<u64>
) -> Result<Queue>;
async fn get_rows(
&self, statement: YotsubaStatement, id: &QueryIdentifier, statements: &StatementStore<S>,
item: Option<&[u8]>, no: Option<u64>
) -> Result<Vec<R>>;
async fn create_statements(
&self, engine: Database, endpoint: YotsubaEndpoint, board: YotsubaBoard
) -> StatementStore<S>;
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueryIdentifier {
pub engine: Database,
pub endpoint: YotsubaEndpoint,
pub board: YotsubaBoard,
pub schema: Option<String>,
pub charset: Option<String>,
pub hash_type: YotsubaHash,
pub media_mode: YotsubaStatement
}
impl QueryIdentifier {
pub fn new(
engine: Database, endpoint: YotsubaEndpoint, board: YotsubaBoard, schema: Option<String>,
charset: Option<String>, hash_type: YotsubaHash, media_mode: YotsubaStatement
) -> Self
{
Self { engine, endpoint, board, schema, charset, hash_type, media_mode }
}
pub fn simple(engine: Database, endpoint: YotsubaEndpoint, board: YotsubaBoard) -> Self {
Self {
engine,
endpoint,
board,
schema: None,
charset: None,
hash_type: YotsubaHash::Sha256,
media_mode: YotsubaStatement::Medias
}
}
}
pub trait Queries {
fn query_init_schema(&self, schema: &str, engine: Database, charset: &str) -> String;
fn query_init_metadata(&self, engine: Database, charset: &str) -> String;
fn query_init_board(&self, board: YotsubaBoard, engine: Database, charset: &str) -> String;
fn query_init_type(&self) -> String;
fn query_init_views(&self, board: YotsubaBoard) -> String;
fn query_delete(&self, board: YotsubaBoard) -> String;
fn query_update_deleteds(&self, engine: Database, board: YotsubaBoard) -> String;
fn query_update_hash(
&self, board: YotsubaBoard, hash_type: YotsubaHash, media_mode: YotsubaStatement
) -> String;
fn query_update_metadata(&self, column: YotsubaEndpoint) -> String;
fn query_metadata(&self, column: YotsubaEndpoint) -> String;
fn query_medias(&self, board: YotsubaBoard, media_mode: YotsubaStatement) -> String;
fn query_threads_modified(&self, endpoint: YotsubaEndpoint) -> String;
fn query_threads(&self) -> String;
fn query_threads_combined(&self, board: YotsubaBoard, endpoint: YotsubaEndpoint) -> String;
fn query_update_thread(&self, engine: Database, board: YotsubaBoard) -> String;
}
#[async_trait]
pub trait DatabaseTrait<S, R>: QueryRaw + Query<S, R> + StatementTrait<S> + Send + Sync {}
impl DatabaseTrait<tokio_postgres::Statement, tokio_postgres::Row> for tokio_postgres::Client {}
impl DatabaseTrait<mysql::Statement, mysql_async::Row> for Pool {}
#[allow(dead_code)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Thread {
pub posts: HashSet<Post>
}
impl Default for Thread {
fn default() -> Self {
Self { posts: HashSet::new() }
}
}
#[allow(dead_code)]
#[cold]
#[rustfmt::skip]
#[derive(Deserialize, Serialize, Debug, Clone, Eq, Hash)]
#[serde(default)]
pub struct Post {
pub no: u64,
pub sticky: Option<u8>,
pub closed: Option<u8>,
pub now: Option<String>,
pub name: Option<String>,
pub sub: Option<String>,
pub com: Option<String>,
pub filedeleted: Option<u8>,
pub spoiler: Option<u8>,
pub custom_spoiler: Option<u16>,
pub filename: Option<String>,
pub ext: Option<String>,
pub w: Option<u32>,
pub h: Option<u32>,
pub tn_w: Option<u32>,
pub tn_h: Option<u32>,
pub tim: Option<u64>,
pub time: u64,
pub md5: Option<String>,
pub fsize: Option<u64>,
pub m_img: Option<u8>,
pub resto: u64,
pub trip: Option<String>,
pub id: Option<String>,
pub capcode: Option<String>,
pub country: Option<String>,
pub troll_country: Option<String>,
pub country_name: Option<String>,
pub archived: Option<u8>,
pub bumplimit: Option<u8>,
pub archived_on: Option<u64>,
pub imagelimit: Option<u16>,
pub semantic_url: Option<String>,
pub replies: Option<u32>,
pub images: Option<u32>,
pub unique_ips: Option<u32>,
pub tag: Option<String>,
pub since4pass: Option<u16>
}
impl PartialEq for Post {
fn eq(&self, other: &Self) -> bool {
self.no == other.no
}
}
impl Default for Post {
fn default() -> Self {
let t =
std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap();
Self {
no: t.as_secs(),
sticky: None,
closed: None,
now: None,
name: None,
sub: None,
com: None,
filedeleted: None,
spoiler: None,
custom_spoiler: None,
filename: None,
ext: None,
w: None,
h: None,
tn_w: None,
tn_h: None,
tim: None,
time: t.as_secs(),
md5: None,
fsize: None,
m_img: None,
resto: t.as_secs(),
trip: None,
id: None,
capcode: None,
country: None,
troll_country: None,
country_name: None,
archived: None,
bumplimit: None,
archived_on: None,
imagelimit: None,
semantic_url: None,
replies: None,
images: None,
unique_ips: None,
tag: None,
since4pass: None
}
}
}
use std::{
collections::HashSet,
hash::{Hash, Hasher}
};
pub type ThreadsList = Vec<Threads>;
pub trait ThreadsTrait {
fn to_queue(&self) -> Queue;
}
impl ThreadsTrait for ThreadsList {
fn to_queue(&self) -> Queue {
self.iter()
.map(|x: &Threads| x.threads.iter())
.flat_map(|it| it.map(|post| post.no))
.collect()
}
}
pub type Queue = HashSet<u64>;
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Threads {
pub page: u8,
pub threads: Vec<ThreadsPost>
}
#[derive(Deserialize, Serialize, Debug, Clone, Copy, Hash, Eq, Ord, PartialOrd)]
pub struct ThreadsPost {
pub no: u64,
pub last_modified: u64,
pub replies: u32
}
impl ThreadsPost {
pub fn new(no: u64, last_modified: u64, replies: u32) -> Self {
Self { no, last_modified, replies }
}
}
impl PartialEq for ThreadsPost {
fn eq(&self, other: &Self) -> bool {
self.no == other.no
}
}
pub enum Diff {
SymmetricDifference,
Difference,
Union
}
pub trait DiffTrait<T> {
fn symmetric_difference(&self, endpoint: YotsubaEndpoint, other: &T) -> Result<Queue> {
self.generic_diff(endpoint, other, Diff::SymmetricDifference)
}
fn difference(&self, endpoint: YotsubaEndpoint, other: &T) -> Result<Queue> {
self.generic_diff(endpoint, other, Diff::Difference)
}
fn union(&self, endpoint: YotsubaEndpoint, other: &T) -> Result<Queue> {
self.generic_diff(endpoint, other, Diff::Union)
}
fn generic_diff(&self, endpoint: YotsubaEndpoint, other: &T, diff: Diff) -> Result<Queue>;
}
impl DiffTrait<ThreadsList> for ThreadsList {
fn generic_diff(
&self, endpoint: YotsubaEndpoint, other: &ThreadsList, diff: Diff
) -> Result<Queue> {
let tt: HashSet<ThreadsPost> = self
.iter()
.map(|x: &Threads| x.threads.iter())
.flat_map(|it| it.map(|&post| post))
.collect();
let tt2: HashSet<ThreadsPost> = other
.iter()
.map(|x: &Threads| x.threads.iter())
.flat_map(|it| it.map(|&post| post))
.collect();
let diff: HashSet<ThreadsPost> = match diff {
Diff::SymmetricDifference => tt.symmetric_difference(&tt2).map(|&s| s).collect(),
Diff::Difference => tt.difference(&tt2).map(|&s| s).collect(),
Diff::Union => tt.union(&tt2).map(|&s| s).collect()
};
let mut diff: Vec<_> = diff.into_iter().collect();
diff.sort();
diff.dedup();
Ok(diff.into_iter().map(|post| post.no).collect())
}
}
impl DiffTrait<Vec<u8>> for Vec<u8> {
fn generic_diff(
&self, endpoint: YotsubaEndpoint, other: &Vec<u8>, diff: Diff
) -> Result<Queue> {
match endpoint {
YotsubaEndpoint::Archive => {
let tt: Queue = serde_json::from_slice(self).unwrap();
let tt2: Queue = serde_json::from_slice(other).unwrap();
let diff: Queue = match diff {
Diff::SymmetricDifference =>
tt.symmetric_difference(&tt2).map(|&s| s).collect(),
Diff::Difference => tt.difference(&tt2).map(|&s| s).collect(),
Diff::Union => tt.union(&tt2).map(|&s| s).collect()
};
return Ok(diff);
}
YotsubaEndpoint::Threads => {
let set: ThreadsList = serde_json::from_slice(self).unwrap();
let set2: ThreadsList = serde_json::from_slice(other).unwrap();
let tt: HashSet<ThreadsPost> = set
.iter()
.map(|x: &Threads| x.threads.iter())
.flat_map(|it| it.map(|&post| post))
.collect();
let tt2: HashSet<ThreadsPost> = set2
.iter()
.map(|x: &Threads| x.threads.iter())
.flat_map(|it| it.map(|&post| post))
.collect();
match diff {
Diff::SymmetricDifference => {
let diff: HashSet<ThreadsPost> =
tt.symmetric_difference(&tt2).map(|&s| s).collect();
let mut diff: Vec<_> = diff.into_iter().collect();
diff.sort();
diff.dedup();
Ok(diff.into_iter().map(|post| post.no).collect())
}
Diff::Difference => {
let diff: HashSet<ThreadsPost> = tt.difference(&tt2).map(|&s| s).collect();
let mut diff: Vec<_> = diff.into_iter().collect();
diff.sort();
diff.dedup();
Ok(diff.into_iter().map(|post| post.no).collect())
}
Diff::Union => {
let diff: HashSet<ThreadsPost> = tt.union(&tt2).map(|&s| s).collect();
Ok(diff.into_iter().map(|post| post.no).collect())
}
}
}
YotsubaEndpoint::Media => Err(anyhow!("|diff| Invalid endpoint: {}", endpoint))
}
}
}