Skip to content

Commit

Permalink
Playing around with Scheduler and NetworkHandler layout
Browse files Browse the repository at this point in the history
  • Loading branch information
zach2good committed Feb 6, 2025
1 parent 84e4d38 commit 31860ee
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 18 deletions.
192 changes: 192 additions & 0 deletions src/common/scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
===========================================================================
Copyright (c) 2025 LandSandBoat Dev Teams
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see http://www.gnu.org/licenses/
===========================================================================
*/

#pragma once

#include <asio/io_context.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

using duration = std::chrono::steady_clock::duration;
using time_point = std::chrono::steady_clock::time_point;

namespace
{
constexpr std::chrono::milliseconds kMinWaitMs{ 50 };
constexpr std::chrono::milliseconds kMaxWaitMs{ 1000 };
} // namespace

using TaskFunc = std::function<void(time_point)>;

enum class TaskKind
{
TASK_ONCE,
TASK_INTERVAL
};

struct Task
{
uint64_t id;
time_point tick; // Next scheduled execution time.
duration interval; // Recurrence interval.
TaskKind kind; // One-time or recurring.
TaskFunc handler; // Task callback.
};

struct TaskComparator
{
bool operator()(const std::unique_ptr<Task>& a, const std::unique_ptr<Task>& b) const
{
return a->tick > b->tick;
}
};

class Scheduler
{
public:
Scheduler(asio::io_context& ioContext)
: m_ioContext(ioContext)
, nextTaskId(0)
{
}

~Scheduler()
{
tasks.clear();
}

uint64_t addTask(duration delay, bool recurring, std::function<void(time_point)> handler)
{
uint64_t id = nextTaskId++;

auto now = std::chrono::steady_clock::now();

auto t = std::make_unique<Task>();
t->id = id;
t->tick = now + delay;
t->interval = delay;
t->kind = recurring ? TaskKind::TASK_INTERVAL : TaskKind::TASK_ONCE;
t->handler = handler;

tasks.push_back(std::move(t));

// Rebuild the heap.
std::push_heap(tasks.begin(), tasks.end(), TaskComparator());

return id;
}

// Actively remove tasks with the given ID.
void removeTask(uint64_t taskId)
{
const auto pred = [taskId](const auto& t)
{
return t->id == taskId;
};

auto newEnd = std::remove_if(tasks.begin(), tasks.end(), pred);
if (newEnd != tasks.end())
{
tasks.erase(newEnd, tasks.end());
std::make_heap(tasks.begin(), tasks.end(), TaskComparator());
}
}

// Process all tasks that are due at or before 'now'.
// For recurring tasks, reschedule them using:
// new_tick = (if >1s late: now, else previous tick) + interval.
// Returns the time until the next scheduled task, clamped between kMinWaitMs and kMaxWaitMs.
duration run(time_point now)
{
duration diff = std::chrono::seconds(1);
while (!tasks_.empty())
{
// tasks.front() is the task with the earliest tick.
diff = tasks.front()->tick - now;
if (diff > duration::zero())
{
break; // No tasks are overdue.
}

// Pop the top task.
std::pop_heap(tasks.begin(), tasks.end(), TaskComparator());
auto task = std::move(tasks.back());
tasks.pop_back();

// If the task is very late (>1s late), call its handler with 'now'.
if (diff < -std::chrono::seconds(1))
{
task->handler(now);
}
else
{
task->handler(task->tick);
}

if (task->kind == TaskKind::TASK_INTERVAL)
{
// Reschedule recurring task.
if (now - task->tick > std::chrono::seconds(1))
{
task->tick = now + task->interval;
}
else
{
t->tick += t->interval;
}

tasks.push_back(std::move(task));

// Rebuild the heap.
std::push_heap(tasks.begin(), tasks.end(), TaskComparator());
}

// One-time tasks are destroyed when they fall out of scope.
}

// We clamp here because:
// A minimum duration prevents the network run duration from being too short (which might starve network processing).
// A maximum duration prevents a long wait (if no tasks are due) that might delay periodic network processing.
diff = std::clamp(diff, kMinWaitMs, kMaxWaitMs);

return diff;
}

private:
using TaskHeap = std::vector<std::unique_ptr<Task>>; // Maintained as a heap.

// NOTE: We don't use ioContext_ yet, we're mimicking the synchronous TaskMgr.
asio::io_context& ioContext_;

uint64_t nextTaskId_;
TaskHeap tasks_;
};
11 changes: 7 additions & 4 deletions src/map/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@

int main(int argc, char** argv)
{
auto pMapServer = std::make_unique<MapServer>(argc, argv);

while (pConnectServer->IsRunning())
try
{
auto pMapServer = std::make_unique<MapServer>(argc, argv);
pMapServer->run();
}
catch (const std::exception& e)
{
pConnectServer->Tick();
ShowFatal(fmt::format("Fatal: {}", e.what()));
}

return 0;
Expand Down
15 changes: 1 addition & 14 deletions src/map/map_connection.h → src/map/map_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,4 @@

#pragma once

class MapConnection final
{
public:
// TODO: ASIO

int32 recv_parse(int8* buff, size_t* buffsize, sockaddr_in* from, map_session_data_t*); // main function to parse recv packets
int32 parse(int8* buff, size_t* buffsize, sockaddr_in* from, map_session_data_t*); // main function parsing the packets
int32 send_parse(int8* buff, size_t* buffsize, sockaddr_in* from, map_session_data_t*, bool usePreviousKey); // main function is building big packet

// TODO: PacketHandlers, canonize packet_system

private:

};
static constexpr std::size_t MAX_BUFFER_SIZE = 1024;
File renamed without changes.
125 changes: 125 additions & 0 deletions src/map/map_network_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
===========================================================================
Copyright (c) 2025 LandSandBoat Dev Teams
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see http://www.gnu.org/licenses/
===========================================================================
*/

#pragma once

#include "asio.hpp"
#include <algorithm>
#include <array>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

struct Session
{
uint64_t ipp;
};

static constexpr std::size_t MAX_BUFFER_SIZE = 1024;

using asio::ip::udp;

class MapNetworkHandler : public std::enable_shared_from_this<MapNetworkHandler>
{
public:
NetworkHandler(asio::io_context& ioContext, unsigned short port)
: socket(ioContext, udp::endpoint(udp::v4(), port))
{
}

void queue_async_recv()
{
socket.async_receive_from(
asio::buffer(buffer), // Reused for each receive.
remoteEndpoint, // Reused for each receive.
[self = shared_from_this()](const asio::error_code& ec, std::size_t bytesRecvd)
{
self->handleReceive(ec, bytesRecvd);
});
}

// Process network events for the given duration.
void run_for(duration d)
{
socket.get_executor().context().run_for(d);
}

private:
// TODO: This should exist outside?
std::shared_ptr<Session> lookupSession(uint64_t ipp)
{
auto it = sessions.find(ipp);
if (it != sessions.end())
{
return it->second;
}
auto session = std::make_shared<Session>();
session->ipp = ipp;
sessions[ipp] = session;
return session;
}

void handleReceive(const asio::error_code& ec, std::size_t bytesRecvd)
{
if (!ec && bytesRecvd > 0)
{
std::cout << "Received " << bytesRecvd << " bytes from "
<< remoteEndpoint.address().to_string() << ":"
<< remoteEndpoint.port() << std::endl;

auto ip = remoteEndpoint.address().to_v4().to_ulong();
uint16_t port = remoteEndpoint.port();
uint64_t ipp = ip;
ipp |= (static_cast<uint64_t>(port) << 32);

auto session = lookupSession(ipp);
if (session)
{
std::cout << "Session found/created for ipp: " << ipp << std::endl;
std::string data(buffer.data(), bytesRecvd);
std::cout << "Data: " << data << std::endl;
}
}
else if (ec)
{
std::cerr << "Receive error: " << ec.message() << std::endl;
}

// Queue the next asynchronous receive.
queue_async_recv();
}

using SessionMap = std::unordered_map<uint64_t, std::shared_ptr<Session>>;
using Buffer = std::array<char, MAX_BUFFER_SIZE>;

udp::socket socket;
udp::endpoint remoteEndpoint; // Reused for each receive.
Buffer buffer; // Reused for each receive.

SessionMap sessions;
};
Loading

0 comments on commit 31860ee

Please sign in to comment.