Qt_Mysql异步单线程多任务类

2014年4月28日 由 Creater 留言 »

在QT使用mysql时,函数都为同步,往往使得界面卡死或者延误其他事情,经过思考,花了一晚上时间实现了使用一个单独线程来执行sql任务的类,在执行完成后异步回调。使用两个链表来维护所有mysql操作,一个经常使用指令队列,一个很少使用指令队列。针对每个指令操作时间较短,设计了一个线程实现类似线程池效果

由于这些操作是在两个线程中完成的,而且信号槽队列中的数据类型必须是系统能识别的元类型,而我的代码中使用了QVector作为信号和槽函数的参数。在线程之间传递signal与在一个线程内传递signal是不一样的。在一个线程内传递signal时,emit语句会直接调用所有连接的slot并等待到所有slot被处理完;在线程之间传递signal时,slot会被放到队列中(queue),而emit这个signal后会马上返回。

默认情况,线程之间使用queue机制,而线程内使用direct机制,为了能在线程之间使用自定义的类型作为信号参数,有以下两种解决办法:

1.在connect的第五个参数设置为Qt::DirectConnection直接发送(有弊端),违背了异步的主旨,修改connect如下:

    
 typedef QVector<QPointF> MyVector;
/*   */
connect(&sqlthread,SIGNAL(getResult(MyVector&, int)),
              this,SLOT(mydata(MyVector&, int)), Qt::DirectConnection);

2.使用qRegisterMetaType()进行注册
在定义QVector这个模板类型的地方,进行以下声明。

typedef QVector<QPointF> MyVector;
Q_DECLARE_METATYPE(MyVector)

在主函数main中,进行注册

    qRegisterMetaType<MyVector>("MyVector");
    qRegisterMetaType<MyVector>("MyVector&");

以下为类SQLThread的部分源代码

下面给出SQLThread 类的部分源代码,欢迎提出意见与修改建议。
sqlthread.h

#ifndef SQL_THREAD_H
#define SQL_THREAD_H

#include "sql.h"
#include <QThread>
#include <QSemaphore>
#include <QVector>
#include <QMetaType>

class SQLThread : public QThread
{
    Q_OBJECT
public:
    enum SQLTasks
    {
        //连接数据库
        ConnectDb,
        //创建表
        CreatTable,
        //插入数据
        InsertData,
        //查询数据
        SelectData,
        NTasks //3
    };
    //很少使用数据队列
    struct SQLDataRarelyUsed
    {
        /*!
         *该构造函数默认为建表指令或者查询指令,由t区分
         */
        SQLDataRarelyUsed(QString table_name,
                          SQLTasks t): name(table_name), task(t)
        {
        }

        /*!
         *该构造函数默认为连接数据库指令
         */
        SQLDataRarelyUsed(QString db_name,
                          QString user_, QString pass_, SQLTasks t):
            name(db_name), user(user_), passwd(pass_), task(t)
        {
        }

        /*!
         * 用于连接数据库用,用于建表用,用于查询表用
         */
        QString name; //db_name,table_name
        /*!
         * 用于连接数据库用
         */
        QString user;
        QString passwd;
        SQLThread::SQLTasks task;
    };
    //常使用的插入指令队列
    struct SQLDataFrequentUsed
    {
        /*!
         * 构造插入数据指令
         */
        SQLDataFrequentUsed(QString table_name, long id_, float data_):
            table(table_name), id(id_), data(data_){}
        QString table;
        long id;
        float data;
    };

    SQLThread();
    virtual ~SQLThread();
    void run();

    //外部调用,连接到数据库
    void connectDb(QString, QString, QString );
    //外部调用,创建数据表
    void creatTable(QString);
    //外部调用,向表中插入数据
    void insertData(QString, long, float);
    //外部调用,查询表
    void selectTable(QString);



private:
    SQL sql;

    QSemaphore freeSemaphore;
    QSemaphore usedSemaphore;
    QList<SQLDataRarelyUsed> rarelyUsed;
    QList<SQLDataFrequentUsed> frequentUsed;

    //用于查询数据时返回数据条数
    int cnt;
    bool exitFlag;

Q_SIGNALS:
    //异步消息分发
     void openError(QString);
     //异步获取查询表的结果
     void getResult(MyVector&, int);
};

#endif

sqlthread.cpp

#include "sqlthread.h"
#include "sql.h"
SQLThread::~SQLThread()
{
    exitFlag = true;
    usedSemaphore.release();
    freeSemaphore.release();
    wait();
}
SQLThread::SQLThread() :
    freeSemaphore(10), usedSemaphore(0)
{
    //10 > 10 + 3
    exitFlag = false;
    rarelyUsed.reserve(3);
    frequentUsed.reserve(10);
}

void SQLThread::run()
{
    bool haveError = false;
    int tryCnt = 3;
    while(true)
    {
        while(rarelyUsed.empty() && frequentUsed.empty())
        {
            usedSemaphore.acquire();
            if(exitFlag)
            {
                rarelyUsed.clear();
                frequentUsed.clear();
                return;
            }
        }

        if(!rarelyUsed.empty())
        {
           // usedSemaphore.acquire();
            //连接数据库
            if( rarelyUsed.front().task == ConnectDb)
            {
                 sql.setDb("test", rarelyUsed.front().user,
                           rarelyUsed.front().passwd);
                 if(!sql.openDb())
                 {
                    emit openError("Can not open the MYSQL database, please cheack...");
                    if(++tryCnt <= 3)
                    {
                        connectDb("test", rarelyUsed.front().user,
                                  rarelyUsed.front().passwd);
                    }
                    else
                        haveError = true;
                    break;
                 }

                 if(!sql.createDb(rarelyUsed.front().name))
                 {
                    emit openError("Create database failure....");
                    break;
                 }

                 if(!sql.changeDb(rarelyUsed.front().name))
                 {
                     emit openError("Change database failure....");
                     break;
                 }

            }
            else if( rarelyUsed.front().task == CreatTable)
            {
                if(! sql.createTable(rarelyUsed.front().name))
                {
                    emit openError("Create table failure....");
                    break;
                }
            }
             else if( rarelyUsed.front().task == SelectData)
            {

                emit getResult(sql.selectTable(rarelyUsed.front().name, cnt), cnt);
            }
            rarelyUsed.pop_front();
            freeSemaphore.release();
            continue;
        }
        //处理第二队列
        if(!frequentUsed.empty())
        {
          //  usedSemaphore.acquire();
            sql.insertData(frequentUsed.front().table,
                           frequentUsed.front().id, frequentUsed.front().data);
            frequentUsed.pop_front();
            freeSemaphore.release();
        }
    }
}

//连接数据库
void SQLThread::connectDb(QString db_name, QString user, QString passwd)
{
    freeSemaphore.acquire();
    rarelyUsed.push_back(SQLDataRarelyUsed(db_name,
                                           user, passwd, SQLThread::ConnectDb));
    usedSemaphore.release();

}
//创建表
void SQLThread::creatTable(QString table_)
{
    freeSemaphore.acquire();
    rarelyUsed.push_back(SQLDataRarelyUsed(table_,
                                           SQLThread::CreatTable));
    usedSemaphore.release();

}
void SQLThread::insertData(QString table_, long time_, float data_)
{
    freeSemaphore.acquire();
    frequentUsed.push_back(SQLDataFrequentUsed(
                               table_, time_, data_));
    usedSemaphore.release();

}
void SQLThread::selectTable(QString table_)
{
    freeSemaphore.acquire();
    rarelyUsed.push_back(SQLDataRarelyUsed(table_,
                                           SQLThread::SelectData));
    usedSemaphore.release();
}
广告位

评论已关闭.