Files
unbelievaselfbot/bot/client.py

166 lines
6.2 KiB
Python

import asyncio
import logging
import re
import time
from typing import Optional, Tuple
import discord
from discord.ext.tasks import loop
from bot import constants, parsers, helpers
from bot.constants import PlayOptions
# Module-level logger; its threshold is taken from constants.LOGGING_LEVEL.
# NOTE(review): the logger is named after __file__ (a filesystem path) instead
# of the conventional __name__ -- confirm nothing looks the logger up by that
# name before changing it.
logger = logging.getLogger(__file__)
logger.setLevel(constants.LOGGING_LEVEL)
class UnbelievaClient(discord.Client):
    """Discord client that periodically sends economy commands in one channel.

    The client watches a single text channel, reads embeds posted by the
    account whose id is ``bot_id`` and uses them to track command cooldowns
    and earnings. A background loop sends each command in ``self.tasks`` as
    soon as its cooldown has elapsed and deposits with ``$dep all`` at most
    once every 30 minutes.
    """

    # Seconds per unit word captured by the duration regex in handle_task_wait.
    durations = {'hour': 60 * 60, 'minute': 60, 'second': 1}

    def __init__(self, bot_id: int, channel_id: int, *args, **kwargs) -> None:
        """Store configuration and initialise the bookkeeping state.

        :param bot_id: user id of the account whose embeds are parsed
        :param channel_id: id of the text channel to watch and send commands in
        :param args: forwarded to ``discord.Client``
        :param kwargs: forwarded to ``discord.Client``
        """
        super().__init__(*args, **kwargs)
        # Epoch seconds of the last command sent; -1 means nothing sent yet.
        self.last_message = -1
        self.bot_id, self.channel_id = bot_id, channel_id
        self.channel: Optional[discord.TextChannel] = None
        # Holds the id (an int) that message authors are compared against.
        self.bot: Optional[int] = None
        # (command, seconds to wait before sending that command again)
        self.tasks = [
            ('$work', 5 * 60 + 5),
            ('$slut', 13 * 60 + 5),
            ('$crime', 20 * 60 + 5),
        ]
        # command -> epoch seconds at which it may be sent again
        self.task_times = {}
        self.money = 0
        self.last_deposit = -1
        self.last_user_deposit = -1
        self.current_blackjack: Optional[discord.Message] = None

    async def on_ready(self):
        """Resolve the target channel and start the periodic task loop."""
        await self.wait_until_ready()
        self.channel = self.get_channel(self.channel_id)
        self.bot = self.bot_id

        if self.channel is None:
            # Without a channel every send in the loop would fail, so do not
            # start it at all.
            logger.error(f'Channel {self.channel_id} could not be found; task loop not started.')
            return

        try:
            self.check_task_available.start()
        except RuntimeError:
            # on_ready is not guaranteed to fire only once; a second start()
            # on a running loop raises RuntimeError.
            logger.debug('Task loop is already running; not starting it again.')

        logger.info(f'Connected to #{self.channel.name} in {self.channel.guild.name}')

    async def on_message(self, message: discord.Message):
        """Parse embeds from the watched account to track cooldowns and earnings.

        :param message: the message that was received
        """
        # Ignore messages in other channels or sent by myself
        if message.channel != self.channel or message.author == self.user:
            return
        # Only embeds posted by the watched account are of interest.
        if message.author.id != self.bot or not message.embeds:
            return

        embed = message.embeds[0]
        # The embed author is compared against this account's "name#discriminator".
        is_self = embed.author.name == f'{self.user.name}#{self.user.discriminator}'

        if is_self and parsers.TaskCooldownMessage.check_valid(message):
            tcm = parsers.TaskCooldownMessage(message)
            logger.debug(f'"{tcm.duration_unparsed}" => {tcm.duration}s')
            logger.debug(f'Changed {tcm.task_type} to wait {tcm.duration + 2}s instead.')
            self.task_times[tcm.task_type] = tcm.available_at

        # Handling earnings
        if parsers.TaskResponse.check_valid(message):
            tr = parsers.TaskResponse(message)
            if is_self:
                # Only this account's own results affect the tracked balance;
                # responses to other users are merely logged at DEBUG level.
                self.money += tr.change
            logger.log(logging.INFO if is_self else logging.DEBUG, tr.log_message(embed.author.name))

        # Handling for blackjack
        description = embed.description
        # An embed without a description does not carry a str here.
        if isinstance(description, str) and description.startswith('Type `hit` to draw another card'):
            if len(embed.fields) < 2:
                logger.debug('Blackjack embed has fewer than two fields; skipping.')
                return
            options = self.parse_options(description)
            my_cards = self.parse_cards(embed.fields[0])
            dealer_cards = self.parse_cards(embed.fields[1])
            logger.debug(f'Blackjack state: {options} {my_cards} {dealer_cards}')

    def parse_options(self, options_str: str) -> PlayOptions:
        """
        Return a tuple of booleans describing what the player can do.
        Tuple Options: [hit, stand, double_down, split]

        :param options_str: embed description listing the available actions
        :return: PlayOptions built from whether each action appears in backticks
        """
        actions = ['hit', 'stand', 'double down', 'split']
        options = [f'`{action}`' in options_str for action in actions]
        # noinspection PyProtectedMember
        return PlayOptions._make(options)

    def parse_cards(self, card_str: discord.embeds.EmbedProxy) -> Tuple[str, Optional[str], Tuple[int, bool]]:
        """
        Parses an embed field describing a blackjack hand.

        :param card_str: embed field whose ``value`` holds card emotes and a
            "Value: [Soft ]N" line
        :return: [Card1, Card2?, [Value, isSoft]]; Card2 is None when it is
            the 'cardBack' emote or when only one card is present
        :raises ValueError: if no card emote or no hand value can be found
        """
        emote_pattern = r'<:([A-Za-z0-9_]+):\d+>'
        value_pattern = r'Value: (Soft )?(\d+)'

        cards = [found.group(1) for found in re.finditer(emote_pattern, card_str.value)]
        value = re.search(value_pattern, card_str.value)
        if not cards or value is None:
            raise ValueError(f'Unrecognised blackjack hand field: {card_str.value!r}')

        c1 = cards[0]
        c2: Optional[str] = cards[1] if len(cards) > 1 else None
        if c2 == 'cardBack':
            c2 = None

        # Group 1 is the optional "Soft " prefix; group 2 is the numeric value.
        is_soft = value.group(1) is not None
        return c1, c2, (int(value.group(2)), is_soft)

    def handle_blackjack(self):
        """Parse and log the state of the blackjack message currently tracked."""
        if self.current_blackjack is None or not self.current_blackjack.embeds:
            return
        embed = self.current_blackjack.embeds[0]
        options = self.parse_options(embed.description)
        my_cards = self.parse_cards(embed.fields[0])
        dealer_cards = self.parse_cards(embed.fields[1])
        logger.debug(f'Blackjack state: {options} {my_cards} {dealer_cards}')

    def handle_task_wait(self, match: re.Match):
        """Record when a task becomes available again from a cooldown match.

        :param match: match whose group 1 keys into ``self.task_parsings`` and
            whose group 2 is a duration such as "3 minutes and 20 seconds"
        """
        # NOTE(review): self.task_parsings is not assigned anywhere in this
        # class, so this lookup raises AttributeError unless it is set
        # elsewhere -- confirm the intended mapping.
        task_command = self.task_parsings[match.group(1)]
        duration_match = re.match(
            r'(\d+) (hour|minute|second)s?(?: and (\d+) (hour|minute|second)s?)?',
            match.group(2))
        if duration_match is None:
            logger.warning(f'Could not parse cooldown duration "{match.group(2)}"')
            return

        # The pattern yields one or two (amount, unit) pairs; the second pair
        # is None when the " and ..." part is absent.
        pairs = (
            (duration_match.group(1), duration_match.group(2)),
            (duration_match.group(3), duration_match.group(4)),
        )
        duration = sum(int(amount) * self.durations[unit] for amount, unit in pairs if amount is not None)

        # Two extra seconds of slack are added on top of the reported cooldown.
        epoch = time.time() + duration + 2
        logger.debug(f'"{match.group(2)}" => {duration}s')
        logger.debug(f'Changed {task_command} to wait {duration + 2}s instead.')
        self.task_times[task_command] = epoch

    @loop(seconds=1)
    async def check_task_available(self):
        """Send every task whose cooldown has elapsed; otherwise maybe deposit."""
        await self.wait_until_ready()

        # Check for available tasks
        now = time.time()
        task_completed = False
        for task, duration in self.tasks:
            if self.task_times.get(task, 0) > now:
                continue
            logger.debug(f'Planning to execute {task} {duration}s from now.')
            self.task_times[task] = now + duration
            await self.command_sleep()
            logger.debug(f'Executing {task} task.')
            await self.channel.send(task)
            self.last_message = time.time()
            task_completed = True

        # Deposit only on ticks where no task was sent, at most every 30 minutes.
        if not task_completed and self.last_deposit + (30 * 60) <= now:
            await self.command_sleep()
            await self.channel.send('$dep all')
            self.last_message = time.time()
            self.last_deposit = time.time()

        if self.current_blackjack is not None:
            self.handle_blackjack()

    async def command_sleep(self):
        """Sleep right before sending a command."""
        # Keep at least 6 seconds between two consecutive commands.
        elapsed = time.time() - self.last_message
        wait_time = 6 - elapsed
        if wait_time > 0:
            logger.debug(f'Sleeping for {round(wait_time, 2)}s before sending a command...')
            await asyncio.sleep(wait_time)