diff --git a/.github/mergify.yml b/.github/mergify.yml index 774b6dd4..e9b9f510 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -25,21 +25,21 @@ pull_request_rules: - -files~=^\.github/(actions|workflows)/.*\.ya?ml$ - -files~=^\.github/workflows/actionlint\. - # e2e workflow + # e2e medium workflow - or: - and: - # note this should match the triggering criteria in 'e2e-nvidia-t4-x1.yml' - - check-success=e2e-workflow-complete + # note this should match the triggering criteria in 'e2e-nvidia-l4-x1.yml' + - check-success~=e2e-medium-workflow-complete - or: - files~=\.py$ - files=pyproject.toml - files~=^requirements.*\.txt$ - - files=.github/workflows/e2e-nvidia-t4-x1.yml + - files=.github/workflows/e2e-nvidia-l4-x1.yml - and: - -files~=\.py$ - -files=pyproject.toml - -files~=^requirements.*\.txt$ - - -files=.github/workflows/e2e-nvidia-t4-x1.yml + - -files=.github/workflows/e2e-nvidia-l4-x1.yml # code lint workflow - or: diff --git a/.github/workflows/actionlint.dockerfile b/.github/workflows/actionlint.dockerfile index 6f171d53..9de2e84a 100644 --- a/.github/workflows/actionlint.dockerfile +++ b/.github/workflows/actionlint.dockerfile @@ -1,3 +1,3 @@ # Since dependabot cannot update workflows using docker, # we use this indirection since dependabot can update this file. -FROM rhysd/actionlint:1.7.3@sha256:7617f05bd698cd2f1c3aedc05bc733ccec92cca0738f3e8722c32c5b42c70ae6 +FROM rhysd/actionlint:1.7.4@sha256:82244e1db1c60d82c7792180a48dd0bcb838370bb589d53ff132503fc9485868 diff --git a/.github/workflows/actionlint.yml b/.github/workflows/actionlint.yml index e0cdafea..2fa5465b 100644 --- a/.github/workflows/actionlint.yml +++ b/.github/workflows/actionlint.yml @@ -35,7 +35,7 @@ jobs: egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs - name: "Checkout" - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 with: fetch-depth: 0 diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index d778c928..34e2afbb 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -37,10 +37,10 @@ jobs: with: egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs - name: "Checkout" - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 with: fetch-depth: 0 - name: "Check Markdown documents" - uses: DavidAnson/markdownlint-cli2-action@db43aef879112c3119a410d69f66701e0d530809 # v17.0.0 + uses: DavidAnson/markdownlint-cli2-action@eb5ca3ab411449c66620fe7f1b3c9e10547144b0 # v18.0.0 with: globs: '**/*.md' diff --git a/.github/workflows/e2e-nvidia-l4-x1.yml b/.github/workflows/e2e-nvidia-l4-x1.yml new file mode 100644 index 00000000..ef511319 --- /dev/null +++ b/.github/workflows/e2e-nvidia-l4-x1.yml @@ -0,0 +1,241 @@ +# SPDX-License-Identifier: Apache-2.0 + +name: E2E (NVIDIA L4 x1) + +on: + # run against every merge commit to 'main' and release branches + push: + branches: + - main + - release-* + # only run on PRs that touch certain regex paths + pull_request_target: + branches: + - main + - release-* + paths: + # note this should match the merging criteria in 'mergify.yml' + - '**.py' + - 'pyproject.toml' + - 'requirements**.txt' + - '.github/workflows/e2e-nvidia-l4-x1.yml' # This workflow + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +env: + LC_ALL: en_US.UTF-8 + +defaults: + run: + shell: bash + +permissions: + contents: read + +jobs: + start-medium-ec2-runner: + runs-on: ubuntu-latest + outputs: + label: ${{ steps.start-ec2-runner.outputs.label }} + ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} + steps: + - name: "Harden Runner" + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 + with: + egress-policy: audit + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ secrets.AWS_REGION }} + + - name: Start EC2 runner + id: start-ec2-runner + uses: machulav/ec2-github-runner@1827d6ca7544d7044ddbd2e9360564651b463da2 # v2.3.7 + with: + mode: start + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + ec2-image-id: ${{ vars.AWS_EC2_AMI }} + ec2-instance-type: g6.8xlarge + subnet-id: subnet-02d230cffd9385bd4 + security-group-id: sg-06300447c4a5fbef3 + iam-role-name: instructlab-ci-runner + aws-resource-tags: > + [ + {"Key": "Name", "Value": "instructlab-ci-github-medium-runner"}, + {"Key": "GitHubRepository", "Value": "${{ github.repository }}"}, + {"Key": "GitHubRef", "Value": "${{ github.ref }}"}, + {"Key": "GitHubPR", "Value": "${{ github.event.number }}"} + ] + + e2e-medium-test: + needs: + - start-medium-ec2-runner + runs-on: ${{ needs.start-medium-ec2-runner.outputs.label }} + + # It is important that this job has no write permissions and has + # no access to any secrets. This part (e2e) is where we are running + # untrusted code from PRs. + permissions: {} + + steps: + - name: "Harden Runner" + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 + with: + egress-policy: audit + + - name: Install Packages + run: | + cat /etc/os-release + sudo dnf install -y gcc gcc-c++ make git python3.11 python3.11-devel + + - name: Checkout instructlab/instructlab + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + repository: "instructlab/instructlab" + path: "instructlab" + # https://github.com/actions/checkout/issues/249 + fetch-depth: 0 + + - name: Checkout instructlab/training + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + repository: "instructlab/training" + path: "training" + # https://github.com/actions/checkout/issues/249 + fetch-depth: 0 + + - name: Fetch and checkout PR + if: ${{ github.event_name == 'pull_request_target' }} + working-directory: ./training + run: | + git fetch origin pull/${{ github.event.pull_request.number }}/head:pr-${{ github.event.pull_request.number }} + git checkout pr-${{ github.event.pull_request.number }} + + - name: Install ilab + working-directory: ./instructlab + run: | + export CUDA_HOME="/usr/local/cuda" + export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64" + export PATH="$PATH:$CUDA_HOME/bin" + python3.11 -m venv --upgrade-deps venv + . venv/bin/activate + nvidia-smi + python3.11 -m pip cache remove llama_cpp_python + + CMAKE_ARGS="-DLLAMA_CUDA=on" python3.11 -m pip install -v . + + # https://github.com/instructlab/instructlab/issues/1821 + # install with Torch and build dependencies installed + python3.11 -m pip install -v packaging wheel setuptools-scm + python3.11 -m pip install -v .[cuda] -r requirements-vllm-cuda.txt + + - name: Update instructlab-training library + working-directory: ./training + run: | + . ../instructlab/venv/bin/activate + pip install -v . + pip install -v .[cuda] + + - name: Check disk + run: | + df -h + + - name: Run e2e test + working-directory: ./instructlab + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + run: | + . venv/bin/activate + # set preserve to true so we can retain the logs + ./scripts/e2e-ci.sh -mp + + # HACK(osilkin): The above test runs the medium workflow test which does not actually test the training library. + # Therefore we must disable the upload of the training logs, as they will not exist in the same location. + # we know that the file will be named something like f"/training_params_and_metrics_global{os.environ['RANK']}.jsonl" in python + # and we know that it will be written into a directory created by `mktemp -d`. + # Given this information, we can use the following command to find the file: + # log_file=$(find /tmp -name "training_params_and_metrics_global0.jsonl") + # mv "${log_file}" training-log.jsonl + + # - name: Upload training logs + # uses: actions/upload-artifact@v4 + # with: + # name: training-log.jsonl + # path: ./instructlab/training-log.jsonl + # retention-days: 1 + # overwrite: true + + stop-medium-ec2-runner: + needs: + - start-medium-ec2-runner + - e2e-medium-test + runs-on: ubuntu-latest + if: ${{ always() }} + steps: + - name: "Harden Runner" + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 + with: + egress-policy: audit + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ secrets.AWS_REGION }} + + - name: Stop EC2 runner + uses: machulav/ec2-github-runner@1827d6ca7544d7044ddbd2e9360564651b463da2 # v2.3.7 + with: + mode: stop + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + label: ${{ needs.start-medium-ec2-runner.outputs.label }} + ec2-instance-id: ${{ needs.start-medium-ec2-runner.outputs.ec2-instance-id }} + + # - name: Download loss data + # id: download-logs + # uses: actions/download-artifact@v4 + # with: + # name: training-log.jsonl + # path: downloaded-data + + # - name: Install dependencies + # run: | + # pip install -r requirements-dev.txt + + # - name: Try to upload to s3 + # id: upload-s3 + # continue-on-error: true + # run: | + # output_file='./test.md' + # python scripts/create-loss-graph.py \ + # --log-file "${{ steps.download-logs.outputs.download-path }}/training-log.jsonl" \ + # --output-file "${output_file}" \ + # --aws-region "${{ vars.AWS_REGION }}" \ + # --bucket-name "${{ vars.AWS_S3_LOSS_GRAPHS_BUCKET_NAME }}" \ + # --base-branch "${{ github.event.pull_request.base.ref }}" \ + # --pr-number "${{ github.event.pull_request.number }}" \ + # --head-sha "${{ github.event.pull_request.head.sha }}" \ + # --origin-repository "${{ github.repository }}" + + # cat "${output_file}" >> "${GITHUB_STEP_SUMMARY}" + + # - name: Check S3 upload status + # if: steps.upload-s3.outcome == 'failure' + # run: | + # echo "::warning::Failed to upload loss graph to S3. This won't block the workflow, but you may want to investigate." + # echo "Loss graph upload failed" >> "${GITHUB_STEP_SUMMARY}" + + e2e-medium-workflow-complete: + # we don't want to block PRs on failed EC2 cleanup + # so not requiring "stop-runner" as well + needs: ["start-medium-ec2-runner", "e2e-medium-test"] + runs-on: ubuntu-latest + steps: + - name: E2E Workflow Complete + run: echo "E2E Workflow Complete" diff --git a/.github/workflows/e2e-nvidia-l40s-x4.yml b/.github/workflows/e2e-nvidia-l40s-x4.yml new file mode 100644 index 00000000..52015d5f --- /dev/null +++ b/.github/workflows/e2e-nvidia-l40s-x4.yml @@ -0,0 +1,304 @@ +# SPDX-License-Identifier: Apache-2.0 + +name: E2E (NVIDIA L40S x4) + +on: + schedule: + - cron: '0 16 * * *' # Runs at 4PM UTC every day + workflow_dispatch: + inputs: + pr_or_branch: + description: 'pull request number or branch name' + required: true + default: 'main' + +jobs: + start-large-ec2-runner: + runs-on: ubuntu-latest + outputs: + label: ${{ steps.start-ec2-runner.outputs.label }} + ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} + steps: + - name: "Harden Runner" + # v2.10.1 + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 + with: + egress-policy: audit + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ secrets.AWS_REGION }} + + - name: Start EC2 runner + id: start-ec2-runner + uses: machulav/ec2-github-runner@1827d6ca7544d7044ddbd2e9360564651b463da2 # v2.3.7 + with: + mode: start + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + ec2-image-id: ${{ vars.AWS_EC2_AMI }} + ec2-instance-type: g6e.12xlarge + subnet-id: subnet-024298cefa3bedd61 + security-group-id: sg-06300447c4a5fbef3 + iam-role-name: instructlab-ci-runner + aws-resource-tags: > + [ + {"Key": "Name", "Value": "instructlab-ci-github-large-runner"}, + {"Key": "GitHubRepository", "Value": "${{ github.repository }}"}, + {"Key": "GitHubRef", "Value": "${{ github.ref }}"}, + {"Key": "GitHubPR", "Value": "${{ github.event.number }}"} + ] + + e2e-large-test: + needs: + - start-large-ec2-runner + runs-on: ${{ needs.start-large-ec2-runner.outputs.label }} + + permissions: + pull-requests: write + + steps: + - name: "Harden Runner" + # v2.10.1 + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 + with: + egress-policy: audit + - name: Install Packages + run: | + cat /etc/os-release + sudo dnf install -y gcc gcc-c++ make git python3.11 python3.11-devel + + - name: Checkout instructlab/instructlab + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + repository: "instructlab/instructlab" + path: "instructlab" + # https://github.com/actions/checkout/issues/249 + fetch-depth: 0 + + - name: Checkout instructlab/training + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + repository: "instructlab/training" + path: "training" + # https://github.com/actions/checkout/issues/249 + fetch-depth: 0 + + - name: Determine if pr_or_branch is a PR number + id: check_pr + run: | + PR_OR_BRANCH=${{ github.event.inputs.pr_or_branch || 'main' }} # Default to 'main' if not set + if [[ "$PR_OR_BRANCH" =~ ^[0-9]+$ ]]; then + echo "is_pr=true" >> "$GITHUB_OUTPUT" + else + echo "is_pr=false" >> "$GITHUB_OUTPUT" + fi + echo "pr_or_branch=$PR_OR_BRANCH" >> "$GITHUB_OUTPUT" + + - name: Check if gh cli is installed + id: gh_cli + run: | + if command -v gh &> /dev/null ; then + echo "gh_cli_installed=true" >> "$GITHUB_OUTPUT" + else + echo "gh_cli_installed=false" >> "$GITHUB_OUTPUT" + fi + + - name: Install gh CLI + if: steps.gh_cli.outputs.gh_cli_installed == 'false' + run: | + sudo dnf install 'dnf-command(config-manager)' -y + sudo dnf config-manager --add-repo https://cli.github.com/packages/rpm/gh-cli.repo + sudo dnf install gh --repo gh-cli -y + + - name: test gh CLI + run: | + gh --version + + - name: set default repo + working-directory: ./training + run: | + gh repo set-default ${{ github.server_url }}/${{ github.repository }} + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Add comment to PR + if: steps.check_pr.outputs.is_pr == 'true' + working-directory: ./training + run: | + gh pr comment "${{ steps.check_pr.outputs.pr_or_branch }}" -b "${{ github.workflow }} workflow launched on this PR: [View run](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }})" + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Fetch and checkout PR + if: steps.check_pr.outputs.is_pr == 'true' + working-directory: ./training + run: | + gh pr checkout ${{ steps.check_pr.outputs.pr_or_branch }} + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Checkout branch + if: steps.check_pr.outputs.is_pr == 'false' + working-directory: ./training + run: | + git checkout ${{ steps.check_pr.outputs.pr_or_branch }} + + - name: Install ilab + working-directory: ./instructlab + run: | + export CUDA_HOME="/usr/local/cuda" + export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64" + export PATH="$PATH:$CUDA_HOME/bin" + python3.11 -m venv --upgrade-deps venv + . venv/bin/activate + nvidia-smi + python3.11 -m pip cache remove llama_cpp_python + + CMAKE_ARGS="-DLLAMA_CUDA=on" python3.11 -m pip install . + + # https://github.com/instructlab/instructlab/issues/1821 + # install with Torch and build dependencies installed + python3.11 -m pip install packaging wheel setuptools-scm + python3.11 -m pip install .[cuda] -r requirements-vllm-cuda.txt + + - name: Update instructlab-training library + working-directory: ./training + run: | + . ../instructlab/venv/bin/activate + pip install . + pip install .[cuda] + + - name: Check disk + run: | + df -h + + - name: Run e2e test + working-directory: ./instructlab + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + run: | + . venv/bin/activate + + # set preserve to true so we can retain the logs + ./scripts/e2e-ci.sh -lp + + # we know that the file will be named something like f"/training_params_and_metrics_global{os.environ['RANK']}.jsonl" in python + # and we know that it will be written into a directory created by `mktemp -d`. + # Given this information, we can use the following command to find the file: + log_file=$(find /tmp -name "training_params_and_metrics_global0.jsonl") + mv "${log_file}" training-log.jsonl + + - name: Upload training logs + uses: actions/upload-artifact@v4 + with: + name: training-log.jsonl + path: ./instructlab/training-log.jsonl + retention-days: 1 + overwrite: true + + - name: Add comment to PR if the workflow failed + if: failure() && steps.check_pr.outputs.is_pr == 'true' + working-directory: ./training + run: | + gh pr comment "${{ steps.check_pr.outputs.pr_or_branch }}" -b "e2e workflow failed on this PR: [View run](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}), please investigate." + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Add comment to PR if the workflow succeeded + if: success() && steps.check_pr.outputs.is_pr == 'true' + working-directory: ./training + run: | + gh pr comment "${{ steps.check_pr.outputs.pr_or_branch }}" -b "e2e workflow succeeded on this PR: [View run](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}), congrats!" + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Post job results to Slack if the workflow failed + if: failure() && steps.check_pr.outputs.is_pr == 'false' + id: slack-report-failure + uses: slackapi/slack-github-action@37ebaef184d7626c5f204ab8d3baff4262dd30f0 # v1.27.0 + with: + # Slack channel id, channel name, or user id to post message. + # See also: https://api.slack.com/methods/chat.postMessage#channels + # You can pass in multiple channels to post to by providing a comma-delimited list of channel IDs. + channel-id: 'e2e-ci-results' + slack-message: "*e2e-nvidia-l40s-x4* job in *${{ github.repository }}* running on branch `${{ steps.check_pr.outputs.pr_or_branch }}` completed *with failures* :meow_sad-rain: | ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" + env: + SLACK_BOT_TOKEN: ${{ secrets.SON_OF_JEEVES_TOKEN }} + + - name: Post job results to Slack if the workflow succeeded + if: success() && steps.check_pr.outputs.is_pr == 'false' + id: slack-report-success + uses: slackapi/slack-github-action@37ebaef184d7626c5f204ab8d3baff4262dd30f0 # v1.27.0 + with: + # Slack channel id, channel name, or user id to post message. + # See also: https://api.slack.com/methods/chat.postMessage#channels + # You can pass in multiple channels to post to by providing a comma-delimited list of channel IDs. + channel-id: 'e2e-ci-results' + slack-message: "*e2e-nvidia-l40s-x4* job in *${{ github.repository }}* running on branch `${{ steps.check_pr.outputs.pr_or_branch }}` completed *successfully* :meow_party: | ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" + env: + SLACK_BOT_TOKEN: ${{ secrets.SON_OF_JEEVES_TOKEN }} + + stop-large-ec2-runner: + needs: + - start-large-ec2-runner + - e2e-large-test + runs-on: ubuntu-latest + if: ${{ always() }} + steps: + - name: "Harden Runner" + # v2.10.1 + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 + with: + egress-policy: audit + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ secrets.AWS_REGION }} + + - name: Stop EC2 runner + uses: machulav/ec2-github-runner@1827d6ca7544d7044ddbd2e9360564651b463da2 # v2.3.7 + with: + mode: stop + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + label: ${{ needs.start-large-ec2-runner.outputs.label }} + ec2-instance-id: ${{ needs.start-large-ec2-runner.outputs.ec2-instance-id }} + + - name: Download loss data + id: download-logs + uses: actions/download-artifact@v4 + with: + name: training-log.jsonl + path: downloaded-data + + - name: Install dependencies + run: | + pip install -r requirements-dev.txt + + - name: Try to upload to s3 + id: upload-s3 + continue-on-error: true + run: | + output_file='./test.md' + python scripts/create-loss-graph.py \ + --log-file "${{ steps.download-logs.outputs.download-path }}/training-log.jsonl" \ + --output-file "${output_file}" \ + --aws-region "${{ vars.AWS_REGION }}" \ + --bucket-name "${{ vars.AWS_S3_LOSS_GRAPHS_BUCKET_NAME }}" \ + --base-branch "${{ github.event.pull_request.base.ref }}" \ + --pr-number "${{ github.event.pull_request.number }}" \ + --head-sha "${{ github.event.pull_request.head.sha }}" \ + --origin-repository "${{ github.repository }}" + + - name: Check S3 upload status + if: steps.upload-s3.outcome == 'failure' + run: | + echo "::warning::Failed to upload loss graph to S3. This won't block the workflow, but you may want to investigate." + echo "Loss graph upload failed" >> "${GITHUB_STEP_SUMMARY}" + + cat "${output_file}" >> "${GITHUB_STEP_SUMMARY}" \ No newline at end of file diff --git a/.github/workflows/e2e-nvidia-t4-x1.yml b/.github/workflows/e2e-nvidia-t4-x1.yml deleted file mode 100644 index 959dedd9..00000000 --- a/.github/workflows/e2e-nvidia-t4-x1.yml +++ /dev/null @@ -1,174 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 - -name: E2E (NVIDIA Tesla T4 x1) - -on: - push: - branches: - - main - - release-* - pull_request_target: - types: - - opened - - synchronize - - reopened - branches: - - main - - release-* - paths: - # note this should match the merging criteria in 'mergify.yml' - - '**.py' - - 'pyproject.toml' - - 'requirements**.txt' - - '.github/workflows/e2e-nvidia-t4-x1.yml' # Follow-on workflow - -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true - -jobs: - start-runner: - name: Start external EC2 runner - runs-on: ubuntu-latest - outputs: - label: ${{ steps.start-ec2-runner.outputs.label }} - ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} - steps: - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - aws-region: ${{ secrets.AWS_REGION }} - - name: Start EC2 runner - id: start-ec2-runner - uses: machulav/ec2-github-runner@fcfb31a5760dad1314a64a0e172b78ec6fc8a17e # v2.3.6 - with: - mode: start - github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} - ec2-image-id: ami-00c51d9c1374eda97 - ec2-instance-type: g4dn.2xlarge - subnet-id: subnet-02d230cffd9385bd4 - security-group-id: sg-06300447c4a5fbef3 - iam-role-name: instructlab-ci-runner - aws-resource-tags: > - [ - {"Key": "Name", "Value": "instructlab-ci-github-small-runner"}, - {"Key": "GitHubRepository", "Value": "${{ github.repository }}"}, - {"Key": "GitHubRef", "Value": "${{ github.ref }}"}, - {"Key": "GitHubPR", "Value": "${{ github.event.number }}"} - ] - - e2e: - name: E2E Test - needs: start-runner - runs-on: ${{ needs.start-runner.outputs.label }} - - # It is important that this job has no write permissions and has - # no access to any secrets. This part (e2e) is where we are running - # untrusted code from PRs. - permissions: {} - - steps: - # for debugging - - name: Print environment state - run: | - echo "Current Working Directory: $PWD" - echo "Files in Local Directory:" - ls -l - - - name: Checkout instructlab/instructlab - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 - with: - repository: "instructlab/instructlab" - path: "instructlab" - # https://github.com/actions/checkout/issues/249 - fetch-depth: 0 - - - name: Checkout instructlab/training - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 - with: - repository: "instructlab/training" - path: "training" - # https://github.com/actions/checkout/issues/249 - fetch-depth: 0 - - # for debugging - - name: Print environment state - run: | - echo "Current Working Directory: $PWD" - echo "Files in Local Directory:" - ls -l - - - name: Fetch and checkout PR - id: fetch_pr - if: github.event_name == 'pull_request_target' - working-directory: ./training - run: | - git fetch origin pull/${{ github.event.pull_request.number }}/head:pr-${{ github.event.pull_request.number }} - git checkout pr-${{ github.event.pull_request.number }} - - - name: Install system packages - run: | - cat /etc/os-release - sudo dnf install -y gcc gcc-c++ make git python3.11 python3.11-devel - - - name: Install instructlab - working-directory: ./instructlab - run: | - export PATH="/home/ec2-user/.local/bin:/usr/local/cuda/bin:$PATH" - python3.11 -m venv --upgrade-deps venv - . venv/bin/activate - nvidia-smi - python3.11 -m pip cache remove llama_cpp_python - - CMAKE_ARGS="-DLLAMA_CUDA=on" python3.11 -m pip install . - - # https://github.com/instructlab/instructlab/issues/1821 - # install with Torch and build dependencies installed - python3.11 -m pip install packaging wheel setuptools-scm - python3.11 -m pip install .[cuda] - - - name: Update instructlab-training library - working-directory: ./training - run: | - . ../instructlab/venv/bin/activate - pip install . - pip install .[cuda] - - - name: Run e2e test - working-directory: ./instructlab - run: | - . venv/bin/activate - ./scripts/e2e-custom.sh -a - - stop-runner: - name: Stop external EC2 runner - needs: - - start-runner - - e2e - runs-on: ubuntu-latest - if: ${{ always() }} - steps: - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - aws-region: ${{ secrets.AWS_REGION }} - - name: Stop EC2 runner - uses: machulav/ec2-github-runner@fcfb31a5760dad1314a64a0e172b78ec6fc8a17e # v2.3.6 - with: - mode: stop - github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} - label: ${{ needs.start-runner.outputs.label }} - ec2-instance-id: ${{ needs.start-runner.outputs.ec2-instance-id }} - - e2e-workflow-complete: - # we don't want to block PRs on failed EC2 cleanup - # so not requiring "stop-runner" as well - needs: ["start-runner", "e2e"] - runs-on: ubuntu-latest - steps: - - name: E2E Workflow Complete - run: echo "E2E Workflow Complete" diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 2261f879..6f298a36 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -63,13 +63,13 @@ jobs: egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs - name: "Checkout" - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 with: # https://github.com/actions/checkout/issues/249 fetch-depth: 0 - name: Setup Python 3.11 - uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 + uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0 with: python-version: 3.11 cache: pip diff --git a/.github/workflows/pypi.yaml b/.github/workflows/pypi.yaml index 6b790e16..a8be1c1c 100644 --- a/.github/workflows/pypi.yaml +++ b/.github/workflows/pypi.yaml @@ -42,13 +42,13 @@ jobs: egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs - name: "Checkout" - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 with: # for setuptools-scm fetch-depth: 0 - name: "Build and Inspect" - uses: hynek/build-and-inspect-python-package@73aea398b9c8de9ea9e4464c6b13cb8b1f3d6294 # v2.9.0 + uses: hynek/build-and-inspect-python-package@f01e4d047aadcc0c054c95ec9900da3ec3fc7a0f # v2.10.0 # push to Test PyPI on # - a new GitHub release is published @@ -77,7 +77,7 @@ jobs: path: dist - name: "Upload to Test PyPI" - uses: pypa/gh-action-pypi-publish@f7600683efdcb7656dec5b29656edb7bc586e597 # v1.10.3 + uses: pypa/gh-action-pypi-publish@15c56dba361d8335944d31a2ecd17d700fc7bcbc # v1.12.2 with: repository-url: https://test.pypi.org/legacy/ @@ -129,4 +129,4 @@ jobs: rm ./dist/*.sigstore.json - name: "Upload to PyPI" - uses: pypa/gh-action-pypi-publish@f7600683efdcb7656dec5b29656edb7bc586e597 # v1.10.3 + uses: pypa/gh-action-pypi-publish@15c56dba361d8335944d31a2ecd17d700fc7bcbc # v1.12.2 diff --git a/.pylintrc b/.pylintrc index aff36d65..c0fbd95c 100644 --- a/.pylintrc +++ b/.pylintrc @@ -474,7 +474,8 @@ disable=raw-checker-failed, broad-exception-caught, super-init-not-called, duplicate-code, - too-many-positional-arguments + too-many-positional-arguments, + too-many-lines # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..ca1cb720 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,250 @@ +# Changelog + +## v0.5.5 + +### v0.5.5 Features + +* e2e: replace old small job with new medium job + +### v0.5.5 Fixes + +* fix: incorrect label for AWS medium runner +* chore: add exit code & tox fix + +### v0.5.5 Infrastructure + +* ci: grant HF_TOKEN access to the medium-size E2E CI job + +## v0.5.4 + +### v0.5.4 Features + +* Add rocm extra to pyproject.toml + +## v0.5.3 + +### v0.5.3 Fixes + +* fix: Add explicit flash_attn requirement for ROCm + +## v0.5.2 - Fix Pretraining Masking + +### v0.5.2 Fixes + +* fix: improve linting and automation +* Fix pretrain token list->int for masking + +## v0.5.1 + +### v0.5.1 Fixes + +* fix: updates sorting logic to correctly compare numbers + +## v0.5.0 - FSDP and Full-State Checkpoint Resuming + +### v0.5.0 Features + +* feat: add e2e test for instructlab CI +* feat: add mergify +* Adding FSDP Support to Training Library by @aldopareja @Maxusmusti @RobotSail +* adds Accelerate full-state (opt, lr_sched, params) +* changes StreamablePopen to return a process and implement listening + +### v0.5.0 Fixes + +* Fix lint error to make CI happy +* Fix typos +* Ap/fix multipack for non granite models +* Fix generic chat template saved to tokenizer for generation +* Fix linting error and missing quote + +### v0.5.0 Infrastructure + +* Add license identifiers +* ci: update runner labels to uniquely identify instance sizes +* ci: minor cleanup of E2E job +* Fixing e2e to use relative path for working-directory +* switch -T to -a +* github: add stale bot to training repo +* fix: markdown lint error and mergify bug +* Bump actions/checkout from 4.1.7 to 4.2.0 +* Bump step-security/harden-runner from 2.8.1 to 2.9.1 +* Bump pypa/gh-action-pypi-publish from 1.9.0 to 1.10.2 +* Bump actions/setup-python from 5.1.0 to 5.2.0 +* Bump rhysd/actionlint from 1.7.1 to 1.7.2 +* Bump hynek/build-and-inspect-python-package from 2.6.0 to 2.9.0 +* Bump DavidAnson/markdownlint-cli2-action from 16.0.0 to 17.0.0 +* ci: fix lint action +* ci: add AWS tags to show github ref and PR num for all jobs + +## v0.5.0 Alpha 0 - The FSDP Release Pre-release + +### v0.5.0 Alpha Description + +The FSDP Release introduces FSDP support in addition to the existing DeepSpeed support through the accelerate library. + +### v0.5.0 Alpha Features + +* feat: add e2e test for instructlab CI +* feat: add mergify +* Adding FSDP Support to Training Library by @aldopareja @Maxusmusti @RobotSail + +### v0.5.0 Alpha Fixes + +* Fix lint error to make CI happy +* Fix typos +* Ap/fix multipack for non granite models +* Fix linting error and missing quote + +### v0.5.0 Alpha Infrastructure + +* Add license identifiers +* ci: update runner labels to uniquely identify instance sizes +* ci: minor cleanup of E2E job +* Fixing e2e to use relative path for working-directory +* Bump step-security/harden-runner from 2.8.1 to 2.9.1 +* Bump pypa/gh-action-pypi-publish from 1.9.0 to 1.10.2 +* Bump actions/setup-python from 5.1.0 to 5.2.0 +* Bump rhysd/actionlint from 1.7.1 to 1.7.2 +* Bump hynek/build-and-inspect-python-package from 2.6.0 to 2.9.0 +* Bump DavidAnson/markdownlint-cli2-action from 16.0.0 to 17.0.0 +* ci: fix lint action +* ci: add AWS tags to show github ref and PR num for all jobs + +## v0.4.2 + +### v0.4.2 Features + +* Provide safeguards during training + +## v0.4.1 + +### v0.4.1 Changes + +* makes saving every save_samples an optional feature + +## v0.4.0 + +### v0.4.0 Features + +* Adds a flag to save checkpoints at the end of an epoch + +### v0.4.0 Changes + +* Change success message at end of training + +## v0.3.2 + +### v0.3.2 Features + +* Accept tuples for lora.target_modules + +### v0.3.2 Documentation + +* patch some hyper parameter arg descriptions in README + +## v0.3.1 + +### v0.3.1 Dependencies + +* Update requirements to have bitsandbytes min and dolomite min + +## v0.3.0 + +### v0.3.0 Features + +* Updating token masking to support pretraining w/ masked special tokens +* Adding weight merging for LoRA/QLoRA ckpts + +### v0.3.0 Fixes + +* remove dead code +* fix: changes the check to check against both the enum option and enum value + +## v0.2.0 + +### v0.2.0 Features + +* Fix ckpt save to include architecture for inference runtime consumption +* Logging updates + +### v0.2.0 Performance + +* Reducing deepspeed timeout to 10mins + +## v0.1.0 + +### v0.1.0 Features + +* Flash Attention Disable Toggle (Take 2) + +### v0.1.0 Performance + +* Reduce Unnecessary Multiprocessing + +### v0.1.0 Fixes + +* 🐛: fix optimizer selection logic so that FusedAdam is never loaded when CPU offloading is enabled +* Add wheel to requirements + +## v0.0.5.1 + +### v0.0.5.1 Fixes + +This release includes PR [#121](https://github.com/instructlab/training/pull/121) to overcome an issue where our way of lazily importing the run_training function is being picked up as an error by pylint. + +## v0.0.5 + +Minor bugfixes and updates. + +## v0.0.4 + +Minor bugfixes and updates. + +## v0.0.3 + +Minor bugfixes and updates. + +## v0.0.2 + +### Features + +This introduces the instructlab library as a package in the instructlab package namespace. + +To install it: + +```bash +pip install instructlab-training +``` + +And to install it with flash-attn and other CUDA-dependent packages, you can use + +```bash +pip install instructlab-training[cuda] +``` + +Here's how to use it: + +```python +from instructlab.training.config import TorchrunArgs, TrainingArgs, run_training + +torchrun_args = TorchrunArgs( + nproc_per_node = 1, # 1 GPU + nnodes = 1, # only 1 overall machine in the system + node_rank = 0, # rank of the current machine + rdzv_id = 123, # what ID other nodes will join on + rdzv_endpoint = '0.0.0.0:12345' # address where other nodes will join +) + +training_args = TrainingArgs( + # specify training args here +) + +run_training(torch_args = torchrun_args, train_args = training_args) +``` + +## v0.0.1 + +### v0.0.1 Features + +Initial release with same features as v0.0.2. \ No newline at end of file diff --git a/README.md b/README.md index 645583f3..c6435542 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,9 @@ ![Release](https://img.shields.io/github/v/release/instructlab/training) ![License](https://img.shields.io/github/license/instructlab/training) +![`e2e-nvidia-l4-x1.yml` on `main`](https://github.com/instructlab/training/actions/workflows/e2e-nvidia-l4-x1.yml/badge.svg?branch=main) +![`e2e-nvidia-l40s-x4.yml` on `main`](https://github.com/instructlab/training/actions/workflows/e2e-nvidia-l40s-x4.yml/badge.svg?branch=main) + - [Installing](#installing-the-library) - [Additional Nvidia packages](#additional-nvidia-packages) - [Using the library](#using-the-library) @@ -118,7 +121,36 @@ allow you to customize aspects of the ZeRO stage 2 optimizer. For more information about DeepSpeed, see [deepspeed.ai](https://www.deepspeed.ai/) -#### `FSDPOptions` +#### DeepSpeed with CPU Offloading + +To use DeepSpeed with CPU offloading, you'll usually encounter an issue indicating that the optimizer needed to use the Adam optimizer on CPU doesn't exist. To resolve this, please follow the following steps: + +**Rebuild DeepSpeed with CPUAdam**: + +You'll need to rebuild DeepSpeed in order for the optimizer to be present: + +```bash +# uninstall deepspeed & reinstall with the flags for installing CPUAdam +pip uninstall deepspeed +DS_BUILD_CPU_ADAM=1 DS_BUILD_UTILS=1 pip install deepspeed --no-deps +``` + +**Ensure `-lcurand` is linked correctly**: + +A problem that we commonly encounter is that the `-lcurand` linker will not be present when +DeepSpeed recompiles. To resolve this, you will need to find the location of the `libcurand.so` file in your machine: + +```bash +find / -name 'libcurand*.so*' 2>/dev/null +``` + +If libcurand.so is not present in the `/usr/lib64` directory, you'll need to add a symlink. Ex: + +```bash +sudo ln -s /usr/local/cuda/lib64/libcurand.so.10 /usr/lib64/libcurand.so +``` + +### `FSDPOptions` Like DeepSpeed, we only expose a number of parameters for you to modify with FSDP. They are listed below: diff --git a/pyproject.toml b/pyproject.toml index 93387840..ca053385 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ package-dir = { "" = "src" } dependencies = { file = ["requirements.txt"] } optional-dependencies.cuda = { file = ["requirements-cuda.txt"] } optional-dependencies.rocm = { file = ["requirements-rocm.txt"] } +optional-dependencies.hpu = { file = ["requirements-hpu.txt"] } [tool.setuptools.packages.find] where = ["src"] diff --git a/requirements-cuda.txt b/requirements-cuda.txt index 1bed61eb..d030d340 100644 --- a/requirements-cuda.txt +++ b/requirements-cuda.txt @@ -1,2 +1,8 @@ flash-attn>=2.4.0 bitsandbytes>=0.43.1 + +# available as an option for NVIDIA, FSDP still default +deepspeed>=0.14.3 + +# required for FSDP updates +accelerate>=0.34.2,<1.1.0 diff --git a/requirements-dev.txt b/requirements-dev.txt index a0dff1ed..f77c807f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,6 +2,8 @@ -r requirements.txt +matplotlib +numpy pre-commit>=3.0.4,<5.0 pylint>=2.16.2,<4.0 pylint-pydantic diff --git a/requirements-hpu.txt b/requirements-hpu.txt new file mode 100644 index 00000000..73db0ef6 --- /dev/null +++ b/requirements-hpu.txt @@ -0,0 +1,2 @@ +# required for optimum-habana's deps +accelerate>=0.33.0,<1.1.0 \ No newline at end of file diff --git a/requirements-rocm.txt b/requirements-rocm.txt index 21612c36..bb9d572e 100644 --- a/requirements-rocm.txt +++ b/requirements-rocm.txt @@ -1 +1,3 @@ flash-attn>=2.6.2,<2.7.0 +# required for FSDP updates +accelerate>=0.34.2,<1.1.0 diff --git a/requirements.txt b/requirements.txt index b31fbdcb..d4b7760c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,8 +5,8 @@ py-cpuinfo # we set this to be above 0a0 so that it doesn't # replace custom pytorch images with the 2.3.0 torch>=2.3.0a0 -transformers>=4.41.2 -accelerate>=0.34.2 +transformers>=4.45.2 + datasets>=2.15.0 numba # Note: numpy ranges copied from instructlab/instructlab @@ -17,11 +17,8 @@ numba numpy>=1.23.5,<2.0.0 ; python_version == '3.10' numpy>=1.26.4,<2.0.0 ; python_version != '3.10' rich -instructlab-dolomite>=0.1.1 +instructlab-dolomite>=0.2.0 trl>=0.9.4 peft pydantic>=2.7.0 - -# deepspeed needs to be at the bottom or it'll break during installation -deepspeed>=0.14.3 aiofiles>=23.2.1 diff --git a/scripts/create-loss-graph.py b/scripts/create-loss-graph.py new file mode 100644 index 00000000..e4ab1254 --- /dev/null +++ b/scripts/create-loss-graph.py @@ -0,0 +1,186 @@ +# SPDX-License-Identifier: Apache-2.0 +# Standard +from argparse import ArgumentParser +from pathlib import Path +from subprocess import run +from typing import Dict, List +import json + +# Third Party +from matplotlib import pyplot as plt +from pydantic import BaseModel + + +class Arguments(BaseModel): + log_file: str | None = None + output_file: str + aws_region: str + bucket_name: str + base_branch: str + pr_number: str + head_sha: str + origin_repository: str + + +def render_image(loss_data: List[float], outfile: Path) -> str: + # create the plot + plt.figure() + plt.plot(loss_data) + plt.xlabel("Steps") + plt.ylabel("Loss") + plt.title("Training performance over fixed dataset") + + if outfile.exists(): + outfile.unlink() + + plt.savefig(outfile, format="png") + + +def contents_from_file(log_file: Path) -> List[Dict]: + if not log_file.exists(): + raise FileNotFoundError(f"Log file {log_file} does not exist") + if log_file.is_dir(): + raise ValueError(f"Log file {log_file} is a directory") + with open(log_file, "r") as f: + return [json.loads(l) for l in f.read().splitlines()] + + +def read_loss_data(log_file: Path) -> List[float]: + if not log_file: + raise ValueError("log_file must be provided when source is file") + contents = contents_from_file(log_file) + + # select the loss data + loss_data = [item["total_loss"] for item in contents if "total_loss" in item] + + if not loss_data: + raise ValueError("Loss data is empty") + + # ensure that the loss data is valid + if not all(isinstance(l, float) for l in loss_data): + raise ValueError("Loss data must be a list of floats") + + return loss_data + + +def write_to_s3( + file: Path, + bucket_name: str, + destination: str, +): + if not file.exists(): + raise RuntimeError(f"File {file} does not exist") + + s3_path = f"s3://{bucket_name}/{destination}" + results = run( + ["aws", "s3", "cp", str(file), s3_path], capture_output=True, check=True + ) + if results.returncode != 0: + raise RuntimeError(f"failed to upload to s3: {results.stderr.decode('utf-8')}") + else: + print(results.stdout.decode("utf-8")) + + +def get_destination_path(base_ref: str, pr_number: str, head_sha: str): + return f"pulls/{base_ref}/{pr_number}/{head_sha}/loss-graph.png" + + +def write_md_file( + output_file: Path, url: str, pr_number: str, head_sha: str, origin_repository: str +): + commit_url = f"https://github.com/{origin_repository}/commit/{head_sha}" + md_template = f""" +# Loss Graph for PR {args.pr_number} ([{args.head_sha[:7]}]({commit_url})) + +![Loss Graph]({url}) +""" + output_file.write_text(md_template, encoding="utf-8") + + +def get_url(bucket_name: str, destination: str, aws_region: str) -> str: + return f"https://{bucket_name}.s3.{aws_region}.amazonaws.com/{destination}" + + +def main(args: Arguments): + # first things first, we create the png file to upload to S3 + log_file = Path(args.log_file) + loss_data = read_loss_data(log_file=log_file) + output_image = Path("/tmp/loss-graph.png") + output_file = Path(args.output_file) + render_image(loss_data=loss_data, outfile=output_image) + destination_path = get_destination_path( + base_ref=args.base_branch, pr_number=args.pr_number, head_sha=args.head_sha + ) + write_to_s3( + file=output_image, bucket_name=args.bucket_name, destination=destination_path + ) + s3_url = get_url( + bucket_name=args.bucket_name, + destination=destination_path, + aws_region=args.aws_region, + ) + write_md_file( + output_file=output_file, + url=s3_url, + pr_number=args.pr_number, + head_sha=args.head_sha, + origin_repository=args.origin_repository, + ) + print(f"Loss graph uploaded to '{s3_url}'") + print(f"Markdown file written to '{output_file}'") + + +if __name__ == "__main__": + parser = ArgumentParser() + + parser.add_argument( + "--log-file", + type=str, + required=True, + help="The log file to read the loss data from.", + ) + parser.add_argument( + "--output-file", + type=str, + required=True, + help="The output file where the resulting markdown will be written.", + ) + parser.add_argument( + "--aws-region", + type=str, + required=True, + help="S3 region to which the bucket belongs.", + ) + parser.add_argument( + "--bucket-name", type=str, required=True, help="The S3 bucket name" + ) + parser.add_argument( + "--base-branch", + type=str, + required=True, + help="The base branch being merged to.", + ) + parser.add_argument("--pr-number", type=str, required=True, help="The PR number") + parser.add_argument( + "--head-sha", type=str, required=True, help="The head SHA of the PR" + ) + parser.add_argument( + "--origin-repository", + type=str, + required=True, + help="The repository to which the originating branch belongs to.", + ) + + args = parser.parse_args() + + arguments = Arguments( + log_file=args.log_file, + output_file=args.output_file, + aws_region=args.aws_region, + bucket_name=args.bucket_name, + base_branch=args.base_branch, + pr_number=args.pr_number, + head_sha=args.head_sha, + origin_repository=args.origin_repository, + ) + main(arguments) diff --git a/src/instructlab/training/chat_templates/ibm_generic_tmpl.py b/src/instructlab/training/chat_templates/ibm_generic_tmpl.py index 5eaf795e..1403276b 100644 --- a/src/instructlab/training/chat_templates/ibm_generic_tmpl.py +++ b/src/instructlab/training/chat_templates/ibm_generic_tmpl.py @@ -1,30 +1,44 @@ # SPDX-License-Identifier: Apache-2.0 # First Party -from instructlab.training.tokenizer_utils import SpecialTokens, TokenInfo +from instructlab.training.chat_templates.utils import SpecialTokens, TokenInfo SPECIAL_TOKENS = SpecialTokens( - system=TokenInfo("<|system|>", add_to_tokenizer=True), - user=TokenInfo("<|user|>", add_to_tokenizer=True), - assistant=TokenInfo("<|assistant|>", add_to_tokenizer=True), - eos=TokenInfo("<|endoftext|>", add_to_tokenizer=True), - pad=TokenInfo("<|pad|>", add_to_tokenizer=True), - bos=TokenInfo("<|begginingoftext|>", add_to_tokenizer=True), + start_role=TokenInfo("<|start_of_role|>", add_to_tokenizer=True), + end_role=TokenInfo("<|end_of_role|>", add_to_tokenizer=True), + tool=TokenInfo("<|tool_call|>", add_to_tokenizer=True), + eos=TokenInfo("<|end_of_text|>", add_to_tokenizer=True), + bos=TokenInfo("<|end_of_text|>", add_to_tokenizer=True), + pad=TokenInfo("<|end_of_text|>", add_to_tokenizer=True), ) CHAT_TEMPLATE = ( + "{%- if tools %}" + "{{ '<|start_of_role|>available_tools<|end_of_role|>\n' }}" + "{% for tool in tools %}" + "{{ tool | tojson(indent=4) }}" + "{% if not loop.last %}" + "{{- '\n\n' }}" + "{% endif %}" + "{% endfor %}" + "{{ '<|end_of_text|>\n' }}" + "{% endif %}" "{% for message in messages %}" - "{% if message['role'] == 'pretraining' %}" - "{{'<|pretrain|>' + message['content'] + '<|endoftext|>' + '<|/pretrain|>' }}" - "{% elif message['role'] == 'system' %}" - "{{'<|system|>'+ '\n' + message['content'] + '\n'}}" + "{% if message['role'] == 'system' %}" + "{{ '<|start_of_role|>system<|end_of_role|>' + message['content'] + '<|end_of_text|>\n' }}" + "{% elif message['role'] == 'pretraining' %}" + "{{ '<|pretrain|>' + message['content'] + '<|end_of_text|>' + '<|/pretrain|>'}}" "{% elif message['role'] == 'user' %}" - "{{'<|user|>' + '\n' + message['content'] + '\n'}}" + "{{ '<|start_of_role|>user<|end_of_role|>' + message['content'] + '<|end_of_text|>\n' }}" "{% elif message['role'] == 'assistant' %}" - "{{'<|assistant|>' + '\n' + message['content'] + '<|endoftext|>' + ('' if loop.last else '\n')}}" + "{{ '<|start_of_role|>assistant<|end_of_role|>' + message['content'] + '<|end_of_text|>\n' }}" + "{% elif message['role'] == 'assistant_tool_call' %}" + "{{ '<|start_of_role|>assistant<|end_of_role|><|tool_call|>' + message['content'] + '<|end_of_text|>\n' }}" + "{% elif message['role'] == 'tool_response' %}" + "{{ '<|start_of_role|>tool_response<|end_of_role|>' + message['content'] + '<|end_of_text|>\n' }}" "{% endif %}" "{% if loop.last and add_generation_prompt %}" - "{{ '<|assistant|>' + '\n' }}" + "{{ '<|start_of_role|>assistant<|end_of_role|>' }}" "{% endif %}" "{% endfor %}" ) diff --git a/src/instructlab/training/chat_templates/ibm_legacy_tmpl.py b/src/instructlab/training/chat_templates/ibm_legacy_tmpl.py new file mode 100644 index 00000000..0f09468f --- /dev/null +++ b/src/instructlab/training/chat_templates/ibm_legacy_tmpl.py @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: Apache-2.0 + +# First Party +from instructlab.training.chat_templates.utils import SpecialTokens, TokenInfo + +SPECIAL_TOKENS = SpecialTokens( + system=TokenInfo("<|system|>", add_to_tokenizer=True), + user=TokenInfo("<|user|>", add_to_tokenizer=True), + assistant=TokenInfo("<|assistant|>", add_to_tokenizer=True), + eos=TokenInfo("<|endoftext|>", add_to_tokenizer=True), + pad=TokenInfo("<|pad|>", add_to_tokenizer=True), + bos=TokenInfo("<|begginingoftext|>", add_to_tokenizer=True), +) + +CHAT_TEMPLATE = ( + "{% for message in messages %}" + "{% if message['role'] == 'pretraining' %}" + "{{'<|pretrain|>' + message['content'] + '<|endoftext|>' + '<|/pretrain|>' }}" + "{% elif message['role'] == 'system' %}" + "{{'<|system|>'+ '\n' + message['content'] + '\n'}}" + "{% elif message['role'] == 'user' %}" + "{{'<|user|>' + '\n' + message['content'] + '\n'}}" + "{% elif message['role'] == 'assistant' %}" + "{{'<|assistant|>' + '\n' + message['content'] + '<|endoftext|>' + ('' if loop.last else '\n')}}" + "{% endif %}" + "{% if loop.last and add_generation_prompt %}" + "{{ '<|assistant|>' + '\n' }}" + "{% endif %}" + "{% endfor %}" +) diff --git a/src/instructlab/training/chat_templates/mistral_tmpl.py b/src/instructlab/training/chat_templates/mistral_tmpl.py index dda051b3..6c1e8757 100644 --- a/src/instructlab/training/chat_templates/mistral_tmpl.py +++ b/src/instructlab/training/chat_templates/mistral_tmpl.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # First Party -from instructlab.training.tokenizer_utils import SpecialTokens, TokenInfo +from instructlab.training.chat_templates.utils import SpecialTokens, TokenInfo SPECIAL_TOKENS = SpecialTokens( bos=TokenInfo("", add_to_tokenizer=True), diff --git a/src/instructlab/training/chat_templates/utils.py b/src/instructlab/training/chat_templates/utils.py new file mode 100644 index 00000000..c0e62796 --- /dev/null +++ b/src/instructlab/training/chat_templates/utils.py @@ -0,0 +1,29 @@ +# Standard +from dataclasses import dataclass, field +from typing import List + + +@dataclass +class TokenInfo: + token: str + add_to_tokenizer: bool = False + + +@dataclass +class SpecialTokens: + system: TokenInfo = field(default_factory=lambda: TokenInfo("")) + user: TokenInfo = field(default_factory=lambda: TokenInfo("")) + assistant: TokenInfo = field(default_factory=lambda: TokenInfo("")) + eos: TokenInfo = field(default_factory=lambda: TokenInfo("")) + pad: TokenInfo = field(default_factory=lambda: TokenInfo("")) + bos: TokenInfo = field(default_factory=lambda: TokenInfo("")) + start_role: TokenInfo = field(default_factory=lambda: TokenInfo("")) + end_role: TokenInfo = field(default_factory=lambda: TokenInfo("")) + tool: TokenInfo = field(default_factory=lambda: TokenInfo("")) + + def get_tokens_to_add(self) -> List[str]: + return [ + token_info.token + for token_info in self.__dict__.values() + if token_info.add_to_tokenizer and token_info.token + ] diff --git a/src/instructlab/training/config.py b/src/instructlab/training/config.py index 05fe4792..bf43f2eb 100644 --- a/src/instructlab/training/config.py +++ b/src/instructlab/training/config.py @@ -156,6 +156,9 @@ class TrainingArgs(BaseModel): os.path.dirname(__file__), "chat_templates/ibm_generic_tmpl.py" ) + # this field determines if ibm_legacy_tmpl should be used instead + use_legacy_tmpl: bool = False + # this field specifies the filepath to the training dataset before processing data_path: str ckpt_output_dir: str @@ -171,8 +174,9 @@ class TrainingArgs(BaseModel): save_samples: int learning_rate: float warmup_steps: int - is_padding_free: bool random_seed: int = 42 + use_dolomite: bool = False + is_padding_free: bool = False # TODO: deprecate checkpoint_at_epoch: bool = True accelerate_full_state_at_epoch: bool = True @@ -191,7 +195,7 @@ class TrainingArgs(BaseModel): cpu_offload_params=False, sharding_strategy=ShardingStrategies.SHARD_GRAD_OP ) ) - distributed_backend: DistributedBackend = DistributedBackend.DEEPSPEED + distributed_backend: DistributedBackend = DistributedBackend.FSDP disable_flash_attn: Optional[bool] = False diff --git a/src/instructlab/training/data_process.py b/src/instructlab/training/data_process.py index 4bd7c789..389a8cb0 100644 --- a/src/instructlab/training/data_process.py +++ b/src/instructlab/training/data_process.py @@ -28,7 +28,11 @@ def check_valid_sample( if len(whole_sentence_tk) >= max_len or len(whole_sentence_tk) < 20: return False # last token should be eos_token - if not eos_tk[0] in (whole_sentence_tk[-1], whole_sentence_tk[-2]): + if not eos_tk[0] in ( + whole_sentence_tk[-1], + whole_sentence_tk[-2], + whole_sentence_tk[-3], + ): return False # NOTE - below checks are no longer strictly required, but we may want to revisit to make sure there's nothing we need to bring back in validity checking @@ -61,6 +65,7 @@ def unmask_message_content( system_tokens, pretrain_token, pretrain_end_token, + tool_resp_tokens=None, ): """ Create labels for tokens in a sequence with special handling for pretraining tokens and role-specific sequences. @@ -130,6 +135,10 @@ def find_longest_match(start_idx, sequences): default=None, ) + special_sequences = [user_tokens, assist_tokens, system_tokens] + if tool_resp_tokens: + special_sequences.append(tool_resp_tokens) + in_pretraining = False unmasking = False i = 0 @@ -143,7 +152,7 @@ def find_longest_match(start_idx, sequences): i += 1 continue - match = find_longest_match(i, [user_tokens, assist_tokens, system_tokens]) + match = find_longest_match(i, special_sequences) if match: unmasking = match == assist_tokens i += len(match) @@ -167,8 +176,6 @@ def find_longest_match(start_idx, sequences): ] # Assertions - special_sequences = [user_tokens, assist_tokens, system_tokens] - # 1. No special sequence of tokens should be unmasked for i in range(len(final_sentence_tk)): for seq in special_sequences: @@ -199,7 +206,7 @@ def print_masked_samples(data, tokenizer, is_pretrain, num_proc): def get_masked_and_orig_text(sample): labels = sample["labels"] input_ids = sample["input_ids"] - mask_id = get_sp_token(tokenizer, "")[0] + mask_id = get_sp_token(tokenizer, "<|MASK|>")[0] label = [mask_id if tk == -100 else tk for tk in labels] text = tokenizer.decode(label) orig_text = tokenizer.decode(input_ids) @@ -229,17 +236,50 @@ def main(args: DataProcessArgs): CHAT_TEMPLATE, SPECIAL_TOKENS = retrieve_chat_template(args.chat_tmpl_path) tokenizer = setup_tokenizer(args.model_path, SPECIAL_TOKENS, CHAT_TEMPLATE) - system_tk, user_tk, assistant_tk, eos_tk, pad_tk, bos_tk = [ + ( + system_tk, + user_tk, + assistant_tk, + eos_tk, + pad_tk, + bos_tk, + start_role_tk, + end_role_tk, + _, + ) = [ get_sp_token(tokenizer, getattr(SPECIAL_TOKENS, sp).token) for sp in SPECIAL_TOKENS.__annotations__.keys() ] + if start_role_tk and end_role_tk: + system_tk = ( + start_role_tk + + tokenizer.encode("system", add_special_tokens=False) + + end_role_tk + ) + user_tk = ( + start_role_tk + + tokenizer.encode("user", add_special_tokens=False) + + end_role_tk + ) + assistant_tk = ( + start_role_tk + + tokenizer.encode("assistant", add_special_tokens=False) + + end_role_tk + ) + tool_resp_tk = ( + start_role_tk + + tokenizer.encode("tool_response", add_special_tokens=False) + + end_role_tk + ) + else: + tool_resp_tk = None log_rank_0( f"Special tokens: eos: {eos_tk}, pad: {pad_tk}, bos: {bos_tk}, system: {system_tk}, user: {user_tk}, assistant: {assistant_tk}" ) # Adding after tokenizer setup as these are temp tokens, not to be saved tokenizer.add_special_tokens( - {"additional_special_tokens": ["<|pretrain|>", "<|/pretrain|>", ""]} + {"additional_special_tokens": ["<|pretrain|>", "<|/pretrain|>", "<|MASK|>"]} ) try: @@ -324,6 +364,7 @@ def main(args: DataProcessArgs): system_tokens=system_tk, pretrain_token=get_sp_token(tokenizer, "<|pretrain|>")[0], pretrain_end_token=get_sp_token(tokenizer, "<|/pretrain|>")[0], + tool_resp_tokens=tool_resp_tk, ) print("\033[92munmasking the appropriate message content...\033[0m") data_with_labels = data_with_input_ids.map( @@ -347,9 +388,26 @@ def main(args: DataProcessArgs): ) # extract only labels and messages formatted into a new dataset - data_with_labels = data_with_labels.select_columns(["labels", "input_ids"]) + data_with_labels = data_with_labels.map( + lambda x: { + "len": len(x["input_ids"]), + }, + num_proc=NUM_PROC, + ) + data_with_labels = data_with_labels.select_columns(["labels", "input_ids", "len"]) + # MASK and both pretrain tokens should not be in the final tokens, those are special tokens added only for data processing purposes. + max_id = len(tokenizer) - 3 + final_valid_data = data_with_labels.filter( + lambda x: all(tk < max_id for tk in x["labels"]), num_proc=NUM_PROC + ) + # Dropping samples that could break training due to oob ids + if len(final_valid_data) < len(data_with_labels): + dropped_samples = len(data_with_labels) - len(final_valid_data) + print( + f"\033[93mWarning: {dropped_samples} samples were dropped because they contained token IDs greater than or equal to {max_id}.\033[0m" + ) # use path to get the stem of the file - data_with_labels.to_json(Path(args.data_output_path) / f"data.jsonl") + final_valid_data.to_json(Path(args.data_output_path) / "data.jsonl") if __name__ == "__main__": diff --git a/src/instructlab/training/main_ds.py b/src/instructlab/training/main_ds.py index c5cdb2ba..0ad54c5f 100644 --- a/src/instructlab/training/main_ds.py +++ b/src/instructlab/training/main_ds.py @@ -4,6 +4,7 @@ from copy import deepcopy from pathlib import Path import argparse +import json import math import os import re @@ -12,11 +13,32 @@ # Third Party from accelerate import Accelerator -from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam -from deepspeed.runtime.zero.utils import ZeRORuntimeException -# pylint: disable=no-name-in-module +try: + # Third Party + from deepspeed.ops.adam import DeepSpeedCPUAdam +except ImportError: + DeepSpeedCPUAdam = None + local_rank = int(os.getenv("LOCAL_RANK", "0")) + if __name__ == "__main__" and (not local_rank or local_rank == 0): + print( + "DeepSpeed CPU Optimizer is not available. Some features may be unavailable." + ) + +try: + # Third Party + from deepspeed.ops.adam import FusedAdam + from deepspeed.runtime.zero.utils import ZeRORuntimeException +except ImportError: + FusedAdam = None + ZeRORuntimeException = None + local_rank = int(os.getenv("LOCAL_RANK", "0")) + if __name__ == "__main__" and (not local_rank or local_rank == 0): + print("DeepSpeed is not available. Some features may be unavailable.") + +# Third Party from instructlab.dolomite.hf_models import GPTDolomiteForCausalLM +from torch.utils.data import DataLoader from tqdm import tqdm from transformers import AutoModelForCausalLM, get_scheduler import torch @@ -25,6 +47,8 @@ # First Party from instructlab.training import config from instructlab.training.async_logger import AsyncStructuredLogger + +# pylint: disable=no-name-in-module from instructlab.training.config import ( DataProcessArgs, DistributedBackend, @@ -41,9 +65,11 @@ StreamablePopen, add_noisy_embeddings, apply_gradient_checkpointing, + check_flash_attn_enabled, + check_valid_train_args, convert_loss_to_reduce_sum, - ensure_loadable_granite_checkpoint, - get_projection_layer_names, + create_lora_config, + ensure_loadable_dolomite_checkpoint, load_latest_full_state, prepare_peft_model, prepare_universal_checkpoint_from_latest, @@ -84,7 +110,7 @@ def setup_optimizer(args, model): return optimizer -def setup_model(args, tokenizer, train_loader, grad_accum): +def setup_model(args, tokenizer, train_loader, grad_accum, flash_enabled): bnb_config = None if args.lora_r > 0 and args.lora_quant_bits == 4: # Third Party @@ -102,25 +128,24 @@ def setup_model(args, tokenizer, train_loader, grad_accum): "torch_dtype": torch.bfloat16, "quantization_config": bnb_config, } - if not args.disable_flash_attn: + if flash_enabled: base_model_args["attn_implementation"] = "flash_attention_2" - elif args.is_granite: - raise RuntimeError( - "ERROR: Trying to use padding-free transformer without flash attention is not supported" - ) - if args.is_granite: - with ensure_loadable_granite_checkpoint( + if args.use_dolomite: + with ensure_loadable_dolomite_checkpoint( args.model_name_or_path, args.output_dir ) as path: base_model_args["pretrained_model_name_or_path"] = path + base_model_args["use_padding_free_transformer"] = True model = GPTDolomiteForCausalLM.from_pretrained( **base_model_args, - use_padding_free_transformer=True, ) else: model = AutoModelForCausalLM.from_pretrained(**base_model_args) + # store the base model args so we can recall them later if saving a LoRA model + args.base_model_args = base_model_args + if len(tokenizer) > model.config.vocab_size: print( f"WARNING: tokenizer has {len(tokenizer)} tokens but model has {model.config.vocab_size} vocab size" @@ -165,9 +190,10 @@ def setup_model(args, tokenizer, train_loader, grad_accum): "Starcoder2ForCausalLM", "GemmaForCausalLM", "MixtralForCausalLM", + "GraniteForCausalLM", ], f"Model class name: {model.__class__.__name__} is not supported." - model = convert_loss_to_reduce_sum(model, is_granite=args.is_granite) + model = convert_loss_to_reduce_sum(model, use_dolomite=args.use_dolomite) model = add_noisy_embeddings(model, noise_alpha=args.NEFTune_alpha) # handling of gradient checkpointing @@ -175,52 +201,20 @@ def setup_model(args, tokenizer, train_loader, grad_accum): # - with the exception of granite, which handles it # in the later stanza if args.lora_r > 0: - # if lora - # Third Party - from peft import LoraConfig - - # ensure we select only the modules that exist in the model - proj_layers = get_projection_layer_names(model) - if not args.lora_target_modules: - print( - f"WARNING: lora_target_modules was not specified, defaulting to all of the model's projection modules" - ) - if not proj_layers: - raise RuntimeError("could not find any projection layers in the model") - args.__dict__["lora_target_modules"] = proj_layers - else: - # when the user specifies the module, we should verify that they align with what's in the model - lora_target_modules_set = set(args.lora_target_modules) - diff = lora_target_modules_set - set(proj_layers) - layers_to_target = lora_target_modules_set - diff - if len(diff) == len(args.lora_target_modules): - raise ValueError( - f"None of the modules you requested exist in the model.\nRequested modules: {args.lora_target_modules}; Available modules: {proj_layers}.\nThis is usually a misconfiuration error. Consider omitting your `lora_target_modules` list to have these discovered automatically." - ) - if diff: - print( - f"\033[33mWARNING: the following modules were targeted for LoRA but are not present in the model: {list(diff)}. Applying LoRA only to {list(layers_to_target)} modules.\033[0m" - ) - args.__dict__["lora_target_modules"] = list(layers_to_target) - - peft_config = LoraConfig( - lora_alpha=args.lora_alpha, - lora_dropout=args.lora_dropout, - r=args.lora_r, - bias="none", - task_type="CAUSAL_LM", - target_modules=args.lora_target_modules, - ) + lora_config = create_lora_config(model, args) model = prepare_peft_model( - model, peft_config, gradient_checkpointing=not args.is_granite + model, + lora_config, + args.distributed_training_framework, + gradient_checkpointing=not args.use_dolomite, ) - - elif not args.is_granite: + args.lora_config = lora_config + elif not args.use_dolomite: model.gradient_checkpointing_enable() # granite gradient checkpointing is handled uniformly # for both lora and full here - if args.is_granite: + if args.use_dolomite: block_name = model._no_split_modules[0] apply_gradient_checkpointing( model, @@ -252,6 +246,9 @@ def make_inputs_require_grad(module, input, output): deepcopy(train_loader), lr_scheduler, ) + # Necessary so that Accelerate does not step once per GPU + # see https://github.com/huggingface/accelerate/blob/127818fc27ebe5cb236357fff59ff1748326d643/src/accelerate/scheduler.py#L69 + lr_scheduler.split_batches = True return model, lr_scheduler, optimizer, accelerator @@ -322,7 +319,7 @@ def train( lr_scheduler, accelerator: Accelerator, tokenizer, - train_loader, + train_loader: DataLoader, grad_accum, metric_logger, ): @@ -381,8 +378,8 @@ def train( num_loss_counted_tokens = float( torch.tensor([batch.pop("num_loss_counted_tokens")]) ) - micro_batch_size = float(len(batch["input_ids"])) - if not args.is_granite: + micro_batch_size = float(torch.tensor([batch.pop("num_samples")])) + if not args.use_dolomite: for k in batch: batch[k] = batch[k].to(local_rank) output = model( @@ -453,7 +450,8 @@ def train( "batch_size": int(micro_batch_size), "total_loss": float(log_loss / num_loss_counted_tokens), "samples_seen": samples_seen, - # "gradnorm": global_grad_norm, + "gradnorm": global_grad_norm, + "total_samples": len(train_loader.dataset), # "weight_norm": weight_norm, } ) @@ -513,6 +511,20 @@ def main(args): # Third Party import yaml + if args.distributed_training_framework == "deepspeed" and not FusedAdam: + raise ImportError( + "DeepSpeed was selected but we cannot import the `FusedAdam` optimizer" + ) + + if ( + args.distributed_training_framework == "deepspeed" + and args.cpu_offload_optimizer + and not DeepSpeedCPUAdam + ): + raise ImportError( + "DeepSpeed was selected and CPU offloading was requested, but DeepSpeedCPUAdam could not be imported. This likely means you need to build DeepSpeed with the CPU adam flags." + ) + metric_logger = AsyncStructuredLogger( args.output_dir + f"/training_params_and_metrics_global{os.environ['RANK']}.jsonl" @@ -526,6 +538,10 @@ def main(args): tokenizer = setup_tokenizer(args.model_name_or_path, SPECIAL_TOKENS, CHAT_TEMPLATE) # device = torch.device("cuda", args.local_rank) + with open(Path(args.model_name_or_path) / "config.json") as conf_json: + model_conf = json.load(conf_json) + args.model_type = model_conf["model_type"] + #### distributed init ##### torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) args.local_rank = int(os.environ["LOCAL_RANK"]) @@ -535,6 +551,8 @@ def main(args): torch.distributed.all_reduce(tensor) torch.distributed.barrier() + flash_enabled = check_flash_attn_enabled(args.disable_flash_attn, args.use_dolomite) + dataset = setup_dataset( args.data_path, mock=args.mock_data, @@ -547,7 +565,7 @@ def main(args): avg_sample_len=dataset.get_lengths().mean(), effective_batch_size=args.effective_batch_size, max_batch_len_per_gpu=args.max_batch_len, - is_padding=not args.is_granite, + is_padding=not (args.use_dolomite or flash_enabled), dataset=dataset, seed=args.seed, ) @@ -570,7 +588,8 @@ def main(args): dataset, tokenizer.pad_token_id, num_workers=8, - is_granite=args.is_granite, + use_dolomite=args.use_dolomite, + flash_enabled=flash_enabled, max_batch_len=args.max_batch_len, packing_max_batch_len=packing_max_batch_len, samples_per_gpu=args.samples_per_gpu, @@ -589,7 +608,8 @@ def main(args): dataset, tokenizer.pad_token_id, num_workers=8, - is_granite=args.is_granite, + use_dolomite=args.use_dolomite, + flash_enabled=flash_enabled, max_batch_len=args.max_batch_len, packing_max_batch_len=packing_max_batch_len, samples_per_gpu=args.samples_per_gpu, @@ -609,11 +629,12 @@ def main(args): "num_batches": len(train_loader), "avg_samples_per_batch": len(dataset) / len(train_loader), "samples_per_gpu": args.samples_per_gpu, + "total_samples": len(dataset), # emit the total number of samples } ) model, lr_scheduler, optimizer, accelerator = setup_model( - args, tokenizer, train_loader, grad_accum + args, tokenizer, train_loader, grad_accum, flash_enabled ) load_latest_full_state(args=args, accelerator=accelerator) @@ -639,10 +660,12 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: """ Wrapper around the main training job that calls torchrun. """ - # early validation logic here - if train_args.max_batch_len < train_args.max_seq_len: - raise ValueError( - f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}" + check_valid_train_args(train_args) + + # switch out generic tmpl for legacy tmpl if requested + if train_args.use_legacy_tmpl: + train_args.chat_tmpl_path = os.path.join( + os.path.dirname(__file__), "chat_templates/ibm_legacy_tmpl.py" ) if train_args.process_data: @@ -697,14 +720,10 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: if train_args.mock_len: command.append(f"--mock_len={train_args.mock_len}") - if train_args.is_padding_free: - command.append("--is_granite") + if train_args.use_dolomite: + command.append("--use_dolomite") if train_args.disable_flash_attn: - if train_args.is_padding_free: - raise RuntimeError( - "ERROR: Trying to use padding-free transformer without flash attention is not supported" - ) command.append("--disable_flash_attn") if train_args.lora: @@ -733,6 +752,16 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: ) # deepspeed options + if train_args.distributed_backend == DistributedBackend.DEEPSPEED: + if not FusedAdam: + raise ImportError( + "DeepSpeed was selected as the distributed backend, but FusedAdam could not be imported. Please double-check that DeepSpeed is installed correctly" + ) + + if train_args.deepspeed_options.cpu_offload_optimizer and not DeepSpeedCPUAdam: + raise ImportError( + "DeepSpeed CPU offloading was enabled, but DeepSpeedCPUAdam could not be imported. This is most likely because DeepSpeed was not built with CPU Adam. Please rebuild DeepSpeed to have CPU Adam, or disable CPU offloading." + ) if train_args.deepspeed_options.save_samples: command.append(f"--save_samples_ds={train_args.deepspeed_options.save_samples}") if train_args.deepspeed_options.cpu_offload_optimizer: @@ -761,6 +790,7 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: print(f"\033[92mRunning training command as subprocess: {' '.join(command)}\033[0m") process = None interrupt: KeyboardInterrupt | Exception | None = None + failure = False try: process = StreamablePopen( f"{train_args.ckpt_output_dir}/full_logs_global{torch_args.node_rank}.log", @@ -771,19 +801,20 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: print("Training subprocess interrupted by user.") interrupt = e except Exception as e: - print(f"An error occurred: {str(e)}") + print("Unexpected exception received during distributed training") interrupt = e finally: if "process" not in locals() or process is None: return - if process.poll() == 0: - print("\033[92mTraining subprocess exited successfully! 🎉\033[0m") + + failure = process.poll() != 0 + if not failure: + print("\033[92mOperation completed successfully! 🎉\033[0m") else: print( "\033[91mTraining subprocess has not exited yet. Sending SIGTERM.\033[0m" ) - print("Sending interrupt signal to Training subprocess.") process.terminate() try: print("Waiting for process to exit, 60s...") @@ -795,8 +826,11 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: process.kill() if interrupt: - print(f"Error caught from training subprocess.: {interrupt}") raise interrupt + if failure: + raise RuntimeError( + "Suffered a failure during distributed training. Please see the training logs for more context." + ) if __name__ == "__main__": @@ -883,7 +917,7 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: default="SHARD_GRAD_OP", help="Sharding strategy to be used for FSDP distributed training.", ) - parser.add_argument("--is_granite", action="store_true") + parser.add_argument("--use_dolomite", action="store_true") parser.add_argument("--lora_r", type=int, default=0) # set to > 0 to activate lora parser.add_argument("--lora_alpha", type=int, default=32) parser.add_argument("--lora_dropout", type=float, default=0.1) @@ -972,7 +1006,7 @@ def run_training(torch_args: TorchrunArgs, train_args: TrainingArgs) -> None: --save_samples=250000 \ --log_level="INFO" \ --fsdp_sharding_strategy="SHARD_GRAD_OP" \ ---is_granite \ +--use_dolomite \ --max_batch_len 70000 \ --seed=42 """ diff --git a/src/instructlab/training/setup_accelerator.py b/src/instructlab/training/setup_accelerator.py index 33972b59..c7d079e6 100644 --- a/src/instructlab/training/setup_accelerator.py +++ b/src/instructlab/training/setup_accelerator.py @@ -3,12 +3,10 @@ # Third Party from accelerate import Accelerator -from torch.distributed.fsdp import ( # FullyShardedDataParallel as FSDP, - BackwardPrefetch, - MixedPrecision, - ShardingStrategy, -) +from peft.utils.other import fsdp_auto_wrap_policy +from torch.distributed.fsdp import BackwardPrefetch, MixedPrecision, ShardingStrategy from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy +from transformers import PreTrainedModel import torch # First Party @@ -51,37 +49,60 @@ def get_ds_plugin(world_size, samples_per_gpu, grad_accum, opts: DeepSpeedOption return ds_plugin -def get_fsdp_config(args, model): +def get_fsdp_config(args, model: PreTrainedModel): # Third Party from accelerate.utils import FullyShardedDataParallelPlugin from torch.distributed.fsdp.fully_sharded_data_parallel import CPUOffload + is_lora = args.lora_r > 0 block_name = model._no_split_modules[0] - fsdp_plugin = FullyShardedDataParallelPlugin( - auto_wrap_policy=partial( + wrap_policy = None + if is_lora > 0: + wrap_policy = fsdp_auto_wrap_policy(model) + else: + wrap_policy = partial( transformer_auto_wrap_policy, transformer_layer_cls={ get_module_class_from_name(model, block_name), }, - ), + ) + + # TODO(osilkin): BACKWARD_POST trades memory utilization for processing time, which is important for systems utilizing LoRA + # We should have this be configurable in the future. + prefetch_policy = ( + BackwardPrefetch.BACKWARD_POST if is_lora else BackwardPrefetch.BACKWARD_PRE + ) + fsdp_plugin = FullyShardedDataParallelPlugin( + auto_wrap_policy=wrap_policy, limit_all_gathers=True, mixed_precision_policy=MixedPrecision( param_dtype=torch.bfloat16, reduce_dtype=torch.bfloat16, buffer_dtype=torch.bfloat16, ), - backward_prefetch=BackwardPrefetch.BACKWARD_PRE, + backward_prefetch=prefetch_policy, sharding_strategy=ShardingStrategy[args.fsdp_sharding_strategy], cpu_offload=CPUOffload(args.cpu_offload_params_fsdp), ) + + # `use_orig_params` must be disabled when using LoRA and FSDP together + # Source: https://huggingface.co/docs/peft/en/accelerate/fsdp#the-important-parts + if args.lora_r > 0: + fsdp_plugin.use_orig_params = False + return fsdp_plugin -def setup_accelerator(args, model, grad_accum): +def setup_accelerator(args, model: PreTrainedModel, grad_accum): if args.distributed_training_framework == "deepspeed": - # Third Party - from deepspeed import DeepSpeedEngine + try: + # Third Party + from deepspeed import DeepSpeedEngine + except ImportError as exc: + raise ImportError( + "DeepSpeed selected as distributed framework, but not installed" + ) from exc # patch deepspeed to work with quantized models. if args.lora_quant_bits is not None: diff --git a/src/instructlab/training/token_dataset.py b/src/instructlab/training/token_dataset.py index 9d46607e..fda9a751 100644 --- a/src/instructlab/training/token_dataset.py +++ b/src/instructlab/training/token_dataset.py @@ -17,12 +17,15 @@ class TokenDataset(Dataset): def __init__(self, data_path): self.data = load_dataset("json", data_files=data_path, split="train") - self.lengths = np.array( - self.data.map( - lambda x: {"len": len(x["input_ids"])}, - num_proc=8, - )["len"] - ) + if "len" not in self.data.column_names: + self.lengths = np.array( + self.data.map( + lambda x: {"len": len(x["input_ids"])}, + num_proc=8, + )["len"] + ) + else: + self.lengths = np.array(self.data["len"]) def __len__(self): return len(self.data) @@ -87,7 +90,8 @@ def setup_dataloader( dataset: Dataset, pad_token_id: int, num_workers: int = 8, - is_granite=False, + use_dolomite=False, + flash_enabled=True, max_batch_len=60000, packing_max_batch_len=60000, samples_per_gpu=None, @@ -95,7 +99,10 @@ def setup_dataloader( seed=47, ) -> DataLoader: collate_fn = make_collate_fn( - pad_token_id, is_granite=is_granite, max_batch_len=max_batch_len + pad_token_id, + use_dolomite=use_dolomite, + flash_enabled=flash_enabled, + max_batch_len=max_batch_len, ) rank = int(os.environ["RANK"]) world_size = int(os.environ["WORLD_SIZE"]) @@ -108,7 +115,7 @@ def setup_dataloader( num_replicas=world_size, rank=rank, seed=seed, - padding=not is_granite, + padding=not flash_enabled, ) sampler = {"batch_sampler": sampler} elif sampler == "distributed": diff --git a/src/instructlab/training/tokenizer_utils.py b/src/instructlab/training/tokenizer_utils.py index 45ad4699..d6c55e7e 100644 --- a/src/instructlab/training/tokenizer_utils.py +++ b/src/instructlab/training/tokenizer_utils.py @@ -1,9 +1,5 @@ # SPDX-License-Identifier: Apache-2.0 -# Standard -from dataclasses import dataclass, field -from typing import List - # Third Party from transformers import AutoTokenizer, PreTrainedTokenizer @@ -11,29 +7,6 @@ from instructlab.training.utils import log_rank_0 -@dataclass -class TokenInfo: - token: str - add_to_tokenizer: bool = False - - -@dataclass -class SpecialTokens: - system: TokenInfo = field(default_factory=lambda: TokenInfo("")) - user: TokenInfo = field(default_factory=lambda: TokenInfo("")) - assistant: TokenInfo = field(default_factory=lambda: TokenInfo("")) - eos: TokenInfo = field(default_factory=lambda: TokenInfo("")) - pad: TokenInfo = field(default_factory=lambda: TokenInfo("")) - bos: TokenInfo = field(default_factory=lambda: TokenInfo("")) - - def get_tokens_to_add(self) -> List[str]: - return [ - token_info.token - for token_info in self.__dict__.values() - if token_info.add_to_tokenizer and token_info.token - ] - - def setup_tokenizer( model_name_or_path, SPECIAL_TOKENS, CHAT_TEMPLATE ) -> PreTrainedTokenizer: diff --git a/src/instructlab/training/utils.py b/src/instructlab/training/utils.py index 6d79d897..41ec413f 100644 --- a/src/instructlab/training/utils.py +++ b/src/instructlab/training/utils.py @@ -1,13 +1,14 @@ # SPDX-License-Identifier: Apache-2.0 # Standard +from argparse import Namespace from collections import OrderedDict from contextlib import contextmanager from copy import deepcopy from functools import partial from pathlib import Path from tempfile import TemporaryDirectory -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple import importlib import inspect import logging @@ -21,7 +22,7 @@ # Third Party # pylint: disable=no-name-in-module -from accelerate import Accelerator +from accelerate import Accelerator, DistributedType from instructlab.dolomite.hf_models import ( GPTDolomiteConfig, export_to_huggingface, @@ -29,17 +30,75 @@ ) from rich.logging import RichHandler from torch import distributed as dist +from torch import nn from torch.distributed import get_rank, is_initialized from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import ( CheckpointImpl, apply_activation_checkpointing, checkpoint_wrapper, ) -from transformers import PreTrainedModel +from torch.distributed.fsdp import FullStateDictConfig +from torch.distributed.fsdp import FullyShardedDataParallel as FSDP +from torch.distributed.fsdp import StateDictType +from transformers import AutoModelForCausalLM, PreTrainedModel, PreTrainedTokenizer import numpy as np import torch import torch.nn.functional as F +# First Party +from instructlab.training.config import ( + DistributedBackend, + QuantizeDataType, + TrainingArgs, +) + + +def check_valid_train_args(train_args: TrainingArgs): + # early validation logic here + if train_args.max_batch_len < train_args.max_seq_len: + raise ValueError( + f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}" + ) + + if os.path.exists(train_args.model_path): + if not os.path.isdir(train_args.model_path): + raise FileNotFoundError( + "Model path does not appear to be a directory. Please make sure that you're passing a Hugging Face Transformers compatible directory checkpoint." + ) + else: + raise FileNotFoundError( + f"Provided path to model does not exist. Please make sure that you've passed a valid model and that it has appropriate permissions: {train_args.model_path}" + ) + + if train_args.use_dolomite and train_args.disable_flash_attn: + raise RuntimeError( + "ERROR: Trying to use dolomite padding-free transformer without flash attention is not supported" + ) + + if train_args.is_padding_free: + print( + "\033[33m WARNING: is_padding_free is being deprecated due to adoption of the default padding-free support in Hugging Face Transformers. As such, this flag is non-functional in 0.6.0 and beyond. If you would like to use the older Dolomite padding-free implementation, please set use_dolomite moving forward.\033[0m" + ) + + if ( + train_args.accelerate_full_state_at_epoch + and train_args.lora + and train_args.lora.rank > 0 + ): + raise ValueError( + "`accelerate_full_state_at_epoch` is not currently supported when training LoRA models." + ) + + if ( + train_args.lora + and train_args.lora.rank > 0 + and train_args.lora.quantize_data_type != QuantizeDataType.NONE + and train_args.distributed_backend == DistributedBackend.FSDP.value + ): + raise ValueError( + "Quantization is not supported when training LoRA models with FSDP. For quantized LoRA training, please switch to DeepSpeed." + ) + def retrieve_chat_template(chat_tmpl_path): try: @@ -111,9 +170,39 @@ def listen(self): break -def make_collate_fn(pad_token_id, is_granite=False, max_batch_len=60000): +def supports_flash_attention(device_id=0): + """Check if a GPU supports FlashAttention.""" + major, minor = torch.cuda.get_device_capability(device_id) + # Check if the GPU architecture is Ampere (SM 8.x) or newer (SM 9.0) + is_sm8x = major == 8 and minor >= 0 + is_sm90 = major == 9 and minor == 0 + dev_name = torch.cuda.get_device_properties(device_id).gcnArchName.split(":")[0] + is_compat_amd = dev_name in ("gfx90a", "gfx940", "gfx941", "gfx942") + return is_sm8x or is_sm90 or is_compat_amd + + +def check_flash_attn_enabled(disable_flash_attn: bool, use_dolomite: bool) -> bool: + if not disable_flash_attn: + if supports_flash_attention(): + flash_enabled = True + else: + raise RuntimeError( + "ERROR: Trying to use Flash Attention on unsupported hardware. Please set disable_flash_attn to True." + ) + elif use_dolomite: + raise RuntimeError( + "ERROR: Trying to use dolomite padding-free transformer without flash attention is not supported" + ) + else: + flash_enabled = False + return flash_enabled + + +def make_collate_fn( + pad_token_id, use_dolomite=False, flash_enabled=True, max_batch_len=60000 +): rank = int(os.environ["RANK"]) - if is_granite: + if use_dolomite: def pad_collate_fn(batch): lens = np.array([len(item["input_ids"]) for item in batch]) @@ -140,70 +229,108 @@ def pad_collate_fn(batch): "input_ids": input_ids, "labels": labels, "num_loss_counted_tokens": num_loss_counted_tokens, + "num_samples": len(batch), } else: + if flash_enabled: + + def pad_collate_fn(batch): + input_ids = [] + labels = [] + position_ids = [] + total_len = 0 + num_loss_counted_tokens = 0 + + for num_samples, item in enumerate(batch): + item_len = len(item["input_ids"]) + if total_len + item_len > max_batch_len: + break + + input_ids.extend(item["input_ids"].tolist()) + labels.extend(item["labels"].tolist()) + position_ids.extend(range(item_len)) + + total_len += item_len + num_loss_counted_tokens += (item["labels"] != -100).sum().item() + + print( + f"\033[96m total length: {total_len} " + f"num samples {len(batch)} - rank: {rank} " + f"num_loss_counted_tokens: {num_loss_counted_tokens}\033[0m" + ) - def pad_collate_fn(batch): - lens = np.array([len(item["input_ids"]) for item in batch]) - max_len = max(lens) - - input_ids = torch.stack( - [ - F.pad( - item["input_ids"], - (max_len - len(item["input_ids"]), 0), - mode="constant", - value=pad_token_id, - ) - for item in batch - ] - ) - labels = torch.stack( - [ - F.pad( - item["labels"], - (max_len - len(item["labels"]), 0), - mode="constant", - value=-100, - ) - for item in batch - ] - ) - num_loss_counted_tokens = (labels != -100).sum() - - attention_mask = torch.stack( - [ - F.pad( - item["attention_mask"], - (max_len - len(item["attention_mask"]), 0), - mode="constant", - value=0, - ) - for item in batch - ] - ) - print( - f"\033[96m total tokens: {max_len * len(batch)} num samples: {len(batch)} num padding tokens: {max_len * len(batch) - lens.sum()} - rank: {rank} " - f"max len: {max_len} min len: {min(lens)} avg len: {lens.mean()} " - f"num_loss_counted_tokens: {num_loss_counted_tokens}\033[0m" - ) + return { + "input_ids": torch.tensor([input_ids], dtype=torch.long), + "labels": torch.tensor([labels], dtype=torch.long), + "position_ids": torch.tensor([position_ids], dtype=torch.long), + "num_loss_counted_tokens": num_loss_counted_tokens, + "num_samples": num_samples + 1, # pylint: disable=W0631 + } - return { - "input_ids": input_ids, - "labels": labels, - "num_loss_counted_tokens": num_loss_counted_tokens, - "attention_mask": attention_mask, - } + else: + + def pad_collate_fn(batch): + lens = np.array([len(item["input_ids"]) for item in batch]) + max_len = max(lens) + + input_ids = torch.stack( + [ + F.pad( + item["input_ids"], + (max_len - len(item["input_ids"]), 0), + mode="constant", + value=pad_token_id, + ) + for item in batch + ] + ) + labels = torch.stack( + [ + F.pad( + item["labels"], + (max_len - len(item["labels"]), 0), + mode="constant", + value=-100, + ) + for item in batch + ] + ) + num_loss_counted_tokens = (labels != -100).sum() + + attention_mask = torch.stack( + [ + F.pad( + item["attention_mask"], + (max_len - len(item["attention_mask"]), 0), + mode="constant", + value=0, + ) + for item in batch + ] + ) + print( + f"\033[96m total tokens: {max_len * len(batch)} num samples: {len(batch)} num padding tokens: {max_len * len(batch) - lens.sum()} - rank: {rank} " + f"max len: {max_len} min len: {min(lens)} avg len: {lens.mean()} " + f"num_loss_counted_tokens: {num_loss_counted_tokens}\033[0m" + ) + + return { + "input_ids": input_ids, + "labels": labels, + "num_loss_counted_tokens": num_loss_counted_tokens, + "attention_mask": attention_mask, + "num_samples": len(batch), + } return pad_collate_fn -def convert_loss_to_reduce_sum(model, is_granite=False): +def convert_loss_to_reduce_sum(model, use_dolomite=False): """ this is necessary because multipack changes the samples per gpu, which biases the gradients to be larger for batches with less samples but longer lengths. """ - if is_granite: + if use_dolomite: def get_autoregressive_language_modeling_loss( lm_logits: torch.Tensor, @@ -304,9 +431,133 @@ def patch_target_module( setattr(source, obj_name_to_patch, replace_with) +def wraps(module: nn.Module, wrapped_classes: Tuple[Any]) -> bool: + """Checks if a module or its children are an instance of one of the provided classes. + + Args: + module (nn.Module): A PyTorch module. + wrapped_classes(Tuple): A tuple of potential classes the module could be. + + Returns: + bool: True if the module or any of its children are instances of one of `wrapped_classes`, False otherwise. + """ + if isinstance(module, wrapped_classes): + return True + + for m in module.children(): + if wraps(m, wrapped_classes): + return True + + return False + + +def create_lora_config(model: PreTrainedModel, args: Namespace) -> "peft.LoraConfig": + # if lora + # Third Party + from peft import LoraConfig + + # ensure we select only the modules that exist in the model + proj_layers = get_projection_layer_names(model) + if not args.lora_target_modules: + print( + f"WARNING: lora_target_modules was not specified, defaulting to all of the model's projection modules" + ) + if not proj_layers: + raise RuntimeError("could not find any projection layers in the model") + args.__dict__["lora_target_modules"] = proj_layers + else: + # when the user specifies the module, we should verify that they align with what's in the model + lora_target_modules_set = set(args.lora_target_modules) + diff = lora_target_modules_set - set(proj_layers) + layers_to_target = lora_target_modules_set - diff + if len(diff) == len(args.lora_target_modules): + raise ValueError( + f"None of the modules you requested exist in the model.\nRequested modules: {args.lora_target_modules}; Available modules: {proj_layers}.\nThis is usually a misconfiuration error. Consider omitting your `lora_target_modules` list to have these discovered automatically." + ) + if diff: + print( + f"\033[33mWARNING: the following modules were targeted for LoRA but are not present in the model: {list(diff)}. Applying LoRA only to {list(layers_to_target)} modules.\033[0m" + ) + args.__dict__["lora_target_modules"] = list(layers_to_target) + + return LoraConfig( + lora_alpha=args.lora_alpha, + lora_dropout=args.lora_dropout, + r=args.lora_r, + bias="none", + task_type="CAUSAL_LM", + target_modules=args.lora_target_modules, + ) + + +def save_fsdp_lora_model( + args: Namespace, + model: FSDP, + tokenizer: PreTrainedTokenizer, + accelerator: Accelerator, + output_dir: Path, +): + """Given a LoRA model wrapped by FSDP and Accelerate, save a full copy of the original + model with the trained LoRA adapters merged into the copy. + + This function creates a full copy of the model being trained and stores it in CPU memory. + If encountering OOM errors on CPU, this is likely a culprit. + + Args: + args (Namespace): Args received by the ArgumentParser. + model (FSDP): FSDP model as prepared by `accelerate.Accelerator` + accelerator (Accelerator): The given accelerator object. + """ + # Third Party + from peft import LoraConfig, LoraModel + + if accelerator.distributed_type != DistributedType.FSDP: + raise RuntimeError( + "`save_fsdp_lora_model` was called when FSDP was not being used." + ) + if not wraps(model, FSDP): + raise RuntimeError( + "`save_fsdp_lora_model` was called but provided model is not an FSDP model." + ) + if not wraps(model, LoraModel): + raise RuntimeError( + "`save_fsdp_lora_model` was called but provided model is not a LoRA model." + ) + + # okay now that validation is out of the way, we are free to implement saving + lora_conf: LoraConfig = args.lora_config + sd_config = FullStateDictConfig(offload_to_cpu=True, rank0_only=True) + with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT, sd_config): + state = model.state_dict() + + # When training a LoRA with FSDP and Accelerate, you cannot directly merge the adapters into + # the model wrapped by FSDP. To get around this limitation, we get a copy of the state dict + # create an identical model on CPU, load the state dict into the CPU model, merge the adapters + # and save the model to disk. + if accelerator.is_main_process: + # remove device_map from args list so we can load the model on CPU + old_device_map = args.base_model_args.pop("device_map", None) + model_copy = AutoModelForCausalLM.from_pretrained( + **args.base_model_args, device_map="cpu" + ) + model_copy = LoraModel(model_copy, lora_conf, "default") + model_copy.load_state_dict(state) + model_copy.merge_and_unload(progressbar=True) + model_copy.save_pretrained(output_dir, safe_serialization=True) + model.config.to_json_file(f"{output_dir}/config.json") + tokenizer.save_pretrained(output_dir) + del model_copy + if old_device_map: + # return the previous device_map so it can be used later on if needed + args.base_model_args["device_map"] = old_device_map + + dist.barrier() + + def prepare_peft_model( - model, + model: PreTrainedModel, peft_config, + distributed_backend: str, gradient_checkpointing=True, gradient_checkpointing_kwargs={"use_reentrant": True}, mixed_precision="bf16", @@ -314,6 +565,7 @@ def prepare_peft_model( # will guard this # Third Party from peft import ( + LoraModel, PeftConfig, PeftModel, get_peft_model, @@ -355,7 +607,11 @@ def make_inputs_require_grad(module, input, output): make_inputs_require_grad ) - model = get_peft_model(model, peft_config) + if distributed_backend == DistributedBackend.FSDP.value: + # FSDP doesn't like `get_peft_model` as it leads to dtype mismatches + model = LoraModel(model, peft_config, "default") + else: + model = get_peft_model(model, peft_config) if mixed_precision == "bf16" and getattr(model, "is_loaded_in_4bit", False): peft_module_casting_to_bf16(model) @@ -397,16 +653,21 @@ def prepare_universal_checkpoint_from_latest(output_dir): start = time.time() if torch.distributed.get_rank() == 0: - # Third Party - from deepspeed.checkpoint import DeepSpeedCheckpoint - from deepspeed.checkpoint.ds_to_universal import ( - PARAM_SHAPES, - UNIVERSAL_CHECKPOINT_INFO, - _check_for_required_state, - _extract_zero_shard_files, - _merge_tp_slice_files, - _save_optimizer_state, - ) + try: + # Third Party + from deepspeed.checkpoint import DeepSpeedCheckpoint + from deepspeed.checkpoint.ds_to_universal import ( + PARAM_SHAPES, + UNIVERSAL_CHECKPOINT_INFO, + _check_for_required_state, + _extract_zero_shard_files, + _merge_tp_slice_files, + _save_optimizer_state, + ) + except ImportError as exc: + raise ImportError( + "DeepSpeed-specific checkpoints cannot be saved without DeepSpeed>=0.14.3 installed" + ) from exc # read the latest file to get the step folder latest_file = output_dir / "latest" @@ -489,7 +750,7 @@ class UniversalCheckpointArgs: @contextmanager -def ensure_loadable_granite_checkpoint( +def ensure_loadable_dolomite_checkpoint( model_name_or_path: str, tmpdir: str, ): @@ -630,7 +891,7 @@ def _copy_no_lora_dict(state_dict): def save_dict_accelerate( - accelerator, + accelerator: Accelerator, state_to_save, save_directory, max_shard_size="5GB", @@ -662,7 +923,6 @@ def save_hf_format_accelerate( tokenizer, accelerator: Accelerator, samples_seen, - convert_granite=True, is_lora=False, ): log_rank_0( @@ -671,8 +931,13 @@ def save_hf_format_accelerate( ) start = time.time() + if args.model_type in ("gpt_megatron", "gpt_dolomite"): + convert_dolomite = False + else: + convert_dolomite = True + final_output_dir = Path(args.output_dir) / "hf_format" / f"samples_{samples_seen}" - if args.is_granite and convert_granite: + if args.use_dolomite and convert_dolomite: tmpdir = TemporaryDirectory("w") # pylint: disable=consider-using-with output_dir = Path(tmpdir.name) else: @@ -681,6 +946,18 @@ def save_hf_format_accelerate( CONFIG_NAME = "config.json" output_config_file = output_dir / CONFIG_NAME + # XXX(osilkin): LoRA + FSDP requires a different saving path than the others + # so we set this variable and use it to avoid those paths further down. + is_fsdp_lora = is_lora and accelerator.distributed_type == DistributedType.FSDP + if is_fsdp_lora: + save_fsdp_lora_model( + args=args, + model=model, + tokenizer=tokenizer, + accelerator=accelerator, + output_dir=output_dir, + ) + get_state_dict_unpatched = accelerator.get_state_dict def _get_state_dict_patched(model, unwrap=False): @@ -688,17 +965,28 @@ def _get_state_dict_patched(model, unwrap=False): accelerator.get_state_dict = _get_state_dict_patched - if accelerator.is_main_process: + if not is_fsdp_lora and accelerator.is_main_process: if is_lora: model.module.merge_adapter() model_state = model.module.state_dict() output_dir.mkdir(parents=True, exist_ok=True) - if not model.module.config.architectures and convert_granite: - model.module.config.architectures = ["LlamaForCausalLM"] - warnings.warn( - f"Adding architectures to ckpt: {model.module.config.architectures}", - ) + if not model.module.config.architectures and convert_dolomite: + arch_added = False + if args.model_type == "llama": + model.module.config.architectures = ["LlamaForCausalLM"] + arch_added = True + elif args.model_type == "granite": + model.module.config.architectures = ["GraniteForCausalLM"] + arch_added = True + if arch_added: + warnings.warn( + f"Adding architectures to ckpt: {model.module.config.architectures}", + ) + else: + warnings.warn( + f"Converting from dolomite, but no architecture field added to config.json", + ) model.module.config.to_json_file(output_config_file) tokenizer.save_pretrained(output_dir) @@ -720,14 +1008,14 @@ def _get_state_dict_patched(model, unwrap=False): safe_serialization=True, ) - if args.is_granite and convert_granite and accelerator.is_main_process: + if args.use_dolomite and convert_dolomite and accelerator.is_main_process: # export doesnt like the directory to exist if final_output_dir.exists(): shutil.rmtree(final_output_dir) export_to_huggingface( pretrained_model_name_or_path=tmpdir.name, save_path=final_output_dir, - model_type="llama", + model_type=args.model_type, ) tmpdir.cleanup() diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 00000000..7af121d2 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,20 @@ +## Overview + +`smoketest.sh` cd's into the source directory and runs the script-entrypoint for `main_ds.py` and `data_process.py`. Testing will break if file names or locations in the source tree change. + +Existing tests are "smoke tests," meant to demonstrate that training completes (returns 0) or not. This is helpful to check if all required dependencies are installed. + +Current tests add features as they go: + +1. No Flash Attention or Granite +2. No Granite but Flash Attention enabled +3. Granite and Flash Attention enabled + +## Usage + +The testing script can be run without parameters as `./smoketest.sh`. By default, this will run all tests with `FSDP` as the distributed training backend. To change the distributed training backend to the other available option, one can run the script as `./smoketest.sh deepspeed`. + +The second positional argument is for "number of GPUs"- e.g.: `./smoketest.sh fsdp 8`. This will run the test with 8 GPUs with fsdp as the distributed backend. + +> [!NOTE] +> You'll need to install the training library to run the test. Inside a virtual environment and at inside the repo, please run `pip3 install -e .` to install the package in editable mode. diff --git a/tests/smoketest.sh b/tests/smoketest.sh new file mode 100755 index 00000000..6918fb03 --- /dev/null +++ b/tests/smoketest.sh @@ -0,0 +1,239 @@ +#!/usr/bin/env bash +set -eux -o pipefail + +# ############### Read-only parameters ############### +MODEL_NAME="instructlab/granite-7b-lab" +# gets directory of current file. +SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +CORRECT_WORKING_DIR="${SCRIPT_DIR}/../src/instructlab/training/" +SAMPLE_DATA_PATH="${SCRIPT_DIR}/../sample-data/train_all_pruned_SDG.jsonl" +TMP_DIR=$(mktemp -d) +CHECKPOINTS_DIR="${TMP_DIR}/checkpoints" +DATA_DIR="${TMP_DIR}/data" +COMPUTED_DATA_PATH="${DATA_DIR}/data.jsonl" +DEFAULT_DISTRIB_FRAMEWORK='fsdp' +DISTRIB_FRAMEWORK="${1:-${DEFAULT_DISTRIB_FRAMEWORK}}" # defaults to FSDP +DEFAULT_GPUS=8 +NUM_GPUS="${2:-${DEFAULT_GPUS}}" + +# ############### User-modifiable parameters ############### +# Change these as needed +MAX_BATCH_LEN=60000 +MAX_SEQ_LEN=4096 +NUM_SAMPLES_TRAINED_ON=5000 # upper-bound on training dataset size. + +# ############### Test Functions ############### + +####################################### +# Creates directories for the precomputed datasets +# and the checkpoints that are saved during training inside +# of the temporary storage created for these tests. +# Globals: +# CHECKPOINTS_DIR +# DATA_DIR +# Arguments: +# None +# Returns: +# None +####################################### +function setup_tmpdir () { + mkdir "${CHECKPOINTS_DIR}" + mkdir "${DATA_DIR}" +} + +####################################### +# Test most common training parameters without using +# Flash Attention +# Globals: +# SAMPLE_DATA_PATH +# DATA_DIR +# MODEL_NAME +# NUM_SAMPLES_TRAINED_ON +# COMPUTED_DATA_PATH +# Arguments: +# None +# Returns: +# echos number of samples trained on to standard out. +####################################### +function prepare_data () { + # preprocesses .jsonl messages data so that it's a valid + # input to the model (inputs tokenized, formatted with mask, etc.) + # then, data is trimmed to a determined length to make training + # go faster. + + python3 data_process.py \ + --data_path="${SAMPLE_DATA_PATH}" \ + --data_output_path="${DATA_DIR}" \ + --max_seq_len=4096 \ + --model_name_or_path="${MODEL_NAME}" + + # trim data so we only keep the first 'n' samples. + # should be enough data for training to be meaningful but not enough + # that training takes a large amount of time. + echo "$(head -"${NUM_SAMPLES_TRAINED_ON}" "${COMPUTED_DATA_PATH}")" > "${COMPUTED_DATA_PATH}" + + echo "TRAINING ON $(wc -l "${COMPUTED_DATA_PATH}") SAMPLES" +} + +####################################### +# Clears and remakes the temporary directory where +# artifacts, such as checkpoints and logs, are stored +# during training. +# Globals: +# CHECKPOINTS_DIR +# Arguments: +# None +# Returns: +# writes location of checkpoints dir to standard out. +####################################### +function _cleanup_saved_checkpoints() { + echo "CLEARING CHECKPOINTS: ${CHECKPOINTS_DIR}" + rm -rf "${CHECKPOINTS_DIR}" + mkdir "${CHECKPOINTS_DIR}" +} + +####################################### +# Test most common training parameters without using +# Flash Attention +# Globals: +# NUM_GPUS +# MODEL_NAME +# COMPUTED_DATA_PATH +# CHECKPOINTS_DIR +# DISTRIBUTED_FRAMEWORK +# MAX_BATCH_LEN +# Arguments: +# None +# Returns: +# None +####################################### +function test_standard_loop () { + torchrun \ + --standalone \ + --nproc_per_node="${NUM_GPUS}" \ + main_ds.py \ + --model_name_or_path="${MODEL_NAME}" \ + --data_path="${COMPUTED_DATA_PATH}" \ + --output_dir="${CHECKPOINTS_DIR}" \ + --num_epochs=1 \ + --effective_batch_size=128 \ + --save_samples=0 \ + --checkpoint_at_epoch \ + --accelerate_full_state_at_epoch \ + --distributed_training_framework="${DISTRIB_FRAMEWORK}" \ + --max_batch_len="${MAX_BATCH_LEN}" \ + --is_granite +} + +####################################### +# Test most common training parameters without using +# Flash Attention +# Globals: +# NUM_GPUS +# MODEL_NAME +# COMPUTED_DATA_PATH +# CHECKPOINTS_DIR +# DISTRIBUTED_FRAMEWORK +# MAX_BATCH_LEN +# Arguments: +# None +# Returns: +# None +####################################### +function test_standard_loop_nongranite () { + torchrun \ + --standalone \ + --nproc_per_node="${NUM_GPUS}" \ + main_ds.py \ + --model_name_or_path="${MODEL_NAME}" \ + --data_path="${COMPUTED_DATA_PATH}" \ + --output_dir="${CHECKPOINTS_DIR}" \ + --num_epochs=1 \ + --effective_batch_size=128 \ + --save_samples=0 \ + --checkpoint_at_epoch \ + --accelerate_full_state_at_epoch \ + --distributed_training_framework="${DISTRIB_FRAMEWORK}" \ + --max_batch_len="${MAX_BATCH_LEN}" + # --is_granite \ +} + +####################################### +# Test most common training parameters without using +# Granite or Flash Attention +# Globals: +# NUM_GPUS +# MODEL_NAME +# COMPUTED_DATA_PATH +# CHECKPOINTS_DIR +# DISTRIBUTED_FRAMEWORK +# MAX_BATCH_LEN +# Arguments: +# None +# Returns: +# None +####################################### +function test_standard_loop_noflashattention_nogranite () { + torchrun \ + --standalone \ + --nproc_per_node="${NUM_GPUS}" \ + main_ds.py \ + --model_name_or_path="${MODEL_NAME}" \ + --data_path="${COMPUTED_DATA_PATH}" \ + --output_dir="${CHECKPOINTS_DIR}" \ + --num_epochs=1 \ + --effective_batch_size=128 \ + --save_samples=0 \ + --checkpoint_at_epoch \ + --accelerate_full_state_at_epoch \ + --distributed_training_framework="${DISTRIB_FRAMEWORK}" \ + --max_batch_len="${MAX_BATCH_LEN}" \ + --disable_flash_attn + # --is_granite +} + + +############################################################################## +# Validates the pathing logic for FSDP & LoRA. +# A valid run should result in a model with all adapters merged +# with the base model. +############################################################################## +function test_standard_loop_fsdp_lora() { + torchrun \ + --standalone \ + --nproc_per_node="${NUM_GPUS}" \ + main_ds.py \ + --model_name_or_path="${MODEL_NAME}" \ + --data_path="${COMPUTED_DATA_PATH}" \ + --output_dir="${CHECKPOINTS_DIR}" \ + --num_epochs=1 \ + --effective_batch_size=128 \ + --save_samples=0 \ + --checkpoint_at_epoch \ + --distributed_training_framework="${DISTRIB_FRAMEWORK}" \ + --max_batch_len="${MAX_BATCH_LEN}" \ + --lora_r=4 \ + --lora_alpha=32 \ + --lora_dropout=0.1 +} + +function main () { + + setup_tmpdir + trap 'rm -rf ${TMP_DIR}' EXIT + + #NOTE (jkunstle): script is run as though it's + # in the same source dir as main_ds and data_process. + cd "${CORRECT_WORKING_DIR}" + echo "CURRENT WORKING DIRECTORY: $(pwd)" + + prepare_data + test_standard_loop_noflashattention_nogranite + _cleanup_saved_checkpoints + test_standard_loop_nongranite + _cleanup_saved_checkpoints + test_standard_loop + test_standard_loop_fsdp_lora +} + +main diff --git a/tox.ini b/tox.ini index b6b625cd..5756b665 100644 --- a/tox.ini +++ b/tox.ini @@ -66,7 +66,6 @@ commands = sh -c 'git diff --exit-code || (echo "pyproject.toml formatting is incorrect. Please run \"make toml-fmt\" and commit the changes." && exit 1)' allowlist_externals = make, sh - [testenv:spellcheck] description = spell check (needs 'aspell' command) basepython = {[testenv:py3]basepython}