MySQL 동시 로드 데이터 파일
작은 샘플 데이터를 가지고 있어요. 파일 9개, 파일마다 100만 줄입니다. 실제 데이터는 모두 실시간으로 캡처되고 나중에 MySQL 데이터베이스에 삽입됩니다. 처음 시도했을 때 총 데이터 세트(약 5억 줄, 24GB 일반 텍스트)를 삽입하는 데 16시간이 조금 넘게 걸렸습니다. 너무 느리다고 생각해서 속도를 높이려고 노력했어요.
그 때문에, 지금까지 작업해 온 프로그램을 완전하게 동작하는 멀티 스레드 구현으로 만들었습니다. 각 파일에 대해 테이블을 만들고, 적절한 SQL 값이 담긴 임시 파일을 만든 다음, LOAD DATA LOCAL INFILE 쿼리를 각 파일에 대해 실행합니다.
관련 코드는 다음과 같습니다.
void insert_data(uint32_t tid)
{
string scribe_file;
file_list_mutex.lock();
while(scribe_files.size() > 0)
{
scribe_file = *(scribe_files.begin());
scribe_files.erase(scribe_files.begin());
file_list_mutex.unlock();
MYSQL *thread_con = mysql_init(nullptr);
if(thread_con == nullptr)
{
log_mutex.lock();
cerr << "Thead " << tid << ": " << mysql_error(thread_con) << endl;
log_mutex.unlock();
return;
}
if(nullptr == (mysql_real_connect(thread_con, server.c_str(), user.c_str(),
password.c_str(), nullptr,
0, nullptr, 0)))
{
log_mutex.lock();
cerr << "Thead " << tid << ": " << mysql_error(thread_con) << endl;
log_mutex.unlock();
mysql_close(thread_con);
return;
}
if(mysql_select_db(thread_con, database.c_str()))
{
log_mutex.lock();
cerr << "Thead " << tid << ": " << mysql_error(thread_con) << endl;
log_mutex.unlock();
mysql_close(thread_con);
return;
}
string table_name = get_table_name(scribe_file);
if(table_name.empty())
{
log_mutex.lock();
cerr << "Thead " << tid << ": Unusuable input file: " << scribe_file << endl;
log_mutex.unlock();
return;
}
ifstream scribe_stream(scribe_file);
if(!scribe_stream.good())
{
log_mutex.lock();
cerr << "Thead " << tid << ": Error opening " << scribe_file << endl;
log_mutex.unlock();
return;
}
string output_filename = "/tmp/";
{
vector<string> file_path = split_string(scribe_file, '/');
output_filename.append(file_path.rbegin()[0]);
}
output_filename.append(".sql");
ofstream output;
output.open(output_filename, ios::out | ios::trunc);
if(!output.good())
{
log_mutex.lock();
cerr << "Thead " << tid << ": Error opening " << output_filename << endl;
log_mutex.unlock();
scribe_stream.close();
return;
}
string create_query = "CREATE TABLE IF NOT EXISTS ";
string table_format = " (IDpk INT NOT NULL auto_increment,"
" s CHAR(8) NOT NULL,"
" si INT unsigned NOT NULL,"
" pq TINYINT unsigned NOT NULL,"
" pr BIGINT unsigned DEFAULT NULL,"
" sz INT DEFAULT NULL,"
" io TINYINT unsigned DEFAULT NULL,"
" ipslt TINYINT unsigned DEFAULT NULL,"
" is TINYINT unsigned DEFAULT NULL,"
" ilp BIGINT unsigned DEFAULT NULL,"
" ips INT unsigned DEFAULT NULL,"
" at INT unsigned DEFAULT NULL,"
" vn TINYINT unsigned NOT NULL,"
" ms BIGINT unsigned NOT NULL,"
" us BIGINT unsigned NOT NULL,"
" PRIMARY KEY(IDpk),"
" KEY(us),"
" KEY(s),"
" KEY(pq))";
create_query.append(table_name);
create_query.append(table_format);
if(mysql_query(thread_con, create_query.c_str()))
{
log_mutex.lock();
cerr << "Thead " << tid << ": " << mysql_error(thread_con) << endl;
log_mutex.unlock();
scribe_stream.close();
output.close();
mysql_close(thread_con);
return;
}
string scribe_stream_line;
char values[MAX_TOKENS * MAX_TOKEN_LENGTH];
usec_t start_time = get_current_us_time();
uint64_t num_lines = 0;
log_mutex.lock();
cout << "Thread " << tid << ": Starting " << scribe_file << endl;
log_mutex.unlock();
char scribe_tokens[MAX_TOKENS][MAX_TOKEN_LENGTH];
while(getline(scribe_stream, scribe_stream_line))
{
split_scribe_line(scribe_stream_line, scribe_tokens);
num_lines++;
if(scribe_tokens[6][0] != '\0')
{
try
{
uint32_t item_type = stoi(scribe_tokens[2]);
fill(values, values + (MAX_TOKENS * MAX_TOKEN_LENGTH), '\0');
switch(item_type)
{
case 0:
case 1:
{
sprintf(values,
"%s,%s,%s,"
"%s,%s,"
"NULL,NULL,NULL,NULL,NULL,NULL,"
"%s,%s,%s",
scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
scribe_tokens[6], scribe_tokens[7]);
break;
}
case 2:
{
sprintf(values,
"%s,%s,%s,"
"%s,%s,%s,%s,"
"NULL,NULL,NULL,NULL,"
"%s,%s,%s",
scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
scribe_tokens[6], scribe_tokens[7], scribe_tokens[8],
scribe_tokens[9]);
break;
}
case 3:
{
sprintf(values,
"%s,%s,%s,"
"NULL,NULL,NULL,NULL,"
"NULL,NULL,NULL,NULL,"
"%s,%s,%s",
scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
scribe_tokens[3], scribe_tokens[4], scribe_tokens[5]);
break;
}
case 4:
{
sprintf(values,
"%s,%s,%s,"
"%s,%s,NULL,NULL,"
"%s,%s,%s,NULL,"
"%s,%s,%s",
scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
scribe_tokens[6], scribe_tokens[7], scribe_tokens[8],
scribe_tokens[9], scribe_tokens[10]);
break;
}
case 5:
{
sprintf(values,
"%s,%s,%s,"
"NULL,NULL,NULL,NULL,NULL,NULL,NULL,"
"%s,%s,%s,%s",
scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
scribe_tokens[6]);
break;
}
default:
log_mutex.lock();
cerr << "Thread " << tid << ": Unknown pq type " << item_type << "\n"
<< " " << scribe_stream_line << endl;
log_mutex.lock();
continue;
break;
}
output << values << endl;
}
catch(exception &ex)
{
log_mutex.lock();
cerr << "Thread " << tid << ": Error parsing scribe line\n '"
<< scribe_stream_line << "'\n" << " " << ex.what() << endl;
log_mutex.unlock();
scribe_stream.close();
output.close();
throw;
}
}
}
log_mutex.lock();
cout << "Thread " << tid << ": preparing " << num_lines << " lines took "
<< us_to_hhmmss(get_current_us_time() - start_time) << endl;
log_mutex.unlock();
string insert_query = "LOAD DATA LOCAL INFILE ";
insert_query.append("'" + output_filename + "' INTO TABLE "
+ table_name + " FIELDS TERMINATED BY ','");
log_mutex.lock();
cout << "Thread " << tid << ": Loading results into database." << endl;
log_mutex.unlock();
start_time = get_current_us_time();
if(mysql_query(thread_con, insert_query.c_str()))
{
log_mutex.lock();
cerr << "Thead " << tid << ": " <<mysql_error(thread_con) << endl;
log_mutex.unlock();
scribe_stream.close();
output.close();
mysql_close(thread_con);
return;
}
mysql_close(thread_con);
log_mutex.lock();
cout << "Thread " << tid << ": " << "Insertion took "
<< us_to_hhmmss(get_current_us_time() - start_time) << endl;
log_mutex.unlock();
scribe_stream.close();
output.close();
remove(output_filename.c_str());
}
file_list_mutex.unlock();
}
저의 이해 부족이 드러나는 것은 성능 불일치입니다. 예를 들어 파일 2개를 스레드 1개로 실행했을 때와 스레드 2개로 실행했을 때 전체 성능이 거의 비슷했습니다.
Scribe DB CLI V1.07
Got 8 CPU cores.
Parsing 2 input files.
Thread 0: Starting scribe_20170511/short_scribe_1.data
Thread 1: Starting scribe_20170511/short_scribe_0.data
Thread 0: preparing 1000000 lines took 00:00:02
Thread 0: Loading results into database.
Thread 1: preparing 1000000 lines took 00:00:02
Thread 1: Loading results into database.
Thread 1: Insertion took 00:00:35
Thread 0: Insertion took 00:00:37
Total runtime: 00:00:40
Scribe DB CLI V1.07
Got 1 CPU cores.
Parsing 2 input files.
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:19
Thread 0: Starting scribe_20170511/short_scribe_1.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:20
Total runtime: 00:00:43
이 테스트를 실행할 때마다 데이터베이스를 삭제했습니다.
마찬가지로 9개의 입력 파일 모두에서 실행했을 때 다양한 결과가 나왔지만 적절한 속도 향상을 나타내는 것은 없었습니다.
Scribe DB CLI V1.07
Got 1 CPU cores.
Parsing 9 input files.
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:19
Thread 0: Starting scribe_20170511/short_scribe_1.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:20
Thread 0: Starting scribe_20170511/short_scribe_2.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:20
Thread 0: Starting scribe_20170511/short_scribe_3.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_4.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_5.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_6.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_7.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_8.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Total runtime: 00:03:27
Scribe DB CLI V1.07
Got 8 CPU cores.
Parsing 9 input files.
Thread 3: Starting scribe_20170511/short_scribe_3.data
Thread 1: Starting scribe_20170511/short_scribe_4.data
Thread 2: Starting scribe_20170511/short_scribe_1.data
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 4: Starting scribe_20170511/short_scribe_2.data
Thread 5: Starting scribe_20170511/short_scribe_5.data
Thread 3: preparing 1000000 lines took 00:00:03
Thread 3: Loading results into database.
Thread 0: preparing 1000000 lines took 00:00:03
Thread 0: Loading results into database.
Thread 2: preparing 1000000 lines took 00:00:03
Thread 2: Loading results into database.
Thread 4: preparing 1000000 lines took 00:00:03
Thread 4: Loading results into database.
Thread 1: preparing 1000000 lines took 00:00:03
Thread 1: Loading results into database.
Thread 5: preparing 1000000 lines took 00:00:03
Thread 5: Loading results into database.
Thread 0: Insertion took 00:02:20
Thread 0: Starting scribe_20170511/short_scribe_6.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 1: Insertion took 00:02:44
Thread 1: Starting scribe_20170511/short_scribe_7.data
Thread 1: preparing 1000000 lines took 00:00:01
Thread 1: Loading results into database.
Thread 5: Insertion took 00:03:01
Thread 5: Starting scribe_20170511/short_scribe_8.data
Thread 5: preparing 1000000 lines took 00:00:01
Thread 5: Loading results into database.
Thread 3: Insertion took 00:03:07
Thread 4: Insertion took 00:03:10
Thread 2: Insertion took 00:03:14
Thread 0: Insertion took 00:01:37
Thread 1: Insertion took 00:01:20
Thread 5: Insertion took 00:01:06
Total runtime: 00:04:14
Scribe DB CLI V1.08
Got 8 CPU cores.
Parsing 9 input files.
Thread 5: Starting scribe_20170511/short_scribe_4.data
Thread 1: Starting scribe_20170511/short_scribe_1.data
Thread 3: Starting scribe_20170511/short_scribe_2.data
Thread 4: Starting scribe_20170511/short_scribe_5.data
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 2: Starting scribe_20170511/short_scribe_3.data
Thread 5: preparing 1000000 lines took 00:00:03
Thread 5: Loading results into database.
Thread 4: preparing 1000000 lines took 00:00:03
Thread 4: Loading results into database.
Thread 1: preparing 1000000 lines took 00:00:03
Thread 1: Loading results into database.
Thread 3: preparing 1000000 lines took 00:00:03
Thread 3: Loading results into database.
Thread 0: preparing 1000000 lines took 00:00:03
Thread 0: Loading results into database.
Thread 2: preparing 1000000 lines took 00:00:03
Thread 2: Loading results into database.
Thread 0: Insertion took 00:01:43
Thread 0: Starting scribe_20170511/short_scribe_6.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 5: Insertion took 00:02:00
Thread 5: Starting scribe_20170511/short_scribe_7.data
Thread 5: preparing 1000000 lines took 00:00:01
Thread 5: Loading results into database.
Thread 2: Insertion took 00:02:02
Thread 2: Starting scribe_20170511/short_scribe_8.data
Thread 4: Insertion took 00:02:04
Thread 3: Insertion took 00:02:04
Thread 2: preparing 1000000 lines took 00:00:01
Thread 2: Loading results into database.
Thread 1: Insertion took 00:02:06
Thread 0: Insertion took 00:00:59
Thread 5: Insertion took 00:00:49
Thread 2: Insertion took 00:00:48
Total runtime: 00:02:57
제가 여기서 보고 있는 것이 무엇일까요? 쿼리 중에 8코어의 CPU 사용률이 10~50%이고 SSD의 디스크 사용률은 15~40MB/s입니다. 이면에는 이 모순을 설명하는 복잡한 캐싱이나 기타 동작이 있을 것 같습니다. 하지만 저는 서로 다른 테이블에 나누어 쓰면 더 나은 성능이 나올 것으로 예상했습니다.
잘 부탁드립니다.=)
편집:
- 호스트 OS: Linux (Ubuntu 16.04)
- 호스트 RAM: 16 GB
- 호스트 SQL 엔진:InnoDB
mysql> SHOW VARIABLES LIKE '%buffer%';
+-------------------------------------+----------------+
| Variable_name | Value |
+-------------------------------------+----------------+
| bulk_insert_buffer_size | 8388608 |
| innodb_buffer_pool_chunk_size | 134217728 |
| innodb_buffer_pool_dump_at_shutdown | ON |
| innodb_buffer_pool_dump_now | OFF |
| innodb_buffer_pool_dump_pct | 25 |
| innodb_buffer_pool_filename | ib_buffer_pool |
| innodb_buffer_pool_instances | 1 |
| innodb_buffer_pool_load_abort | OFF |
| innodb_buffer_pool_load_at_startup | ON |
| innodb_buffer_pool_load_now | OFF |
| innodb_buffer_pool_size | 134217728 |
| innodb_change_buffer_max_size | 25 |
| innodb_change_buffering | all |
| innodb_log_buffer_size | 16777216 |
| innodb_sort_buffer_size | 1048576 |
| join_buffer_size | 262144 |
| key_buffer_size | 16777216 |
| myisam_sort_buffer_size | 8388608 |
| net_buffer_length | 16384 |
| preload_buffer_size | 32768 |
| read_buffer_size | 131072 |
| read_rnd_buffer_size | 262144 |
| sort_buffer_size | 262144 |
| sql_buffer_result | OFF |
+-------------------------------------+----------------+
24 rows in set (0.00 sec)
테이블을 합칠 계획은 없습니다.파일을 사용할 수 있게 되었을 때 삽입하지 않는 이유는(단, 1개의 파일이 다음 파일을 기록할 때보다 삽입하는 데 시간이 더 오래 걸린다는 점 제외) 낮에 파일이 생성된 서버의 파일에 대한 작업이 있기 때문입니다.그리고 마지막으로 적어도 처음 두 개의 인덱스가 필요합니다. 왜냐하면 이 두 개의 열을 기반으로 한 합리적인 쿼리가 필요하기 때문입니다.
언급URL : https://stackoverflow.com/questions/43991747/mysql-concurrent-load-data-infile
'programing' 카테고리의 다른 글
한 번에 구분 기호를 사용하여 문자열 목록 요소 결합 (0) | 2022.10.01 |
---|---|
jQuery를 사용하여 CSS3 이행의 종료를 기다리는 방법 (0) | 2022.10.01 |
MariaDB 데이터베이스 백업 만들기 (0) | 2022.10.01 |
정규식 일치 항목 일부 추출 (0) | 2022.10.01 |
MySQL 레플리케이션 텅스텐과갈레라 (0) | 2022.10.01 |