Skip to content

Commit

Permalink
NewFeature: update difficulty
Browse files Browse the repository at this point in the history
  • Loading branch information
iuohua committed Jun 27, 2024
1 parent 90df6ec commit 1485ef8
Show file tree
Hide file tree
Showing 5 changed files with 758 additions and 7 deletions.
38 changes: 34 additions & 4 deletions bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from loguru import logger

from phigros.phigros import Phigros
from phigros.gameInfo import update_difficulty
from config import get_config

conf = get_config()
Expand All @@ -29,6 +30,12 @@ def get_token(event, sender_id: Optional[int]) -> str:
return
return token

def get_id(sender_id) -> Optional[int]:
if isinstance(sender_id, int):
return sender_id
if isinstance(sender_id, PeerUser):
return sender_id.user_id
return None

@client.on(event=events.NewMessage(pattern="/start"))
async def start(event: events.NewMessage.Event):
Expand All @@ -47,8 +54,8 @@ async def bind_phigros_account(event):
if not token:
event.respond("Can not find token")
return
sender_id = event.from_id
if not sender_id or not isinstance(sender_id, PeerUser):
sender_id = get_id(event.from_id)
if not sender_id:
event.respond("Anonymous user is not supported.")
return
sender_id = sender_id.user_id
Expand All @@ -57,7 +64,10 @@ async def bind_phigros_account(event):

@client.on(event=events.NewMessage(pattern="/b19"))
async def get_b19_graph(event):
sender_id = event.from_id
sender_id = get_id(event.from_id)
if not sender_id:
event.respond("Anonymous user is not supported.")
return
token = get_token(event, sender_id)
if not token:
return
Expand All @@ -66,13 +76,33 @@ async def get_b19_graph(event):

@client.on(event=events.NewMessage(pattern="/b19_text"))
async def get_b19_text(event):
sender_id = event.from_id
sender_id = get_id(event.from_id)
if not sender_id:
event.respond("Anonymous user is not supported.")
return
token = get_token(event, sender_id)
if not token:
return
message = await phigros.get_b19_info(token)
logger.debug(message)
event.respond(message)

@client.on(event=events.NewMessage(pattern="/update"))
async def update_diff(event):
sender_id = get_id(event.from_id)
if not sender_id or sender_id != conf.get("owner"):
return
file_message = event.reply_to
if not file_message:
event.respond("Please reply to a file")
media = file_message.reply_media
if not media:
event.respond("Please reply to a file")
logger.info("Downloading file...")
await client.download_media(media, "update.apk")
logger.info("Download complete, update difficulty")
update_difficulty("update.apk")
event.respond("Difficulty table updated.")

async def run():
logger.info("TelegramPhigrosBot start running.")
Expand Down
228 changes: 228 additions & 0 deletions phigros/gameInfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
# From github PhigrosResource
# https://github.com/7aGiven/Phigros_Resource

import struct
import zipfile

from UnityPy import Environment
from loguru import logger

SONG_BASE_SCHEMA = {
"songId": str,
"songKey": str,
"songName": str,
"songTitle": str,
"difficulty": [float],
"illustrator": str,
"charter": [str],
"composer": str,
"levels": [str],
"previewTimeFrom": float,
"previewTimeTo": float,
"unlockList": {"unlockType": int, "unlockInfo": [str]},
"levelMods": {"n": [str]},
}


class ByteReader:
def __init__(self, data: bytes):
self.data = data
self.position = 0
self.d = {int: self.readInt, float: self.readFloat, str: self.readString}

# 4字节读取数据(int)
def readInt(self):
self.position += 4
return self.data[self.position - 4] ^ self.data[self.position - 3] << 8

# 4字节读取数据(float)
def readFloat(self):
self.position += 4
return struct.unpack("f", self.data[self.position - 4 : self.position])[0]

# 读取字符串
def readString(self):
length = self.readInt() # 读取第一个字节获取当前字符串长度
result = self.data[self.position : self.position + length].decode()
self.position += length // 4 * 4
if length % 4 != 0:
self.position += 4
return result

def skipString(self): # 略过字符串
length = self.readInt()
self.position += length // 4 * 4
if length % 4 != 0:
self.position += 4

def readSchema(self, schema: dict): # 通过SONG_BASE_SCHEMA中的字典来获取数据类型
result = []
for x in range(self.readInt()):
item = {}
for key, value in schema.items():
if value in (int, str, float):
item[key] = self.d[value]()
elif type(value) == list:
l = []
for i in range(self.readInt()):
l.append(self.d[value[0]]())
item[key] = l
elif type(value) == tuple:
for t in value:
self.d[t]()
elif type(value) == dict:
item[key] = self.readSchema(value)
else:
raise Exception("无")
result.append(item)
return result


def update_difficulty(path):
env = Environment()
with zipfile.ZipFile(path) as apk:
with apk.open("assets/bin/Data/globalgamemanagers.assets") as f:
env.load_file(f.read(), name="assets/bin/Data/globalgamemanagers.assets")
with apk.open("assets/bin/Data/level0") as f:
env.load_file(f.read())
for obj in env.objects:
if obj.type.name != "MonoBehaviour":
continue
data = obj.read()
if data.m_Script.get_obj().read().name == "GameInformation":
information = data.raw_data.tobytes()
elif data.m_Script.get_obj().read().name == "GetCollectionControl":
collection = data.raw_data.tobytes()
elif data.m_Script.get_obj().read().name == "TipsProvider":
tips = data.raw_data.tobytes()

reader = ByteReader(information)
reader.position = (
information.index(b"\x16\x00\x00\x00Glaciaxion.SunsetRay.0\x00\x00\n") - 4
)
difficulty = []
table = []
musicInfos = []
for i in range(3):
for item in reader.readSchema(SONG_BASE_SCHEMA):
item["songId"] = item["songId"][:-2]
if len(item["levels"]) == 5:
item["difficulty"].pop()
item["charter"].pop()
if item["difficulty"][-1] == 0:
item["difficulty"].pop()
item["charter"].pop()
for i in range(len(item["difficulty"])):
item["difficulty"][i] = round(item["difficulty"][i], 1)
difficulty.append([item["songId"]] + item["difficulty"])
table.append(
(
item["songId"],
item["songName"],
item["composer"],
item["illustrator"],
*item["charter"],
)
)

sid, levels, ratings, charters = (
item["songId"],
item["levels"],
item["difficulty"],
item["charter"],
)

modifyItem = {
"sid": sid,
"title": item["songName"],
"illustrator": item["illustrator"],
"composer": item["composer"],
"previewTimeFrom": item["previewTimeFrom"],
"previewTimeTo": item["previewTimeTo"],
"chartDetail": {"levelList": []},
}
for index in range(len(ratings)):
level, rating, charter = levels[index], ratings[index], charters[index]
modifyItem["chartDetail"][level] = {
"rating": rating,
"charter": charter,
}
modifyItem["chartDetail"]["levelList"].append(level)

# print(sid)
musicInfos.append(modifyItem)
reader.readSchema(SONG_BASE_SCHEMA)
# print("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-")

# with open("music-info.json", "w", encoding="utf8") as f:
# f.write(json.dumps(musicInfos, ensure_ascii=False, indent=2))
# print("music-info write completed")

with open("difficulty.tsv", "w", encoding="utf8") as f:
for item in difficulty:
f.write("\t".join(map(str, item)))
f.write("\n")
logger.success("difficulty write completed")

# with open("info.tsv", "w", encoding="utf8") as f:
# for item in table:
# f.write("\t".join(item))
# f.write("\n")
# print("info write completed")

key_schema = {"key": str, "a": int, "type": int, "b": int}
single = []
illustration = []
for item in reader.readSchema(key_schema):
if item["type"] == 0:
single.append(item["key"])
elif (
item["type"] == 2
and item["key"] != "Introduction"
and item["key"] not in single
):
illustration.append(item["key"])

# with open("single.txt", "w", encoding="utf8") as f:
# for item in single:
# f.write("%s\n" % item)
# print("single write completed")
# with open("illustration.txt", "w", encoding="utf8") as f:
# for item in illustration:
# f.write("%s\n" % item)
# print("illustration write completed")

reader = ByteReader(collection)
# collection_schema = {1: (int, int, int, str, str, str), "key": str, "index": int, 2: (int,), "title": str,
# 3: (str, str, str, str)}
# D = {}
# for item in reader.readSchema(collection_schema):
# if item["key"] in D:
# D[item["key"]][1] = item["index"]
# else:
# D[item["key"]] = [item["title"], item["index"]]
# with open("collection.tsv", "w", encoding="utf8") as f:
# for key, value in D.items():
# f.write("%s\t%s\t%s\n" % (key, value[0], value[1]))
# print("collection write completed")

# avatar_schema = {1: (int, int, int, str, str, str), "id": str, "file": str}
# table = reader.readSchema(avatar_schema)
# with open("avatar.txt", "w", encoding="utf8") as f:
# for item in table:
# f.write(item["id"])
# f.write("\n")
# print("avatar write completed")
# with open("tmp.tsv", "w", encoding="utf8") as f:
# for item in table:
# f.write("%s\t%s\n" % (item["id"], item["file"][7:]))
# print("tmp write completed")

# reader = ByteReader(tips[8:])
# with open("tips.txt", "w", encoding="utf8") as f:
# for i in range(reader.readInt()):
# f.write(reader.readString())
# f.write("\n")
# print("tips write completed")

# print("done.")
2 changes: 1 addition & 1 deletion phigros/phigros.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
from PIL import Image
from pathlib import Path
from phigros.phigrosLibrary import PhigrosLibrary

from phigros.phigrosLibrary import PhigrosLibrary
from model import PhigrosB19

class Phigros:
Expand Down
Loading

0 comments on commit 1485ef8

Please sign in to comment.