2 * Copyright (c) 2006 Teodor Sigaev <teodor@sigaev.ru>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the author nor the names of any co-contributors
14 * may be used to endorse or promote products derived from this software
15 * without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 #include <sys/types.h>
37 #include <netinet/in.h>
45 #ifdef HAVE_SYS_POLL_H
48 #error Not defined HAVE_POLL_H or HAVE_SYS_POLL_H
49 #endif /* HAVE_SYS_POLL_H */
50 #endif /* HAVE_POLL */
56 typedef enum SocketState {
64 typedef enum ResStatus {
69 typedef struct ftsPG {
73 PQnoticeReceiver origreceiver;
84 NoticeReceiver(void *arg, const PGresult *res) {
85 ftsPG *db = (ftsPG*) arg;
89 if ( strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "00000") == 0 ) {
90 char *msg = PQresultErrorMessage(res);
92 if ( strstr(msg, "does not exist, skipping")!=NULL ) {
93 /* skip 'NOTICE: table "wow" does not exist, skipping' */
96 if ( strstr(msg, "query contains only stopword(s) or doesn't contain lexeme(s), ignored")!=NULL ) {
102 db->origreceiver(db, res);
109 flushres=PQflush(db->conn);
110 if ( flushres > 0 ) {
111 /* need more write */
112 db->state = SS_WRITE;
113 } else if ( flushres == 0 ) {
114 /* success write, waits for read */
117 fatal( "PQflush failed: %s\n", PQerrorMessage(db->conn));
122 checkStatus(ftsPG* db) {
123 switch( db->state ) {
128 return RS_INPROGRESS;
130 PQconsumeInput(db->conn);
131 if ( PQisBusy(db->conn) != 0 ) {
133 return RS_INPROGRESS;
138 fatal( "Should not be here!\n");
146 waitResult(ftsPG *db) {
152 pfd.events = pfd.revents = 0;
153 if ( db->state == SS_READ ) {
155 } else if ( db->state == SS_WRITE ) {
156 pfd.events = POLLOUT;
160 int ret = poll( &pfd, 1, INFTIM);
162 fatal("poll failed: %s\n", strerror(errno));
164 if ( pfd.revents & (POLLHUP | POLLNVAL | POLLERR) ) {
165 fatal("Poll report about socket error\n");
166 } else if ( pfd.revents & POLLIN ) {
167 db->state = SS_READYREAD;
168 } else if ( pfd.revents & POLLOUT ) {
169 db->state = SS_READYWRITE;
172 } while( checkStatus(db) != RS_OK );
174 while ( (res = PQgetResult(db->conn))!= NULL ) {
175 if (PQresultStatus(res) != PGRES_COMMAND_OK)
176 fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
182 checkEmptyQuery(ftsPG *db, PGresult *res) {
184 db->emptyquery > 0 &&
185 PQresultStatus(res) == PGRES_FATAL_ERROR &&
186 strcmp(PQresultErrorField(res,PG_DIAG_SQLSTATE), "0A000") == 0 && /* FEATURE_NOT_SUPPORTED */
187 strstr(PQresultErrorField(res, PG_DIAG_SOURCE_FILE), "ginscan.c") != NULL
195 execQuery(ftsDB* adb, char ** words, int flags) {
196 ftsPG *db = (ftsPG*)adb;
200 if ( db->prepared == 0 ) {
206 sprintf(buf, "SELECT count(*) FROM ftsbench WHERE %s @@ to_tsquery( $1 ::text );",
207 ( flags & FLG_FUNC ) ? "to_tsvector(body)" : "fts" );
209 res = PQprepare( db->conn, "search_ftsbench", buf, 1, NULL );
211 if (PQresultStatus(res) != PGRES_COMMAND_OK)
212 fatal( "PREPARE SELECT command failed: %s\n", PQerrorMessage(db->conn));
224 sb_add(&db->b, (db->flags & FLG_OR) ? " | '" : " & '", 4);
226 sb_addchar(&db->b, '\'');
230 sb_addchar(&db->b, '\'');
231 sb_addchar(&db->b, *ptr);
234 sb_addchar(&db->b, '\'');
242 res = PQexecPrepared( db->conn, "search_ftsbench",
243 1, (const char**)&(db->b.str),
244 &(db->b.strlen), &i, 0);
246 if (PQresultStatus(res) != PGRES_TUPLES_OK) {
247 /* skip error ' all words are a stop word' for GIN index -
248 result is empty, in any case */
249 if ( checkEmptyQuery(db, res) == 0 )
250 fatal( "Execution of prepared statement failed: %s\n", PQerrorMessage(db->conn));
251 } else if ( PQntuples(res) == 1 ) {
252 db->db.nres += atoi( PQgetvalue(res,0,0) );
254 fatal("Bad PQntuples %d\n", PQntuples(res));
260 pthread_mutex_lock(&(db->db.nqueryMutex));
262 pthread_mutex_unlock(&(db->db.nqueryMutex));
266 startCreateScheme(ftsDB* adb, int flags) {
267 ftsPG *db = (ftsPG*)adb;
273 res = PQexec(db->conn, "DROP TABLE IF EXISTS ftsbench CASCADE;");
274 if (PQresultStatus(res) != PGRES_COMMAND_OK)
275 fatal( "DROP TABLE command failed: %s\n", PQerrorMessage(db->conn));
278 if ( flags & FLG_FUNC )
279 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text );");
281 sprintf(buf,"CREATE TABLE ftsbench( id int not null, body text, fts tsvector ); "
282 "CREATE TRIGGER tsvectorupdate BEFORE UPDATE OR INSERT ON ftsbench "
283 "FOR EACH ROW EXECUTE PROCEDURE tsearch2(fts, body);" );
285 res = PQexec(db->conn, buf);
286 if (PQresultStatus(res) != PGRES_COMMAND_OK)
287 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
290 res = PQexec(db->conn, "BEGIN;");
291 if (PQresultStatus(res) != PGRES_COMMAND_OK)
292 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
299 finishCreateScheme(ftsDB* adb) {
300 ftsPG *db = (ftsPG*)adb;
303 if ( db->db.nquery > 0 ) {
306 if ( PQsetnonblocking(db->conn, 0) != 0 )
307 fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
310 res = PQexec(db->conn, "COMMIT;");
311 if (PQresultStatus(res) != PGRES_COMMAND_OK)
312 fatal( "CREATE TABLE command failed: %s\n", PQerrorMessage(db->conn));
315 if ( (db->flags & (FLG_GIST | FLG_GIN)) != 0 ) {
318 if ( db->flags & FLG_FUNC )
319 sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( to_tsvector(body) );",
320 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
322 sprintf(buf,"CREATE INDEX ftsindex ON ftsbench USING %s ( fts );",
323 (db->flags & FLG_GIST) ? "GiST" : "GIN" );
325 report("(create index, ");
327 res = PQexec(db->conn, buf);
328 if (PQresultStatus(res) != PGRES_COMMAND_OK)
329 fatal( "CREATE INDEX command failed: %s\n", PQerrorMessage(db->conn));
336 res = PQexec(db->conn, "VACUUM ANALYZE ftsbench;");
337 if (PQresultStatus(res) != PGRES_COMMAND_OK)
338 fatal( "VACUUM ANALYZE command failed: %s\n", PQerrorMessage(db->conn));
348 InsertRow(ftsDB* adb, int id, char *txt) {
349 ftsPG *db = (ftsPG*)adb;
351 const char *paramValues[2];
352 uint32_t binaryIntVal;
353 int paramLengths[] = {sizeof(binaryIntVal), 0};
354 int paramFormats[] = {1, 0};
356 if ( db->db.nquery == 0 ) {
359 res = PQprepare( db->conn, "insert_ftsbench",
360 "INSERT INTO ftsbench (id, body) VALUES ( $1 ::int4, $2 ::text);",
363 if (PQresultStatus(res) != PGRES_COMMAND_OK)
364 fatal( "PREPARE INSERT command failed: %s\n", PQerrorMessage(db->conn));
367 if ( PQsetnonblocking(db->conn, 1) != 0 )
368 fatal( "PQsetnonblocking command failed: %s\n", PQerrorMessage(db->conn));
373 binaryIntVal = htonl((uint32_t) id);
374 paramValues[0] = (char*)&binaryIntVal;
375 paramValues[1] = txt;
377 if ( PQsendQueryPrepared( db->conn, "insert_ftsbench",
379 paramLengths, paramFormats, 0) == 0 )
380 fatal( "PQsendQueryPrepared failed: %s\n", PQerrorMessage(db->conn));
389 ftsPG *db = (ftsPG*)adb;
395 PGInit(char * connstr) {
396 ftsPG *db = (ftsPG*)malloc(sizeof(ftsPG));
399 memset(db,0,sizeof(ftsPG));
401 sprintf(conninfo, "dbname=%s", connstr);
402 db->conn = PQconnectdb(conninfo);
404 if (PQstatus(db->conn) != CONNECTION_OK)
405 fatal( "Connection to database failed: %s\n", PQerrorMessage(db->conn));
407 db->origreceiver = PQsetNoticeReceiver(db->conn, NoticeReceiver, (void *)db);
409 db->db.execQuery = execQuery;
410 db->db.startCreateScheme = startCreateScheme;
411 db->db.finishCreateScheme = finishCreateScheme;
412 db->db.InsertRow = InsertRow;
413 db->db.Close = Close;
414 db->socket = PQsocket(db->conn);
415 if ( db->socket < 0 )
416 fatal("Socket error\n");