/* * Copyright (c) 2006 Teodor Sigaev * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the author nor the names of any co-contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include "ftsbench.h" #ifdef HAVE_POLL_H #include #else /* HAVE_POLL */ #ifdef HAVE_SYS_POLL_H #include #else #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H #endif /* HAVE_SYS_POLL_H */ #endif /* HAVE_POLL */ #ifndef INFTIM #define INFTIM (-1) #endif typedef enum SocketState { SS_NONE = 0, SS_READ, SS_READYREAD, SS_WRITE, SS_READYWRITE } SocketState; typedef enum ResStatus { RS_OK = 0, RS_INPROGRESS } ResStatus; typedef struct ftsPG { ftsDB db; PGconn *conn; int flags; PQnoticeReceiver origreceiver; int emptyquery; int prepared; int socket; SocketState state; StringBuf b; } ftsPG; static void NoticeReceiver(void *arg, const PGresult *res) { ftsPG *db = (ftsPG*) arg; db->emptyquery = 0; if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) { char *msg = PQresultErrorMessage(res); /* NOTICE */ if ( strstr(msg, "does not exist, skipping")!=NULL ) { /* skip 'NOTICE: table "wow" does not exist, skipping' */ return; } if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) { db->emptyquery = 1; return; } } db->origreceiver(db, res); } static void pgflush(ftsPG *db) { int flushres; flushres=PQflush(db->conn); if ( flushres > 0 ) { /* need more write */ db->state = SS_WRITE; } else if ( flushres == 0 ) { /* success write, waits for read */ db->state = SS_READ; } else { fatal( "PQflush failed: %s\n", PQerrorMessage(db->conn)); } } static ResStatus checkStatus(ftsPG* db) { switch( db->state ) { case SS_READYWRITE: pgflush(db); case SS_READ: case SS_WRITE: return RS_INPROGRESS; case SS_READYREAD: PQconsumeInput(db->conn); if ( PQisBusy(db->conn) != 0 ) { db->state = SS_READ; return RS_INPROGRESS; } break; case SS_NONE: default: fatal( "Should not be here!\n"); break; } return RS_OK; } static void waitResult(ftsPG *db) { struct pollfd pfd; PGresult *res; pfd.fd = db->socket; do { pfd.events = pfd.revents = 0; if ( db->state == SS_READ ) { pfd.events = POLLIN; } else if ( db->state == SS_WRITE ) { pfd.events = POLLOUT; } if ( pfd.events ) { int ret = poll( &pfd, 1, INFTIM); if ( ret<0 ) fatal("poll failed: %s\n", strerror(errno)); if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) { fatal("Poll report about socket error\n"); } else if ( pfd.revents & POLLIN ) { db->state = SS_READYREAD; } else if ( pfd.revents & POLLOUT ) { db->state = SS_READYWRITE; } } } while( checkStatus(db) != RS_OK ); while ( (res = PQgetResult(db->conn))!= NULL ) { if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); } } static int checkEmptyQuery(ftsPG *db, PGresult *res) { if ( db->emptyquery > 0 && PQresultStatus(res) == PGRES_FATAL_ERROR && strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */ strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL ) return 1; return 0; } static void execQuery(ftsDB* adb, char ** words, int flags) { ftsPG *db = (ftsPG*)adb; int i = 0; PGresult *res; if ( db->prepared == 0 ) { /* firsttime */ char buf[1024]; db->flags = flags; sprintf(buf, "SELECT count(*) FROM ftsbench WHERE %s @@ to_tsquery( $1 ::text );", ( flags & FLG_FUNC ) ? "to_tsvector(body)" : "fts" ); res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL ); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "PREPARE SELECT command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); db->prepared = 1; } db->b.strlen = 0; while( *words ) { char *ptr = *words; if ( i!= 0 ) sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4); else sb_addchar(&db->b, '\''); while( *ptr ) { if (*ptr == '\'') sb_addchar(&db->b, '\''); sb_addchar(&db->b, *ptr); ptr++; } sb_addchar(&db->b, '\''); i++; words++; } i = 1; res = PQexecPrepared( db->conn, "search_ftsbench", 1, (const char**)&(db->b.str), &(db->b.strlen), &i, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { /* skip error ' all words are a stop word' for GIN index - result is empty, in any case */ if ( checkEmptyQuery(db, res) == 0 ) fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn)); } else if ( PQntuples(res) == 1 ) { db->db.nres += atoi( PQgetvalue(res,0,0) ); } else { fatal("Bad PQntuples %d\n", PQntuples(res)); } PQclear(res); db->emptyquery = 0; pthread_mutex_lock(&(db->db.nqueryMutex)); db->db.nquery ++; pthread_mutex_unlock(&(db->db.nqueryMutex)); } static void startCreateScheme(ftsDB* adb, int flags) { ftsPG *db = (ftsPG*)adb; char buf[1024]; PGresult *res; db->flags = flags; res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;"); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "DROP TABLE command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); if ( flags & FLG_FUNC ) sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );"); else sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts tsvector ); " "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench " "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" ); res = PQexec(db->conn, buf); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); res = PQexec(db->conn, "BEGIN;"); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); return; } static void finishCreateScheme(ftsDB* adb) { ftsPG *db = (ftsPG*)adb; PGresult *res; if ( db->db.nquery > 0 ) { waitResult(db); if ( PQsetnonblocking(db->conn, 0) != 0 ) fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn)); } res = PQexec(db->conn, "COMMIT;"); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) { char buf[1024]; if ( db->flags & FLG_FUNC ) sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );", (db->flags & FLG_GIST) ? "GiST" : "GIN" ); else sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );", (db->flags & FLG_GIST) ? "GiST" : "GIN" ); report("(create index, "); res = PQexec(db->conn, buf); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "CREATE INDEX command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); } else report("("); report("vacuum"); res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;"); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "VACUUM ANALYZE command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); report(") "); return; } static void InsertRow(ftsDB* adb, int id, char *txt) { ftsPG *db = (ftsPG*)adb; PGresult *res; const char *paramValues[2]; uint32_t binaryIntVal; int paramLengths[] = {sizeof(binaryIntVal), 0}; int paramFormats[] = {1, 0}; if ( db->db.nquery == 0 ) { /* firsttime */ res = PQprepare( db->conn, "insert_ftsbench", "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);", 2, NULL ); if (PQresultStatus(res) != PGRES_COMMAND_OK) fatal( "PREPARE INSERT command failed: %s\n", PQerrorMessage(db->conn)); PQclear(res); if ( PQsetnonblocking(db->conn, 1) != 0 ) fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn)); } else { waitResult(db); } binaryIntVal = htonl((uint32_t) id); paramValues[0] = (char*)&binaryIntVal; paramValues[1] = txt; if ( PQsendQueryPrepared( db->conn, "insert_ftsbench", 2, paramValues, paramLengths, paramFormats, 0) == 0 ) fatal( "PQsendQueryPrepared failed: %s\n", PQerrorMessage(db->conn)); pgflush(db); db->db.nquery++; } static void Close(ftsDB* adb) { ftsPG *db = (ftsPG*)adb; PQfinish(db->conn); } ftsDB* PGInit(char * connstr) { ftsPG *db = (ftsPG*)malloc(sizeof(ftsPG)); char conninfo[1024]; memset(db,0,sizeof(ftsPG)); sprintf(conninfo, "dbname=%s", connstr); db->conn = PQconnectdb(conninfo); if (PQstatus(db->conn) != CONNECTION_OK) fatal( "Connection to database failed: %s\n", PQerrorMessage(db->conn)); db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db); db->db.execQuery = execQuery; db->db.startCreateScheme = startCreateScheme; db->db.finishCreateScheme = finishCreateScheme; db->db.InsertRow = InsertRow; db->db.Close = Close; db->socket = PQsocket(db->conn); if ( db->socket < 0 ) fatal("Socket error\n"); return (ftsDB*)db; }