programming

MySQL 동시 로드 데이터 파일

projobs 2022. 10. 1. 14:30
반응형

MySQL 동시 로드 데이터 파일

작은 샘플 데이터를 가지고 있어요. 파일 9개, 파일마다 100만 줄입니다. 실제 데이터는 모두 실시간으로 캡처되고 나중에 MySQL 데이터베이스에 삽입됩니다. 처음 시도했을 때 총 데이터 세트(약 5억 줄, 24GB 보통 텍스트)를 삽입하는 데 16시간이 조금 넘게 걸렸습니다. 너무 느리다고 생각해서 속도를 높이려고 노력했어요.

그 때문에, 지금까지 작업해 온 프로그램을 완전하게 기능하는 멀티 스레드 구현으로 만들었습니다. 각 파일에 대해 테이블을 만들고, 적절한 SQL을 사용하여 임시 파일을 만든 다음, 각 파일에 대해 LOAD DATA LOCAL INFILE을 실행합니다.

관련 코드는 다음과 같습니다.

/**
 * Worker-thread entry point.
 *
 * Pops scribe data files off the shared `scribe_files` work list until it is
 * empty.  For each file: opens its own MySQL connection, creates the target
 * table if needed, rewrites the input as a comma-separated temp file under
 * /tmp, bulk-loads it with LOAD DATA LOCAL INFILE, then deletes the temp
 * file.  On any error the function logs and returns, ending this worker;
 * other workers keep draining the list.
 *
 * @param tid  Worker thread id, used only to tag log output.
 *
 * Fixes in this revision:
 *  - `file_list_mutex` is held only around the check-and-pop of the work
 *    list.  Previously the `while` condition re-read `scribe_files.size()`
 *    without the lock (data race) and the final unlock() at function exit
 *    released a mutex that was no longer held (undefined behaviour).
 *  - The unknown-item-type branch called log_mutex.lock() twice; the second
 *    call was meant to be unlock() and would deadlock the process.
 *  - `thread_con` is now closed on the early-error paths that used to leak
 *    it (bad table name, unreadable input, unwritable temp file).
 *  - mysql_error() is no longer called with a null handle when mysql_init()
 *    itself fails.
 *  - snprintf() bounds writes into `values`; '\n' replaces the per-line
 *    endl flush (one flush per million lines saved); the temp file is
 *    explicitly closed before the server-side load reads it.
 */
void insert_data(uint32_t tid)
{
  for(;;)
  {
    string scribe_file;

    // Critical section covers only the check-and-pop, never the long
    // per-file work below.
    file_list_mutex.lock();
    if(scribe_files.empty())
    {
      file_list_mutex.unlock();
      return;  // work list drained — this worker is done
    }
    scribe_file = *(scribe_files.begin());
    scribe_files.erase(scribe_files.begin());
    file_list_mutex.unlock();

    // One connection per file (kept from the original design; a single
    // connection per thread would save one handshake per file).
    MYSQL *thread_con = mysql_init(nullptr);

    if(thread_con == nullptr)
    {
      // mysql_error() must not be called with a null handle; mysql_init()
      // only fails on allocation failure.
      log_mutex.lock();
      cerr << "Thread " << tid << ": mysql_init() failed (out of memory?)" << endl;
      log_mutex.unlock();
      return;
    }

    if(nullptr == mysql_real_connect(thread_con, server.c_str(), user.c_str(),
                                     password.c_str(), nullptr,
                                     0, nullptr, 0))
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": " << mysql_error(thread_con) << endl;
      log_mutex.unlock();
      mysql_close(thread_con);
      return;
    }

    if(mysql_select_db(thread_con, database.c_str()))
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": " << mysql_error(thread_con) << endl;
      log_mutex.unlock();
      mysql_close(thread_con);
      return;
    }

    // Target table name is derived from the input file name.
    string table_name = get_table_name(scribe_file);

    if(table_name.empty())
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": Unusable input file: " << scribe_file << endl;
      log_mutex.unlock();
      mysql_close(thread_con);  // previously leaked on this path
      return;
    }

    ifstream scribe_stream(scribe_file);

    if(!scribe_stream.good())
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": Error opening " << scribe_file << endl;
      log_mutex.unlock();
      mysql_close(thread_con);  // previously leaked on this path
      return;
    }

    // Temp file: /tmp/<input basename>.sql — one CSV row per scribe line.
    string output_filename = "/tmp/";
    {
      vector<string> file_path = split_string(scribe_file, '/');
      output_filename.append(file_path.rbegin()[0]);
    }
    output_filename.append(".sql");

    ofstream output(output_filename, ios::out | ios::trunc);

    if(!output.good())
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": Error opening " << output_filename << endl;
      log_mutex.unlock();
      mysql_close(thread_con);  // previously leaked on this path
      return;
    }

    // Column layout must match the CSV rows emitted below: 14 data columns
    // after the auto-increment primary key.
    string create_query = "CREATE TABLE IF NOT EXISTS ";

    string table_format = " (IDpk INT NOT NULL auto_increment,"
                          " s CHAR(8) NOT NULL,"
                          " si INT unsigned NOT NULL,"
                          " pq TINYINT unsigned NOT NULL,"
                          " pr BIGINT unsigned DEFAULT NULL,"
                          " sz INT DEFAULT NULL,"
                          " io TINYINT unsigned DEFAULT NULL,"
                          " ipslt TINYINT unsigned DEFAULT NULL,"
                          " is TINYINT unsigned DEFAULT NULL,"
                          " ilp BIGINT unsigned DEFAULT NULL,"
                          " ips INT unsigned DEFAULT NULL,"
                          " at INT unsigned DEFAULT NULL,"
                          " vn TINYINT unsigned NOT NULL,"
                          " ms BIGINT unsigned NOT NULL,"
                          " us BIGINT unsigned NOT NULL,"
                          " PRIMARY KEY(IDpk),"
                          " KEY(us),"
                          " KEY(s),"
                          " KEY(pq))";

    create_query.append(table_name);
    create_query.append(table_format);

    if(mysql_query(thread_con, create_query.c_str()))
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": " << mysql_error(thread_con) << endl;
      log_mutex.unlock();
      mysql_close(thread_con);
      return;
    }

    string scribe_stream_line;
    char values[MAX_TOKENS * MAX_TOKEN_LENGTH];
    usec_t start_time = get_current_us_time();
    uint64_t num_lines = 0;

    log_mutex.lock();
    cout << "Thread " << tid << ": Starting " << scribe_file << endl;
    log_mutex.unlock();

    char scribe_tokens[MAX_TOKENS][MAX_TOKEN_LENGTH];

    while(getline(scribe_stream, scribe_stream_line))
    {
      split_scribe_line(scribe_stream_line, scribe_tokens);
      num_lines++;

      // An empty token 6 means the line carries no loadable record — skip.
      // NOTE(review): inherited from the original; confirm token 6 is the
      // correct presence marker for every item type.
      if(scribe_tokens[6][0] == '\0')
        continue;

      try
      {
        // Token 2 selects which columns the line populates; stoi() throws
        // on garbage, handled by the catch below.
        uint32_t item_type = stoi(scribe_tokens[2]);

        // snprintf() bounds the write and null-terminates, so the old
        // whole-buffer fill() before every line is unnecessary.
        switch(item_type)
        {
          case 0:
          case 1:
            snprintf(values, sizeof(values),
                     "%s,%s,%s,"
                     "%s,%s,"
                     "NULL,NULL,NULL,NULL,NULL,NULL,"
                     "%s,%s,%s",
                     scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
                     scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
                     scribe_tokens[6], scribe_tokens[7]);
            break;

          case 2:
            snprintf(values, sizeof(values),
                     "%s,%s,%s,"
                     "%s,%s,%s,%s,"
                     "NULL,NULL,NULL,NULL,"
                     "%s,%s,%s",
                     scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
                     scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
                     scribe_tokens[6], scribe_tokens[7], scribe_tokens[8],
                     scribe_tokens[9]);
            break;

          case 3:
            snprintf(values, sizeof(values),
                     "%s,%s,%s,"
                     "NULL,NULL,NULL,NULL,"
                     "NULL,NULL,NULL,NULL,"
                     "%s,%s,%s",
                     scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
                     scribe_tokens[3], scribe_tokens[4], scribe_tokens[5]);
            break;

          case 4:
            snprintf(values, sizeof(values),
                     "%s,%s,%s,"
                     "%s,%s,NULL,NULL,"
                     "%s,%s,%s,NULL,"
                     "%s,%s,%s",
                     scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
                     scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
                     scribe_tokens[6], scribe_tokens[7], scribe_tokens[8],
                     scribe_tokens[9], scribe_tokens[10]);
            break;

          case 5:
            snprintf(values, sizeof(values),
                     "%s,%s,%s,"
                     "NULL,NULL,NULL,NULL,NULL,NULL,NULL,"
                     "%s,%s,%s,%s",
                     scribe_tokens[0], scribe_tokens[1], scribe_tokens[2],
                     scribe_tokens[3], scribe_tokens[4], scribe_tokens[5],
                     scribe_tokens[6]);
            break;

          default:
            log_mutex.lock();
            cerr << "Thread " << tid << ": Unknown pq type " << item_type << "\n"
                 << "    " << scribe_stream_line << endl;
            log_mutex.unlock();  // BUGFIX: was a second lock() — deadlock
            continue;            // skip the line; old unreachable break removed
        }

        // '\n' instead of endl: no flush per line (this loop runs ~1M times).
        output << values << '\n';
      }
      catch(exception &ex)
      {
        log_mutex.lock();
        cerr << "Thread " << tid << ": Error parsing scribe line\n  '"
             << scribe_stream_line << "'\n" << "  " << ex.what() << endl;
        log_mutex.unlock();

        mysql_close(thread_con);
        throw;  // stream destructors close both files
      }
    }

    // Flush and close the temp file before the server-side load reads it
    // (mandatory now that per-line endl flushing is gone).
    output.close();

    log_mutex.lock();
    cout << "Thread " << tid << ": preparing " << num_lines << " lines took "
         << us_to_hhmmss(get_current_us_time() - start_time) << endl;
    log_mutex.unlock();

    string insert_query = "LOAD DATA LOCAL INFILE ";
    insert_query.append("'" + output_filename + "' INTO TABLE "
                        + table_name + " FIELDS TERMINATED BY ','");

    log_mutex.lock();
    cout << "Thread " << tid << ": Loading results into database." << endl;
    log_mutex.unlock();

    start_time = get_current_us_time();

    if(mysql_query(thread_con, insert_query.c_str()))
    {
      log_mutex.lock();
      cerr << "Thread " << tid << ": " << mysql_error(thread_con) << endl;
      log_mutex.unlock();
      mysql_close(thread_con);
      return;
    }

    mysql_close(thread_con);

    log_mutex.lock();
    cout << "Thread " << tid << ": " << "Insertion took "
         << us_to_hhmmss(get_current_us_time() - start_time) << endl;
    log_mutex.unlock();

    remove(output_filename.c_str());
  }
}

저의 이해 부족이 드러나는 것은 성능 불일치입니다.예를 들어 스레드 1개와 스레드 2개로 구성된 파일 2개로 실행했을 때 성능은 거의 비슷했습니다.

Scribe DB CLI V1.07
Got 8 CPU cores.
Parsing 2 input files.
Thread 0: Starting scribe_20170511/short_scribe_1.data
Thread 1: Starting scribe_20170511/short_scribe_0.data
Thread 0: preparing 1000000 lines took 00:00:02
Thread 0: Loading results into database.
Thread 1: preparing 1000000 lines took 00:00:02
Thread 1: Loading results into database.
Thread 1: Insertion took 00:00:35
Thread 0: Insertion took 00:00:37
Total runtime: 00:00:40

Scribe DB CLI V1.07
Got 1 CPU cores.
Parsing 2 input files.
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:19
Thread 0: Starting scribe_20170511/short_scribe_1.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:20
Total runtime: 00:00:43

이 테스트를 실행할 때마다 데이터베이스를 삭제했습니다.

마찬가지로 9개의 입력 파일 모두에서 실행했을 때 다양한 결과가 나왔지만 적절한 속도 향상을 나타내는 것은 없었습니다.

Scribe DB CLI V1.07
Got 1 CPU cores.
Parsing 9 input files.
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:19
Thread 0: Starting scribe_20170511/short_scribe_1.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:20
Thread 0: Starting scribe_20170511/short_scribe_2.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:20
Thread 0: Starting scribe_20170511/short_scribe_3.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_4.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_5.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_6.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_7.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Thread 0: Starting scribe_20170511/short_scribe_8.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 0: Insertion took 00:00:21
Total runtime: 00:03:27

Scribe DB CLI V1.07
Got 8 CPU cores.
Parsing 9 input files.
Thread 3: Starting scribe_20170511/short_scribe_3.data
Thread 1: Starting scribe_20170511/short_scribe_4.data
Thread 2: Starting scribe_20170511/short_scribe_1.data
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 4: Starting scribe_20170511/short_scribe_2.data
Thread 5: Starting scribe_20170511/short_scribe_5.data
Thread 3: preparing 1000000 lines took 00:00:03
Thread 3: Loading results into database.
Thread 0: preparing 1000000 lines took 00:00:03
Thread 0: Loading results into database.
Thread 2: preparing 1000000 lines took 00:00:03
Thread 2: Loading results into database.
Thread 4: preparing 1000000 lines took 00:00:03
Thread 4: Loading results into database.
Thread 1: preparing 1000000 lines took 00:00:03
Thread 1: Loading results into database.
Thread 5: preparing 1000000 lines took 00:00:03
Thread 5: Loading results into database.
Thread 0: Insertion took 00:02:20
Thread 0: Starting scribe_20170511/short_scribe_6.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 1: Insertion took 00:02:44
Thread 1: Starting scribe_20170511/short_scribe_7.data
Thread 1: preparing 1000000 lines took 00:00:01
Thread 1: Loading results into database.
Thread 5: Insertion took 00:03:01
Thread 5: Starting scribe_20170511/short_scribe_8.data
Thread 5: preparing 1000000 lines took 00:00:01
Thread 5: Loading results into database.
Thread 3: Insertion took 00:03:07
Thread 4: Insertion took 00:03:10
Thread 2: Insertion took 00:03:14
Thread 0: Insertion took 00:01:37
Thread 1: Insertion took 00:01:20
Thread 5: Insertion took 00:01:06
Total runtime: 00:04:14

Scribe DB CLI V1.08
Got 8 CPU cores.
Parsing 9 input files.
Thread 5: Starting scribe_20170511/short_scribe_4.data
Thread 1: Starting scribe_20170511/short_scribe_1.data
Thread 3: Starting scribe_20170511/short_scribe_2.data
Thread 4: Starting scribe_20170511/short_scribe_5.data
Thread 0: Starting scribe_20170511/short_scribe_0.data
Thread 2: Starting scribe_20170511/short_scribe_3.data
Thread 5: preparing 1000000 lines took 00:00:03
Thread 5: Loading results into database.
Thread 4: preparing 1000000 lines took 00:00:03
Thread 4: Loading results into database.
Thread 1: preparing 1000000 lines took 00:00:03
Thread 1: Loading results into database.
Thread 3: preparing 1000000 lines took 00:00:03
Thread 3: Loading results into database.
Thread 0: preparing 1000000 lines took 00:00:03
Thread 0: Loading results into database.
Thread 2: preparing 1000000 lines took 00:00:03
Thread 2: Loading results into database.
Thread 0: Insertion took 00:01:43
Thread 0: Starting scribe_20170511/short_scribe_6.data
Thread 0: preparing 1000000 lines took 00:00:01
Thread 0: Loading results into database.
Thread 5: Insertion took 00:02:00
Thread 5: Starting scribe_20170511/short_scribe_7.data
Thread 5: preparing 1000000 lines took 00:00:01
Thread 5: Loading results into database.
Thread 2: Insertion took 00:02:02
Thread 2: Starting scribe_20170511/short_scribe_8.data
Thread 4: Insertion took 00:02:04
Thread 3: Insertion took 00:02:04
Thread 2: preparing 1000000 lines took 00:00:01
Thread 2: Loading results into database.
Thread 1: Insertion took 00:02:06
Thread 0: Insertion took 00:00:59
Thread 5: Insertion took 00:00:49
Thread 2: Insertion took 00:00:48
Total runtime: 00:02:57

내가 여기서 뭘 보고 있지?쿼리 중에 8코어의 CPU 사용률이 10~50%이고 SSD의 디스크 사용률은 15~40mb/s입니다.이면에는 모순을 설명하는 복잡한 캐싱 및 기타 화려한 작업이 있을 것입니다. 하지만 저는 실제로 다른 테이블이 더 나은 성능을 제공할 것으로 예상했습니다.

잘 부탁드립니다.=)

편집:

Rick James 상세 추가:

  • 호스트 OS: Linux (Ubuntu 16.04)
  • 호스트 RAM: 16 GB
  • 호스트 SQL 엔진:InnoDB
mysql> SHOW VARIABLES LIKE '%buffer%';
+-------------------------------------+----------------+
| Variable_name                       | Value          |
+-------------------------------------+----------------+
| bulk_insert_buffer_size             | 8388608        |
| innodb_buffer_pool_chunk_size       | 134217728      |
| innodb_buffer_pool_dump_at_shutdown | ON             |
| innodb_buffer_pool_dump_now         | OFF            |
| innodb_buffer_pool_dump_pct         | 25             |
| innodb_buffer_pool_filename         | ib_buffer_pool |
| innodb_buffer_pool_instances        | 1              |
| innodb_buffer_pool_load_abort       | OFF            |
| innodb_buffer_pool_load_at_startup  | ON             |
| innodb_buffer_pool_load_now         | OFF            |
| innodb_buffer_pool_size             | 134217728      |
| innodb_change_buffer_max_size       | 25             |
| innodb_change_buffering             | all            |
| innodb_log_buffer_size              | 16777216       |
| innodb_sort_buffer_size             | 1048576        |
| join_buffer_size                    | 262144         |
| key_buffer_size                     | 16777216       |
| myisam_sort_buffer_size             | 8388608        |
| net_buffer_length                   | 16384          |
| preload_buffer_size                 | 32768          |
| read_buffer_size                    | 131072         |
| read_rnd_buffer_size                | 262144         |
| sort_buffer_size                    | 262144         |
| sql_buffer_result                   | OFF            |
+-------------------------------------+----------------+
24 rows in set (0.00 sec)

테이블을 합칠 계획은 없습니다.파일을 사용할 수 있게 되었을 때 삽입하지 않는 이유는(단, 1개의 파일이 다음 파일을 기록할 때보다 삽입하는 데 시간이 더 오래 걸린다는 점 제외) 낮에 파일이 생성된 서버의 파일에 대한 작업이 있기 때문입니다.그리고 마지막으로 적어도 처음 두 개의 인덱스가 필요합니다. 왜냐하면 이 두 개의 열을 기반으로 한 합리적인 쿼리가 필요하기 때문입니다.

언급URL : https://stackoverflow.com/questions/43991747/mysql-concurrent-load-data-infile

반응형