{message,submit}_queue: add size() function and callback

This commit is contained in:
Syping 2026-04-07 22:02:44 +02:00
parent f25b1ac4db
commit 2d96e35793
4 changed files with 51 additions and 3 deletions

View file

@ -101,6 +101,8 @@ void message_queue::run(bot::settings::settings *settings, submit_queue *submit_
if (!m_queue.empty()) { if (!m_queue.empty()) {
const message message = m_queue.front(); const message message = m_queue.front();
m_queue.pop(); m_queue.pop();
for (message_queue_size_callback &callback : m_callbacks)
callback(m_queue.size());
m_mutex.unlock(); m_mutex.unlock();
auto translator = settings->get_translator(); auto translator = settings->get_translator();
@ -134,6 +136,23 @@ void message_queue::run(bot::settings::settings *settings, submit_queue *submit_
} }
} }
size_t message_queue::size() {
const std::lock_guard<std::mutex> guard(m_mutex);
return m_queue.size();
}
void message_queue::size_callback_add(const message_queue_size_callback &callback) {
const std::lock_guard<std::mutex> guard(m_mutex);
m_callbacks.push_back(callback);
}
/*
void message_queue::size_callback_remove(const message_queue_size_callback &callback) {
const std::lock_guard<std::mutex> guard(m_mutex);
m_callbacks.erase(std::remove(m_callbacks.begin(), m_callbacks.end(), callback));
}
*/
void message_queue::terminate() void message_queue::terminate()
{ {
m_running = false; m_running = false;

View file

@ -19,9 +19,10 @@
#ifndef MESSAGE_QUEUE_H #ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H #define MESSAGE_QUEUE_H
#include <dpp/cluster.h> #include <dpp/cluster.h>
#include <functional>
#include <mutex> #include <mutex>
#include <string>
#include <queue> #include <queue>
#include <string>
#include <variant> #include <variant>
#include <vector> #include <vector>
#include "settings.h" #include "settings.h"
@ -42,6 +43,7 @@ namespace bot {
std::vector<bot::settings::target> targets; std::vector<bot::settings::target> targets;
}; };
typedef std::function<void(size_t)> message_queue_size_callback;
typedef std::variant<direct_message, guild_message> message; typedef std::variant<direct_message, guild_message> message;
class message_queue { class message_queue {
@ -51,12 +53,15 @@ namespace bot {
void process_direct_message_event(dpp::cluster *bot, bot::settings::settings *settings, const dpp::message_context_menu_t &event); void process_direct_message_event(dpp::cluster *bot, bot::settings::settings *settings, const dpp::message_context_menu_t &event);
void process_guild_message_event(dpp::cluster *bot, bot::settings::settings *settings, const dpp::message_create_t &event); void process_guild_message_event(dpp::cluster *bot, bot::settings::settings *settings, const dpp::message_create_t &event);
void run(bot::settings::settings *settings, submit_queue *submit_queue); void run(bot::settings::settings *settings, submit_queue *submit_queue);
size_t size();
void size_callback_add(const message_queue_size_callback &callback);
void terminate(); void terminate();
private: private:
bool m_running; bool m_running;
std::mutex m_mutex; std::mutex m_mutex;
std::queue<message> m_queue; std::queue<message> m_queue;
std::vector<message_queue_size_callback> m_callbacks;
}; };
} }

View file

@ -42,6 +42,8 @@ void submit_queue::run(dpp::cluster *bot)
if (!m_queue.empty()) { if (!m_queue.empty()) {
const translated_message message = m_queue.front(); const translated_message message = m_queue.front();
m_queue.pop(); m_queue.pop();
for (submit_queue_size_callback &callback : m_callbacks)
callback(m_queue.size());
m_mutex.unlock(); m_mutex.unlock();
if (const auto *direct_message = std::get_if<bot::translated_direct_message>(&message)) { if (const auto *direct_message = std::get_if<bot::translated_direct_message>(&message)) {
@ -60,6 +62,23 @@ void submit_queue::run(dpp::cluster *bot)
} }
} }
size_t submit_queue::size() {
const std::lock_guard<std::mutex> guard(m_mutex);
return m_queue.size();
}
void submit_queue::size_callback_add(const submit_queue_size_callback &callback) {
const std::lock_guard<std::mutex> guard(m_mutex);
m_callbacks.push_back(callback);
}
/*
void submit_queue::size_callback_remove(const submit_queue_size_callback &callback) {
const std::lock_guard<std::mutex> guard(m_mutex);
m_callbacks.erase(std::remove(m_callbacks.begin(), m_callbacks.end(), callback));
}
*/
void submit_queue::terminate() void submit_queue::terminate()
{ {
m_running = false; m_running = false;

View file

@ -21,8 +21,9 @@
#include <dpp/cluster.h> #include <dpp/cluster.h>
#include <dpp/webhook.h> #include <dpp/webhook.h>
#include <mutex> #include <mutex>
#include <string>
#include <queue> #include <queue>
#include <string>
#include <vector>
namespace bot { namespace bot {
struct translated_direct_message { struct translated_direct_message {
@ -37,6 +38,7 @@ namespace bot {
dpp::webhook webhook; dpp::webhook webhook;
}; };
typedef std::function<void(size_t)> submit_queue_size_callback;
typedef std::variant<translated_direct_message, translated_guild_message> translated_message; typedef std::variant<translated_direct_message, translated_guild_message> translated_message;
class submit_queue { class submit_queue {
@ -44,12 +46,15 @@ namespace bot {
void add(const translated_message &message); void add(const translated_message &message);
void add(translated_message &&message); void add(translated_message &&message);
void run(dpp::cluster *bot); void run(dpp::cluster *bot);
size_t size();
void size_callback_add(const submit_queue_size_callback &callback);
void terminate(); void terminate();
private: private:
bool m_running; bool m_running;
std::mutex m_mutex; std::mutex m_mutex;
std::queue<translated_message> m_queue; std::queue<translated_message> m_queue;
std::vector<submit_queue_size_callback> m_callbacks;
}; };
} }