From 1ed575fda7f196ea411e9e53dd9c0739f160fb78 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Wed, 24 Jan 2024 14:16:29 +0900 Subject: [PATCH v8 07/10] Export CopyFromStateData It's for custom COPY FROM format handlers implemented as extension. This just moves codes. This doesn't change codes except CopySource enum values. CopySource enum values changes aren't required but I did like I did for CopyDest enum values. I changed COPY_ prefix to COPY_SOURCE_ prefix. For example, COPY_FILE to COPY_SOURCE_FILE. Note that this change isn't enough to implement a custom COPY FROM format handler as extension. We'll do the followings in a subsequent commit: 1. Add an opaque space for custom COPY FROM format handler 2. Export CopyReadBinaryData() to read the next data --- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/include/commands/copy.h | 2 - src/include/commands/copyapi.h | 156 ++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 150 ---------------------- 5 files changed, 162 insertions(+), 160 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index d556ebb5d6..b4ac7cbd2c 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1710,7 +1710,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1829,7 +1829,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 49632f75e4..a78a790060 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -181,7 +181,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send. */ pq_flush(); @@ -249,7 +249,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -258,7 +258,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -341,7 +341,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1099,7 +1099,7 @@ CopyReadLine(CopyFromState cstate) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index df29d42555..cd41d32074 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -20,8 +20,6 @@ #include "parser/parse_node.h" #include "tcop/dest.h" -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index ef1bb201c2..b7e8f627bf 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,11 +14,12 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "commands/trigger.h" #include "executor/execdesc.h" #include "executor/tuptable.h" +#include "nodes/miscnodes.h" #include "nodes/parsenodes.h" -/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; typedef bool (*CopyFromProcessOption_function) (CopyFromState cstate, DefElem *defel); @@ -162,6 +163,159 @@ typedef struct CopyFormatOptions CopyToRoutine *to_routine; /* callback routines for COPY TO */ } CopyFormatOptions; + +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. + * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + /* * Represents the different dest cases we need to worry about at * the bottom level diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 921c1513f7..f8f6120255 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -18,28 +18,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - /* * Represents the insert method to be used during COPY FROM. */ @@ -52,134 +30,6 @@ typedef enum CopyInsertMethod * ExecForeignBatchInsert only if valid */ } CopyInsertMethod; -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. - */ -typedef struct CopyFromStateData -{ - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) - */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. - */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); -- 2.41.0