Huihui-GLM-4.7-Flash-abliterated
by huihui-ai · License: MIT · Language Model · 540 downloads
Status: Early-stage
Edge AI targets: Mobile, Laptop, Server
Quick Summary

An abliterated variant of the GLM-4.7-Flash language model, released by huihui-ai. Abliteration is a technique that suppresses a model's built-in refusal behavior while otherwise preserving its capabilities.

Code Examples

Usage (Python, transformers)
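
The script below runs an interactive, streaming chat session against the model and prints simple latency and throughput metrics after each turn. It supports the inline commands /exit, /clear, /skip_prompt, and /skip_special_tokens; the --device_map and --dtype flags let you run it on CPU (e.g. --device_map cpu --dtype float32) or let transformers place the weights automatically.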
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TextStreamer
import torch
import os
import signal
import time

def parse_args():
    parser = argparse.ArgumentParser(
        description="Merge LoRA weights into huihui-ai/Huihui-GLM-4.7-Flash-abliterated base model and save the full model."
    )
    parser.add_argument(
        "--base_model",
        type=str,
        default="huihui-ai/Huihui-GLM-4.7-Flash-abliterated",
        help="HuggingFace repo or local path of the base model.",
    )
    parser.add_argument(
        "--dtype",
        type=str,
        default="bfloat16",
        choices=["float16", "bfloat16", "float32"],
        help="Data type for loading the base model (default: bfloat16).",
    )
    parser.add_argument(
        "--device_map",
        type=str,
        default="auto",
        help="Device map for model loading (e.g. 'cpu', 'auto').",
    )
    return parser.parse_args()

def main():
    cpu_count = os.cpu_count()
    print(f"Number of CPU cores in the system: {cpu_count}")
    # Use half of the available cores (at least one) for CPU math libraries.
    half_cpu_count = max(1, cpu_count // 2)
    os.environ["MKL_NUM_THREADS"] = str(half_cpu_count)
    os.environ["OMP_NUM_THREADS"] = str(half_cpu_count)
    torch.set_num_threads(half_cpu_count)

    print(f"PyTorch threads: {torch.get_num_threads()}")
    print(f"MKL threads: {os.getenv('MKL_NUM_THREADS')}")
    print(f"OMP threads: {os.getenv('OMP_NUM_THREADS')}")

    args = parse_args()

    # Load the model and tokenizer
    print(f"Load Model {args.base_model} ... ")
    # Optional 4-bit quantization config. Note: it is defined here but NOT passed to
    # from_pretrained() below; add quantization_config=quant_config_4 there to enable it
    # (requires the bitsandbytes package).
    quant_config_4 = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_quant_type="nf4" if args.device_map == "cpu" else "fp4",
        bnb_4bit_use_double_quant=True,
        llm_int8_enable_fp32_cpu_offload=True,
    )

    torch_dtype = {
        "float16": torch.float16,
        "bfloat16": torch.bfloat16,
        "float32": torch.float32,
    }[args.dtype]

    model = AutoModelForCausalLM.from_pretrained(
        args.base_model,
        torch_dtype=torch_dtype,  # recent transformers versions also accept dtype=
        device_map=args.device_map,
        trust_remote_code=True,
        # low_cpu_mem_usage=True,
    )

    tokenizer = AutoTokenizer.from_pretrained(args.base_model, trust_remote_code=True)

    messages = []
    skip_prompt = True
    skip_special_tokens = True

    class CustomTextStreamer(TextStreamer):
        def __init__(self, tokenizer, skip_prompt=True, skip_special_tokens=True):
            super().__init__(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)
            self.generated_text = ""
            self.stop_flag = False
            self.init_time = time.time()  # Record initialization time
            self.end_time = None  # To store end time
            self.first_token_time = None  # To store first token generation time
            self.token_count = 0  # To track total tokens

        def on_finalized_text(self, text: str, stream_end: bool = False):
            if self.first_token_time is None and text.strip():  # Set first token time on first non-empty text
                self.first_token_time = time.time()
            if stream_end:
                self.end_time = time.time()  # Record end time when streaming ends

            self.generated_text += text
            self.token_count += 1  # counts decoded text chunks, an approximation of the token count
            print(text, end="", flush=True)

            if self.stop_flag:
                raise StopIteration  # aborts model.generate(); caught in generate_stream()

        def stop_generation(self):
            self.stop_flag = True
            self.end_time = time.time()  # Record end time when generation is stopped

        def get_metrics(self):
            """Returns initialization time, first token time, first token latency, end time, total time, total tokens, and tokens per second."""
            if self.end_time is None:
                self.end_time = time.time()  # Set end time if not already set
            total_time = self.end_time - self.init_time  # Total time from init to end
            tokens_per_second = self.token_count / total_time if total_time > 0 else 0
            first_token_latency = (self.first_token_time - self.init_time) if self.first_token_time is not None else None
            metrics = {
                "init_time": self.init_time,
                "first_token_time": self.first_token_time,
                "first_token_latency": first_token_latency,
                "end_time": self.end_time,
                "total_time": total_time,  # Total time in seconds
                "total_tokens": self.token_count,
                "tokens_per_second": tokens_per_second
            }
            return metrics

    def generate_stream(model, tokenizer, messages, skip_prompt, skip_special_tokens, max_new_tokens):
        inputs = tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt",
        ).to(model.device)

        streamer = CustomTextStreamer(tokenizer, skip_prompt=skip_prompt, skip_special_tokens=skip_special_tokens)

        def signal_handler(sig, frame):
            # Ctrl+C sets the stop flag; generation aborts on the next streamed chunk.
            streamer.stop_generation()
            print("\n[Generation stopped by user with Ctrl+C]")

        signal.signal(signal.SIGINT, signal_handler)

        print("Response: ", end="", flush=True)
        try:
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                streamer=streamer
            )
            del generated_ids
        except StopIteration:
            print("\n[Stopped by user]")

        del inputs
        torch.cuda.empty_cache()  # no-op on CPU-only runs
        signal.signal(signal.SIGINT, signal.SIG_DFL)  # restore default Ctrl+C handling

        return streamer.generated_text, streamer.stop_flag, streamer.get_metrics()

    while True:
        user_input = input("User: ").strip()
        if user_input.lower() == "/exit":
            print("Exiting chat.")
            break
        if user_input.lower() == "/clear":
            messages = []
            print("Chat history cleared. Starting a new conversation.")
            continue
        if user_input.lower() == "/skip_prompt":
            if skip_prompt:
                skip_prompt = False
                print("skip_prompt = False.")
            else:
                skip_prompt = True
                print("skip_prompt = True.")
            continue
        if user_input.lower() == "/skip_special_tokens":
            if skip_special_tokens:
                skip_special_tokens = False
                print("skip_special_tokens = False.")
            else:
                skip_special_tokens = True
                print("skip_special_tokens = True.")
            continue
        if not user_input:
            print("Input cannot be empty. Please enter something.")
            continue

        messages.append({"role": "user", "content": user_input})
        response, stop_flag, metrics = generate_stream(model, tokenizer, messages, skip_prompt, skip_special_tokens, 40960)
        print("\n\nMetrics:")
        for key, value in metrics.items():
            print(f"  {key}: {value}")

        print("", flush=True)

        if stop_flag:
            continue
        messages.append({"role": "assistant", "content": response})

if __name__ == "__main__":
    main()
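
A note on quantization: the script defines quant_config_4 but never passes it to from_pretrained, so the model loads at the requested dtype rather than in 4-bit. If you want to try 4-bit loading, a minimal sketch (assuming the bitsandbytes package and a CUDA GPU are available; untested against this specific checkpoint):

# Minimal 4-bit loading sketch. Assumes bitsandbytes is installed and a CUDA GPU
# is present; untested against this specific checkpoint.
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import torch

quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_quant_type="nf4",  # NF4 generally preserves quality better than FP4
    bnb_4bit_use_double_quant=True,
)

model = AutoModelForCausalLM.from_pretrained(
    "huihui-ai/Huihui-GLM-4.7-Flash-abliterated",
    quantization_config=quant_config,  # the argument the script above omits
    device_map="auto",
    trust_remote_code=True,
)
tokenizer = AutoTokenizer.from_pretrained(
    "huihui-ai/Huihui-GLM-4.7-Flash-abliterated", trust_remote_code=True
)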

Deploy This Model

Production-ready deployment in minutes

Together.ai — Fastest API. Instant API access: a production-ready inference API. Start free, scale to millions.
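
Together's API is OpenAI-compatible; for orientation, a sketch of what a call would look like if this checkpoint were listed there (the model ID below is a placeholder assumption, not a confirmed listing):

# Hypothetical Together.ai call. The model ID is a placeholder assumption,
# not a confirmed listing for this checkpoint.
from together import Together  # pip install together

client = Together()  # reads TOGETHER_API_KEY from the environment
resp = client.chat.completions.create(
    model="huihui-ai/Huihui-GLM-4.7-Flash-abliterated",  # placeholder model ID
    messages=[{"role": "user", "content": "Hello!"}],
)
print(resp.choices[0].message.content)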

Replicate — Easiest Setup. One-click model deployment: run models in the cloud with a simple API, no DevOps required.
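
Likewise for Replicate, a sketch assuming someone has published this checkpoint there (the model slug is a placeholder assumption):

# Hypothetical Replicate call. The model slug is a placeholder assumption.
import replicate  # pip install replicate; reads REPLICATE_API_TOKEN from the environment

output = replicate.run(
    "huihui-ai/huihui-glm-4.7-flash-abliterated",  # placeholder slug
    input={"prompt": "Hello!"},
)
print("".join(output))  # language models on Replicate typically stream back text chunks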

Disclosure: We may earn a commission from these partners. This helps keep LLMYourWay free.