Files
2026-02-01 22:23:06 +08:00

442 lines
13 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//#include <unistd.h>
#include "IMemoryPool.h"

#include <atomic>

#include "Fps.hpp"
#include "InterfaceSystem.h"
#include "gddefines.h"
// Threshold for the stale-buffer check. In this file it is referenced only by the
// commented-out code inside IMemoryPool::recycleThread().
// NOTE(review): the unit presumably matches Fps::get_time_ms(); the comment in
// recycleThread() speaks of "5 second" - TODO confirm the intended unit.
#define MAX_USING_TIME (5000000)
namespace pool{
// Non-zero while IMemoryPool::dump() is printing the pool state; polled by
// IMemoryQueue::getFrame(int), which spins until it drops back to zero.
// Atomic because the flag is written by the dumping thread and read by the
// threads fetching frames.
static std::atomic<int> g_dump_flag{0};
//
/// Builds a block around an externally supplied AdditionalImp.
/// Only the address of @p imp is stored; no copy is made here.
IMemoryBlock::IMemoryBlock(AdditionalImp &imp) noexcept
{
    _imp = &imp;
}
/// Nothing to tear down explicitly: the destructor body is empty.
IMemoryBlock::~IMemoryBlock() = default;
//
/// Returns the AdditionalImp pointer stored by the constructor.
AdditionalImp *IMemoryBlock::data() noexcept
{
    return this->_imp;
}
/// Increments the use counter by one. Takes no lock itself; the overload
/// taking an owner name calls this while holding _mutex.
void IMemoryBlock::useradd()
{
    ++use_cnt;
}
/// Appends one timestamped entry to the block's history list.
/// The entry text is "[<local time>]" followed by @p str and @p owner.
/// The list is bounded: once it holds 15 entries the oldest one is dropped
/// before the new one is appended.
void IMemoryBlock::dumpHistory(const std::string str, const std::string &owner)
{
    const bool history_full = !(_history_used_list.size() < 15);
    if (history_full) {
        _history_used_list.pop_front();
    }

    std::string entry("[");
    entry += Fps::get_local_time();
    entry += "]";
    entry += str;
    entry += owner;
    _history_used_list.push_back(entry);
}
void IMemoryBlock::useradd(const std::string &owner){
std::lock_guard<std::mutex> lock(_mutex);
useradd();
_used_list.push_back(owner);
dumpHistory("++++ owner: ", owner);
}
/// Decrements the use counter by one. Takes no lock itself; the overload
/// taking an owner name calls this while holding _mutex.
void IMemoryBlock::usermlus()
{
    --use_cnt;
}
/// Drops one reference held by @p owner: under _mutex the use counter is
/// decremented, one matching entry is removed from the owner list, and a
/// "----" history entry is recorded.
void IMemoryBlock::usermlus(const std::string &owner)
{
    std::lock_guard<std::mutex> lock(_mutex);
    usermlus();
    // Erase a single matching entry only. The counter drops by one per call, so
    // removing every entry with this name (as list::remove does) would leave the
    // owner list out of step with use_cnt when one owner holds several references.
    for (auto it = _used_list.begin(); it != _used_list.end(); ++it) {
        if (*it == owner) {
            _used_list.erase(it);
            break;
        }
    }
    dumpHistory("---- owner: ", owner);
}
int IMemoryBlock::used() const{
return use_cnt;
}
std::string IMemoryBlock::owner() const{
return _owner;
}
/// Records @p owner as the block's owner and stamps the moment it was taken
/// with Fps::get_time_ms(). No lock is taken here; the guard below is disabled.
void IMemoryBlock::setOwner(const std::string &owner)
{
    // std::lock_guard<std::mutex> lock(_mutex);
    _using_timestamp = Fps::get_time_ms();
    _owner = owner;
}
/// Returns the block to its idle state under _mutex: clears the owner name,
/// zeroes the use counter, records a history entry with the time elapsed since
/// setOwner() stamped the block, and clears that timestamp.
void IMemoryBlock::reset()
{
    std::lock_guard<std::mutex> lock(_mutex);
    _owner = "";
    use_cnt = 0;
    // Keep the difference in its native width. Narrowing it to int garbles the
    // logged value whenever the difference does not fit, e.g. when
    // _using_timestamp is 0 because setOwner() was never called.
    const auto live_time = Fps::get_time_ms() - _using_timestamp;
    dumpHistory("reback to pool, living time: " + std::to_string(live_time), "");
    // Disabled diagnostic:
    // if(live_time > 2000000){
    //     printf("error buff state %p\n", this);
    //     this->dump();
    // }
    _using_timestamp = 0;
}
/// Drops one reference held by @p queuName.
/// @return 0 when a reference was dropped; -1 when the use counter was already
///         zero or negative, in which case the block state is printed instead.
int IMemoryBlock::release(const std::string &queuName)
{
    const int refs = used();
    if (refs > 0) {
        usermlus(queuName);
        return 0;
    }

    printf("block is bad, owner[%s], \n", queuName.c_str());
    dump();
    return -1;
}
void IMemoryBlock::dump(){
IMemoryBlockUsedList tmp = _used_list;
printf("[memory block] %p, cnt=%d, ", this, use_cnt);
for(unsigned int i=0; i<_used_list.size(); i++){
printf("owner [%d]:%s ", i, tmp.front().c_str());
tmp.pop_front();
}
printf("\n");
printf("----------history list----------\n");
IMemoryBlockUsedList his = _history_used_list;
for(unsigned int i=0; i<_history_used_list.size(); i++){
printf("%s\n", his.front().c_str());
his.pop_front();
}
printf("------------\n");
}
//IMemoryPool-------------------------------------------------------
/// Constructs the pool. The body is empty: starting the background recycle
/// thread is currently disabled (kept below for reference).
IMemoryPool::IMemoryPool()
{
    // std::thread recycle(&IMemoryPool::recycleThread, std::ref(*this));
    // recycle.detach();
}
void IMemoryPool::recycleThread(){
while(1){
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
/**
* check if a buffer block has using for more then 5 second.
* then drop it.
**/
// std::unique_lock<std::mutex> lock(_mutex);
_mutex.lock();
IMemoryPoolPtr ptr(_queue_ptr_dump);
_mutex.unlock();
for(unsigned int i=0; i<_queue_ptr_dump.size(); i++){
IMemoryBlock *block = ptr.front();
// unsigned long long cost = Fps::get_time_ms() - block->_using_timestamp;
// if(!block->owner().empty() && cost > MAX_USING_TIME){
// printf("[%p] owner:%s, using times:%lld \n", block, block->owner().c_str(), cost);
// // this->queue(block);
// }
ptr.pop();
}
}
}
/// Returns the address of the single pool object, a function-local static
/// that is constructed on the first call.
IMemoryPool *IMemoryPool::getInstance()
{
    static IMemoryPool instance;
    return &instance;
}
/// Creates a new block around @p imp, resets it, and adds it both to the free
/// queue and to the registry of all blocks used by dump()/recycleThread().
/// @return the number of blocks in the pool after the addition.
int IMemoryPool::addBuffer(AdditionalImp &imp)
{
    IMemoryBlock *p = new IMemoryBlock(imp);
    p->reset();

    int total = 0;
    {
        // dequeue()/release() touch these containers under _mutex, so do the same.
        std::lock_guard<std::mutex> lock(_mutex);
        _queue_ptr.push(p);
        // Append to the registry instead of overwriting it with a copy of the
        // free queue: a copy would drop every block that is checked out at this
        // moment.
        _queue_ptr_dump.push(p);
        total = ++_block_cnt;
    }

    std::cout << " block:[" << p << ": "<< p->yuv() << "]";
    return total;
}
int IMemoryPool::size()
{
return _block_cnt;
}
/// Takes the front block off the free queue on behalf of @p owner.
/// The block is stamped with the owner name and gets its first reference.
/// @return the block, or nullptr when the free queue is empty.
IMemoryBlock* IMemoryPool::dequeue(std::string &owner)
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (_queue_ptr.empty()) {
        // Pool exhausted: report it instead of dereferencing a null block.
        return nullptr;
    }

    IMemoryBlock *data = _queue_ptr.front();
    _queue_ptr.pop();
    data->setOwner(owner);
    data->useradd(owner);
    return data;
}
/// Adds one reference to @p data on behalf of the queue named @p queuName,
/// while holding the pool mutex.
void IMemoryPool::queuePush(IMemoryBlock* data, const std::string &queuName)
{
    std::unique_lock<std::mutex> guard(_mutex);
    data->useradd(queuName);
}
/// Puts @p data back on the free queue: the block is reset, pushed, and its
/// original/yuv/rgb buffers are passed to sys_free(). Takes no lock itself;
/// release() calls it while holding the pool mutex.
/// @return always 0.
int IMemoryPool::queue(IMemoryBlock* data)
{
    data->reset();
    _queue_ptr.push(data);

    // free the buffers if the platform needs it
    sys_free(data->original());
    sys_free(data->yuv());
    sys_free(data->rgb());

    return 0;
}
/// Drops the reference that @p queuName holds on @p data; when the use counter
/// reaches zero the block is handed back to the free queue via queue().
/// Runs under the pool mutex.
/// @return always 0.
int IMemoryPool::release(IMemoryBlock* data, const std::string &queuName)
{
    std::unique_lock<std::mutex> guard(_mutex);
    data->usermlus(queuName);

    const bool unreferenced = (data->used() == 0);
    if (unreferenced) {
        queue(data);
    }
    return 0;
}
void *IMemoryPool::getQueue(const std::string &name) {
IMemoryPoolIterator iter = _queue_map.find(name);
if(_queue_map.end() != iter){
return iter->second;
}
return NULL;
}
/// Registers @p queue under @p name, replacing any entry already stored
/// under that name.
/// @return always 0.
int IMemoryPool::restoreQueue(const std::string &name, void *queue)
{
    _queue_map[name] = queue;
    return 0;
}
/// Fills frame->u32Stride[0..2] from frame->u32Width according to the pixel
/// format:
///   - YUV420P / YUV422P:        width, width / 2, 0
///   - RGB / BGR packed:         width * 3 for all three entries
///   - RGB / BGR planar:         width for all three entries
///   - NV12 / NV21 / any other:  width, width, 0
/// @return always 0.
int IMemoryPool::CalcInfo(GD_VIDEO_FRAME_S *frame)
{
    const auto width = frame->u32Width;

    switch (frame->enPixelFormat) {
    case GD_PIXEL_FORMAT_YUV420P:
    case GD_PIXEL_FORMAT_YUV422P:
        frame->u32Stride[0] = width;
        frame->u32Stride[1] = width >> 1;
        frame->u32Stride[2] = 0;
        break;

    case GD_PIXEL_FORMAT_RGB_PACKED:
    case GD_PIXEL_FORMAT_BGR_PACKED:
        frame->u32Stride[2] = width * 3;
        frame->u32Stride[1] = frame->u32Stride[2];
        frame->u32Stride[0] = frame->u32Stride[1];
        break;

    case GD_PIXEL_FORMAT_RGB_PLANAR:
    case GD_PIXEL_FORMAT_BGR_PLANAR:
        frame->u32Stride[2] = width;
        frame->u32Stride[1] = frame->u32Stride[2];
        frame->u32Stride[0] = frame->u32Stride[1];
        break;

    case GD_PIXEL_FORMAT_NV12:
    case GD_PIXEL_FORMAT_NV21:
    default:
        frame->u32Stride[1] = width;
        frame->u32Stride[0] = frame->u32Stride[1];
        frame->u32Stride[2] = 0;
        break;
    }

    return 0;
}
/// Allocates one buffer for @p frame with sys_malloc() and derives the plane
/// addresses u64VirAddr[1] and u64VirAddr[2] from u64VirAddr[0], the strides
/// and the height. The strides must already be set (see CalcInfo()).
/// The requested size is stride[0] * height * fixelBite(format), rounded up
/// with MAKE_ALIGNX(size, 4096).
/// @return 0 on success, -1 when the allocation yields a null address.
/// NOTE(review): GD_PIXEL_FORMAT_YUV422P is handled by CalcInfo() but has no
/// case here, so planes 1 and 2 are left untouched for it - confirm intent.
int IMemoryPool::blockMalloc(GD_VIDEO_FRAME_S *frame)
{
    //malloc buffer
    int size = frame->u32Stride[0] * frame->u32Height * fixelBite(frame->enPixelFormat);
    frame->u64VirAddr[0] = sys_malloc(MAKE_ALIGNX(size, 4096));
    // sys_memalign(&frame->u64VirAddr[0], size);
    printf("*%p*", frame->u64VirAddr[0]);

    if (!frame->u64VirAddr[0]) {
        // Allocation failed: do not derive plane addresses from a null base.
        frame->u64VirAddr[1] = 0;
        frame->u64VirAddr[2] = 0;
        return -1;
    }

    switch (frame->enPixelFormat) {
    case GD_PIXEL_FORMAT_NV12:
    case GD_PIXEL_FORMAT_NV21:
        // Two planes: Y followed by the interleaved chroma plane.
        frame->u64VirAddr[1] = frame->u64VirAddr[0] + frame->u32Stride[0] * frame->u32Height;
        frame->u64VirAddr[2] = 0;
        break;

    case GD_PIXEL_FORMAT_YUV420P:
        frame->u64VirAddr[1] = frame->u64VirAddr[0] + frame->u32Stride[0] * frame->u32Height;
        // The U plane holds height / 2 rows of stride[1] bytes each. Dividing by 4
        // instead made the V plane start half-way through the U plane when
        // stride[1] is width / 2, as CalcInfo() sets it.
        frame->u64VirAddr[2] = frame->u64VirAddr[1] + frame->u32Stride[1] * (frame->u32Height / 2);
        break;

    case GD_PIXEL_FORMAT_RGB_PACKED:
    case GD_PIXEL_FORMAT_BGR_PACKED:
    case GD_PIXEL_FORMAT_RGB_PLANAR:
    case GD_PIXEL_FORMAT_BGR_PLANAR:
        frame->u64VirAddr[1] = frame->u64VirAddr[0] + frame->u32Stride[0] * frame->u32Height;
        frame->u64VirAddr[2] = frame->u64VirAddr[1] + frame->u32Stride[1] * frame->u32Height;
        break;

    default:
        break;
    }
    return 0;
}
void IMemoryPool::dump() {
// 先挂起media线程等待dump 内存池信息完成后,在继续运行。
g_dump_flag = 1;
//wait for hang thread.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
IMemoryPoolPtr tmp(_queue_ptr);
int size = _queue_ptr.size();
printf("[memory pool] total cnt:%d, used cnt: %d, unused cnt %d\n", _block_cnt, _block_cnt - size, size);
for(int i=0; i<size; i++){
printf("[memory pool] [%02d: %p] \n", i, tmp.front());
tmp.pop();
}
IMemoryPoolIterator iter = _queue_map.begin();
for(; iter != _queue_map.end(); ++ iter){
IMemoryQueue *queue = (IMemoryQueue *)iter->second;
queue->dump();
}
printf("\n-------------------------------------------------\n");
IMemoryPoolPtr ptr(_queue_ptr_dump);
printf("*memory pool total size = %d\n", (int)_queue_ptr_dump.size());
for(unsigned int i=0; i<_queue_ptr_dump.size(); i++){
IMemoryBlock *block = ptr.front();
printf("[*memory pool] [%02d: %p] owner :%s, used:%d\n", i, block, block->owner().c_str(), block->used());
block->dump();
ptr.pop();
}
g_dump_flag = 0;
printf("++++++++++++++++++++++++++++++++++++++++++++++++++\n\n");
}
//IMemoryQueue---------------------------------------
void IMemoryQueue::queueInit(const std::string &name, const int cnt, QUEUE_TYPE_e type){
_queue_name = name;
_max_cnt = cnt;
_type = type;
IMemoryPool::getInstance()->restoreQueue(name, this);
}
/// Returns the queue type stored by queueInit().
QUEUE_TYPE_e IMemoryQueue::fifoType() const
{
    return this->_type;
}
/// Wakes one thread waiting on @p queue's condition variable.
/// @return always 0.
int IMemoryQueue::readyRead(IMemoryQueue *queue)
{
    queue->_cond.notify_one();
    return 0;
}
/// Fetches one block.
/// When this queue's type is QUEUE_OWNER the block is taken straight from the
/// pool on behalf of @p queue's name. Otherwise the front block of @p queue is
/// removed and returned.
/// @param queue   queue to read from.
/// @param timeout 0: do not wait; negative: wait until a block is available;
///                positive: wait at most that many milliseconds.
/// @return the block, or NULL when none became available in time (for
///         QUEUE_OWNER: whatever IMemoryPool::dequeue() returns).
IMemoryBlock *IMemoryQueue::getFrame(IMemoryQueue *queue, const int timeout)
{
    if (_type == QUEUE_OWNER) {
        std::string name = queue->queueName();
        return IMemoryPool::getInstance()->dequeue(name);
    }

    // Lock the mutex of the queue being read (not necessarily this queue's) and
    // keep it held from the emptiness check through the pop. Releasing it in
    // between lets another consumer drain the queue, after which front() would
    // be called on an empty container.
    std::unique_lock<std::mutex> lock(queue->_mutex);
    const auto has_frame = [queue]{ return !queue->_buffer_queue.empty(); };

    if (!has_frame()) {
        if (timeout == 0) {
            return NULL;
        }
        if (timeout < 0) {
            queue->_cond.wait(lock, has_frame);
        } else if (!queue->_cond.wait_for(lock, std::chrono::milliseconds(timeout), has_frame)) {
            // timed wait expired without a block arriving
            return NULL;
        }
    }

    IMemoryBlock *p = queue->_buffer_queue.front();
    queue->_buffer_queue.pop();
    return p;
}
/// Fetches one block from this queue; see getFrame(IMemoryQueue *, int) for
/// the meaning of @p timeout. While a pool dump is in progress (g_dump_flag
/// non-zero) the call first polls every 2 ms until the dump has finished.
IMemoryBlock *IMemoryQueue::getFrame(const int timeout)
{
    // hold the calling thread back while the pool info is being dumped
    for (; g_dump_flag != 0; ) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2));
    }
    return getFrame(this, timeout);
}
/// Pushes @p buffer onto this queue; forwards to the (queue, buffer) overload.
int IMemoryQueue::sendFrame(IMemoryBlock *buffer)
{
    IMemoryQueue *self = this;
    return sendFrame(self, buffer);
}
/// Pushes @p buffer onto @p queue and then wakes one waiting reader.
/// @return the result of readyRead().
int IMemoryQueue::sendFrame(IMemoryQueue *queue, IMemoryBlock *buffer)
{
    queue->push(buffer);
    const int notified = readyRead(queue);
    return notified;
}
int IMemoryQueue::sendFrame(const std::string &name, IMemoryBlock *buffer){
IMemoryQueue *queue = (IMemoryQueue *)IMemoryPool::getInstance()->getQueue(name);
if(queue != NULL){
return queue->sendFrame(queue, buffer);
}
return 0;
}
/// Drops the reference this queue holds on @p buffer by calling the pool's
/// release() with this queue's name, while holding the queue mutex.
/// @return always 0.
int IMemoryQueue::releaseFrame(IMemoryBlock *buffer)
{
    std::unique_lock<std::mutex> guard(_mutex);
    IMemoryPool *pool = IMemoryPool::getInstance();
    pool->release(buffer, _queue_name);
    return 0;
}
std::string IMemoryQueue::queueName() const {
return _queue_name;
}
/// Returns the number of blocks currently queued, read under the queue mutex.
unsigned int IMemoryQueue::size()
{
    std::unique_lock<std::mutex> guard(_mutex);
    const unsigned int count = _buffer_queue.size();
    return count;
}
/// Removes every queued block pointer while holding the queue mutex.
/// NOTE(review): the removed blocks are not released back to the pool here,
/// although push() registers a reference for each of them - presumably the
/// caller is expected to deal with that; verify against callers.
void IMemoryQueue::clearQueue()
{
    std::unique_lock<std::mutex> guard(_mutex);
    for (; !_buffer_queue.empty(); ) {
        _buffer_queue.pop();
    }
}
/// Appends @p buffer to this queue and registers a pool reference for it under
/// the queue's name. When the queue already holds _max_cnt blocks, the oldest
/// one is removed and its reference released first.
/// @return always 0.
int IMemoryQueue::push(IMemoryBlock *buffer)
{
    // One critical section for check, eviction and insert. With separate lock
    // acquisitions a concurrent reader could empty the queue between the size
    // check and the eviction.
    std::unique_lock<std::mutex> lock(_mutex);

    // The emptiness test also covers _max_cnt == 0, where there is nothing to evict.
    if (!_buffer_queue.empty() && static_cast<unsigned int>(_buffer_queue.size()) >= _max_cnt) {
        IMemoryBlock *oldest = _buffer_queue.front();
        _buffer_queue.pop();
        IMemoryPool::getInstance()->release(oldest, _queue_name);
    }

    IMemoryPool::getInstance()->queuePush(buffer, _queue_name);
    _buffer_queue.push(buffer);
    return 0;
}
/// Removes and returns the front block of this queue, under the queue mutex.
/// @return the block, or nullptr when the queue is empty.
IMemoryBlock* IMemoryQueue::pull()
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (_buffer_queue.empty()) {
        // front()/pop() on an empty queue is undefined behaviour.
        return nullptr;
    }
    IMemoryBlock *p = _buffer_queue.front();
    _buffer_queue.pop();
    return p;
}
void IMemoryQueue::dump(){
unsigned int length = size();
printf("[memory queue %s] total cnt:%d\n", _queue_name.c_str(), length);
IMemoryQueuePtr temp(_buffer_queue); // 拷贝原队列到临时队列
int i=0;
while (!temp.empty()) {
IMemoryBlock *block = temp.front();
printf("[memory buffer][%02d: %p], owner:%s, used %d\n", i, block, block->owner().c_str(), block->used());
temp.pop();
i++;
}
}
}