ETL是数据仓库建设中的主要环节。不止数据仓库,在企业信息化建设中,通过数据文件进行数据交互也非常频繁,按指定SQL和指定分隔符从数据库导出数据文件的需求非常普遍。PostgreSQL自带的\copy命令虽然也有此类功能,但感觉用起来不太习惯,因此用C语言开发了这个PostgreSQL数据导出工具,在此分享一下。编译环境和运行环境均依赖PostgreSQL客户端库(libpq)。
运行结果如下:
[root@hadoop pg]# ./pgsqluldr user=test pass=test host=192.168.81.130 port=5432 db=testdb query='select * from student' field='^|'
Welcome to use this Unload Data Tools From PostgreSQL DB
Unload data from PostgreSQL DB Release 1.0 .2022.09.03 Created by qichaopu
connected PostgreSQL success. user:test
0 rows exported at 2022-09-07 23:53:41
384 rows exported at 2022-09-07 23:53:41
代码如下:
/*
NAME
pgsqluldr.c - unload text data from PostgreSQL database
MODIFIED (MM/DD/YY)
qichaopu 2022.09.06 -
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <libpq-fe.h>
/* Buffer-size constant; appears unused by the code below. */
#define MAX_ITEM_BUFFER_SIZE 33

/*
 * Case-insensitive, length-limited string comparison used to match the
 * "name=" prefixes of command-line options.
 *
 * On Windows use _strnicmp rather than memicmp: memicmp compares raw bytes
 * and does not stop at a terminating NUL, so comparing "user=" (5 bytes)
 * against a shorter argument could read past the end of that argument.
 */
#if defined(_WIN32)
#define STRNCASECMP _strnicmp
#else
#include <strings.h> /* POSIX declares strncasecmp here */
#define STRNCASECMP strncasecmp
#endif

/* Smaller of a and b. Each argument may be evaluated twice: avoid side effects. */
#define MIN(a,b) ((a) > (b) ? (b) : (a))
/* Error status shared with the helpers; printRow sets it to 6 when the output file cannot be opened. */
int return_code = 0;

/* Forward declarations for the helpers defined after main(). */
FILE *openFile(const char *path, char copyOfPath[], int batchNo);
char getHexIndex(char digit);
int convertOption(const char *text, char *out, int maxLen);
void printRowInfo(int rowCount);
void printRow(char *path, PGresult *result, int ncols, char *fieldSep, int fieldSepLen, int recordSepLen, char *recordSep);
int main(int argc, char *argv[])
{
char user[32]="";
char pass[32]="";
char host[32]="";
char port[32]="";
char db[32]="";
char query[8192]="";
char fname[255]="PostgreSQLuldrdata.txt";
char field[32]=",";
char record[32]="\n";
int n,i,argcount=0;
int v_help=0;
int rlen,flen;
rlen=flen=1;
for(i=0;i<argc;i++)
{
if (STRNCASECMP("user=",argv[i],5)==0)
{
memset(user,0,32);
memcpy(user,argv[i]+5,MIN(strlen(argv[i]) - 5,31));
}
else if (STRNCASECMP("pass=",argv[i],5)==0)
{
memset(pass,0,32);
memcpy(pass,argv[i]+5,MIN(strlen(argv[i]) - 5,31));
}
else if (STRNCASECMP("host=",argv[i],5)==0)
{
memset(host,0,32);
memcpy(host,argv[i]+5,MIN(strlen(argv[i]) - 5,31));
}
else if (STRNCASECMP("port=",argv[i],5)==0)
{
memset(port,0,32);
memcpy(port,argv[i]+5,MIN(strlen(argv[i]) - 5,31));
}
else if (STRNCASECMP("db=",argv[i],3)==0)
{
memset(db,0,32);
memcpy(db,argv[i]+3,MIN(strlen(argv[i]) - 3,31));
}
else if (STRNCASECMP("query=",argv[i],6)==0)
{
memset(query,0,8192);
memcpy(query,argv[i]+6,MIN(strlen(argv[i]) - 6,8191));
}
else if (STRNCASECMP("field=",argv[i],6)==0)
{
memset(field,0,32);
flen=convertOption(argv[i]+6,field,MIN(strlen(argv[i]) - 6,31));
}
else if (STRNCASECMP("record=",argv[i],7)==0)
{
memset(record,0,32);
rlen=convertOption(argv[i]+7,record,MIN(strlen(argv[i]) - 7,31));
}
else if (STRNCASECMP("file=",argv[i],5)==0)
{
memset(fname,0,132);
memcpy(fname,argv[i]+5,MIN(strlen(argv[i]) - 5,254));
}
else if (STRNCASECMP("-help",argv[i],4)==0)
{
v_help=1;
}
}
if (strlen(user)==0 || strlen(query)==0|| v_help)
{
if (v_help||1==1)
{
printf("pgsqluldr: Unload data from PostgreSQL DB Release 1.0 2022.09.03 Created by qichaopu\n");
printf("\n");
printf("Usage: %s user=... host=... port=... db=... file=...\n",argv[0]);
printf("Notes:\n");
printf(" user = username\n");
printf(" pass = password\n");
printf(" host = PostgreSQL host\n");
printf(" port = port\n");
printf(" db = database name\n");
printf(" query = select statement\n");
printf(" field = seperator string between fields\n");
printf(" record= seperator string between records\n");
printf(" file = output file name(default: PostgreSQLuldrdata.txt)\n");
printf("\n");
printf(" for field and record, you can use '0x' to specify hex character code,\n");
printf(" \\r=0x%02x \\n=0x%02x |=0x%0x ,=0x%02x \\t=0x%02x\n",'\r','\n','|',',','\t');
printf(" for more hex character code,you can use unix command:man ascii\n");
exit(0);
}
}
printf("Welcome to use this Unload Data Tools From PostgreSQL DB \n");
printf("Unload data from PostgreSQL DB Release 1.0 .2022.09.03 Created by qichaopu \n");
PGresult *res;
PGconn *dbconn=PQsetdbLogin(host,port,NULL,NULL,db,user,pass);
if(PQstatus(dbconn)==CONNECTION_BAD)
{
printf("connected PostgreSQL failed. user:%s\n",user);
}
else
{
printf("connected PostgreSQL success. user:%s\n",user);
}
res=PQexec(dbconn,query);
if(PGRES_TUPLES_OK!=PQresultStatus(res))
{
printf("#ERROR:sql exec failed:%s\n",PQerrorMessage(dbconn));
return 0;
}
int cols=PQnfields(res);
printRow(fname,res,cols,field,flen,rlen,record);
PQclear(res);
PQfinish(dbconn);
}
/*
 * Write every row of a query result to a delimited text file.
 *
 * fname  - output path; a leading '+' makes openFile append instead of truncate.
 * res    - result of a successful row-returning query.
 * cols   - number of columns per row (PQnfields(res)).
 * field  - separator written between columns, flen bytes long (may contain NULs).
 * rlen   - length in bytes of record.
 * record - terminator written after every row.
 *
 * Progress is reported before the first row, every PROGRESS_INTERVAL rows, and
 * once more at the end if the total is not a multiple of the interval.
 * If the file cannot be opened or written, an error is printed and return_code
 * is set to 6.
 */
void printRow(char *fname,PGresult *res,int cols,char *field,int flen,int rlen,char *record)
{
    enum { PROGRESS_INTERVAL = 500000 };
    char tempbuf[512];
    FILE *fp;
    int rows;
    int row;
    int write_failed;

    memset(tempbuf, 0, sizeof tempbuf);
    if ((fp = openFile(fname, tempbuf, 1)) == NULL)
    {
        fprintf(stdout, "ERROR -- Cannot write to file : %s\n", tempbuf);
        return_code = 6;
        return; /* nothing to write to: fp must not be used */
    }

    printRowInfo(0);
    rows = PQntuples(res);
    for (row = 0; row < rows; row++)
    {
        int c;
        for (c = 0; c < cols; c++)
        {
            /* PQgetvalue returns "" (never NULL) for SQL NULL values. */
            fputs(PQgetvalue(res, row, c), fp);
            if (c < cols - 1)
                fwrite(field, flen, 1, fp);
        }
        fwrite(record, rlen, 1, fp);
        if ((row + 1) % PROGRESS_INTERVAL == 0)
            printRowInfo(row + 1);
    }

    /* fclose flushes buffered data, so a late write error can surface here. */
    write_failed = ferror(fp);
    if (fclose(fp) != 0)
        write_failed = 1;
    if (write_failed)
    {
        fprintf(stdout, "ERROR -- Cannot write to file : %s\n", tempbuf);
        return_code = 6;
    }

    if (rows % PROGRESS_INTERVAL != 0)
        printRowInfo(rows);
}
/*
 * Print a progress line "<row> rows exported at YYYY-MM-DD HH:MM:SS" to stdout
 * using the local time, then flush so progress is visible immediately.
 * row - number of rows written so far.
 */
void printRowInfo(int row)
{
    time_t now = time(NULL);
    struct tm *ptm = localtime(&now);

    if (ptm == NULL)
    {
        /* localtime can fail; still report the count, just without a timestamp. */
        fprintf(stdout, "%8d rows exported\n", row);
    }
    else
    {
        /* %d matches the int argument; %u was a format/type mismatch. */
        fprintf(stdout, "%8d rows exported at %04d-%02d-%02d %02d:%02d:%02d\n",
                row,
                ptm->tm_year + 1900,
                ptm->tm_mon + 1,
                ptm->tm_mday,
                ptm->tm_hour,
                ptm->tm_min,
                ptm->tm_sec);
    }
    fflush(stdout);
}
/*
 * Decode a separator option (the value of field= or record=) into raw bytes.
 *
 * Characters are copied literally, except that "0x" or "0X" followed by two
 * hexadecimal digits (e.g. "0x7c") becomes the single byte with that value,
 * so "0x0d0x0a" yields "\r\n". A '0' that does not start such a sequence is
 * copied as-is. Previously a '0' swallowed the following character, and a lone
 * '0' produced a NUL byte.
 *
 * src  - NUL-terminated option text.
 * dst  - output buffer of at least mlen + 1 bytes; always NUL-terminated.
 * mlen - scanning stops once this many source characters have been consumed
 *        (a hex sequence starting before the limit is still decoded whole).
 *
 * Returns the number of bytes written to dst, excluding the terminator. The
 * output may contain embedded NULs (e.g. from "0x00"), so callers must use
 * this length rather than strlen(dst).
 */
int convertOption(const char *src, char* dst, int mlen)
{
    int len = (int)strlen(src);
    int limit = mlen < len ? mlen : len;
    int i = 0;
    int pos = 0;

    while (i < limit)
    {
        if (src[i] == '0' && i + 3 < len
            && (src[i + 1] == 'x' || src[i + 1] == 'X')
            && isxdigit((unsigned char)src[i + 2])
            && isxdigit((unsigned char)src[i + 3]))
        {
            char hex[3] = { src[i + 2], src[i + 3], '\0' };
            dst[pos++] = (char)strtol(hex, NULL, 16);
            i += 4;
        }
        else
        {
            dst[pos++] = src[i++];
        }
    }
    /* Each loop pass consumes >= 1 input char, so pos <= limit <= mlen. */
    dst[pos] = '\0';
    return pos;
}
/*
 * Open the unload output file.
 *
 * fname   - output path. A leading '+' means "append": the '+' is skipped and
 *           the file is opened with "ab+"; otherwise it is opened with "wb+"
 *           (created or truncated).
 * tempbuf - receives a NUL-terminated copy of fname (including any leading
 *           '+'), which callers use in error messages. It must hold at least
 *           strlen(fname) + 1 bytes; its size is not passed, so the caller
 *           is responsible for that.
 * batch   - unused; kept so existing callers still compile.
 *
 * Returns the opened stream, or NULL if fopen fails.
 */
FILE *openFile(const char *fname, char tempbuf[], int batch)
{
    size_t len = strlen(fname);

    (void)batch;
    memcpy(tempbuf, fname, len + 1); /* + 1 copies the terminator */

    if (tempbuf[0] == '+')
        return fopen(tempbuf + 1, "ab+");
    return fopen(tempbuf, "wb+");
}
/*
 * Convert one hexadecimal digit character to its value (0-15).
 * Accepts '0'-'9', 'a'-'f' and 'A'-'F'; any other character maps to 0.
 */
char getHexIndex(char c)
{
    char value = 0;

    if (c >= 'A' && c <= 'F')
        value = (char)(c - 'A' + 10);
    else if (c >= 'a' && c <= 'f')
        value = (char)(c - 'a' + 10);
    else if (c >= '0' && c <= '9')
        value = (char)(c - '0');

    return value;
}
Makefile如下:
# Build pgsqluldr against libpq (the PostgreSQL client library).
# PG_PUBLIC_H: directory containing libpq-fe.h.
# PG_LIB_DIR : directory containing libpq; assumed to be the lib/ next to the
#              include/ above -- adjust for your installation. At run time the
#              libpq shared library must also be findable (e.g. LD_LIBRARY_PATH).
PG_PUBLIC_H=/data/pg/app/pg11/include
PG_LIB_DIR=/data/pg/app/pg11/lib
CC = gcc
CPP = g++
CFLAGS = -g
OBJS = pgsqluldr.o
PROG = pgsqluldr

.PHONY: all clean
all:$(PROG)

# The original link line omitted -lpq, leaving every PQ* symbol unresolved.
$(PROG):$(OBJS)
	@echo "[link] ... "
	@echo "---------------------"
	$(CC) $(CFLAGS) -o $(PROG) $(OBJS) -L$(PG_LIB_DIR) -lpq
	@echo

clean:
	rm -f $(PROG) $(OBJS)

.SUFFIXES: .cpp .c
.c.o:
	@echo "[$*.o]"
	@echo "---------------------"
	$(CC) $(CFLAGS) -o $*.o -I$(PG_PUBLIC_H) -c $*.c
	@echo