livingbody的技术专栏 Java and Python Coder

清华一键国内源设置

2019-02-28
livingbody

阅读:


清华一键国内源设置

oh-my-tuna

wget https://tuna.moe/oh-my-tuna/oh-my-tuna.py
# For yourself
python oh-my-tuna.py
# ...or for everyone!
sudo python oh-my-tuna.py --global
# Get some help
python oh-my-tuna.py -h

程序设置

#!/usr/bin/env python
#
#  This file is part of oh-my-tuna
#  Copyright (c) 2018 oh-my-tuna's authors
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import subprocess
import os
import errno
import argparse
import re
import platform
from contextlib import contextmanager

try:
   input = raw_input
except NameError:
   pass

try:
    import configparser
except ImportError:
    import ConfigParser as configparser


mirror_root = "mirrors.tuna.tsinghua.edu.cn"
host_name = "tuna.tsinghua.edu.cn"
always_yes = False
verbose = False
is_global = True

os_release_regex = re.compile(r"^ID=\"?([^\"\n]+)\"?$", re.M)


@contextmanager
def cd(path):
    old_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_cwd)


def sh(command):
    try:
        if verbose:
            print('$ %s' % command)
        if isinstance(command, str):
            command = command.split()
        return subprocess.check_output(command, stderr=subprocess.STDOUT).decode('utf-8').rstrip()
    except Exception as e:
        return None


def user_prompt():
    global always_yes
    if always_yes:
        return True

    ans = input('Do you wish to proceed(y/n/a):')
    if ans == 'a':
        always_yes = True
    return ans != 'n'


def ask_if_change(name, expected, command_read, command_set):
    current = sh(command_read)
    if current != expected:
        print('%s Before:' % name)
        print(current)
        print('%s After:' % name)
        print(expected)
        if user_prompt():
            sh(command_set)
            print('Command %s succeeded' % command_set)
            return True
        else:
            return False
    else:
        print('%s is already configured to TUNA mirrors' % name)
        return True


def get_linux_distro():
    os_release = sh('cat /etc/os-release')
    if not os_release:
        return None
    match = re.findall(os_release_regex, os_release)
    if len(match) != 1:
        return None
    return match[0]


def set_env(key, value):
    shell = os.environ.get('SHELL').split('/')[-1]
    if shell == 'bash' or shell == 'sh':
        with open(os.path.expanduser('~/.profile'), 'a') as f:
            f.write('export %s=%s\n' % (key, value))
    elif shell == 'zsh':
        with open(os.path.expanduser('~/.zprofile'), 'a') as f:
            f.write('export %s=%s\n' % (key, value))
    else:
        print('Please set %s=%s' % (key, value))


def remove_env(key):
    shell = os.environ.get('SHELL').split('/')[-1]
    if shell == 'bash' or shell == 'sh':
        pattern = "^export %s=" % key
        profile = "~/.profile"
    elif shell == 'zsh':
        pattern = "^export %s=" % key
        profile = "~/.zprofile"
    if pattern:
        profile = os.path.expanduser(profile)
        if platform.system() == 'Darwin': # TODO: More BSD systems
            sed = ['sed', '-i', "", "/%s/d" % pattern, profile]
        else:
            sed = ['sed', '-i', "/%s/d" % pattern, profile]
        sh(sed)
        return True
    else:
        print('Please remove environment variable %s' % key)
        return False


def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise



class Base(object):
    """
    Name of this mirror/module
    """

    @staticmethod
    def name():
        raise NotImplementedError

    """
    Returns whether this mirror is applicable
    """

    @staticmethod
    def is_applicable():
        return False

    """
    Returns whether this mirror is already up
    """

    @staticmethod
    def is_online():
        raise NotImplementedError

    """
    Activate this mirror
    Returns True if this operation is completed, False otherwise
    Caller should never invoke this method when is_online returns True
    """

    @staticmethod
    def up():
        raise NotImplementedError

    """
    Deactivate this mirror
    Returns True if this operation is completed, False otherwise
    Caller should never invoke this method when is_online returns False
    """

    @staticmethod
    def down():
        raise NotImplementedError

    """
    Print a log entry with the name of this mirror/module
    """

    @classmethod
    def log(cls, msg, level='i'):
        levels = "viodwe" # verbose, info, ok, debug, warning, error
        assert level in levels

        global verbose
        if level == 'v' and verbose:
            return

        color_prefix = {
            'v': '',
            'i': '',
            'o': '\033[32m',
            'd': '\033[34m',
            'w': '\033[33m',
            'e': '\033[31m'
        }
        if color_prefix[level]:
            color_suffix = '\033[0m'
        else:
            color_suffix = ''

        print('%s[%s]: %s%s' % (color_prefix[level], cls.name(), msg, color_suffix))


class Pypi(Base):
    mirror_url = 'https://pypi.%s/simple' % host_name

    """
    Reference: https://pip.pypa.io/en/stable/user_guide/#configuration
    """
    @staticmethod
    def config_files():
        system = platform.system()
        if system == 'Darwin':
            return ('$HOME/Library/Application Support/pip/pip.conf', '$HOME/.pip/pip.conf')
        elif system == 'Windows':
            return ('%APPDATA%\pip\pip.ini', '~\pip\pip.ini')
        elif system == 'Linux':
            return ('$HOME/.config/pip/pip.conf', '$HOME/.pip/pip.conf')


    @staticmethod
    def name():
        return "pypi"


    @staticmethod
    def is_applicable():
        global is_global
        if is_global:
            # Skip if in global mode
            return False
        return sh('pip') is not None or sh('pip3') is not None


    @staticmethod
    def is_online():
        pattern = re.compile(r' *index-url *= *%s' % Pypi.mirror_url)
        config_files = Pypi.config_files()
        for conf_file in config_files:
            if not os.path.exists(os.path.expandvars(conf_file)):
                continue
            with open(os.path.expandvars(conf_file)) as f:
                for line in f:
                    if pattern.match(line):
                        return True
        return False


    @staticmethod
    def up():
        config_file = os.path.expandvars(Pypi.config_files()[0])
        config = configparser.ConfigParser()
        if os.path.exists(config_file):
            config.read(config_file)
        if not config.has_section('global'):
            config.add_section('global')
        if not os.path.isdir(os.path.dirname(config_file)):
            mkdir_p(os.path.dirname(config_file))
        config.set('global', 'index-url', Pypi.mirror_url)
        with open(config_file, 'w') as f:
            config.write(f)
        return True


    @staticmethod
    def down():
        config_files = map(os.path.expandvars, Pypi.config_files())
        config = configparser.ConfigParser()
        for path in config_files:
            if not os.path.exists(path):
                continue
            config.read(path)
            try:
                if config.get('global', 'index-url') == Pypi.mirror_url:
                    config.remove_option('global', 'index-url')
                with open(path, 'w') as f:
                    config.write(f)
            except (configparser.NoOptionError, configparser.NoSectionError):
                pass
        return True


class ArchLinux(Base):
    @staticmethod
    def name():
        return 'Arch Linux'

    @staticmethod
    def is_applicable():
        global is_global
        if not is_global:
            return False
        return os.path.isfile(
            '/etc/pacman.d/mirrorlist') and get_linux_distro() == 'arch'

    @staticmethod
    def is_online():
        mirror_re = re.compile(
            r" *Server *= *(http|https)://%s/archlinux/\$repo/os/\$path\n" %
            mirror_root, re.M)
        ml = open('/etc/pacman.d/mirrorlist', 'r')
        lines = ml.readlines()
        result = map(lambda l: re.match(mirror_re, l), lines)
        result = any(result)
        ml.close()
        return result

    @staticmethod
    def up():
        # Match commented or not
        mirror_re = re.compile(
            r" *(# *)?Server *= *(http|https)://%s/archlinux/\$repo/os/\$path\n"
            % mirror_root, re.M)
        banner = '# Generated and managed by the awesome oh-my-tuna\n'
        target = "Server = https://%s/archlinux/$repo/os/$path\n\n" % mirror_root

        print(
            'This operation will insert the following line into the beginning of your pacman mirrorlist:\n%s'
            % target[:-2])
        if not user_prompt():
            return False

        ml = open('/etc/pacman.d/mirrorlist', 'r')
        lines = ml.readlines()

        # Remove all
        lines = filter(lambda l: re.match(mirror_re, l) is None, lines)

        # Remove banner
        lines = filter(lambda l: l != banner, lines)

        # Finish reading
        lines = list(lines)

        # Remove padding newlines
        k = 0
        while k < len(lines) and lines[k] == '\n':
            k += 1

        ml.close()
        ml = open('/etc/pacman.d/mirrorlist', 'w')
        # Add target
        ml.write(banner)
        ml.write(target)
        ml.writelines(lines[k:])
        ml.close()
        return True

    @staticmethod
    def down():
        print(
            'This action will comment out TUNA mirrors from your pacman mirrorlist, if there is any.'
        )
        if not user_prompt():
            return False

        # Simply remove all matched lines
        mirror_re = re.compile(
            r" *Server *= *(http|https)://%s/archlinux/\$repo/os/\$path\n" %
            mirror_root, re.M)

        ml = open('/etc/pacman.d/mirrorlist', 'r')
        lines = ml.readlines()
        lines = list(
            map(lambda l: l if re.match(mirror_re, l) is None else '# ' + l,
                lines))
        ml.close()
        ml = open('/etc/pacman.d/mirrorlist', 'w')
        ml.writelines(lines)
        ml.close()
        return True


class Homebrew(Base):
    @staticmethod
    def name():
        return 'Homebrew'

    @staticmethod
    def is_applicable():
        global is_global
        if not is_global:
            return False
        return sh('brew --repo') is not None

    @staticmethod
    def is_online():
        repo = sh('brew --repo')
        with cd(repo):
            repo_online = sh('git remote get-url origin'
                      ) == 'https://%s/git/homebrew/brew.git' % mirror_root
        if repo_online:
            return os.environ.get('HOMEBREW_BOTTLE_DOMAIN') == 'https://%s/homebrew-bottles' % mirror_root
        return False


    @staticmethod
    def up():
        repo = sh('brew --repo')
        with cd(repo):
            ask_if_change(
                'Homebrew repo',
                'https://%s/git/homebrew/brew.git' % mirror_root,
                'git remote get-url origin',
                'git remote set-url origin https://%s/git/homebrew/brew.git' %
                mirror_root)
        for tap in ('homebrew-core', 'homebrew-python', 'homebrew-science'):
            tap_path = '%s/Library/Taps/homebrew/%s' % (repo, tap)
            if os.path.isdir(tap_path):
                with cd(tap_path):
                    ask_if_change(
                        'Homebrew tap %s' % tap,
                        'https://%s/git/homebrew/%s.git' % (mirror_root, tap),
                        'git remote get-url origin',
                        'git remote set-url origin https://%s/git/homebrew/%s.git'
                        % (mirror_root, tap))
        set_env('HOMEBREW_BOTTLE_DOMAIN', 'https://%s/homebrew-bottles' % mirror_root)
        return True

    @staticmethod
    def down():
        repo = sh('brew --repo')
        with cd(repo):
            sh('git remote set-url origin https://github.com/homebrew/brew.git'
               )
            for tap in ('homebrew-core', 'homebrew-python',
                        'homebrew-science'):
                tap_path = '%s/Library/Taps/homebrew/%s' % (repo, tap)
                if os.path.isdir(tap_path):
                    with cd(tap_path):
                        sh('git remote set-url origin https://github.com/homebrew/%s.git'
                           % tap)
            sh('git remote get-url origin'
                      ) == 'https://github.com/homebrew/brew.git'
        return remove_env('HOMEBREW_BOTTLE_DOMAIN')


class CTAN(Base):
    @staticmethod
    def name():
        return 'CTAN'

    @staticmethod
    def is_applicable():
        # Works both in global mode or local mode
        return sh('tlmgr --version') is not None

    @staticmethod
    def is_online():
        global is_global
        base = "tlmgr"
        if not is_global:
            # Setup usertree first
            sh("tlmgr init-usertree")
            base += " --usermode"

        return sh(
            '%s option repository' % base
        ) == 'Default package repository (repository): https://mirrors.tuna.tsinghua.edu.cn/CTAN/systems/texlive/tlnet'

    @staticmethod
    def up():
        global is_global
        base = "tlmgr"
        if not is_global:
            base += " --usermode"

        return ask_if_change(
            'CTAN mirror',
            'Default package repository (repository): https://mirrors.tuna.tsinghua.edu.cn/CTAN/systems/texlive/tlnet',
            '%s option repository' % base,
            '%s option repository https://mirrors.tuna.tsinghua.edu.cn/CTAN/systems/texlive/tlnet' % base
        )


class Anaconda(Base):
    url_free = 'https://%s/anaconda/pkgs/free/' % mirror_root
    url_main = 'https://%s/anaconda/pkgs/main/' % mirror_root


    @staticmethod
    def name():
        return "Anaconda"


    @staticmethod
    def is_applicable():
        # Works both in global mode and local mode
        return sh('conda -V') is not None


    @staticmethod
    def is_online():
        cmd = 'conda config --get channels'
        global is_global
        if is_global:
            cmd += ' --system'

        channels = sh(cmd).split('\n')
        in_channels = 0
        for line in channels:
            if Anaconda.url_free in line:
                in_channels += 1
            elif Anaconda.url_main in line:
                in_channels += 1
        return in_channels == 2


    @staticmethod
    def up():
        basecmd = 'conda config'
        global is_global
        if is_global:
            basecmd += ' --system'
        sh ("%s --add channels %s" % (basecmd, Anaconda.url_free))
        sh ("%s --add channels %s" % (basecmd, Anaconda.url_main))
        return True

    @staticmethod
    def down():
        basecmd = 'conda config'
        global is_global
        if is_global:
            basecmd += ' --system'
        sh ("%s --remove channels %s" % (basecmd, Anaconda.url_free))
        sh ("%s --remove channels %s" % (basecmd, Anaconda.url_main))
        return True


class Debian(Base):
    pools = "main contrib non-free"
    default_sources = {
            'http://deb.debian.org/debian': ['', '-updates'],
            'http://security.debian.org/debian-security': ['main', 'contrib', 'non-free'],
            }

    @staticmethod
    def build_mirrorspec():
        return {
                'https://' + mirror_root + '/debian': ['', '-updates'],
                'https://' + mirror_root + '/debian-security': ['/updates'],
            }

    @classmethod
    def build_template(cls, mirrorspecs):
        release = sh('lsb_release -sc')
        lines = ['%s %s %s%s %s\n' % (repoType, mirror, release, repo, cls.pools)
                    for mirror in mirrorspecs
                    for repo in mirrorspecs[mirror]
                    for repoType in ['deb', 'deb-src']]
        tmpl = ''.join(lines)
        return tmpl

    @staticmethod
    def name():
        return 'Debian'

    @staticmethod
    def is_applicable():
        global is_global
        if not is_global:
            return False
        return os.path.isfile(
            '/etc/apt/sources.list') and get_linux_distro() == 'debian'

    @classmethod
    def is_online(cls):
        with open('/etc/apt/sources.list', 'r') as sl:
            content = sl.read();
            return content == cls.build_template(cls.build_mirrorspec())

    @classmethod
    def up(cls):
        print('This operation will move your current sources.list to sources.on-my-tuna.bak.list,\n' + \
              'and use TUNA apt source instead.')
        if not user_prompt():
            return False
        if os.path.isfile('/etc/apt/sources.list'):
            sh('cp /etc/apt/sources.list /etc/apt/sources.oh-my-tuna.bak.list')
        with open('/etc/apt/sources.list', 'w') as sl:
            sl.write(cls.build_template(cls.build_mirrorspec()))
        return True

    @classmethod
    def down(cls):
        print('This operation will copy sources.on-my-tuna.bak.list to sources.list if there is one,\n' + \
              'otherwise build a new sources.list with archive.ubuntu.com as its mirror root.')
        if not user_prompt():
            return False
        if os.path.isfile('/etc/apt/sources.oh-my-tuna.bak.list'):
            if sh('cp /etc/apt/sources.oh-my-tuna.bak.list /etc/apt/sources.list') is not None:
                return True
        with open('/etc/apt/sources.list', 'w') as sl:
            sl.write(cls.build_template(cls.default_sources))
        return True


class Ubuntu(Debian):
    default_sources = { 'http://archive.ubuntu.com/ubuntu': ['', '-updates', '-security', '-backports'] }
    pools = "main multiverse universe restricted"

    @staticmethod
    def build_mirrorspec():
        return {
                'https://' + mirror_root + '/ubuntu': ['', '-updates', '-security', '-backports'],
            }

    @staticmethod
    def name():
        return 'Ubuntu'

    @staticmethod
    def is_applicable():
        global is_global
        if not is_global:
            return False
        return os.path.isfile(
            '/etc/apt/sources.list') and get_linux_distro() == 'ubuntu'


MODULES = [ArchLinux, Homebrew, CTAN, Pypi, Anaconda, Debian, Ubuntu]


def main():
    parser = argparse.ArgumentParser(
        description='Use TUNA mirrors everywhere when applicable')
    parser.add_argument(
        'subcommand',
        nargs='?',
        metavar='SUBCOMMAND',
        choices=['up', 'down', 'status'],
        default='up')
    parser.add_argument(
        '-v', '--verbose', help='verbose output', action='store_true')
    parser.add_argument(
        '-y',
        '--yes',
        help='always answer yes to questions',
        action='store_true')
    parser.add_argument(
        '-g',
        '--global',
        dest='is_global',
        help='apply system-wide changes. This option may affect applicability of some modules.',
        action='store_true')

    args = parser.parse_args()
    global verbose
    verbose = args.verbose
    global always_yes
    always_yes = args.yes
    global is_global
    is_global = args.is_global

    if args.subcommand == 'up':
        for m in MODULES:
            if m.is_applicable():
                if not m.is_online():
                    m.log('Activating...')
                    try:
                        result = m.up()
                        if not result:
                            m.log('Operation cancled', 'w')
                        else:
                            m.log('Mirror has been activated', 'o')
                    except NotImplementedError:
                        m.log(
                            'Mirror doesn\'t support activation. Please activate manually'
                        , 'e')

    if args.subcommand == 'down':
        for m in MODULES:
            if m.is_applicable():
                if m.is_online():
                    m.log('Deactivating...')
                    try:
                        result = m.down()
                        if not result:
                            m.log('Operation cancled', 'w')
                        else:
                            m.log('Mirror has been deactivated', 'o')
                    except NotImplementedError:
                        m.log(
                            'Mirror doesn\'t support deactivation. Please deactivate manually'
                        , 'e')

    if args.subcommand == 'status':
        for m in MODULES:
            if m.is_applicable():
                if m.is_online():
                    m.log('Online', 'o')
                else:
                    m.log('Offline')


if __name__ == "__main__":
    main()


下一篇 爬取腾讯职位

Comments

Content